websucker-pip/websucker/db.py
2023-02-26 14:12:22 +01:00

575 lines
19 KiB
Python

import cassandra
import cassandra.cluster
import cassandra.query
from cassandra.auth import PlainTextAuthProvider
import random
import os
import pkg_resources
import datetime
from websucker.parser import normalize_link,urlunparse
import collections
import math
import json
# Agent version tag; written to the agent_version column of the html and
# content tables by Data.index_responses and Data.index_content.
VERSION = "sucker6"
def get_schema():
    """Return the text of the ``schema.sql`` resource shipped with this module.

    The resource is read as bytes through ``pkg_resources`` and decoded
    as UTF-8.
    """
    with pkg_resources.resource_stream(__name__, "schema.sql") as stream:
        raw = stream.read()
    return raw.decode("utf8")
class Data:
"""
Database of text documents
"""
def __init__(self,keyspace="websucker",cassandra_host="127.0.0.1",cassandra_port=9042,username=None,password=None):
    """Connect to Cassandra and prepare every statement used by this class.

    Args:
        keyspace: keyspace passed to ``Cluster.connect``.
        cassandra_host: single contact point for the cluster.
        cassandra_port: port used for the contact point.
        username: login name; authentication is enabled only when both
            ``username`` and ``password`` are given.
        password: login password (see ``username``).

    The default execution profile uses a 240 second request timeout.
    The connection target is printed to stdout.
    """
    print(f"Database {keyspace}@{cassandra_host}:{cassandra_port}")
    have_credentials = username is not None and password is not None
    auth_provider = (
        PlainTextAuthProvider(username=username, password=password)
        if have_credentials
        else None
    )
    # Default execution profile with a generous timeout.
    default_profile = cassandra.cluster.ExecutionProfile(request_timeout=240.0)
    self.cluster = cassandra.cluster.Cluster(
        [cassandra_host],
        port=cassandra_port,
        execution_profiles={cassandra.cluster.EXEC_PROFILE_DEFAULT: default_profile},
        auth_provider=auth_provider,
    )
    self.session = self.cluster.connect(keyspace)
    prepare = self.session.prepare

    # NOTE: the CQL text below is kept exactly as in the original source,
    # which is why the string bodies start at column 0.
    self.check_document_select_query = prepare("SELECT count(url_hash) FROM paragraph_checksums WHERE checksum=?")
    self.update_links = prepare("""
UPDATE links SET
link_status = ?,
redirect_target = ?,
update_time = toTimestamp(now())
WHERE
domain_name=? AND
url_path=? AND
url_query=?
""")
    self.domain_quality_update = prepare("""
UPDATE domain_quality SET
seen_count=?,
good_size=?,
good_count=?,
good_probability=?,
good_originality=?,
average_good_characters=?,
content_size=?,
content_count=?,
content_probability=?,
content_originality=?,
average_content_characters=?,
fetched_count=?,
average_fetched_good_characters=?,
gain_ratio=?,
update_time = toTimestamp(now())
WHERE
domain_name=? AND
day=toDate(now())
""")
    self.index_response_insert_html = prepare("""
INSERT INTO html(
day,
domain_name,
source_link,
target_link,
redirect_links,
status,
headers,
content,
agent_version,
update_time
) VALUES (toDate(now()),?,?,?,?,?,?,?,?,toTimestamp(now()));
""")
    self.index_content_link_insert = prepare("""
INSERT INTO links (
url_schema,
domain_name,
url_path,
url_query,
link_status,
update_time
) VALUES (?,?,?,?,'seen',?) IF NOT EXISTS
""")
    self.daily_links_insert = prepare("""
INSERT INTO daily_links (
day,
domain_name,
url_path,
url_query,
link_status,
body_size,
link_originality,
update_time
) VALUES (toDate(now()),?,?,?,?,?,?,toTimestamp(now()))
""")
    self.daily_links_select = prepare("""
SELECT
domain_name,
link_status,
count(link_status)
FROM daily_links WHERE day=toDate(now()) GROUP BY domain_name,link_status
""")
    # Parsed content
    self.index_content_content_insert = prepare("""
INSERT INTO content(
domain_name,
target_link,
links,
title,
description,
section,
authors,
tags,
article_published_time,
text_date,
body,
body_size,
agent_version,
update_time
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?);
""")
    self.paragraph_checksums_insert = prepare("INSERT INTO paragraph_checksums (checksum,url_hash) VALUES(?,?)")
    self.index_content_links_update = prepare("UPDATE links SET link_status=?, link_originality=?,body_size=?,url_schema=? WHERE domain_name=? AND url_path = ? AND url_query=? ")
    self.check_domain_count = prepare("select count(url_path) from links where domain_name=? and link_status = ?")
    self.check_domain_size = prepare("select sum(body_size),sum(link_originality) from links where domain_name=? and link_status =?")
    self.domains_select = prepare("SELECT domain_name,seen_count,fetched_count,gain_ratio,average_fetched_good_characters FROM domain_quality PER PARTITION LIMIT 1")
