import collections
import datetime
import hashlib
import json
import math
import os
import random

import cassandra
import cassandra.cluster
import cassandra.query
import pkg_resources

from websucker.parser import normalize_link, urlunparse

VERSION = "sucker6"


def get_schema():
    """Return the contents of the packaged ``schema.sql`` resource as a UTF-8 string."""
    with pkg_resources.resource_stream(__name__, "schema.sql") as f:
        return f.read().decode("utf8")


def _stable_url_hash(domain_name, url_path, url_query):
    """Return a process-independent signed 64-bit hash identifying one URL.

    The built-in ``hash()`` is salted per interpreter run for strings, so it
    cannot be used for values that are persisted and compared across runs.
    """
    key = domain_name + "/" + url_path + "?" + url_query
    digest = hashlib.blake2b(key.encode("utf8"), digest_size=8).digest()
    return int.from_bytes(digest, "big", signed=True)


class Data:
    """
    Database of text documents, stored in a Cassandra keyspace.
    """

    def __init__(self, keyspace="websucker", cassandra_host="127.0.0.1", cassandra_port=9042):
        """Connect to the cluster and prepare every statement used by the other methods.

        Args:
            keyspace: Keyspace passed to ``Cluster.connect``.
            cassandra_host: Single contact point for the cluster.
            cassandra_port: Port of the contact point.
        """
        # Default execution profile with a 240 second request timeout.
        ep = cassandra.cluster.ExecutionProfile(request_timeout=240.0)
        profiles = {cassandra.cluster.EXEC_PROFILE_DEFAULT: ep}
        self.cluster = cassandra.cluster.Cluster(
            [cassandra_host],
            port=cassandra_port,
            execution_profiles=profiles,
        )
        self.session = self.cluster.connect(keyspace)

        self.check_document_select_query = self.session.prepare(
            "SELECT count(url_hash) FROM paragraph_checksums WHERE checksum=?"
        )

        self.update_links = self.session.prepare("""
        UPDATE links SET
            link_status = ?,
            redirect_target = ?,
            update_time = toTimestamp(now())
        WHERE
            domain_name=? AND
            url_path=? AND
            url_query=?
        """)

        self.domain_quality_update = self.session.prepare("""
        UPDATE domain_quality SET
            seen_count=?,
            good_size=?,
            good_count=?,
            good_probability=?,
            good_originality=?,
            average_good_characters=?,
            content_size=?,
            content_count=?,
            content_probability=?,
            content_originality=?,
            average_content_characters=?,
            fetched_count=?,
            average_fetched_good_characters=?,
            gain_ratio=?,
            update_time = toTimestamp(now())
        WHERE
            domain_name=? AND
            day=toDate(now())
        """)

        self.index_response_insert_html = self.session.prepare("""
        INSERT INTO html(
            day,
            domain_name,
            source_link,
            target_link,
            redirect_links,
            status,
            headers,
            content,
            agent_version,
            update_time
        ) VALUES (toDate(now()),?,?,?,?,?,?,?,?,toTimestamp(now()));
        """)

        self.index_content_link_insert = self.session.prepare("""
        INSERT INTO links (
            url_schema,
            domain_name,
            url_path,
            url_query,
            link_status,
            update_time
        ) VALUES (?,?,?,?,'seen',?) IF NOT EXISTS
        """)

        self.daily_links_insert = self.session.prepare("""
        INSERT INTO daily_links (
            day,
            domain_name,
            url_path,
            url_query,
            link_status,
            body_size,
            link_originality,
            update_time
        ) VALUES (toDate(now()),?,?,?,?,?,?,toTimestamp(now()))
        """)

        self.daily_links_select = self.session.prepare("""
        SELECT
            domain_name,
            link_status,
            count(link_status)
        FROM daily_links WHERE day=toDate(now()) GROUP BY domain_name,link_status
        """)

        # Parsed content
        self.index_content_content_insert = self.session.prepare("""
        INSERT INTO content(
            domain_name,
            target_link,
            links,
            title,
            description,
            section,
            authors,
            tags,
            article_published_time,
            text_date,
            body,
            body_size,
            agent_version,
            update_time
        ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?);
        """)

        self.paragraph_checksums_insert = self.session.prepare(
            "INSERT INTO paragraph_checksums (checksum,url_hash) VALUES(?,?)"
        )
        self.index_content_links_update = self.session.prepare(
            "UPDATE links SET link_status=?, link_originality=?,body_size=?,url_schema=? WHERE domain_name=? AND url_path = ? AND url_query=? "
        )
        self.check_domain_count = self.session.prepare(
            "select count(url_path) from links where domain_name=? and link_status = ?"
        )
        self.check_domain_size = self.session.prepare(
            "select sum(body_size),sum(link_originality) from links where domain_name=? and link_status =?"
        )
        self.domains_select = self.session.prepare(
            "SELECT domain_name,seen_count,fetched_count,gain_ratio,average_fetched_good_characters FROM domain_quality PER PARTITION LIMIT 1"
        )

    def index_responses(self, source_link, responses):
        """Store every response fetched for ``source_link`` in the ``html`` table.

        When a response's canonical link differs from the normalized source
        link, the source link is additionally marked as a redirect to it.

        Args:
            source_link: Link that was requested.
            responses: Iterable of response objects exposing ``get_canonical()``,
                ``get_content()``, ``redirects``, ``status`` and ``headers``.
        """
        parsed = normalize_link(source_link)
        domain = parsed[1]
        normalized_source = urlunparse(parsed)
        for response in responses:
            target = response.get_canonical()
            if normalized_source != target:
                self.update_link_status(normalized_source, "redirect", target)
            record = (
                domain,
                normalized_source,
                target,
                response.redirects,
                response.status,
                response.headers,
                response.get_content(),
                VERSION,
            )
            self.session.execute(self.index_response_insert_html, record)

    def summary(self, parser):
        """Print aggregate statistics over the latest ``domain_quality`` row of each domain.

        Args:
            parser: Object providing ``is_domain_good(domain)``.
        """
        good_size = 0
        content_size = 0
        fetched_documents = 0
        visited_domains = 0
        unvisited_domains = 0
        unvisited_junk_domains = 0
        seen_links = 0
        finished_domains = 0
        junk_domains = 0
        rows = self.session.execute("SELECT domain_name,good_size,content_size, fetched_count,seen_count FROM domain_quality PER PARTITION LIMIT 1")
        # TODO subdomain analysis
        second_level = set()
        for row in rows:
            domain = row[0]
            labels = domain.split(".")
            if len(labels) > 2:
                # Everything in front of the last two labels.
                second_level.add(".".join(labels[0:-2]))
            is_junk = not parser.is_domain_good(domain)
            if is_junk:
                junk_domains += 1
            if row[1] is not None:
                good_size += row[1]
            if row[2] is not None:
                content_size += row[2]
            if row[3] is not None:
                fetched_documents += row[3]
            if row[4] is not None:
                seen_links += row[4]
            if row[3] is None or row[3] == 0:
                unvisited_domains += 1
                if is_junk:
                    unvisited_junk_domains += 1
            else:
                visited_domains += 1
            # No links left in the 'seen' state.
            if row[4] is None or row[4] == 0:
                finished_domains += 1
        print("Good characters: {}".format(good_size))
        print("Fetched characters: {}".format(content_size))
        print("Fetched documents: {}".format(fetched_documents))
        print("Visited domains: {}".format(visited_domains))
        print("Unvisited domains: {}".format(unvisited_domains))
        print("Junk domains: {}".format(junk_domains))
        print("Unvisited junk domains: {}".format(unvisited_junk_domains))
        print("New links : {}".format(seen_links))
        print("Second level domains: {}".format(len(second_level)))
        print("Finished domains : {}".format(finished_domains))

    def daily_report(self):
        """Print today's per-domain crawl statistics from ``daily_links``.

        Domains with fewer than two links today are skipped. The 20 domains
        with the most documents are printed, followed by overall totals.
        """
        rows = self.session.execute("SELECT domain_name,count(link_status) FROM daily_links WHERE day=toDate(now()) GROUP BY day,domain_name")
        domains = [list(row) for row in rows]
        total_count = 0
        total_size = 0
        out = []
        for domain, count in domains:
            if count < 2:
                continue
            total_count += count
            status_rows = self.session.execute("SELECT link_status,count(link_status),sum(body_size) FROM daily_links WHERE day=toDate(now()) AND domain_name=%s GROUP BY day,domain_name,link_status", (domain,))
            good_count = 0
            body_size = 0
            for row in status_rows:
                if row[0] == "good":
                    good_count = row[1]
                    # Guard against a null aggregate so the totals stay numeric.
                    body_size = row[2] if row[2] is not None else 0
            total_size += body_size
            out.append((domain, body_size, good_count / count, count))
        print("Domain, characters,good ratio,documents")
        # Ascending sort followed by reversal, so ties keep the original ordering.
        ranked = sorted(out, key=lambda x: x[3])
        ranked.reverse()
        for value in ranked[:20]:
            print(value)
        print("{} domains, {} documents, {} characters ".format(len(domains), total_count, total_size))

    def update_link_status(self, link, status, redirect_target=None):
        """Set the status (and optional redirect target) of ``link`` in ``links``.

        Args:
            link: Link to update; it is normalized before use.
            status: New ``link_status`` value.
            redirect_target: Value stored in ``redirect_target``.
        """
        parsed = normalize_link(link)
        params = (
            status,
            redirect_target,
            parsed[1],
            parsed[2],
            parsed[3],
        )
        self.session.execute(self.update_links, params)

    def index_follow_links(self, parser, links, connection):
        """Insert newly discovered links as 'seen' and refresh affected domains.

        Args:
            parser: Object providing ``is_link_good(link)`` and ``strip_query``.
            links: Iterable of raw links found in a document.
            connection: Currently unused.
        """
        # Normalize and de-duplicate the acceptable links.
        follow_links = set()
        for raw_link in links:
            if parser.is_link_good(raw_link):
                link = normalize_link(raw_link, strip_query=parser.strip_query)
                follow_links.add(urlunparse(link))

        new_link_domains = set()
        new_link_count = 0
        for link in follow_links:
            parsed = normalize_link(link)
            value = list(parsed)
            value.append(datetime.date.today())
            rows = self.session.execute(self.index_content_link_insert, value)
            # The insert is conditional (IF NOT EXISTS); 'applied' reports
            # whether the link was actually new.
            if rows.one().applied:
                new_link_domains.add(parsed[1])
                new_link_count += 1
        for domain in new_link_domains:
            self.check_domain(domain)
        print("{} new links, {} new domains".format(new_link_count, len(new_link_domains)))

    def index_content(self, target_link, parsed_document):
        """Store a parsed document and record the resulting link status.

        The status is ``bad_parse`` when the document has no body,
        ``bad_small`` when the body is shorter than 300 characters,
        ``bad_copy`` when the originality computed by :meth:`check_document`
        is below 0.8, and ``good`` otherwise.

        Args:
            target_link: Link the document was fetched from.
            parsed_document: Parsed document exposing ``body``, ``title``,
                ``description``, ``section``, ``authors``, ``tags``,
                ``article_published_time``, ``text_date``, ``current_time``,
                ``paragraph_checksums``, ``paragraph_sizes`` and ``get_links()``.
        """
        nl = normalize_link(target_link)
        domain_name = nl[1]
        assert len(domain_name) > 1
        pd = parsed_document
        body_length = 0
        if pd.body is not None:
            body_length = len(pd.body)
        value = (
            domain_name,
            target_link,
            pd.get_links(),
            pd.title,
            pd.description,
            pd.section,
            pd.authors,
            pd.tags,
            pd.article_published_time,
            pd.text_date,
            pd.body,
            body_length,
            VERSION,
            pd.current_time,
        )
        # Started now, awaited after the link bookkeeping below.
        content_future = self.session.execute_async(self.index_content_content_insert, value)

        link_status = "good"
        originality = 0
        tsz = 0
        if pd.body is None:
            link_status = "bad_parse"
        else:
            tsz = len(pd.body)
            if tsz < 300:
                link_status = "bad_small"

        if link_status == "good":
            # A stable hash keeps re-indexing the same URL in a later process
            # from looking like a second, different URL with the same paragraphs.
            url_hash = _stable_url_hash(nl[1], nl[2], nl[3])
            futures = [
                self.session.execute_async(self.paragraph_checksums_insert, (pc, url_hash))
                for pc, _psz in zip(pd.paragraph_checksums, pd.paragraph_sizes)
            ]
            # All checksums must be written before they are counted.
            for fut in futures:
                fut.result()
            originality = self.check_document(pd.paragraph_checksums, pd.paragraph_sizes)
            if originality < 0.8:
                link_status = "bad_copy"

        self.session.execute(
            self.index_content_links_update,
            (link_status, originality, tsz, nl[0], nl[1], nl[2], nl[3]),
        )
        content_future.result()
        print("<<<< " + link_status + " " + str(originality))
        dl = (
            nl[1],
            nl[2],
            nl[3],
            link_status,
            tsz,
            originality,
        )
        self.session.execute(self.daily_links_insert, dl)

    def check_document(self, paragraph_checksums, paragraph_sizes):
        """Return the fraction of the document's size made of non-duplicated paragraphs.

        A paragraph counts as a copy when its checksum is stored for more than
        one ``url_hash``.

        Args:
            paragraph_checksums: Checksums of the document's paragraphs.
            paragraph_sizes: Sizes of the same paragraphs, in the same order.

        Returns:
            A value between 0 and 1, or 0 when the total size is zero.
        """
        tsz = sum(paragraph_sizes)
        if tsz == 0:
            return 0
        # Issue all lookups first, then collect, so they run concurrently.
        futures = [
            self.session.execute_async(self.check_document_select_query, (pc,))
            for pc, _psz in zip(paragraph_checksums, paragraph_sizes)
        ]
        copies = 0
        for fut, psz in zip(futures, paragraph_sizes):
            if fut.result().one()[0] > 1:
                copies += psz
        return (tsz - copies) / tsz

    def check_domain(self, domain):
        """Recompute quality statistics of ``domain`` and store them in ``domain_quality``.

        The row is written only when the domain has at least one fetched or
        seen link.

        Args:
            domain: Domain name; must be non-empty.

        Returns:
            The average number of good characters per fetched document, or
            ``None`` when the domain has no good documents.
        """
        assert len(domain) > 0
        good_size = None
        good_probability = None
        good_originality = None
        average_good_characters = None
        content_size = None
        content_probability = None
        content_originality = None
        average_content_characters = None
        average_fetched_good_characters = None
        gain_ratio = None

        statuses = (
            "good",
            "bad_copy",
            "bad_small",
            "bad_httpcode",
            "bad_type",
            "bad_content",
            "bad_parse",
            "seen",
        )
        counts = {}
        for status in statuses:
            res = self.session.execute(self.check_domain_count, (domain, status))
            counts[status] = res.one()[0]

        seen_count = counts["seen"]
        good_count = counts["good"]
        content_count = counts["good"] + counts["bad_copy"] + counts["bad_small"]
        fetched_count = sum(counts.values()) - counts["seen"]

        if fetched_count > 0:
            content_probability = content_count / fetched_count
            good_probability = good_count / fetched_count
            sizes = {}
            originalities = {}
            for status in ("good", "bad_copy", "bad_small"):
                row = self.session.execute(self.check_domain_size, (domain, status)).one()
                # Guard against null aggregates so the sums below stay numeric.
                sizes[status] = row[0] if row[0] is not None else 0
                originalities[status] = row[1] if row[1] is not None else 0
            good_size = sizes["good"]
            content_size = sum(sizes.values())
            if good_count > 0:
                good_originality = originalities["good"] / good_count
            if content_count > 0:
                content_originality = sum(originalities.values()) / content_count

        # good_count > 0 implies fetched_count > 0, so the values used here are set.
        if good_count > 0:
            average_good_characters = good_size / good_count * good_originality
            average_fetched_good_characters = good_size * good_originality / fetched_count
            # NOTE(review): fetched_count divides this value a second time — confirm intended.
            gain_ratio = average_fetched_good_characters / fetched_count

        if content_count > 0:
            average_content_characters = content_size / content_count

        uv = (
            seen_count,
            good_size,
            good_count,
            good_probability,
            good_originality,
            average_good_characters,
            content_size,
            content_count,
            content_probability,
            content_originality,
            average_content_characters,
            fetched_count,
            average_fetched_good_characters,
            gain_ratio,
            domain,
        )
        if fetched_count > 0 or seen_count > 0:
            self.session.execute(self.domain_quality_update, uv)
        return average_fetched_good_characters

    def all_domains(self, count):
        """Return up to ``count`` domains ranked by average fetched good characters.

        Only domains with non-zero seen count, fetched count and average
        fetched good characters are considered.

        Returns:
            List of tuples ``(domain_name, seen_count, fetched_count,
            gain_ratio, average_fetched_good_characters)``, best first.
        """
        rows = self.session.execute(self.domains_select)
        domains = []
        for row in rows:
            seen_count = row[1]
            fetched_count = row[2]
            afg = row[4]
            if fetched_count and afg and seen_count:
                domains.append(tuple(row))
        limit = min(len(domains), count)
        if limit <= 0:
            return []
        return sorted(domains, key=lambda x: x[4], reverse=True)[0:limit]

    def get_best_domains(self, count, parser):
        """Return up to ``count`` acceptable domains ranked by gain ratio.

        Only domains accepted by ``parser.is_domain_good`` with non-zero seen
        count, fetched count and gain ratio are considered.

        Returns:
            List of lists ``[domain_name, seen_count, fetched_count,
            gain_ratio, average_fetched_good_characters]``, best first.
        """
        rows = self.session.execute(self.domains_select)
        domains = []
        for row in rows:
            domain = row[0]
            seen_count = row[1]
            fetched_count = row[2]
            gain_ratio = row[3]
            if seen_count and fetched_count and gain_ratio and parser.is_domain_good(domain):
                domains.append(list(row))
        limit = min(len(domains), count)
        if limit <= 0:
            return []
        return sorted(domains, key=lambda x: x[3], reverse=True)[0:limit]

    def get_random_domains(self, count, parser):
        """Return a random sample of at most ``count`` domains accepted by the parser.

        Returns:
            List of lists with the columns selected by ``domains_select``.
        """
        rows = self.session.execute(self.domains_select)
        domains = [list(row) for row in rows if parser.is_domain_good(row[0])]
        sample_size = min(len(domains), count)
        return random.sample(domains, sample_size)

    def get_unvisited_domains(self, count, parser):
        """Randomly select domains that have seen links but nothing fetched yet.

        Domains are grouped by their last two labels. Each group gets an equal
        share of ``count`` and each domain an equal share of its group's
        weight; a domain is then picked independently with that probability,
        so the number of results is random rather than exactly ``count``.

        Args:
            count: Target (expected) number of domains to return.
            parser: Currently unused.

        Returns:
            List of ``(domain_name, 0)`` tuples; empty when there are no
            unvisited domains.
        """
        rows = self.session.execute(self.domains_select)
        groups = collections.defaultdict(set)
        for row in rows:
            domain = row[0]
            seen_count = row[1]
            fetched_count = row[2]
            if seen_count and not fetched_count:
                # Last two labels; a single-label name forms its own group
                # instead of raising IndexError.
                key = ".".join(domain.split(".")[-2:])
                groups[key].add(domain)

        result = []
        if not groups:
            # Nothing to select from; also avoids dividing by zero below.
            return result

        group_items = list(groups.items())
        random.shuffle(group_items)
        domain_weight = count / len(group_items)
        for _key, members in group_items:
            member_list = list(members)
            link_weight = domain_weight / len(member_list)
            random.shuffle(member_list)
            for member in member_list:
                if random.random() < link_weight:
                    result.append((member, 0))
        return result

    def export_domain(self, domain):
        """Print every ``content`` row of ``domain`` as a JSON string, one per line."""
        rows = self.session.execute("SELECT JSON * from content WHERE domain_name=%s", (domain,))
        for row in rows:
            print(row[0])

    def get_visit_links(self, domain, recent_count, old_count, random_count):
        """Choose 'seen' links of ``domain`` to visit next.

        The links are sorted by ascending ``update_time``. The first
        ``recent_count`` and the last ``old_count`` entries of that ordering
        are always taken; up to ``random_count`` further links are sampled at
        random from the remainder.

        Returns:
            List of link strings.
        """
        dblinks = []
        rows = self.session.execute("SELECT url_schema,url_path,url_query,update_time FROM links Where domain_name=%s AND link_status='seen'", (domain,))
        for row in rows:
            link = urlunparse((row[0], domain, row[1], row[2]))
            dblinks.append((link, row[3]))

        dblinks.sort(key=lambda x: x[1])
        visitlinks = []
        random_links = []
        tail_start = len(dblinks) - old_count
        for i, (link, _time) in enumerate(dblinks):
            if i < recent_count or i >= tail_start:
                visitlinks.append(link)
            else:
                random_links.append(link)
        sample_size = min(random_count, len(random_links))
        if sample_size > 0:
            visitlinks += random.sample(random_links, sample_size)
        return visitlinks