Better logging; additional_info now working
hynky1999 committed May 11, 2023
1 parent d1c66ae commit eeaac08
Showing 2 changed files with 30 additions and 9 deletions.
33 changes: 24 additions & 9 deletions cmoncrawl/integrations/extract.py
@@ -3,13 +3,15 @@
 import json
 import multiprocessing
 from pathlib import Path
+
+from tqdm import tqdm
 from cmoncrawl.common.types import ExtractConfig
 
 from cmoncrawl.processor.pipeline.downloader import DownloaderDummy, AsyncDownloader
 from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
 from cmoncrawl.middleware.synchronized import extract
 import argparse
-from typing import Any, List
+from typing import Any, Dict, List, Tuple
 import asyncio
 from cmoncrawl.processor.pipeline.streamer import (
     StreamerFileJSON,
@@ -68,15 +70,28 @@ def get_extract_downloader(
     return AsyncDownloader(max_retry=max_retry, sleep_step=sleep_step)
 
 
-def get_domain_records_json(file_path: Path) -> List[DomainRecord]:
+def get_domain_records_json(
+    file_path: Path,
+) -> List[Tuple[DomainRecord, Dict[str, Any]]]:
+    records: List[Tuple[DomainRecord, Dict[str, Any]]] = []
     with open(file_path, "r") as f:
-        js = [json.loads(line) for line in f.readlines()]
-        return [DomainRecord.schema().load(record["domain_record"]) for record in js]
+        for line in tqdm(f):
+            js = json.loads(line)
+            domain_record: DomainRecord = DomainRecord.schema().load(
+                js["domain_record"]
+            )
+            additional_info = js.get("additional_info", {})
+            if not isinstance(additional_info, dict):
+                additional_info = {}
+            records.append((domain_record, additional_info))
+    return records
 
 
-def get_domain_records_html(url: str | None, date: datetime | None):
+def get_domain_records_html(
+    url: str | None, date: datetime | None
+) -> List[Tuple[DomainRecord, Dict[str, Any]]]:
     # Just return dummy as correct crawl will be loaded from dummy downloader
-    return [DomainRecord("", url=url, offset=0, length=0, timestamp=date)]
+    return [(DomainRecord("", url=url, offset=0, length=0, timestamp=date), {})]
 
 
 def load_config(config_path: Path) -> ExtractConfig:
@@ -111,10 +126,10 @@ async def extract_from_files(
     for path in files:
         match mode:
             case ExtractMode.RECORD:
-                domain_records = get_domain_records_json(path)
+                records = get_domain_records_json(path)
             case ExtractMode.HTML:
-                domain_records = get_domain_records_html(url, date)
-        await extract(domain_records, pipeline)
+                records = get_domain_records_html(url, date)
+        await extract(records, pipeline)
 
 
 def _extract_task(
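
For context, get_domain_records_json now reads a JSONL file: one JSON object per line, pairing a domain_record with an optional additional_info dict. Anything under additional_info that is not a dict is coerced to {}, so every element of the returned list has the same (DomainRecord, dict) shape. Below is a hypothetical input line; apart from url, offset, length, and timestamp (visible in the DomainRecord constructor call above), the field names and all values are assumptions for illustration:

    import json

    # Hypothetical JSONL record for get_domain_records_json. Treating "filename"
    # as the first DomainRecord field is an assumption inferred from the
    # DomainRecord("", url=..., offset=..., length=..., timestamp=...) call above.
    example_line = json.dumps(
        {
            "domain_record": {
                "filename": "",
                "url": "https://example.com/article",
                "offset": 0,
                "length": 12345,
                "timestamp": "2023-05-11T00:00:00",
            },
            # Optional; non-dict values are silently replaced with {}.
            "additional_info": {"label": "example"},
        }
    )

Coercing malformed additional_info to an empty dict keeps the tuple shape uniform, so downstream consumers never have to type-check the second element.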
6 changes: 6 additions & 0 deletions cmoncrawl/middleware/synchronized.py
@@ -15,6 +15,7 @@ async def index_and_extract(
     filter_non_unique_url: bool = False,
 ):
     processed_urls: Set[str] = set()
+    total_extracted: int = 0
 
     if hasattr(pipeline.downloader, "__aenter__"):
         await pipeline.downloader.__aenter__()
@@ -26,6 +27,7 @@
                 continue
             try:
                 await pipeline.process_domain_record(domain_record, {})
+                total_extracted += 1
             except KeyboardInterrupt as e:
                 break
 
@@ -39,6 +41,7 @@
     finally:
         if hasattr(pipeline.downloader, "__aexit__"):
             await pipeline.downloader.__aexit__(None, None, None)
+    all_purpose_logger.info(f"Extracted {total_extracted} urls")
 
 
 async def _extract_task(
@@ -67,6 +70,7 @@ async def extract(
 ):
     domain_records_iterator = iter(tqdm(records))
     domains_exausted = False
+    total_extracted: int = 0
     if hasattr(pipeline.downloader, "__aenter__"):
         await pipeline.downloader.__aenter__()
     try:
@@ -92,6 +96,7 @@
             for task in done:
                 try:
                     await task
+                    total_extracted += 1
                 except KeyboardInterrupt as e:
                     break
 
@@ -104,3 +109,4 @@
     finally:
         if hasattr(pipeline.downloader, "__aexit__"):
             await pipeline.downloader.__aexit__(None, None, None)
+    all_purpose_logger.info(f"Extracted {total_extracted} urls")
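
Both coroutines in synchronized.py now follow the same pattern: increment a counter only after a record's pipeline task completes without raising, then emit a single summary log line once the downloader has been torn down. A minimal self-contained sketch of that pattern, with run and process_one as illustrative stand-ins and a plain logging logger in place of the repo's all_purpose_logger:

    import logging

    logger = logging.getLogger("cmoncrawl")  # stand-in for all_purpose_logger

    async def run(records, process_one):
        total_extracted: int = 0
        try:
            for record in records:
                try:
                    await process_one(record)
                    total_extracted += 1  # count only records that processed cleanly
                except KeyboardInterrupt:
                    break
        finally:
            pass  # the real code awaits pipeline.downloader.__aexit__(None, None, None) here
        logger.info(f"Extracted {total_extracted} urls")

Because the increment sits after the awaited task, records that fail or are interrupted are never counted, so "Extracted N urls" reports successful extractions only.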