def index_responses(self,source_link,responses):
    """Store every response fetched for ``source_link`` in the ``html`` table.

    Args:
        source_link: the link that was requested.
        responses: iterable of response objects exposing
            ``get_canonical()``, ``get_content()``, ``redirects``,
            ``status`` and ``headers``.

    When a response's canonical link differs from the normalized source
    link, the source link is first marked as ``"redirect"`` pointing at
    the canonical link.
    """
    parsed_source = normalize_link(source_link)
    source_domain = parsed_source[1]
    source_url = urlunparse(parsed_source)
    for response in responses:
        canonical = response.get_canonical()
        if canonical != source_url:
            # Record the redirect before storing the raw response.
            self.update_link_status(source_url,"redirect",canonical)
        row = (
            source_domain,
            source_url,
            canonical,
            response.redirects,
            response.status,
            response.headers,
            response.get_content(),
            VERSION,
        )
        self.session.execute(self.index_response_insert_html,row)
def summary(self,parser):
    """Print aggregate crawl statistics to stdout.

    Reads one ``domain_quality`` row per partition (``PER PARTITION
    LIMIT 1``) and sums character, document and link counters over all
    domains.

    Args:
        parser: object providing ``is_domain_good(domain)``; domains it
            rejects are counted as junk.

    The "Second level domains" figure counts the distinct values formed
    by the last two dot-separated labels of each domain name (a
    single-label host counts as itself).
    """
    good_chars = 0
    fetched_chars = 0
    fetched_documents = 0
    visited_domains = 0
    unvisited_domains = 0
    unvisited_junk_domains = 0
    seen_links = 0
    finished_domains = 0
    junk_domains = 0
    rows = self.session.execute("SELECT domain_name,good_size,content_size, fetched_count,seen_count FROM domain_quality PER PARTITION LIMIT 1")
    # TODO: subdomain analysis
    second_level = set()
    for row in rows:
        domain = row[0]
        # Group hosts by their last two labels ("www.example.com" ->
        # "example.com").  The earlier code stored the labels in front of
        # those two ("www"), which counted host prefixes instead.
        second_level.add(".".join(domain.split(".")[-2:]))
        # Evaluate the parser once per domain; it is needed twice below.
        is_junk = not parser.is_domain_good(domain)
        if is_junk:
            junk_domains += 1
        if row[1] is not None:
            good_chars += row[1]
        if row[2] is not None:
            fetched_chars += row[2]
        if row[3] is not None:
            fetched_documents += row[3]
        if row[4] is not None:
            seen_links += row[4]
        if row[3] is None or row[3] == 0:
            # Nothing fetched from this domain yet.
            unvisited_domains += 1
            if is_junk:
                unvisited_junk_domains += 1
        else:
            visited_domains += 1
            # A visited domain with no remaining "seen" links is finished.
            if row[4] is None or row[4] == 0:
                finished_domains += 1
    print("Good characters: {}".format(good_chars))
    print("Fetched characters: {}".format(fetched_chars))
    print("Fetched documents: {}".format(fetched_documents))
    print("Visited domains: {}".format(visited_domains))
    print("Unvisited domains: {}".format(unvisited_domains))
    print("Junk domains: {}".format(junk_domains))
    print("Unvisited junk domains: {}".format(unvisited_junk_domains))
    print("New links : {}".format(seen_links))
    print("Second level domains: {}".format(len(second_level)))
    print("Finished domains : {}".format(finished_domains))
