From 9e36952563787d833eb9944878172d6140b31537 Mon Sep 17 00:00:00 2001
From: Daniel Hladek
Date: Sat, 9 May 2020 11:50:50 +0200
Subject: [PATCH] zz

---
 websucker/agent.py   | 112 ++++++++++++++++++++++++++++++++-------
 websucker/cli.py     | 122 ++++++++++++++++++++++++++-----------------
 websucker/db.py      |  28 +++++++---
 websucker/schema.sql |   5 ++
 4 files changed, 195 insertions(+), 72 deletions(-)

diff --git a/websucker/agent.py b/websucker/agent.py
index 2d2f718..de1efd5 100755
--- a/websucker/agent.py
+++ b/websucker/agent.py
@@ -14,6 +14,7 @@ import bs4
 import pycurl
 import urllib.robotparser
+import collections
 
 from websucker.parser import normalize_link,urlunparse
 
@@ -99,6 +100,7 @@ class Response:
 
 class Connection:
     def __init__(self):
+        self.useragent = "Googlebot-News"
         self.c = pycurl.Curl()
         self.c.setopt(self.c.FOLLOWLOCATION, True)
         # self.c.setopt(self.c.VERBOSE, True)
@@ -108,7 +110,7 @@ class Connection:
         self.c.setopt(self.c.HTTPHEADER, [
             'Accept: text/html', 'Accept-Charset: UTF-8'])
         self.c.setopt(self.c.HEADERFUNCTION, self.header)
-        self.c.setopt(self.c.USERAGENT, "Googlebot-News")
+        self.c.setopt(self.c.USERAGENT, self.useragent)
         # #self.c.setopt(pycurl.COOKIEJAR, 'cookie.txt')
         # #self.c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
         self.robots = {}
@@ -138,6 +140,18 @@ class Connection:
             # Pycurl then raises error 23, failed writing header
             return 0
 
+    def crawl_delay(self,domain):
+        # Honor the Crawl-delay directive from robots.txt, default is 4 seconds
+        self.cache_robot(domain)
+        delay = 4
+        if domain in self.robots:
+            r = self.robots[domain]
+            if r is not None:
+                d = r.crawl_delay(self.useragent)
+                if d is not None:
+                    delay = d
+        print("Waiting for {} s".format(delay))
+        time.sleep(delay)
+
     def __del__(self):
         self.c.close()
 
@@ -189,6 +203,18 @@ class Connection:
                 link_status = "bad_type"
             elif errno == 22:
                 link_status = "bad_httpcode"
+            elif errno == 28:
+                # 28 connection timeout
+                link_status = "bad_connection"
+            elif errno == 60:
+                # 60 bad SSL certificate
+                link_status = "bad_connection"
+            elif errno == 16:
+                # 16 HTTP/2 error
+                link_status = "bad_connection"
+            elif errno == 6:
+                # 6 unable to resolve DNS
+                link_status = "bad_connection"
             else:
                 raise e
         except UnicodeDecodeError as e:
@@ -218,11 +244,9 @@ class Connection:
                 break
         return responses
 
-    def is_robot_good(self, url):
-        schema, domain, path, query = normalize_link(url)
-        res = True
+    def cache_robot(self,domain):
         if domain not in self.robots:
-            roboturl = urlunparse((schema, domain, "robots.txt", ""))
+            roboturl = urlunparse(("https", domain, "robots.txt", ""))
             try:
                 r = self._download(roboturl)
                 if r[1] == "good":
@@ -234,6 +258,11 @@ class Connection:
                 self.robots[domain] = None
             except pycurl.error as err:
                 print(err)
+
+    def is_robot_good(self, url):
+        schema, domain, path, query = normalize_link(url)
+        self.cache_robot(domain)
+        res = True
         if domain in self.robots and self.robots[domain] is not None:
             res = self.robots[domain].can_fetch("Agent", url)
         return res
@@ -328,22 +357,45 @@ def get_domains(arg):
     domains = arg.split(",")
     return domains
 
+def parse_and_index(work_link,parser,responses,db):
+    # Index the responses and the parsed content of the last one,
+    # return the canonical link and the outgoing links found in it.
+    target_link = work_link
+    links = []
+    if len(responses) > 0:
+        db.index_responses(work_link,responses)
+        lr = responses[-1]
+        if lr.content is not None:
+            target_link = lr.get_canonical()
+            parsed = ParsedDocument(parser,target_link)
+            parsed.extract(lr.content, lr.bs)
+            db.index_content(target_link,parsed)
+            links = parsed.get_links()
+    return target_link,links
+
+def visit_sitemap(domain,connection,parser,db):
+    # Download and index the front page of the domain,
+    # return True if it could be fetched and parsed.
+    link = "http://" + domain
+    responses = connection.html_download2(link)
+    if len(responses) == 0:
+        return False
+    lr = responses[-1]
+    if lr.link_status.startswith("bad_"):
+        return False
+
+    target_link,outlinks = parse_and_index(link,parser,responses,db)
+    if len(outlinks) > 0:
+        db.index_follow_links(parser,outlinks,connection)
+    return True
+
+
 def visit_links(links,connection,parser,db):
     outlinks = []
     for work_link in links:
         responses = []
         if parser.is_link_good(work_link) and connection.is_robot_good(work_link):
             responses = connection.html_download2(work_link)
