websucker-pip/mongo/mongocrawler.py
2023-04-17 15:09:49 +02:00

771 lines
25 KiB
Python

import pymongo
import pymongo.errors
import trafilatura
import trafilatura.feeds
import trafilatura.sitemaps
import trafilatura.spider
import trafilatura.utils
import trafilatura.external
import sys
import courlan
import urllib
from datetime import datetime as dat
import datetime
import click
import logging as LOGGER
import os
import pprint
import re
import time
import collections
import math
import random
import hashlib
from bs4 import BeautifulSoup
import urllib.parse
import os.path
import binascii
import json
# database options
CONNECTION = os.getenv("SUCKER_CONNECTION", "mongodb://root:example@localhost:27017/")
DBNAME = os.getenv("SUCKER_DBNAME", "crawler")
# retrieving filter
BATCH_SIZE = int(os.getenv("SUCKER_BATCH_SIZE", "10"))
MIN_FILE_SIZE = int(os.getenv("SUCKER_MIN_FILE_SIZE", "300"))
MAX_FILE_SIZE = int(os.getenv("SUCKER_MAX_FILE_SIZE", "10000000"))
# document originality filter
MIN_TEXT_SIZE = int(os.getenv("SUCKER_MIN_TEXT_SIZE", "200"))
CHECK_PARAGRAPH_SIZE = int(os.getenv("SUCKER_CHECK_PARAGRAPH_SIZE", "150"))
TEXT_TRASH_RATIO = float(os.getenv("SUCKER_TEXT_TRASH_RATIO", "0.6"))
# link and domain sampling
DISCOVER_LINK_RATIO = float(os.getenv("SUCKER_DISCOVER_LINK_RATIO", "0.3"))
# BUG FIX: both values below previously read SUCKER_DISCOVER_LINK_RATIO
# (copy-paste error), which made them impossible to configure and would
# raise ValueError (int of a float string) if that variable was set.
SAMPLE_SET_SIZE = int(os.getenv("SUCKER_SAMPLE_SET_SIZE", "10000"))
CLASSIFIER_SET_SIZE = int(os.getenv("SUCKER_CLASSIFIER_SET_SIZE", "200"))
# link filter
LANGUAGE = os.getenv("SUCKER_LANGUAGE", "sk")
DOMAIN = os.getenv("SUCKER_DOMAIN", "sk")
STOP_PATHS = os.getenv("SUCKER_STOP_PATHS", "xml,rss,login,admin").split(",")
def get_bs_links(link, html):
    """Extract and normalize all followable links from an HTML page.

    :param link: URL the page was fetched from (fallback base URL)
    :param html: raw HTML text
    :return: set of normalized absolute http(s) URLs
    """
    bs = BeautifulSoup(html, "lxml")
    base = link
    # honor an explicit <base href="..."> if the page declares one
    if bs.base is not None and "href" in bs.base.attrs:
        base = bs.base["href"]
    base = urllib.parse.urlparse(courlan.normalize_url(base))
    links = set()
    # link normalization
    for l in bs.find_all("a", href=True):
        # BUG FIX: BeautifulSoup parses rel as a multi-valued attribute (a
        # list), so the original equality test l.attrs["rel"] == "nofollow"
        # never matched; use membership instead.
        if ("rel" in l.attrs and "nofollow" in l.attrs["rel"]) or "nofollow" in l.attrs:
            continue
        href = l["href"]
        try:
            parsed = urllib.parse.urlparse(courlan.normalize_url(href))
            netloc = parsed.netloc
            path = os.path.normpath(parsed.path)
            scheme = parsed.scheme
            query = parsed.query
            # internal link: resolve against the base URL
            if parsed.netloc == "":
                scheme = base.scheme
                netloc = base.netloc
                if not parsed.path.startswith("/"):
                    path = os.path.normpath(base.path + "/" + path)
            if not scheme.startswith("http"):
                continue
            if path.startswith("/"):
                path = path[1:]
            if path.endswith(")"):
                # probably javascript
                continue
            href = urllib.parse.urlunparse((scheme, netloc, path, "", query, ""))
            href = courlan.normalize_url(href)
            links.add(href)
        except ValueError as err:
            print(err)
    return links