def daily_report(self):
    """Print today's per-domain crawl report to stdout.

    For every domain with at least two ``daily_links`` rows today, the
    number and total body size of ``"good"`` links are looked up.  The
    (at most) 20 domains with the most links are printed as
    ``(domain, good characters, good ratio, documents)`` tuples,
    followed by a totals line.  The domain total counts every domain
    returned by the first query, including those skipped for having
    fewer than two links.
    """
    per_domain = [
        list(row)
        for row in self.session.execute("SELECT domain_name,count(link_status) FROM daily_links WHERE day=toDate(now()) GROUP BY day,domain_name")
    ]
    status_query = "SELECT link_status,count(link_status),sum(body_size) FROM daily_links WHERE day=toDate(now()) AND domain_name=%s GROUP BY day,domain_name,link_status"
    documents = 0
    characters = 0
    report = []
    for name, link_count in per_domain:
        if link_count < 2:
            continue
        documents += link_count
        good_links = 0
        good_chars = 0
        for status_row in self.session.execute(status_query,(name,)):
            if status_row[0] == "good":
                good_links = status_row[1]
                good_chars = status_row[2]
        characters += good_chars
        report.append((name, good_chars, good_links / link_count, link_count))
    print("Domain, characters,good ratio,documents")
    # Ascending sort followed by a reversal keeps the original tie order.
    ranked = sorted(report, key=lambda entry: entry[3])
    ranked.reverse()
    for entry in ranked[:20]:
        print(entry)
    print("{} domains, {} documents, {} characters ".format(len(per_domain),documents,characters))
def update_link_status(self,link,status,redirect_target=None):
    """Set the status (and optional redirect target) of a stored link.

    Args:
        link: link to update; it is normalized to find its domain, path
            and query, which identify the row in ``links``.
        status: new ``link_status`` value.
        redirect_target: value for ``redirect_target``; ``None`` by default.
    """
    parts = normalize_link(link)
    self.session.execute(
        self.update_links,
        (status, redirect_target, parts[1], parts[2], parts[3]),
    )
def index_follow_links(self,parser,links,connection):
    """Insert newly discovered links and refresh statistics of new domains.

    Args:
        parser: provides ``is_link_good(link)`` and ``strip_query``.
        links: iterable of candidate links.
        connection: unused here; a robots check based on it is disabled
            in the original code.

    Each accepted link is normalized, de-duplicated and inserted with
    ``IF NOT EXISTS``.  For every domain that received at least one new
    link ``check_domain`` is called, and a summary line is printed.
    """
    follow_links = {
        urlunparse(normalize_link(candidate,strip_query=parser.strip_query))
        for candidate in links
        if parser.is_link_good(candidate)
    }
    new_domains = set()
    new_count = 0
    for url in follow_links:
        parts = normalize_link(url)
        params = [*parts, datetime.date.today()]
        outcome = self.session.execute(self.index_content_link_insert,params).one()
        # ``applied`` is true only when the conditional insert created a row.
        if outcome.applied:
            new_domains.add(parts[1])
            new_count += 1
    for domain in new_domains:
        self.check_domain(domain)
    print("{} new links, {} new domains".format(new_count,len(new_domains)))
