import cassandra
import cassandra.cluster
import cassandra.query
from cassandra.auth import PlainTextAuthProvider
import random
import os
import pkg_resources
import datetime
from websucker.parser import normalize_link, urlunparse
import collections
import math
import json

VERSION = "sucker6"

def calculate_checksums(text):
    """
    @return fingerprints of the paragraphs in text and their sizes.
    Paragraphs are separated by a blank line; only paragraphs with more
    than 100 hashed characters produce a fingerprint.
    """
    checksums = []
    sizes = []
    hval = 0
    hsz = 0
    sz = 0
    for c in text:
        cv = ord(c)
        sz += 1
        if cv > 64:
            hval += (hval << 3) + cv
            zv = hval >> 31
            hval &= 0x7fffffff
            hval += zv
            hsz += 1
        if c == "\n" and hsz > 0:
            if hsz > 100:
                checksums.append(hval)
                sizes.append(sz)
            sz = 0
            hsz = 0
    if hsz > 100:
        checksums.append(hval)
        sizes.append(sz)
    return checksums, sizes

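# Illustrative sketch (not part of the original module): how the fingerprinting
# above behaves on a small input. Only characters with ord(c) > 64 are hashed,
# and only paragraphs with more than 100 such characters are fingerprinted, so
# the short middle line is skipped.
#
#   text = ("A" * 150) + "\n" + "short\n" + ("B" * 200) + "\n"
#   checksums, sizes = calculate_checksums(text)
#   # len(checksums) == 2 and sizes == [151, 201]
#
# Note that hval is not reset between paragraphs, so a paragraph's checksum
# also depends on the text that precedes it.
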
def get_schema():
    with pkg_resources.resource_stream(__name__, "schema.sql") as f:
        schema = f.read()
    return str(schema, encoding="utf8")


def connect_cluster(cassandra_host, cassandra_port, username, password):
    auth_provider = None
    if username is not None and password is not None:
        auth_provider = PlainTextAuthProvider(username=username, password=password)
    # execution profile
    ep = cassandra.cluster.ExecutionProfile(request_timeout=240.0)
    profiles = {cassandra.cluster.EXEC_PROFILE_DEFAULT: ep}
    cluster = cassandra.cluster.Cluster([cassandra_host], port=cassandra_port, execution_profiles=profiles, auth_provider=auth_provider)
    return cluster

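# Illustrative sketch (not part of the original module): connecting to a local,
# unauthenticated node and opening a session on the default keyspace.
#
#   cluster = connect_cluster("127.0.0.1", 9042, None, None)
#   session = cluster.connect("websucker")
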
class Data:
    """
    Database of text documents
    """
    def __init__(self, keyspace="websucker", cassandra_host="127.0.0.1", cassandra_port=9042, username=None, password=None):
        print("Database {}@{}:{}".format(keyspace, cassandra_host, cassandra_port))
        self.cluster = connect_cluster(cassandra_host, cassandra_port, username, password)
        self.session = self.cluster.connect(keyspace)

        self.check_document_select_query = self.session.prepare("SELECT count(url_hash) FROM paragraph_checksums WHERE checksum=?")

        self.update_links = self.session.prepare("""
        UPDATE links SET
            link_status = ?,
            redirect_target = ?,
            update_time = toTimestamp(now())
        WHERE
            domain_name=? AND
            url_path=? AND
            url_query=?
        """)

        self.domain_quality_update = self.session.prepare("""
        UPDATE domain_quality SET
            seen_count=?,
            good_size=?,
            good_count=?,
            good_probability=?,
            good_originality=?,
            average_good_characters=?,
            content_size=?,
            content_count=?,
            content_probability=?,
            content_originality=?,
            average_content_characters=?,
            fetched_count=?,
            average_fetched_good_characters=?,
            gain_ratio=?,
            update_time = toTimestamp(now())
        WHERE
            domain_name=? AND
            day=toDate(now())
        """)

        self.index_response_insert_html = self.session.prepare("""
        INSERT INTO html(
            day,
            domain_name,
            source_link,
            target_link,
            redirect_links,
            status,
            headers,
            content,
            agent_version,
            update_time
        ) VALUES (toDate(now()),?,?,?,?,?,?,?,?,toTimestamp(now()));
        """)

        self.index_content_link_insert = self.session.prepare("""
        INSERT INTO links (
            url_schema,
            domain_name,
            url_path,
            url_query,
            link_status,
            update_time
        ) VALUES (?,?,?,?,'seen',?) IF NOT EXISTS
        """)

        self.daily_links_insert = self.session.prepare("""
        INSERT INTO daily_links (
            day,
            domain_name,
            url_path,
            url_query,
            link_status,
            body_size,
            link_originality,
            update_time
        ) VALUES (toDate(now()),?,?,?,?,?,?,toTimestamp(now()))
        """)

        self.daily_links_select = self.session.prepare("""
        SELECT
            domain_name,
            link_status,
            count(link_status)
        FROM daily_links WHERE day=toDate(now()) GROUP BY domain_name,link_status
        """)

        # Parsed content
        self.index_content_content_insert = self.session.prepare("""
        INSERT INTO content(
            domain_name,
            target_link,
            links,
            title,
            description,
            section,
            authors,
            tags,
            article_published_time,
            text_date,
            body,
            body_size,
            agent_version,
            update_time
        ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?);
        """)

        self.paragraph_checksums_insert = self.session.prepare("INSERT INTO paragraph_checksums (checksum,url_hash) VALUES(?,?)")

        self.index_content_links_update = self.session.prepare("UPDATE links SET link_status=?, link_originality=?, body_size=?, url_schema=? WHERE domain_name=? AND url_path=? AND url_query=?")

        self.check_domain_count = self.session.prepare("SELECT count(url_path) FROM links WHERE domain_name=? AND link_status=?")

        self.check_domain_size = self.session.prepare("SELECT sum(body_size),sum(link_originality) FROM links WHERE domain_name=? AND link_status=?")

        self.domains_select = self.session.prepare("SELECT domain_name,seen_count,fetched_count,gain_ratio,average_fetched_good_characters FROM domain_quality PER PARTITION LIMIT 1")

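    # Usage sketch (not part of the original module): open the database and print
    # a crawl summary. The parser argument is assumed to be a websucker parser
    # object that provides is_domain_good().
    #
    #   db = Data(keyspace="websucker", cassandra_host="127.0.0.1", cassandra_port=9042)
    #   # db.summary(parser)
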
    def index_responses(self, source_link, responses):
        # Redirect links
        pl = normalize_link(source_link)
        domain = pl[1]
        npl = urlunparse(pl)
        for response in responses:
            tl = response.get_canonical()
            if npl != tl:
                self.update_link_status(npl, "redirect", tl)
            d = (
                domain,
                npl,
                tl,
                response.redirects,
                response.status,
                response.headers,
                response.get_content(),
                VERSION,
            )
            self.session.execute(self.index_response_insert_html, d)

    def summary(self, parser):
        gs = 0
        cs = 0
        fetched_documents = 0
        vd = 0
        unvisited_domains = 0
        unvisited_junk_domains = 0
        sl = 0
        fd = 0
        junk_domains = 0
        rows = self.session.execute("SELECT domain_name,good_size,content_size,fetched_count,seen_count FROM domain_quality PER PARTITION LIMIT 1")
        # TODO subdomain analysis
        #dd = collections.defaultdict(set)
        second_level = set()
        for row in rows:
            domain = row[0]
            subdomains = domain.split(".")
            #d2 = subdomains[-2] + "." + subdomains[-1]
            if len(subdomains) > 2:
                d3 = ".".join(subdomains[0:-2])
                second_level.add(d3)
            if not parser.is_domain_good(domain):
                junk_domains += 1
            if row[1] is not None:
                gs += row[1]
            if row[2] is not None:
                cs += row[2]
            if row[3] is not None:
                fetched_documents += row[3]
            if row[4] is not None:
                sl += row[4]
            if row[3] is None or row[3] == 0:
                unvisited_domains += 1
                if not parser.is_domain_good(domain):
                    unvisited_junk_domains += 1
            else:
                vd += 1
            if row[4] is None or row[4] == 0:
                fd += 1
        print("Good characters: {}".format(gs))
        print("Fetched characters: {}".format(cs))
        print("Fetched documents: {}".format(fetched_documents))
        print("Visited domains: {}".format(vd))
        print("Unvisited domains: {}".format(unvisited_domains))
        print("Junk domains: {}".format(junk_domains))
        print("Unvisited junk domains: {}".format(unvisited_junk_domains))
        print("New links: {}".format(sl))
        print("Second level domains: {}".format(len(second_level)))
        print("Finished domains: {}".format(fd))
        #for d, sd in dd.items():
        #    if len(sd) > 1:
        #        print(d + " " + ",".join(sd))

    def daily_report(self):
        #rows = self.session.execute(self.daily_links_select)
        rows = self.session.execute("SELECT domain_name,count(link_status) FROM daily_links WHERE day=toDate(now()) GROUP BY day,domain_name")
        domains = []
        for row in rows:
            domains.append(list(row))
        total_count = 0
        total_size = 0
        out = []
        for domain, count in domains:
            if count < 2:
                continue
            total_count += count
            rows = self.session.execute("SELECT link_status,count(link_status),sum(body_size) FROM daily_links WHERE day=toDate(now()) AND domain_name=%s GROUP BY day,domain_name,link_status", (domain,))
            gc = 0
            bs = 0
            for row in rows:
                if row[0] == "good":
                    gc = row[1]
                    bs = row[2]
            total_size += bs
            out.append((domain, bs, gc / count, count))
        print("Domain, characters, good ratio, documents")
        for i, value in enumerate(reversed(sorted(out, key=lambda x: x[3]))):
            if i < 20:
                print(value)
            #print("{},{},{},{}".format(value))

        print("{} domains, {} documents, {} characters".format(len(domains), total_count, total_size))

    def update_link_status(self, link, status, redirect_target=None):
        pl = normalize_link(link)
        r = (
            status,
            redirect_target,
            pl[1],
            pl[2],
            pl[3],
        )
        res = self.session.execute(self.update_links, r)

    def index_follow_links(self, parser, links, connection):
        # Index seen links
        follow_links = parser.filter_links(links)

        newlinkdomains = set()
        newlinkcount = 0
        for link in follow_links:
            value = []
            nl = normalize_link(link)
            value += nl
            value.append(datetime.date.today())
            rows = self.session.execute(self.index_content_link_insert, value)
            row = rows.one()
            if row.applied:
                newlinkdomains.add(nl[1])
                newlinkcount += 1
        for domain in newlinkdomains:
            self.check_domain(domain)
        print("{} new links, {} new domains".format(newlinkcount, len(newlinkdomains)))

    def index_content(self, target_link, parsed_document):
        nl = normalize_link(target_link)
        domain_name = nl[1]
        assert len(domain_name) > 1

        pd = parsed_document
        body_length = 0
        if pd.body is not None:
            body_length = len(pd.body)
        value = (
            domain_name,
            target_link,
            pd.link_set,
            pd.title,
            pd.description,
            pd.section,
            pd.authors,
            pd.tags,
            pd.article_published_time,
            pd.text_date,
            pd.body,
            body_length,
            VERSION,
            pd.current_time
        )
        print(value)
        content_future = self.session.execute_async(self.index_content_content_insert, value)
        # result later

        link_status = "good"
        originality = 0
        tsz = 0
        if pd.body is None:
            link_status = "bad_parse"
        else:
            tsz = len(pd.body)
            if tsz < 300:
                link_status = "bad_small"

        if link_status == "good":
            futures = []
            paragraph_checksums, paragraph_sizes = calculate_checksums(pd.body)
            for pc, psz in zip(paragraph_checksums, paragraph_sizes):
                fut = self.session.execute_async(self.paragraph_checksums_insert, (pc, hash(nl[1] + "/" + nl[2] + "?" + nl[3])))
                futures.append(fut)
            for fut in futures:
                fut.result()
            originality = self.check_document(paragraph_checksums, paragraph_sizes)
            if originality < 0.8:
                link_status = "bad_copy"
        self.session.execute(self.index_content_links_update, (link_status, originality, tsz, nl[0], nl[1], nl[2], nl[3]))
        content_future.result()
        print("<<<< " + link_status + " " + str(originality))
        dl = (
            nl[1],
            nl[2],
            nl[3],
            link_status,
            tsz,
            originality
        )
        self.session.execute(self.daily_links_insert, dl)

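    # Note (added for clarity, not in the original module): index_content() marks a
    # link "bad_parse" when no body was extracted, "bad_small" when the body has
    # fewer than 300 characters, and "bad_copy" when check_document() reports an
    # originality below 0.8; everything else stays "good".
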
    def check_document(self, paragraph_checksums, paragraph_sizes):
        tsz = sum(paragraph_sizes)
        if tsz == 0:
            return 0
        copies = 0
        futures = []
        for pc, psz in zip(paragraph_checksums, paragraph_sizes):
            futures.append(self.session.execute_async(self.check_document_select_query, (pc,)))

        for fut, psz in zip(futures, paragraph_sizes):
            rows = fut.result()
            res = rows.one()[0]
            if res > 1:
                copies += psz
        return (tsz - copies) / tsz

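    # Worked example (illustrative, not in the original module): if a document has
    # paragraphs of sizes [400, 300, 300] and only the two 300-character paragraphs
    # are already stored more than once in paragraph_checksums, then copies = 600
    # and the originality is (1000 - 600) / 1000 = 0.4, which is below the 0.8
    # threshold used by index_content(), so the link would become "bad_copy".
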
    def check_domain(self, domain):
        assert len(domain) > 0
        seen_count = None
        good_size = None
        good_count = None
        good_probability = None
        good_originality = None
        average_good_characters = None
        content_size = None
        content_count = None
        content_probability = None
        content_originality = None
        average_content_characters = None
        fetched_count = None
        average_fetched_good_characters = None
        gain_ratio = None

        counts = {
            "good": 0,
            "bad_copy": 0,
            "bad_small": 0,
            "bad_httpcode": 0,
            "bad_type": 0,
            "bad_content": 0,
            "bad_parse": 0,
            "seen": 0
        }
        for k in counts.keys():
            res = self.session.execute(self.check_domain_count, (domain, k))
            co = res.one()[0]
            counts[k] = co

        seen_count = counts["seen"]
        good_count = counts["good"]
        content_count = counts["good"] + counts["bad_copy"] + counts["bad_small"]

        fetched_count = sum(counts.values()) - counts["seen"]

        if fetched_count > 0:
            content_probability = content_count / fetched_count
            good_probability = good_count / fetched_count
            sizes = {
                "good": 0,
                "bad_copy": 0,
                "bad_small": 0
            }

            originalities = {}

            for k in sizes.keys():
                res = self.session.execute(self.check_domain_size, (domain, k))
                row = res.one()
                co = row[0]
                originalities[k] = row[1]
                sizes[k] = co
            good_size = sizes["good"]
            content_size = sum(sizes.values())
            if good_count > 0:
                good_originality = originalities["good"] / good_count
            if content_count > 0:
                content_originality = sum(originalities.values()) / content_count

            if good_count > 0:
                average_good_characters = good_size / good_count * good_originality
                average_fetched_good_characters = good_size * good_originality / fetched_count

                gain_ratio = average_fetched_good_characters / fetched_count

            if content_count > 0:
                average_content_characters = content_size / content_count

        #print(sizes)
        #print(originalities)
        uv = (
            seen_count,
            good_size,
            good_count,
            good_probability,
            good_originality,
            average_good_characters,
            content_size,
            content_count,
            content_probability,
            content_originality,
            average_content_characters,
            fetched_count,
            average_fetched_good_characters,
            gain_ratio,
            domain)
        if fetched_count > 0 or seen_count > 0:
            self.session.execute(self.domain_quality_update, uv)
        return average_fetched_good_characters

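    # Worked example (illustrative, not in the original module): with link counts
    # good=8, bad_copy=1, bad_small=1 and seen=40 (all other statuses 0),
    # fetched_count = 50 - 40 = 10, content_count = 10, good_probability = 0.8 and
    # content_probability = 1.0; the size and originality averages are then derived
    # from the sums returned by check_domain_size in the same way.
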
    def all_domains(self, count):
        rows = self.session.execute(self.domains_select)
        domains = []
        for row in rows:
            domain = row[0]
            seen_count = row[1]
            fetched_count = row[2]
            gain_ratio = row[3]
            afg = row[4]
            if fetched_count and afg and seen_count:
                domains.append(tuple(row))
        l = len(domains)
        ss = min(l, count)
        res = []
        if ss > 0:
            # sort according to average_fetched_good_characters
            res = list(sorted(domains, key=lambda x: x[4], reverse=True))[0:ss]
        # returns a list of domain rows sorted by average_fetched_good_characters
        return res

    def get_best_domains(self, count, parser):
        # get all domains
        rows = self.session.execute(self.domains_select)
        domains = []
        for row in rows:
            domain = row[0]
            seen_count = row[1]
            fetched_count = row[2]
            gain_ratio = row[3]
            afg = row[4]
            if seen_count and fetched_count and gain_ratio and parser.is_domain_good(domain):
                domains.append(list(row))
        l = len(domains)
        ss = min(l, count)
        res = []
        if ss > 0:
            # sort according to gain_ratio
            res = list(sorted(domains, key=lambda x: x[3], reverse=True))[0:ss]
        # returns a list of domain rows sorted by gain_ratio
        return res

    def get_random_domains(self, count, parser):
        # get all domains
        rows = self.session.execute(self.domains_select)
        domains = []
        for row in rows:
            domain = row[0]
            if parser.is_domain_good(domain):
                domains.append(list(row))
        l = len(domains)
        ss = min(l, count)
        return random.sample(domains, ss)

    def get_unvisited_domains(self, count, parser):
        # get all domains
        rows = self.session.execute(self.domains_select)
        domains = []
        # Analyze third level domains
        dd = collections.defaultdict(set)
        third_count = 0
        for row in rows:
            domain = row[0]
            seen_count = row[1]
            fetched_count = row[2]
            gain_ratio = row[3]
            afg = row[4]
            if seen_count and not fetched_count:
                subdomains = domain.split(".")
                d2 = subdomains[-2] + "." + subdomains[-1]
                dd[d2].add(domain)
        # Select second level first, then select third level
        result = []
        ll = list(dd.items())
        if len(ll) == 0:
            # no unvisited domains, avoid division by zero
            return result
        random.shuffle(ll)
        domain_weight = count / len(ll)
        for domain, subdomains in ll:
            dl = list(subdomains)
            link_weight = domain_weight / len(dl)
            random.shuffle(dl)
            for d in dl:
                r = random.random()
                if r < link_weight:
                    result.append((d, 0))
        return result

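    # Note (added for clarity, not in the original module): each second-level domain
    # gets an equal total weight of count / len(ll), split evenly among its unvisited
    # subdomains, so the expected number of returned domains is roughly count (at most
    # the number of unvisited domains) and no single second-level domain dominates
    # the sample.
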
    def export_domain(self, domain):
        rows = self.session.execute("SELECT JSON * FROM content WHERE domain_name=%s", (domain,))
        for row in rows:
            print(row[0])

    def get_visit_links(self, domain, recent_count, old_count, random_count):
        dblinks = []
        rows = self.session.execute("SELECT url_schema,url_path,url_query,update_time FROM links WHERE domain_name=%s AND link_status='seen'", (domain,))
        for row in rows:
            link = urlunparse((row[0], domain, row[1], row[2]))
            dblinks.append((link, row[3]))

        visitlinks = []
        dblinks.sort(key=lambda x: x[1])
        random_links = []
        for i, (link, time) in enumerate(dblinks):
            #print(link, time)
            if i < recent_count:
                visitlinks.append(link)
            elif i >= len(dblinks) - old_count:
                visitlinks.append(link)
            else:
                random_links.append(link)
        sc = min(random_count, len(random_links))
        if sc > 0:
            visitlinks += random.sample(random_links, sc)
        return visitlinks
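
    # Note (added for clarity, not in the original module): after sorting the 'seen'
    # links by update_time, the first recent_count and the last old_count entries are
    # always visited, and up to random_count links are sampled uniformly from the
    # remaining middle of the list.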