"""MongoDB-backed focused web crawler / indexer.

Fetches pages with trafilatura, extracts text and links, de-duplicates
paragraphs via rolling checksums, and stores links/html/content/batch
statistics in MongoDB.  Configuration constants (CONNECTION, DBNAME,
LANGUAGE, DOMAIN, BATCH_SIZE, ...) come from ``config``.
"""
import pymongo
import pymongo.errors
import trafilatura
import trafilatura.feeds
import trafilatura.sitemaps
import trafilatura.spider
import trafilatura.utils
import trafilatura.external
import sys
import courlan
import urllib
import urllib.parse
import urllib.robotparser  # used by fetch_robot/parseurl; must be imported explicitly
from datetime import datetime as dat
import datetime
import click
import logging as LOGGER
import os
import os.path
import pprint
import re
import time
import collections
import math
import random
import hashlib
import binascii
import json
from multiprocessing import Pool
from bs4 import BeautifulSoup
from config import *


def get_bs_links(link, html):
    """Extract and normalize outgoing links from an HTML page.

    Relative links are resolved against the page's <base> (or the page URL),
    nofollow links and javascript pseudo-links are skipped.
    Returns a set of normalized absolute URLs.
    """
    bs = BeautifulSoup(html, "lxml")
    base = link
    if bs.base is not None and "href" in bs.base.attrs:
        base = bs.base["href"]
    base = urllib.parse.urlparse(courlan.normalize_url(base))
    links = set()
    for anchor in bs.find_all("a", href=True):
        # bs4 exposes "rel" as a multi-valued attribute (a list), so membership
        # is the correct test; the original equality test never matched.
        if "nofollow" in anchor.attrs.get("rel", []) or "nofollow" in anchor.attrs:
            continue
        href = anchor["href"]
        try:
            parsed = urllib.parse.urlparse(courlan.normalize_url(href))
            netloc = parsed.netloc
            path = os.path.normpath(parsed.path)
            scheme = parsed.scheme
            query = parsed.query
            if parsed.netloc == "":
                # internal link: inherit scheme/host from the base URL
                scheme = base.scheme
                netloc = base.netloc
                if not parsed.path.startswith("/"):
                    path = os.path.normpath(base.path + "/" + path)
            if not scheme.startswith("http"):
                continue
            if path.startswith("/"):
                path = path[1:]
            if path.endswith(")"):
                # javascript pseudo-link
                continue
            href = urllib.parse.urlunparse((scheme, netloc, path, "", query, ""))
            href = courlan.normalize_url(href)
            links.add(href)
        except ValueError as err:
            print(err)
    return links


def split_train(res):
    """Split items into train/test sets; every 10th item goes to the test set."""
    trainset = []
    testset = []
    for i, item in enumerate(res):
        if i % 10 == 0:
            testset.append(item)
        else:
            trainset.append(item)
    return trainset, testset


def calculate_checksums(text):
    """Compute rolling-hash fingerprints for the paragraphs of *text*.

    Paragraphs are separated by a newline; only paragraphs longer than
    CHECK_PARAGRAPH_SIZE (counting bytes with ord > 64) are fingerprinted.
    Paragraph separation must stay compatible with the text extraction step.

    @return (checksums, sizes) — parallel lists of paragraph hash and size.
    """
    checksums = []
    sizes = []
    hval = 0
    hsz = 0  # count of hashed characters in the current paragraph
    sz = 0   # total characters in the current paragraph
    for c in text:
        cv = ord(c)
        sz += 1
        if cv > 64:  # ignore punctuation/whitespace/control characters
            hval += (hval << 3) + cv
            zv = hval >> 31
            hval &= 0x7fffffff
            hval += zv
            hsz += 1
        if c == "\n" and hsz > 0:
            if hsz > CHECK_PARAGRAPH_SIZE:
                checksums.append(hval)
                sizes.append(sz)
            sz = 0
            hsz = 0
    if hsz > CHECK_PARAGRAPH_SIZE:
        checksums.append(hval)
        sizes.append(sz)
    return checksums, sizes