def index_content(self,target_link,parsed_document):
    """Store a parsed document and classify its link.

    Args:
        target_link: link the document was fetched from.
        parsed_document: parser result exposing ``body``, ``title``,
            ``description``, ``section``, ``authors``, ``tags``,
            ``article_published_time``, ``text_date``, ``current_time``,
            ``paragraph_checksums``, ``paragraph_sizes`` and
            ``get_links()``.

    The link status is decided as follows:
      * ``"bad_parse"`` when the document has no body,
      * ``"bad_small"`` when the body is shorter than 300 characters,
      * ``"bad_copy"`` when ``check_document`` reports originality < 0.8,
      * ``"good"`` otherwise.

    The status, originality and body size are written to ``links`` and
    ``daily_links``; a status line is printed to stdout.
    """
    import hashlib  # local import: not part of this module's import header

    nl = normalize_link(target_link)
    domain_name = nl[1]
    assert len(domain_name) > 1
    pd = parsed_document
    body_size = 0 if pd.body is None else len(pd.body)
    value = (
        domain_name,
        target_link,
        pd.get_links(),
        pd.title,
        pd.description,
        pd.section,
        pd.authors,
        pd.tags,
        pd.article_published_time,
        pd.text_date,
        pd.body,
        body_size,
        VERSION,
        pd.current_time,
    )
    # Start the content insert now; its result is awaited further below.
    content_future = self.session.execute_async(self.index_content_content_insert,value)
    link_status = "good"
    originality = 0
    if pd.body is None:
        link_status = "bad_parse"
    elif body_size < 300:
        link_status = "bad_small"
    if link_status == "good":
        # The built-in hash() of a str is salted per interpreter process,
        # so it gave the same URL a different url_hash on every run and a
        # re-crawled page then looked like a copy of itself.  A keyless
        # BLAKE2b digest is stable across processes.
        # NOTE(review): assumes url_hash is a 64-bit integer column (the
        # range hash() produced) - confirm against schema.sql.  Rows written
        # with the old salted hashes are not migrated.
        url_key = nl[1] + "/" + nl[2] + "?" + nl[3]
        digest = hashlib.blake2b(url_key.encode("utf8"), digest_size=8).digest()
        url_hash = int.from_bytes(digest, "big", signed=True)
        futures = [
            self.session.execute_async(self.paragraph_checksums_insert,(pc,url_hash))
            for pc,_ in zip(pd.paragraph_checksums,pd.paragraph_sizes)
        ]
        # All checksums must be stored before originality is measured.
        for fut in futures:
            fut.result()
        originality = self.check_document(pd.paragraph_checksums,pd.paragraph_sizes)
        if originality < 0.8:
            link_status = "bad_copy"
    self.session.execute(self.index_content_links_update,(link_status,originality,body_size,nl[0],nl[1],nl[2],nl[3]))
    content_future.result()
    print("<<<< " + link_status + " " + str(originality))
    dl = (
        nl[1],
        nl[2],
        nl[3],
        link_status,
        body_size,
        originality,
    )
    self.session.execute(self.daily_links_insert,dl)
def check_document(self,paragraph_checksums,paragraph_sizes):
    """Return the share of a document's size made of unique paragraphs.

    Args:
        paragraph_checksums: checksum of each paragraph.
        paragraph_sizes: size of each paragraph, in the same order.

    Returns:
        ``0`` when the total size is zero; otherwise
        ``(total - duplicated) / total``, where a paragraph counts as
        duplicated when its checksum is stored more than once in
        ``paragraph_checksums``.
    """
    total = sum(paragraph_sizes)
    if total == 0:
        return 0
    # Fire all lookups first, then collect the answers.
    pending = [
        self.session.execute_async(self.check_document_select_query,(checksum,))
        for checksum,_ in zip(paragraph_checksums,paragraph_sizes)
    ]
    duplicated = 0
    for future,size in zip(pending,paragraph_sizes):
        occurrences = future.result().one()[0]
        if occurrences > 1:
            duplicated += size
    return (total - duplicated) / total
