"""Dump "good" documents from the websucker keyspace as JSON lines."""
import json
import sys
from urllib.parse import urlparse

import cassandra.cluster
import cassandra.query

if len(sys.argv) != 3:
    sys.exit(f"usage: {sys.argv[0]} <cassandra_host> <cassandra_port>")

cassandra_host = sys.argv[1]
cassandra_port = int(sys.argv[2])  # the driver expects an integer port
keyspace = "websucker"

# Return rows as dicts and allow long-running full-table scans.
ep = cassandra.cluster.ExecutionProfile(
    request_timeout=240.0,
    row_factory=cassandra.query.dict_factory,
)
profiles = {cassandra.cluster.EXEC_PROFILE_DEFAULT: ep}
cluster = cassandra.cluster.Cluster(
    [cassandra_host],
    port=cassandra_port,
    execution_profiles=profiles,
)

select_documents = "SELECT JSON * FROM content"

with cluster.connect(keyspace) as session:
    # dict_factory is already set via the default execution profile above.
    select_html = session.prepare(
        "SELECT JSON * FROM html WHERE day=? AND domain_name=? LIMIT 1"
    )
    select_link = session.prepare(
        "SELECT link_status FROM links WHERE domain_name=? AND url_path=? AND url_query=? LIMIT 1"
    )
    rows = session.execute(select_documents)
    for row in rows:
        # SELECT JSON returns the whole row as a single "[json]" column.
        doc = json.loads(row["[json]"])
        parsed_link = urlparse(doc["target_link"])
        netloc = parsed_link.netloc.strip().lower()
        path = parsed_link.path.strip()
        # strip the leading / so the path matches the links table key
        if len(path) > 1 and path[0] == "/":
            path = path[1:]
        query = parsed_link.query

        link_row = session.execute(select_link, (netloc, path, query)).one()
        if link_row is None:
            continue
        # skip bad links
        if str(link_row["link_status"]) != "good":
            continue

        # take the date part of update_time (text before the first space)
        day = doc["update_time"].split()[0]
        html_row = session.execute(select_html, (day, doc["domain_name"])).one()
        doc["html_data"] = json.loads(html_row["[json]"]) if html_row is not None else {}

        # drop the outgoing-link list before emitting the document
        doc.pop("links", None)
        print(json.dumps(doc))
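
# Example invocation (the script name, host, and port below are hypothetical;
# the script writes one JSON document per line, so the output can be
# redirected straight into a .jsonl file):
#
#   python dump_documents.py 127.0.0.1 9042 > documents.jsonl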