diff --git a/docker-compose.yaml b/docker-compose.yaml index c07be58..53f70a7 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,10 +1,10 @@ -version: '2' +version: '3' services: cassandra: image: bitnami/cassandra:3.11 environment: - CASSANDRA_CLUSTER_NAME=cassandra-cluster ports: - - "89042:9042" - - "87000:7000" + - "8942:9042" + - "8700:7000" diff --git a/websucker/cli.py b/websucker/cli.py index 817cad6..006b20a 100644 --- a/websucker/cli.py +++ b/websucker/cli.py @@ -5,11 +5,14 @@ from websucker.parser import normalize_link,urlunparse from websucker.parser import load_parser from websucker.db import Data from websucker.db import get_schema +import websucker.db import click import pprint import greenstalk import os +from websucker import schema + def create_database_from_context(ctx): return Data(ctx.obj["cassandra_keyspace"],ctx.obj["cassandra_host"],ctx.obj["cassandra_port"],ctx.obj["cassandra_username"],ctx.obj["cassandra_password"]) @@ -23,8 +26,8 @@ def create_queue_from_context(ctx): @click.option("--cassandra-keyspace",metavar="CASSANDRA_KEYSPACE",help="cassandra keyspace (if defined, value read from CASSANDRA_KEYSPACE env variable)",envvar="CASSANDRA_KEYSPACE",default="websucker",show_default=True) @click.option("--cassandra-host",metavar="CASSANDRA_HOST",help="cassandra host (if defined, value read from CASSANDRA_HOST env variable)",envvar="CASSANDRA_HOST",default="127.0.0.1",show_default=True) @click.option("--cassandra-port",metavar="CASSANDRA_PORT",help="cassandra port (if defined, value read from CASSANDRA_PORT env variable)",envvar="CASSANDRA_PORT",default=9042,show_default=True) -@click.option("--cassandra-username",metavar="CASSANDRA_USERNAME",help="cassandra username (if defined, value read from CASSANDRA_USERNAME env variable)",envvar="CASSANDRA_USERNAME") -@click.option("--cassandra-password",metavar="CASSANDRA_PASSWORD",help="cassandra password (if defined, value read from CASSANDRA_PASSWORD env variable)",envvar="CASSANDRA_PASSWORD") +@click.option("--cassandra-username",metavar="CASSANDRA_USERNAME",help="cassandra username (if defined, value read from CASSANDRA_USERNAME env variable)",envvar="CASSANDRA_USERNAME",default="cassandra",show_default=True) +@click.option("--cassandra-password",metavar="CASSANDRA_PASSWORD",help="cassandra password (if defined, value read from CASSANDRA_PASSWORD env variable)",envvar="CASSANDRA_PASSWORD",default="cassandra",show_default=True) @click.option("--beanstalkd-tube",metavar="BEANSTALKD_TUBE",help="beanstalkd keyspace (if defined, value read from BEANSTALKD_TUBE env variable)",envvar="BEANSTALKD_TUBE",default="websucker",show_default=True) @click.option("--beanstalkd-host",metavar="BEANSTALKD_HOST",help="beanstalkd host (if defined, value read from beanstalkd_HOST env variable)",envvar="BEANSTALKD_HOST",default="127.0.0.1",show_default=True) @click.option("--beanstalkd-port",metavar="BEANSTALKD_PORT",help="beanstalkd port (if defined, value read from BEANSTALKD_PORT env variable)",envvar="BEANSTALKD_PORT",default=11300,show_default=True) @@ -187,6 +190,18 @@ def summary(ctx): p = ctx.obj["parser"] db.summary(p) +@cli.command(help="Create database") +@click.pass_context +@click.argument("replication",default=1) +@click.argument("strategy",default="SimpleStrategy") +def create_database(ctx,replication,strategy): + cluster = websucker.db.connect_cluster(ctx.obj["cassandra_host"],ctx.obj["cassandra_port"],ctx.obj["cassandra_username"],ctx.obj["cassandra_password"]) + with cluster.connect() as session: + query = "CREATE KEYSPACE {} WITH replication = {'class':'{}', 'replication_factor' : {}}".format(ctx.obj["cassandra_keyspace"],strategy,replication) + session.execute(query) + session.set_keyspace(ctx.obj["cassandra_keyspace"]) + model.create_database() + @cli.command(help="Print keyspace schema") def schema(): schema = get_schema() diff --git a/websucker/db.py b/websucker/db.py index a9bb00b..ec5d2b5 100644 --- a/websucker/db.py +++ b/websucker/db.py @@ -19,19 +19,23 @@ def get_schema(): schema = f.read() return str(schema,encoding="utf8") +def connect_cluster(cassandra_host,cassandra_port,username,password): + auth_provider = None + if username is not None and password is not None: + auth_provider = PlainTextAuthProvider(username=username, password=password) + # execution profile + ep = cassandra.cluster.ExecutionProfile(request_timeout=240.0) + profiles = {cassandra.cluster.EXEC_PROFILE_DEFAULT:ep} + cluster = cassandra.cluster.Cluster([cassandra_host],port=cassandra_port,execution_profiles=profiles,auth_provider=auth_provider) + return cluster + class Data: """ Database of text documents """ def __init__(self,keyspace="websucker",cassandra_host="127.0.0.1",cassandra_port=9042,username=None,password=None): print("Database {}@{}:{}".format(keyspace,cassandra_host, cassandra_port)) - auth_provider = None - if username is not None and password is not None: - auth_provider = PlainTextAuthProvider(username=username, password=password) - # execution profile - ep = cassandra.cluster.ExecutionProfile(request_timeout=240.0) - profiles = {cassandra.cluster.EXEC_PROFILE_DEFAULT:ep} - self.cluster = cassandra.cluster.Cluster([cassandra_host],port=cassandra_port,execution_profiles=profiles,auth_provider=auth_provider) + self.cluster = connect_cluster(cassandra_host,cassandra_port,username,password) self.session = self.cluster.connect(keyspace) self.check_document_select_query = self.session.prepare("SELECT count(url_hash) FROM paragraph_checksums WHERE checksum=?" ) diff --git a/websucker/schema.py b/websucker/schema.py index ac14e5a..0525ea6 100644 --- a/websucker/schema.py +++ b/websucker/schema.py @@ -1,93 +1,101 @@ from cassandra.cqlengine import columns from cassandra.cqlengine.models import Model +from cassandra.cqlengine.management import sync_table class Links(Model): - domain_name = column.Text(primary_key=True) - url_path = column.Text(primary_key=True) - url_query = column.Text(primary_key=True) - url_schema = column.Text() - redirect_target = column.Text() - link_status = column.Text() - link_originality = column.Float() - body_size = column.Integer() - update_time = column.DateTime() + __table_name__ = "links" + domain_name = columns.Text(primary_key=True) + url_path = columns.Text(primary_key=True) + url_query = columns.Text(primary_key=True) + url_schema = columns.Text() + redirect_target = columns.Text() + link_status = columns.Text() + link_originality = columns.Float() + body_size = columns.Integer() + update_time = columns.DateTime() class DailyLinks(Model): - day = column.Integer(primary_key=True) - domain_name = column.Text(primary_key=True) - url_path = column.Text(primary_key=True) - url_query = column.Text(primary_key=True) - url_schema = column.Text() - redirect_target = column.Text() - link_status = column.Text() - link_originality = column.Float() - body_size = column.Integer() - update_time = column.DateTime() + __table_name__ = "daily_links" + day = columns.Integer(primary_key=True) + domain_name = columns.Text(primary_key=True) + url_path = columns.Text(primary_key=True) + url_query = columns.Text(primary_key=True) + url_schema = columns.Text() + redirect_target = columns.Text() + link_status = columns.Text() + link_originality = columns.Float() + body_size = columns.Integer() + update_time = columns.DateTime() -CREATE TABLE domain_quality ( - domain_name TEXT, - day DATE, - seen_count INT, - good_size INT, - good_count INT, - good_probability FLOAT, - good_originality FLOAT, - average_good_characters FLOAT, - content_size INT, - content_count INT, - content_probability FLOAT, - content_originality FLOAT, - average_content_characters FLOAT, - fetched_count INT, - average_fetched_good_characters FLOAT, - gain_ratio FLOAT, - update_time TIMESTAMP STATIC , - PRIMARY KEY(domain_name,day) -) WITH CLUSTERING ORDER BY (day DESC); +class DomainQuality: + __table_name__ = "domain_quality" + domain_name = columns.Text(primary_key=True) + day = columns.Date(primary_key=True) + seen_count = columns.Integer() + good_size = columns.Integer() + good_count = columns.Integer() + good_probability = columns.Float() + good_originality = columns.Float() + average_good_characters = columns.Float() + content_size = columns.Integer() + content_count = columns.Integer() + content_probability = columns.Float() + content_originality = columns.Float() + average_content_characters = columns.Float() + fetched_count = columns.Integer() + average_fetched_good_characters = columns.Float() + gain_ratio = columns.Float() + update_time = columns.TimeUUID(static=True) #TIMESTAMP STATIC , +#) WITH CLUSTERING ORDER BY (day DESC); +class Content(Model): + __table_name__ = "content" + domain_name = columns.Text(primary_key=True) + target_link = columns.Text(primary_key=True) + agent_version = columns.Text() + title = columns.Text() + links = columns.Set(value_type=columns.Text) + authors = columns.Set(value_type=columns.Text) + tags = columns.Set(value_type=columns.Text) + description = columns.Text() + section = columns.Text() + article_published_time = columns.Text() + text_date = columns.Text() + body = columns.Text() + body_size = columns.Text() + update_time = columns.DateTime() +# PRIMARY KEY(domain_name,target_link), -CREATE TABLE content ( - domain_name TEXT, - target_link TEXT, - agent_version TEXT, - title TEXT, - links SET, - authors SET, - tags SET, - description TEXT, - section TEXT, - article_published_time TEXT, - text_date TEXT, - body TEXT, - body_size INT, - update_time TIMESTAMP, - PRIMARY KEY(domain_name,target_link), -); +class ParagraphChecksums(Model): + __table_name__ = "paragraph_checksums" + checksum = columns.BigInt(primary_key=True) + url_hash = columns.BigInt(primary_key=True) -CREATE TABLE paragraph_checksums ( - checksum BIGINT, - url_hash BIGINT, - PRIMARY KEY(checksum,url_hash), -); +class Html(Model): + __table_name__ = "html" + day = columns.Date(primary_key=True) + domain_name = columns.Text(primary_key=True) + source_link = columns.Text(primary_key=True) + target_link = columns.Text() + redirect_links = columns.List(value_type=columns.Text) + status = columns.Integer() + content = columns.Text() + headers = columns.Text() + agent_version = columns.Text() + update_time = columns.Text() -CREATE TABLE html ( - day DATE, - domain_name TEXT, - source_link TEXT, - target_link TEXT, - redirect_links LIST, - status INT, - content TEXT, - headers TEXT, - agent_version TEXT, - update_time TIMESTAMP, - PRIMARY KEY(day,domain_name,source_link) -); +class DomainConnections(Model): + __table_name__ = "domain_connections" + domain_name = columns.Text(primary_key=True) + linked_domain = columns.Text(primary_key=True) -CREATE TABLE domain_connections ( - domain_name TEXT, - linked_domain TEXT, - PRIMARY KEY (domain_name,linked_domain) -); +def create_db(): + sync_table(Links) + sync_table(DailyLinks) + sync_table(DomainQuality) + sync_table(Content) + sync_table(ParagraphChecksums) + sync_table(Html) + sync_table(DomainConnections)