import cassandra
import cassandra.cluster
import cassandra.query
from cassandra.auth import PlainTextAuthProvider
import random
import os
import pkg_resources
import datetime
from websucker.parser import normalize_link, urlunparse
import collections
import math
import json

VERSION = "sucker6"

def calculate_checksums(text):
    """
    @return fingerprints of paragraphs in text. Paragraphs are separated by a blank line.
    """
    checksums = []
    sizes = []
    hval = 0
    hsz = 0
    sz = 0
    for c in text:
        cv = ord(c)
        sz += 1
        if cv > 64:
            # Only characters above codepoint 64 contribute to the rolling hash.
            hval += (hval << 3) + cv
            zv = hval >> 31
            hval &= 0x7fffffff
            hval += zv
            hsz += 1
        if c == "\n" and hsz > 0:
            # Paragraph boundary: keep the fingerprint only if enough characters were hashed.
            if hsz > 100:
                checksums.append(hval)
                sizes.append(sz)
            sz = 0
            hsz = 0
    if hsz > 100:
        checksums.append(hval)
        sizes.append(sz)
    return checksums, sizes
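
# A minimal self-check sketch for calculate_checksums (illustrative only;
# _example_checksums is a hypothetical helper, not part of the original module):
def _example_checksums():
    text = ("A" * 200) + "\n" + ("B" * 200) + "\n"
    sums, sizes = calculate_checksums(text)
    # Two paragraphs, each with more than 100 hashed characters -> two fingerprints.
    assert len(sums) == len(sizes) == 2
    return sums, sizes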

def get_schema():
    with pkg_resources.resource_stream(__name__, "schema.sql") as f:
        schema = f.read()
    return str(schema, encoding="utf8")

def connect_cluster(cassandra_host, cassandra_port, username, password):
    auth_provider = None
    if username is not None and password is not None:
        auth_provider = PlainTextAuthProvider(username=username, password=password)
    # Execution profile with a long request timeout for heavy queries.
    ep = cassandra.cluster.ExecutionProfile(request_timeout=240.0)
    profiles = {cassandra.cluster.EXEC_PROFILE_DEFAULT: ep}
    cluster = cassandra.cluster.Cluster([cassandra_host], port=cassandra_port,
                                        execution_profiles=profiles, auth_provider=auth_provider)
    return cluster
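
# Usage sketch for connect_cluster (illustrative only; assumes a reachable
# Cassandra node; _example_connect is a hypothetical helper, not part of the module):
def _example_connect(host="127.0.0.1", port=9042):
    cluster = connect_cluster(host, port, None, None)
    session = cluster.connect()
    # Simple liveness check against the system keyspace.
    return session.execute("SELECT release_version FROM system.local").one()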

class Data:
    """
    Database of text documents
    """
    def __init__(self, keyspace="websucker", cassandra_host="127.0.0.1", cassandra_port=9042, username=None, password=None):
        print("Database {}@{}:{}".format(keyspace, cassandra_host, cassandra_port))
        self.cluster = connect_cluster(cassandra_host, cassandra_port, username, password)
        self.session = self.cluster.connect(keyspace)
        self.check_document_select_query = self.session.prepare("SELECT count(url_hash) FROM paragraph_checksums WHERE checksum=?")
        self.update_links = self.session.prepare("""
        UPDATE links SET
            link_status = ?,
            redirect_target = ?,
            update_time = toTimestamp(now())
        WHERE
            domain_name = ? AND
            url_path = ? AND
            url_query = ?
        """)
        self.domain_quality_update = self.session.prepare("""
        UPDATE domain_quality SET
            seen_count = ?,
            good_size = ?,
            good_count = ?,
            good_probability = ?,
            good_originality = ?,
            average_good_characters = ?,
            content_size = ?,
            content_count = ?,
            content_probability = ?,
            content_originality = ?,
            average_content_characters = ?,
            fetched_count = ?,
            average_fetched_good_characters = ?,
            gain_ratio = ?,
            update_time = toTimestamp(now())
        WHERE
            domain_name = ? AND
            day = toDate(now())
        """)
        self.index_response_insert_html = self.session.prepare("""
        INSERT INTO html (
            day,
            domain_name,
            source_link,
            target_link,
            redirect_links,
            status,
            headers,
            content,
            agent_version,
            update_time
        ) VALUES (toDate(now()), ?, ?, ?, ?, ?, ?, ?, ?, toTimestamp(now()));
        """)
        self.index_content_link_insert = self.session.prepare("""
        INSERT INTO links (
            url_schema,
            domain_name,
            url_path,
            url_query,
            link_status,
            update_time
        ) VALUES (?, ?, ?, ?, 'seen', ?) IF NOT EXISTS
        """)
        self.daily_links_insert = self.session.prepare("""
        INSERT INTO daily_links (
            day,
            domain_name,
            url_path,
            url_query,
            link_status,
            body_size,
            link_originality,
            update_time
        ) VALUES (toDate(now()), ?, ?, ?, ?, ?, ?, toTimestamp(now()))
        """)
        self.daily_links_select = self.session.prepare("""
        SELECT
            domain_name,
            link_status,
            count(link_status)
        FROM daily_links WHERE day = toDate(now()) GROUP BY domain_name, link_status
        """)
        # Parsed content
        self.index_content_content_insert = self.session.prepare("""
        INSERT INTO content (
            domain_name,
            target_link,
            links,
            title,
            description,
            section,
            authors,
            tags,
            article_published_time,
            text_date,
            body,
            body_size,
            agent_version,
            update_time
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
        """)
        self.paragraph_checksums_insert = self.session.prepare("INSERT INTO paragraph_checksums (checksum, url_hash) VALUES (?, ?)")
        self.index_content_links_update = self.session.prepare("UPDATE links SET link_status=?, link_originality=?, body_size=?, url_schema=? WHERE domain_name=? AND url_path=? AND url_query=?")
        self.check_domain_count = self.session.prepare("select count(url_path) from links where domain_name=? and link_status=?")
        self.check_domain_size = self.session.prepare("select sum(body_size), sum(link_originality) from links where domain_name=? and link_status=?")
        self.domains_select = self.session.prepare("SELECT domain_name, seen_count, fetched_count, gain_ratio, average_fetched_good_characters FROM domain_quality PER PARTITION LIMIT 1")

    def index_responses(self, source_link, responses):
        # Redirect links
        pl = normalize_link(source_link)
        domain = pl[1]
        npl = urlunparse(pl)
        for response in responses:
            tl = response.get_canonical()
            if npl != tl:
                self.update_link_status(npl, "redirect", tl)
            d = (
                domain,
                npl,
                tl,
                response.redirects,
                response.status,
                response.headers,
                response.get_content(),
                VERSION,
            )
            self.session.execute(self.index_response_insert_html, d)

    def summary(self, parser):
        gs = 0
        cs = 0
        fetched_documents = 0
        vd = 0
        unvisited_domains = 0
        unvisited_junk_domains = 0
        sl = 0
        fd = 0
        junk_domains = 0
        rows = self.session.execute("SELECT domain_name,good_size,content_size,fetched_count,seen_count FROM domain_quality PER PARTITION LIMIT 1")
        # TODO subdomain analysis
        #dd = collections.defaultdict(set)
        second_level = set()
        for row in rows:
            domain = row[0]
            subdomains = domain.split(".")
            #d2 = subdomains[-2] + "." + subdomains[-1]
            if len(subdomains) > 2:
                d3 = ".".join(subdomains[0:-2])
                second_level.add(d3)
            if not parser.is_domain_good(domain):
                junk_domains += 1
            if row[1] is not None:
                gs += row[1]
            if row[2] is not None:
                cs += row[2]
            if row[3] is not None:
                fetched_documents += row[3]
            if row[4] is not None:
                sl += row[4]
            if row[3] is None or row[3] == 0:
                unvisited_domains += 1
                if not parser.is_domain_good(domain):
                    unvisited_junk_domains += 1
            else:
                vd += 1
            if row[4] is None or row[4] == 0:
                fd += 1
        print("Good characters: {}".format(gs))
        print("Fetched characters: {}".format(cs))
        print("Fetched documents: {}".format(fetched_documents))
        print("Visited domains: {}".format(vd))
        print("Unvisited domains: {}".format(unvisited_domains))
        print("Junk domains: {}".format(junk_domains))
        print("Unvisited junk domains: {}".format(unvisited_junk_domains))
        print("New links: {}".format(sl))
        print("Second level domains: {}".format(len(second_level)))
        print("Finished domains: {}".format(fd))
        #for d, sd in dd.items():
        #    if len(sd) > 1:
        #        print(d + " " + ",".join(sd))

    def daily_report(self):
        #rows = self.session.execute(self.daily_links_select)
        rows = self.session.execute("SELECT domain_name,count(link_status) FROM daily_links WHERE day=toDate(now()) GROUP BY day,domain_name")
        domains = []
        for row in rows:
            domains.append(list(row))
        total_count = 0
        total_size = 0
        out = []
        for domain, count in domains:
            if count < 2:
                continue
            total_count += count
            rows = self.session.execute("SELECT link_status,count(link_status),sum(body_size) FROM daily_links WHERE day=toDate(now()) AND domain_name=%s GROUP BY day,domain_name,link_status", (domain,))
            gc = 0
            bs = 0
            for row in rows:
                if row[0] == "good":
                    gc = row[1]
                    bs = row[2]
            total_size += bs
            out.append((domain, bs, gc / count, count))
        print("Domain, characters, good ratio, documents")
        for i, value in enumerate(reversed(sorted(out, key=lambda x: x[3]))):
            if i < 20:
                print(value)
                #print("{},{},{},{}".format(value))
        print("{} domains, {} documents, {} characters".format(len(domains), total_count, total_size))

    def update_link_status(self, link, status, redirect_target=None):
        pl = normalize_link(link)
        r = (
            status,
            redirect_target,
            pl[1],
            pl[2],
            pl[3],
        )
        res = self.session.execute(self.update_links, r)

    def index_follow_links(self, parser, links, connection):
        # Index seen links
        follow_links = set()
        for l in links:
            if parser.is_link_good(l):
                #if connection is not None and parser.listen_robot and not connection.is_robot_good(l):
                #    continue
                link = normalize_link(l, strip_query=parser.strip_query)
                follow_links.add(urlunparse(link))
        newlinkdomains = set()
        newlinkcount = 0
        for link in follow_links:
            value = []
            nl = normalize_link(link)
            value += nl
            value.append(datetime.date.today())
            rows = self.session.execute(self.index_content_link_insert, value)
            row = rows.one()
            if row.applied:
                newlinkdomains.add(nl[1])
                newlinkcount += 1
        for domain in newlinkdomains:
            self.check_domain(domain)
        print("{} new links, {} new domains".format(newlinkcount, len(newlinkdomains)))

    def index_content(self, target_link, parsed_document):
        nl = normalize_link(target_link)
        domain_name = nl[1]
        assert len(domain_name) > 1
        pd = parsed_document
        body_length = 0
        if pd.body is not None:
            body_length = len(pd.body)
        value = (
            domain_name,
            target_link,
            pd.link_set,
            pd.title,
            pd.description,
            pd.section,
            pd.authors,
            pd.tags,
            pd.article_published_time,
            pd.text_date,
            pd.body,
            body_length,
            VERSION,
            pd.current_time
        )
        content_future = self.session.execute_async(self.index_content_content_insert, value)
        # result later
        link_status = "good"
        originality = 0
        tsz = 0
        if pd.body is None:
            link_status = "bad_parse"
        else:
            tsz = len(pd.body)
            if tsz < 300:
                link_status = "bad_small"
        if link_status == "good":
            futures = []
            paragraph_checksums, paragraph_sizes = calculate_checksums(pd.text)
            for pc, psz in zip(paragraph_checksums, paragraph_sizes):
                fut = self.session.execute_async(self.paragraph_checksums_insert, (pc, hash(nl[1] + "/" + nl[2] + "?" + nl[3])))
                futures.append(fut)
            for fut in futures:
                fut.result()
            originality = self.check_document(paragraph_checksums, paragraph_sizes)
            if originality < 0.8:
                link_status = "bad_copy"
        self.session.execute(self.index_content_links_update, (link_status, originality, tsz, nl[0], nl[1], nl[2], nl[3]))
        content_future.result()
        print("<<<< " + link_status + " " + str(originality))
        dl = (
            nl[1],
            nl[2],
            nl[3],
            link_status,
            tsz,
            originality
        )
        self.session.execute(self.daily_links_insert, dl)

    def check_document(self, paragraph_checksums, paragraph_sizes):
        tsz = sum(paragraph_sizes)
        if tsz == 0:
            return 0
        copies = 0
        futures = []
        for pc, psz in zip(paragraph_checksums, paragraph_sizes):
            futures.append(self.session.execute_async(self.check_document_select_query, (pc,)))
        for fut, psz in zip(futures, paragraph_sizes):
            rows = fut.result()
            res = rows.one()[0]
            if res > 1:
                copies += psz
        return (tsz - copies) / tsz
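
    # Originality worked example (illustrative numbers, not from the codebase):
    # for a 1000-character document whose paragraphs totalling 400 characters
    # already appear in paragraph_checksums, originality is
    # (1000 - 400) / 1000 = 0.6, so index_content marks the link "bad_copy" (< 0.8).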

    def check_domain(self, domain):
        assert len(domain) > 0
        seen_count = None
        good_size = None
        good_count = None
        good_probability = None
        good_originality = None
        average_good_characters = None
        content_size = None
        content_count = None
        content_probability = None
        content_originality = None
        average_content_characters = None
        fetched_count = None
        average_fetched_good_characters = None
        gain_ratio = None
        counts = {
            "good": 0,
            "bad_copy": 0,
            "bad_small": 0,
            "bad_httpcode": 0,
            "bad_type": 0,
            "bad_content": 0,
            "bad_parse": 0,
            "seen": 0
        }
        for k in counts.keys():
            res = self.session.execute(self.check_domain_count, (domain, k))
            co = res.one()[0]
            counts[k] = co
        seen_count = counts["seen"]
        good_count = counts["good"]
        content_count = counts["good"] + counts["bad_copy"] + counts["bad_small"]
        fetched_count = sum(counts.values()) - counts["seen"]
        if fetched_count > 0:
            content_probability = content_count / fetched_count
            good_probability = good_count / fetched_count
        sizes = {
            "good": 0,
            "bad_copy": 0,
            "bad_small": 0
        }
        originalities = {}
        for k in sizes.keys():
            res = self.session.execute(self.check_domain_size, (domain, k))
            row = res.one()
            co = row[0]
            originalities[k] = row[1]
            sizes[k] = co
        good_size = sizes["good"]
        content_size = sum(sizes.values())
        if good_count > 0:
            good_originality = originalities["good"] / good_count
        if content_count > 0:
            content_originality = sum(originalities.values()) / content_count
        if good_count > 0:
            average_good_characters = good_size / good_count * good_originality
            average_fetched_good_characters = good_size * good_originality / fetched_count
            gain_ratio = average_fetched_good_characters / fetched_count
        if content_count > 0:
            average_content_characters = content_size / content_count
        #print(sizes)
        #print(originalities)
        uv = (
            seen_count,
            good_size,
            good_count,
            good_probability,
            good_originality,
            average_good_characters,
            content_size,
            content_count,
            content_probability,
            content_originality,
            average_content_characters,
            fetched_count,
            average_fetched_good_characters,
            gain_ratio,
            domain
        )
        if fetched_count > 0 or seen_count > 0:
            self.session.execute(self.domain_quality_update, uv)
        return average_fetched_good_characters
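
    # Worked example of the quality metrics above (illustrative numbers only):
    # with good_size = 50000 characters, good_originality = 0.9 and
    # fetched_count = 100, average_fetched_good_characters = 50000 * 0.9 / 100 = 450
    # and gain_ratio = 450 / 100 = 4.5; get_best_domains later ranks domains by gain_ratio.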

    def all_domains(self, count):
        rows = self.session.execute(self.domains_select)
        domains = []
        for row in rows:
            domain = row[0]
            seen_count = row[1]
            fetched_count = row[2]
            gain_ratio = row[3]
            afg = row[4]
            if fetched_count and afg and seen_count:
                domains.append(tuple(row))
        l = len(domains)
        ss = min(l, count)
        res = []
        if ss > 0:
            # sort according to ratio
            res = list(sorted(domains, key=lambda x: x[4], reverse=True))[0:ss]
        # returns sorted list of tuples domain,gain_ratio
        return res

    def get_best_domains(self, count, parser):
        # get all domains
        rows = self.session.execute(self.domains_select)
        domains = []
        for row in rows:
            domain = row[0]
            seen_count = row[1]
            fetched_count = row[2]
            gain_ratio = row[3]
            afg = row[4]
            if seen_count and fetched_count and gain_ratio and parser.is_domain_good(domain):
                domains.append(list(row))
        l = len(domains)
        ss = min(l, count)
        res = []
        if ss > 0:
            # sort according to ratio
            res = list(sorted(domains, key=lambda x: x[3], reverse=True))[0:ss]
        # returns sorted list of tuples domain,gain_ratio
        return res

    def get_random_domains(self, count, parser):
        # get all domains
        rows = self.session.execute(self.domains_select)
        domains = []
        for row in rows:
            domain = row[0]
            if parser.is_domain_good(domain):
                domains.append(list(row))
        l = len(domains)
        ss = min(l, count)
        return random.sample(domains, ss)

    def get_unvisited_domains(self, count, parser):
        # get all domains
        rows = self.session.execute(self.domains_select)
        domains = []
        # Analyze third level domains
        dd = collections.defaultdict(set)
        third_count = 0
        for row in rows:
            domain = row[0]
            seen_count = row[1]
            fetched_count = row[2]
            gain_ratio = row[3]
            afg = row[4]
            if seen_count and not fetched_count:
                subdomains = domain.split(".")
                d2 = subdomains[-2] + "." + subdomains[-1]
                dd[d2].add(domain)
        # Select second level first
        result = []
        # then select third level
        ll = list(dd.items())
        random.shuffle(ll)
        domain_weight = count / len(ll)
        for domain, subdomains in ll:
            dl = list(subdomains)
            link_weight = domain_weight / len(dl)
            random.shuffle(dl)
            for d in dl:
                r = random.random()
                if r < link_weight:
                    result.append((d, 0))
        return result
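
    # Sampling sketch for get_unvisited_domains (illustrative numbers, not from
    # the codebase): with count = 100 and 50 second-level groups, domain_weight
    # is 2.0; a group holding 4 unvisited subdomains gives each of them
    # link_weight = 0.5, so roughly half of that group is selected.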

    def export_domain(self, domain):
        rows = self.session.execute("SELECT JSON * from content WHERE domain_name=%s", (domain,))
        for row in rows:
            print(row[0])

    def get_visit_links(self, domain, recent_count, old_count, random_count):
        dblinks = []
        rows = self.session.execute("SELECT url_schema,url_path,url_query,update_time FROM links WHERE domain_name=%s AND link_status='seen'", (domain,))
        for row in rows:
            link = urlunparse((row[0], domain, row[1], row[2]))
            dblinks.append((link, row[3]))
        visitlinks = []
        dblinks.sort(key=lambda x: x[1])
        random_links = []
        for i, (link, time) in enumerate(dblinks):
            #print(link, time)
            if i < recent_count:
                visitlinks.append(link)
            elif i >= len(dblinks) - old_count:
                visitlinks.append(link)
            else:
                random_links.append(link)
        sc = min(random_count, len(random_links))
        if sc > 0:
            visitlinks += random.sample(random_links, sc)
        return visitlinks
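
# End-to-end usage sketch (illustrative only; assumes a running Cassandra node
# with the websucker schema already created and a parser object from
# websucker.parser; _example_crawl_round is a hypothetical helper, not part of the module):
def _example_crawl_round(parser, batch=10):
    db = Data(keyspace="websucker", cassandra_host="127.0.0.1")
    for domain, *_ in db.get_best_domains(batch, parser):
        links = db.get_visit_links(domain, recent_count=5, old_count=5, random_count=10)
        print(domain, len(links))
        # Fetching and indexing of the links happens elsewhere (agent code);
        # here we only refresh the per-domain statistics.
        db.check_domain(domain)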