def split_train(res):
    """Split *res* into (trainset, testset): every 10th item (starting with
    the first) goes to the test set, the rest to the training set."""
    trainset, testset = [], []
    for index, item in enumerate(res):
        target = testset if index % 10 == 0 else trainset
        target.append(item)
    return trainset, testset
def calculate_checksums(text):
    """
    @return fingerprints of paragraphs in text. Paragraphs are separated by a newline.

    A rolling 31-bit additive hash is built from characters with ord > 64;
    only paragraphs with more than CHECK_PARAGRAPH_SIZE hashed characters
    are fingerprinted.  Returns (checksums, sizes) where sizes holds the raw
    character counts of the fingerprinted paragraphs.
    """
    checksums = []
    sizes = []
    hval = 0  # rolling hash value (NOTE: not reset between paragraphs)
    hsz = 0   # count of characters folded into the hash for current paragraph
    sz = 0    # raw character count of current paragraph
    for c in text:
        cv = ord(c)
        sz += 1
        if cv > 64:  # skip low codepoints (control chars, digits, most punctuation)
            hval += (hval << 3) + cv
            zv = hval >> 31
            hval &= 0x7fffffff
            hval += zv
            hsz += 1
        if c == "\n" and hsz > 0:
            # paragraph boundary: fingerprint only sufficiently long paragraphs
            if hsz > CHECK_PARAGRAPH_SIZE:
                checksums.append(hval)
                sizes.append(sz)
            sz = 0
            hsz = 0
    # final paragraph (text without a trailing newline)
    if hsz > CHECK_PARAGRAPH_SIZE:
        checksums.append(hval)
        sizes.append(sz)
    return checksums, sizes
def is_robot_good(link, rules):
    """Return True when robots.txt *rules* permit fetching *link*
    (or when no rules are available at all)."""
    return rules is None or rules.can_fetch("*", link)
def is_link_good(link):
    """Validate *link* against language, stop-path and domain rules.

    :return: the normalized link, or None when it must be skipped
    """
    checked = courlan.check_url(link, strict=True, language=LANGUAGE)
    if checked is None:
        return None
    llink, lhostname = checked
    # reject links whose path contains a configured stop word (rss, login, ...)
    path_parts = set(llink.split("/"))
    if any(stop in path_parts for stop in STOP_PATHS):
        return None
    # hostname must belong to the configured top-level domain
    if not lhostname.endswith(DOMAIN):
        LOGGER.debug("bad hostname")
        return None
    if courlan.is_not_crawlable(llink):
        LOGGER.debug("not crawlable")
        return None
    return llink
def get_link_doc(link: str, status="frontlink") -> dict:
    """Build a record for the links collection from a URL and a status."""
    checked = courlan.check_url(link)
    assert checked is not None
    link, host = checked
    return {
        "url": link,
        "host": host,
        "domain": courlan.extract_domain(link),
        "status": status,
        "created_at": dat.utcnow(),
    }
def fetch_page(link: str) -> (str, str):
    """Fetch *link* and return (final_link, html).

    html is None on any failure: bad HTTP status, body too small or too
    large, undecodable content, or a malformed/malicious meta-refresh.
    final_link may differ from link when a meta-refresh redirects.
    """
    print("fetching:::::")
    print(link)
    final_link = link
    response = trafilatura.fetch_url(link, decode=False)
    # be polite to the server
    time.sleep(2)
    html = None
    if response is not None:
        good = True
        print(response)
        if response.status != 200:
            good = False
            # BUG FIX: the original logged the undefined name `url`,
            # raising NameError on every non-200 response.
            LOGGER.error('not a 200 response: %s for URL %s', response.status, link)
        elif response.data is None or len(response.data) < MIN_FILE_SIZE:
            LOGGER.error('too small/incorrect for URL %s', link)
            good = False
            # raise error instead?
        elif len(response.data) > MAX_FILE_SIZE:
            good = False
            LOGGER.error('too large: length %s for URL %s', len(response.data), link)
        if good:
            html = trafilatura.utils.decode_response(response)
            if html is not None:
                # is there a meta-refresh on the page?
                html, final_link = trafilatura.spider.refresh_detection(html, final_link)
                if final_link is None:  # malformed or malicious content
                    html = None
    # BUG FIX: guard normalization — final_link can be None after a
    # malicious meta-refresh and normalize_url would crash on None.
    if final_link is not None:
        final_link = courlan.normalize_url(final_link)
    return final_link, html