def is_robot_good(link, rules):
    """Return False when robots.txt rules forbid fetching *link*."""
    if rules is not None and not rules.can_fetch("*", link):
        return False
    return True


def is_link_good(link):
    """Validate a link against language, stop-path and domain rules.

    Returns the checked link, or None when the link should be discarded.
    """
    r = courlan.check_url(link, strict=True, language=LANGUAGE)
    if r is None:
        return None
    llink, lhostname = r
    paths = set(llink.split("/"))
    for item in STOP_PATHS:
        if item in paths:
            return None
    # hostname rules: restrict the crawl to the configured top domain
    if not lhostname.endswith(DOMAIN):
        LOGGER.debug("bad hostname")
        return None
    if courlan.is_not_crawlable(llink):
        LOGGER.debug("not crawlable")
        return None
    return llink


def get_link_doc(link: str, status="frontlink") -> dict:
    """Build a MongoDB document describing *link* with the given status."""
    parsed = urllib.parse.urlparse(courlan.normalize_url(link))
    tokens = parsed.netloc.split(".")
    # last two labels form the registrable domain (e.g. example.org)
    domain = ".".join(tokens[-2:])
    return {
        "url": link,
        "host": parsed.netloc,
        "domain": domain,
        "status": status,
        "created_at": dat.utcnow(),
    }


def fetch_page(link: str) -> (str, str):
    """Fetch *link* and return (final_link, html).

    html is None on any error (bad status, too small/large response,
    undecodable body, malformed meta-refresh).  final_link reflects any
    meta-refresh redirect and is normalized when present.
    """
    print("fetching:::::")
    print(link)
    final_link = link
    response = trafilatura.fetch_response(link, decode=False)
    time.sleep(2)  # polite crawling delay
    html = None
    if response is not None:
        good = True
        if response.status != 200:
            good = False
            LOGGER.error('not a 200 response: %s for URL %s', response.status, link)
        elif response.data is None or len(response.data) < MIN_FILE_SIZE:
            LOGGER.error('too small/incorrect for URL %s', link)
            good = False
        elif len(response.data) > MAX_FILE_SIZE:
            good = False
            LOGGER.error('too large: length %s for URL %s', len(response.data), link)
        if good:
            html = trafilatura.utils.decode_file(response.data)
            if html is not None:
                # is there a meta-refresh on the page?
                html, final_link = trafilatura.spider.refresh_detection(html, final_link)
            if final_link is None:
                # malformed or malicious content
                html = None
    if final_link is not None:
        # guard: normalize_url(None) would raise
        final_link = courlan.normalize_url(final_link)
    return final_link, html


def fetch_robot(base_url: str) -> urllib.robotparser.RobotFileParser:
    """Fetch and parse https://<base_url>/robots.txt; None on failure."""
    try:
        rawrules = trafilatura.fetch_url("https://" + base_url + "/robots.txt")
        if rawrules is None:
            # fetch_url returns None on failure; treat as missing robots.txt
            raise ValueError("no robots.txt data")
        rules = urllib.robotparser.RobotFileParser()
        rules.parse(rawrules.split("\n"))
        LOGGER.info('got robots')
    except Exception as exc:
        LOGGER.error('cannot read robots.txt: %s', exc)
        rules = None
    return rules