-            time.sleep(4)
-        db.index_responses(work_link,responses)
-        if len(responses) > 0:
-            lr = responses[-1]
-            if lr.content is not None:
-                target_link = lr.get_canonical()
-                parsed = ParsedDocument(parser,target_link)
-                parsed.extract(lr.content, lr.bs)
-                db.index_content(target_link,parsed)
-                outlinks += parsed.get_links()
+        target_link,new_links = parse_and_index(work_link,parser,responses,db)
+        nl = normalize_link(target_link)
+        connection.crawl_delay(nl[1])
+        outlinks += new_links
     if len(outlinks) > 0:
         db.index_follow_links(parser,outlinks,connection)
 
@@ -352,13 +404,37 @@ def visit_domain(domain,parser,db):
     p = parser
     # Get links from frontpage
     # TODO Sitemap
-    sitemap = "http://" + domain
-    visit_links([sitemap],c,p,db)
-    db.check_domain(domain)
+    res = visit_sitemap(domain,c,parser,db)
+    if not res:
+        return False
     for i in range(p.crawl_rounds):
         # Visit links from frontpage
         links = db.get_visit_links(domain,p.recent_links,p.old_links,p.random_links)
         visit_links(links,c,p,db)
         db.check_domain(domain)
+    return True
+
+def process_domains(domains,visit,parser,db,queue):
+    # Print the domains, optionally enqueue them or visit them right away
+    print("Websucker Agenda>>")
+    for domain in domains:
+        print(domain)
+    if queue is not None:
+        print("Queuing:")
+        for domain in domains:
+            print(domain)
+            queue.put(domain[0])
+    if visit:
+        print("Visiting:")
+        for domain in domains:
+            print(domain)
+            visit_domain(domain[0],parser,db)
+
+def work_domains(parser,db,queue):
+    # Consume domains from the work queue, burying each job while it is processed
+    while True:
+        print("Waiting for a new job:")
+        job = queue.reserve()
+        domain = job.body
+        queue.bury(job)
+        print("Visiting:")
+        visit_domain(domain,parser,db)
+        queue.delete(job)
diff --git a/websucker/cli.py b/websucker/cli.py
index 862cd66..94a85ec 100644
--- a/websucker/cli.py
+++ b/websucker/cli.py
@@ -1,4 +1,4 @@
-from websucker.agent import Connection,visit_links,visit_domain
+from websucker.agent import Connection,visit_links,visit_domain,process_domains,work_domains
 from websucker.agent import ParsedDocument
 from websucker.parser import BaseParser
 from websucker.parser import normalize_link,urlunparse
@@ -7,23 +7,37 @@ from websucker.db import Data
 from websucker.db import get_schema
 import click
 import pprint
+import greenstalk
+import os
 
 
 def create_database_from_context(ctx):
     return Data(ctx.obj["cassandra_keyspace"],ctx.obj["cassandra_host"],ctx.obj["cassandra_port"])
 
+def create_queue_from_context(ctx):
+    return greenstalk.Client(ctx.obj["beanstalkd_host"],ctx.obj["beanstalkd_port"],use=ctx.obj["beanstalkd_tube"],watch=ctx.obj["beanstalkd_tube"],encoding="utf8")
+
+
 @click.group()
 @click.pass_context
 @click.option("--cassandra-keyspace",metavar="CASSANDRA_KEYSPACE",help="cassandra keyspace (if defined, value read from CASSANDRA_KEYSPACE env variable)",envvar="CASSANDRA_KEYSPACE",default="websucker",show_default=True)
 @click.option("--cassandra-host",metavar="CASSANDRA_HOST",help="cassandra host (if defined, value read from CASSANDRA_HOST env variable)",envvar="CASSANDRA_HOST",default="127.0.0.1",show_default=True)
 @click.option("--cassandra-port",metavar="CASSANDRA_PORT",help="cassandra port (if defined, value read from CASSANDRA_PORT env variable)",envvar="CASSANDRA_PORT",default=9042,show_default=True)