def fetch_robot(base_url: str) -> urllib.robotparser.RobotFileParser:
    """Download and parse robots.txt for *base_url*.

    :return: parsed RobotFileParser, or None when it cannot be retrieved
    """
    try:
        raw = trafilatura.fetch_url("https://" + base_url + "/robots.txt")
        rules = urllib.robotparser.RobotFileParser()
        rules.parse(raw.split("\n"))
        LOGGER.info('got robots')
        return rules
    except Exception as exc:
        # fetch failures (raw is None) and parse errors both land here
        LOGGER.error('cannot read robots.txt: %s', exc)
        return None
def extract_page(final_link, html):
    """Run trafilatura content extraction on *html*.

    :return: the extracted document dict, or None when there is no HTML or
             the extracted text is shorter than MIN_TEXT_SIZE
    """
    if html is None:
        return None
    doc = trafilatura.bare_extraction(html, url=final_link, with_metadata=True,
                                      include_formatting=False,
                                      target_language=LANGUAGE, favor_precision=True)
    if doc is not None and ("text" not in doc or len(doc["text"]) < MIN_TEXT_SIZE):
        # text too small
        doc = None
    return doc
def set_content_checksums(doc):
    """Annotate *doc* in place with size, hash and sentence statistics.

    Adds: text_size, text_md5, paragraph_checksums, paragraph_sizes,
    paragraph_sizes_sum and sentences_count.
    """
    text = doc["text"]
    checksums, sizes = calculate_checksums(text)
    doc["text_size"] = len(text)
    doc["text_md5"] = hashlib.md5(text.encode("utf8")).hexdigest()
    doc["paragraph_checksums"] = checksums
    doc["paragraph_sizes"] = sizes
    doc["paragraph_sizes_sum"] = sum(sizes)
    # BUG FIX: raw string — "\w" in a plain string is an invalid escape
    # sequence (SyntaxWarning on Python 3.12+); pattern value is unchanged.
    end_sentence_marker = re.compile(r"\w[\.]")
    sentences = 0
    for item in re.finditer(end_sentence_marker, text):
        t = item.group(0)
        # a lowercase letter directly before a period approximates a sentence end
        if t[0].islower():
            sentences += 1
    doc["sentences_count"] = sentences
def index_page(db,original_link,final_link,html,doc,filter_content=True):
    """Store a fetched page in the html/content collections and return its state.

    States: "good", "html_error" (no HTML), "content_error" (no extracted
    document), "small" (too little long-paragraph text), "copy" (mostly
    paragraphs already seen elsewhere).  Redirects are recorded on the
    original link; all further records use the final link.
    """
    linkcol = db["links"]
    htmlcol = db["html"]
    contentcol = db["content"]
    checkcol = db["check"]
    state = "good"
    link = original_link
    if original_link != final_link:
        # mark the original URL as a redirect, continue with the target URL
        linkcol.update_one({"url":original_link},{"$set":{"status":"redirect"}})
        link = final_link
    if html is None:
        state = "html_error"
    elif doc is None:
        state = "content_error"
    if doc is not None:
        set_content_checksums(doc)
        tsz = doc["text_size"]
        psz = doc["paragraph_sizes_sum"]
        # reject documents whose share of long paragraphs is too small
        if filter_content and (tsz < MIN_TEXT_SIZE or psz/tsz < TEXT_TRASH_RATIO):
            state = "small"
        # check copy
        if state == "good":
            origsz = 0
            for chs,paragraph_size in zip(doc["paragraph_checksums"],doc["paragraph_sizes"]):
                # a checksum absent from the check collection means the
                # paragraph has not been seen before -> counts as original
                nd = checkcol.find_one({"_id":chs})
                if nd is None:
                    origsz += paragraph_size
            doc["original_text_size"] = origsz
            # too little original text -> duplicate content
            if filter_content and (1 - (origsz / tsz)) > TEXT_TRASH_RATIO:
                state = "copy"
    if state == "good":
        htdoc = get_link_doc(link,state)
        htdoc["html"] = html
        htdoc["html_size"] = len(html)
        htdoc["html_md5"]= hashlib.md5(html.encode("utf8")).hexdigest()
        # can be revisited - upsert
        del htdoc["url"]
        htmlcol.update_one({"url":link},{"$set":htdoc},upsert=True)
        doc.update(get_link_doc(link,"good"))
        # todo extract links
        print(link,doc)
        del doc["url"]
        contentcol.update_one({"url":link},{"$set":doc},upsert=True)
        # register paragraph checksums for future duplicate detection
        for chs in doc["paragraph_checksums"]:
            checkcol.update_one({"_id":chs},{"$inc":{"count":1}},upsert=True)
    # always update the link record with the final state
    linkdoc = get_link_doc(link,state)
    del linkdoc["url"]
    linkcol.update_one({"url":link},{"$set":linkdoc})
    return state