def extract_page(final_link, html):
    """Extract main text/metadata from *html* with trafilatura.

    Table-like lines are filtered out; returns None when there is no usable
    text or the text is shorter than MIN_TEXT_SIZE.
    """
    doc = None
    if html is not None:
        doc = trafilatura.bare_extraction(
            html, url=final_link, with_metadata=True, include_formatting=False,
            target_language=LANGUAGE, favor_precision=True)
    if doc is not None:
        # check presence of text BEFORE using it (original crashed on missing key)
        if "text" not in doc or doc["text"] is None:
            return None
        # filter out tables and pagination artifacts
        good_lines = []
        for line in doc["text"].split("\n"):
            if line.startswith("|") or line.startswith("1 2 3 4") or line.startswith("12345"):
                continue
            good_lines.append(line)
        doc["text"] = "\n".join(good_lines)
        if len(doc["text"]) < MIN_TEXT_SIZE:
            doc = None
    return doc


def set_content_checksums(doc):
    """Annotate *doc* in place with size, md5, paragraph checksums and a
    rough sentence count (lowercase letter followed by a period)."""
    text = doc["text"]
    checksums, sizes = calculate_checksums(text)
    doc["text_size"] = len(text)
    doc["text_md5"] = hashlib.md5(text.encode("utf8")).hexdigest()
    doc["paragraph_checksums"] = checksums
    doc["paragraph_sizes"] = sizes
    doc["paragraph_sizes_sum"] = sum(sizes)
    end_sentence_marker = re.compile(r"\w[\.]")
    sentences = 0
    for item in re.finditer(end_sentence_marker, text):
        if item.group(0)[0].islower():
            sentences += 1
    doc["sentences_count"] = sentences


def index_page(db, original_link: str, html, doc, filter_content=True):
    """Index one fetched page into links/html/content/check collections.

    Returns the resulting link state: "good", "html_error", "content_error",
    "small" (too little text) or "copy" (mostly duplicate paragraphs).
    *html* may be str (fetch path) or bytes (import_html path).
    """
    linkcol = db["links"]
    htmlcol = db["html"]
    contentcol = db["content"]
    checkcol = db["check"]
    state = "good"
    link = original_link
    if html is None:
        state = "html_error"
    elif doc is None:
        state = "content_error"
    if doc is not None:
        if original_link != doc["url"]:
            # record the redirect and continue under the final URL
            linkcol.update_one({"url": original_link}, {"$set": {"status": "redirect"}})
            link = doc["url"]
        set_content_checksums(doc)
        tsz = doc["text_size"]
        psz = doc["paragraph_sizes_sum"]
        if filter_content and (tsz < MIN_TEXT_SIZE or psz / tsz < TEXT_TRASH_RATIO):
            state = "small"
        # check copy: how much of the text consists of unseen paragraphs?
        if state == "good":
            origsz = 0
            for chs, paragraph_size in zip(doc["paragraph_checksums"], doc["paragraph_sizes"]):
                nd = checkcol.find_one({"_id": chs})
                if nd is None:
                    origsz += paragraph_size
            doc["original_text_size"] = origsz
            if filter_content and (1 - (origsz / tsz)) > TEXT_TRASH_RATIO:
                state = "copy"
        if state == "good":
            htdoc = get_link_doc(link, state)
            htdoc["html"] = html
            htdoc["html_size"] = len(html)
            # html is str when fetched, bytes when imported via quoted-printable
            payload = html.encode("utf8") if isinstance(html, str) else html
            htdoc["html_md5"] = hashlib.md5(payload).hexdigest()
            # can be revisited - upsert
            del htdoc["url"]
            htmlcol.update_one({"url": link}, {"$set": htdoc}, upsert=True)
            doc.update(get_link_doc(link, "good"))
            print(link, doc)
            del doc["url"]
            contentcol.update_one({"url": link}, {"$set": doc}, upsert=True)
            for chs in doc["paragraph_checksums"]:
                checkcol.update_one({"_id": chs}, {"$inc": {"count": 1}}, upsert=True)
    linkdoc = get_link_doc(link, state)
    del linkdoc["url"]
    linkcol.update_one({"url": link}, {"$set": linkdoc})
    return state