def check_domain(self, domain):
    """Recompute quality statistics of ``domain`` and store them.

    Link counts are read per status from ``links``; body sizes and
    originality sums are read for the ``good``, ``bad_copy`` and
    ``bad_small`` statuses.  Derived ratios stay ``None`` when their
    denominator is zero.  The ``domain_quality`` row is written only if
    the domain has at least one fetched or seen link.

    Args:
        domain: non-empty domain name.

    Returns:
        ``average_fetched_good_characters`` or ``None`` when the domain
        has no good links.
    """
    assert len(domain) > 0
    statuses = (
        "good",
        "bad_copy",
        "bad_small",
        "bad_httpcode",
        "bad_type",
        "bad_content",
        "bad_parse",
        "seen",
    )
    counts = {}
    for status in statuses:
        counts[status] = self.session.execute(self.check_domain_count,(domain,status)).one()[0]
    seen_count = counts["seen"]
    good_count = counts["good"]
    content_count = good_count + counts["bad_copy"] + counts["bad_small"]
    # Everything that is no longer merely "seen" has been fetched.
    fetched_count = sum(counts.values()) - seen_count

    has_fetched = fetched_count > 0
    content_probability = content_count / fetched_count if has_fetched else None
    good_probability = good_count / fetched_count if has_fetched else None

    sizes = {}
    originalities = {}
    for status in ("good", "bad_copy", "bad_small"):
        totals = self.session.execute(self.check_domain_size,(domain,status)).one()
        sizes[status] = totals[0]
        originalities[status] = totals[1]
    good_size = sizes["good"]
    content_size = sum(sizes.values())

    good_originality = None
    average_good_characters = None
    average_fetched_good_characters = None
    gain_ratio = None
    if good_count > 0:
        good_originality = originalities["good"] / good_count
        average_good_characters = good_size / good_count * good_originality
        average_fetched_good_characters = good_size * good_originality / fetched_count
        gain_ratio = average_fetched_good_characters / fetched_count

    content_originality = None
    average_content_characters = None
    if content_count > 0:
        content_originality = sum(originalities.values()) / content_count
        average_content_characters = content_size / content_count

    if fetched_count > 0 or seen_count > 0:
        self.session.execute(
            self.domain_quality_update,
            (
                seen_count,
                good_size,
                good_count,
                good_probability,
                good_originality,
                average_good_characters,
                content_size,
                content_count,
                content_probability,
                content_originality,
                average_content_characters,
                fetched_count,
                average_fetched_good_characters,
                gain_ratio,
                domain,
            ),
        )
    return average_fetched_good_characters
def all_domains(self,count):
    """Return up to ``count`` domains ranked by average fetched good characters.

    Only domains whose fetched count, average fetched good characters
    and seen count are all truthy are considered.

    Returns:
        List of ``(domain_name, seen_count, fetched_count, gain_ratio,
        average_fetched_good_characters)`` tuples, highest average
        first; empty when ``count`` is not positive or nothing qualifies.
    """
    candidates = [
        tuple(row)
        for row in self.session.execute(self.domains_select)
        if row[2] and row[4] and row[1]
    ]
    limit = min(len(candidates), count)
    if limit <= 0:
        return []
    # Rank by the last column (average_fetched_good_characters).
    candidates.sort(key=lambda entry: entry[4], reverse=True)
    return candidates[:limit]
def get_best_domains(self,count,parser):
    """Return up to ``count`` accepted domains ranked by gain ratio.

    A domain qualifies when its seen count, fetched count and gain
    ratio are all truthy and ``parser.is_domain_good`` accepts it.

    Returns:
        List of ``[domain_name, seen_count, fetched_count, gain_ratio,
        average_fetched_good_characters]`` lists, highest gain ratio
        first; empty when ``count`` is not positive or nothing qualifies.
    """
    candidates = []
    for row in self.session.execute(self.domains_select):
        name, seen, fetched, gain = row[0], row[1], row[2], row[3]
        if not (seen and fetched and gain):
            continue
        if parser.is_domain_good(name):
            candidates.append(list(row))
    limit = min(len(candidates), count)
    if limit <= 0:
        return []
    candidates.sort(key=lambda entry: entry[3], reverse=True)
    return candidates[:limit]