def save_batch_info(db, host, states, docs):
    """Store summary statistics of one crawl batch in the batches collection.

    :param states: per-document final states from index_page
    :param docs: extracted documents aligned with *states*
    """
    good_document_count = 0
    original_text_size = 0
    batch_size = 0
    parts = host.split(".")
    # keep only the registrable tail, e.g. www.example.sk -> example.sk
    domain = parts[-2] + "." + parts[-1]
    for state, doc in zip(states, docs):
        batch_size += 1
        if state == "good":
            good_document_count += 1
            original_text_size += doc["original_text_size"]
    batchdoc = {
        "host": host,
        "domain": domain,
        "created_at": dat.utcnow(),
        "good_document_count": good_document_count,
        "original_text_size": original_text_size,
        "good_prob": good_document_count / batch_size,
        "batch_size": batch_size,
    }
    db["batches"].insert_one(batchdoc)
def extract_links(link_batch: list, responses: list, hostname: str, rules, default_status="frontlink") -> list:
    """Collect candidate outgoing links from a batch of fetched pages.

    :param responses: (final_link, html) pairs aligned with *link_batch*
    :return: list of (normalized_link, status) tuples that pass robots.txt
             (for internal links) and the link-quality rules
    """
    links = {}
    badrobot = 0
    for original_link, (final_link, html) in zip(link_batch, responses):
        # skip failed or trivially small pages
        if html is None or len(html) < 256:
            continue
        for page_link in get_bs_links(final_link, html):
            # robots.txt rules apply to internal links only
            if not courlan.is_external(page_link, final_link) and not is_robot_good(page_link, rules):
                badrobot += 1
                continue
            links[page_link] = str(default_status)
    outlinks = []
    badlink = 0
    for candidate, status in links.items():
        normalized = is_link_good(candidate)
        if normalized is None:
            badlink += 1
            continue
        outlinks.append((normalized, status))
    print(f"{len(links)} total links, {badrobot} badrobot {badlink} badlinks")
    return outlinks
def index_links(db, extracted_links):
    """Insert newly discovered front/back links; update status of other links."""
    linkcol = db["links"]
    for link, status in extracted_links:
        if not is_link_good(link):
            continue
        if status in ("frontlink", "backlink"):
            try:
                # insert only — never overwrite an existing link record
                linkcol.insert_one(get_link_doc(link, status))
            except pymongo.errors.DuplicateKeyError:
                pass
        else:
            print("updating " + link, status)
            linkcol.update_one({"url": link},
                               {"$set": {"status": status, "updated_at": dat.utcnow()}})
def get_link_features(link):
    """Turn a URL path into positional features for the link classifier.

    Each non-empty path/query component becomes "<position>-<component>"
    with digits masked as '*'; the last component is dropped.

    :return: list of feature strings, or None when fewer than two remain
    """
    _, urlpath = courlan.get_host_and_path(link)
    features = []
    for position, component in enumerate(re.split("[/?&]", urlpath)):
        if not component:
            continue
        masked = re.sub("[0-9]", "*", component)
        features.append("{}-{}".format(position, masked))
    if len(features) < 2:
        return None
    return features[:-1]