def save_batch_info(db, host, states, docs):
    """Store aggregate statistics of one crawl batch in the batches collection."""
    good_document_count = 0
    original_text_size = 0
    batch_size = 0
    d = host.split(".")
    domain = ".".join(d[-2:])
    for state, doc in zip(states, docs):
        batch_size += 1
        if state == "good":
            good_document_count += 1
            original_text_size += doc["original_text_size"]
    batchdoc = {
        "host": host,
        "domain": domain,
        "created_at": dat.utcnow(),
        "good_document_count": good_document_count,
        "original_text_size": original_text_size,
        "good_prob": good_document_count / batch_size if batch_size > 0 else 0,
        "batch_size": batch_size,
    }
    db["batches"].insert_one(batchdoc)


def extract_links(link_batch: list, responses: list, rules, default_status="frontlink") -> list:
    """Collect, robots-check and validate outgoing links from fetched pages.

    Returns a list of (link, status) tuples that passed all filters.
    """
    links = {}
    badrobot = 0
    for original_link, (final_link, html) in zip(link_batch, responses):
        if html is None or len(html) < 256:
            continue
        page_links = get_bs_links(final_link, html)
        for link in page_links:
            # internal links must obey robots.txt
            if not courlan.is_external(link, final_link) and not is_robot_good(link, rules):
                badrobot += 1
                continue
            links[link] = str(default_status)
    outlinks = []
    badlink = 0
    for link, status in links.items():
        link = is_link_good(link)
        if link is None:
            badlink += 1
            continue
        outlinks.append((link, status))
    print(f"{len(links)} total links, {badrobot} badrobot {badlink} badlinks")
    return outlinks


def index_links(db, extracted_links):
    """Insert new frontlinks (without overwriting) and update statuses of others."""
    linkcol = db["links"]
    for link, status in extracted_links:
        if not is_link_good(link):
            continue
        if status == "frontlink":
            try:
                linkcol.insert_one(get_link_doc(link, status))  # dont overwrite
            except pymongo.errors.DuplicateKeyError:
                pass
        else:
            print("updating " + link, status)
            linkcol.update_one({"url": link},
                               {"$set": {"status": status, "updated_at": dat.utcnow()}})


def get_link_features(link):
    """Turn a URL path into positional features for the link classifier.

    Digits are wildcarded; the last path component (usually the document
    itself) is dropped.  Returns None when fewer than 2 features remain.
    """
    _, urlpath = courlan.get_host_and_path(link)
    features = re.split("[/?&]", urlpath)
    res = []
    for i, feature in enumerate(features):
        if len(feature) < 1:
            continue
        feature = re.sub("[0-9]", "*", feature)
        res.append(str(i) + "-" + feature)
    if len(res) < 2:
        return None
    return res[:-1]


