From 8c9ed4ab091d769826bf9ba3b33b7dd51082ac21 Mon Sep 17 00:00:00 2001 From: Andy Chosak Date: Wed, 18 Oct 2023 12:43:12 -0400 Subject: [PATCH 1/9] Downgrade from Django 4.0 to Django 3.2 Unfortunately wpull only supports Python 3.6, see - ArchiveTeam/wpull#404 - ArchiveTeam/wpull#451 Django 4.0 dropped support for Python 3.6, see https://docs.djangoproject.com/en/4.2/releases/4.0/#python-compatibility In order to integrate wpull with the viewer application, we need to downgrade the viewer Django version from 4.0 to 3.2. --- requirements/base.txt | 12 ++++++------ settings.py | 16 +++++++--------- wsgi.py | 2 +- 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/requirements/base.txt b/requirements/base.txt index 6573881..eda2d3f 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -1,12 +1,12 @@ -click==8.1.3 +click==8.0.4 cssselect==1.1.0 -Django==4.0.7 +Django==3.2.22 django-click==2.3.0 -django-debug-toolbar==3.4.0 -django-filter==22.1 -django-modelcluster==6.0 +django-debug-toolbar==3.2.4 +django-filter==21.1 +django-modelcluster==5.3 djangorestframework==3.13.1 djangorestframework-csv==2.1.1 lxml==4.9.1 warcio==1.7.4 -whitenoise==6.1.0 +whitenoise==5.3.0 diff --git a/settings.py b/settings.py index 1d71f47..df1ad38 100644 --- a/settings.py +++ b/settings.py @@ -1,13 +1,11 @@ """ Django settings for viewer project. -Generated by 'django-admin startproject' using Django 4.0.3. - For more information on this file, see -https://docs.djangoproject.com/en/4.0/topics/settings/ +https://docs.djangoproject.com/en/3.2/topics/settings/ For the full list of settings and their values, see -https://docs.djangoproject.com/en/4.0/ref/settings/ +https://docs.djangoproject.com/en/3.2/ref/settings/ """ import os import sys @@ -18,7 +16,7 @@ # Quick-start development settings - unsuitable for production -# See https://docs.djangoproject.com/en/4.0/howto/deployment/checklist/ +# See https://docs.djangoproject.com/en/3.2/howto/deployment/checklist/ # SECURITY WARNING: keep the secret key used in production secret! SECRET_KEY = "django-insecure-a94cjadrz=y0o+c75138ro=gn3oq0*by)gs1cs88k$9+taepp(" @@ -73,7 +71,7 @@ # Database -# https://docs.djangoproject.com/en/4.0/ref/settings/#databases +# https://docs.djangoproject.com/en/3.2/ref/settings/#databases _sample_db_path = str(BASE_DIR / "sample" / "sample.sqlite3") _env_db_path = os.getenv("CRAWL_DATABASE") @@ -97,7 +95,7 @@ } # Internationalization -# https://docs.djangoproject.com/en/4.0/topics/i18n/ +# https://docs.djangoproject.com/en/3.2/topics/i18n/ LANGUAGE_CODE = "en-us" @@ -109,12 +107,12 @@ # Static files (CSS, JavaScript, Images) -# https://docs.djangoproject.com/en/4.0/howto/static-files/ +# https://docs.djangoproject.com/en/3.2/howto/static-files/ STATIC_URL = "static/" # Default primary key field type -# https://docs.djangoproject.com/en/4.0/ref/settings/#default-auto-field +# https://docs.djangoproject.com/en/3.2/ref/settings/#default-auto-field DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField" diff --git a/wsgi.py b/wsgi.py index 49cf873..d9bf8fc 100644 --- a/wsgi.py +++ b/wsgi.py @@ -4,7 +4,7 @@ It exposes the WSGI callable as a module-level variable named ``application``. 
For more information on this file, see -https://docs.djangoproject.com/en/4.0/howto/deployment/wsgi/ +https://docs.djangoproject.com/en/3.2/howto/deployment/wsgi/ """ import os From 5c0fc8ce2a69977b3a7a308474468650f03bfcc3 Mon Sep 17 00:00:00 2001 From: Andy Chosak Date: Fri, 20 Oct 2023 16:22:28 -0400 Subject: [PATCH 2/9] Downgrade from Python 3.8 to Python 3.6 Unfortunately wpull only supports Python 3.6, see - ArchiveTeam/wpull#404 - ArchiveTeam/wpull#451 In order to integrate wpull with the viewer application, we need to downgrade Python from 3.8 to 3.6. --- README.md | 4 ++-- fabfile.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 4327367..7a660cb 100644 --- a/README.md +++ b/README.md @@ -166,7 +166,7 @@ yarn build Create a Python virtual environment and install required packages: ``` -python3.8 -m venv venv +python3.6 -m venv venv source venv/bin/activate pip install -r requirements/base.txt ``` @@ -248,7 +248,7 @@ under the `/sample/src` subdirectory. To regenerate these files, first serve the sample website locally: ``` -python -m http.server -d ./sample/src +cd ./sample/src && python -m http.server ``` This starts the sample website running at http://localhost:8000. diff --git a/fabfile.py b/fabfile.py index e2b09d4..0d97059 100644 --- a/fabfile.py +++ b/fabfile.py @@ -22,7 +22,7 @@ SQLITE_BASENAME = f"sqlite-autoconf-{SQLITE_VERSION}" SQLITE_INSTALL_ROOT = f"{DEPLOY_ROOT}/{SQLITE_BASENAME}" -PYTHON_VERSION = "3.8.13" +PYTHON_VERSION = "3.6.15" PYTHON_BASENAME = f"Python-{PYTHON_VERSION}" PYTHON_INSTALL_ROOT = f"{DEPLOY_ROOT}/{PYTHON_BASENAME}" From 372a19a444af3fefd510f7a9186f2f0067885762 Mon Sep 17 00:00:00 2001 From: Andy Chosak Date: Fri, 20 Oct 2023 16:24:04 -0400 Subject: [PATCH 3/9] Implement new crawler using wpull This change adds a new management command (manage.py crawl) that crawls a website directly into a SQLite database, using the wpull package: https://github.com/ArchiveTeam/wpull Usage: manage.py crawl [OPTIONS] START_URL DB_FILENAME --- crawler/management/commands/crawl.py | 284 +++++++++++++++++++++++++++ requirements/base.txt | 8 + sample/src/index.html | 8 + 3 files changed, 300 insertions(+) create mode 100644 crawler/management/commands/crawl.py diff --git a/crawler/management/commands/crawl.py b/crawler/management/commands/crawl.py new file mode 100644 index 0000000..512092c --- /dev/null +++ b/crawler/management/commands/crawl.py @@ -0,0 +1,284 @@ +import os +import os.path +import re +from email.utils import parsedate_to_datetime +from urllib import parse + +from django.core.management import call_command +from django.db import connections + +import djclick as click +import lxml.html +from wpull.application.builder import Builder +from wpull.application.hook import Actions +from wpull.application.options import AppArgumentParser +from wpull.application.plugin import PluginFunctions, WpullPlugin, hook +from wpull.pipeline.item import URLProperties +from wpull.url import URLInfo + +from crawler.models import Component, Error, Link, Page, Redirect +from crawler.writer import DatabaseWriter + + +COMPONENT_SEARCH = re.compile(r"(?:(?:class=\")|\s)((?:o|m|a)-[\w\-]*)") +EXTERNAL_SITE = re.compile("/external-site/") +WHITESPACE = re.compile(r"\s+") + + +def get_body(tree): + body = tree.find("./body") + + if body is not None: + drop_element_selectors = [ + ".o-header", + ".o-footer", + ".skip-nav", + "img", + "script", + "style", + ] + + for drop_element_selector in drop_element_selectors: + for element in 
body.cssselect(drop_element_selector): + element.drop_tree() + + return body + + +class DatabaseWritingPlugin(WpullPlugin): + def activate(self): + super().activate() + + self.start_url = URLInfo.parse(self.app_session.args.urls[0]) + self.db_filename, self.max_pages = self.app_session.args.plugin_args.rsplit( + ",", maxsplit=1 + ) + self.max_pages = int(self.max_pages) + + self.init_db() + self.num_pages = 0 + + def init_db(self): + db_alias = "warc_to_db" + + connections.databases[db_alias] = { + "ENGINE": "django.db.backends.sqlite3", + "NAME": self.db_filename, + } + + call_command("migrate", database=db_alias, app_label="crawler", run_syncdb=True) + + self.db_writer = DatabaseWriter(db_alias) + + @property + def at_max_pages(self): + return self.max_pages and self.num_pages >= self.max_pages + + @hook(PluginFunctions.accept_url) + def accept_url(self, item_session, verdict, reasons): + if self.at_max_pages: + return False + + request = item_session.url_record + + # We want to crawl links to different domains to test their validity. + # But once we've done that, we don't want to keep crawling there. + # Therefore, don't crawl links that start on different domains. + if ( + request.parent_url_info.hostname_with_port + != self.start_url.hostname_with_port + ): + return False + + # If we're crawling on the start domain, apply additional rejections. + if request.url_info.hostname_with_port == self.start_url.hostname_with_port: + # Don't crawl URLs that look like filenames. + if "." in request.url_info.path: + return False + + qs = parse.parse_qs(request.url_info.query) + + if qs: + # Don't crawl external link URLs directly. + # Instead crawl to their ultimate destination. + if EXTERNAL_SITE.match(request.url_info.path): + ext_url = qs.get("ext_url") + if ext_url: + # Add the external URL to the list to be crawled. + url_properties = URLProperties() + url_properties.level = request.level + url_properties.inline_level = request.inline_level + url_properties.parent_url = request.parent_url + url_properties.root_url = request.root_url + + item_session.add_url(ext_url[0], url_properites=url_properties) + return False + + # For all other URLs, limit querystrings that get crawled. + # Only crawl pages that only have the "page" parameter. 
+ elif list(qs.keys()) != ["page"]: + return False + + return verdict + + @hook(PluginFunctions.handle_response) + def my_handle_response(self, item_session): + self.num_pages += 1 + if self.at_max_pages: + item_session.skip() + return Actions.FINISH + + db_record = self.process_response(item_session.request, item_session.response) + + if db_record: + self.db_writer.write(db_record) + + return Actions.NORMAL + + def process_response(self, request, response): + status_code = response.status_code + content_type = response.fields["Content-Type"] + timestamp = parsedate_to_datetime(response.fields["Date"]) + referrer = request.fields.get("Referer") + + if status_code >= 300: + if status_code < 400: + location = response.fields.get("Location") + return Redirect( + timestamp=timestamp, + url=request.url, + status_code=status_code, + referrer=referrer, + location=location, + ) + else: + return Error( + timestamp=timestamp, + url=request.url, + status_code=status_code, + referrer=referrer, + ) + + if 200 != status_code: + raise ValueError(f"Unexpected status code {status_code} for {request.url}") + + if not content_type: + raise ValueError(f"Missing content type for {request.url}") + + if not content_type.startswith("text/html"): + return + + # We don't record external page data because we've only crawled them to + # check for redirects, 404s, or other errors. + if request.url_info.hostname_with_port != self.start_url.hostname_with_port: + return + + html = response.body.content().decode("utf-8") + tree = lxml.html.fromstring(html) + title_tag = tree.find(".//title") + title = title_tag.text.strip() if title_tag is not None else None + language = tree.find(".").get("lang") + + if title is None: + return + + body = get_body(tree) + + if body is not None: + text = WHITESPACE.sub(" ", body.text_content()).strip() + else: + text = None + + page = Page( + timestamp=timestamp, + url=request.url, + title=title, + language=language, + html=html, + text=text, + ) + + hrefs = list( + set( + href + for element, attribute, href, pos in body.iterlinks() + if "a" == element.tag and "href" == attribute + ) + ) + + # Remove any external link URL wrapping. + for i, href in enumerate(hrefs): + parsed_href = parse.urlparse(href) + if not EXTERNAL_SITE.match(parsed_href.path): + continue + + if parsed_href.netloc and self.start_url.host != parsed_href.netloc: + continue + + ext_url = parse.parse_qs(parsed_href.query).get("ext_url") + if ext_url: + hrefs[i] = ext_url[0] + + page.links = [Link(href=href) for href in sorted(hrefs)] + + body_html = lxml.etree.tostring(body, encoding="unicode") + + class_names = set(COMPONENT_SEARCH.findall(body_html)) + page.components = [ + Component(class_name=class_name) for class_name in sorted(class_names) + ] + + return page + + +@click.command() +@click.argument("start_url") +@click.argument("db_filename", type=click.Path()) +@click.option( + "--max-pages", type=int, help="Maximum number of pages to crawl", default=0 +) +@click.option("--depth", type=int, help="Maximum crawl depth", default=0) +@click.option( + "--recreate", + is_flag=True, + show_default=True, + default=False, + help="Recreate database file if it already exists", +) +def command(start_url, db_filename, max_pages, depth, recreate): + if os.path.exists(db_filename): + if not recreate: + raise click.ClickException( + f"File {db_filename} already exists, use --recreate to recreate." 
+ ) + + os.remove(db_filename) + + arg_parser = AppArgumentParser() + args = arg_parser.parse_args( + [ + start_url, + "--recursive", + "--no-verbose", + "--delete-after", + "--no-robots", + "--wait=0.5", + "--random-wait", + "--span-hosts", + "--user-agent=CFPB website indexer", + f"--level={depth}", + f"--plugin-script={__file__}", + f"--plugin-args={db_filename},{max_pages}", + ] + ) + builder = Builder(args) + app = builder.build() + + # This is required due to the use of async code in wpull. Unfortunately + # wpull hooks aren't called in a way that allows us to wrap Django database + # calls with sync_to_async. This is only safe because we only download one + # URL at a time. + # https://docs.djangoproject.com/en/3.2/topics/async/#async-safety + os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true" + + return app.run_sync() diff --git a/requirements/base.txt b/requirements/base.txt index eda2d3f..9006775 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -10,3 +10,11 @@ djangorestframework-csv==2.1.1 lxml==4.9.1 warcio==1.7.4 whitenoise==5.3.0 +wpull==2.0.1 + +# wpull doesn't set upper bounds for some of its requirements, +# so we need to specify these manually: +# See https://github.com/ArchiveTeam/wpull/blob/v2.0.1/requirements.txt +html5lib==0.9999999 +sqlalchemy==1.0.12 +tornado==4.5.3 diff --git a/sample/src/index.html b/sample/src/index.html index 4cd4e03..9e3cf13 100644 --- a/sample/src/index.html +++ b/sample/src/index.html @@ -10,5 +10,13 @@

Sample homepage
This is sample content.
This is a link to a child page.
+This is a link somewhere else.
+This is an obfuscated link somewhere else.
+This is another obfuscated link somewhere else.
+This links to a file.
+This links to a file somewhere else.
This link has a page query string parameter.
This link has a non-page query string parameter.
+This link has multiple query string parameters.

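The sample pages above exercise the external-link handling added in this patch's accept_url hook: links wrapped as /external-site/?ext_url=... are not crawled directly; their ext_url destination is queued instead, and the same unwrapping is applied to hrefs recorded on each page. A minimal standalone sketch of that unwrapping, assuming (as in the diff above) that the destination is percent-encoded in an ext_url query parameter; the function name here is illustrative, not part of the patch:

```python
from urllib import parse

EXTERNAL_SITE_PATH = "/external-site/"


def unwrap_external_link(href):
    """Return the destination hidden behind an /external-site/ wrapper link.

    Falls back to the original href if it is not a wrapper or has no ext_url.
    """
    parsed = parse.urlparse(href)
    if not parsed.path.startswith(EXTERNAL_SITE_PATH):
        return href
    ext_urls = parse.parse_qs(parsed.query).get("ext_url")
    return ext_urls[0] if ext_urls else href


# parse_qs percent-decodes values, so this prints https://example.org/somewhere/
print(unwrap_external_link(
    "/external-site/?ext_url=https%3A%2F%2Fexample.org%2Fsomewhere%2F"
))
```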
From 92fa68a424ff5177691ad81556ad72736fc8c8f7 Mon Sep 17 00:00:00 2001 From: Andy Chosak Date: Wed, 25 Oct 2023 17:19:51 -0400 Subject: [PATCH 4/9] Iterating on wpull-based crawler --- crawler/management/commands/crawl.py | 168 ++++++++++++++++++++------- crawler/models.py | 19 ++- crawler/writer.py | 7 ++ requirements/base.txt | 1 + settings.py | 17 +++ 5 files changed, 170 insertions(+), 42 deletions(-) diff --git a/crawler/management/commands/crawl.py b/crawler/management/commands/crawl.py index 512092c..fe76fec 100644 --- a/crawler/management/commands/crawl.py +++ b/crawler/management/commands/crawl.py @@ -1,18 +1,19 @@ +import logging import os import os.path import re -from email.utils import parsedate_to_datetime from urllib import parse from django.core.management import call_command from django.db import connections +from django.utils import timezone import djclick as click -import lxml.html +import lxml.html.soupparser from wpull.application.builder import Builder from wpull.application.hook import Actions from wpull.application.options import AppArgumentParser -from wpull.application.plugin import PluginFunctions, WpullPlugin, hook +from wpull.application.plugin import PluginFunctions, WpullPlugin, event, hook from wpull.pipeline.item import URLProperties from wpull.url import URLInfo @@ -20,6 +21,9 @@ from crawler.writer import DatabaseWriter +logger = logging.getLogger("crawler") + + COMPONENT_SEARCH = re.compile(r"(?:(?:class=\")|\s)((?:o|m|a)-[\w\-]*)") EXTERNAL_SITE = re.compile("/external-site/") WHITESPACE = re.compile(r"\s+") @@ -56,7 +60,8 @@ def activate(self): self.max_pages = int(self.max_pages) self.init_db() - self.num_pages = 0 + self.crawled_urls = [] + self.logged_urls = [] def init_db(self): db_alias = "warc_to_db" @@ -72,10 +77,15 @@ def init_db(self): @property def at_max_pages(self): - return self.max_pages and self.num_pages >= self.max_pages + return self.max_pages and len(self.logged_urls) >= self.max_pages @hook(PluginFunctions.accept_url) def accept_url(self, item_session, verdict, reasons): + # If upstream logic rejected this URL, let the rejection stand. + if not verdict: + return False + + # If we've already crawled enough pages, stop. if self.at_max_pages: return False @@ -90,8 +100,14 @@ def accept_url(self, item_session, verdict, reasons): ): return False + # If we're crawling on a different domain, use a HEAD request to avoid + # downloading files that might be linked, for example from S3. + if request.url_info.hostname_with_port != self.start_url.hostname_with_port: + if "." in request.url_info.path: + item_session.request.method = "HEAD" + # If we're crawling on the start domain, apply additional rejections. - if request.url_info.hostname_with_port == self.start_url.hostname_with_port: + else: # Don't crawl URLs that look like filenames. if "." in request.url_info.path: return False @@ -102,16 +118,21 @@ def accept_url(self, item_session, verdict, reasons): # Don't crawl external link URLs directly. # Instead crawl to their ultimate destination. if EXTERNAL_SITE.match(request.url_info.path): - ext_url = qs.get("ext_url") - if ext_url: + ext_urls = qs.get("ext_url") + if ext_urls: # Add the external URL to the list to be crawled. 
+ ext_url = ext_urls[0] + url_properties = URLProperties() url_properties.level = request.level url_properties.inline_level = request.inline_level url_properties.parent_url = request.parent_url url_properties.root_url = request.root_url - item_session.add_url(ext_url[0], url_properites=url_properties) + item_session.app_session.factory["URLTable"].remove_many( + [ext_url] + ) + item_session.add_url(ext_url, url_properites=url_properties) return False # For all other URLs, limit querystrings that get crawled. @@ -119,62 +140,124 @@ def accept_url(self, item_session, verdict, reasons): elif list(qs.keys()) != ["page"]: return False - return verdict + if request.url not in self.crawled_urls: + logger.info(f"Crawling {request.url}") + self.crawled_urls.append(request.url) - @hook(PluginFunctions.handle_response) - def my_handle_response(self, item_session): - self.num_pages += 1 - if self.at_max_pages: - item_session.skip() - return Actions.FINISH + return True - db_record = self.process_response(item_session.request, item_session.response) + @hook(PluginFunctions.handle_error) + def handle_error(self, item_session, error): + self.db_writer.write( + Error( + timestamp=timezone.now(), + url=item_session.request.url, + status_code=0, + referrer=item_session.request.fields.get("Referer"), + ) + ) - if db_record: - self.db_writer.write(db_record) + @hook(PluginFunctions.handle_pre_response) + def handle_pre_response(self, item_session): + # Our accept_url handler converts external requests from GET to HEAD. + # The wpull response body handler seems to assume that HEAD request + # responses will never have Content-Length or Transfer-Encoding + # headers, which doesn't seem to be the case in practice: + # + # https://github.com/ArchiveTeam/wpull/blob/v2.0.1/wpull/protocol/http/stream.py#L441-L451 + # + # Therefore, we strip these headers out if they exist, since we don't + # need them for our purposes. Since this was an external request, we + # care only about the status code, not the response body. + if item_session.request.method == "HEAD": + item_session.response.fields.pop("Content-Length", None) + item_session.response.fields.pop("Transfer-Encoding", None) return Actions.NORMAL - def process_response(self, request, response): + @hook(PluginFunctions.handle_response) + def handle_response(self, item_session): + request = item_session.request + response = item_session.response status_code = response.status_code - content_type = response.fields["Content-Type"] - timestamp = parsedate_to_datetime(response.fields["Date"]) - referrer = request.fields.get("Referer") + timestamp = timezone.now() + + if request.url in self.logged_urls: + logger.debug(f"Already logged {request.url}") + item_session.skip() + return Actions.FINISH + else: + self.logged_urls.append(request.url) if status_code >= 300: + referrer = request.fields.get("Referer") + if status_code < 400: location = response.fields.get("Location") - return Redirect( - timestamp=timestamp, - url=request.url, - status_code=status_code, - referrer=referrer, - location=location, + location_parsed = parse.urlparse(location) + + self.db_writer.write( + Redirect( + timestamp=timestamp, + url=request.url, + status_code=status_code, + referrer=referrer, + location=location, + ) ) + + # Don't follow redirects that don't point to the start domain. 
+ if ( + location_parsed.hostname + and location_parsed.hostname != self.start_url.hostname + ) or ( + location_parsed.port and location_parsed.port != self.start_url.port + ): + logger.debug(f"Not following redirect to {location}") + item_session.skip() + return Actions.FINISH else: - return Error( - timestamp=timestamp, - url=request.url, - status_code=status_code, - referrer=referrer, + self.db_writer.write( + Error( + timestamp=timestamp, + url=request.url, + status_code=status_code, + referrer=referrer, + ) ) + return Actions.NORMAL + if 200 != status_code: raise ValueError(f"Unexpected status code {status_code} for {request.url}") + # If this request was to an external domain and it responded with + # a normal status code, we don't care about recording it. + if request.url_info.hostname_with_port != self.start_url.hostname_with_port: + item_session.skip() + return Actions.FINISH + + page_record = self.process_200_response(request, response) + + if page_record: + self.db_writer.write(page_record) + + return Actions.NORMAL + + def process_200_response(self, request, response): + timestamp = timezone.now() + + content_type = response.fields.get("Content-Type") + if not content_type: raise ValueError(f"Missing content type for {request.url}") if not content_type.startswith("text/html"): return - # We don't record external page data because we've only crawled them to - # check for redirects, 404s, or other errors. - if request.url_info.hostname_with_port != self.start_url.hostname_with_port: - return - html = response.body.content().decode("utf-8") - tree = lxml.html.fromstring(html) + + tree = lxml.html.soupparser.fromstring(html) title_tag = tree.find(".//title") title = title_tag.text.strip() if title_tag is not None else None language = tree.find(".").get("lang") @@ -259,13 +342,16 @@ def command(start_url, db_filename, max_pages, depth, recreate): [ start_url, "--recursive", - "--no-verbose", "--delete-after", "--no-robots", "--wait=0.5", "--random-wait", + "--timeout=30", "--span-hosts", + "--link-extractors=html", + "--follow-tags=a", "--user-agent=CFPB website indexer", + "--no-check-certificate", f"--level={depth}", f"--plugin-script={__file__}", f"--plugin-args={db_filename},{max_pages}", diff --git a/crawler/models.py b/crawler/models.py index bf74d1e..cf5ac28 100644 --- a/crawler/models.py +++ b/crawler/models.py @@ -35,6 +35,9 @@ class Page(Request, ClusterableModel): components = ParentalManyToManyField(Component, related_name="pages") links = ParentalManyToManyField(Link, related_name="links") + def __str__(self): + return self.url + class ErrorBase(Request): status_code = models.PositiveIntegerField(db_index=True) @@ -43,10 +46,24 @@ class ErrorBase(Request): class Meta(Request.Meta): abstract = True + def __str__(self): + s = self.url + + if self.referrer: + s += f" (from {self.referrer})" + + s += f" {self.status_code}" + + return s + class Error(ErrorBase): - pass + def __str__(self): + return super().__str__() + " !" 
class Redirect(ErrorBase): location = models.TextField(db_index=True) + + def __str__(self): + return super().__str__() + f" -> {self.location}" diff --git a/crawler/writer.py b/crawler/writer.py index 1d15f29..caaadb7 100644 --- a/crawler/writer.py +++ b/crawler/writer.py @@ -1,8 +1,13 @@ +import logging + from django.db import connections from crawler.models import Component, Link, Page +logger = logging.getLogger("crawler") + + class DatabaseWriter: def __init__(self, db): self.db = db @@ -12,6 +17,7 @@ def write(self, instance): if isinstance(instance, Page): return self._write_page(instance) else: + logger.debug(f"Saving {instance}") instance.save(using=self.db) def _write_page(self, page): @@ -36,6 +42,7 @@ def _write_page(self, page): .values() ) + logger.debug(f"Saving {page}") page.save(using=self.db) def analyze(self): diff --git a/requirements/base.txt b/requirements/base.txt index 9006775..91679fe 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -1,3 +1,4 @@ +beautifulsoup4==4.12.2 click==8.0.4 cssselect==1.1.0 Django==3.2.22 diff --git a/settings.py b/settings.py index df1ad38..b97fd95 100644 --- a/settings.py +++ b/settings.py @@ -136,3 +136,20 @@ "PAGE_SIZE": 25, "UNAUTHENTICATED_USER": None, } + +LOGGING = { + "version": 1, + "disable_existing_loggers": False, + "handlers": { + "console": { + "class": "logging.StreamHandler", + }, + }, + "loggers": { + "crawler": { + "handlers": ["console"], + "level": "DEBUG", + "propagate": False, + }, + }, +} From a7dbe3a416552c47bf8b465331b8346fd6447a0b Mon Sep 17 00:00:00 2001 From: Andy Chosak Date: Mon, 30 Oct 2023 11:58:19 -0400 Subject: [PATCH 5/9] Add timestamp to crawler logging --- settings.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/settings.py b/settings.py index b97fd95..b706559 100644 --- a/settings.py +++ b/settings.py @@ -140,9 +140,16 @@ LOGGING = { "version": 1, "disable_existing_loggers": False, + "formatters": { + "default": { + "format": " %(asctime)s.%(msecs)03d %(levelname)s %(message)s", + "datefmt": "%Y-%m-%d %H:%M:%S", + } + }, "handlers": { "console": { "class": "logging.StreamHandler", + "formatter": "default", }, }, "loggers": { From ca2c67cf3e294cadd8e6eee3700a814646658e38 Mon Sep 17 00:00:00 2001 From: Andy Chosak Date: Mon, 30 Oct 2023 11:59:45 -0400 Subject: [PATCH 6/9] Crawler iteration --- crawler/management/commands/crawl.py | 77 +++++++++++++++++++++------- 1 file changed, 58 insertions(+), 19 deletions(-) diff --git a/crawler/management/commands/crawl.py b/crawler/management/commands/crawl.py index fe76fec..25c9c86 100644 --- a/crawler/management/commands/crawl.py +++ b/crawler/management/commands/crawl.py @@ -1,3 +1,4 @@ +import asyncio import logging import os import os.path @@ -13,7 +14,8 @@ from wpull.application.builder import Builder from wpull.application.hook import Actions from wpull.application.options import AppArgumentParser -from wpull.application.plugin import PluginFunctions, WpullPlugin, event, hook +from wpull.application.plugin import PluginFunctions, WpullPlugin, hook +from wpull.network.connection import BaseConnection from wpull.pipeline.item import URLProperties from wpull.url import URLInfo @@ -28,6 +30,17 @@ EXTERNAL_SITE = re.compile("/external-site/") WHITESPACE = re.compile(r"\s+") +SKIP_URLS = list( + map( + re.compile, + [ + r"^https://www.facebook.com/dialog/share\?.*", + r"^https://twitter.com/intent/tweet\?.*", + r"^https://www.linkedin.com/shareArticle\?.*", + ], + ) +) + def get_body(tree): body = tree.find("./body") @@ -60,8 +73,8 @@ 
def activate(self): self.max_pages = int(self.max_pages) self.init_db() - self.crawled_urls = [] - self.logged_urls = [] + self.accepted_urls = [] + self.requested_urls = [] def init_db(self): db_alias = "warc_to_db" @@ -77,7 +90,7 @@ def init_db(self): @property def at_max_pages(self): - return self.max_pages and len(self.logged_urls) >= self.max_pages + return self.max_pages and len(self.retrieved_urls) >= self.max_pages @hook(PluginFunctions.accept_url) def accept_url(self, item_session, verdict, reasons): @@ -91,6 +104,14 @@ def accept_url(self, item_session, verdict, reasons): request = item_session.url_record + # Don't request pages more than once. + if request.url in self.requested_urls: + return False + + # Always skip certain URLs. + if SKIP_URLS and any(skip_url.match(request.url) for skip_url in SKIP_URLS): + return False + # We want to crawl links to different domains to test their validity. # But once we've done that, we don't want to keep crawling there. # Therefore, don't crawl links that start on different domains. @@ -140,22 +161,27 @@ def accept_url(self, item_session, verdict, reasons): elif list(qs.keys()) != ["page"]: return False - if request.url not in self.crawled_urls: + if request.url not in self.accepted_urls: logger.info(f"Crawling {request.url}") - self.crawled_urls.append(request.url) + self.accepted_urls.append(request.url) return True @hook(PluginFunctions.handle_error) def handle_error(self, item_session, error): - self.db_writer.write( - Error( - timestamp=timezone.now(), - url=item_session.request.url, - status_code=0, - referrer=item_session.request.fields.get("Referer"), + if item_session.request.url in self.requested_urls: + logger.debug(f"Already logged error for {item_session.request.url}") + else: + self.db_writer.write( + Error( + timestamp=timezone.now(), + url=item_session.request.url, + status_code=0, + referrer=item_session.request.fields.get("Referer"), + ) ) - ) + + self.requested_urls.append(item_session.request.url) @hook(PluginFunctions.handle_pre_response) def handle_pre_response(self, item_session): @@ -182,12 +208,12 @@ def handle_response(self, item_session): status_code = response.status_code timestamp = timezone.now() - if request.url in self.logged_urls: + if request.url in self.requested_urls: logger.debug(f"Already logged {request.url}") item_session.skip() return Actions.FINISH else: - self.logged_urls.append(request.url) + self.requested_urls.append(request.url) if status_code >= 300: referrer = request.fields.get("Referer") @@ -228,9 +254,6 @@ def handle_response(self, item_session): return Actions.NORMAL - if 200 != status_code: - raise ValueError(f"Unexpected status code {status_code} for {request.url}") - # If this request was to an external domain and it responded with # a normal status code, we don't care about recording it. 
if request.url_info.hostname_with_port != self.start_url.hostname_with_port: @@ -314,6 +337,17 @@ def process_200_response(self, request, response): return page +def patch_wpull_connection(): + @asyncio.coroutine + def readline(self): + data = yield from self.run_network_operation( + self.reader.readline(), wait_timeout=self._timeout, name="Readline" + ) + return data + + BaseConnection.readline = readline + + @click.command() @click.argument("start_url") @click.argument("db_filename", type=click.Path()) @@ -341,12 +375,16 @@ def command(start_url, db_filename, max_pages, depth, recreate): args = arg_parser.parse_args( [ start_url, + "--quiet", "--recursive", "--delete-after", "--no-robots", "--wait=0.5", "--random-wait", - "--timeout=30", + "--dns-timeout=5", + "--connect-timeout=5", + "--read-timeout=30", + "--session-timeout=30", "--span-hosts", "--link-extractors=html", "--follow-tags=a", @@ -367,4 +405,5 @@ def command(start_url, db_filename, max_pages, depth, recreate): # https://docs.djangoproject.com/en/3.2/topics/async/#async-safety os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true" + patch_wpull_connection() return app.run_sync() From 438ad12a16e0b6679e0bb20cb17600b622b123ee Mon Sep 17 00:00:00 2001 From: Andy Chosak Date: Wed, 1 Nov 2023 14:03:47 -0400 Subject: [PATCH 7/9] More wpull refactoring --- crawler/management/commands/crawl.py | 366 ++------------------------- crawler/models.py | 102 ++++++++ crawler/tests/__init__.py | 0 crawler/tests/test_models.py | 110 ++++++++ crawler/wpull_plugin.py | 273 ++++++++++++++++++++ 5 files changed, 504 insertions(+), 347 deletions(-) create mode 100644 crawler/tests/__init__.py create mode 100644 crawler/tests/test_models.py create mode 100644 crawler/wpull_plugin.py diff --git a/crawler/management/commands/crawl.py b/crawler/management/commands/crawl.py index 25c9c86..6b56390 100644 --- a/crawler/management/commands/crawl.py +++ b/crawler/management/commands/crawl.py @@ -1,351 +1,11 @@ -import asyncio -import logging import os import os.path -import re -from urllib import parse - -from django.core.management import call_command -from django.db import connections -from django.utils import timezone import djclick as click -import lxml.html.soupparser from wpull.application.builder import Builder -from wpull.application.hook import Actions from wpull.application.options import AppArgumentParser -from wpull.application.plugin import PluginFunctions, WpullPlugin, hook -from wpull.network.connection import BaseConnection -from wpull.pipeline.item import URLProperties -from wpull.url import URLInfo - -from crawler.models import Component, Error, Link, Page, Redirect -from crawler.writer import DatabaseWriter - - -logger = logging.getLogger("crawler") - - -COMPONENT_SEARCH = re.compile(r"(?:(?:class=\")|\s)((?:o|m|a)-[\w\-]*)") -EXTERNAL_SITE = re.compile("/external-site/") -WHITESPACE = re.compile(r"\s+") - -SKIP_URLS = list( - map( - re.compile, - [ - r"^https://www.facebook.com/dialog/share\?.*", - r"^https://twitter.com/intent/tweet\?.*", - r"^https://www.linkedin.com/shareArticle\?.*", - ], - ) -) - - -def get_body(tree): - body = tree.find("./body") - - if body is not None: - drop_element_selectors = [ - ".o-header", - ".o-footer", - ".skip-nav", - "img", - "script", - "style", - ] - - for drop_element_selector in drop_element_selectors: - for element in body.cssselect(drop_element_selector): - element.drop_tree() - - return body - - -class DatabaseWritingPlugin(WpullPlugin): - def activate(self): - super().activate() - - self.start_url = 
URLInfo.parse(self.app_session.args.urls[0]) - self.db_filename, self.max_pages = self.app_session.args.plugin_args.rsplit( - ",", maxsplit=1 - ) - self.max_pages = int(self.max_pages) - - self.init_db() - self.accepted_urls = [] - self.requested_urls = [] - - def init_db(self): - db_alias = "warc_to_db" - connections.databases[db_alias] = { - "ENGINE": "django.db.backends.sqlite3", - "NAME": self.db_filename, - } - - call_command("migrate", database=db_alias, app_label="crawler", run_syncdb=True) - - self.db_writer = DatabaseWriter(db_alias) - - @property - def at_max_pages(self): - return self.max_pages and len(self.retrieved_urls) >= self.max_pages - - @hook(PluginFunctions.accept_url) - def accept_url(self, item_session, verdict, reasons): - # If upstream logic rejected this URL, let the rejection stand. - if not verdict: - return False - - # If we've already crawled enough pages, stop. - if self.at_max_pages: - return False - - request = item_session.url_record - - # Don't request pages more than once. - if request.url in self.requested_urls: - return False - - # Always skip certain URLs. - if SKIP_URLS and any(skip_url.match(request.url) for skip_url in SKIP_URLS): - return False - - # We want to crawl links to different domains to test their validity. - # But once we've done that, we don't want to keep crawling there. - # Therefore, don't crawl links that start on different domains. - if ( - request.parent_url_info.hostname_with_port - != self.start_url.hostname_with_port - ): - return False - - # If we're crawling on a different domain, use a HEAD request to avoid - # downloading files that might be linked, for example from S3. - if request.url_info.hostname_with_port != self.start_url.hostname_with_port: - if "." in request.url_info.path: - item_session.request.method = "HEAD" - - # If we're crawling on the start domain, apply additional rejections. - else: - # Don't crawl URLs that look like filenames. - if "." in request.url_info.path: - return False - - qs = parse.parse_qs(request.url_info.query) - - if qs: - # Don't crawl external link URLs directly. - # Instead crawl to their ultimate destination. - if EXTERNAL_SITE.match(request.url_info.path): - ext_urls = qs.get("ext_url") - if ext_urls: - # Add the external URL to the list to be crawled. - ext_url = ext_urls[0] - - url_properties = URLProperties() - url_properties.level = request.level - url_properties.inline_level = request.inline_level - url_properties.parent_url = request.parent_url - url_properties.root_url = request.root_url - - item_session.app_session.factory["URLTable"].remove_many( - [ext_url] - ) - item_session.add_url(ext_url, url_properites=url_properties) - return False - - # For all other URLs, limit querystrings that get crawled. - # Only crawl pages that only have the "page" parameter. 
- elif list(qs.keys()) != ["page"]: - return False - - if request.url not in self.accepted_urls: - logger.info(f"Crawling {request.url}") - self.accepted_urls.append(request.url) - - return True - - @hook(PluginFunctions.handle_error) - def handle_error(self, item_session, error): - if item_session.request.url in self.requested_urls: - logger.debug(f"Already logged error for {item_session.request.url}") - else: - self.db_writer.write( - Error( - timestamp=timezone.now(), - url=item_session.request.url, - status_code=0, - referrer=item_session.request.fields.get("Referer"), - ) - ) - - self.requested_urls.append(item_session.request.url) - - @hook(PluginFunctions.handle_pre_response) - def handle_pre_response(self, item_session): - # Our accept_url handler converts external requests from GET to HEAD. - # The wpull response body handler seems to assume that HEAD request - # responses will never have Content-Length or Transfer-Encoding - # headers, which doesn't seem to be the case in practice: - # - # https://github.com/ArchiveTeam/wpull/blob/v2.0.1/wpull/protocol/http/stream.py#L441-L451 - # - # Therefore, we strip these headers out if they exist, since we don't - # need them for our purposes. Since this was an external request, we - # care only about the status code, not the response body. - if item_session.request.method == "HEAD": - item_session.response.fields.pop("Content-Length", None) - item_session.response.fields.pop("Transfer-Encoding", None) - - return Actions.NORMAL - - @hook(PluginFunctions.handle_response) - def handle_response(self, item_session): - request = item_session.request - response = item_session.response - status_code = response.status_code - timestamp = timezone.now() - - if request.url in self.requested_urls: - logger.debug(f"Already logged {request.url}") - item_session.skip() - return Actions.FINISH - else: - self.requested_urls.append(request.url) - - if status_code >= 300: - referrer = request.fields.get("Referer") - - if status_code < 400: - location = response.fields.get("Location") - location_parsed = parse.urlparse(location) - - self.db_writer.write( - Redirect( - timestamp=timestamp, - url=request.url, - status_code=status_code, - referrer=referrer, - location=location, - ) - ) - - # Don't follow redirects that don't point to the start domain. - if ( - location_parsed.hostname - and location_parsed.hostname != self.start_url.hostname - ) or ( - location_parsed.port and location_parsed.port != self.start_url.port - ): - logger.debug(f"Not following redirect to {location}") - item_session.skip() - return Actions.FINISH - else: - self.db_writer.write( - Error( - timestamp=timestamp, - url=request.url, - status_code=status_code, - referrer=referrer, - ) - ) - - return Actions.NORMAL - - # If this request was to an external domain and it responded with - # a normal status code, we don't care about recording it. 
- if request.url_info.hostname_with_port != self.start_url.hostname_with_port: - item_session.skip() - return Actions.FINISH - - page_record = self.process_200_response(request, response) - - if page_record: - self.db_writer.write(page_record) - - return Actions.NORMAL - - def process_200_response(self, request, response): - timestamp = timezone.now() - - content_type = response.fields.get("Content-Type") - - if not content_type: - raise ValueError(f"Missing content type for {request.url}") - - if not content_type.startswith("text/html"): - return - - html = response.body.content().decode("utf-8") - - tree = lxml.html.soupparser.fromstring(html) - title_tag = tree.find(".//title") - title = title_tag.text.strip() if title_tag is not None else None - language = tree.find(".").get("lang") - - if title is None: - return - - body = get_body(tree) - - if body is not None: - text = WHITESPACE.sub(" ", body.text_content()).strip() - else: - text = None - - page = Page( - timestamp=timestamp, - url=request.url, - title=title, - language=language, - html=html, - text=text, - ) - - hrefs = list( - set( - href - for element, attribute, href, pos in body.iterlinks() - if "a" == element.tag and "href" == attribute - ) - ) - - # Remove any external link URL wrapping. - for i, href in enumerate(hrefs): - parsed_href = parse.urlparse(href) - if not EXTERNAL_SITE.match(parsed_href.path): - continue - - if parsed_href.netloc and self.start_url.host != parsed_href.netloc: - continue - - ext_url = parse.parse_qs(parsed_href.query).get("ext_url") - if ext_url: - hrefs[i] = ext_url[0] - - page.links = [Link(href=href) for href in sorted(hrefs)] - - body_html = lxml.etree.tostring(body, encoding="unicode") - - class_names = set(COMPONENT_SEARCH.findall(body_html)) - page.components = [ - Component(class_name=class_name) for class_name in sorted(class_names) - ] - - return page - - -def patch_wpull_connection(): - @asyncio.coroutine - def readline(self): - data = yield from self.run_network_operation( - self.reader.readline(), wait_timeout=self._timeout, name="Readline" - ) - return data - - BaseConnection.readline = readline +from crawler import wpull_plugin @click.command() @@ -360,17 +20,29 @@ def readline(self): is_flag=True, show_default=True, default=False, - help="Recreate database file if it already exists", + help="Overwrite SQLite database if it already exists", ) -def command(start_url, db_filename, max_pages, depth, recreate): +@click.option("--resume", is_flag=True) +def command(start_url, db_filename, max_pages, depth, recreate, resume): + """Crawl a website to a SQLite database.""" if os.path.exists(db_filename): - if not recreate: + if not recreate and not resume: raise click.ClickException( - f"File {db_filename} already exists, use --recreate to recreate." + f"File {db_filename} already exists, " + "use --recreate to recreate " + "or --resume to resume a previous crawl." ) os.remove(db_filename) + wpull_progress_filename = f"{db_filename}.wpull.db" + click.echo( + f"Storing crawl progress in {wpull_progress_filename}, use --resume to resume." 
+ ) + + if not resume and os.path.exists(wpull_progress_filename): + os.path.remove(wpull_progress_filename) + arg_parser = AppArgumentParser() args = arg_parser.parse_args( [ @@ -391,8 +63,9 @@ def command(start_url, db_filename, max_pages, depth, recreate): "--user-agent=CFPB website indexer", "--no-check-certificate", f"--level={depth}", - f"--plugin-script={__file__}", + f"--plugin-script={wpull_plugin.__file__}", f"--plugin-args={db_filename},{max_pages}", + f"--database={wpull_progress_filename}", ] ) builder = Builder(args) @@ -405,5 +78,4 @@ def command(start_url, db_filename, max_pages, depth, recreate): # https://docs.djangoproject.com/en/3.2/topics/async/#async-safety os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true" - patch_wpull_connection() return app.run_sync() diff --git a/crawler/models.py b/crawler/models.py index cf5ac28..b30f958 100644 --- a/crawler/models.py +++ b/crawler/models.py @@ -1,4 +1,10 @@ +import lxml.etree +import lxml.html.soupparser +import re +from urllib import parse + from django.db import models +from django.utils import timezone from modelcluster.models import ClusterableModel from modelcluster.fields import ParentalManyToManyField @@ -38,6 +44,102 @@ class Page(Request, ClusterableModel): def __str__(self): return self.url + HTML_COMPONENT_SEARCH = re.compile(r"(?:(?:class=\")|\s)((?:o|m|a)-[\w\-]*)") + HTML_EXTERNAL_SITE = re.compile("/external-site/") + HTML_WHITESPACE = re.compile(r"\s+") + + @classmethod + def from_html( + cls, + url, + html, + internal_link_host, + ): + try: + tree = lxml.html.fromstring(html) + except lxml.etree.ParserError: + # https://bugs.launchpad.net/lxml/+bug/1949271 + tree = lxml.html.soupparser.fromstring(html) + + title_tag = tree.find(".//title") + title = title_tag.text.strip() if title_tag is not None else None + language = tree.find(".").get("lang") + + if title is None: + return + + body = cls._get_cleaned_body_from_tree(tree) + + if body is not None: + text = cls.HTML_WHITESPACE.sub(" ", body.text_content()).strip() + else: + text = None + + page = Page( + timestamp=timezone.now(), + url=url, + title=title, + language=language, + html=html, + text=text, + ) + + if body is None: + return page + + hrefs = list( + set( + href + for element, attribute, href, pos in body.iterlinks() + if "a" == element.tag and "href" == attribute + ) + ) + + # Remove any external link URL wrapping. 
+ for i, href in enumerate(hrefs): + parsed_href = parse.urlparse(href) + if not cls.HTML_EXTERNAL_SITE.match(parsed_href.path): + continue + + if parsed_href.netloc and internal_link_host != parsed_href.netloc: + continue + + ext_url = parse.parse_qs(parsed_href.query).get("ext_url") + if ext_url: + hrefs[i] = ext_url[0] + + page.links = [Link(href=href) for href in sorted(hrefs)] + + body_html = lxml.etree.tostring(body, encoding="unicode") + + class_names = set(cls.HTML_COMPONENT_SEARCH.findall(body_html)) + page.components = [ + Component(class_name=class_name) for class_name in sorted(class_names) + ] + + return page + + @staticmethod + def _get_cleaned_body_from_tree(tree): + """Extract page body without header, footer, images, or scripts.""" + body = tree.find("./body") + + if body is not None: + drop_element_selectors = [ + ".o-header", + ".o-footer", + ".skip-nav", + "img", + "script", + "style", + ] + + for drop_element_selector in drop_element_selectors: + for element in body.cssselect(drop_element_selector): + element.drop_tree() + + return body + class ErrorBase(Request): status_code = models.PositiveIntegerField(db_index=True) diff --git a/crawler/tests/__init__.py b/crawler/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/crawler/tests/test_models.py b/crawler/tests/test_models.py new file mode 100644 index 0000000..c6d1890 --- /dev/null +++ b/crawler/tests/test_models.py @@ -0,0 +1,110 @@ +from operator import attrgetter +from unittest.mock import patch + +import lxml.etree + +from django.test import SimpleTestCase + +from crawler.models import Error, Page, Redirect + + +class PageTests(SimpleTestCase): + def test_from_html_no_title_returns_none(self): + self.assertIsNone( + Page.from_html( + "https://example.com/", + "This page has no title.", + "example.com", + ) + ) + + def check_from_html(self): + html = """ + +Test page + + + + + + + + """.strip() + + page = Page.from_html("https://example.com/", html, "example.com") + self.assertEqual(str(page), "https://example.com/") + self.assertEqual(page.title, "Test page") + self.assertEqual(page.language, "en") + self.assertEqual(page.html, html) + self.assertEqual( + page.text, + ( + "Links " + "A regular link on the same domain. " + "An external link pointing to another domain " + "An external link missing its target " + "A link on another domain that also uses /external-site/" + ), + ) + self.assertCountEqual( + page.components.values_list("class_name", flat=True), + ["a-external-link", "m-links"], + ) + self.assertCountEqual( + page.links.values_list("href", flat=True), + [ + "/external-site/", + "/page/", + "https://example.org/", + "https://example.org/external-site/", + ], + ) + + def test_from_html(self): + self.check_from_html() + + def test_from_html_etree_fallback_parser(self): + with patch( + "lxml.html.fromstring", + side_effect=lxml.etree.ParserError("testing parser error"), + ): + self.check_from_html() + + def test_from_html_no_body(self): + html = 'Test page with no body</head></html>' + page = Page.from_html("https://example.com/", html, "example.com") + self.assertEqual(str(page), "https://example.com/") + self.assertEqual(page.title, "Test page with no body") + self.assertEqual(page.language, "en") + self.assertEqual(page.html, html) + self.assertIsNone(page.text) + + +class ErrorTests(SimpleTestCase): + def test_error_str(self): + self.assertEqual( + str(Error(url="/not-found/", status_code=404)), "/not-found/ 404 !" 
+ ) + + def test_error_str_with_referrer(self): + self.assertEqual( + str( + Redirect( + url="/redirect/", + referrer="/source/", + status_code=301, + location="/destination/", + ) + ), + "/redirect/ (from /source/) 301 -> /destination/", + ) diff --git a/crawler/wpull_plugin.py b/crawler/wpull_plugin.py new file mode 100644 index 0000000..af85d08 --- /dev/null +++ b/crawler/wpull_plugin.py @@ -0,0 +1,273 @@ +import asyncio +import logging +import re +from urllib import parse + +from django.core.management import call_command +from django.db import connections +from django.utils import timezone + +from wpull.application.hook import Actions +from wpull.application.plugin import PluginFunctions, WpullPlugin, hook +from wpull.network.connection import BaseConnection +from wpull.pipeline.item import URLProperties +from wpull.url import URLInfo + +from crawler.models import Error, Page, Redirect +from crawler.writer import DatabaseWriter + + +logger = logging.getLogger("crawler") + + +SKIP_URLS = list( + map( + re.compile, + [ + r"^https://www.facebook.com/dialog/share\?.*", + r"^https://twitter.com/intent/tweet\?.*", + r"^https://www.linkedin.com/shareArticle\?.*", + ], + ) +) + +HEAD_URLS = list(map(re.compile, [r"https://files.consumerfinance.gov/.*"])) + + +def patch_wpull_connection(): + """Use wait_timeout instead of close_timeout for readline.""" + + @asyncio.coroutine + def readline(self): + data = yield from self.run_network_operation( + self.reader.readline(), wait_timeout=self._timeout, name="Readline" + ) + return data + + BaseConnection.readline = readline + + +class DatabaseWritingPlugin(WpullPlugin): + def activate(self): + super().activate() + + patch_wpull_connection() + + self.start_url = URLInfo.parse(self.app_session.args.urls[0]) + self.db_filename, self.max_pages = self.app_session.args.plugin_args.rsplit( + ",", maxsplit=1 + ) + self.max_pages = int(self.max_pages) + + self.db_writer = self.init_db() + self.accepted_urls = [] + self.requested_urls = [] + + def deactivate(self): + super().deactivate() + self.db_writer.analyze() + + def init_db(self): + db_alias = "warc_to_db" + + connections.databases[db_alias] = { + "ENGINE": "django.db.backends.sqlite3", + "NAME": self.db_filename, + } + + call_command("migrate", database=db_alias, app_label="crawler", run_syncdb=True) + + return DatabaseWriter(db_alias) + + @property + def at_max_pages(self): + return self.max_pages and len(self.requested_urls) >= self.max_pages + + @hook(PluginFunctions.accept_url) + def accept_url(self, item_session, verdict, reasons): + # If upstream logic rejected this URL, let the rejection stand. + if not verdict: + return False + + # If we've already crawled enough pages, stop. + if self.at_max_pages: + return False + + request = item_session.url_record + + # Don't request pages more than once. + if request.url in self.requested_urls: + return False + + # Always skip certain URLs. + if SKIP_URLS and any(skip_url.match(request.url) for skip_url in SKIP_URLS): + return False + + # We want to crawl links to different domains to test their validity. + # But once we've done that, we don't want to keep crawling there. + # Therefore, don't crawl links that start on different domains. + if ( + request.parent_url_info.hostname_with_port + != self.start_url.hostname_with_port + ): + return False + + # Use HEAD requests to speed up the crawl for certain external domains. + # We can't do this everywhere because other sites may respond to HEAD + # requests in inconvenient ways. 
This avoids the need to fully download + # external responses. + if HEAD_URLS and any(head_url.match(request.url) for head_url in HEAD_URLS): + item_session.request.method = "HEAD" + + # If we're crawling on the start domain, apply additional rejections. + elif request.url_info.hostname_with_port == self.start_url.hostname_with_port: + # Don't crawl URLs that look like filenames. + if "." in request.url_info.path: + return False + + qs = parse.parse_qs(request.url_info.query) + + if qs: + # Don't crawl external link URLs directly. + # Instead crawl to their ultimate destination. + if Page.HTML_EXTERNAL_SITE.match(request.url_info.path): + ext_urls = qs.get("ext_url") + if ext_urls: + # Add the external URL to the list to be crawled. + ext_url = ext_urls[0] + + url_properties = URLProperties() + url_properties.level = request.level + url_properties.inline_level = request.inline_level + url_properties.parent_url = request.parent_url + url_properties.root_url = request.root_url + + item_session.app_session.factory["URLTable"].remove_many( + [ext_url] + ) + item_session.add_url(ext_url, url_properites=url_properties) + return False + + # For all other URLs, limit querystrings that get crawled. + # Only crawl pages that only have the "page" parameter. + elif list(qs.keys()) != ["page"]: + return False + + if request.url not in self.accepted_urls: + logger.info(f"Crawling {request.url}") + self.accepted_urls.append(request.url) + + return True + + @hook(PluginFunctions.handle_error) + def handle_error(self, item_session, error): + if item_session.request.url in self.requested_urls: + logger.debug(f"Already logged error for {item_session.request.url}") + else: + logger.debug(error) + self.db_writer.write( + Error( + timestamp=timezone.now(), + url=item_session.request.url, + status_code=0, + referrer=item_session.request.fields.get("Referer"), + ) + ) + + self.requested_urls.append(item_session.request.url) + + @hook(PluginFunctions.handle_pre_response) + def handle_pre_response(self, item_session): + # Our accept_url handler converts certain external requests from GET to + # HEAD. The wpull response body handler seems to assume that HEAD + # request responses will never have Content-Length or Transfer-Encoding + # headers, which doesn't seem to be the case in practice: + # + # https://github.com/ArchiveTeam/wpull/blob/v2.0.1/wpull/protocol/http/stream.py#L441-L451 + # + # Therefore, we strip these headers out if they exist, since we don't + # need them for our purposes. Since this was an external request, we + # care only about the status code, not the response body. 
+ if item_session.request.method == "HEAD": + item_session.response.fields.pop("Content-Length", None) + item_session.response.fields.pop("Transfer-Encoding", None) + + return Actions.NORMAL + + @hook(PluginFunctions.handle_response) + def handle_response(self, item_session): + request = item_session.request + response = item_session.response + status_code = response.status_code + timestamp = timezone.now() + + if request.url in self.requested_urls: + logger.debug(f"Already logged {request.url}") + item_session.skip() + return Actions.FINISH + else: + self.requested_urls.append(request.url) + + if status_code >= 300: + referrer = request.fields.get("Referer") + + if status_code < 400: + location = response.fields.get("Location") + location_parsed = parse.urlparse(location) + + self.db_writer.write( + Redirect( + timestamp=timestamp, + url=request.url, + status_code=status_code, + referrer=referrer, + location=location, + ) + ) + + # Don't follow redirects that don't point to the start domain. + if ( + location_parsed.hostname + and location_parsed.hostname != self.start_url.hostname + ) or ( + location_parsed.port and location_parsed.port != self.start_url.port + ): + logger.debug(f"Not following redirect to {location}") + item_session.skip() + return Actions.FINISH + else: + self.db_writer.write( + Error( + timestamp=timestamp, + url=request.url, + status_code=status_code, + referrer=referrer, + ) + ) + + return Actions.NORMAL + + # If this request was to an external domain and it responded with + # a normal status code, we don't care about recording it. + if request.url_info.hostname_with_port != self.start_url.hostname_with_port: + item_session.skip() + return Actions.FINISH + + page_record = self.process_200_response(request, response) + + if not page_record: + logger.debug(f"Unexpected response for {request.url}, skipping") + item_session.skip() + return Actions.FINISH + + self.db_writer.write(page_record) + return Actions.NORMAL + + def process_200_response(self, request, response): + content_type = response.fields.get("Content-Type") + + if not (content_type or "").startswith("text/html"): + return + + html = response.body.content().decode("utf-8") + return Page.from_html(request.url, html, self.start_url.hostname) From a58c707dbff1eaab7938985cd0ed2878d7acf55a Mon Sep 17 00:00:00 2001 From: Andy Chosak <andy.chosak@cfpb.gov> Date: Thu, 2 Nov 2023 12:59:50 +0000 Subject: [PATCH 8/9] Bug fix: don't recreate database if resuming --- crawler/management/commands/crawl.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crawler/management/commands/crawl.py b/crawler/management/commands/crawl.py index 6b56390..b473ba4 100644 --- a/crawler/management/commands/crawl.py +++ b/crawler/management/commands/crawl.py @@ -33,7 +33,8 @@ def command(start_url, db_filename, max_pages, depth, recreate, resume): "or --resume to resume a previous crawl." 
) - os.remove(db_filename) + if recreate: + os.remove(db_filename) wpull_progress_filename = f"{db_filename}.wpull.db" click.echo( From bf51eb8da707f0cc99ce8e64e02a516febda503c Mon Sep 17 00:00:00 2001 From: Andy Chosak <andy.chosak@cfpb.gov> Date: Thu, 2 Nov 2023 13:01:47 +0000 Subject: [PATCH 9/9] Echo exit status upon completion --- crawler/management/commands/crawl.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crawler/management/commands/crawl.py b/crawler/management/commands/crawl.py index b473ba4..78705fb 100644 --- a/crawler/management/commands/crawl.py +++ b/crawler/management/commands/crawl.py @@ -79,4 +79,6 @@ def command(start_url, db_filename, max_pages, depth, recreate, resume): # https://docs.djangoproject.com/en/3.2/topics/async/#async-safety os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true" - return app.run_sync() + exit_status = app.run_sync() + click.echo(f"done, exiting with status {exit_status}") + return exit_status
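Setting DJANGO_ALLOW_ASYNC_UNSAFE above is what lets the wpull plugin call the Django ORM from inside wpull's event loop; as the comment notes, wrapping the database calls with sync_to_async is not an option given how wpull invokes its hooks, and the setting is safe only because one URL is downloaded at a time. For contrast, a hypothetical sketch of what such wrapping would look like if the hooks were awaitable; the coroutine below is illustrative only and not part of these patches:

```python
# Hypothetical: how an ORM write could be wrapped if wpull exposed awaitable
# hooks. asgiref ships as a Django dependency, so the import is available.
from asgiref.sync import sync_to_async


async def write_record_async(db_writer, record):
    # Run the blocking Django save on a worker thread, off the event loop.
    await sync_to_async(db_writer.write)(record)
```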