class LinkClassifier:
    """Naive Bayes classifier over URL path features ("good" vs bad links)."""
    def __init__(self):
        self.goodcounter = collections.Counter()
        self.badcounter = collections.Counter()
        self.good_count = 0
        self.bad_count = 0
        self.alpha = 0.001  # additive smoothing constant

    def train(self, links):
        """Count features of link docs ({"url":..., "status":...}).

        A link is "good" when its status is "good"; everything else is bad.
        """
        for i, item in enumerate(links):
            link = item["url"]
            state = item["status"]
            cl = 0
            if state == "good":
                cl = 1
            print(cl, state, link)
            features = get_link_features(link)
            if features is None:
                continue
            if state == "good":
                for feature in features:
                    self.good_count += 1
                    self.goodcounter[feature] += 1
            else:
                for feature in features:
                    self.bad_count += 1
                    self.badcounter[feature] += 1
        self.bdictsize = len(self.badcounter)
        self.gdictsize = len(self.goodcounter)

    def test(self, testset):
        """Evaluate on *testset*; print precision, recall, accuracy.

        :return: accuracy on the test set
        """
        gg = 0
        true_positive = 0
        actual_positive = 0
        predicted_positive = 0
        # BUG FIX: the original counter named false_negative actually counted
        # false positives (r == 1 and cl == 0); renamed accordingly.
        false_positive = 0
        for item in testset:
            l = item["url"]
            cl = 0
            if item["status"] == "good":
                cl = 1
            pcp = self.classify(l)
            r = 1 if pcp > 0 else 0
            if r == 1:
                predicted_positive += 1
            if cl == 1:
                if r == 1:
                    true_positive += 1
                actual_positive += 1
            if r == 1 and cl == 0:
                false_positive += 1
            if r == cl:
                gg += 1
            else:
                print("MISS", l, cl, pcp)
        print(len(testset))
        # BUG FIX: precision and recall were swapped in the original report
        # (tp/actual_positives is recall, tp/predicted_positives is precision),
        # and a zero denominator raised ZeroDivisionError.
        precision = true_positive / predicted_positive if predicted_positive else 0
        recall = true_positive / actual_positive if actual_positive else 0
        print("Precision: {}, Recall: {}".format(precision, recall))
        print("Accuracy:")
        acc = gg / len(testset)
        print(acc)
        return acc

    def classify(self, link):
        """Return a score > 0 when *link* looks good, <= 0 when bad."""
        # untrained classifier: answer with small random noise
        if self.good_count == 0 or self.bad_count == 0:
            return random.uniform(-0.1, 0.1)
        features = get_link_features(link)
        gp = math.log(self.good_count) - math.log(self.good_count + self.bad_count)
        bp = math.log(self.bad_count) - math.log(self.good_count + self.bad_count)
        if features is None:
            # no features: fall back to class priors only
            return math.exp(gp) - math.exp(bp)
        gcc = math.log(self.gdictsize * self.alpha + self.good_count)
        bcc = math.log(self.bdictsize * self.alpha + self.bad_count)
        goodprob = 0
        badprob = 0
        for feature in features:
            goodprob += math.log(self.goodcounter[feature] + self.alpha) - gcc
            badprob += math.log(self.badcounter[feature] + self.alpha) - bcc
        pa = math.exp(goodprob + gp)
        pb = math.exp(badprob + bp)
        return pa - pb
def get_links(db, hostname, status, batch_size):
    """Return up to *batch_size* URLs of *hostname* with the given status."""
    linkcol = db["links"]
    cursor = linkcol.find({"host": hostname, "status": status}, limit=batch_size)
    links = [item["url"] for item in cursor]
    print("Got {} {}".format(len(links), status))
    return links
def fetch_sitemap_links(start_link):
    """Return (link, "frontlink") pairs discovered via the site's sitemap."""
    navigation_links = trafilatura.sitemaps.sitemap_search(start_link, target_lang=LANGUAGE)
    out = [(link, "frontlink") for link in navigation_links]
    print("Fetched {} sitemap links".format(len(out)))
    return out
