Commit
Merge pull request #58 from hynky1999/pipeline
Pipeline

hynky1999 authored May 11, 2023
2 parents 23e2c9d + 47e7a22 commit 67b99eb
Showing 11 changed files with 108 additions and 86 deletions.
2 changes: 1 addition & 1 deletion cmoncrawl/integrations/download.py
@@ -24,7 +24,7 @@ class DownloadOutputFormat(Enum):

def add_mode_args(subparser: Any):
record_parser = subparser.add_parser(DownloadOutputFormat.RECORD.value)
record_parser.add_argument("--max_crawls_per_file", type=int, default=100_000)
record_parser.add_argument("--max_crawls_per_file", type=int, default=500_000)
subparser.add_parser(DownloadOutputFormat.HTML.value)
return subparser

4 changes: 2 additions & 2 deletions cmoncrawl/integrations/extract.py
@@ -9,7 +9,7 @@
from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
from cmoncrawl.middleware.synchronized import extract
import argparse
from typing import Any, Dict, List
from typing import Any, List
import asyncio
from cmoncrawl.processor.pipeline.streamer import (
StreamerFileJSON,
@@ -45,7 +45,7 @@ def add_args(subparser: Any):
)
parser.add_argument("output_path", type=Path)
parser.add_argument("files", nargs="+", type=Path)
parser.add_argument("--max_crawls_per_file", type=int, default=100_000)
parser.add_argument("--max_crawls_per_file", type=int, default=500_000)
parser.add_argument("--max_directory_size", type=int, default=1000)
parser.add_argument("--n_proc", type=int, default=1)
mode_subparser = parser.add_subparsers(dest="mode", required=True)
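Both the download and extract CLIs raise the default `--max_crawls_per_file` from 100_000 to 500_000 records per output file. As a rough sketch of the behaviour this flag presumably feeds into (the mapping onto the streamer's file-size limit is an assumption, and the helper below is illustrative, not part of CmonCrawl):

```python
# Hedged sketch of record-per-file rotation, assuming --max_crawls_per_file
# caps how many records land in each output file. Names are illustrative.
import json
from pathlib import Path
from typing import Any, Dict, Iterable


def write_sharded(records: Iterable[Dict[str, Any]], root: Path, max_per_file: int = 500_000) -> None:
    root.mkdir(parents=True, exist_ok=True)
    file_idx = 0
    in_file = 0
    out = open(root / f"{file_idx}_file.jsonl", "w")
    try:
        for record in records:
            if in_file >= max_per_file:  # rotate to a new file once the cap is hit
                out.close()
                file_idx += 1
                in_file = 0
                out = open(root / f"{file_idx}_file.jsonl", "w")
            out.write(json.dumps(record) + "\n")
            in_file += 1
    finally:
        out.close()
```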
67 changes: 52 additions & 15 deletions cmoncrawl/middleware/synchronized.py
@@ -3,8 +3,9 @@
from cmoncrawl.aggregator.index_query import IndexAggregator
from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
from cmoncrawl.common.types import DomainRecord
from cmoncrawl.common.loggers import all_purpose_logger
from cmoncrawl.common.loggers import all_purpose_logger, metadata_logger
from cmoncrawl.aggregator.utils.helpers import unify_url_id
from tqdm import tqdm
import asyncio


@@ -20,15 +21,11 @@ async def index_and_extract(
try:
async with index_agg:
async for domain_record in index_agg:
if (
filter_non_unique_url
and unify_url_id(domain_record.url) in processed_urls
):
url = domain_record.url or ""
if filter_non_unique_url and unify_url_id(url) in processed_urls:
continue
try:
paths: List[Path] = await pipeline.process_domain_record(
domain_record
)
await pipeline.process_domain_record(domain_record)
except KeyboardInterrupt as e:
break

@@ -37,26 +34,66 @@
f"Failed to process {domain_record.url} with {e}"
)
continue
processed_urls.add(unify_url_id(domain_record.url))
processed_urls.add(unify_url_id(url))

finally:
if hasattr(pipeline.downloader, "__aexit__"):
await pipeline.downloader.__aexit__(None, None, None)


async def _extract_task(domain_record: DomainRecord, pipeline: ProcessorPipeline):
result = []
try:
result = await pipeline.process_domain_record(domain_record)
except KeyboardInterrupt as e:
raise e
except Exception as e:
metadata_logger.error(
f"Failed to process {domain_record.url} with {e}",
extra={"domain_record": domain_record},
)
return result


async def extract(
domain_records: List[DomainRecord],
pipeline: ProcessorPipeline,
concurrent_length: int = 20,
timeout: int = 5,
):
domain_records_iterator = iter(tqdm(domain_records))
domains_exausted = False
if hasattr(pipeline.downloader, "__aenter__"):
await pipeline.downloader.__aenter__()
try:
await asyncio.gather(
*[
pipeline.process_domain_record(domain_record)
for domain_record in domain_records
]
)
queue: Set[asyncio.Task[List[Path]]] = set()
while not domains_exausted or len(queue) > 0:
# Put into queue till possible
while len(queue) < concurrent_length and not domains_exausted:
next_domain_record = next(domain_records_iterator, None)
if next_domain_record is None:
domains_exausted = True
break

queue.add(
asyncio.create_task(_extract_task(next_domain_record, pipeline))
)

done, queue = await asyncio.wait(
queue, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
)
for task in done:
try:
await task
except KeyboardInterrupt as e:
break

except Exception as _:
all_purpose_logger.error(f"Failed to process {task}")
pass
except Exception as e:
all_purpose_logger.error(e, exc_info=True)

finally:
if hasattr(pipeline.downloader, "__aexit__"):
await pipeline.downloader.__aexit__(None, None, None)
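The rewritten `extract()` replaces a single `asyncio.gather` over every domain record with a bounded task set: it keeps at most `concurrent_length` tasks in flight, refills them from a tqdm-wrapped iterator, and drains finished tasks with `asyncio.wait(..., return_when=FIRST_COMPLETED)`. A stripped-down sketch of the same pattern, with a dummy worker in place of `ProcessorPipeline`:

```python
# Standalone sketch of the bounded-concurrency loop used above
# (dummy worker instead of pipeline.process_domain_record).
import asyncio
from typing import Iterable, List, Set


async def work(item: int) -> int:
    await asyncio.sleep(0.01)  # stands in for processing one domain record
    return item * 2


async def run_bounded(items: Iterable[int], concurrency: int = 20, timeout: float = 5.0) -> List[int]:
    it = iter(items)
    queue: Set[asyncio.Task[int]] = set()
    results: List[int] = []
    exhausted = False
    while not exhausted or queue:
        # Refill the task set up to the concurrency limit.
        while len(queue) < concurrency and not exhausted:
            nxt = next(it, None)
            if nxt is None:
                exhausted = True
                break
            queue.add(asyncio.create_task(work(nxt)))
        if not queue:  # nothing left to schedule or wait for
            break
        # Block until at least one task finishes (or the timeout elapses).
        done, queue = await asyncio.wait(queue, timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
        results.extend(task.result() for task in done)
    return results


if __name__ == "__main__":
    print(len(asyncio.run(run_bounded(range(100)))))
```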
12 changes: 10 additions & 2 deletions cmoncrawl/processor/pipeline/extractor.py
@@ -92,7 +92,11 @@ def __init__(self, filter_non_ok: bool = True):
self.filter_non_ok = filter_non_ok

def extract_soup(self, soup: BeautifulSoup, metadata: PipeMetadata):
metadata.name = metadata.domain_record.url.replace("/", "_")[:100]
metadata.name = (
metadata.domain_record.url.replace("/", "_")[:100]
if metadata.domain_record.url is not None
else "unknown"
)
result_dict: Dict[str, Any] = {"html": str(soup)}

return result_dict
@@ -120,7 +124,7 @@ def __init__(self, filter_non_ok: bool = True):
self.filter_non_ok = filter_non_ok

def extract_soup(self, soup: BeautifulSoup, metadata: PipeMetadata):
metadata.name = metadata.domain_record.url.replace("/", "_")[:100]
metadata.name = (
metadata.domain_record.url.replace("/", "_")[:100]
if metadata.domain_record.url is not None
else "unknown"
)
result_dict: Dict[str, Any] = {
"domain_record": metadata.domain_record.to_dict()
}
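Both HTML extractors now tolerate a missing `domain_record.url` by falling back to the name `"unknown"`. A minimal sketch of that guard as a standalone helper (the helper name is illustrative, not part of CmonCrawl):

```python
# Derive a filesystem-safe name from an optional URL, mirroring the guard above.
from typing import Optional


def safe_name(url: Optional[str], max_len: int = 100) -> str:
    return url.replace("/", "_")[:max_len] if url is not None else "unknown"


assert safe_name("https://example.com/a/b") == "https:__example.com_a_b"
assert safe_name(None) == "unknown"
```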
44 changes: 19 additions & 25 deletions cmoncrawl/processor/pipeline/pipeline.py
@@ -1,5 +1,5 @@
from pathlib import Path
from typing import List
from typing import Any, Dict, List
from cmoncrawl.processor.pipeline.downloader import IDownloader
from cmoncrawl.processor.pipeline.streamer import IStreamer
from cmoncrawl.processor.pipeline.router import IRouter
@@ -16,7 +16,9 @@ def __init__(
self.downloader = downloader
self.oustreamer = outstreamer

async def process_domain_record(self, domain_record: DomainRecord):
async def process_domain_record(
self, domain_record: DomainRecord, additional_info: Dict[str, Any] = {}
):
paths: List[Path] = []
downloaded_articles = []
try:
@@ -25,28 +27,20 @@ async def process_domain_record(self, domain_record: DomainRecord):
metadata_logger.error(f"{e}", extra={"domain_record": domain_record})

for (downloaded_article, metadata) in downloaded_articles:
try:
extractor = self.router.route(
metadata.domain_record.url,
metadata.domain_record.timestamp,
metadata,
)
output = extractor.extract(downloaded_article, metadata)
if output is None:
metadata_logger.debug(
f"No output from {extractor.__class__}",
extra={"domain_record": metadata.domain_record},
)
continue
paths.append(await self.oustreamer.stream(output, metadata))
metadata_logger.info(
"Successfully processed",
extra={"domain_record": metadata.domain_record},
)
except ValueError as e:
metadata_logger.error(
str(e),
extra={"domain_record": domain_record},
extractor = self.router.route(
metadata.domain_record.url,
metadata.domain_record.timestamp,
metadata,
)
output = extractor.extract(downloaded_article, metadata)
if output is None:
metadata_logger.warn(
f"Extractor {extractor.__class__.__name__} returned None for {metadata.domain_record.url}"
)
# Not catching IOError because some other processor could process it -> nack
continue

if "additional_info" not in output:
output["additional_info"] = additional_info

paths.append(await self.oustreamer.stream(output, metadata))
return paths
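`process_domain_record()` now accepts an `additional_info` dict and attaches it to the extractor output only when the extractor did not set one itself. A standalone sketch of that merge rule (not the CmonCrawl API):

```python
# Demo of the additional_info merge: caller-provided metadata is attached
# only when the extractor's output has no "additional_info" key already.
from typing import Any, Dict


def attach_additional_info(output: Dict[str, Any], additional_info: Dict[str, Any]) -> Dict[str, Any]:
    if "additional_info" not in output:
        output["additional_info"] = additional_info
    return output


print(attach_additional_info({"title": "t"}, {"batch": 3}))
# {'title': 't', 'additional_info': {'batch': 3}}
print(attach_additional_info({"title": "t", "additional_info": {"batch": 1}}, {"batch": 3}))
# {'title': 't', 'additional_info': {'batch': 1}}
```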
14 changes: 11 additions & 3 deletions cmoncrawl/processor/pipeline/router.py
@@ -30,7 +30,9 @@ class Route:

class IRouter(ABC):
@abstractmethod
def route(self, url: str, time: datetime, metadata: PipeMetadata) -> IExtractor:
def route(
self, url: str | None, time: datetime | None, metadata: PipeMetadata
) -> IExtractor:
raise NotImplementedError()


@@ -39,7 +41,7 @@ def __init__(self):
self.registered_routes: List[Route] = []
self.modules: Dict[str, IExtractor] = {}

def load_module(self, module_path: Path) -> IExtractor:
def load_module(self, module_path: Path):
module_name = os.path.splitext(os.path.basename(module_path))[0]
spec = importlib.util.spec_from_file_location(module_name, module_path)
if spec is None:
@@ -51,7 +53,10 @@
raise Exception("Failed to load module: " + module_name)

spec.loader.exec_module(module)
return module, module_name

def load_module_as_extractor(self, module_path: Path):
module, module_name = self.load_module(module_path)
name: str = getattr(module, "NAME", module_name)
extractor: IExtractor | None = getattr(module, "extractor", None)
if extractor is None:
@@ -67,7 +72,10 @@ def load_modules(self, folder: Path):
if not file.endswith(".py"):
continue

extractors.append(self.load_module(Path(root) / file))
if file == "__init__.py":
self.load_module(Path(root) / file)

extractors.append(self.load_module_as_extractor(Path(root) / file))
all_purpose_logger.info(f"Loaded {len(extractors)} extractors")

def load_extractor(self, name: str, extractor: IExtractor):
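`load_modules()` now imports any `__init__.py` first and registers every other `.py` file through `load_module_as_extractor()`, which reads a `NAME` attribute (defaulting to the file name) and a module-level `extractor` attribute. A minimal extractor module consistent with that contract might look like the sketch below; the class and file name are hypothetical, and in a real setup the object would implement CmonCrawl's `IExtractor` interface rather than being duck-typed.

```python
# title_extractor.py -- hedged sketch of a module that load_module_as_extractor
# would pick up: it exposes NAME and a module-level `extractor` object whose
# extract() matches the call made in pipeline.py above.
from typing import Any, Dict, Optional

from bs4 import BeautifulSoup


class TitleExtractor:
    def extract(self, response: str, metadata: Any) -> Optional[Dict[str, Any]]:
        # metadata would be a CmonCrawl PipeMetadata instance at runtime.
        title = BeautifulSoup(response, "html.parser").find("title")
        return {"title": title.get_text(strip=True)} if title else None


NAME = "title_extractor"      # read via getattr(module, "NAME", module_name)
extractor = TitleExtractor()  # read via getattr(module, "extractor", None)
```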
3 changes: 1 addition & 2 deletions cmoncrawl/processor/pipeline/streamer.py
@@ -1,7 +1,6 @@
from abc import ABC, abstractmethod
import asyncio
import json
from math import log
from pathlib import Path
import random
from typing import Any, Dict, List
@@ -183,7 +182,7 @@ def __init__(
max_file_size: int,
pretty: Boolean = False,
):
super().__init__(root, max_directory_size, max_file_size, extension=".json")
super().__init__(root, max_directory_size, max_file_size, extension=".jsonl")
self.pretty = pretty

def metadata_to_string(self, extracted_data: Dict[Any, Any]) -> str:
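`StreamerFileJSON` output files now carry a `.jsonl` extension, matching the one-JSON-object-per-line format they contain (the end-to-end tests further down are updated accordingly). Reading such a file is a plain line-by-line parse; the path in this sketch is illustrative:

```python
# Read a StreamerFileJSON output file: one JSON object per line (JSON Lines).
import json
from pathlib import Path

path = Path("output/directory_0/0_file.jsonl")  # illustrative path
with path.open() as f:
    records = [json.loads(line) for line in f if line.strip()]
print(len(records))
```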
36 changes: 8 additions & 28 deletions pyproject.toml
@@ -5,33 +5,8 @@ build-backend = "setuptools.build_meta"

[project]
name = "CmonCrawl"
version = "0.9.0"
dependencies = [
"aiofiles==0.8.0",
"aiohttp==3.8.1",
"aiosignal==1.2.0",
"async-timeout==4.0.2",
"attrs==21.4.0",
"beautifulsoup4==4.11.1",
"bs4==0.0.1",
"charset-normalizer==2.1.0",
"dataclasses-json==0.5.7",
"docopt==0.6.2",
"frozenlist==1.3.0",
"idna==3.3",
"marshmallow==3.19.0",
"marshmallow-enum==1.5.1",
"multidict==6.0.2",
"mypy-extensions==1.0.0",
"packaging==23.1",
"six==1.16.0",
"soupsieve==2.3.2.post1",
"stomp.py==8.0.1",
"typing-inspect==0.8.0",
"typing_extensions==4.5.0",
"warcio==1.7.4",
"yarl==1.7.2"
]

dynamic = ["version"]

keywords = [
"Common Crawl",
@@ -44,14 +19,19 @@ keywords = [
]

readme = "README.md"
license = {file = "MIT"}
license = {file = "LICENSE"}

classifiers = [
"Development Status :: 3 - Alpha",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
]
[tool.setuptools_scm]


[tool.setuptools.dynamic]
dependencies = {file = "requirements.txt"}

[tool.setuptools.packages.find]
include = ["cmoncrawl*"]
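Packaging metadata becomes dynamic: the version is derived from git tags via setuptools_scm and the dependency list is read from requirements.txt. Assuming the published distribution keeps the name "CmonCrawl", the installed version can be queried at runtime like this:

```python
# Query the setuptools_scm-derived version of the installed distribution.
# "CmonCrawl" follows the project name above; adjust if the published name differs.
from importlib.metadata import PackageNotFoundError, version

try:
    print(version("CmonCrawl"))
except PackageNotFoundError:
    print("CmonCrawl is not installed in this environment")
```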
1 change: 1 addition & 0 deletions requirements.txt
@@ -18,6 +18,7 @@ packaging==23.1
six==1.16.0
soupsieve==2.3.2.post1
stomp.py==8.0.1
tqdm==4.65.0
typing-inspect==0.8.0
typing_extensions==4.5.0
warcio==1.7.4
6 changes: 3 additions & 3 deletions tests/end_to_end_tests.py
@@ -32,7 +32,7 @@ async def test_extract_from_records(self):
cfg: ExtractConfig = ExtractConfig.schema(many=False).load(js)
results = await extract_from_files(
config=cfg,
files=[self.base_folder / "files" / "file.json"],
files=[self.base_folder / "files" / "file.jsonl"],
output_path=self.base_folder / "output",
mode=ExtractMode.RECORD,
date=datetime(2021, 1, 1),
@@ -42,7 +42,7 @@
max_retry=1,
sleep_step=1,
)
with open(self.output_folder / "directory_0" / "0_file.json") as f:
with open(self.output_folder / "directory_0" / "0_file.jsonl") as f:
lines = f.readlines()
self.assertEqual(len(lines), 5)
self.assertEqual(
@@ -67,7 +67,7 @@ async def test_extract_from_html(self):
max_retry=1,
sleep_step=1,
)
with open(self.output_folder / "directory_0" / "0_file.json") as f:
with open(self.output_folder / "directory_0" / "0_file.jsonl") as f:
lines = f.readlines()
self.assertEqual(len(lines), 1)
self.assertEqual(
5 changes: 0 additions & 5 deletions tests/test_extract/files/file.json

This file was deleted.
