Daniel Hladek 2020-05-09 11:50:50 +02:00
parent abeef76afb
commit 9e36952563
4 changed files with 195 additions and 72 deletions

View File

@@ -14,6 +14,7 @@ import bs4
import pycurl
import urllib.robotparser
import collections
from websucker.parser import normalize_link,urlunparse
@@ -99,6 +100,7 @@ class Response:
class Connection:
def __init__(self):
self.useragent = "Googlebot-News"
self.c = pycurl.Curl()
self.c.setopt(self.c.FOLLOWLOCATION, True)
# self.c.setopt(self.c.VERBOSE, True)
@@ -108,7 +110,7 @@ class Connection:
self.c.setopt(self.c.HTTPHEADER, [
'Accept: text/html', 'Accept-Charset: UTF-8'])
self.c.setopt(self.c.HEADERFUNCTION, self.header)
self.c.setopt(self.c.USERAGENT, "Googlebot-News")
self.c.setopt(self.c.USERAGENT, self.useragent)
# #self.c.setopt(pycurl.COOKIEJAR, 'cookie.txt')
# #self.c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
self.robots = {}
@@ -138,6 +140,18 @@ class Connection:
# Pycurl then raises error 23, failed writing header
return 0
def crawl_delay(self,domain):
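# Sleep for the Crawl-delay declared in the domain's robots.txt, falling back to 4 seconds when none is declared.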
self.cache_robot(domain)
delay = 4
if domain in self.robots:
r = self.robots[domain]
if r is not None:
d = r.crawl_delay(self.useragent)
if d is not None:
delay = d
print("Waiting for {} s".format(delay))
time.sleep(delay)
def __del__(self):
self.c.close()
@@ -189,6 +203,18 @@ class Connection:
link_status = "bad_type"
elif errno == 22:
link_status = "bad_httpcode"
elif errno == 28:
# 28 is a connection timeout
link_status = "bad_connection"
elif errno == 60:
# 60 is a bad SSL certificate
link_status = "bad_connection"
elif errno == 16:
# 16 is an HTTP/2 layer error
link_status = "bad_connection"
elif errno == 6:
# 6 is an unresolvable DNS name
link_status = "bad_connection"
else:
raise e
except UnicodeDecodeError as e:
@@ -218,11 +244,9 @@ class Connection:
break
return responses
def is_robot_good(self, url):
schema, domain, path, query = normalize_link(url)
res = True
def cache_robot(self,domain):
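# Download robots.txt for the domain over https and cache the parsed result so it is fetched at most once per run.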
if domain not in self.robots:
roboturl = urlunparse((schema, domain, "robots.txt", ""))
roboturl = urlunparse(("https", domain, "robots.txt", ""))
try:
r = self._download(roboturl)
if r[1] == "good":
@@ -234,6 +258,11 @@ class Connection:
self.robots[domain] = None
except pycurl.error as err:
print(err)
def is_robot_good(self, url):
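# Return True when robots.txt allows fetching the url, or when no robots.txt is available for the domain.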
schema, domain, path, query = normalize_link(url)
self.cache_robot(domain)
res = True
if domain in self.robots and self.robots[domain] is not None:
res = self.robots[domain].can_fetch("Agent", url)
return res
@@ -328,22 +357,45 @@ def get_domains(arg):
domains = arg.split(",")
return domains
def visit_links(links,connection,parser,db):
outlinks = []
for work_link in links:
responses = []
if parser.is_link_good(work_link) and connection.is_robot_good(work_link):
responses = connection.html_download2(work_link)
time.sleep(4)
db.index_responses(work_link,responses)
def parse_and_index(work_link,parser,responses,db):
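# Index the raw responses, parse the last one, store the extracted content under its canonical link and return that link together with the outgoing links.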
target_link = work_link
links = []
if len(responses) > 0:
db.index_responses(work_link,responses)
lr = responses[-1]
if lr.content is not None:
target_link = lr.get_canonical()
parsed = ParsedDocument(parser,target_link)
parsed.extract(lr.content, lr.bs)
db.index_content(target_link,parsed)
outlinks += parsed.get_links()
links = parsed.get_links()
return target_link,links
def visit_sitemap(domain,connection,parser,db):
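# Download the domain front page, index it and record its outgoing links; returns False when the download fails or ends with a bad status.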
link = "http://" + domain
responses = connection.html_download2(link)
if len(responses) == 0:
return False
lr = responses[-1]
if lr.link_status.startswith("bad_"):
return False
target_link,outlinks = parse_and_index(link,parser,responses,db)
if len(outlinks) > 0:
db.index_follow_links(parser,outlinks,connection)
return True
def visit_links(links,connection,parser,db):
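# Download, parse and index every acceptable link, respecting robots.txt and the per-domain crawl delay, then record the newly discovered links.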
outlinks = []
for work_link in links:
responses = []
if parser.is_link_good(work_link) and connection.is_robot_good(work_link):
responses = connection.html_download2(work_link)
target_link,links = parse_and_index(work_link,parser,responses,db)
nl = normalize_link(target_link)
connection.crawl_delay(nl[1])
outlinks += links
if len(outlinks) > 0:
db.index_follow_links(parser,outlinks,connection)
@@ -352,13 +404,37 @@ def visit_domain(domain,parser,db):
p = parser
# Get links from frontpage
# TODO Sitemap
sitemap = "http://" + domain
visit_links([sitemap],c,p,db)
db.check_domain(domain)
res = visit_sitemap(domain,c,parser,db)
if not res:
return False
for i in range(p.crawl_rounds):
# Visit links from frontpage
links = db.get_visit_links(domain,p.recent_links,p.old_links,p.random_links)
visit_links(links,c,p,db)
db.check_domain(domain)
return True
def process_domains(domains,visit,parser,db,queue):
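# Print the crawling agenda, optionally enqueue every domain into beanstalkd and optionally visit the domains right away.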
print("Websucker Agenda>>")
for domain in domains:
print(domain)
if queue is not None:
print("Queuing:")
for domain in domains:
print(domain)
queue.put(domain[0])
if visit:
print("Visiting:")
for domain in domains:
print(domain)
visit_domain(domain[0],parser,db)
def work_domains(parser,db,queue):
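# Worker loop: reserve a domain from the beanstalkd queue, bury the job while it is being crawled and delete it once the visit finishes.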
while True:
print("Waiting for a new job:")
job = queue.reserve()
domain = job.body
queue.bury(job)
print("Visiting:")
visit_domain(domain,parser,db)
queue.delete(job)

View File

@@ -1,4 +1,4 @@
from websucker.agent import Connection,visit_links,visit_domain
from websucker.agent import Connection,visit_links,visit_domain,process_domains,work_domains
from websucker.agent import ParsedDocument
from websucker.parser import BaseParser
from websucker.parser import normalize_link,urlunparse
@@ -7,23 +7,37 @@ from websucker.db import Data
from websucker.db import get_schema
import click
import pprint
import greenstalk
import os
def create_database_from_context(ctx):
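# Build the Cassandra-backed Data object from the connection options stored in the click context.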
return Data(ctx.obj["cassandra_keyspace"],ctx.obj["cassandra_host"],ctx.obj["cassandra_port"])
def create_queue_from_context(ctx):
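# Connect to beanstalkd, using and watching the tube configured in the click context.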
return greenstalk.Client(ctx.obj["beanstalkd_host"],ctx.obj["beanstalkd_port"],use=ctx.obj["beanstalkd_tube"],watch=ctx.obj["beanstalkd_tube"],encoding="utf8")
@click.group()
@click.pass_context
@click.option("--cassandra-keyspace",metavar="CASSANDRA_KEYSPACE",help="cassandra keyspace (if defined, value read from CASSANDRA_KEYSPACE env variable)",envvar="CASSANDRA_KEYSPACE",default="websucker",show_default=True)
@click.option("--cassandra-host",metavar="CASSANDRA_HOST",help="cassandra host (if defined, value read from CASSANDRA_HOST env variable)",envvar="CASSANDRA_HOST",default="127.0.0.1",show_default=True)
@click.option("--cassandra-port",metavar="CASSANDRA_PORT",help="cassandra port (if defined, value read from CASSANDRA_PORT env variable)",envvar="CASSANDRA_PORT",default=9042,show_default=True)
@click.option("--beanstalkd-tube",metavar="BEANSTALKD_TUBE",help="beanstalkd keyspace (if defined, value read from BEANSTALKD_TUBE env variable)",envvar="BEANSTALKD_TUBE",default="websucker",show_default=True)
@click.option("--beanstalkd-host",metavar="BEANSTALKD_HOST",help="beanstalkd host (if defined, value read from beanstalkd_HOST env variable)",envvar="BEANSTALKD_HOST",default="127.0.0.1",show_default=True)
@click.option("--beanstalkd-port",metavar="BEANSTALKD_PORT",help="beanstalkd port (if defined, value read from BEANSTALKD_PORT env variable)",envvar="BEANSTALKD_PORT",default=11300,show_default=True)
@click.option("--justext-language",metavar="JUSTEXT_LANGUAGE",help="Target language (if defined, value read from JUSTEXT_LANGUAGE env variable)",envvar="JUSTEXT_LANGUAGE",default="English",show_default=True)
@click.option("--parser",metavar="file_name",help="zzz")
@click.option("--visit",is_flag=True)
def cli(ctx,cassandra_keyspace,cassandra_host,cassandra_port,justext_language,parser,visit):
@click.option("--queue",is_flag=True)
def cli(ctx,cassandra_keyspace,cassandra_host,cassandra_port,beanstalkd_tube,beanstalkd_host,beanstalkd_port,justext_language,parser,visit,queue):
ctx.ensure_object(dict)
p = BaseParser()
p.justext_language = justext_language
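# A Suckerfile.py in the current working directory takes precedence over the --parser option.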
suckerfile = os.getcwd() + "/Suckerfile.py"
if os.path.isfile(suckerfile):
parser = suckerfile
if parser is not None:
p = load_parser(parser)
assert p is not None
@@ -31,21 +45,66 @@ def cli(ctx,cassandra_keyspace,cassandra_host,cassandra_port,justext_language,pa
ctx.obj["cassandra_host"] = cassandra_host
ctx.obj["cassandra_port"] = cassandra_port
ctx.obj["cassandra_keyspace"] = cassandra_keyspace
ctx.obj["beanstalkd_host"] = beanstalkd_host
ctx.obj["beanstalkd_port"] = beanstalkd_port
ctx.obj["beanstalkd_tube"] = beanstalkd_tube
ctx.obj["visit"] = visit
ctx.obj["queue"] = queue
@cli.command(help="Print domains")
@cli.command(help="All domains")
@click.pass_context
@click.argument("count",type=int,default=20)
def all(ctx,count):
p = ctx.obj["parser"]
c = Connection()
db = create_database_from_context(ctx)
res = db.all_domains(count)
for row in res:
print(",".join(map(str,row)))
if ctx.obj["visit"]:
visit_domain(row[0],p,db)
q = None
if ctx.obj["queue"]:
q = create_queue_from_context(ctx)
process_domains(res,ctx.obj["visit"],ctx.obj["parser"],db,q)
@cli.command(help="Work queue")
@click.pass_context
def work(ctx):
db = create_database_from_context(ctx)
q = create_queue_from_context(ctx)
work_domains(ctx.obj["parser"],db,q)
@cli.command(help="find best domains")
@click.pass_context
@click.argument("count",type=int,default=20)
#@click.option("visit",is_flag=True)
def best(ctx, count):
db = create_database_from_context(ctx)
domains = db.get_best_domains(count)
q = None
if ctx.obj["queue"]:
q = create_queue_from_context(ctx)
process_domains(domains,ctx.obj["visit"],ctx.obj["parser"],db,q)
@cli.command(help="Find unvisited domains, Visit a site, get links and crawl")
@click.pass_context
@click.argument("count",type=int,default=20)
def unvisited(ctx, count):
db = create_database_from_context(ctx)
domains = db.get_unvisited_domains(count)
q = None
if ctx.obj["queue"]:
q = create_queue_from_context(ctx)
process_domains(domains,ctx.obj["visit"],ctx.obj["parser"],db,q)
@cli.command(help="Visit url and get links. Start here")
@click.pass_context
@click.argument("link")
def start(ctx, link):
db = create_database_from_context(ctx)
p = ctx.obj["parser"]
c = Connection()
visit_links([link],c,p,db)
nl = normalize_link(link)
db.check_domain(nl[1])
@cli.command(help="Continue crawling of seen links from a domain")
@click.pass_context
@@ -58,44 +117,6 @@ def crawl(ctx, domain):
visit_links(links,c,p,db)
db.check_domain(domain)
@cli.command(help="find best domains")
@click.pass_context
@click.argument("count",type=int,default=20)
#@click.option("visit",is_flag=True)
def best(ctx, count):
db = create_database_from_context(ctx)
p = ctx.obj["parser"]
domains = db.get_best_domains(count)
for domain,gr in domains:
print(domain,gr)
if ctx.obj["visit"]:
visit_domain(domain,p,db)
@cli.command(help="Find unvisited domains, Visit a site, get links and crawl")
@click.pass_context
@click.argument("count",type=int,default=20)
def unvisited(ctx, count):
db = create_database_from_context(ctx)
p = ctx.obj["parser"]
c = Connection()
domains = db.get_unvisited_domains(count)
for domain in domains:
print(domain)
if ctx.obj["visit"]:
visit_domain(domain,p,db)
@cli.command(help="Visit url, get links and crawl. Start here")
@click.pass_context
@click.argument("link")
def visit(ctx, link):
db = create_database_from_context(ctx)
p = ctx.obj["parser"]
c = Connection()
nl = normalize_link(link)
domain=nl[1]
visit_domain(domain,p,db)
@cli.command(help="Update domain statistics")
@click.pass_context
@click.argument("domain")
@@ -109,7 +130,12 @@ def check(ctx,domain):
def report(ctx):
db = create_database_from_context(ctx)
db.daily_report()
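# When queueing is enabled, also report the state of the beanstalkd tube.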
if ctx.obj["queue"]:
q = create_queue_from_context(ctx)
stats = q.stats_tube(ctx.obj["beanstalkd_tube"])
buried = stats["current-jobs-buried"]
ready = stats["current-jobs-ready"]
print("{} ready jobs, {} buried jobs".format(ready,buried))
@cli.command(help="Print keyspace schema")
def schema():
schema = get_schema()

View File

@@ -162,13 +162,28 @@ INSERT INTO content(
self.session.execute(self.index_response_insert_html,d)
def daily_report(self):
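# Summarize today's crawl: per-domain counts of good links and fetched body sizes, followed by overall totals.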
rows = self.session.execute(self.daily_links_select)
#rows = self.session.execute(self.daily_links_select)
rows = self.session.execute("SELECT domain_name,count(link_status) FROM daily_links WHERE day=toDate(now()) GROUP BY day,domain_name")
domains = []
for row in rows:
print(row[0],row[1],row[2])
domains.append(list(row))
total_count = 0
total_size = 0
for domain,count in sorted(domains,key=lambda x:x[1]):
total_count += count
rows = self.session.execute("SELECT link_status,count(link_status),sum(body_size) FROM daily_links WHERE day=toDate(now()) AND domain_name=%s GROUP BY day,domain_name,link_status",(domain,))
gc = 0
bs = 0
for row in rows:
if row[0] == "good":
gc = row[1]
bs = row[2]
total_size += bs
print(domain,gc/count,bs,count)
print("{} domains, {} documents, {} characters ".format(len(domains),total_count,total_size))
def index_follow_links(self,parser,links,connection):
# Index seen links
follow_links = set()
for l in links:
if parser.is_link_good(l):
@@ -362,6 +377,7 @@ INSERT INTO content(
domain)
if fetched_count > 0 or seen_count > 0:
self.session.execute(self.domain_quality_update,uv)
print(uv)
return average_fetched_good_characters
def all_domains(self,count):
@@ -395,13 +411,13 @@ INSERT INTO content(
gain_ratio = row[3]
afg = row[4]
if seen_count and fetched_count and gain_ratio:
domains.append((domain,gain_ratio))
domains.append(list(row))
l = len(domains)
ss = min(l,count)
res = []
if ss > 0:
# sort according to ratio
res = list(sorted(domains,key=lambda x:x[1],reverse=True))[0:ss]
res = list(sorted(domains,key=lambda x:x[3],reverse=True))[0:ss]
# returns the domain rows sorted by gain_ratio in descending order
return res
@@ -416,7 +432,7 @@ INSERT INTO content(
gain_ratio = row[3]
afg = row[4]
if seen_count and not fetched_count:
domains.append(domain)
domains.append(row)
ss = min(len(domains),count)
return random.sample(domains,ss)

View File

@@ -92,3 +92,8 @@ CREATE TABLE html (
PRIMARY KEY(day,domain_name,source_link)
);
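-- Pairs of connected domains: a domain and a domain it links to.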
CREATE TABLE domain_connections (
domain_name TEXT,
linked_domain TEXT,
PRIMARY KEY (domain_name,linked_domain)
);