def get_random_domains(self,count,parser):
    """Return a random sample of at most ``count`` accepted domains.

    Args:
        count: maximum number of domains to return.
        parser: provides ``is_domain_good(domain)``.

    Returns:
        List of rows (each converted to a list) drawn with
        ``random.sample`` from the domains the parser accepts.
    """
    eligible = [
        list(row)
        for row in self.session.execute(self.domains_select)
        if parser.is_domain_good(row[0])
    ]
    sample_size = min(len(eligible), count)
    return random.sample(eligible, sample_size)
def get_unvisited_domains(self,count,parser):
    """Return a random selection of domains that were seen but never fetched.

    Unvisited domains are grouped by their last two dot-separated
    labels so that a group with many hosts does not dominate the
    selection.  Each host is then picked independently with probability
    ``count / (number of groups * hosts in its group)``, so the result
    holds about ``count`` entries on average but its length is not
    guaranteed.

    Args:
        count: target number of domains.
        parser: unused by this method; kept for a uniform signature
            with the other domain selectors.

    Returns:
        List of ``(domain_name, 0)`` tuples; empty when there are no
        unvisited domains.
    """
    groups = collections.defaultdict(set)
    for row in self.session.execute(self.domains_select):
        domain = row[0]
        seen_count = row[1]
        fetched_count = row[2]
        if seen_count and not fetched_count:
            # Slicing (instead of indexing [-2]) keeps single-label
            # hosts such as "localhost" from raising IndexError.
            group_key = ".".join(domain.split(".")[-2:])
            groups[group_key].add(domain)
    if not groups:
        # Nothing to choose from; also avoids dividing by zero below.
        return []
    result = []
    ll = list(groups.items())
    random.shuffle(ll)
    domain_weight = count / len(ll)
    for _,hosts in ll:
        dl = list(hosts)
        link_weight = domain_weight / len(dl)
        random.shuffle(dl)
        for d in dl:
            if random.random() < link_weight:
                result.append((d,0))
    return result
def export_domain(self,domain):
    """Print every ``content`` row of ``domain`` as JSON, one row per line.

    Args:
        domain: value matched against ``domain_name``.
    """
    documents = self.session.execute("SELECT JSON * from content WHERE domain_name=%s",(domain,))
    for document in documents:
        # ``SELECT JSON`` yields one column holding the encoded row.
        print(document[0])
def get_visit_links(self,domain,recent_count,old_count,random_count):
    """Choose which ``seen`` links of ``domain`` to visit next.

    Links are ordered by ``update_time``, newest first.  The result
    contains the ``recent_count`` newest links, the ``old_count`` oldest
    links and a random sample of at most ``random_count`` of the links
    in between.

    Args:
        domain: domain whose links are selected.
        recent_count: number of most recently updated links to take.
        old_count: number of least recently updated links to take.
        random_count: maximum number of remaining links sampled at random.

    Returns:
        List of link strings.
    """
    rows = self.session.execute("SELECT url_schema,url_path,url_query,update_time FROM links Where domain_name=%s AND link_status='seen'",(domain,))
    dblinks = [
        (urlunparse((row[0],domain,row[1],row[2])), row[3])
        for row in rows
    ]
    # Newest first.  The previous ascending sort made ``recent_count``
    # select the oldest links and ``old_count`` the newest ones.
    dblinks.sort(key=lambda x:x[1],reverse=True)
    old_start = len(dblinks) - old_count
    visitlinks = []
    random_links = []
    for i,(link,_) in enumerate(dblinks):
        if i < recent_count or i >= old_start:
            visitlinks.append(link)
        else:
            random_links.append(link)
    sc = min(random_count,len(random_links))
    if sc > 0:
        visitlinks += random.sample(random_links,sc)
    return visitlinks