class LinkClassifier:
    """Naive-Bayes-style classifier predicting whether a link is "good"."""

    def __init__(self):
        self.goodcounter = collections.Counter()
        self.badcounter = collections.Counter()
        self.good_count = 0
        self.bad_count = 0
        self.alpha = 0.001  # additive smoothing

    def train(self, links):
        """Count per-class feature frequencies from link documents."""
        for item in links:
            link = item["url"]
            state = item["status"]
            cl = 1 if state == "good" else 0
            print(cl, state, link)
            features = get_link_features(link)
            if features is None:
                continue
            if state == "good":
                for feature in features:
                    self.good_count += 1
                    self.goodcounter[feature] += 1
            else:
                for feature in features:
                    self.bad_count += 1
                    self.badcounter[feature] += 1
        self.bdictsize = len(self.badcounter)
        self.gdictsize = len(self.goodcounter)

    def test(self, testset):
        """Evaluate on *testset*, print precision/recall/accuracy, return accuracy.

        NOTE(review): the variable counts r==1 and cl==0, i.e. false
        positives; kept under the original name to preserve the reported
        numbers — confirm intended metric.
        """
        gg = 0
        true_positive = 0
        positive = 0
        false_negative = 0
        for item in testset:
            l = item["url"]
            cl = 1 if item["status"] == "good" else 0
            pcp = self.classify(l)
            r = 1 if pcp > 0 else 0
            if cl == 1:
                if r == 1:
                    true_positive += 1
                positive += 1
            if r == 1 and cl == 0:
                false_negative += 1
            if r == cl:
                gg += 1
            else:
                print("MISS", l, cl, pcp)
        print(len(testset))
        print("Precision: {}, Recall: {}".format(
            true_positive / positive,
            true_positive / (true_positive + false_negative)))
        print("Accuracy:")
        acc = gg / len(testset)
        print(acc)
        return acc

    def classify(self, link):
        """Return P(good) - P(bad) for *link*; positive means "good"."""
        if self.good_count == 0 or self.bad_count == 0:
            # untrained: answer randomly around zero
            return random.uniform(-0.1, 0.1)
        features = get_link_features(link)
        gp = math.log(self.good_count) - math.log(self.good_count + self.bad_count)
        bp = math.log(self.bad_count) - math.log(self.good_count + self.bad_count)
        if features is None:
            # no features: fall back to the class priors
            return math.exp(gp) - math.exp(bp)
        gcc = math.log(self.gdictsize * self.alpha + self.good_count)
        bcc = math.log(self.bdictsize * self.alpha + self.bad_count)
        goodprob = 0
        badprob = 0
        for feature in features:
            goodprob += math.log(self.goodcounter[feature] + self.alpha) - gcc
            badprob += math.log(self.badcounter[feature] + self.alpha) - bcc
        pa = math.exp(goodprob + gp)
        pb = math.exp(badprob + bp)
        return pa - pb


def get_links(db, hostname, status, batch_size):
    """Return up to *batch_size* URLs of the given status for *hostname*."""
    linkcol = db["links"]
    res = linkcol.find({"host": hostname, "status": status}, limit=batch_size)
    links = [item["url"] for item in res]
    print("Got {} {}".format(len(links), status))
    return links


def fetch_sitemap_links(start_link):
    """Discover frontlinks from the site's sitemap."""
    navigation_links = trafilatura.sitemaps.sitemap_search(start_link, target_lang=LANGUAGE)
    out = [(link, "frontlink") for link in navigation_links]
    print("Fetched {} sitemap links".format(len(out)))
    return out


def fetch_front_links(start_link, rules):
    """Fetch the start page and return its outgoing links as frontlinks."""
    start_link, hostname = courlan.check_url(start_link)
    response = fetch_page(start_link)
    extracted_links = extract_links([start_link], [response], rules, "frontlink")
    print("Fetched {} frontlinks".format(len(extracted_links)))
    return extracted_links


def link_summary(db, hostname):
    """Aggregate link/content statistics for *hostname* into the domains collection."""
    linkcol = db["links"]
    res = linkcol.aggregate([
        {"$match": {"host": hostname}},
        {"$group": {"_id": "$status", "count": {"$sum": 1}}},
    ])
    goodcount = 0
    info = {}
    crawled_count = 0
    bad_crawl_count = 0
    for item in res:
        count = item["count"]
        st = item["_id"]
        print(st, count)
        if st == "good":
            goodcount += count
        if st != "frontlink":
            crawled_count += count
            if st != "good":
                bad_crawl_count += count
        info[st] = count
    info["crawled_count"] = crawled_count
    info["bad_crawl_count"] = bad_crawl_count
    good_prob = 0
    if crawled_count > 0:
        good_prob = goodcount / crawled_count
    info["good_prob"] = good_prob
    print(">>>Domain Content")
    contentcol = db["content"]
    res = contentcol.aggregate([
        {"$match": {"host": hostname}},
        {"$group": {"_id": None, "text_size_sum": {"$sum": "$text_size"}}},
    ])
    text_size = 0
    for item in res:
        text_size = item["text_size_sum"]
    good_document_characters = 0
    fetch_average_characters = 0
    if goodcount > 0:
        good_document_characters = text_size / goodcount
        fetch_average_characters = text_size / crawled_count
    info["total_good_characters"] = text_size
    info["average_good_characters"] = good_document_characters
    info["average_fetch_characters"] = fetch_average_characters
    domaincol = db["domains"]
    domaincol.update_one({"host": hostname}, {"$set": info}, upsert=True)
    print(domaincol.find_one({"host": hostname}))


