From b5bab45113c7c460fc86093b3aa77a6a55d2e3ae Mon Sep 17 00:00:00 2001
From: Daniel Hladek
Date: Wed, 16 Feb 2022 14:01:29 +0100
Subject: [PATCH 1/5] zz

---
 websucker/dump.py | 30 ++++++++++++++++++++++++++++++
 1 file changed, 30 insertions(+)
 create mode 100644 websucker/dump.py

diff --git a/websucker/dump.py b/websucker/dump.py
new file mode 100644
index 0000000..b62bd4a
--- /dev/null
+++ b/websucker/dump.py
@@ -0,0 +1,30 @@
+import cassandra
+import cassandra.cluster
+import cassandra.query
+import json
+import datetime
+import sys
+
+cassandra_host = sys.argv[1]
+cassandra_port = sys.argv[2]
+keyspace = "websucker"
+
+ep = cassandra.cluster.ExecutionProfile(request_timeout=240.0,row_factory=cassandra.query.dict_factory)
+profiles = {cassandra.cluster.EXEC_PROFILE_DEFAULT:ep}
+cluster = cassandra.cluster.Cluster([cassandra_host],port=cassandra_port,execution_profiles=profiles)
+select_documents = "select json * from content"
+with cluster.connect(keyspace) as session:
+    #session.row_factory = cassandra.query.dict_factory
+    select_html = session.prepare("select json * from html where day=? and domain_name=? LIMIT 1")
+    rows = session.execute(select_documents)
+    for row in rows:
+        doc = json.loads(row["[json]"])
+        dt = doc["update_time"]
+        d = dt.split()[0]
+        hrows = session.execute(select_html,(d,doc["domain_name"]))
+        html = {}
+        for h in hrows:
+            html = json.loads(h["[json]"])
+            break
+        doc["html_data"] = html
+        print(json.dumps(doc))

From b75336c8a651346ededd548293386843fbbe86c1 Mon Sep 17 00:00:00 2001
From: Daniel Hladek
Date: Fri, 18 Feb 2022 10:31:59 +0100
Subject: [PATCH 2/5] dump

---
 websucker/{queue.py => webqueue.py} | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 rename websucker/{queue.py => webqueue.py} (100%)

diff --git a/websucker/queue.py b/websucker/webqueue.py
similarity index 100%
rename from websucker/queue.py
rename to websucker/webqueue.py

From d37bafb666f1e2984304e52afd9dc85cc6553896 Mon Sep 17 00:00:00 2001
From: Daniel Hladek
Date: Fri, 18 Feb 2022 13:58:05 +0100
Subject: [PATCH 3/5] zz

---
 websucker/dump.py | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/websucker/dump.py b/websucker/dump.py
index b62bd4a..d9be4da 100644
--- a/websucker/dump.py
+++ b/websucker/dump.py
@@ -4,6 +4,7 @@ import cassandra.query
 import json
 import datetime
 import sys
+from urllib.parse import urlparse
 
 cassandra_host = sys.argv[1]
 cassandra_port = sys.argv[2]
 keyspace = "websucker"
@@ -16,9 +17,30 @@ select_documents = "select json * from content"
 with cluster.connect(keyspace) as session:
     #session.row_factory = cassandra.query.dict_factory
     select_html = session.prepare("select json * from html where day=? and domain_name=? LIMIT 1")
+    select_link = session.prepare("select link_status from links where domain_name=? and url_path=? and url_query=? LIMIT 1")
     rows = session.execute(select_documents)
     for row in rows:
         doc = json.loads(row["[json]"])
+        target_link = doc["target_link"]
+        parsed_link = urlparse(target_link)
+        netloc = parsed_link[1].strip().lower()
+        path = parsed_link[2].strip()
+        # strip leading /
+        if len(path) > 1 and path[0] == "/":
+            path = path[1:]
+        query = parsed_link[4]
+        lrows = session.execute(select_link,(netloc,path,query))
+        status = None
+        for l in lrows:
+            status = str(l["link_status"])
+            break
+        #assert status is not None
+        if status is None:
+            continue
+        #print(status)
+        # skip bad links
+        if not status == "good":
+            continue
         dt = doc["update_time"]
         d = dt.split()[0]
         hrows = session.execute(select_html,(d,doc["domain_name"]))
@@ -27,4 +49,6 @@ with cluster.connect(keyspace) as session:
             html = json.loads(h["[json]"])
             break
         doc["html_data"] = html
+        del doc["links"]
+        #print(parsed_link)
         print(json.dumps(doc))

From 56bf59e5e951038a655f8abebdd35fbdac262b58 Mon Sep 17 00:00:00 2001
From: Daniel Hladek
Date: Thu, 23 Feb 2023 16:08:02 +0100
Subject: [PATCH 4/5] password and name

---
 websucker/cli.py | 8 ++++++--
 websucker/db.py  | 8 ++++++--
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/websucker/cli.py b/websucker/cli.py
index db149c4..0be29ec 100644
--- a/websucker/cli.py
+++ b/websucker/cli.py
@@ -12,7 +12,7 @@ import os
 
 
 def create_database_from_context(ctx):
-    return Data(ctx.obj["cassandra_keyspace"],ctx.obj["cassandra_host"],ctx.obj["cassandra_port"])
+    return Data(ctx.obj["cassandra_keyspace"],ctx.obj["cassandra_host"],ctx.obj["cassandra_port"],ctx.obj["cassandra_username"],ctx.obj["cassandra_password"])
 
 def create_queue_from_context(ctx):
     return greenstalk.Client((ctx.obj["beanstalkd_host"],ctx.obj["beanstalkd_port"]),use=ctx.obj["beanstalkd_tube"],watch=ctx.obj["beanstalkd_tube"],encoding="utf8")
@@ -23,13 +23,15 @@
 @click.option("--cassandra-keyspace",metavar="CASSANDRA_KEYSPACE",help="cassandra keyspace (if defined, value read from CASSANDRA_KEYSPACE env variable)",envvar="CASSANDRA_KEYSPACE",default="websucker",show_default=True)
 @click.option("--cassandra-host",metavar="CASSANDRA_HOST",help="cassandra host (if defined, value read from CASSANDRA_HOST env variable)",envvar="CASSANDRA_HOST",default="127.0.0.1",show_default=True)
 @click.option("--cassandra-port",metavar="CASSANDRA_PORT",help="cassandra port (if defined, value read from CASSANDRA_PORT env variable)",envvar="CASSANDRA_PORT",default=9042,show_default=True)
+@click.option("--cassandra-username",metavar="CASSANDRA_USERNAME",help="cassandra username (if defined, value read from CASSANDRA_USERNAME env variable)",envvar="CASSANDRA_USERNAME")
+@click.option("--cassandra-password",metavar="CASSANDRA_PASSWORD",help="cassandra password (if defined, value read from CASSANDRA_PASSWORD env variable)",envvar="CASSANDRA_PASSWORD")
 @click.option("--beanstalkd-tube",metavar="BEANSTALKD_TUBE",help="beanstalkd keyspace (if defined, value read from BEANSTALKD_TUBE env variable)",envvar="BEANSTALKD_TUBE",default="websucker",show_default=True)
 @click.option("--beanstalkd-host",metavar="BEANSTALKD_HOST",help="beanstalkd host (if defined, value read from beanstalkd_HOST env variable)",envvar="BEANSTALKD_HOST",default="127.0.0.1",show_default=True)
 @click.option("--beanstalkd-port",metavar="BEANSTALKD_PORT",help="beanstalkd port (if defined, value read from BEANSTALKD_PORT env variable)",envvar="BEANSTALKD_PORT",default=11300,show_default=True)
 @click.option("--parser",metavar="file_name",help="zzz")
 @click.option("--visit",is_flag=True)
 @click.option("--queue",is_flag=True)
-def cli(ctx,cassandra_keyspace,cassandra_host,cassandra_port,beanstalkd_tube,beanstalkd_host,beanstalkd_port,parser,visit,queue):
+def cli(ctx,cassandra_keyspace,cassandra_host,cassandra_port,cassandra_username,cassandra_password,beanstalkd_tube,beanstalkd_host,beanstalkd_port,parser,visit,queue):
     ctx.ensure_object(dict)
     p = BaseParser()
     if parser is not None:
@@ -44,6 +46,8 @@ def cli(ctx,cassandra_keyspace,cassandra_host,cassandra_port,beanstalkd_tube,bea
     ctx.obj["parser"] = p
     ctx.obj["cassandra_host"] = cassandra_host
     ctx.obj["cassandra_port"] = cassandra_port
+    ctx.obj["cassandra_username"] = cassandra_username
+    ctx.obj["cassandra_password"] = cassandra_password
     ctx.obj["cassandra_keyspace"] = cassandra_keyspace
     ctx.obj["beanstalkd_host"] = beanstalkd_host
     ctx.obj["beanstalkd_port"] = beanstalkd_port
diff --git a/websucker/db.py b/websucker/db.py
index 03f51c2..7e8d84b 100644
--- a/websucker/db.py
+++ b/websucker/db.py
@@ -1,5 +1,6 @@
 import cassandra
 import cassandra.cluster
+from cassandra.auth import PlainTextAuthProvider
 import random
 import os
 import pkg_resources
@@ -20,12 +21,15 @@ class Data:
     """
     Database of text documents
     """
-    def __init__(self,keyspace="websucker",cassandra_host="127.0.0.1",cassandra_port=9042):
+    def __init__(self,keyspace="websucker",cassandra_host="127.0.0.1",cassandra_port=9042,username=None,password=None):
         print("Database {}@{}:{}".format(keyspace,cassandra_host, cassandra_port))
+        auth_provider = None
+        if username is not None and password is not None:
+            auth_provider = PlainTextAuthProvider(username=username, password=password)
         # execution profile
         ep = cassandra.cluster.ExecutionProfile(request_timeout=240.0)
         profiles = {cassandra.cluster.EXEC_PROFILE_DEFAULT:ep}
-        self.cluster = cassandra.cluster.Cluster([cassandra_host],port=cassandra_port,execution_profiles=profiles)
+        self.cluster = cassandra.cluster.Cluster([cassandra_host],port=cassandra_port,execution_profiles=profiles,auth_provider=auth_provider)
         self.session = self.cluster.connect(keyspace)
         self.check_document_select_query = self.session.prepare("SELECT count(url_hash) FROM paragraph_checksums WHERE checksum=?"
 )

From 1e7ac17d903a136674146a0f9a2ea94cca93d2d4 Mon Sep 17 00:00:00 2001
From: Daniel Hladek
Date: Thu, 23 Feb 2023 16:08:14 +0100
Subject: [PATCH 5/5] wip schema

---
 websucker/schema.py | 93 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 93 insertions(+)
 create mode 100644 websucker/schema.py

diff --git a/websucker/schema.py b/websucker/schema.py
new file mode 100644
index 0000000..ac14e5a
--- /dev/null
+++ b/websucker/schema.py
@@ -0,0 +1,93 @@
+from cassandra.cqlengine import columns
+from cassandra.cqlengine.models import Model
+
+class Links(Model):
+    domain_name = columns.Text(primary_key=True)
+    url_path = columns.Text(primary_key=True)
+    url_query = columns.Text(primary_key=True)
+    url_schema = columns.Text()
+    redirect_target = columns.Text()
+    link_status = columns.Text()
+    link_originality = columns.Float()
+    body_size = columns.Integer()
+    update_time = columns.DateTime()
+
+
+class DailyLinks(Model):
+    day = columns.Integer(primary_key=True)
+    domain_name = columns.Text(primary_key=True)
+    url_path = columns.Text(primary_key=True)
+    url_query = columns.Text(primary_key=True)
+    url_schema = columns.Text()
+    redirect_target = columns.Text()
+    link_status = columns.Text()
+    link_originality = columns.Float()
+    body_size = columns.Integer()
+    update_time = columns.DateTime()
+
+# Raw CQL schema below is still work in progress; kept in a bare string literal so this module parses.
+"""
+CREATE TABLE domain_quality (
+    domain_name TEXT,
+    day DATE,
+    seen_count INT,
+    good_size INT,
+    good_count INT,
+    good_probability FLOAT,
+    good_originality FLOAT,
+    average_good_characters FLOAT,
+    content_size INT,
+    content_count INT,
+    content_probability FLOAT,
+    content_originality FLOAT,
+    average_content_characters FLOAT,
+    fetched_count INT,
+    average_fetched_good_characters FLOAT,
+    gain_ratio FLOAT,
+    update_time TIMESTAMP STATIC,
+    PRIMARY KEY(domain_name,day)
+) WITH CLUSTERING ORDER BY (day DESC);
+
+
+CREATE TABLE content (
+    domain_name TEXT,
+    target_link TEXT,
+    agent_version TEXT,
+    title TEXT,
+    links SET<TEXT>,
+    authors SET<TEXT>,
+    tags SET<TEXT>,
+    description TEXT,
+    section TEXT,
+    article_published_time TEXT,
+    text_date TEXT,
+    body TEXT,
+    body_size INT,
+    update_time TIMESTAMP,
+    PRIMARY KEY(domain_name,target_link)
+);
+
+CREATE TABLE paragraph_checksums (
+    checksum BIGINT,
+    url_hash BIGINT,
+    PRIMARY KEY(checksum,url_hash)
+);
+
+CREATE TABLE html (
+    day DATE,
+    domain_name TEXT,
+    source_link TEXT,
+    target_link TEXT,
+    redirect_links LIST<TEXT>,
+    status INT,
+    content TEXT,
+    headers TEXT,
+    agent_version TEXT,
+    update_time TIMESTAMP,
+    PRIMARY KEY(day,domain_name,source_link)
+);
+
+CREATE TABLE domain_connections (
+    domain_name TEXT,
+    linked_domain TEXT,
+    PRIMARY KEY (domain_name,linked_domain)
+);"""
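
For reference: dump.py (PATCH 1/5 and 3/5) prints one JSON document per line to stdout, so its output can be read back as a JSON-lines stream. A minimal sketch, not part of the patches, assuming the dump was redirected or piped into this script; the field names come from the content table plus the html_data key that dump.py adds:

    import json
    import sys

    # Consume the JSON-lines stream produced by:  python websucker/dump.py HOST PORT
    for line in sys.stdin:
        doc = json.loads(line)
        # Each document carries the extracted body plus the raw HTML record under "html_data".
        print(doc["domain_name"], doc.get("body_size"), "html" if doc.get("html_data") else "no html")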
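
A minimal sketch, not part of the patches, of calling the extended Data constructor from PATCH 4/5 directly, reusing the same CASSANDRA_* environment variables that cli.py reads; the keyword names match the new __init__ signature in websucker/db.py:

    import os

    from websucker.db import Data

    # Both username and password must be set for PlainTextAuthProvider to be used;
    # otherwise Data connects without authentication, exactly as before the patch.
    db = Data(
        keyspace=os.environ.get("CASSANDRA_KEYSPACE", "websucker"),
        cassandra_host=os.environ.get("CASSANDRA_HOST", "127.0.0.1"),
        cassandra_port=int(os.environ.get("CASSANDRA_PORT", "9042")),
        username=os.environ.get("CASSANDRA_USERNAME"),
        password=os.environ.get("CASSANDRA_PASSWORD"),
    )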
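
The cqlengine models in PATCH 5/5 are declared but never synced anywhere in the series. One possible way to create the links tables from them is sketched below, assuming the websucker keyspace already exists; the remaining tables are still covered only by the raw CQL kept in schema.py:

    import os

    from cassandra.cqlengine import connection
    from cassandra.cqlengine.management import sync_table

    from websucker.schema import Links, DailyLinks

    # cqlengine refuses to create or alter tables unless this flag is set.
    os.environ["CQLENG_ALLOW_SCHEMA_MANAGEMENT"] = "1"

    # Register a default connection and keyspace for all models.
    connection.setup(["127.0.0.1"], "websucker", protocol_version=4)

    # Create (or update) the tables backing the two models.
    sync_table(Links)
    sync_table(DailyLinks)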