+@click.option("--beanstalkd-tube",metavar="BEANSTALKD_TUBE",help="beanstalkd tube (if defined, value read from BEANSTALKD_TUBE env variable)",envvar="BEANSTALKD_TUBE",default="websucker",show_default=True)
+@click.option("--beanstalkd-host",metavar="BEANSTALKD_HOST",help="beanstalkd host (if defined, value read from BEANSTALKD_HOST env variable)",envvar="BEANSTALKD_HOST",default="127.0.0.1",show_default=True)
+@click.option("--beanstalkd-port",metavar="BEANSTALKD_PORT",help="beanstalkd port (if defined, value read from BEANSTALKD_PORT env variable)",envvar="BEANSTALKD_PORT",default=11300,show_default=True)
 @click.option("--justext-language",metavar="JUSTEXT_LANGUAGE",help="Target language (if defined, value read from JUSTEXT_LANGUAGE env variable)",envvar="JUSTEXT_LANGUAGE",default="English",show_default=True)
 @click.option("--parser",metavar="file_name",help="zzz")
 @click.option("--visit",is_flag=True)
-def cli(ctx,cassandra_keyspace,cassandra_host,cassandra_port,justext_language,parser,visit):
+@click.option("--queue",is_flag=True)
+def cli(ctx,cassandra_keyspace,cassandra_host,cassandra_port,beanstalkd_tube,beanstalkd_host,beanstalkd_port,justext_language,parser,visit,queue):
     ctx.ensure_object(dict)
     p = BaseParser()
     p.justext_language = justext_language
+
+    suckerfile = os.getcwd() + "/Suckerfile.py"
+    if os.path.isfile(suckerfile):
+        parser = suckerfile
     if parser is not None:
         p = load_parser(parser)
         assert p is not None
