import cassandra
import cassandra.cluster
import random
import os
import pkg_resources
import datetime

from websucker.parser import normalize_link, urlunparse

VERSION = "sucker6"

def get_schema():
    with pkg_resources.resource_stream(__name__, "schema.sql") as f:
        schema = f.read()
    return str(schema, encoding="utf8")
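
# A minimal usage sketch for get_schema() (assumption: schema.sql holds
# ";"-separated CQL statements and the target keyspace already exists):
#   for statement in get_schema().split(";"):
#       if statement.strip():
#           session.execute(statement)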


class Data:
    """
    Database of text documents
    """

    def __init__(self, keyspace="websucker", cassandra_host="127.0.0.1", cassandra_port=9042):
        # execution profile
        ep = cassandra.cluster.ExecutionProfile(request_timeout=240.0)
        profiles = {cassandra.cluster.EXEC_PROFILE_DEFAULT: ep}
        self.cluster = cassandra.cluster.Cluster([cassandra_host], port=cassandra_port, execution_profiles=profiles)
        self.session = self.cluster.connect(keyspace)

        self.check_document_select_query = self.session.prepare("SELECT count(url_hash) FROM paragraph_checksums WHERE checksum=?")

        self.index_response_link_update = self.session.prepare("""
        UPDATE links SET
            link_status = 'redirect',
            redirect_target = ?,
            update_time = toTimestamp(now())
        WHERE
            domain_name = ? AND
            url_path = ? AND
            url_query = ?
        """)

        self.domain_quality_update = self.session.prepare("""
        UPDATE domain_quality SET
            seen_count = ?,
            good_size = ?,
            good_count = ?,
            good_probability = ?,
            good_originality = ?,
            average_good_characters = ?,
            content_size = ?,
            content_count = ?,
            content_probability = ?,
            content_originality = ?,
            average_content_characters = ?,
            fetched_count = ?,
            average_fetched_good_characters = ?,
            gain_ratio = ?,
            update_time = toTimestamp(now())
        WHERE
            domain_name = ? AND
            day = toDate(now())
        """)

        self.index_response_insert_html = self.session.prepare("""
        INSERT INTO html (
            day,
            domain_name,
            source_link,
            target_link,
            redirect_links,
            status,
            headers,
            content,
            agent_version,
            update_time
        ) VALUES (toDate(now()), ?, ?, ?, ?, ?, ?, ?, ?, toTimestamp(now()));
        """)

        self.index_content_link_insert = self.session.prepare("""
        INSERT INTO links (
            url_schema,
            domain_name,
            url_path,
            url_query,
            link_status,
            update_time
        ) VALUES (?, ?, ?, ?, 'seen', ?) IF NOT EXISTS
        """)

        self.daily_links_insert = self.session.prepare("""
        INSERT INTO daily_links (
            day,
            domain_name,
            url_path,
            url_query,
            link_status,
            body_size,
            link_originality,
            update_time
        ) VALUES (toDate(now()), ?, ?, ?, ?, ?, ?, toTimestamp(now()))
        """)

        self.daily_links_select = self.session.prepare("""
        SELECT
            domain_name,
            link_status,
            count(link_status)
        FROM daily_links WHERE day=toDate(now()) GROUP BY domain_name, link_status
        """)

        # Parsed content
        self.index_content_content_insert = self.session.prepare("""
        INSERT INTO content (
            domain_name,
            target_link,
            links,
            title,
            description,
            section,
            authors,
            tags,
            article_published_time,
            text_date,
            body,
            body_size,
            agent_version,
            update_time
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
        """)

        self.paragraph_checksums_insert = self.session.prepare("INSERT INTO paragraph_checksums (checksum, url_hash) VALUES (?, ?)")
        self.index_content_links_update = self.session.prepare("UPDATE links SET link_status=?, link_originality=?, body_size=?, url_schema=? WHERE domain_name=? AND url_path=? AND url_query=?")
        self.check_domain_count = self.session.prepare("SELECT count(url_path) FROM links WHERE domain_name=? AND link_status=?")
        self.check_domain_size = self.session.prepare("SELECT sum(body_size), sum(link_originality) FROM links WHERE domain_name=? AND link_status=?")
        self.domains_select = self.session.prepare("SELECT domain_name, seen_count, fetched_count, gain_ratio, average_fetched_good_characters FROM domain_quality PER PARTITION LIMIT 1")
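
    # A minimal usage sketch (hypothetical values; assumes a reachable
    # Cassandra node with the "websucker" keyspace and its schema installed):
    #   db = Data(keyspace="websucker", cassandra_host="127.0.0.1")
    #   db.check_domain("example.com")
    #   for row in db.get_unvisited_domains(10):
    #       links = db.get_visit_links(row[0], 5, 5, 5)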

    def index_responses(self, source_link, responses):
        # Redirect links
        pl = normalize_link(source_link)
        for response in responses:
            tl = response.get_canonical()
            r = (
                tl,
                pl[1],
                pl[2],
                pl[3],
            )
            if pl != tl:
                res = self.session.execute(self.index_response_link_update, r)
            d = (
                pl[1],
                source_link,
                response.get_canonical(),
                response.redirects,
                response.status,
                response.headers,
                response.get_content(),
                VERSION,
            )
            self.session.execute(self.index_response_insert_html, d)

    def summary(self, parser):
        # Aggregate per-domain quality counters into a single overview
        gs = 0   # good characters
        cs = 0   # fetched (content) characters
        fc = 0   # fetched documents
        vd = 0   # visited domains
        ud = 0   # unvisited domains
        sl = 0   # seen links
        fd = 0   # finished domains (no seen links remaining)
        jd = 0   # junk domains
        rows = self.session.execute("SELECT domain_name, good_size, content_size, fetched_count, seen_count FROM domain_quality PER PARTITION LIMIT 1")
        for row in rows:
            if not parser.is_domain_good(row[0]):
                jd += 1
            if row[1] is not None:
                gs += row[1]
            if row[2] is not None:
                cs += row[2]
            if row[3] is not None:
                fc += row[3]
            if row[4] is not None:
                sl += row[4]
            if row[3] is None or row[3] == 0:
                ud += 1
            else:
                vd += 1
            if row[4] is None or row[4] == 0:
                fd += 1
        print("Good characters: {}".format(gs))
        print("Fetched characters: {}".format(cs))
        print("Fetched documents: {}".format(fc))
        print("Visited domains: {}".format(vd))
        print("Unvisited domains: {}".format(ud))
        print("Junk domains: {}".format(jd))
        print("New links: {}".format(sl))
        print("Finished domains: {}".format(fd))

    def daily_report(self):
        #rows = self.session.execute(self.daily_links_select)
        rows = self.session.execute("SELECT domain_name, count(link_status) FROM daily_links WHERE day=toDate(now()) GROUP BY day, domain_name")
        domains = []
        for row in rows:
            domains.append(list(row))
        total_count = 0
        total_size = 0
        for domain, count in sorted(domains, key=lambda x: x[1]):
            total_count += count
            rows = self.session.execute("SELECT link_status, count(link_status), sum(body_size) FROM daily_links WHERE day=toDate(now()) AND domain_name=%s GROUP BY day, domain_name, link_status", (domain,))
            gc = 0
            bs = 0
            for row in rows:
                if row[0] == "good":
                    gc = row[1]
                    bs = row[2]
            total_size += bs
            print(domain, gc / count, bs, count)
        print("{} domains, {} documents, {} characters".format(len(domains), total_count, total_size))

    def index_follow_links(self, parser, links, connection):
        # Index seen links
        follow_links = set()
        for l in links:
            if parser.is_link_good(l):
                #if connection is not None and parser.listen_robot and not connection.is_robot_good(l):
                #    continue
                link = normalize_link(l, strip_query=parser.strip_query)
                follow_links.add(urlunparse(link))

        newlinkdomains = set()
        for link in follow_links:
            value = []
            nl = normalize_link(link)
            value += nl
            value.append(datetime.date.today())
            rows = self.session.execute(self.index_content_link_insert, value)
            row = rows.one()
            if row.applied:
                newlinkdomains.add(nl[1])
        for domain in newlinkdomains:
            self.check_domain(domain)

    def index_content(self, target_link, parsed_document):
        nl = normalize_link(target_link)
        domain_name = nl[1]
        assert len(domain_name) > 1
        pd = parsed_document
        body_length = 0
        if pd.body is not None:
            body_length = len(pd.body)
        value = (
            domain_name,
            target_link,
            pd.get_links(),
            pd.title,
            pd.description,
            pd.section,
            pd.authors,
            pd.tags,
            pd.article_published_time,
            pd.text_date,
            pd.body,
            body_length,
            VERSION,
            pd.current_time
        )
        content_future = self.session.execute_async(self.index_content_content_insert, value)
        # result later
        link_status = "good"
        originality = 0
        tsz = 0
        if pd.body is None:
            link_status = "bad_parse"
        else:
            tsz = len(pd.body)
            if tsz < 300:
                link_status = "bad_small"
        if link_status == "good":
            futures = []
            for pc, psz in zip(pd.paragraph_checksums, pd.paragraph_sizes):
                fut = self.session.execute_async(self.paragraph_checksums_insert, (pc, hash(nl[1] + "/" + nl[2] + "?" + nl[3])))
                futures.append(fut)
            for fut in futures:
                fut.result()
            originality = self.check_document(pd.paragraph_checksums, pd.paragraph_sizes)
            if originality < 0.8:
                link_status = "bad_copy"
        print(nl)
        self.session.execute(self.index_content_links_update, (link_status, originality, tsz, nl[0], nl[1], nl[2], nl[3]))
        content_future.result()
        print("<<<< " + link_status + " " + str(originality))
        dl = (
            nl[1],
            nl[2],
            nl[3],
            link_status,
            tsz,
            originality
        )
        self.session.execute(self.daily_links_insert, dl)

    def check_document(self, paragraph_checksums, paragraph_sizes):
        tsz = sum(paragraph_sizes)
        if tsz == 0:
            return 0
        copies = 0
        futures = []
        for pc, psz in zip(paragraph_checksums, paragraph_sizes):
            futures.append(self.session.execute_async(self.check_document_select_query, (pc,)))
        for fut, psz in zip(futures, paragraph_sizes):
            rows = fut.result()
            res = rows.one()[0]
            if res > 1:
                copies += psz
        return (tsz - copies) / tsz
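
    # Worked example for check_document (illustrative numbers only): with
    # paragraph sizes [100, 50, 50] where only the 100-character paragraph is
    # already stored more than once (count > 1), copies = 100 and the
    # originality is (200 - 100) / 200 = 0.5, below the 0.8 threshold that
    # index_content uses to mark a link as "bad_copy".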

    def check_domain(self, domain):
        assert len(domain) > 0
        seen_count = None
        good_size = None
        good_count = None
        good_probability = None
        good_originality = None
        average_good_characters = None
        content_size = None
        content_count = None
        content_probability = None
        content_originality = None
        average_content_characters = None
        fetched_count = None
        average_fetched_good_characters = None
        gain_ratio = None
        counts = {
            "good": 0,
            "bad_copy": 0,
            "bad_small": 0,
            "bad_httpcode": 0,
            "bad_type": 0,
            "bad_content": 0,
            "bad_parse": 0,
            "seen": 0
        }
        for k in counts.keys():
            res = self.session.execute(self.check_domain_count, (domain, k))
            co = res.one()[0]
            counts[k] = co
        seen_count = counts["seen"]
        good_count = counts["good"]
        content_count = counts["good"] + counts["bad_copy"] + counts["bad_small"]
        fetched_count = sum(counts.values()) - counts["seen"]
        if fetched_count > 0:
            content_probability = content_count / fetched_count
            good_probability = good_count / fetched_count
        sizes = {
            "good": 0,
            "bad_copy": 0,
            "bad_small": 0
        }
        originalities = {}
        for k in sizes.keys():
            res = self.session.execute(self.check_domain_size, (domain, k))
            row = res.one()
            co = row[0]
            originalities[k] = row[1]
            sizes[k] = co
        good_size = sizes["good"]
        content_size = sum(sizes.values())
        if good_count > 0:
            good_originality = originalities["good"] / good_count
        if content_count > 0:
            content_originality = sum(originalities.values()) / content_count
        if good_count > 0:
            average_good_characters = good_size / good_count * good_originality
            average_fetched_good_characters = good_size * good_originality / fetched_count
            gain_ratio = average_fetched_good_characters / fetched_count
        if content_count > 0:
            average_content_characters = content_size / content_count
        #print(sizes)
        #print(originalities)
        uv = (
            seen_count,
            good_size,
            good_count,
            good_probability,
            good_originality,
            average_good_characters,
            content_size,
            content_count,
            content_probability,
            content_originality,
            average_content_characters,
            fetched_count,
            average_fetched_good_characters,
            gain_ratio,
            domain)
        if fetched_count > 0 or seen_count > 0:
            self.session.execute(self.domain_quality_update, uv)
        print(uv)
        return average_fetched_good_characters

    def all_domains(self, count):
        rows = self.session.execute(self.domains_select)
        domains = []
        for row in rows:
            domain = row[0]
            seen_count = row[1]
            fetched_count = row[2]
            gain_ratio = row[3]
            afg = row[4]
            if fetched_count and afg and seen_count:
                domains.append(tuple(row))
        l = len(domains)
        ss = min(l, count)
        res = []
        if ss > 0:
            # sort according to average_fetched_good_characters
            res = list(sorted(domains, key=lambda x: x[4], reverse=True))[0:ss]
        # returns a list of domain rows sorted by average_fetched_good_characters
        return res

    def get_best_domains(self, count):
        # get all domains
        rows = self.session.execute(self.domains_select)
        domains = []
        for row in rows:
            domain = row[0]
            seen_count = row[1]
            fetched_count = row[2]
            gain_ratio = row[3]
            afg = row[4]
            if seen_count and fetched_count and gain_ratio:
                domains.append(list(row))
        l = len(domains)
        ss = min(l, count)
        res = []
        if ss > 0:
            # sort according to gain_ratio
            res = list(sorted(domains, key=lambda x: x[3], reverse=True))[0:ss]
        # returns a list of domain rows sorted by gain_ratio
        return res

    def get_unvisited_domains(self, count):
        # get all domains
        rows = self.session.execute(self.domains_select)
        domains = []
        for row in rows:
            domain = row[0]
            seen_count = row[1]
            fetched_count = row[2]
            gain_ratio = row[3]
            afg = row[4]
            if seen_count and not fetched_count:
                domains.append(row)
        ss = min(len(domains), count)
        return random.sample(domains, ss)

    def get_visit_links(self, domain, recent_count, old_count, random_count):
        dblinks = []
        rows = self.session.execute("SELECT url_schema, url_path, url_query, update_time FROM links WHERE domain_name=%s AND link_status='seen'", (domain,))
        for row in rows:
            link = urlunparse((row[0], domain, row[1], row[2]))
            dblinks.append((link, row[3]))

        visitlinks = []
        dblinks.sort(key=lambda x: x[1])
        random_links = []
        for i, (link, time) in enumerate(dblinks):
            #print(link,time)
            if i < recent_count:
                visitlinks.append(link)
            elif i >= len(dblinks) - old_count:
                visitlinks.append(link)
            else:
                random_links.append(link)
        sc = min(random_count, len(random_links))
        if sc > 0:
            visitlinks += random.sample(random_links, sc)
        return visitlinks
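
    # A hedged end-to-end sketch of how a caller might combine these methods
    # (hypothetical driver code, not part of this module; fetching and parsing
    # are left abstract):
    #   db = Data()
    #   for row in db.get_best_domains(10):
    #       domain = row[0]
    #       for link in db.get_visit_links(domain, 5, 5, 5):
    #           # ... fetch `link`, producing `responses` and `parsed_document` ...
    #           db.index_responses(link, responses)
    #           db.index_content(link, parsed_document)
    #           db.index_follow_links(parser, parsed_document.get_links(), None)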