def fetch_front_links(start_link, rules):
    """Fetch the start page and return its outgoing links as frontlinks."""
    start_link, hostname = courlan.check_url(start_link)
    response = fetch_page(start_link)
    extracted = extract_links([start_link], [response], hostname, rules, "frontlink")
    print("Fetched {} frontlinks".format(len(extracted)))
    return extracted
def link_summary(db, hostname):
    """Aggregate link and content statistics for *hostname* and upsert them
    into the domains collection, then print the resulting record."""
    linkcol = db["links"]
    res = linkcol.aggregate([
        {"$match": {"host": hostname}},
        {"$group": {"_id": "$status",
                    "count": {"$sum": 1},
                    }
         },
    ])
    goodcount = 0
    info = {}
    crawled_count = 0
    bad_crawl_count = 0
    for item in res:
        count = item["count"]
        st = item["_id"]
        print(st, count)
        if st == "good":
            goodcount += count
        # anything not waiting in the frontier counts as crawled
        if st != "frontlink" and st != "backlink":
            crawled_count += count
            if st != "good":
                bad_crawl_count += count
        info[st] = count
    info["crawled_count"] = crawled_count
    info["bad_crawl_count"] = bad_crawl_count
    # NOTE: removed dead code — an unused `badcount` variable and a misspelled
    # `baclink_cout` counter whose value was never read.
    good_prob = 0
    if crawled_count > 0:
        good_prob = goodcount / crawled_count
    info["good_prob"] = good_prob
    print(">>>Domain Content")
    contentcol = db["content"]
    res = contentcol.aggregate([
        {"$match": {"host": hostname}},
        {"$group": {"_id": None,
                    "text_size_sum": {"$sum": "$text_size"},
                    }
         },
    ])
    text_size = 0
    for item in res:
        text_size = item["text_size_sum"]
    good_document_characters = 0
    fetch_average_characters = 0
    if goodcount > 0:
        good_document_characters = text_size / goodcount
        fetch_average_characters = text_size / crawled_count
    info["total_good_characters"] = text_size
    info["average_good_characters"] = good_document_characters
    info["average_fetch_characters"] = fetch_average_characters
    domaincol = db["domains"]
    domaincol.update_one({"host": hostname}, {"$set": info}, upsert=True)
    res = domaincol.find_one({"host": hostname})
    print(res)
def sample_links(db, hostname, status, batch_size):
    """Pick the next batch of links to crawl for *hostname*.

    Mixes classifier-approved "good" links with exploratory "discover"
    links (links with rarely seen path features) in roughly
    (1 - DISCOVER_LINK_RATIO) : DISCOVER_LINK_RATIO proportion.
    """
    print("Sampling links")
    linkcol = db["links"]
    # already crawled links (any status except frontlink/backlink) are labeled data
    res = linkcol.find({"host": hostname, "status": {"$not": {"$in": ["frontlink", "backlink"]}}})
    cl = LinkClassifier()
    crawled_links = list(res)
    crawled_count = len(crawled_links)
    prediction_accuracy = 0
    if crawled_count > CLASSIFIER_SET_SIZE:
        # enough history: train the classifier on crawled links
        trainset, testset = split_train(crawled_links)
        cl.train(trainset)
        prediction_accuracy = cl.test(testset)
    sample_set_size = SAMPLE_SET_SIZE
    res = linkcol.find({"host": hostname, "status": status})
    predicted_good = 0
    # visitcounter counts how often each path feature occurs among candidates
    visitcounter = collections.Counter()
    good_links = []
    discover_links = []
    for item in res:
        link = item["url"]
        cll = cl.classify(link)
        if cll > 0:
            good_links.append(link)
        features = get_link_features(link)
        discover_links.append(link)
        if features is None:
            continue
        for feature in features:
            visitcounter[feature] += 1
    # take up to (1 - DISCOVER_LINK_RATIO) * batch_size random good links
    mls = int(min(batch_size*(1- DISCOVER_LINK_RATIO),len(good_links)))
    random.shuffle(good_links)
    links = list(good_links[0:mls])
    numdiscover = len(discover_links)
    eval_discover_links = []
    for link in discover_links:
        features = get_link_features(link)
        prob = 0
        if features is not None:
            for feature in features:
                # penalize frequently seen features -> prefer rare paths
                c = visitcounter[feature]
                prob -= math.log(c) / c
        eval_discover_links.append((link, prob))
    # highest scores (rarest feature mixes) first
    eval_discover_links.sort(key=lambda x: x[1], reverse=True)
    mls = int(min(batch_size * DISCOVER_LINK_RATIO, len(eval_discover_links)))
    links += [l[0] for l in eval_discover_links[0:mls]]
    # deduplicate: a link can be both "good" and "discover"
    return list(set(links))
