From 0c9ea2b4e3baa4be5a855798f801132b252fab3b Mon Sep 17 00:00:00 2001 From: Daniel Hladek Date: Thu, 7 May 2020 16:09:45 +0200 Subject: [PATCH] initial --- .gitignore | 4 + LICENSE.txt | 21 ++ MANIFEST.in | 1 + README.md | 1 + requirements.txt | 6 + setup.py | 43 ++++ websucker/__init__.py | 0 websucker/agent.py | 364 ++++++++++++++++++++++++++++++++++ websucker/cli.py | 130 +++++++++++++ websucker/db.py | 444 ++++++++++++++++++++++++++++++++++++++++++ websucker/parser.py | 335 +++++++++++++++++++++++++++++++ websucker/queue.py | 35 ++++ websucker/schema.sql | 94 +++++++++ 13 files changed, 1478 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE.txt create mode 100644 MANIFEST.in create mode 100644 README.md create mode 100644 requirements.txt create mode 100644 setup.py create mode 100644 websucker/__init__.py create mode 100755 websucker/agent.py create mode 100644 websucker/cli.py create mode 100644 websucker/db.py create mode 100644 websucker/parser.py create mode 100644 websucker/queue.py create mode 100644 websucker/schema.sql diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..47ee97c --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +build +dist +*.egg-info +venv diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..9df9db6 --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2020 Technical University of Kosice + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
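The modules added below fit together as follows: websucker/agent.py downloads pages with pycurl, websucker/parser.py extracts text and links with justext and BeautifulSoup, websucker/db.py stores results in the Cassandra keyspace defined by websucker/schema.sql, and websucker/cli.py wires them together behind the `websuck` entry point. A minimal usage sketch (not part of the patch), mirroring what the `visit` CLI command does; the host, port, keyspace and domain values are assumptions for a local setup:

```python
from websucker.agent import visit_domain
from websucker.parser import BaseParser
from websucker.db import Data

parser = BaseParser()
parser.justext_language = "English"        # language name passed to the justext stoplist
db = Data("websucker", "127.0.0.1", 9042)  # keyspace must already contain websucker/schema.sql
visit_domain("example.com", parser, db)    # fetches the front page, then crawls seen links
```
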
diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..103efea --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +recursive-include websucker *.sql diff --git a/README.md b/README.md new file mode 100644 index 0000000..c3197ca --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# Websucker diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..6341dae --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +BeautifulSoup4 +justext +cassandra-driver +python-dateutil +click +pycurl diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..1297b72 --- /dev/null +++ b/setup.py @@ -0,0 +1,43 @@ +import setuptools + +with open("README.md", "r") as fh: + long_description = fh.read() + +setuptools.setup( + name="websucker", # Replace with your own username + version="1.0.0", + author="Daniel Hládek", + author_email="dhladek@gmail.com", + description="Web Crawler", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://github.com/hladek/websucker", + packages=setuptools.find_packages(), + # specified in MANIFEST + include_package_data=True, + classifiers=[ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Development Status :: 3 - Alpha", + "Intended Audience :: Science/Research", + "Topic :: Internet :: WWW/HTTP :: Indexing/Search" + ], + python_requires='>=3.6', + entry_points={ # Optional + 'console_scripts': [ + 'websuck=websucker.cli:cli', + ], + }, + install_requires=[ + "BeautifulSoup4", + "justext", + "cassandra-driver", + "python-dateutil", + "click", + "pycurl", + "greenstalk" + ], + +) + diff --git a/websucker/__init__.py b/websucker/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/websucker/agent.py b/websucker/agent.py new file mode 100755 index 0000000..2d2f718 --- /dev/null +++ b/websucker/agent.py @@ -0,0 +1,364 @@ +#!/usr/bin/env python +#! 
-*- coding: utf-8 -*-
+import urllib.parse
+import urllib.error
+import os
+import os.path
+import re
+import datetime
+import time
+import sys
+import tempfile
+import pprint
+import bs4
+
+import pycurl
+import urllib.robotparser
+
+
+from websucker.parser import normalize_link,urlunparse
+
+
+# Parses an HTTP refresh value from a header or an HTML meta tag
+def get_refresh(ref,target_link):
+    refresh = None
+    tokens = ref.strip().split(";")
+    if len(tokens) > 1 and tokens[1].lower().startswith("url="):
+        refresh = urlunparse(normalize_link(
+            tokens[1][4:].strip("\'"), target_link))
+    return refresh
+
+class Response:
+    def __init__(self,url,headers,status,content,redirects,link_status):
+        assert len(url) > 0
+        assert url[0] != "/"
+        self.url = url
+        self.status = status
+        self.content = content
+        self.headers = headers
+        self.redirects = redirects
+        self.visited_time = datetime.date.today()
+        self.bs = None
+        self.link_status = link_status
+        if content is not None and link_status == "good":
+            self.bs = bs4.BeautifulSoup(content, "lxml")
+
+    def __str__(self):
+        return "{} {} {}".format(self.url,self.get_canonical(),self.link_status)
+
+    def get_content(self):
+        if self.content is None:
+            print("NO CONTENT")
+            print(self.url,self.redirects)
+            return None
+        self.content.seek(0)
+        text = self.content.read()
+        out = str(text,encoding="utf8",errors="replace")
+        return out
+
+
+    # HTML metarefresh redirect
+    def get_metarefresh(self):
+        if self.content is None:
+            return None
+        metarefresh = None
+        t = self.bs.find_all("meta", attrs={"http-equiv": "refresh"})
+        canonical = self.get_canonical()
+        for tags in t:
+            if "content" in tags.attrs:
+                metarefresh = get_refresh(tags["content"],canonical)
+        if metarefresh is not None:
+            nl = normalize_link(metarefresh, canonical)
+            print("Metarefresh")
+            print(nl)
+            metarefresh = urlunparse(nl)
+
+        return metarefresh
+
+    def get_canonical(self):
+        r = None
+        last_link = self.url
+        if len(self.redirects) > 0:
+            last_link = self.redirects[-1]
+        if self.bs is not None:
+            l = self.bs.find("link", rel="canonical", href=True)
+            if l is not None:
+                r = urlunparse(normalize_link(l["href"], last_link))
+        if r is None:
+            r = last_link
+        r = urlunparse(normalize_link(r, last_link))
+        assert len(r) > 0
+        assert r[0] != "/"
+        return r
+
+    def get_redirects(self):
+        if len(self.redirects) < 2:
+            return []
+        return self.redirects[0:-1]
+
+
+class Connection:
+    def __init__(self):
+        self.c = pycurl.Curl()
+        self.c.setopt(self.c.FOLLOWLOCATION, True)
+#        self.c.setopt(self.c.VERBOSE, True)
+        self.c.setopt(self.c.CONNECTTIMEOUT, 20)
+        self.c.setopt(self.c.TIMEOUT, 20)
+        self.c.setopt(self.c.FAILONERROR, True)
+        self.c.setopt(self.c.HTTPHEADER, [
+            'Accept: text/html', 'Accept-Charset: UTF-8'])
+        self.c.setopt(self.c.HEADERFUNCTION, self.header)
+        self.c.setopt(self.c.USERAGENT, "Googlebot-News")
+#        #self.c.setopt(pycurl.COOKIEJAR, 'cookie.txt')
+#        #self.c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
+        self.robots = {}
+        self.headers = {}
+        self.redirects = []
+        self.header_lines = []
+        self.status = 0
+        self.max_redirect = 4
+
+    # Stops processing when the content is not text
+    # Records the Location and Refresh headers
+    def header(self, data):
+        if len(data) == 0:
+            return None
+        l = str(data, encoding="utf8")
+        self.header_lines.append(l)
+        s = l.find(" ")
+        if s >= 1 and s < len(l):
+            key = l[0:s - 1]
+            value = l[s + 1:].rstrip()
+            self.headers[key] = value
+            if key.lower() == "refresh":
+                self.add_redirect(value)
+            elif key.lower() == "location":
+                self.add_redirect(value)
+            elif key == "Content-Type" and "text" not in value:
"text" not in value: + # Pycurl potom vyhodi 23, failed writing header + return 0 + + def __del__(self): + self.c.close() + + def close(self): + self.c.close() + + def add_redirect(self,link): + last_link = self.url + if len(self.redirects) > 0: + last_link = self.redirects[-1] + v = urlunparse(normalize_link(link, last_link)) + if v!=last_link and v not in set(self.redirects): + self.redirects.append(v) + + """ + @returns content, link_status + @throws pycurl.error + """ + def _download(self, url): + print("Downloading " + url) + self.url = url + self.headers = {} + self.redirects = [] + self.header_lines = [] + self.status = 0 + content = None + link_status = "bad_connection" + try: + self.headers = {} + del self.header_lines[:] + content = tempfile.SpooledTemporaryFile() + self.c.setopt(self.c.WRITEDATA, content) + self.c.setopt(self.c.URL, url) + self.c.perform() + self.status = self.c.getinfo(self.c.RESPONSE_CODE) + if self.status != 200: + link_status = "bad_httpcode" + elif "Content-Type" in self.headers and not self.headers["Content-Type"].startswith("text"): + link_status = "bad_type" + else: + link_status = "good" + content.seek(0) + except pycurl.error as e: + errno, message = e.args + content = None + self.status = self.c.getinfo(self.c.RESPONSE_CODE) + if errno == 23: + # 23 je zly content v header + link_status = "bad_type" + elif errno == 22: + link_status = "bad_httpcode" + else: + raise e + except UnicodeDecodeError as e: + content = None + link_status = "bad_unicode" + except UnicodeEncodeError as e: + content = None + link_status = "bad_unicode" + sz = self.c.getinfo(self.c.SIZE_DOWNLOAD) + tt = self.c.getinfo(self.c.TOTAL_TIME) + print("{} Received {} bytes in {} s".format(self.status,sz,tt)) + return content, link_status + + # Throws pycurl.error + def html_download2(self, url): + dlink = url + responses = [] + while len(responses) < 5: + nl = normalize_link(dlink) + url = urlunparse(nl) + assert url.startswith("http") + content, link_status = self._download(url) + response = Response(url,"\r\n".join(self.header_lines),self.status,content,self.redirects,link_status) + dlink = response.get_metarefresh() + responses.append(response) + if dlink is None: + break + return responses + + def is_robot_good(self, url): + schema, domain, path, query = normalize_link(url) + res = True + if domain not in self.robots: + roboturl = urlunparse((schema, domain, "robots.txt", "")) + try: + r = self._download(roboturl) + if r[1] == "good": + c = r[0].read() + lines = str(c, errors="ignore", encoding="utf8").split("\n") + self.robots[domain] = urllib.robotparser.RobotFileParser() + self.robots[domain].parse(lines) + else: + self.robots[domain] = None + except pycurl.error as err: + print(err) + if domain in self.robots and self.robots[domain] is not None: + res = self.robots[domain].can_fetch("Agent", url) + return res + +class ParsedDocument: + def __init__(self, parser,work_link): + self.parser = parser + self.work_link = work_link + + self.content = None + self.bs = None + self.paragraph_checksums = None + self.paragraph_sizes = None + + self.link_set = set() + self.body = None + self.text_date = None + self.tags = None + self.authors = None + self.title = None + self.description = None + self.section = None + self.article_published_time = None + self.current_time = datetime.date.today() + + def extract(self,content,bs): + self.content = content + self.bs = bs + + # Extract text and metatext + self.body, self.text_date = self.parser.extract_raw_text(content, self.current_time) + # 
+        # Paragraph checksums
+        pch,pszs = self.parser.calculate_checksums(self.body)
+        self.paragraph_checksums = pch
+        self.paragraph_sizes = pszs
+        if bs is None:
+            return
+        self.tags,self.authors,self.title,self.article_published_time, self.description,self.section = self.parser.extract_og(bs)
+
+        # Extract links from the page
+        base = self.work_link
+        if bs.base is not None and "href" in bs.base.attrs:
+            base = bs.base["href"]
+        # Normalize links
+        for l in bs.find_all("a", href=True):
+            if ("rel" in l.attrs and "nofollow" in l.attrs["rel"]) or "nofollow" in l.attrs:
+                continue
+            href = l["href"]
+            try:
+                nl = normalize_link(href, base)
+                link = urlunparse(nl)
+                if link == base:
+                    continue
+                self.link_set.add(link)
+            except ValueError:
+                pass
+
+    def get_links(self):
+        return self.link_set
+
+    def get_follow_links(self):
+        follow_links = set()
+        for l in self.link_set:
+            if self.parser.is_link_good(l):
+                link = normalize_link(l,strip_query=self.parser.strip_query)
+                follow_links.add(urlunparse(link))
+        return follow_links
+
+
+    def __str__(self):
+        r = []
+        if self.title is not None:
+            r.append(self.title)
+        if self.body is not None:
+            if (len(self.body) < 20):
+                r.append(self.body)
+            else:
+                r.append(self.body[0:20] + " ....")
+        return ">>> ".join(r)
+
+
+def get_domains(arg):
+    domains = []
+    if arg == "-":
+        for l in sys.stdin:
+            domain = l.rstrip()
+            assert(domain is not None)
+            if len(domain) == 0:
+                continue
+            domains.append(domain)
+    else:
+        domains = arg.split(",")
+    return domains
+
+def visit_links(links,connection,parser,db):
+    outlinks = []
+    for work_link in links:
+        responses = []
+        if parser.is_link_good(work_link) and connection.is_robot_good(work_link):
+            responses = connection.html_download2(work_link)
+            time.sleep(4)
+        db.index_responses(work_link,responses)
+        if len(responses) > 0:
+            lr = responses[-1]
+            if lr.content is not None:
+                target_link = lr.get_canonical()
+                parsed = ParsedDocument(parser,target_link)
+                parsed.extract(lr.content, lr.bs)
+                db.index_content(target_link,parsed)
+                outlinks += parsed.get_links()
+    if len(outlinks) > 0:
+        db.index_follow_links(parser,outlinks,connection)
+
+def visit_domain(domain,parser,db):
+    c = Connection()
+    p = parser
+    # Get links from the front page
+    # TODO Sitemap
+    sitemap = "http://" + domain
+    visit_links([sitemap],c,p,db)
+    db.check_domain(domain)
+    for i in range(p.crawl_rounds):
+        # Visit links discovered so far
+        links = db.get_visit_links(domain,p.recent_links,p.old_links,p.random_links)
+        visit_links(links,c,p,db)
+        db.check_domain(domain)
+
+
diff --git a/websucker/cli.py b/websucker/cli.py
new file mode 100644
index 0000000..ce63fa1
--- /dev/null
+++ b/websucker/cli.py
@@ -0,0 +1,130 @@
+from websucker.agent import Connection,visit_links,visit_domain
+from websucker.agent import ParsedDocument
+from websucker.parser import BaseParser
+from websucker.parser import normalize_link,urlunparse
+from websucker.db import Data
+from websucker.db import get_schema
+import click
+import pprint
+
+
+
+def create_database_from_context(ctx):
+    return Data(ctx.obj["cassandra_keyspace"],ctx.obj["cassandra_host"],ctx.obj["cassandra_port"])
+
+@click.group()
+@click.pass_context
+@click.option("--cassandra-keyspace",metavar="CASSANDRA_KEYSPACE",help="cassandra keyspace (if defined, value read from CASSANDRA_KEYSPACE env variable)",envvar="CASSANDRA_KEYSPACE",default="websucker",show_default=True)
+@click.option("--cassandra-host",metavar="CASSANDRA_HOST",help="cassandra host (if defined, value read from CASSANDRA_HOST env
variable)",envvar="CASSANDRA_HOST",default="127.0.0.1",show_default=True) +@click.option("--cassandra-port",metavar="CASSANDRA_PORT",help="cassandra port (if defined, value read from CASSANDRA_PORT env variable)",envvar="CASSANDRA_PORT",default=9042,show_default=True) + +@click.option("--justext-language",metavar="JUSTEXT_LANGUAGE",help="Target language (if defined, value read from JUSTEXT_LANGUAGE env variable)",envvar="JUSTEXT_LANGUAGE",default="English",show_default=True) +@click.option("--visit",is_flag=True) +def cli(ctx,cassandra_keyspace,cassandra_host,cassandra_port,justext_language,visit): + ctx.ensure_object(dict) + p = BaseParser() + p.justext_language = justext_language + ctx.obj["parser"] = p + ctx.obj["cassandra_host"] = cassandra_host + ctx.obj["cassandra_port"] = cassandra_port + ctx.obj["cassandra_keyspace"] = cassandra_keyspace + ctx.obj["visit"] = visit + + +@cli.command(help="Print domains") +@click.pass_context +@click.argument("count",type=int,default=20) +def all(ctx,count): + p = ctx.obj["parser"] + c = Connection() + db = create_database_from_context(ctx) + res = db.all_domains(count) + for row in res: + print(",".join(map(str,row))) + if ctx.obj["visit"]: + visit_domain(row[0],p,db) + +@cli.command(help="Continue crawling of seen links from a domain") +@click.pass_context +@click.argument("domain") +def crawl(ctx, domain): + db = create_database_from_context(ctx) + p = ctx.obj["parser"] + c = Connection() + links = db.get_visit_links(domain,p.recent_links,p.old_links,p.random_links) + visit_links(links,c,p,db) + db.check_domain(domain) + +@cli.command(help="find best domains") +@click.pass_context +@click.argument("count",type=int,default=20) +#@click.option("visit",is_flag=True) +def best(ctx, count): + db = create_database_from_context(ctx) + p = ctx.obj["parser"] + domains = db.get_best_domains(count) + for domain,gr in domains: + print(domain,gr) + if ctx.obj["visit"]: + visit_domain(domain,p,db) + + +@cli.command(help="Find unvisited domains, Visit a site, get links and crawl") +@click.pass_context +@click.argument("count",type=int,default=20) +def unvisited(ctx, count): + db = create_database_from_context(ctx) + p = ctx.obj["parser"] + c = Connection() + domains = db.get_unvisited_domains(count) + for domain in domains: + print(domain) + if ctx.obj["visit"]: + visit_domain(domain,p,db) + +@cli.command(help="Visit url, get links and crawl. 
Start here") +@click.pass_context +@click.argument("link") +def visit(ctx, link): + db = create_database_from_context(ctx) + p = ctx.obj["parser"] + c = Connection() + nl = normalize_link(link) + domain=nl[1] + visit_domain(domain,p,db) + +@cli.command(help="Update domain statistics") +@click.pass_context +@click.argument("domain") +def check(ctx,domain): + db = create_database_from_context(ctx) + res = db.check_domain(domain) + print(res) + +@cli.command(help="Print daily report") +@click.pass_context +def report(ctx): + db = create_database_from_context(ctx) + db.daily_report() + +@cli.command(help="Print keyspace schema") +def schema(): + schema = get_schema() + print(schema) + +@cli.command(help="Fetch given url (just for debug)") +@click.pass_context +@click.argument("urls") +def fetch(ctx,urls): + parser = ctx.obj["parser"] + # Visit first page + connection = Connection() + responses = connection.html_download2(urls) + for res in responses: + target_link = res.get_canonical() + pd = ParsedDocument(parser,target_link) + pd.extract(res.content, res.bs) + print(pd) + +if __name__ == "__main__": + cli() diff --git a/websucker/db.py b/websucker/db.py new file mode 100644 index 0000000..5a82681 --- /dev/null +++ b/websucker/db.py @@ -0,0 +1,444 @@ +import cassandra +import cassandra.cluster +import random +import os +import pkg_resources +import datetime +from websucker.parser import normalize_link,urlunparse + +VERSION = "sucker6" + + +def get_schema(): + with pkg_resources.resource_stream(__name__,"schema.sql") as f: + schema = f.read() + return str(schema,encoding="utf8") + +class Data: + """ + Database of text documents + """ + def __init__(self,keyspace="websucker",cassandra_host="127.0.0.1",cassandra_port=9042): + # execution profile + ep = cassandra.cluster.ExecutionProfile(request_timeout=240.0) + profiles = {cassandra.cluster.EXEC_PROFILE_DEFAULT:ep} + self.cluster = cassandra.cluster.Cluster([cassandra_host],port=cassandra_port,execution_profiles=profiles) + self.session = self.cluster.connect(keyspace) + + self.check_document_select_query = self.session.prepare("SELECT count(url_hash) FROM paragraph_checksums WHERE checksum=?" ) + + + + self.index_response_link_update = self.session.prepare(""" + UPDATE links SET + link_status ='redirect', + redirect_target = ?, + update_time = toTimestamp(now()) + WHERE + domain_name=? AND + url_path=? AND + url_query=? + """) + + self.domain_quality_update = self.session.prepare(""" + UPDATE domain_quality SET + seen_count=?, + good_size=?, + good_count=?, + good_probability=?, + good_originality=?, + average_good_characters=?, + content_size=?, + content_count=?, + content_probability=?, + content_originality=?, + average_content_characters=?, + fetched_count=?, + average_fetched_good_characters=?, + gain_ratio=?, + update_time = toTimestamp(now()) + WHERE + domain_name=? AND + day=toDate(now()) + """) + + self.index_response_insert_html = self.session.prepare(""" +INSERT INTO html( + day, + domain_name, + source_link, + target_link, + redirect_links, + status, + headers, + content, + agent_version, + update_time + ) VALUES (toDate(now()),?,?,?,?,?,?,?,?,toTimestamp(now())); +""") + + self.index_content_link_insert = self.session.prepare(""" +INSERT INTO links ( +url_schema, +domain_name, +url_path, +url_query, +link_status, +update_time +) VALUES (?,?,?,?,'seen',?) 
IF NOT EXISTS +""") + + self.daily_links_insert = self.session.prepare(""" +INSERT INTO daily_links ( +day, +domain_name, +url_path, +url_query, +link_status, +body_size, +link_originality, +update_time +) VALUES (toDate(now()),?,?,?,?,?,?,toTimestamp(now())) +""") + self.daily_links_select = self.session.prepare(""" +SELECT +domain_name, +link_status, +count(link_status) +FROM daily_links WHERE day=toDate(now()) GROUP BY domain_name,link_status +""") + # PArsed Content + self.index_content_content_insert = self.session.prepare(""" +INSERT INTO content( + domain_name, + target_link, + links, + title, + description, + section, + authors, + tags, + article_published_time, + text_date, + body, + body_size, + agent_version, + update_time + ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?); +""") + + self.paragraph_checksums_insert = self.session.prepare("INSERT INTO paragraph_checksums (checksum,url_hash) VALUES(?,?)") + self.index_content_links_update = self.session.prepare("UPDATE links SET link_status=?, link_originality=?,body_size=?,url_schema=? WHERE domain_name=? AND url_path = ? AND url_query=? ") + self.check_domain_count = self.session.prepare("select count(url_path) from links where domain_name=? and link_status = ?") + + self.check_domain_size = self.session.prepare("select sum(body_size),sum(link_originality) from links where domain_name=? and link_status =?") + + self.domains_select = self.session.prepare("SELECT domain_name,seen_count,fetched_count,gain_ratio,average_fetched_good_characters FROM domain_quality PER PARTITION LIMIT 1") + + + def index_responses(self,source_link,responses): + # Redirect links + pl = normalize_link(source_link) + for response in responses: + tl = response.get_canonical() + r = ( + tl, + pl[1], + pl[2], + pl[3], + ) + if pl != tl: + res = self.session.execute(self.index_response_link_update,r) + d = ( + pl[1], + source_link, + response.get_canonical(), + response.redirects, + response.status, + response.headers, + response.get_content(), + VERSION, + ) + self.session.execute(self.index_response_insert_html,d) + + def daily_report(self): + rows = self.session.execute(self.daily_links_select) + for row in rows: + print(row[0],row[1],row[2]) + + def index_follow_links(self,parser,links,connection): + # Index seen links + + follow_links = set() + for l in links: + if parser.is_link_good(l): + #if connection is not None and parser.listen_robot and not connection.is_robot_good(l): + # continue + link = normalize_link(l,strip_query=parser.strip_query) + follow_links.add(urlunparse(link)) + + newlinkdomains = set() + for link in follow_links: + value = [] + nl = normalize_link(link) + value += nl + value.append(datetime.date.today()) + rows = self.session.execute(self.index_content_link_insert,value) + row = rows.one() + if row.applied: + newlinkdomains.add(nl[1]) + for domain in newlinkdomains: + self.check_domain(domain) + + + def index_content(self,target_link,parsed_document): + nl = normalize_link(target_link) + domain_name = nl[1] + assert len(domain_name) > 1 + + pd = parsed_document + body_length = 0 + if pd.body is not None: + body_length = len(pd.body) + value = ( + domain_name, + target_link, + pd.get_links(), + pd.title, + pd.description, + pd.section, + pd.authors, + pd.tags, + pd.article_published_time, + pd.text_date, + pd.body, + body_length, + VERSION, + pd.current_time + ) + content_future = self.session.execute_async(self.index_content_content_insert,value) + # result later + + link_status = "good" + originality = 0 + tsz = 0 + if pd.body is None: + 
link_status = "bad_parse" + else: + tsz = len(pd.body) + if tsz < 300: + link_status = "bad_small" + + if link_status == "good": + + futures = [] + for pc,psz in zip(pd.paragraph_checksums,pd.paragraph_sizes): + fut = self.session.execute_async(self.paragraph_checksums_insert,(pc,hash(nl[1] + "/" + nl[2] + "?" + nl[3]))) + futures.append(fut) + for fut in futures: + fut.result() + originality = self.check_document(pd.paragraph_checksums,pd.paragraph_sizes) + if originality < 0.8: + link_status = "bad_copy" + print(nl) + self.session.execute(self.index_content_links_update,(link_status,originality,tsz,nl[0],nl[1],nl[2],nl[3])) + content_future.result() + print("<<<< " + link_status + " " + str(originality)) + dl = ( + nl[1], + nl[2], + nl[3], + link_status, + tsz, + originality + ) + self.session.execute(self.daily_links_insert,dl) + + def check_document(self,paragraph_checksums,paragraph_sizes): + tsz = sum(paragraph_sizes) + if tsz == 0: + return 0 + copies = 0 + futures = [] + for pc,psz in zip(paragraph_checksums,paragraph_sizes): + futures.append(self.session.execute_async(self.check_document_select_query,(pc,))) + + for fut,psz in zip(futures,paragraph_sizes): + rows = fut.result() + res = rows.one()[0] + if res > 1: + copies += psz + return (tsz-copies)/tsz + + + def check_domain(self, domain): + assert len(domain) > 0 + seen_count = None + good_size = None + good_count = None + good_probability = None + good_originality = None + average_good_characters = None + content_size = None + content_count = None + content_probability = None + content_originality = None + average_content_characters = None + fetched_count = None + average_fetched_good_characters = None + gain_ratio = None + + counts = { + "good":0, + "bad_copy":0, + "bad_small":0, + "bad_httpcode":0, + "bad_type":0, + "bad_content":0, + "bad_parse":0, + "seen":0 + } + for k in counts.keys(): + res = self.session.execute(self.check_domain_count,(domain,k)) + co = res.one()[0] + counts[k]= co + + seen_count = counts["seen"] + good_count = counts["good"] + content_count = counts["good"] + counts["bad_copy"] + counts["bad_small"] + + fetched_count = sum(counts.values()) - counts["seen"] + + if fetched_count > 0: + content_probability = content_count / fetched_count + good_probability = good_count / fetched_count + sizes = { + "good":0, + "bad_copy":0, + "bad_small":0 + } + + originalities ={} + + for k in sizes.keys(): + res = self.session.execute(self.check_domain_size,(domain,k)) + row = res.one() + co =row[0] + originalities[k] = row[1] + sizes[k]= co + good_size = sizes["good"] + content_size = sum(sizes.values()) + if good_count > 0: + good_originality = originalities["good"] / good_count + if content_count > 0: + content_originality = sum(originalities.values()) / content_count + + if good_count > 0: + average_good_characters = good_size / good_count * good_originality + average_fetched_good_characters = good_size * good_originality / fetched_count + + gain_ratio = average_fetched_good_characters / fetched_count + + if content_count > 0: + average_content_characters = content_size / content_count + + #print(sizes) + #print(originalities) + uv = ( + seen_count, + good_size, + good_count, + good_probability, + good_originality, + average_good_characters, + content_size, + content_count, + content_probability, + content_originality, + average_content_characters, + fetched_count, + average_fetched_good_characters, + gain_ratio, + domain) + if fetched_count > 0 or seen_count > 0: + 
self.session.execute(self.domain_quality_update,uv) + return average_fetched_good_characters + + def all_domains(self,count): + rows = self.session.execute(self.domains_select) + domains = [] + for row in rows: + domain = row[0] + seen_count = row[1] + fetched_count = row[2] + gain_ratio = row[3] + afg = row[4] + if fetched_count and afg and seen_count: + domains.append(tuple(row)) + l = len(domains) + ss = min(l,count) + res = [] + if ss > 0: + # sort according to ratio + res = list(sorted(domains,key=lambda x:x[4],reverse=True))[0:ss] + # returns sorted list of tuples domain,gain_ratio + return res + + def get_best_domains(self,count): + # get all domains + rows = self.session.execute(self.domains_select) + domains = [] + for row in rows: + domain = row[0] + seen_count = row[1] + fetched_count = row[2] + gain_ratio = row[3] + afg = row[4] + if seen_count and fetched_count and gain_ratio: + domains.append((domain,gain_ratio)) + l = len(domains) + ss = min(l,count) + res = [] + if ss > 0: + # sort according to ratio + res = list(sorted(domains,key=lambda x:x[1],reverse=True))[0:ss] + # returns sorted list of tuples domain,gain_ratio + return res + + def get_unvisited_domains(self,count): + # get all domains + rows = self.session.execute(self.domains_select) + domains = [] + for row in rows: + domain = row[0] + seen_count = row[1] + fetched_count = row[2] + gain_ratio = row[3] + afg = row[4] + if seen_count and not fetched_count: + domains.append(domain) + ss = min(len(domains),count) + return random.sample(domains,ss) + + def get_visit_links(self,domain,recent_count,old_count,random_count): + dblinks = [] + rows = self.session.execute("SELECT url_schema,url_path,url_query,update_time FROM links Where domain_name=%s AND link_status='seen'",(domain,)) + for row in rows: + link = urlunparse((row[0],domain,row[1],row[2])) + dblinks.append((link,row[3])) + + visitlinks = [] + dblinks.sort(key=lambda x:x[1]) + random_links = [] + for i,(link,time) in enumerate(dblinks): + #print(link,time) + if i < recent_count: + visitlinks.append(link) + elif i >= len(dblinks) - old_count: + visitlinks.append(link) + else: + random_links.append(link) + sc = min(random_count,len(random_links)) + if sc > 0: + visitlinks += random.sample(random_links,sc) + return visitlinks diff --git a/websucker/parser.py b/websucker/parser.py new file mode 100644 index 0000000..86bb9f7 --- /dev/null +++ b/websucker/parser.py @@ -0,0 +1,335 @@ +import dateutil.parser +import justext +import re +import sys +import datetime + +import lxml.etree +import urllib.parse +import os.path + + +datere = re.compile("\d{1,2}\.\s*\d{1,2}\.\s*[12]\d{3}") +yearre = re.compile(r"\s\d{4}\s") + +def urlunparse(parsed_url): + schema, netloc, path, query = parsed_url + return urllib.parse.urlunparse((schema, netloc, path, "", query, "")) + +def normalize_link(link, base=None,strip_query=False): + + link = link.strip().replace( + "\n", "").replace("\t", "").replace("\r", "") + parsed_link = urllib.parse.urlparse(link) + schema = parsed_link[0] + netloc = parsed_link[1].strip().lower() + path = parsed_link[2].strip() + query = parsed_link[4] + if strip_query: + query = "" + if path is None or len(path) == 0: + path = "/" + dirname, filename = os.path.split(path) + if base is not None: + parsed_base = urllib.parse.urlparse(base) + if schema == "": + schema = parsed_base[0] + # Ak je relativny link + if netloc == "": + netloc = parsed_base[1] + schema = parsed_base[0] + bdir, bfile = os.path.split(parsed_base[2]) + if len(bdir) > 0 and bdir[0] != "." 
and len(dirname) > 0 and dirname[0] != "/": + dirname = bdir + "/" + dirname + # if len(dirname) == 0 or dirname[0] != '/': + # path = '/' + path + dirname = os.path.normpath(dirname) + dirname = dirname.lstrip("/").lstrip(".") + path = dirname + "/" + filename + return schema, netloc, path, query + + +def get_date(te): + dates = [] + words = [] + if te is None: + te = "" + for t in te.split(): + t = t.strip().lower().lstrip("0").replace("\r", "\n").replace("\n", "") + if len(t) == 0: + continue + for i, m in enumerate(["jan", "feb", "mar", "apr", "máj", "jún", "júl", "aug", "sept", "okt", "nov", "dec"]): + if t.startswith(m): + t = str(i + 1) + "." + break + if t[0].isdigit(): + words.append(t) + txt = " ".join(words) + for st in re.findall(datere, txt): + tokens = st.replace(" ", "").split(".") + try: + y = int(tokens[-1]) + if y < 2000 or y > 2020: + continue + m = 2 + d = 2 + if len(tokens) > 2: + m = int(tokens[-2]) + d = int(tokens[-3]) + dates.append(datetime.date(y, m, d)) + except ValueError: + pass + return dates + + +class BaseParser: + def __init__(self, verbose=False): + self.strip_query = True + self.skiptypes = [".gz", ".js", ".avi", ".flv", ".zip", ".xls", ".doc", ".rtf", ".odt", ".mp3", ".mp4", ".wmv", ".jpg", ".png", ".txt", ".pdf", ".css", ".gif", ".tgz", + ".7", ".ogg", "rss", "galeria", "gallery", ".jpeg", ".mpg", ".mpeg", ".xml", ".rar", ".xlsx", ".docx", ".pptx", ".odp", ".iso", ".ppt", ".bz", ".dwg", ".eps", ".bin"] + self.skipchars = re.compile(r"[();:@& ]") + self.store = True + self.verbose = verbose + self.domain_re = re.compile("^((?!-)[A-Za-z0-9-]{1,63}(? 50: + r = "Too long location" + elif domain.startswith(".") or domain.endswith("."): + r = "Malformed domain" + elif not self.domain_re.match(domain): + r = "Bad domain" + else: + da = False + for d in self.allowdomains: + if domain.endswith(d): + da = True + break + if not da and len(self.allowdomains) > 0: + r = "Domain not in allowlist" + for d in self.skipdomains: + if domain.endswith(d): + r = "In domain skiplist" + for d in domain.split("."): + if d in self.skippaths: + r = "Domain in skippath" + if r is not None and self.verbose: + print(domain + " " + r) + return r is None + +# # Argument - parsovana url + def is_link_good(self, link): + assert(link is not None) + r = None + if sys.getsizeof(link) > 1023: + r = "Too long" + try: + schema, domain, path, query = normalize_link(link) + if not schema.startswith("http"): + r = "Bad schema" + dg = self.is_domain_good(domain) + if not dg: + return False + for c in link: + if ord(c) >= 128: + r = "Bad domain character" + break + for p in self.skipdomains: + if domain.endswith(p): + r = "Bad domain" + break + if ".b-" in domain: + r = "Bad domain" + if len(domain) > 127: + r = "Too long path" + # Path + for t in self.skiptypes: + if path.lower().endswith(t): + r = "Bad type" + break + if re.search(self.skipchars, path): + r = "Bad path" + for p in path.split("/"): + if p in self.skippaths or "jpg" in p or "galeria" in p: + r = "Bad path" + break + except ValueError: + r = "Bad urlparse" + return r is None + + def filter_links(links): + # Filter links + linkset = set() + for link in links: + if not self.is_link_good(link): + continue + link = urlunparse(normalize_link(link,strip_query=self.strip_query)) + linkset.add(link) + + return list(linkset) + + def extract_raw_text(self, content, current_time): + result = [] + rd = None + paragraphs = [] + content.seek(0) + try: + c = content.read() + paragraphs = justext.justext(c, 
justext.get_stoplist(self.justext_language), length_low=50, length_high=150) + content.seek(0) + except lxml.etree.XMLSyntaxError: + print("XML Syntax parse error") + except lxml.etree.ParserError: + + print("XML Parse parse error") + except justext.core.JustextError: + print("Justext error") + except IndexError: + print("XML error") + except UnicodeDecodeError: + print("Unicode Error") + except TypeError: + # NUll in string + print("String Error") + except RuntimeError: + # Maximum recursion depth" + print("Recursion Error") + dates = [] + for p in paragraphs: + # TODO - match URL for date + if p is not None and p.text is not None and len(p.text) > 0: + dat = get_date(p.text) + for d in dat: + dates.append(d) + if self.verbose: + print(p.class_type, p.links_density(), p.stopwords_density( + justext.get_stoplist(self.justext_language)), p.text) + if not p.is_boilerplate: + result.append(p.text.strip()) + if len(dates) == 0: + dates.append(current_time) + if len(dates) > 0: + rd = max(dates) + rd = rd.isoformat() + + return "\n\n".join(result), rd + + # Extracts matainformation from html + # First it looks for name, content in meta tags + # then it looks for opengraph + def extract_og(self, bs): + tags = set() + authors = set() + title = "" + description = "" + section = "" + article_published_time = "" + + for m in bs.find_all("meta", attrs={"name": True, "content": True}): + content = m["content"].strip() + if len(content) == 0: + continue + name = m["name"].strip() + if name == "keywords": + for t in content.split(","): + if len(t.strip()) > 0: + tags.add(t.strip()) + if name == "news_keywords": + for t in content.split(","): + if len(t.strip()) > 0: + tags.add(t.strip()) + if name == "author": + authors.add(content) + if name == "description": + description = content + + for m in bs.find_all("meta", property=True, content=True): + content = m["content"].strip() + if len(content) == 0: + continue + property = m["property"].strip() + if property == "og:title": + title = content + if property == "article:published_time": + try: + # Je v ISO formate? + d = dateutil.parser.parse(content) + article_published_time = d.isoformat() + except ValueError: + pass + except OverflowError: + pass + if property == "article:author" and "admin" not in content.lower(): + authors.add(content) + if property == "section": + section = content + if property == "tag": + tags.add(content) + if property == "og:description": + description = content + + if len(title) < 2 and bs.h1 is not None: + title = bs.h1.get_text(strip=True) + if len(title) < 2 and bs.title is not None: + title = bs.title.get_text(strip=True) + if len(authors) == 0: + for m in bs.find_all(property="author"): + authors.add(m.get_text(strip=True)) + if len(authors) == 0: + for m in bs.find_all(itemprop="author"): + authors.add(m.get_text(strip=True)) + authors = set(filter(lambda x: len(x) > 2, authors)) + + return tags,authors,title.strip(),article_published_time.strip(),description,section.strip() + + + def calculate_checksums(self, text): + """ + @return fingerprints of a paragraphs in text. 
Paragraphs are separated by a blank line
+        """
+        checksums = []
+        sizes = []
+        hval = 0
+        hsz = 0
+        sz = 0
+        for c in text:
+            cv = ord(c)
+            sz += 1
+            if cv > 64:
+                hval += (hval << 3) + cv
+                zv = hval >> 31
+                hval &= 0x7fffffff
+                hval += zv
+                hsz += 1
+            if c == "\n" and hsz > 0:
+                if hsz > 100:
+                    checksums.append(hval)
+                    sizes.append(sz)
+                sz = 0
+                hsz = 0
+        if hsz > 100:
+            checksums.append(hval)
+            sizes.append(sz)
+        return checksums, sizes
+
+class EnglishParser(BaseParser):
+    def __init__(self):
+        super(EnglishParser,self).__init__()
+        self.justext_language = "English"
+        self.allowdomains = set(["com","org","io"])
+
diff --git a/websucker/queue.py b/websucker/queue.py
new file mode 100644
index 0000000..fe33eb1
--- /dev/null
+++ b/websucker/queue.py
@@ -0,0 +1,35 @@
+import greenstalk
+import random
+
+
+MAX_PRIORITY = 0
+MIN_PRIORITY = 4000000000
+
+MAX_FLOAT_PRIORITY = 10000.0
+
+def map_priority(p,max_priority):
+    p = p / max_priority
+    return MIN_PRIORITY - (p*MIN_PRIORITY)
+
+class BeanstalkdQueue:
+    def __init__(self,host,port,tube):
+        self.c = greenstalk.Client(host,port,use=tube,encoding="utf8")
+
+    def queue_priority_domains(self,priority_domains):
+        for domain,priority in priority_domains:
+            p = priority / MAX_FLOAT_PRIORITY
+            p = MIN_PRIORITY - (p*MIN_PRIORITY)
+            self.c.put(domain,p)
+
+    def queue_random_domains(self,domains):
+        for domain in domains:
+            p = random.randint(MAX_PRIORITY,MIN_PRIORITY)
+            self.c.put(domain,p)
+
+    def consume_domains(self,callback):
+        while True:
+            job = self.c.reserve()
+            domain = job.body
+            self.c.delete(job)
+            callback(domain)
+
diff --git a/websucker/schema.sql b/websucker/schema.sql
new file mode 100644
index 0000000..5b54909
--- /dev/null
+++ b/websucker/schema.sql
@@ -0,0 +1,94 @@
+DROP KEYSPACE IF EXISTS websucker;
+
+CREATE KEYSPACE websucker
+WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};
+
+USE websucker;
+
+CREATE TABLE links (
+    domain_name TEXT,
+    url_path TEXT,
+    url_query TEXT,
+    url_schema TEXT,
+    redirect_target TEXT,
+    link_status TEXT,
+    link_originality FLOAT,
+    body_size INT,
+    update_time TIMESTAMP,
+    PRIMARY KEY(domain_name,url_path,url_query)
+);
+
+CREATE INDEX link_status_index ON links(link_status);
+
+CREATE TABLE daily_links (
+    day DATE,
+    domain_name TEXT,
+    url_path TEXT,
+    url_query TEXT,
+    link_status TEXT,
+    body_size INT,
+    link_originality FLOAT,
+    update_time TIMESTAMP,
+    PRIMARY KEY(day,domain_name,link_status,url_path,url_query)
+);
+
+CREATE TABLE domain_quality (
+    domain_name TEXT,
+    day DATE,
+    seen_count INT,
+    good_size INT,
+    good_count INT,
+    good_probability FLOAT,
+    good_originality FLOAT,
+    average_good_characters FLOAT,
+    content_size INT,
+    content_count INT,
+    content_probability FLOAT,
+    content_originality FLOAT,
+    average_content_characters FLOAT,
+    fetched_count INT,
+    average_fetched_good_characters FLOAT,
+    gain_ratio FLOAT,
+    update_time TIMESTAMP STATIC,
+    PRIMARY KEY(domain_name,day)
+) WITH CLUSTERING ORDER BY (day DESC);
+
+
+CREATE TABLE content (
+    domain_name TEXT,
+    target_link TEXT,
+    agent_version TEXT,
+    title TEXT,
+    links SET<TEXT>,
+    authors SET<TEXT>,
+    tags SET<TEXT>,
+    description TEXT,
+    section TEXT,
+    article_published_time TEXT,
+    text_date TEXT,
+    body TEXT,
+    body_size INT,
+    update_time TIMESTAMP,
+    PRIMARY KEY(domain_name,target_link)
+);
+
+CREATE TABLE paragraph_checksums (
+    checksum BIGINT,
+    url_hash BIGINT,
+    PRIMARY KEY(checksum,url_hash)
+);
+
+CREATE TABLE html (
+    day DATE,
+    domain_name TEXT,
+    source_link TEXT,
+    target_link TEXT,
+    redirect_links LIST<TEXT>,
+    status INT,
+    content TEXT,
+    headers TEXT,
+    agent_version TEXT,
+    update_time TIMESTAMP,
+    PRIMARY KEY(day,domain_name,source_link)
+);
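
The schema above is printed by the `websuck schema` command but is not applied automatically. A minimal sketch of loading it into a local Cassandra node with the same driver the crawler uses; this helper is an assumption about deployment, not part of the patch, and splitting on ";" is a simplification that holds only because no statement in this file contains a literal semicolon:

```python
import cassandra.cluster
from websucker.db import get_schema

cluster = cassandra.cluster.Cluster(["127.0.0.1"], port=9042)
session = cluster.connect()
for statement in get_schema().split(";"):
    if statement.strip():
        # runs the DROP/CREATE KEYSPACE and CREATE TABLE statements one by one
        session.execute(statement)
```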