From 92fa68a424ff5177691ad81556ad72736fc8c8f7 Mon Sep 17 00:00:00 2001
From: Andy Chosak
Date: Wed, 25 Oct 2023 17:19:51 -0400
Subject: [PATCH] Iterating on wpull-based crawler

---
 crawler/management/commands/crawl.py | 168 ++++++++++++++++++++-------
 crawler/models.py                    |  19 ++-
 crawler/writer.py                    |   7 ++
 requirements/base.txt                |   1 +
 settings.py                          |  17 +++
 5 files changed, 170 insertions(+), 42 deletions(-)

diff --git a/crawler/management/commands/crawl.py b/crawler/management/commands/crawl.py
index 512092c..fe76fec 100644
--- a/crawler/management/commands/crawl.py
+++ b/crawler/management/commands/crawl.py
@@ -1,18 +1,19 @@
+import logging
 import os
 import os.path
 import re
-from email.utils import parsedate_to_datetime
 from urllib import parse
 
 from django.core.management import call_command
 from django.db import connections
+from django.utils import timezone
 
 import djclick as click
-import lxml.html
+import lxml.html.soupparser
 from wpull.application.builder import Builder
 from wpull.application.hook import Actions
 from wpull.application.options import AppArgumentParser
-from wpull.application.plugin import PluginFunctions, WpullPlugin, hook
+from wpull.application.plugin import PluginFunctions, WpullPlugin, event, hook
 from wpull.pipeline.item import URLProperties
 from wpull.url import URLInfo
 
@@ -20,6 +21,9 @@
 from crawler.writer import DatabaseWriter
 
 
+logger = logging.getLogger("crawler")
+
+
 COMPONENT_SEARCH = re.compile(r"(?:(?:class=\")|\s)((?:o|m|a)-[\w\-]*)")
 EXTERNAL_SITE = re.compile("/external-site/")
 WHITESPACE = re.compile(r"\s+")
@@ -56,7 +60,8 @@ def activate(self):
             self.max_pages = int(self.max_pages)
 
         self.init_db()
-        self.num_pages = 0
+        self.crawled_urls = []
+        self.logged_urls = []
 
     def init_db(self):
         db_alias = "warc_to_db"
@@ -72,10 +77,15 @@ def init_db(self):
 
     @property
     def at_max_pages(self):
-        return self.max_pages and self.num_pages >= self.max_pages
+        return self.max_pages and len(self.logged_urls) >= self.max_pages
 
     @hook(PluginFunctions.accept_url)
     def accept_url(self, item_session, verdict, reasons):
+        # If upstream logic rejected this URL, let the rejection stand.
+        if not verdict:
+            return False
+
+        # If we've already crawled enough pages, stop.
         if self.at_max_pages:
             return False
 
@@ -90,8 +100,14 @@ def accept_url(self, item_session, verdict, reasons):
         ):
             return False
 
+        # If we're crawling on a different domain, use a HEAD request to avoid
+        # downloading files that might be linked, for example from S3.
+        if request.url_info.hostname_with_port != self.start_url.hostname_with_port:
+            if "." in request.url_info.path:
+                item_session.request.method = "HEAD"
+
         # If we're crawling on the start domain, apply additional rejections.
-        if request.url_info.hostname_with_port == self.start_url.hostname_with_port:
+        else:
             # Don't crawl URLs that look like filenames.
             if "." in request.url_info.path:
                 return False
@@ -102,16 +118,21 @@ def accept_url(self, item_session, verdict, reasons):
             # Don't crawl external link URLs directly.
             # Instead crawl to their ultimate destination.
             if EXTERNAL_SITE.match(request.url_info.path):
-                ext_url = qs.get("ext_url")
-                if ext_url:
+                ext_urls = qs.get("ext_url")
+                if ext_urls:
                     # Add the external URL to the list to be crawled.
+                    ext_url = ext_urls[0]
+
                     url_properties = URLProperties()
                     url_properties.level = request.level
                     url_properties.inline_level = request.inline_level
                     url_properties.parent_url = request.parent_url
                     url_properties.root_url = request.root_url
-                    item_session.add_url(ext_url[0], url_properites=url_properties)
+                    item_session.app_session.factory["URLTable"].remove_many(
+                        [ext_url]
+                    )
+                    item_session.add_url(ext_url, url_properites=url_properties)
 
                     return False
 
             # For all other URLs, limit querystrings that get crawled.
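
Note on the ext_url handling in the hunk above: urllib.parse.parse_qs maps every querystring key to a list of values, which is why the new code reads qs.get("ext_url") into ext_urls and then takes ext_urls[0]. A minimal sketch of that behavior follows; the URLs are made up for illustration, and the real qs is built elsewhere in accept_url from the request's querystring.

    from urllib import parse

    # parse_qs returns a list for every key, even when a parameter
    # appears only once in the querystring.
    query = parse.urlparse(
        "https://example.com/external-site/?ext_url=https://partner.example.org/"
    ).query
    qs = parse.parse_qs(query)
    assert qs.get("ext_url") == ["https://partner.example.org/"]
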
@@ -119,62 +140,124 @@ def accept_url(self, item_session, verdict, reasons):
             elif list(qs.keys()) != ["page"]:
                 return False
 
-        return verdict
+        if request.url not in self.crawled_urls:
+            logger.info(f"Crawling {request.url}")
+            self.crawled_urls.append(request.url)
 
-    @hook(PluginFunctions.handle_response)
-    def my_handle_response(self, item_session):
-        self.num_pages += 1
-        if self.at_max_pages:
-            item_session.skip()
-            return Actions.FINISH
+        return True
 
-        db_record = self.process_response(item_session.request, item_session.response)
+    @hook(PluginFunctions.handle_error)
+    def handle_error(self, item_session, error):
+        self.db_writer.write(
+            Error(
+                timestamp=timezone.now(),
+                url=item_session.request.url,
+                status_code=0,
+                referrer=item_session.request.fields.get("Referer"),
+            )
+        )
 
-        if db_record:
-            self.db_writer.write(db_record)
+    @hook(PluginFunctions.handle_pre_response)
+    def handle_pre_response(self, item_session):
+        # Our accept_url handler converts external requests from GET to HEAD.
+        # The wpull response body handler seems to assume that HEAD request
+        # responses will never have Content-Length or Transfer-Encoding
+        # headers, which doesn't seem to be the case in practice:
+        #
+        # https://github.com/ArchiveTeam/wpull/blob/v2.0.1/wpull/protocol/http/stream.py#L441-L451
+        #
+        # Therefore, we strip these headers out if they exist, since we don't
+        # need them for our purposes. Since this was an external request, we
+        # care only about the status code, not the response body.
+        if item_session.request.method == "HEAD":
+            item_session.response.fields.pop("Content-Length", None)
+            item_session.response.fields.pop("Transfer-Encoding", None)
 
         return Actions.NORMAL
 
-    def process_response(self, request, response):
+    @hook(PluginFunctions.handle_response)
+    def handle_response(self, item_session):
+        request = item_session.request
+        response = item_session.response
         status_code = response.status_code
-        content_type = response.fields["Content-Type"]
-        timestamp = parsedate_to_datetime(response.fields["Date"])
-        referrer = request.fields.get("Referer")
+        timestamp = timezone.now()
+
+        if request.url in self.logged_urls:
+            logger.debug(f"Already logged {request.url}")
+            item_session.skip()
+            return Actions.FINISH
+        else:
+            self.logged_urls.append(request.url)
 
         if status_code >= 300:
+            referrer = request.fields.get("Referer")
+
             if status_code < 400:
                 location = response.fields.get("Location")
-                return Redirect(
-                    timestamp=timestamp,
-                    url=request.url,
-                    status_code=status_code,
-                    referrer=referrer,
-                    location=location,
+                location_parsed = parse.urlparse(location)
+
+                self.db_writer.write(
+                    Redirect(
+                        timestamp=timestamp,
+                        url=request.url,
+                        status_code=status_code,
+                        referrer=referrer,
+                        location=location,
+                    )
                 )
+
+                # Don't follow redirects that don't point to the start domain.
+                if (
+                    location_parsed.hostname
+                    and location_parsed.hostname != self.start_url.hostname
+                ) or (
+                    location_parsed.port and location_parsed.port != self.start_url.port
+                ):
+                    logger.debug(f"Not following redirect to {location}")
+                    item_session.skip()
+                    return Actions.FINISH
             else:
-                return Error(
-                    timestamp=timestamp,
-                    url=request.url,
-                    status_code=status_code,
-                    referrer=referrer,
+                self.db_writer.write(
+                    Error(
+                        timestamp=timestamp,
+                        url=request.url,
+                        status_code=status_code,
+                        referrer=referrer,
+                    )
                 )
 
+            return Actions.NORMAL
+
         if 200 != status_code:
             raise ValueError(f"Unexpected status code {status_code} for {request.url}")
 
+        # If this request was to an external domain and it responded with
+        # a normal status code, we don't care about recording it.
+        if request.url_info.hostname_with_port != self.start_url.hostname_with_port:
+            item_session.skip()
+            return Actions.FINISH
+
+        page_record = self.process_200_response(request, response)
+
+        if page_record:
+            self.db_writer.write(page_record)
+
+        return Actions.NORMAL
+
+    def process_200_response(self, request, response):
+        timestamp = timezone.now()
+
+        content_type = response.fields.get("Content-Type")
+
         if not content_type:
             raise ValueError(f"Missing content type for {request.url}")
 
         if not content_type.startswith("text/html"):
             return
 
-        # We don't record external page data because we've only crawled them to
-        # check for redirects, 404s, or other errors.
-        if request.url_info.hostname_with_port != self.start_url.hostname_with_port:
-            return
-
         html = response.body.content().decode("utf-8")
-        tree = lxml.html.fromstring(html)
+
+        tree = lxml.html.soupparser.fromstring(html)
         title_tag = tree.find(".//title")
         title = title_tag.text.strip() if title_tag is not None else None
         language = tree.find(".").get("lang")
@@ -259,13 +342,16 @@ def command(start_url, db_filename, max_pages, depth, recreate):
         [
             start_url,
             "--recursive",
-            "--no-verbose",
             "--delete-after",
             "--no-robots",
            "--wait=0.5",
             "--random-wait",
+            "--timeout=30",
             "--span-hosts",
+            "--link-extractors=html",
+            "--follow-tags=a",
             "--user-agent=CFPB website indexer",
+            "--no-check-certificate",
             f"--level={depth}",
             f"--plugin-script={__file__}",
             f"--plugin-args={db_filename},{max_pages}",
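
The accept_url and handle_response changes above both hinge on comparing hostname_with_port between the requested URL and the crawl's start URL. A rough sketch of that comparison, using the URLInfo class the module already imports; the URLs here are illustrative, and this assumes wpull's URLInfo.parse helper is how start_url gets built.

    from wpull.url import URLInfo

    start_url = URLInfo.parse("https://www.consumerfinance.gov/")
    external = URLInfo.parse("https://files.example.org/report.pdf")

    # Anything whose host (or port) differs from the start URL is treated as
    # external: accept_url switches it to a HEAD request, and handle_response
    # skips recording it unless it produced a redirect or an error.
    assert external.hostname_with_port != start_url.hostname_with_port
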
diff --git a/crawler/models.py b/crawler/models.py
index bf74d1e..cf5ac28 100644
--- a/crawler/models.py
+++ b/crawler/models.py
@@ -35,6 +35,9 @@ class Page(Request, ClusterableModel):
     components = ParentalManyToManyField(Component, related_name="pages")
     links = ParentalManyToManyField(Link, related_name="links")
 
+    def __str__(self):
+        return self.url
+
 
 class ErrorBase(Request):
     status_code = models.PositiveIntegerField(db_index=True)
@@ -43,10 +46,24 @@
     class Meta(Request.Meta):
         abstract = True
 
+    def __str__(self):
+        s = self.url
+
+        if self.referrer:
+            s += f" (from {self.referrer})"
+
+        s += f" {self.status_code}"
+
+        return s
+
 
 class Error(ErrorBase):
-    pass
+    def __str__(self):
+        return super().__str__() + " !"
 
 
 class Redirect(ErrorBase):
     location = models.TextField(db_index=True)
+
+    def __str__(self):
+        return super().__str__() + f" -> {self.location}"
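
The new __str__ methods come into play in the log lines added in crawler/writer.py below. A rough sketch of the strings they produce, assuming unsaved model instances created in a configured Django environment; the URLs and status codes are illustrative.

    from crawler.models import Error, Redirect

    error = Error(
        url="https://example.com/missing/",
        status_code=404,
        referrer="https://example.com/",
    )
    # ErrorBase.__str__ appends the referrer and status code; Error adds " !".
    assert str(error) == "https://example.com/missing/ (from https://example.com/) 404 !"

    redirect = Redirect(
        url="https://example.com/old/",
        status_code=301,
        location="https://example.com/new/",
    )
    # Redirect appends " -> <location>"; no referrer is set here, so there is
    # no "(from ...)" segment.
    assert str(redirect) == "https://example.com/old/ 301 -> https://example.com/new/"
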
diff --git a/crawler/writer.py b/crawler/writer.py
index 1d15f29..caaadb7 100644
--- a/crawler/writer.py
+++ b/crawler/writer.py
@@ -1,8 +1,13 @@
+import logging
+
 from django.db import connections
 
 from crawler.models import Component, Link, Page
 
 
+logger = logging.getLogger("crawler")
+
+
 class DatabaseWriter:
     def __init__(self, db):
         self.db = db
@@ -12,6 +17,7 @@ def write(self, instance):
         if isinstance(instance, Page):
             return self._write_page(instance)
         else:
+            logger.debug(f"Saving {instance}")
             instance.save(using=self.db)
 
     def _write_page(self, page):
@@ -36,6 +42,7 @@ def _write_page(self, page):
             .values()
         )
 
+        logger.debug(f"Saving {page}")
         page.save(using=self.db)
 
     def analyze(self):
diff --git a/requirements/base.txt b/requirements/base.txt
index 9006775..91679fe 100644
--- a/requirements/base.txt
+++ b/requirements/base.txt
@@ -1,3 +1,4 @@
+beautifulsoup4==4.12.2
 click==8.0.4
 cssselect==1.1.0
 Django==3.2.22
diff --git a/settings.py b/settings.py
index df1ad38..b97fd95 100644
--- a/settings.py
+++ b/settings.py
@@ -136,3 +136,20 @@
     "PAGE_SIZE": 25,
     "UNAUTHENTICATED_USER": None,
 }
+
+LOGGING = {
+    "version": 1,
+    "disable_existing_loggers": False,
+    "handlers": {
+        "console": {
+            "class": "logging.StreamHandler",
+        },
+    },
+    "loggers": {
+        "crawler": {
+            "handlers": ["console"],
+            "level": "DEBUG",
+            "propagate": False,
+        },
+    },
+}
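
The LOGGING addition above is what routes the "crawler" logger used in crawl.py and writer.py to the console; Django applies this dict with logging.config.dictConfig at startup. A minimal stand-alone sketch of the same configuration, for reference (the logged message is just an example):

    import logging
    import logging.config

    logging.config.dictConfig({
        "version": 1,
        "disable_existing_loggers": False,
        "handlers": {"console": {"class": "logging.StreamHandler"}},
        "loggers": {
            "crawler": {
                "handlers": ["console"],
                "level": "DEBUG",
                "propagate": False,
            },
        },
    })

    # DEBUG and above from the "crawler" logger now go to stderr via the
    # console handler; nothing propagates up to the root logger.
    logging.getLogger("crawler").debug("example message")
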