def domain_summary(db, hostname):
    """Print text size totals grouped by hostname from the links collection."""
    linkcol = db["links"]
    pipeline = [
        {"$group": {"_id": "$hostname", "text_size_sum": {"$sum": "$text_size"}}},
    ]
    for row in linkcol.aggregate(pipeline):
        print(row)
def createdb():
    """Create the collections and indices used by the crawler."""
    myclient = pymongo.MongoClient(CONNECTION)
    db = myclient[DBNAME]
    linkcol = db["links"]
    linkcol.create_index("url", unique=True)
    linkcol.create_index("host")
    contentcol = db["content"]
    contentcol.create_index("url")
    contentcol.create_index("text_md5", unique=True)
    contentcol.create_index("host")
    htmlcol = db["html"]
    htmlcol.create_index("url")
    htmlcol.create_index("html_md5", unique=True)
    domaincol = db["domains"]
    domaincol.create_index("host", unique=True)
    # BUG FIX: create_index with a direction requires a *list* of
    # (key, direction) pairs; a bare tuple raises TypeError in pymongo.
    domaincol.create_index([("average_fetch_characters", pymongo.DESCENDING)])
    batchcol = db["batches"]
    batchcol.create_index("host")
    batchcol.create_index("created_at")
def parseurl(link):
    """Debug helper: print robots.txt info, the extracted document and the
    links found on *link*'s page."""
    link, hostname = courlan.check_url(link)
    rawrules = trafilatura.fetch_url("https://" + hostname + "/robots.txt")
    print(rawrules)
    rules = urllib.robotparser.RobotFileParser()
    # BUG FIX: fetch_url returns None on failure; guard before .split
    if rawrules is not None:
        rules.parse(rawrules.split("\n"))
    print(rules.can_fetch("*", link))
    print(rules.site_maps())
    print(rules.crawl_delay("*"))
    html = trafilatura.fetch_url(link, decode=True)
    doc = trafilatura.bare_extraction(html)
    pprint.pprint(doc)
    # BUG FIX: get_bs_links returns a single set of links, not an
    # (internal, external) pair — the old two-name unpacking raised
    # ValueError; it was also called twice with the result discarded once.
    links = get_bs_links(link, html)
    print(links)
def externaldomains(link):
    """Print the set of external domains referenced from *link*'s page."""
    html = trafilatura.fetch_url(link, decode=True)
    external_links = courlan.extract_links(html, link, external_bool=True, language=LANGUAGE)
    domains = set()
    for l in external_links:
        r = courlan.check_url(l)
        if r is None:
            # BUG FIX: was `pass`, which fell through to the unpacking
            # below and raised TypeError on an unparsable link
            continue
        link, domain = r
        domains.add(domain)
    for d in domains:
        print(d)
def classify(start_link):
    """Train and evaluate the link classifier on crawled links of one host."""
    myclient = pymongo.MongoClient(CONNECTION)
    db = myclient[DBNAME]
    start_link, hostname = courlan.check_url(start_link)
    classifier = LinkClassifier()
    # every crawled link (status other than frontlink/backlink) is labeled data
    crawled = db["links"].find({"host": hostname,
                                "status": {"$not": {"$in": ["frontlink", "backlink"]}}})
    trainset, testset = split_train(crawled)
    classifier.train(trainset)
    classifier.test(testset)
