445 lines
14 KiB
Python
445 lines
14 KiB
Python
|
import cassandra
|
||
|
import cassandra.cluster
|
||
|
import random
|
||
|
import os
|
||
|
import pkg_resources
|
||
|
import datetime
|
||
|
from websucker.parser import normalize_link,urlunparse
|
||
|
|
||
|
VERSION = "sucker6"
|
||
|
|
||
|
|
||
|
def get_schema():
|
||
|
with pkg_resources.resource_stream(__name__,"schema.sql") as f:
|
||
|
schema = f.read()
|
||
|
return str(schema,encoding="utf8")
|
||
|
|
||
|
class Data:
|
||
|
"""
|
||
|
Database of text documents
|
||
|
"""
|
||
|
def __init__(self,keyspace="websucker",cassandra_host="127.0.0.1",cassandra_port=9042):
|
||
|
# execution profile
|
||
|
ep = cassandra.cluster.ExecutionProfile(request_timeout=240.0)
|
||
|
profiles = {cassandra.cluster.EXEC_PROFILE_DEFAULT:ep}
|
||
|
self.cluster = cassandra.cluster.Cluster([cassandra_host],port=cassandra_port,execution_profiles=profiles)
|
||
|
self.session = self.cluster.connect(keyspace)
|
||
|
|
||
|
self.check_document_select_query = self.session.prepare("SELECT count(url_hash) FROM paragraph_checksums WHERE checksum=?" )
|
||
|
|
||
|
|
||
|
|
||
|
self.index_response_link_update = self.session.prepare("""
|
||
|
UPDATE links SET
|
||
|
link_status ='redirect',
|
||
|
redirect_target = ?,
|
||
|
update_time = toTimestamp(now())
|
||
|
WHERE
|
||
|
domain_name=? AND
|
||
|
url_path=? AND
|
||
|
url_query=?
|
||
|
""")
|
||
|
|
||
|
self.domain_quality_update = self.session.prepare("""
|
||
|
UPDATE domain_quality SET
|
||
|
seen_count=?,
|
||
|
good_size=?,
|
||
|
good_count=?,
|
||
|
good_probability=?,
|
||
|
good_originality=?,
|
||
|
average_good_characters=?,
|
||
|
content_size=?,
|
||
|
content_count=?,
|
||
|
content_probability=?,
|
||
|
content_originality=?,
|
||
|
average_content_characters=?,
|
||
|
fetched_count=?,
|
||
|
average_fetched_good_characters=?,
|
||
|
gain_ratio=?,
|
||
|
update_time = toTimestamp(now())
|
||
|
WHERE
|
||
|
domain_name=? AND
|
||
|
day=toDate(now())
|
||
|
""")
|
||
|
|
||
|
self.index_response_insert_html = self.session.prepare("""
|
||
|
INSERT INTO html(
|
||
|
day,
|
||
|
domain_name,
|
||
|
source_link,
|
||
|
target_link,
|
||
|
redirect_links,
|
||
|
status,
|
||
|
headers,
|
||
|
content,
|
||
|
agent_version,
|
||
|
update_time
|
||
|
) VALUES (toDate(now()),?,?,?,?,?,?,?,?,toTimestamp(now()));
|
||
|
""")
|
||
|
|
||
|
self.index_content_link_insert = self.session.prepare("""
|
||
|
INSERT INTO links (
|
||
|
url_schema,
|
||
|
domain_name,
|
||
|
url_path,
|
||
|
url_query,
|
||
|
link_status,
|
||
|
update_time
|
||
|
) VALUES (?,?,?,?,'seen',?) IF NOT EXISTS
|
||
|
""")
|
||
|
|
||
|
self.daily_links_insert = self.session.prepare("""
|
||
|
INSERT INTO daily_links (
|
||
|
day,
|
||
|
domain_name,
|
||
|
url_path,
|
||
|
url_query,
|
||
|
link_status,
|
||
|
body_size,
|
||
|
link_originality,
|
||
|
update_time
|
||
|
) VALUES (toDate(now()),?,?,?,?,?,?,toTimestamp(now()))
|
||
|
""")
|
||
|
self.daily_links_select = self.session.prepare("""
|
||
|
SELECT
|
||
|
domain_name,
|
||
|
link_status,
|
||
|
count(link_status)
|
||
|
FROM daily_links WHERE day=toDate(now()) GROUP BY domain_name,link_status
|
||
|
""")
|
||
|
# PArsed Content
|
||
|
self.index_content_content_insert = self.session.prepare("""
|
||
|
INSERT INTO content(
|
||
|
domain_name,
|
||
|
target_link,
|
||
|
links,
|
||
|
title,
|
||
|
description,
|
||
|
section,
|
||
|
authors,
|
||
|
tags,
|
||
|
article_published_time,
|
||
|
text_date,
|
||
|
body,
|
||
|
body_size,
|
||
|
agent_version,
|
||
|
update_time
|
||
|
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?);
|
||
|
""")
|
||
|
|
||
|
self.paragraph_checksums_insert = self.session.prepare("INSERT INTO paragraph_checksums (checksum,url_hash) VALUES(?,?)")
|
||
|
self.index_content_links_update = self.session.prepare("UPDATE links SET link_status=?, link_originality=?,body_size=?,url_schema=? WHERE domain_name=? AND url_path = ? AND url_query=? ")
|
||
|
self.check_domain_count = self.session.prepare("select count(url_path) from links where domain_name=? and link_status = ?")
|
||
|
|
||
|
self.check_domain_size = self.session.prepare("select sum(body_size),sum(link_originality) from links where domain_name=? and link_status =?")
|
||
|
|
||
|
self.domains_select = self.session.prepare("SELECT domain_name,seen_count,fetched_count,gain_ratio,average_fetched_good_characters FROM domain_quality PER PARTITION LIMIT 1")
|
||
|
|
||
|
|
||
|
def index_responses(self,source_link,responses):
|
||
|
# Redirect links
|
||
|
pl = normalize_link(source_link)
|
||
|
for response in responses:
|
||
|
tl = response.get_canonical()
|
||
|
r = (
|
||
|
tl,
|
||
|
pl[1],
|
||
|
pl[2],
|
||
|
pl[3],
|
||
|
)
|
||
|
if pl != tl:
|
||
|
res = self.session.execute(self.index_response_link_update,r)
|
||
|
d = (
|
||
|
pl[1],
|
||
|
source_link,
|
||
|
response.get_canonical(),
|
||
|
response.redirects,
|
||
|
response.status,
|
||
|
response.headers,
|
||
|
response.get_content(),
|
||
|
VERSION,
|
||
|
)
|
||
|
self.session.execute(self.index_response_insert_html,d)
|
||
|
|
||
|
def daily_report(self):
|
||
|
rows = self.session.execute(self.daily_links_select)
|
||
|
for row in rows:
|
||
|
print(row[0],row[1],row[2])
|
||
|
|
||
|
def index_follow_links(self,parser,links,connection):
|
||
|
# Index seen links
|
||
|
|
||
|
follow_links = set()
|
||
|
for l in links:
|
||
|
if parser.is_link_good(l):
|
||
|
#if connection is not None and parser.listen_robot and not connection.is_robot_good(l):
|
||
|
# continue
|
||
|
link = normalize_link(l,strip_query=parser.strip_query)
|
||
|
follow_links.add(urlunparse(link))
|
||
|
|
||
|
newlinkdomains = set()
|
||
|
for link in follow_links:
|
||
|
value = []
|
||
|
nl = normalize_link(link)
|
||
|
value += nl
|
||
|
value.append(datetime.date.today())
|
||
|
rows = self.session.execute(self.index_content_link_insert,value)
|
||
|
row = rows.one()
|
||
|
if row.applied:
|
||
|
newlinkdomains.add(nl[1])
|
||
|
for domain in newlinkdomains:
|
||
|
self.check_domain(domain)
|
||
|
|
||
|
|
||
|
def index_content(self,target_link,parsed_document):
|
||
|
nl = normalize_link(target_link)
|
||
|
domain_name = nl[1]
|
||
|
assert len(domain_name) > 1
|
||
|
|
||
|
pd = parsed_document
|
||
|
body_length = 0
|
||
|
if pd.body is not None:
|
||
|
body_length = len(pd.body)
|
||
|
value = (
|
||
|
domain_name,
|
||
|
target_link,
|
||
|
pd.get_links(),
|
||
|
pd.title,
|
||
|
pd.description,
|
||
|
pd.section,
|
||
|
pd.authors,
|
||
|
pd.tags,
|
||
|
pd.article_published_time,
|
||
|
pd.text_date,
|
||
|
pd.body,
|
||
|
body_length,
|
||
|
VERSION,
|
||
|
pd.current_time
|
||
|
)
|
||
|
content_future = self.session.execute_async(self.index_content_content_insert,value)
|
||
|
# result later
|
||
|
|
||
|
link_status = "good"
|
||
|
originality = 0
|
||
|
tsz = 0
|
||
|
if pd.body is None:
|
||
|
link_status = "bad_parse"
|
||
|
else:
|
||
|
tsz = len(pd.body)
|
||
|
if tsz < 300:
|
||
|
link_status = "bad_small"
|
||
|
|
||
|
if link_status == "good":
|
||
|
|
||
|
futures = []
|
||
|
for pc,psz in zip(pd.paragraph_checksums,pd.paragraph_sizes):
|
||
|
fut = self.session.execute_async(self.paragraph_checksums_insert,(pc,hash(nl[1] + "/" + nl[2] + "?" + nl[3])))
|
||
|
futures.append(fut)
|
||
|
for fut in futures:
|
||
|
fut.result()
|
||
|
originality = self.check_document(pd.paragraph_checksums,pd.paragraph_sizes)
|
||
|
if originality < 0.8:
|
||
|
link_status = "bad_copy"
|
||
|
print(nl)
|
||
|
self.session.execute(self.index_content_links_update,(link_status,originality,tsz,nl[0],nl[1],nl[2],nl[3]))
|
||
|
content_future.result()
|
||
|
print("<<<< " + link_status + " " + str(originality))
|
||
|
dl = (
|
||
|
nl[1],
|
||
|
nl[2],
|
||
|
nl[3],
|
||
|
link_status,
|
||
|
tsz,
|
||
|
originality
|
||
|
)
|
||
|
self.session.execute(self.daily_links_insert,dl)
|
||
|
|
||
|
def check_document(self,paragraph_checksums,paragraph_sizes):
|
||
|
tsz = sum(paragraph_sizes)
|
||
|
if tsz == 0:
|
||
|
return 0
|
||
|
copies = 0
|
||
|
futures = []
|
||
|
for pc,psz in zip(paragraph_checksums,paragraph_sizes):
|
||
|
futures.append(self.session.execute_async(self.check_document_select_query,(pc,)))
|
||
|
|
||
|
for fut,psz in zip(futures,paragraph_sizes):
|
||
|
rows = fut.result()
|
||
|
res = rows.one()[0]
|
||
|
if res > 1:
|
||
|
copies += psz
|
||
|
return (tsz-copies)/tsz
|
||
|
|
||
|
|
||
|
def check_domain(self, domain):
|
||
|
assert len(domain) > 0
|
||
|
seen_count = None
|
||
|
good_size = None
|
||
|
good_count = None
|
||
|
good_probability = None
|
||
|
good_originality = None
|
||
|
average_good_characters = None
|
||
|
content_size = None
|
||
|
content_count = None
|
||
|
content_probability = None
|
||
|
content_originality = None
|
||
|
average_content_characters = None
|
||
|
fetched_count = None
|
||
|
average_fetched_good_characters = None
|
||
|
gain_ratio = None
|
||
|
|
||
|
counts = {
|
||
|
"good":0,
|
||
|
"bad_copy":0,
|
||
|
"bad_small":0,
|
||
|
"bad_httpcode":0,
|
||
|
"bad_type":0,
|
||
|
"bad_content":0,
|
||
|
"bad_parse":0,
|
||
|
"seen":0
|
||
|
}
|
||
|
for k in counts.keys():
|
||
|
res = self.session.execute(self.check_domain_count,(domain,k))
|
||
|
co = res.one()[0]
|
||
|
counts[k]= co
|
||
|
|
||
|
seen_count = counts["seen"]
|
||
|
good_count = counts["good"]
|
||
|
content_count = counts["good"] + counts["bad_copy"] + counts["bad_small"]
|
||
|
|
||
|
fetched_count = sum(counts.values()) - counts["seen"]
|
||
|
|
||
|
if fetched_count > 0:
|
||
|
content_probability = content_count / fetched_count
|
||
|
good_probability = good_count / fetched_count
|
||
|
sizes = {
|
||
|
"good":0,
|
||
|
"bad_copy":0,
|
||
|
"bad_small":0
|
||
|
}
|
||
|
|
||
|
originalities ={}
|
||
|
|
||
|
for k in sizes.keys():
|
||
|
res = self.session.execute(self.check_domain_size,(domain,k))
|
||
|
row = res.one()
|
||
|
co =row[0]
|
||
|
originalities[k] = row[1]
|
||
|
sizes[k]= co
|
||
|
good_size = sizes["good"]
|
||
|
content_size = sum(sizes.values())
|
||
|
if good_count > 0:
|
||
|
good_originality = originalities["good"] / good_count
|
||
|
if content_count > 0:
|
||
|
content_originality = sum(originalities.values()) / content_count
|
||
|
|
||
|
if good_count > 0:
|
||
|
average_good_characters = good_size / good_count * good_originality
|
||
|
average_fetched_good_characters = good_size * good_originality / fetched_count
|
||
|
|
||
|
gain_ratio = average_fetched_good_characters / fetched_count
|
||
|
|
||
|
if content_count > 0:
|
||
|
average_content_characters = content_size / content_count
|
||
|
|
||
|
#print(sizes)
|
||
|
#print(originalities)
|
||
|
uv = (
|
||
|
seen_count,
|
||
|
good_size,
|
||
|
good_count,
|
||
|
good_probability,
|
||
|
good_originality,
|
||
|
average_good_characters,
|
||
|
content_size,
|
||
|
content_count,
|
||
|
content_probability,
|
||
|
content_originality,
|
||
|
average_content_characters,
|
||
|
fetched_count,
|
||
|
average_fetched_good_characters,
|
||
|
gain_ratio,
|
||
|
domain)
|
||
|
if fetched_count > 0 or seen_count > 0:
|
||
|
self.session.execute(self.domain_quality_update,uv)
|
||
|
return average_fetched_good_characters
|
||
|
|
||
|
def all_domains(self,count):
|
||
|
rows = self.session.execute(self.domains_select)
|
||
|
domains = []
|
||
|
for row in rows:
|
||
|
domain = row[0]
|
||
|
seen_count = row[1]
|
||
|
fetched_count = row[2]
|
||
|
gain_ratio = row[3]
|
||
|
afg = row[4]
|
||
|
if fetched_count and afg and seen_count:
|
||
|
domains.append(tuple(row))
|
||
|
l = len(domains)
|
||
|
ss = min(l,count)
|
||
|
res = []
|
||
|
if ss > 0:
|
||
|
# sort according to ratio
|
||
|
res = list(sorted(domains,key=lambda x:x[4],reverse=True))[0:ss]
|
||
|
# returns sorted list of tuples domain,gain_ratio
|
||
|
return res
|
||
|
|
||
|
def get_best_domains(self,count):
|
||
|
# get all domains
|
||
|
rows = self.session.execute(self.domains_select)
|
||
|
domains = []
|
||
|
for row in rows:
|
||
|
domain = row[0]
|
||
|
seen_count = row[1]
|
||
|
fetched_count = row[2]
|
||
|
gain_ratio = row[3]
|
||
|
afg = row[4]
|
||
|
if seen_count and fetched_count and gain_ratio:
|
||
|
domains.append((domain,gain_ratio))
|
||
|
l = len(domains)
|
||
|
ss = min(l,count)
|
||
|
res = []
|
||
|
if ss > 0:
|
||
|
# sort according to ratio
|
||
|
res = list(sorted(domains,key=lambda x:x[1],reverse=True))[0:ss]
|
||
|
# returns sorted list of tuples domain,gain_ratio
|
||
|
return res
|
||
|
|
||
|
def get_unvisited_domains(self,count):
|
||
|
# get all domains
|
||
|
rows = self.session.execute(self.domains_select)
|
||
|
domains = []
|
||
|
for row in rows:
|
||
|
domain = row[0]
|
||
|
seen_count = row[1]
|
||
|
fetched_count = row[2]
|
||
|
gain_ratio = row[3]
|
||
|
afg = row[4]
|
||
|
if seen_count and not fetched_count:
|
||
|
domains.append(domain)
|
||
|
ss = min(len(domains),count)
|
||
|
return random.sample(domains,ss)
|
||
|
|
||
|
def get_visit_links(self,domain,recent_count,old_count,random_count):
|
||
|
dblinks = []
|
||
|
rows = self.session.execute("SELECT url_schema,url_path,url_query,update_time FROM links Where domain_name=%s AND link_status='seen'",(domain,))
|
||
|
for row in rows:
|
||
|
link = urlunparse((row[0],domain,row[1],row[2]))
|
||
|
dblinks.append((link,row[3]))
|
||
|
|
||
|
visitlinks = []
|
||
|
dblinks.sort(key=lambda x:x[1])
|
||
|
random_links = []
|
||
|
for i,(link,time) in enumerate(dblinks):
|
||
|
#print(link,time)
|
||
|
if i < recent_count:
|
||
|
visitlinks.append(link)
|
||
|
elif i >= len(dblinks) - old_count:
|
||
|
visitlinks.append(link)
|
||
|
else:
|
||
|
random_links.append(link)
|
||
|
sc = min(random_count,len(random_links))
|
||
|
if sc > 0:
|
||
|
visitlinks += random.sample(random_links,sc)
|
||
|
return visitlinks
|