websucker-pip/mongo/mongocwarler.py
2023-04-05 17:09:42 +02:00

592 lines
19 KiB
Python

import pymongo
import pymongo.errors
import trafilatura
import trafilatura.feeds
import trafilatura.sitemaps
import trafilatura.spider
import trafilatura.utils
import trafilatura.external
import sys
import courlan
import urllib
from datetime import datetime
import click
import logging as LOGGER
import os
import pprint
import re
import time
import collections
import math
import random
LANGUAGE= os.getenv("SUCKER_LANGUAGE","sk")
DOMAIN = os.getenv("SUCKER_DOMAIN","sk")
BATCHSIZE=int(os.getenv("SUCKER_BATCHSIZE","10"))
CONNECTION=os.getenv("SUCKER_CONNECTION","mongodb://root:example@localhost:27017/")
DBNAME=os.getenv("SUCKER_DBNAME","crawler")
MINFILESIZE=300
MAXFILESIZE=10000000
MINTEXTSIZE=200
CHECK_PARAGRAPH_SIZE=150
TEXT_TRASH_SIZE=200
TEXT_TRASH_RATIO=0.6
def put_queue(db,channel,message):
queuecol = db["queue"]
queuecol.insert_one({"channel":channel,"message":message,"created_at":datetime.utcnow(),"started_at":None})
def reserve_queue(db,channel,message):
queuecol = db["queue"]
r = queuecol.find_one_and_delete({"channel":channel},sort={"created_at":-1})
def delete_queue(db,channel):
queuecol = db["queue"]
pass
def calculate_checksums(text):
"""
@return fingerprints of a paragraphs in text. Paragraphs are separated by a blank line
"""
checksums = []
sizes = []
hval = 0
hsz = 0
sz = 0
for c in text:
cv = ord(c)
sz += 1
if cv > 64: # ignore non-ascii
hval += (hval << 3) + cv
zv = hval >> 31
hval &= 0x7fffffff
hval += zv
hsz += 1
if c == "\n" and hsz > 0:
if hsz > CHECK_PARAGRAPH_SIZE:
checksums.append(hval)
sizes.append(sz)
sz = 0
hsz = 0
if hsz > CHECK_PARAGRAPH_SIZE:
checksums.append(hval)
sizes.append(sz)
return checksums, sizes
def is_robot_good(link,rules):
# check robots.txt rules
if rules is not None and not rules.can_fetch("*", link):
return False
return True
def is_link_good(link):
r = courlan.check_url(link,strict=True,language=LANGUAGE)
if r is None:
return None
llink,lhostname = r
#print(llink,lhostname)
# hostname rules
if not lhostname.endswith(DOMAIN):
LOGGER.debug("bad hostname")
return None
if courlan.is_not_crawlable(llink):
LOGGER.debug("not crawlable")
return None
return llink
def get_link_doc(link,status="frontlink"):
r = courlan.check_url(link)
assert r is not None
link,host = r
domain = courlan.extract_domain(link)
return {"url":link,"host":host,"domain":domain,"status":status,"created_at":datetime.utcnow()}
def fetch_page(link):
print("fetching:::::")
print(link)
final_link = link
response = trafilatura.fetch_url(link,decode=False)
time.sleep(2)
html = None
if response is not None :
good = True
if response.status != 200:
good = False
LOGGER.error('not a 200 response: %s for URL %s', response.status, url)
elif response.data is None or len(response.data) < MINFILESIZE:
LOGGER.error('too small/incorrect for URL %s', link)
good = False
# raise error instead?
elif len(response.data) > MAXFILESIZE:
good = False
LOGGER.error('too large: length %s for URL %s', len(response.data), link)
if good:
html = trafilatura.utils.decode_response(response)
final_link = response.url
if html is not None:
html, final_link = trafilatura.spider.refresh_detection(html, final_link)
# is there a meta-refresh on the page?
if final_link is None: # malformed or malicious content
html = None
return final_link,html
def fetch_robot(base_url):
try:
rawrules = trafilatura.fetch_url("https://"+ base_url + "/robots.txt")
#print(rawrules)
rules = urllib.robotparser.RobotFileParser()
rules.parse(rawrules.split("\n"))
LOGGER.info('got robots')
except Exception as exc:
LOGGER.error('cannot read robots.txt: %s', exc)
rules = None
# exceptions happening here
return rules
def extract_pages(link_batch,responses):
out = []
for original_link,(final_link,html) in zip(link_batch,responses):
doc = None
assert original_link is not None
if html is not None:
doc = trafilatura.bare_extraction(html,url=final_link,with_metadata=True,include_formatting=False,target_language=LANGUAGE,favor_precision=True)
if doc is not None:
if not "text" in doc or len(doc["text"]) < MINTEXTSIZE:
# text too small
doc = None
out.append((original_link,final_link,html,doc))
return out
def index_pages(db,hostname,extracted_pages):
linkcol = db["links"]
htmlcol = db["html"]
contentcol = db["content"]
checkcol = db["check"]
links = []
for original_link,final_link,html,doc in extracted_pages:
state = "good"
link = original_link
if original_link != final_link:
linkcol.update_one({"url":original_link},{"$set":{"status":"redirect"}})
link = final_link
if html is None:
state = "html_error"
elif doc is None:
state = "content_error"
if doc is not None:
text = doc["text"]
checksums,sizes = calculate_checksums(text)
doc["text_size"] = len(text)
doc["paragraph_checksums"] = checksums
doc["paragraph_sizes"] = sizes
goodsz = sum(sizes)
# Not enough larger paragraphs
if len(text) < TEXT_TRASH_SIZE or goodsz/len(text) < TEXT_TRASH_RATIO:
state = "trash"
end_sentence_marker = re.compile("\w[\.]")
sentences = 0
for item in re.finditer(end_sentence_marker,text):
t = item.group(0)
if t[0].islower():
sentences += 1
doc["sentences"] = sentences
# check copy
if state == "good":
copysz = len(text) - goodsz
for chs,paragraph_size in zip(doc["paragraph_checksums"],doc["paragraph_sizes"]):
# index paragraph checksums
nd = checkcol.find_one({"_id":chs})
if nd is not None:
copysz += paragraph_size
if (copysz / len(text)) > TEXT_TRASH_RATIO:
state = "copy"
print(copysz)
if state == "good":
htdoc = get_link_doc(link,state)
htdoc["html"] = html
htdoc["html_size"] = len(html)
# can be revisited - upsert
del htdoc["url"]
htmlcol.update_one({"url":link},{"$set":htdoc},upsert=True)
doc.update(get_link_doc(link,"good"))
# todo extract links
print(doc)
del doc["url"]
contentcol.update_one({"url":link},{"$set":doc},upsert=True)
for chs in doc["paragraph_checksums"]:
try:
checkcol.insert_one({"_id":chs})
except pymongo.errors.DuplicateKeyError as err:
pass
linkcol.update_one({"url":link},{"$set":{"status":state}})
def extract_links(link_batch,responses,hostname,rules,default_status="frontlink"):
links = {}
badrobot = 0
for original_link,(final_link,html) in zip(link_batch,responses):
status = default_status
external_links = courlan.extract_links(html,final_link,external_bool=True,language=LANGUAGE)
for link in external_links:
links[link] = "frontlink"
internal_links = courlan.extract_links(html,final_link,external_bool=False,language=LANGUAGE)
#print(extracted_links)
for link in internal_links:
if not is_robot_good(link,rules):
badrobot += 1
continue
status = str(default_status)
#print(link,status)
links[link] = status
outlinks = []
badlink = 0
for link,status in links.items():
link = is_link_good(link)
if link is None:
badlink += 1
continue
outlinks.append((link,status))
print(f"{len(links)} total links, {badrobot} badrobot {badlink} badlinks")
return outlinks
def index_links(db,extracted_links):
linkcol=db["links"]
for link,status in extracted_links:
if not is_link_good(link):
continue
if status == "frontlink" or status == "backlink":
doc = get_link_doc(link,status)
try:
linkcol.insert_one(doc)
# dont overwrite
except pymongo.errors.DuplicateKeyError as ex:
pass
else:
print("updating " + link,status)
linkcol.update_one({"url":link},{"$set":{"status":status,"updated_at":datetime.utcnow()}})
def get_link_features(link):
a, urlpath = courlan.get_host_and_path(link)
features = re.split("[/?&]",urlpath)
#features = re.split("[/?-_=]",urlpath)
res = []
for i,feature in enumerate(features):
if len(feature) < 1:
continue
if feature.isdigit():
feature = "<NUM>"
res.append(str(i)+ "-" + feature)
if len(res) < 2:
return None
res = res[:-1]
print(res)
return res
class LinkClassifier:
def __init__(self):
self.goodcounter = collections.Counter()
self.badcounter = collections.Counter()
self.good_count = 0
self.bad_count = 0
self.alpha = 0.001
def train(self,links):
testset = []
for i,item in enumerate(links):
link = item["url"]
state = item["status"]
cl = 0
if state == "good":
cl = 1
print(cl,state,link)
if i % 10 == 1:
testset.append((link,cl))
continue
features = get_link_features(link)
if features is None:
continue
lf = len(features)
if state == "good":
for feature in features:
self.good_count += 1
self.goodcounter[feature] += 1
else:
for feature in features:
self.bad_count += 1
self.badcounter[feature] += 1
self.bdictsize = len(self.badcounter)
self.gdictsize = len(self.goodcounter)
# eval
gg = 0
for l,cl in testset:
pcp = self.classify(l)
r = 0
if pcp > 0:
r = 1
if r == cl:
gg += 1
else:
print("MISS",l,cl,pcp)
print("Accuracy:")
print(len(testset))
acc = gg / len(testset)
print(acc)
return acc
def classify(self,link):
if self.good_count + self.bad_count == 0:
return random.uniform(-0.1,0.1)
features = get_link_features(link)
res = 0
gp = math.log(self.good_count) - math.log(self.good_count + self.bad_count)
bp = math.log(self.bad_count) - math.log(self.good_count + self.bad_count)
if features is None:
return math.exp(gp) - math.exp(bp)
gcc = math.log(self.gdictsize * self.alpha + self.good_count)
bcc = math.log(self.bdictsize * self.alpha + self.bad_count)
goodprob = 0
badprob = 0
for feature in features:
g = math.log((self.goodcounter[feature] + self.alpha)) - gcc
goodprob += g
b = math.log(self.badcounter[feature] + self.alpha) - bcc
badprob += b
print(feature,g,b)
if (goodprob + gp) > (badprob + bp):
#if goodprob > badprob:
res = 1
pa = math.exp(goodprob + gp)
pb = math.exp(badprob + bp)
return pa - pb
def get_links(db,hostname,status,batch_size):
linkcol = db["links"]
res = linkcol.find({"host":hostname,"status":status},limit=batch_size)
links = []
for item in res:
links.append(item["url"])
print("Got {} {}".format(len(links),status))
return links
def fetch_sitemap_links(start_link):
out = []
navigation_links = trafilatura.sitemaps.sitemap_search(start_link,target_lang=LANGUAGE)
for link in navigation_links:
out.append((link,"frontlink"))
print("Fetched {} sitemap links".format(len(out)))
return out
def fetch_front_links(start_link,rules):
start_link,hostname = courlan.check_url(start_link)
response = fetch_page(start_link)
extracted_links = extract_links([start_link],[response],hostname,rules,"frontlink")
print("Fetched {} frontlinks".format(len(extracted_links)))
return extracted_links
def link_summary(db,hostname):
linkcol = db["links"]
#res = linkcol.distinct("hostname",{"hostname":hostname})
res = linkcol.aggregate([
{"$match":{"host":hostname}},
{"$group":{"_id":"$status",
"count":{"$count":{}},
}
},
])
badcount = 0
goodcount = 0
info = {}
crawled_count = 0
for item in res:
count = item["count"]
st = item["_id"]
print(st,count)
if st == "good":
goodcount += count
if st != "frontlink" and st != "backlink":
crawled_count += count
info[st] = count
baclink_cout = 0
if "backlink" in info:
backlink_count = info["backlink"]
good_prob= 0
if crawled_count > 0:
good_prob = goodcount / crawled_count
info["good_prob"] = good_prob
print(">>>Domain Content")
contentcol = db["content"]
res = contentcol.aggregate([
{"$match":{"host":hostname}},
#{"$project": {"textsum":{"$sum":"$text_size"}}}
{"$group":{"_id":None,
"text_size_sum":{"$sum":"$text_size"},
}
},
])
text_size = 0
for item in res:
text_size = item["text_size_sum"]
good_document_characters = 0
fetch_average_characters = 0
if goodcount > 0:
good_document_characters = text_size / goodcount
fetch_average_characters = text_size / crawled_count
info["total_good_characters"] = text_size
info["average_good_characters"] = good_document_characters
info["average_fetch_characters"] = fetch_average_characters
domaincol = db["domain"]
print(info)
domaincol.update_one({"host":hostname},{"$set":info},upsert=True)
def sample_links(db,hostname,status,batch_size):
print("Getting backlinks")
linkcol = db["links"]
res = linkcol.find({"host":hostname,"status": {"$not":{"$in":["frontlink","backlink"]}}})
cl = LinkClassifier()
crawled_links = list(res)
crawled_count = len(crawled_links)
min_train_size = 200
prediction_accuracy = 0
if crawled_count > min_train_size:
# train on crawled links
prediction_accuracy = cl.train(crawled_links)
sample_set_size = 10000
res = linkcol.find({"host":hostname,"status": status},limit = sample_set_size)
sample_links = []
predicted_good = 0
for item in res:
for item in res:
cll = cl.classify(item["url"])
sample_links.append((item["url"],cll))
if cll > 0:
predicted_good += 1
# TODO frontlinks are not unique!
sample_links.sort(key=lambda x: x[1],reverse=True)
predicted_good_prob = predicted_good / len(sample_links)
domaincol = db["domain"]
info = {
"predicted_good_prob":predicted_good_prob,
"prediction_accuracy": prediction_accuracy,
"crawled_count": crawled_count,
}
print(info)
domaincol.update_one({"host":hostname},{"$set":info})
links = [l[0] for l in sample_links[0:batch_size]]
return links
def domain_summary(db,hostname):
linkcol = db["links"]
#res = linkcol.distinct("hostname",{"hostname":hostname})
# count links
res = linkcol.aggregate([
{"$group":{"_id":"$hostname","text_size_sum":{"$sum":"$text_size"}}},
])
for item in res:
print(item)
@click.group()
def cli():
pass
@cli.command()
def createdb():
myclient = pymongo.MongoClient(CONNECTION)
db=myclient[DBNAME]
linkcol = db["links"]
linkcol.create_index("url",unique=True)
linkcol.create_index("host")
contentcol = db["content"]
contentcol.create_index("url",unique=True)
#contentcol.create_index({"paragraph_checksums":1})
contentcol.create_index("host")
htmlcol = db["html"]
htmlcol.create_index("url",unique=True)
domaincol = db["domains"]
domaincol.create_index("host",unique=True)
@cli.command()
@click.argument("link")
def parseurl(link):
link,hostname = courlan.check_url(link)
rawrules = trafilatura.fetch_url("https://"+ hostname + "/robots.txt")
print(rawrules)
rules = urllib.robotparser.RobotFileParser()
rules.parse(rawrules.split("\n"))
print(rules.can_fetch("*",link))
print(rules.site_maps())
print(rules.crawl_delay("*"))
html = trafilatura.fetch_url(link,decode=True)
doc = trafilatura.bare_extraction(html)
import pprint
pprint.pprint(doc)
@cli.command()
@click.argument("link")
def externaldomains(link):
html = trafilatura.fetch_url(link,decode=True)
external_links = courlan.extract_links(html,link,external_bool=True,language=LANGUAGE)
domains = set()
for l in external_links:
r = courlan.check_url(l)
if r is None:
pass
link,domain = r
domains.add(domain)
for d in domains:
print(d)
@cli.command()
@click.argument("start_link")
def classify(start_link):
myclient = pymongo.MongoClient(CONNECTION)
db=myclient[DBNAME]
start_link,hostname = courlan.check_url(start_link)
cl = LinkClassifier()
cl.train(db,hostname)
@cli.command()
@click.argument("start_link")
def visit(start_link):
myclient = pymongo.MongoClient(CONNECTION)
db=myclient[DBNAME]
start_link,hostname = courlan.check_url(start_link)
batch_size = BATCHSIZE
rules = fetch_robot(hostname)
# renew front links
sitemap_links = fetch_sitemap_links(start_link)
index_links(db,sitemap_links)
front_links = fetch_front_links(start_link,rules)
index_links(db,front_links)
# start crawling
# frontlinks first
links = sample_links(db,hostname,"frontlink",batch_size)
links.insert(0,start_link)
# then backlinks
if len(links) < batch_size:
back_links = sample_links(db,hostname,"backlink",batch_size - len(links))
links += back_links
# index results
print("Processing links")
responses = []
for link in links:
responses.append(fetch_page(link))
extracted_pages = extract_pages(links,responses)
extracted_links = extract_links(links,responses,hostname,rules,"backlink")
index_links(db,extracted_links)
index_pages(db,hostname,extracted_pages)
link_summary(db,hostname)
if __name__ == "__main__":
cli()