# websucker-pip/mongo/mongocwarler.py
import pymongo
import pymongo.errors
import trafilatura
import trafilatura.feeds
import trafilatura.sitemaps
import trafilatura.spider
import trafilatura.utils
import trafilatura.external
import sys
import courlan
import urllib.robotparser
from datetime import datetime
import click
import json
import logging as LOGGER
import os
import pprint
import re
import time
import collections
import math

LANGUAGE = os.getenv("SUCKER_LANGUAGE", "sk")
DOMAIN = os.getenv("SUCKER_DOMAIN", "sk")
BATCHSIZE = int(os.getenv("SUCKER_BATCHSIZE", "100"))
CONNECTION = os.getenv("SUCKER_CONNECTION", "mongodb://root:example@localhost:27017/")
DBNAME = os.getenv("SUCKER_DBNAME", "crawler")
MINFILESIZE = 300           # skip fetched responses smaller than this (bytes)
MAXFILESIZE = 10000000      # skip fetched responses larger than this (bytes)
MINTEXTSIZE = 200           # skip extracted documents with less text than this (characters)
CHECK_PARAGRAPH_SIZE = 150  # only paragraphs with more significant characters than this are fingerprinted
TEXT_TRASH_SIZE = 200       # documents with less text than this are marked as trash
TEXT_TRASH_RATIO = 0.6      # threshold ratio for the trash and copy checks in index_pages
def put_queue(db, channel, message):
    queuecol = db["queue"]
    queuecol.insert_one({"channel": channel, "message": message, "created_at": datetime.utcnow(), "started_at": None})


def reserve_queue(db, channel):
    queuecol = db["queue"]
    r = queuecol.find_one_and_delete({"channel": channel}, sort=[("created_at", -1)])
    return r


def delete_queue(db, channel):
    queuecol = db["queue"]
    queuecol.delete_many({"channel": channel})
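
# Illustrative sketch (not part of the original crawler): a minimal worker loop
# built on the queue helpers above. The channel name, polling interval and the
# handler callable are assumptions chosen for this example only.
def _example_queue_worker(db, handler, channel="visit", poll_seconds=5):
    """Repeatedly reserve the newest message from the queue and pass it to handler."""
    while True:
        item = reserve_queue(db, channel)
        if item is None:
            # nothing queued on this channel, wait and try again
            time.sleep(poll_seconds)
            continue
        handler(item["message"])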
def calculate_checksums(text):
    """
    Calculate rolling-hash fingerprints of the paragraphs in text.
    Paragraphs are separated by a newline; only paragraphs with more than
    CHECK_PARAGRAPH_SIZE significant characters are fingerprinted.

    @return a tuple (checksums, sizes) with one entry per fingerprinted paragraph
    """
    checksums = []
    sizes = []
    hval = 0
    hsz = 0
    sz = 0
    for c in text:
        cv = ord(c)
        sz += 1
        if cv > 64:
            # hash only "significant" characters (code points above 64)
            hval += (hval << 3) + cv
            zv = hval >> 31
            hval &= 0x7fffffff
            hval += zv
            hsz += 1
        if c == "\n" and hsz > 0:
            if hsz > CHECK_PARAGRAPH_SIZE:
                checksums.append(hval)
                sizes.append(sz)
            sz = 0
            hsz = 0
    if hsz > CHECK_PARAGRAPH_SIZE:
        checksums.append(hval)
        sizes.append(sz)
    return checksums, sizes
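
# Illustrative sketch (not part of the original crawler): using
# calculate_checksums() to estimate how much of one text is duplicated in
# another. The helper name is an assumption made for this example.
def _example_paragraph_overlap(text_a, text_b):
    """Return the fraction of fingerprinted paragraphs of text_a that also appear in text_b."""
    checksums_a, _ = calculate_checksums(text_a)
    checksums_b, _ = calculate_checksums(text_b)
    if not checksums_a:
        return 0.0
    common = set(checksums_a) & set(checksums_b)
    return len(common) / len(checksums_a)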
def is_robot_good(link, rules):
    # check robots.txt rules
    if rules is not None and not rules.can_fetch("*", link):
        return False
    return True


def is_link_good(link):
    r = courlan.check_url(link, strict=True, language=LANGUAGE)
    if r is None:
        return None
    llink, lhostname = r
    # hostname rules
    if not lhostname.endswith(DOMAIN):
        LOGGER.debug("bad hostname")
        return None
    if courlan.is_not_crawlable(llink):
        LOGGER.debug("not crawlable")
        return None
    return llink


def get_link_doc(link, status="frontlink"):
    r = courlan.check_url(link)
    assert r is not None
    link, host = r
    domain = courlan.extract_domain(link)
    return {"url": link, "host": host, "domain": domain, "status": status, "created_at": datetime.utcnow()}
def fetch_pages(link_batch):
    htmls = []
    for link in link_batch:
        print("fetching:::::")
        print(link)
        final_link = link
        response = trafilatura.fetch_url(link, decode=False)
        time.sleep(2)
        html = None
        if response is not None:
            good = True
            if response.status != 200:
                good = False
                LOGGER.error('not a 200 response: %s for URL %s', response.status, link)
            elif response.data is None or len(response.data) < MINFILESIZE:
                good = False
                LOGGER.error('too small/incorrect for URL %s', link)
                # raise error instead?
            elif len(response.data) > MAXFILESIZE:
                good = False
                LOGGER.error('too large: length %s for URL %s', len(response.data), link)
            if good:
                html = trafilatura.utils.decode_response(response)
                final_link = response.url
                if html is not None:
                    # is there a meta-refresh on the page?
                    html, final_link = trafilatura.spider.refresh_detection(html, final_link)
                    if final_link is None:  # malformed or malicious content
                        html = None
        htmls.append((final_link, html))
    return htmls
def fetch_robot(base_url):
    try:
        rawrules = trafilatura.fetch_url("https://" + base_url + "/robots.txt")
        rules = urllib.robotparser.RobotFileParser()
        rules.parse(rawrules.split("\n"))
        LOGGER.info('got robots')
    except Exception as exc:
        # exceptions happening here
        LOGGER.error('cannot read robots.txt: %s', exc)
        rules = None
    return rules
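
# Illustrative sketch (not part of the original crawler): combining
# fetch_robot() and is_robot_good() to drop disallowed links before fetching.
# The helper name is an assumption made for this example.
def _example_filter_by_robots(hostname, links):
    """Return only the links that the host's robots.txt allows us to fetch."""
    rules = fetch_robot(hostname)
    return [link for link in links if is_robot_good(link, rules)]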
def extract_pages(link_batch, responses):
    out = []
    for original_link, (final_link, html) in zip(link_batch, responses):
        doc = None
        assert original_link is not None
        if html is not None:
            doc = trafilatura.bare_extraction(html, url=final_link, with_metadata=True, include_formatting=False, target_language=LANGUAGE, favor_precision=True)
            if doc is not None:
                if "text" not in doc or len(doc["text"]) < MINTEXTSIZE:
                    # text too small
                    doc = None
        out.append((original_link, final_link, html, doc))
    return out
def index_pages(db, hostname, extracted_pages):
    linkcol = db["links"]
    htmlcol = db["html"]
    contentcol = db["content"]
    checkcol = db["check"]
    for original_link, final_link, html, doc in extracted_pages:
        state = "good"
        link = original_link
        if original_link != final_link:
            linkcol.update_one({"url": original_link}, {"$set": {"status": "redirect"}})
            link = final_link
        if html is None:
            state = "html_error"
        elif doc is None:
            state = "content_error"
        if doc is not None:
            text = doc["text"]
            checksums, sizes = calculate_checksums(text)
            doc["text_size"] = len(text)
            doc["paragraph_checksums"] = checksums
            doc["paragraph_sizes"] = sizes
            goodsz = sum(sizes)
            # not enough large paragraphs
            if len(text) < TEXT_TRASH_SIZE or goodsz / len(text) < TEXT_TRASH_RATIO:
                state = "trash"
            # count sentence-like boundaries: a word character followed by a period
            end_sentence_marker = re.compile(r"\w[\.]")
            sentences = 0
            for item in re.finditer(end_sentence_marker, text):
                t = item.group(0)
                if t[0].islower():
                    sentences += 1
            doc["sentences"] = sentences
            # check copy
            if state == "good":
                copysz = len(text) - goodsz
                for chs, paragraph_size in zip(doc["paragraph_checksums"], doc["paragraph_sizes"]):
                    # look up paragraph checksums seen before
                    nd = checkcol.find_one({"_id": chs})
                    if nd is not None:
                        copysz += paragraph_size
                if (copysz / len(text)) > TEXT_TRASH_RATIO:
                    state = "copy"
                    print(copysz)
        if state == "good":
            htdoc = get_link_doc(link, state)
            htdoc["html"] = html
            htdoc["html_size"] = len(html)
            # can be revisited - upsert
            del htdoc["url"]
            htmlcol.update_one({"url": link}, {"$set": htdoc}, upsert=True)
            doc.update(get_link_doc(link, "good"))
            # todo extract links
            print(doc)
            del doc["url"]
            contentcol.update_one({"url": link}, {"$set": doc}, upsert=True)
            for chs in doc["paragraph_checksums"]:
                try:
                    checkcol.insert_one({"_id": chs})
                except pymongo.errors.DuplicateKeyError:
                    pass
        linkcol.update_one({"url": original_link}, {"$set": {"status": state}})
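
# Illustrative sketch (not part of the original crawler): reusing the "check"
# collection of paragraph checksums to estimate how much of a candidate text is
# already present in the corpus, mirroring the copy check in index_pages().
# The helper name is an assumption made for this example.
def _example_copied_ratio(db, text):
    """Return the fraction of characters in text that belong to already-seen paragraphs."""
    checkcol = db["check"]
    checksums, sizes = calculate_checksums(text)
    if len(text) == 0:
        return 0.0
    copied = 0
    for chs, size in zip(checksums, sizes):
        if checkcol.find_one({"_id": chs}) is not None:
            copied += size
    return copied / len(text)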
def extract_links(link_batch, responses, hostname, rules, default_status="frontlink"):
    links = {}
    badrobot = 0
    for original_link, (final_link, html) in zip(link_batch, responses):
        if html is None:
            continue
        status = default_status
        external_links = courlan.extract_links(html, final_link, external_bool=True, language=LANGUAGE)
        for link in external_links:
            links[link] = "frontlink"
        internal_links = courlan.extract_links(html, final_link, external_bool=False, language=LANGUAGE)
        for link in internal_links:
            if not is_robot_good(link, rules):
                badrobot += 1
                continue
            status = str(default_status)
            links[link] = status
    outlinks = []
    badlink = 0
    for link, status in links.items():
        link = is_link_good(link)
        if link is None:
            badlink += 1
            continue
        outlinks.append((link, status))
    print(f"{len(links)} total links, {badrobot} badrobot {badlink} badlinks")
    return outlinks
def index_links(db, extracted_links):
    linkcol = db["links"]
    for link, status in extracted_links:
        if not is_link_good(link):
            continue
        doc = get_link_doc(link, status)
        try:
            linkcol.insert_one(doc)
        except pymongo.errors.DuplicateKeyError:
            pass


def get_link_features(link):
    host, urlpath = courlan.get_host_and_path(link)
    features = re.split("[/?&]", urlpath)
    res = []
    for feature in features:
        if len(feature) < 1:
            continue
        if feature.isdigit():
            feature = "<NUM>"
        res.append(feature)
    if len(res) < 2:
        return None
    # drop the last path component
    res = res[:-1]
    print(res)
    return res
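
# Illustrative sketch (not part of the original crawler): print the URL path
# features that get_link_features() and the link classifier work with. The
# sample URL is made up for illustration only.
def _example_show_link_features(link="https://example.sk/spravy/2023/clanok-o-pocasi"):
    """Print the path features of a link; numeric components become "<NUM>"."""
    features = get_link_features(link)
    print(features)
    return features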
class LinkClassifier:
    """Naive Bayes classifier that scores links by their URL path features."""

    def __init__(self):
        self.goodcounter = collections.Counter()
        self.badcounter = collections.Counter()
        self.good_count = 0
        self.bad_count = 0
        self.alpha = 0.001

    def train(self, db, hostname):
        linkcol = db["links"]
        res = linkcol.find({"host": hostname, "status": {"$not": {"$in": ["frontlink", "backlink"]}}})
        testset = []
        for i, item in enumerate(res):
            link = item["url"]
            state = item["status"]
            cl = 0
            if state == "good":
                cl = 1
            print(cl, state, link)
            # hold out every tenth link for evaluation
            if i % 10 == 1:
                testset.append((link, cl))
                continue
            features = get_link_features(link)
            if features is None:
                continue
            if state == "good":
                for feature in features:
                    self.good_count += 1
                    self.goodcounter[feature] += 1
            else:
                for feature in features:
                    self.bad_count += 1
                    self.badcounter[feature] += 1
        self.bdictsize = len(self.badcounter)
        self.gdictsize = len(self.goodcounter)
        # evaluate on the held-out links
        gg = 0
        for l, cl in testset:
            pcp = self.classify(l)
            r = 0
            if pcp > 0:
                r = 1
            if r == cl:
                gg += 1
            else:
                print("MISS", l, cl, pcp)
        print("Accuracy:")
        print(len(testset))
        print(gg / len(testset))

    def classify(self, link):
        features = get_link_features(link)
        # log prior probabilities of the good and bad classes
        gp = math.log(self.good_count) - math.log(self.good_count + self.bad_count)
        bp = math.log(self.bad_count) - math.log(self.good_count + self.bad_count)
        if features is None:
            return math.exp(gp) - math.exp(bp)
        gcc = math.log(self.gdictsize * self.alpha + self.good_count)
        bcc = math.log(self.bdictsize * self.alpha + self.bad_count)
        goodprob = 0
        badprob = 0
        for feature in features:
            g = math.log(self.goodcounter[feature] + self.alpha) - gcc
            goodprob += g
            b = math.log(self.badcounter[feature] + self.alpha) - bcc
            badprob += b
            print(feature, g, b)
        pa = math.exp(goodprob + gp)
        pb = math.exp(badprob + bp)
        return pa - pb
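
# Illustrative sketch (not part of the original crawler): training the
# classifier for one host and ranking a few candidate links by score, the same
# way get_links() does below. The helper name is an assumption for this example;
# CONNECTION and DBNAME are the module defaults.
def _example_rank_links(hostname, candidate_links):
    """Return candidate_links sorted from most to least promising."""
    db = pymongo.MongoClient(CONNECTION)[DBNAME]
    cl = LinkClassifier()
    cl.train(db, hostname)
    scored = [(link, cl.classify(link)) for link in candidate_links]
    return sorted(scored, key=lambda x: x[1], reverse=True)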
def get_links(db, hostname, status, batch_size):
    linkcol = db["links"]
    # count links that have already been visited (neither frontlink nor backlink)
    res = linkcol.aggregate([
        {"$match": {"status": {"$not": {"$in": ["frontlink", "backlink"]}}, "host": hostname}},
        {"$group": {"_id": None,
                    "count": {"$count": {}},
                    }
         },
    ])
    links = set()
    out = list(res)
    if len(out) == 0:
        return list()
    if out[0]["count"] < 200:
        # too few visited links to train the classifier: sample links at random
        res = linkcol.aggregate([
            {"$match": {"status": status, "host": hostname}},
            {"$sample": {"size": batch_size}}
        ])
        for i, doc in enumerate(res):
            links.add(doc["url"])
            if i >= batch_size:
                break
    else:
        # enough training data: rank a larger random sample with the classifier
        cl = LinkClassifier()
        cl.train(db, hostname)
        res = linkcol.aggregate([
            {"$match": {"status": status, "host": hostname}},
            {"$sample": {"size": batch_size * 100}}
        ])
        outlinks = []
        for i, doc in enumerate(res):
            link = doc["url"]
            outlinks.append((link, cl.classify(link)))
        outlinks = sorted(outlinks, key=lambda x: x[1], reverse=True)
        links = [l[0] for l in outlinks[0:batch_size]]
        # todo remove very bad links
    return list(links)
def fetch_sitemap_links(start_link):
    out = []
    navigation_links = trafilatura.sitemaps.sitemap_search(start_link, target_lang=LANGUAGE)
    for link in navigation_links:
        out.append((link, "frontlink"))
    return out


def process_links(db, hostname, status, links=[], rules=None, batch_size=BATCHSIZE):
    responses = fetch_pages(links)
    extracted_pages = extract_pages(links, responses)
    extracted_links = extract_links(links, responses, hostname, rules, status)
    index_links(db, extracted_links)
    index_pages(db, hostname, extracted_pages)
def link_summary(db, hostname):
    linkcol = db["links"]
    # count links per status
    res = linkcol.aggregate([
        {"$match": {"host": hostname}},
        {"$group": {"_id": "$status", "count": {"$sum": 1}}},
    ])
    badcount = 0
    goodcount = 0
    out = ["good", "frontlink", "backlink"]
    info = {}
    for item in res:
        if item["_id"] not in out:
            badcount += item["count"]
        if item["_id"] == "good":
            goodcount = item["count"]
        info[item["_id"]] = item["count"]
    good_prob = goodcount / (goodcount + badcount)
    info["good_prob"] = good_prob
    info["bad_documents"] = badcount
    print(">>>Domain Content")
    contentcol = db["content"]
    res = contentcol.aggregate([
        {"$match": {"host": hostname}},
        {"$group": {"_id": None,
                    "text_size_sum": {"$sum": "$text_size"},
                    }
         },
    ])
    text_size = 0
    for item in res:
        text_size = item["text_size_sum"]
    good_document_characters = text_size / goodcount
    fetch_average_characters = text_size / (goodcount + badcount)
    info["total_good_characters"] = text_size
    info["average_good_characters"] = good_document_characters
    info["average_fetch_characters"] = fetch_average_characters
    domaincol = db["domains"]
    print(json.dumps(info))
    domaincol.update_one({"host": hostname}, {"$set": info}, upsert=True)
def domain_summary(db, hostname):
    linkcol = db["links"]
    # sum text size per hostname
    res = linkcol.aggregate([
        {"$group": {"_id": "$hostname", "text_size_sum": {"$sum": "$text_size"}}},
    ])
    for item in res:
        print(item)


@click.group()
def cli():
    pass


@cli.command()
def createdb():
    myclient = pymongo.MongoClient(CONNECTION)
    db = myclient[DBNAME]
    linkcol = db["links"]
    linkcol.create_index("url", unique=True)
    linkcol.create_index("host")
    contentcol = db["content"]
    contentcol.create_index("url", unique=True)
    contentcol.create_index("host")
    htmlcol = db["html"]
    htmlcol.create_index("url", unique=True)
    domaincol = db["domains"]
    domaincol.create_index("host", unique=True)
@cli.command()
@click.argument("link")
def parseurl(link):
    link, hostname = courlan.check_url(link)
    rawrules = trafilatura.fetch_url("https://" + hostname + "/robots.txt")
    print(rawrules)
    rules = urllib.robotparser.RobotFileParser()
    rules.parse(rawrules.split("\n"))
    print(rules.can_fetch("*", link))
    print(rules.site_maps())
    print(rules.crawl_delay("*"))
    html = trafilatura.fetch_url(link, decode=True)
    doc = trafilatura.bare_extraction(html)
    pprint.pprint(doc)
@cli.command()
@click.argument("link")
def externaldomains(link):
    html = trafilatura.fetch_url(link, decode=True)
    external_links = courlan.extract_links(html, link, external_bool=True, language=LANGUAGE)
    domains = set()
    for l in external_links:
        r = courlan.check_url(l)
        if r is None:
            continue
        link, domain = r
        domains.add(domain)
    for d in domains:
        print(d)
@cli.command()
@click.argument("start_link")
def classify(start_link):
    myclient = pymongo.MongoClient(CONNECTION)
    db = myclient[DBNAME]
    start_link, hostname = courlan.check_url(start_link)
    cl = LinkClassifier()
    cl.train(db, hostname)
@cli.command()
@click.argument("start_link")
def visit(start_link):
    myclient = pymongo.MongoClient(CONNECTION)
    db = myclient[DBNAME]
    start_link, hostname = courlan.check_url(start_link)
    batch_size = BATCHSIZE
    print("Getting frontlinks")
    links = get_links(db, hostname, "frontlink", batch_size)
    print(f"Got {len(links)} frontlinks")
    if len(links) < batch_size:
        print("Fetching sitemap links")
        sitemap_links = fetch_sitemap_links(start_link)
        index_links(db, sitemap_links)
        links = get_links(db, hostname, "frontlink", batch_size)
        links.insert(0, start_link)
    if len(links) < batch_size:
        back_links = get_links(db, hostname, "backlink", batch_size - len(links))
        links += back_links
    print("Processing links")
    rules = fetch_robot(hostname)
    responses = fetch_pages(links)
    extracted_pages = extract_pages(links, responses)
    extracted_links = extract_links(links, responses, hostname, rules, "backlink")
    index_links(db, extracted_links)
    index_pages(db, hostname, extracted_pages)
    link_summary(db, hostname)
if __name__ == "__main__":
    cli()
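
# Example invocations (a sketch only; the connection string matches the default
# SUCKER_CONNECTION above and the start URL is made up for illustration):
#
#   export SUCKER_LANGUAGE=sk SUCKER_DOMAIN=sk
#   export SUCKER_CONNECTION="mongodb://root:example@localhost:27017/"
#   python mongocwarler.py createdb
#   python mongocwarler.py visit https://example.sk
#   python mongocwarler.py classify https://example.sk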