def sample_links(db, hostname, status, batch_size):
    """Sample a mixed batch of links: classifier-approved plus exploratory ones.

    Trains a LinkClassifier on already-crawled links (when enough exist),
    then mixes its positive predictions with rarely-visited "discover"
    links according to DISCOVER_LINK_RATIO.
    """
    linkcol = db["links"]
    res = linkcol.find({"host": hostname, "status": {"$not": {"$in": ["frontlink"]}}})
    cl = LinkClassifier()
    crawled_links = list(res)
    if len(crawled_links) > CLASSIFIER_SET_SIZE:
        # train on crawled links
        trainset, testset = split_train(crawled_links)
        cl.train(trainset)
        cl.test(testset)
    res = linkcol.find({"host": hostname, "status": status})
    visitcounter = collections.Counter()
    good_links = []
    discover_links = []
    for item in res:
        link = item["url"]
        if cl.classify(link) > 0:
            good_links.append(link)
        features = get_link_features(link)
        discover_links.append(link)
        if features is None:
            continue
        for feature in features:
            visitcounter[feature] += 1
    mls = int(min(batch_size * (1 - DISCOVER_LINK_RATIO), len(good_links)))
    random.shuffle(good_links)
    links = list(good_links[0:mls])
    # score discover links: prefer links with rarely-seen features
    eval_discover_links = []
    for link in discover_links:
        features = get_link_features(link)
        prob = 0
        if features is not None:
            for feature in features:
                c = visitcounter[feature]
                prob -= math.log(c) / c
        eval_discover_links.append((link, prob))
    eval_discover_links.sort(key=lambda x: x[1], reverse=True)
    mls = int(min(batch_size * DISCOVER_LINK_RATIO, len(eval_discover_links)))
    links += [l[0] for l in eval_discover_links[0:mls]]
    return list(set(links))


def domain_summary(db, hostname):
    """Print per-hostname text-size aggregation from the links collection."""
    linkcol = db["links"]
    res = linkcol.aggregate([
        {"$group": {"_id": "$hostname", "text_size_sum": {"$sum": "$text_size"}}},
    ])
    for item in res:
        print(item)


def dropdb():
    """Interactively drop a database (name read from stdin as confirmation)."""
    myclient = pymongo.MongoClient(CONNECTION)
    print("write name of database to drop")
    dbname = sys.stdin.readline().strip()
    myclient.drop_database(dbname)


def createdb():
    """Create the database collections and their indices."""
    myclient = pymongo.MongoClient(CONNECTION)
    db = myclient[DBNAME]
    linkcol = db["links"]
    linkcol.create_index("url", unique=True)
    linkcol.create_index("host")
    contentcol = db["content"]
    contentcol.create_index("url")
    contentcol.create_index("text_md5", unique=True)
    contentcol.create_index("host")
    htmlcol = db["html"]
    htmlcol.create_index("url")
    htmlcol.create_index("html_md5", unique=True)
    domaincol = db["domains"]
    domaincol.create_index("host", unique=True)
    domaincol.create_index([("average_fetch_characters", pymongo.DESCENDING)])
    batchcol = db["batches"]
    batchcol.create_index("host")
    batchcol.create_index("created_at")