@@ -31,21 +45,66 @@ def cli(ctx,cassandra_keyspace,cassandra_host,cassandra_port,justext_language,pa
     ctx.obj["cassandra_host"] = cassandra_host
     ctx.obj["cassandra_port"] = cassandra_port
     ctx.obj["cassandra_keyspace"] = cassandra_keyspace
+    ctx.obj["beanstalkd_host"] = beanstalkd_host
+    ctx.obj["beanstalkd_port"] = beanstalkd_port
+    ctx.obj["beanstalkd_tube"] = beanstalkd_tube
     ctx.obj["visit"] = visit
+    ctx.obj["queue"] = queue
 
-@cli.command(help="Print domains")
+@cli.command(help="Process all domains")
 @click.pass_context
 @click.argument("count",type=int,default=20)
 def all(ctx,count):
-    p = ctx.obj["parser"]
-    c = Connection()
     db = create_database_from_context(ctx)
     res = db.all_domains(count)
-    for row in res:
-        print(",".join(map(str,row)))
-        if ctx.obj["visit"]:
-            visit_domain(row[0],p,db)
+    q = None
+    if ctx.obj["queue"]:
+        q = create_queue_from_context(ctx)
+    process_domains(res,ctx.obj["visit"],ctx.obj["parser"],db,q)
+
+@cli.command(help="Process domains from the work queue")
+@click.pass_context
+def work(ctx):
+    db = create_database_from_context(ctx)
+    q = create_queue_from_context(ctx)
+    work_domains(ctx.obj["parser"],db,q)
+
+
+@cli.command(help="Find best domains")
+@click.pass_context
+@click.argument("count",type=int,default=20)
+#@click.option("visit",is_flag=True)
+def best(ctx, count):
+    db = create_database_from_context(ctx)
+    domains = db.get_best_domains(count)
+    q = None
+    if ctx.obj["queue"]:
+        q = create_queue_from_context(ctx)
+    process_domains(domains,ctx.obj["visit"],ctx.obj["parser"],db,q)
+
+
+@cli.command(help="Find unvisited domains, visit them, get links and crawl")
+@click.pass_context
+@click.argument("count",type=int,default=20)
+def unvisited(ctx, count):
+    db = create_database_from_context(ctx)
+    domains = db.get_unvisited_domains(count)
+
+    q = None
+    if ctx.obj["queue"]:
+        q = create_queue_from_context(ctx)
+    process_domains(domains,ctx.obj["visit"],ctx.obj["parser"],db,q)
+
+@cli.command(help="Visit URL and get links. Start here")
Start here") +@click.pass_context +@click.argument("link") +def start(ctx, link): + db = create_database_from_context(ctx) + p = ctx.obj["parser"] + c = Connection() + visit_links([link],c,p,db) + db.check_domain(domain) @cli.command(help="Continue crawling of seen links from a domain") @click.pass_context @@ -58,44 +117,6 @@ def crawl(ctx, domain): visit_links(links,c,p,db) db.check_domain(domain) -@cli.command(help="find best domains") -@click.pass_context -@click.argument("count",type=int,default=20) -#@click.option("visit",is_flag=True) -def best(ctx, count): - db = create_database_from_context(ctx) - p = ctx.obj["parser"] - domains = db.get_best_domains(count) - for domain,gr in domains: - print(domain,gr) - if ctx.obj["visit"]: - visit_domain(domain,p,db) - - -@cli.command(help="Find unvisited domains, Visit a site, get links and crawl") -@click.pass_context -@click.argument("count",type=int,default=20) -def unvisited(ctx, count): - db = create_database_from_context(ctx) - p = ctx.obj["parser"] - c = Connection() - domains = db.get_unvisited_domains(count) - for domain in domains: - print(domain) - if ctx.obj["visit"]: - visit_domain(domain,p,db) - -@cli.command(help="Visit url, get links and crawl. Start here") -@click.pass_context -@click.argument("link") -def visit(ctx, link): - db = create_database_from_context(ctx) - p = ctx.obj["parser"] - c = Connection() - nl = normalize_link(link) - domain=nl[1] - visit_domain(domain,p,db) - @cli.command(help="Update domain statistics") @click.pass_context @click.argument("domain") @@ -109,7 +130,12 @@ def check(ctx,domain): def report(ctx): db = create_database_from_context(ctx) db.daily_report() - + if ctx.obj["queue"]: + q = create_queue_from_context(ctx) + stats = q.stats_tube(ctx.obj["beanstalkd_tube"]) + buried = stats["current-jobs-buried"] + ready = stats["current-jobs-buried"] + print("{} ready jobs, {} burried jobs".format(ready,buried)) @cli.command(help="Print keyspace schema") def schema(): schema = get_schema() diff --git a/websucker/db.py b/websucker/db.py index 5a82681..83116d6 100644 --- a/websucker/db.py +++ b/websucker/db.py @@ -162,13 +162,28 @@ INSERT INTO content( self.session.execute(self.index_response_insert_html,d) def daily_report(self): - rows = self.session.execute(self.daily_links_select) + #rows = self.session.execute(self.daily_links_select) + rows = self.session.execute("SELECT domain_name,count(link_status) FROM daily_links WHERE day=toDate(now()) GROUP BY day,domain_name") + domains = [] for row in rows: - print(row[0],row[1],row[2]) + domains.append(list(row)) + total_count = 0 + total_size = 0 + for domain,count in sorted(domains,key=lambda x:x[1]): + total_count += count + rows = self.session.execute("SELECT link_status,count(link_status),sum(body_size) FROM daily_links WHERE day=toDate(now()) AND domain_name=%s GROUP BY day,domain_name,link_status",(domain,)) + gc = 0 + bs = 0 + for row in rows: + if row[0] == "good": + gc = row[1] + bs = row[2] + total_size += bs + print(domain,gc/count,bs,count) + print("{} domains, {} documents, {} characters ".format(len(domains),total_count,total_size)) def index_follow_links(self,parser,links,connection): # Index seen links - follow_links = set() for l in links: if parser.is_link_good(l): @@ -362,6 +377,7 @@ INSERT INTO content( domain) if fetched_count > 0 or seen_count > 0: self.session.execute(self.domain_quality_update,uv) + print(uv) return average_fetched_good_characters def all_domains(self,count): @@ -395,13 +411,13 @@ INSERT INTO content( gain_ratio = 
            afg = row[4]
            if seen_count and fetched_count and gain_ratio:
-                domains.append((domain,gain_ratio))
+                domains.append(list(row))
         l = len(domains)
         ss = min(l,count)
         res = []
         if ss > 0:
             # sort according to ratio
-            res = list(sorted(domains,key=lambda x:x[1],reverse=True))[0:ss]
+            res = list(sorted(domains,key=lambda x:x[3],reverse=True))[0:ss]
-        # returns sorted list of tuples domain,gain_ratio
+        # returns list of rows sorted by gain_ratio
         return res
 
@@ -416,7 +432,7 @@ INSERT INTO content(
             gain_ratio = row[3]
             afg = row[4]
             if seen_count and not fetched_count:
-                domains.append(domain)
+                domains.append(row)
         ss = min(len(domains),count)
         return random.sample(domains,ss)
 
diff --git a/websucker/schema.sql b/websucker/schema.sql
index 5b54909..783757b 100644
--- a/websucker/schema.sql
+++ b/websucker/schema.sql
@@ -92,3 +92,8 @@ CREATE TABLE html (
   PRIMARY KEY(day,domain_name,source_link)
 );
 
+CREATE TABLE domain_connections (
+    domain_name TEXT,
+    linked_domain TEXT,
+    PRIMARY KEY (domain_name,linked_domain)
+);