def visit(hostname, filter_content=True):
    """Crawl one batch for *hostname*: refresh frontlinks, fetch a sampled
    batch of links, index pages and discovered links, store summaries."""
    myclient = pymongo.MongoClient(CONNECTION)
    db = myclient[DBNAME]
    batch_size = BATCH_SIZE
    rules = fetch_robot(hostname)
    start_link = "https://" + hostname
    # renew front links
    front_links = fetch_front_links(start_link, rules)
    index_links(db, front_links)
    # start crawling
    # frontlinks first
    links = sample_links(db, hostname, "frontlink", batch_size)
    # always revisit the landing page
    if start_link not in links:
        links.insert(0, start_link)
    print("sampled")
    print(links)
    # index results
    print("Processing links")
    responses = []
    for link in links:
        responses.append(fetch_page(link))
    extracted_pages = []
    for original_link, (final_link, html) in zip(links, responses):
        doc = None
        assert original_link is not None
        doc = extract_page(final_link, html)
        extracted_pages.append((original_link, final_link, html, doc))
    # outgoing links of fetched pages become new frontlinks
    extracted_links = extract_links(links, responses, hostname, rules, "frontlink")
    index_links(db, extracted_links)
    final_states = []
    docs = []
    for original_link, final_link, html, doc in extracted_pages:
        status = index_page(db, original_link, final_link, html, doc, filter_content)
        final_states.append(status)
        docs.append(doc)
    save_batch_info(db, hostname, final_states, docs)
    link_summary(db, hostname)
def crawl_summary():
    """Print per-host batch statistics for batches older than one day."""
    myclient = pymongo.MongoClient(CONNECTION)
    db = myclient[DBNAME]
    batchcol = db["batches"]
    # BUG FIX: the original filtered with yesterday.utcnow() — utcnow() is a
    # classmethod returning the *current* time regardless of the instance, so
    # the cutoff was "now" instead of one day ago.  Use UTC to match the
    # created_at timestamps written with dat.utcnow().
    yesterday = dat.utcnow() - datetime.timedelta(days=1)
    print(yesterday)
    res = batchcol.aggregate([
        {"$match": {"created_at": {"$lt": yesterday}}},
        {"$group": {"_id": "$host",
                    "document_count": {"$sum": "$document_count"},
                    "good_document_count": {"$sum": "$good_document_count"},
                    "batch_size": {"$sum": "$batch_size"},
                    "original_text_size": {"$sum": "$original_text_size"},
                    }
         },
        {"$sort": {"original_text_size": -1}},
    ])
    print(">>>> Batches")
    headers = ["_id", "document_count", "good_document_count", "batch_size", "original_text_size"]
    print("\t".join(headers))
    for item in res:
        values = [str(item[x]) for x in headers]
        print("\t".join(values))
def import_html():
    """Read JSON lines with quoted-printable HTML from stdin and index them."""
    myclient = pymongo.MongoClient(CONNECTION)
    db = myclient[DBNAME]
    for line in sys.stdin:
        hdoc = json.loads(line)
        url = hdoc["url"]
        # decode the quoted-printable payload and pretty-print the HTML
        html = BeautifulSoup(binascii.a2b_qp(hdoc["quoted_html"])).prettify()
        doc = extract_page(url, html)
        if doc is not None:
            print(doc)
            status = index_page(db, url, url, html, doc)
            print(status)
def sample_domains():
    """Print a mix of newly discovered domains and best-performing domains."""
    myclient = pymongo.MongoClient(CONNECTION)
    db = myclient[DBNAME]
    linkscol = db["links"]
    # discover: domains that still have uncrawled frontlinks
    discovered = [d for d in linkscol.distinct("host", filter={"status": "frontlink"})]
    sample_size = min(int(DISCOVER_LINK_RATIO * BATCH_SIZE), len(discovered))
    print(">>> Discover domains {}".format(sample_size))
    selected = random.sample(discovered, sample_size)
    domaincol = db["domains"]
    # exploit: domains that historically yield the most text per fetch
    res = domaincol.find({"average_fetch_characters": {"$gt": 1000}}).sort("average_fetch_characters", -1)
    best = [item["host"] for item in res]
    sample_size = min(int((1 - DISCOVER_LINK_RATIO) * BATCH_SIZE), len(best))
    print(">>>> Best domains {}".format(sample_size))
    selected += random.sample(best, sample_size)
    for domain in selected:
        print(domain)