def parseurl(link):
    """Debug helper: print robots info, extracted document and links for *link*."""
    link, hostname = courlan.check_url(link)
    rawrules = trafilatura.fetch_url("https://" + hostname + "/robots.txt")
    print(rawrules)
    rules = urllib.robotparser.RobotFileParser()
    rules.parse(rawrules.split("\n"))
    print(rules.can_fetch("*", link))
    print(rules.site_maps())
    print(rules.crawl_delay("*"))
    html = trafilatura.fetch_url(link, decode=True)
    doc = extract_page(link, html)
    if doc is not None:
        pprint.pprint(doc)
    links = get_bs_links(link, html)
    print(links)


def externaldomains(link):
    """Print the external domains referenced from *link*."""
    html = trafilatura.fetch_url(link, decode=True)
    external_links = courlan.extract_links(html, link, external_bool=True, language=LANGUAGE)
    domains = set()
    for l in external_links:
        r = courlan.check_url(l)
        if r is None:
            # invalid URL: skip it (original fell through and crashed unpacking None)
            continue
        link, domain = r
        domains.add(domain)
    for d in domains:
        print(d)


def classify(start_link):
    """Train and evaluate a LinkClassifier on the crawled links of one host."""
    myclient = pymongo.MongoClient(CONNECTION)
    db = myclient[DBNAME]
    start_link, hostname = courlan.check_url(start_link)
    cl = LinkClassifier()
    linkcol = db["links"]
    res = linkcol.find({"host": hostname, "status": {"$not": {"$in": ["frontlink"]}}})
    trainset, testset = split_train(res)
    cl.train(trainset)
    cl.test(testset)


def index_pages(db, hostname, extracted_pages, filter_content):
    """Index a batch of (original_link, html, doc) tuples and save batch stats."""
    final_states = []
    docs = []
    for original_link, html, doc in extracted_pages:
        status = index_page(db, original_link, html, doc, filter_content)
        final_states.append(status)
        docs.append(doc)
    save_batch_info(db, hostname, final_states, docs)


def fetch_and_extract(links, rules):
    """Fetch *links*, extract documents and outgoing frontlinks.

    Returns (extracted_pages, extracted_links) where extracted_pages is a
    list of (original_link, html, doc) tuples.
    """
    responses = [fetch_page(link) for link in links]
    extracted_pages = []
    for original_link, (final_link, html) in zip(links, responses):
        assert original_link is not None
        doc = extract_page(final_link, html)
        extracted_pages.append((original_link, html, doc))
    extracted_links = extract_links(links, responses, rules, "frontlink")
    return extracted_pages, extracted_links


def visit(hostname, filter_content=True):
    """Run one crawl batch for *hostname*: refresh frontlinks, sample, fetch,
    index pages and links, and print a domain summary."""
    myclient = pymongo.MongoClient(CONNECTION)
    db = myclient[DBNAME]
    batch_size = BATCH_SIZE
    rules = fetch_robot(hostname)
    start_link = "https://" + hostname
    # renew front links
    front_links = fetch_front_links(start_link, rules)
    index_links(db, front_links)
    # start crawling: frontlinks first
    links = sample_links(db, hostname, "frontlink", batch_size)
    if start_link not in links:
        links.insert(0, start_link)
    print("sampled")
    print(links)
    print("Processing links")
    extracted_pages, extracted_links = fetch_and_extract(links, rules)
    # (removed broken duplicate indexing code that referenced an undefined
    # `responses` variable and mis-unpacked extracted_pages)
    index_pages(db, hostname, extracted_pages, filter_content)
    index_links(db, extracted_links)
    link_summary(db, hostname)


def crawl_summary():
    """Print global crawl statistics: text size, link states, recent batches."""
    myclient = pymongo.MongoClient(CONNECTION)
    db = myclient[DBNAME]
    contentcol = db["content"]
    res = contentcol.aggregate([
        {"$group": {"_id": None, "total_text_size": {"$sum": "$text_size"}}}
    ])
    print(">>>>> Total text size in content")
    for item in res:
        print(item)
    linkscol = db["links"]
    # find counts of link statuses
    res = linkscol.aggregate([
        {"$group": {"_id": "$status", "count": {"$sum": 1}}}
    ])
    print(">>>>> Link status counts")
    for item in res:
        print(item["_id"], item["count"])
    batchcol = db["batches"]
    yesterday = datetime.datetime.today() - datetime.timedelta(days=1)
    print(yesterday)
    res = batchcol.aggregate([
        # compare against the computed cutoff; `yesterday.utcnow()` returned
        # the current time and made the filter a no-op
        {"$match": {"created_at": {"$lt": yesterday}}},
        {"$group": {"_id": "$host",
                    "document_count": {"$sum": "$document_count"},
                    "good_document_count": {"$sum": "$good_document_count"},
                    "batch_size": {"$sum": "$batch_size"},
                    "original_text_size": {"$sum": "$original_text_size"},
                    }
         },
        {"$sort": {"original_text_size": -1}},
        {"$limit": 100},
    ])
    print(">>>> Batches")
    headers = ["_id", "document_count", "good_document_count", "batch_size", "original_text_size"]
    print("\t".join(headers))
    for item in res:
        print("\t".join(str(item[x]) for x in headers))


def _extr(hdoc):
    """Worker: decode quoted-printable HTML and extract its document."""
    url = hdoc["url"]
    html = binascii.a2b_qp(hdoc["quoted_html"])
    return extract_page(url, html)


def import_html():
    """Import pages from stdin (JSON lines with url + quoted_html) in parallel."""
    myclient = pymongo.MongoClient(CONNECTION)
    db = myclient[DBNAME]
    linkscol = db["links"]

    def _flush(buffer, counter, i):
        # extract in parallel, then index sequentially; returns updated counter
        with Pool(8) as p:
            outs = p.map(_extr, buffer)
        for hdoc, doc in zip(buffer, outs):
            if doc is None:
                print("bad html" + hdoc["url"])
                continue
            status = index_page(db, hdoc["url"],
                                binascii.a2b_qp(hdoc["quoted_html"]), doc)
            counter += 1
            print(">>> " + str(counter) + " " + str(i) + " " + hdoc["url"] + " " + status)
        del buffer[:]
        return counter

    buffer = []
    counter = 0
    i = 0
    for i, l in enumerate(sys.stdin):
        hdoc = json.loads(l)
        url = hdoc["url"]
        r = linkscol.find_one({"url": url})
        if r is not None and r["status"] != "frontlink":
            print(">>>>" + str(i) + " copy: " + url)
            continue
        buffer.append(hdoc)
        if len(buffer) < 128:
            continue
        counter = _flush(buffer, counter, i)
    # process the final partial batch (the original silently dropped it)
    if buffer:
        counter = _flush(buffer, counter, i)


def sample_domains():
    """Print a mixed sample of domains: discovered frontlink hosts plus the
    best-performing hosts by average fetched characters."""
    myclient = pymongo.MongoClient(CONNECTION)
    db = myclient[DBNAME]
    linkscol = db["links"]
    # discover domains
    domains = linkscol.distinct("host", filter={"status": "frontlink"})
    all_domains = list(domains)
    sample_size = min(int(DISCOVER_LINK_RATIO * BATCH_SIZE), len(all_domains))
    print(">>> Discover domains {}".format(sample_size))
    sampled = random.sample(all_domains, sample_size)
    domaincol = db["domains"]
    # exploit domains
    res = domaincol.find({"average_fetch_characters": {"$gt": 1000}}).sort(
        "average_fetch_characters", -1)
    all_domains = [item["host"] for item in res]
    sample_size = min(int((1 - DISCOVER_LINK_RATIO) * BATCH_SIZE), len(all_domains))
    print(">>>> Best domains {}".format(sample_size))
    sampled += random.sample(all_domains, sample_size)
    for domain in sampled:
        print(domain)