diff --git a/data-processing/pyproject.toml b/data-processing/pyproject.toml
index fc6190a1..5c8bb2ea 100644
--- a/data-processing/pyproject.toml
+++ b/data-processing/pyproject.toml
@@ -22,9 +22,12 @@ dependencies = [
     "requests>=2.31.0",
     "polars>0.19.1",
     # Data cleaning dependencies:
-    "dolma[pii,code]@git+https://github.com/allenai/dolma.git@476629dc4d8d804dd2123509dc48b549e6b49dfb", # Install from git until a 1.0.2 package is released
-    "kenlm>=0.2.0", # Used for perplexity tagging
-    "blingfire>=0.1.8", # Used for perplexity tagging
+    "dolma[pii,code]",
+    # @git+https://github.com/allenai/dolma.git@476629dc4d8d804dd2123509dc48b549e6b49dfb", # Install from git until a 1.0.2 package is released
+    "kenlm>=0.2.0",
+    # Used for perplexity tagging
+    "blingfire>=0.1.8",
+    # Used for perplexity tagging
     "mosaicml-streaming",
     "orjsonl",
     "tqdm",
@@ -32,6 +35,14 @@ dependencies = [
     "nlp_dedup",
     "datatrove>=0.2.0",
     "pyyaml",
+    "docling>=1.20.0",
+    "joblib>=1.4.2",
+    "pdfminer-six>=20240706",
+    "pypandoc-binary>=1.14",
+    "trafilatura>=1.9.0",
+    "typer>=0.12.5",
+    "loguru>=0.7.2",
+    "extract-msg>=0.52.0",
 ]
 
 [project.optional-dependencies]
diff --git a/data-processing/scripts/cellar/README.md b/data-processing/scripts/cellar/README.md
new file mode 100644
index 00000000..55ec8e00
--- /dev/null
+++ b/data-processing/scripts/cellar/README.md
@@ -0,0 +1,5 @@
+# European Union Publications Office - Cellar
+
+The data is pulled in two steps:
+1. `catalog.py` - Pull a list of all publications from `start_date` to `end_date`
+2. `ingest.py` - Download the individual publications and process them.
\ No newline at end of file
diff --git a/data-processing/scripts/cellar/catalog.py b/data-processing/scripts/cellar/catalog.py
new file mode 100644
index 00000000..aa58f5c8
--- /dev/null
+++ b/data-processing/scripts/cellar/catalog.py
@@ -0,0 +1,134 @@
+"""
+Pull a list of publications from the European Union Publications Office.
+"""
+import collections
+import datetime
+import json
+import logging
+import re
+import sys
+from pathlib import Path
+from string import Template
+from typing import Any
+
+import requests
+from typer import Typer
+
+APP = Typer(name="Cellar CLI")
+
+
+def parse_response(content: bytes):
+    """
+    Parses the JSON response from the European Union Publications Office API.
+
+    Args:
+        content: The raw JSON response from the API.
+
+    Yields:
+        dict: A dictionary containing information about each publication.
+    """
+    for row in json.loads(content)["results"]["bindings"]:
+        # Split the langs, types, and titles by semicolons
+        data = {k: v["value"] for k, v in row.items()}
+        langs = data["langs"].split(";")
+        types = data["types"]
+        titles = data["titles"]
+
+        pattern = "^" + ";".join([f"{lang}:(?P<{lang}>.*)" for lang in langs]) + "$"
+        titles = re.match(pattern, titles, re.DOTALL).groupdict()
+        titles = {k: v.strip() for k, v in titles.items()}
+
+        types = collections.defaultdict(list)
+        for spec in data["types"].split(";"):
+            lang, mtype = spec.split(":")
+            types[lang].append(mtype)
+
+        subjects = [
+            subj for subject in data["subjects"].split(";") if (subj := subject.strip())
+        ]
+
+        for lang in langs:
+            ret = {
+                "language": lang,
+                "title": titles[lang],
+                "type": types[lang],
+                "subjects": subjects,
+                "work": data["work"],
+                "timestamp": data["timestamp"],
+            }
+            yield ret
+
+
+def sparql_query(
+    start: str,
+    stop: str,
+    query: Template,
+) -> bytes | Any | None:
+    """
+    Execute a SPARQL query on the European Union Publications Office API.
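An aside on `parse_response` above: the per-language title splitting works by building a named-group regex from the `langs` list that the SPARQL `group_concat` returns. A minimal, self-contained sketch of that step; the sample bindings values are made up for illustration:

```python
import re

# Hypothetical values mirroring one row of the SPARQL "bindings" result.
langs = ["ENG", "DAN"]
titles = "ENG:Annual report 2023;DAN:Årsrapport 2023"

# Same pattern construction as in parse_response:
# "^ENG:(?P<ENG>.*);DAN:(?P<DAN>.*)$"
pattern = "^" + ";".join([f"{lang}:(?P<{lang}>.*)" for lang in langs]) + "$"
match = re.match(pattern, titles, re.DOTALL)
print(match.groupdict())
# {'ENG': 'Annual report 2023', 'DAN': 'Årsrapport 2023'}
```

Because the groups are greedy, a title that itself contains a `;LANG:` sequence for one of the requested languages could split oddly; probably rare in practice, but worth knowing.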
+ + Args: + start: The starting date for the query. + stop: The ending date for the query. + query: A template string containing the SPARQL query. + + Returns: + bytes | Any | None: The response content if the query is successful, otherwise None. + """ + logging.info(f"querying {start}-{stop}") + params = {"query": query.substitute(start=start, stop=stop)} + headers = {"accept": "application/json"} + resp = requests.get( + "http://publications.europa.eu/webapi/rdf/sparql", + params=params, + headers=headers, + ) + if resp.status_code == 200: + return resp.content + + logging.warning(f"Query failed {resp.status_code}") + return None + + +@APP.command( + name="fetch_cellar", + help="Fetch publications from the European Union Publications Office.", +) +def main( + output_file: Path, + query_template: Path, + start_date: datetime.datetime = "2016-07-21", + end_date: datetime.datetime = "2024-12-31", + delta: int = 5, +): + """ + Fetch publications from the European Union Publications Office API and write them to an output file. + + Args: + output_file: The path to the output file where the publications will be written. + query_template: The path to a template file containing the SPARQL query. + start_date (optional): The starting date for the query. Defaults to "2016-07-21". + end_date (optional): The ending date for the query. Defaults to "2024-12-31". + delta (optional): The number of days between each query. Defaults to 5. + """ + delta = datetime.timedelta(delta) + with query_template.open() as handle: + query = Template(handle.read()) + + lb = start_date + + logging.basicConfig(level=logging.INFO) + with output_file.open("w") as f: + while lb < end_date: + ub = min(lb + delta, end_date) + result = sparql_query(lb.isoformat(), ub.isoformat(), query) + if not result: + lb = ub + continue + for doc in parse_response(result): + f.write(json.dumps(doc, ensure_ascii=False) + "\n") + lb = ub + + +if __name__ == "__main__": + APP() diff --git a/data-processing/scripts/cellar/ingest.py b/data-processing/scripts/cellar/ingest.py new file mode 100644 index 00000000..110ca1ec --- /dev/null +++ b/data-processing/scripts/cellar/ingest.py @@ -0,0 +1,201 @@ +""" +Convert from various file formats (pdf, docx, etc.). + +To this format: +{ + "id": "...", # MANDATORY: source-specific identifier + "text": "foo", # MANDATORY: textual content of the document + "source": "...", # MANDATORY: source of the data, such as peS2o, common-crawl, etc. + "added": "...", # OPTIONAL: timestamp we acquired this data (time file was created), specified as YYYY-MM-DDTHH:MM:SS.TIMEZONE e.g 2021-04-13T12:52:46.000Z + "created": "..." # OPTIONAL: timestamp when orig document was created (best-guess if not available), should be specified as a range; "YYYY-MM-DDTHH:MM:SS.TIMEZONE, YYYY-MM-DDTHH:MM:SS.TIMEZONE" + "metadata": { # OPTIONAL: source-specific metadata + "sub-source": "...", # OPTIONAL: E.g. "newspaper_ocr" + ... 
+ } +} +""" + +import gzip +import io +import itertools +import json +import time +import zipfile +from datetime import date, datetime +from pathlib import Path +from typing import TextIO + +import requests +from dfm_data.document_processing.processors import process_file +from dfm_data.document_processing.utils import parallel_process_with_retries +from loguru import logger +from requests.exceptions import ConnectionError, ReadTimeout +from typer import Typer + +APP = Typer(name="Cellar CLI") + + +# Match available formats to the preference order +def ordered_match(options: list[str]) -> list[str]: + # Preferred file types order + preference = [ + "txt", + "html", + "xhtml", + "epub", + "docx", + "doc", + "pdf", + "pdf1x", + "pdfa1a", + "pdfa1b", + "pdfa2a", + "pdfx", + "pdfx4", + "fmx4", + "xml", + "RDF/XML", + "mobi", + ] + opts = set(options) + return [p for p in preference if p in opts] + + +def process_documents(lang: str, docs: list[dict], path: Path): + """ + Process a list of documents and save the results in a compressed file. + + Args: + lang: The language to fetch the document for. + docs: A list of dictionaries representing the documents to process. Each dictionary must contain at least a "work" key with the URI of the document, and a "type" key with a list of available formats. + path: The path where the compressed file will be saved. + """ + logger.info(f"{path} - {len(docs)}") + with gzip.open( + path, + "wt", + encoding="utf-8", + ) as gzfile, requests.Session() as session: + for doc in docs: + uri = doc["work"] + types = doc["type"] + name = uri.split("/")[-1] + logger.info(f"Ingesting {uri}") + + # Fetch document content in the preferred format for each language + try: + if not fetch_and_process_doc(lang, gzfile, session, uri, types, name): + logger.warning(f"No valid format for {uri} {lang}") + except ConnectionError: + logger.error(f"Connection error for {uri} - {lang}") + except ReadTimeout: + logger.error(f"Read timeour for {uri} - {lang}") + + +def fetch_and_process_doc( + lang: str, + gzfile: TextIO, + session: requests.Session, + uri: str, + types: list[str], + name: str, +) -> bool: + """ + Fetch and process a document in the preferred format. + + Args: + lang: The language to fetch the document for. + gzfile: A text file where the processed content will be written. + session: An instance of `requests.Session` that is used to make HTTP requests. + uri: The URI of the document. + types: A list of available formats for the document. + name: The name of the document. + + Returns: + True if the document was successfully fetched and processed, False otherwise. + """ + for mtype in ordered_match(types): + header = { + "accept": f"application/zip;mtype={mtype}", + "accept-language": lang.lower(), + } + time.sleep(1.0) + response = session.get(uri, headers=header) + if response.status_code == 200: + with zipfile.ZipFile(io.BytesIO(response.content)) as z: + process_zip_content(gzfile, f"Cellar-{lang}-{name}", z) + return True + return False + + +def process_zip_content(gzfile: TextIO, name: str, z: zipfile.ZipFile): + """ + Process the contents of a ZIP file and write the processed text to a gzip file. + + Args: + gzfile: A text file where the processed content will be written. + name: The name of the document being processed. + z: An instance of `zipfile.ZipFile` containing the contents of the ZIP file. 
+ """ + for zip_info in z.infolist(): + with z.open(zip_info) as file: + text = process_file(file, name) + if text: + gzfile.write(text + "\n") + + +# Group documents by date +def get_groups(infile: Path) -> list[tuple[tuple[str, date], itertools.groupby]]: + """ + Generate groups of documents based on language and date. + + Args: + infile: The path to the input file containing document data in JSON format. + + Yields: + A tuple containing a key (language, date) and a group of documents that match the key. + """ + + def get_group(doc: dict) -> tuple[str, date]: + return (doc["language"], datetime.fromisoformat(doc["timestamp"]).date()) + + with infile.open() as handle: + docs = (json.loads(line) for line in handle) + docs = sorted(docs, key=get_group) + grouped = itertools.groupby(docs, get_group) + return [(key, list(group)) for key, group in grouped] + + +@APP.command( + name="process_cellar", + help="Process documents and save the results in a compressed file.", +) +def main(infile: Path, outdir: Path, workers: int = 2): + """ + Process documents from the given input file and save the results in compressed JSONL files. + + Args: + infile: The path to the input file containing document metadata. + outdir: The directory where the compressed JSONL files will be saved. + workers: Number of parallel workers. Defaults to 2. + """ + + def process_group(group: tuple[tuple[str, date], itertools.groupby]): + (lang, date), docs = group + path = outdir / f"{lang}-{date.year}-{date.month}-{date.day}.jsonl.gz" + + if path.exists(): + logger.warning(f"{path} already exists! Skipping") + return + + path.parent.mkdir(parents=True, exist_ok=True) + + process_documents(lang, list(docs), path) + + groups = get_groups(infile) + parallel_process_with_retries(process_group, groups, retries=10, n_workers=workers) + + +# Main script execution +if __name__ == "__main__": + APP() diff --git a/data-processing/scripts/cellar/query.template b/data-processing/scripts/cellar/query.template new file mode 100644 index 00000000..283a7397 --- /dev/null +++ b/data-processing/scripts/cellar/query.template @@ -0,0 +1,41 @@ +PREFIX cdm: +PREFIX skos: +PREFIX cmr: +PREFIX dc: +PREFIX xsd: +SELECT +?work +?timestamp +(group_concat(distinct ?lang; separator=";") as ?langs) +(group_concat(distinct ?lang_type; separator=";") as ?types) +(group_concat(distinct ?lang_title; separator=";") as ?titles) +(group_concat(distinct ?subject; separator=";") as ?subjects) +WHERE +{ + VALUES ?lang {"ENG" "SWE" "DAN" "NOR" "NOB" "NNO" "ISL" "FAO"} + + ?work + cmr:creationDate ?timestamp . + + FILTER (?timestamp < "${stop}Z"^^xsd:dateTime) + FILTER (?timestamp >= "${start}Z"^^xsd:dateTime) + FILTER NOT EXISTS {?work cdm:work_part_of_work ?parent} + + OPTIONAL { ?work cdm:work_is_about_concept_eurovoc/skos:prefLabel ?subject . FILTER (lang(?subject)="en") } + + ?exp + cdm:expression_belongs_to_work ?work ; + cdm:expression_uses_language/dc:identifier ?lang ; + cdm:expression_title ?title . + + ?manif + cdm:manifestation_manifests_expression ?exp ; + cdm:manifestation_type ?mtype . 
+
+    FILTER (REGEX(?mtype, "(txt)|(doc)|(pdf)|(epub)|(fmx4)|(html)|(xml)", "i"))
+
+    BIND (CONCAT(?lang, ":", ?title) AS ?lang_title)
+    BIND (CONCAT(?lang, ":", ?mtype) AS ?lang_type)
+}
+GROUP BY ?work ?timestamp
+ORDER BY ?timestamp
\ No newline at end of file
diff --git a/data-processing/scripts/dsk/README.md b/data-processing/scripts/dsk/README.md
new file mode 100644
index 00000000..7f556b68
--- /dev/null
+++ b/data-processing/scripts/dsk/README.md
@@ -0,0 +1,10 @@
+# Data-processing Dansk Sprogmodel Konsortium (DSK)
+
+This directory contains scripts for processing the DSK data, mainly converting from various formats (e.g. pdf and docx) to jsonl.
+
+Run the conversion on an entire repo:
+
+```bash
+$ python convert_dsk_to_jsonl.py $PATH_TO_DSK_DIR $OUTPUT_PATH $DSK_CLIENT
+```
+
diff --git a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py
new file mode 100644
index 00000000..b5fa8041
--- /dev/null
+++ b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py
@@ -0,0 +1,131 @@
+"""
+Convert from various file formats (pdf, docx, etc.).
+
+To this format:
+{
+    "id": "...",              # MANDATORY: source-specific identifier
+    "text": "foo",            # MANDATORY: textual content of the document
+    "source": "...",          # MANDATORY: source of the data, such as peS2o, common-crawl, etc.
+    "added": "...",           # OPTIONAL: timestamp we acquired this data (time file was created), specified as YYYY-MM-DDTHH:MM:SS.TIMEZONE e.g 2021-04-13T12:52:46.000Z
+    "created": "..."          # OPTIONAL: timestamp when orig document was created (best-guess if not available), should be specified as a range; "YYYY-MM-DDTHH:MM:SS.TIMEZONE, YYYY-MM-DDTHH:MM:SS.TIMEZONE"
+    "metadata": {             # OPTIONAL: source-specific metadata
+        "sub-source": "...",  # OPTIONAL: E.g. "newspaper_ocr"
+        ...
+    }
+}
+"""
+
+import logging
+import subprocess
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+import typer
+from dfm_data.document_processing.processors import process_files
+from typer import Typer
+
+if TYPE_CHECKING:
+    import pandas as pd
+
+APP = Typer(name="2JSONL Converter")
+
+
+@APP.command(
+    name="process_directory",
+    help="Crawl a directory and process files of different types",
+)
+def crawl_directory(
+    top_level_path: Path,
+    output_path: Path,
+    dsk_client: str,
+    output_suffix: str = ".jsonl.gz",
+    n_workers: int = 4,
+    key_paths: str = "text",
+    text_format: str = "txt",
+):
+    """Process a set of data delivered by a DSK organisation.
+
+    Args:
+        top_level_path: Path to a directory with the data delivered by the DSK organisation
+        output_path: Path to place the processed data
+        dsk_client: The DSK organisation that delivered the data
+        output_suffix: What suffix to use. Defaults to ".jsonl.gz".
+        n_workers: How many processes to run in parallel. Defaults to 4.
+        key_paths: For JSON data, the path to the text (can be nested keys, represented as a comma-separated list).
+        text_format: What format the text is in, html or plain text.
+    """
+    files = list(top_level_path.glob("**/*.*"))
+
+    files = list(filter(lambda x: x.is_file(), files))
+
+    if len(files) == 0:
+        logging.error("Something went wrong. No files to process")
No files to process") + raise typer.Exit(code=1) + + process_files( + files, + output_path, + dsk_client, + output_suffix, + n_workers, + text_path=key_paths, + text_format=text_format, + ) + + +@APP.command( + name="process_web_crawl", + help="Process output from a web crawl", +) +def process_web_crawl( + path_to_crawl_log: Path, + output_path: Path, + data_path: Path, + dsk_client: str, + output_suffix: str = ".jsonl.gz", + n_workers: int = 4, +): + """Process a set of crawled data from a DSK organisation. + + Args: + path_to_crawl_log: Path to a log file from the crawl + output_path: Path to place the processed data + data_path: Path where the crawled data is located + dsk_client: What DSK organizations pages have been crawled + output_suffix: What suffix to use. Defaults to ".jsonl.gz". + n_workers: How many process to run in parallel. Defaults to 4. + """ + # Define the command as a list of strings + command = ["grep", "^--", path_to_crawl_log] + failed = False + # Run the command and capture the output + try: + result = subprocess.run(command, text=True, capture_output=True, check=True) + # Filter the third column using Python (equivalent to `awk '{print $3}'`) + main_folders = { + line.split()[2].split("/")[2] for line in result.stdout.splitlines() + } + except subprocess.CalledProcessError as e: + logging.error(f"Command failed with error: {e}") + failed = True + + if failed: + raise typer.Exit(code=1) + + files: list[Path] = [] + for main_folder in main_folders: + if not (data_path / main_folder).exists(): + continue + files.extend(list((data_path / main_folder).glob("**/*.*"))) + + files = list(filter(lambda x: x.is_file(), files)) + + if len(files) == 0: + logging.error("Something went wrong. No files to process") + raise typer.Exit(code=1) + + process_files(files, output_path, dsk_client, output_suffix, n_workers) + + +if __name__ == "__main__": + APP() diff --git a/data-processing/scripts/miljoeportal/README.md b/data-processing/scripts/miljoeportal/README.md new file mode 100644 index 00000000..c9fd9a25 --- /dev/null +++ b/data-processing/scripts/miljoeportal/README.md @@ -0,0 +1,7 @@ +# Danmarks Miljøportal + +To pull data run: + +```bash +$ uv run data-processing/scripts/miljoeportal/download.py $PATH_TO_SAVE_RAW_DATA $PATH_TO_SAVE_EXTRACTED_DATA4 +``` \ No newline at end of file diff --git a/data-processing/scripts/miljoeportal/download.py b/data-processing/scripts/miljoeportal/download.py new file mode 100644 index 00000000..6a173e0c --- /dev/null +++ b/data-processing/scripts/miljoeportal/download.py @@ -0,0 +1,100 @@ +"""Script to fetch a list of all documents available in the Milloeportal""" + +import gzip +import json +from io import BytesIO +from pathlib import Path +from zipfile import ZipFile + +import requests +import tqdm +from dfm_data.document_processing.processors import process_file +from loguru import logger +from requests import Response +from typer import Exit, Typer + +APP = Typer(name="Miljoeportalen") + + +def catalog(path: Path) -> bool: + """Pull a list of all assessments. + + Args: + path: Path to save results. + + Returns: + bool: If the pull was succesful or not. 
+ """ + url = "https://eahub.miljoeportal.dk/api/assessments/search" + + headers: dict[str, str] = {"Content-Type": "application/json"} + + body: dict[str, str] = {"assessmentType": "All"} + + res: Response = requests.post(url, json=body, headers=headers) + + if res.status_code == 200: + assessments = res.json() + logger.info(f"Fetched {len(assessments)} from Miljøportalen.") + with (path / "result.json").open("w") as output: + json.dump( + [doc["id"] for doc in res.json()], + output, + indent=4, + ensure_ascii=False, + ) + return True + + logger.error(f"Searching content failed: {res.status_code} - {res.text}") + return False + + +def download_files(file_path: Path, id_: str) -> list[str]: + """Download and extract files + + Args: + file_path (Path): Path to where to save raw files + id_ (str): Assess + + Returns: + list[str]: _description_ + """ + url = "https://eahub.miljoeportal.dk/api/assessments/{assessment}/download" + res: Response = requests.get(url.format(assessment=id_)) + body: str | json.Any = res.content.decode("utf-8").lstrip('"').rstrip('"') + response: Response = requests.get(body) + texts = [] + with ZipFile(BytesIO(response.content)) as z: + for zip_info in z.infolist(): + with z.open(zip_info) as file: + raw_file = file_path / zip_info.filename + if raw_file.exists(): + continue + text: str | None = process_file(file, "Miljoeportalen") + if text: + file.seek(0) + raw_file.open("wb").write(file.read()) + texts.append(text) + + return texts + + +@APP.command() +def download(raw_data: Path, cleaned: Path, workers: int = 2): + raw_data.mkdir(parents=True, exist_ok=True) + cleaned.mkdir(parents=True, exist_ok=True) + + result: bool = catalog(raw_data) + if not result: + raise Exit(2) + + results = json.load((raw_data / "result.json").open()) + with gzip.open((cleaned / "miljoeportal.jsonl.gz"), mode="wb") as gzfile: + # TODO: parallelize this... 
+ for result in tqdm.tqdm(results): + texts: list[str] = download_files(raw_data, result) + [gzfile.write(f"{line}\n".encode()) for line in texts] + + +if __name__ == "__main__": + APP() diff --git a/data-processing/src/dfm_data/document_processing/__init__.py b/data-processing/src/dfm_data/document_processing/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/data-processing/src/dfm_data/document_processing/processors.py b/data-processing/src/dfm_data/document_processing/processors.py new file mode 100644 index 00000000..891ea005 --- /dev/null +++ b/data-processing/src/dfm_data/document_processing/processors.py @@ -0,0 +1,376 @@ +"""This module contains processing methods for extracting text from various documents.""" + +import gzip +import io +import json +import re +from dataclasses import asdict +from pathlib import Path +from typing import IO, TYPE_CHECKING, Any, Callable + +from docling.datamodel.base_models import DocumentStream +from docling.datamodel.document import TableItem, TextItem +from extract_msg import openMsg +from joblib import Parallel, delayed +from loguru import logger +from pypandoc import convert_file, convert_text +from tqdm import tqdm +from trafilatura import extract as extract_html_text + +from .utils import ( + build_document_converter, + build_metadata, + create_JSONL, + find_near_duplicates, + generate_decode_url, + make_unique, + remove_newlines, +) + +if TYPE_CHECKING: + import pandas as pd + +SCRIPT_TAG = "" + + +def process_json(file_path: Path, source: str, **kwargs: dict[str, Any]) -> list[str]: + """ + Extracts and processes text from a JSON file based on a given path of keys and formatter. + + Args: + file_path (Path): Path to the JSON file. + source (str): Source description for logging purposes. + **kwargs: Additional arguments: + - text_path (str): Comma-separated keys to traverse the JSON structure. + - text_format (str): Format of the text ('txt' or 'html'). + - formatter (Callable): Custom function to format the extracted text. + + Returns: + list[str]: A list of formatted text strings extracted from the JSON file. + """ + + def extract_text(data: Any, keys: list[str]) -> list[str]: + """ + Recursively traverses the JSON structure to extract text at the given keys. + + Args: + data (Any): The current level of the JSON object. + keys (list[str]): List of keys to traverse. + + Returns: + list[str]: A list of extracted text strings. 
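To make the `text_path` convention in `process_json` concrete: the comma-separated keys are walked one level at a time, and lists are mapped over at whatever depth they appear. A standalone sketch of that traversal on a made-up document (a simplified restatement of `extract_text` above, without the logging and fallbacks):

```python
from typing import Any

def extract_text(data: Any, keys: list[str]) -> list[str]:
    # Simplified version of the traversal in process_json.
    if not keys:
        return [data] if isinstance(data, str) else [str(data)]
    if isinstance(data, dict) and keys[0] in data:
        return extract_text(data[keys[0]], keys[1:])
    if isinstance(data, list):
        out: list[str] = []
        for item in data:
            out.extend(extract_text(item, keys))
        return out
    return []

doc = {"articles": [{"body": {"text": "Første tekst"}}, {"body": {"text": "Anden tekst"}}]}
print(extract_text(doc, "articles,body,text".split(",")))
# ['Første tekst', 'Anden tekst']
```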
+ """ + if not keys: + # If keys are exhausted, handle the data + if isinstance(data, str): + return [data] + if isinstance(data, list): + # Collect strings from list elements + texts = [] + for item in data: + texts.extend(extract_text(item, keys)) + return texts + if isinstance(data, dict): + return [json.dumps(data)] # Convert objects to string representation + + return [str(data)] # Fallback for other data types + + key = keys[0] + remaining_keys = keys[1:] + + if isinstance(data, dict) and key in data: + return extract_text(data[key], remaining_keys) + if isinstance(data, list): + # If data is a list, attempt to extract text from each item + texts = [] + for item in data: + texts.extend(extract_text(item, keys)) + return texts + + logger.warning(f"Key '{key}' not found in JSON document.") + return [] + + # Supported formatters (default options) + def default_formatter(text: str) -> str: + return text + + # Setup + key_path = kwargs.get("text_path", "text").split(",") + text_format = kwargs.get("text_format", "txt") + + # Choose the appropriate formatter + formatters = { + "txt": default_formatter, + "html": lambda text: extract_html_text(text + SCRIPT_TAG), + } + formatter: Callable[[str], str] = formatters.get( + text_format, + default_formatter, + ) + + if text_format not in formatters: + logger.warning( + f"Text format '{text_format}' is not supported. Defaulting to plain text.", + ) + + # Load and process the JSON file + try: + with file_path.open("r", encoding="utf-8") as f: + document = json.load(f) + + extracted_texts = extract_text(document, key_path) + extracted_texts = [formatter(text) for text in extracted_texts] + metadata = build_metadata(file_path) + formatted_texts = [ + json.dumps( + asdict(create_JSONL(text, source, metadata)), + ensure_ascii=False, + ) + for text in extracted_texts + ] + return formatted_texts + except Exception as e: + logger.error( + f"Error processing JSON file '{file_path}' from source '{source}': {e}", + ) + return None + + +def process_msg(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: # noqa: ARG001 + """Read a single MSG file and build a JSONL object. Uses Trafilatura for the extraction. + + Args: + file_path: Path to the HTML file + source: What is the data source (most likely DSK client) + **kwargs: Additional arguments + + Returns: + str: JSONL line with the file content + """ + + def replace_url(match: re.Match) -> str: + url = match.group(0) + decoded_url = generate_decode_url(url) + if decoded_url: + return decoded_url + return url + + text = openMsg(file_path).body + text = re.sub(r"(\n\s)+", "\n", text) + text = re.sub(r"\[.+?\]", "", text) + text = text.replace("\r", "") + text = re.sub(r"https?://[^>]+", replace_url, text) + metadata = build_metadata(file_path) + return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) + + +def process_html( + file_path: Path | IO[bytes], + source: str, + **kwargs: dict[str, Any], # noqa: ARG001 +) -> str: + """Read a single HTML file and build a JSONL object. Uses Trafilatura for the extraction. 
+ + Args: + file_path: Path to the HTML file + source: What is the data source (most likely DSK client) + **kwargs: Additional arguments + + Returns: + str: JSONL line with the file content + """ + try: + file_content = ( + file_path.read_text() + if isinstance(file_path, Path) + else file_path.read().decode() + ) + except UnicodeDecodeError: + logger.error(f"Unable to read {file_path}") + return None + text = extract_html_text(file_content) + if not text: + return None + text = re.sub(r"(\n\s)+", "\n", text) + metadata = build_metadata(file_path) + return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) + + +def process_epub( + file_path: Path | IO[bytes], + source: str, + **kwargs: dict[str, Any], # noqa: ARG001 +) -> str: + """Read a single EPUB file and build a JSONL object. Uses Pypandoc for the extraction. + + Args: + file_path: Path to the EPUB file + source: What is the data source (most likely DSK client) + **kwargs: Additional arguments + + Returns: + str: JSONL line with the file content + """ + if isinstance(file_path, Path): + text = convert_file(file_path, to="plain", format="epub") + else: + try: + text = convert_text(file_path.read().decode(), to="plain", format="epub") + except UnicodeDecodeError: + logger.error(f"Unable to read {file_path}") + return None + text = re.sub(r"(\n\s)+", "\n", text) + metadata = build_metadata(file_path) + return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) + + +def process_txt( + file_path: Path | IO[bytes], + source: str, + **kwargs: dict[str, Any], # noqa: ARG001 +) -> str: + """Read a single TXT file and build a JSONL object + + Args: + file_path: Path to the TXT file + source: What is the data source (most likely DSK client) + **kwargs: Additional arguments + + Returns: + str: JSONL line with the file content + """ + text = ( + file_path.read_text() + if isinstance(file_path, Path) + else file_path.read().decode() + ) + text = re.sub(r"(\n\s)+", "\n", text) + metadata = build_metadata(file_path) + return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) + + +def process_document( + file: Path | IO[bytes], + source: str, + **kwargs: dict[str, Any], +) -> str | None: + """ + Process a single file, converting it into a JSONL string. + + Args: + file: Path to the input file. + source: Source identifier for the data. + **kwargs: Extra arguments + + Returns: + str | None: JSONL string if the file is processed successfully, else None. 
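For orientation, this is roughly what one line produced by `process_document` looks like once `create_JSONL` and `build_metadata` (both in `utils.py` further down) have filled in the surrounding fields. Every value is illustrative; "MyClient" is a placeholder client name, not taken from a real run:

```python
# Illustrative only; the shape follows create_JSONL and build_metadata in utils.py.
example_record = {
    "id": "MyClient-rapport-2023.pdf",
    "text": "Årsrapport 2023\n\n| Post | Beløb |\n|------|-------|\n| Løn  | 100   |\n\n",
    "source": "MyClient",
    "added": "2025-01-15 10:42:11.123456",
    "created": "2000-01-01 00:00:00, 2025-01-15 10:42:11.123456",
    "metadata": {
        "filename": "rapport 2023.pdf",
        "filetype": "PDF",
        "filesize": 123456,
        "page_count": 12,
        "file_path": "/data/dsk/MyClient/rapport 2023.pdf",
    },
}
```

Note that `create_JSONL` currently emits `str(datetime.now())`, so the timestamps do not quite follow the `YYYY-MM-DDTHH:MM:SS.TIMEZONE` convention the module docstrings describe.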
+ """ + doc_converter = kwargs.get("converter", build_document_converter()) + try: + input_ = ( + file + if isinstance(file, Path) + else DocumentStream(name=file.name, stream=io.BytesIO(file.read())) + ) + result = doc_converter.convert(input_) + + metadata = build_metadata(result.input) + + file_content = "" + # Iterate the elements in reading order, including hierarchy level + for item, _ in result.document.iterate_items(): + if isinstance(item, TextItem): + if item.text.strip(): + file_content += item.text + elif isinstance(item, TableItem): + table_df: pd.DataFrame = item.export_to_dataframe() + dups = find_near_duplicates(table_df, 0.8) + column_counts = {} + table_df.columns = [ + make_unique(col, column_counts) for col in table_df.columns + ] + columns_to_keep = table_df.columns.copy(deep=True) + for pair in dups: + if pair[1] in columns_to_keep: + columns_to_keep = columns_to_keep.drop(pair[1]) + + df_cleaned = table_df[list(columns_to_keep)] + df_cleaned = remove_newlines(df_cleaned) + file_content += df_cleaned.to_markdown(index=False, tablefmt="github") + file_content += "\n\n" + + # Create and return JSONL entry + return json.dumps( + asdict(create_JSONL(file_content, source, metadata)), + ensure_ascii=False, + ) + except Exception as e: + logger.error(f"Failed to process file {file}: {e}") + return None + + +def process_file( + file: Path | IO[bytes], source: str, **kwargs: dict +) -> str | list[str] | None: + """Generic method for processing a file. Will find the file type and use the right processing method. + + Args: + file: Path to the file to process + source: What DSK client have delivered the file + **kwargs: Extra arguments + + Returns: + str | None: Returns a JSONL line if the file type is supported, else None. + """ + suffix = file.suffix if isinstance(file, Path) else "." 
+ file.name.split(".")[-1] + method = { + ".pdf": process_document, + ".html": process_html, + ".docx": process_document, + ".epub": process_epub, + ".txt": process_txt, + ".pptx": process_document, + ".md": process_document, + ".msg": process_msg, + ".json": process_json, + }.get(suffix) + + if not method: + logger.warning(f"Unsupported file type: {suffix} - for file: {file!s}") + return None + + return method(file, source, **kwargs) + + +def process_files( + files: list[Path], + output_path: Path, + dsk_client: str, + output_suffix: str = ".jsonl.gz", + n_workers: int = 4, + **kwargs: dict, +): + save_file = output_path + if "".join(output_path.suffixes) != ".jsonl.gz": + save_file = output_path / (dsk_client + output_suffix) + + converter = build_document_converter() + parallel = Parallel(n_jobs=n_workers, return_as="generator_unordered") + save_file.parent.mkdir(parents=True, exist_ok=True) + with gzip.open(save_file, mode="wb") as out_file: + # with (output_path / output_name).open("w+") as out_file: + for doc in parallel( + delayed(process_file)( + file, + dsk_client, + converter=converter, + **kwargs, + ) + for file in tqdm(files) + ): + if doc is None: + continue + if isinstance(doc, str): + out_file.write(f"{doc}\n".encode()) + if isinstance(doc, list): + [out_file.write(f"{d}\n".encode()) for d in doc] diff --git a/data-processing/src/dfm_data/document_processing/utils.py b/data-processing/src/dfm_data/document_processing/utils.py new file mode 100644 index 00000000..cd5ab1e9 --- /dev/null +++ b/data-processing/src/dfm_data/document_processing/utils.py @@ -0,0 +1,315 @@ +"""This module contains utilities for extracting text from documents.""" + +import time +import urllib.parse +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import IO, Any, Callable, Union + +import pandas as pd +from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend +from docling.datamodel.base_models import InputFormat +from docling.datamodel.document import InputDocument +from docling.datamodel.pipeline_options import PdfPipelineOptions, TableFormerMode +from docling.document_converter import ( + DocumentConverter, + PdfFormatOption, + WordFormatOption, +) +from docling.pipeline.simple_pipeline import SimplePipeline +from joblib import Parallel, delayed +from joblib.externals.loky.process_executor import TerminatedWorkerError +from loguru import logger + +pd.options.mode.chained_assignment = None + + +def build_document_converter() -> DocumentConverter: + """Create a Docling `DocumentConverter` instance. Used to convert PDF, DOCX and, PPTX. + + Returns: + DocumentConverter: THe `DocumentConverter` instance + """ + # previous `PipelineOptions` is now `PdfPipelineOptions` + pipeline_options = PdfPipelineOptions() + pipeline_options.do_ocr = False + pipeline_options.do_table_structure = True + pipeline_options.table_structure_options.do_cell_matching = False + pipeline_options.table_structure_options.mode = TableFormerMode.FAST + # ... + + ## Custom options are now defined per format. + doc_converter = DocumentConverter( # all of the below is optional, has internal defaults. + allowed_formats=[ + InputFormat.PDF, + InputFormat.MD, + InputFormat.DOCX, + InputFormat.HTML, + InputFormat.PPTX, + InputFormat.ASCIIDOC, + ], # whitelist formats, non-matching files are ignored. + format_options={ + InputFormat.PDF: PdfFormatOption( + pipeline_options=pipeline_options, # pipeline options go here. 
+ backend=PyPdfiumDocumentBackend, # optional: pick an alternative backend + ), + InputFormat.DOCX: WordFormatOption( + pipeline_cls=SimplePipeline, # default for office formats and HTML + ), + }, + ) + return doc_converter + + +@dataclass +class JSONL: + id: str # noqa: A003 + text: str + source: str + added: str + created: str + metadata: dict[str, Any] + + +def create_JSONL(text: str, source: str, metadata: dict[str, Any]) -> JSONL: + """Helper method to create a JSONL dataclass instance. + + Args: + text: The text that should be part of the object + source: The source of the text + metadata: Metadata surrounding the text (e.g. filename, filetype, etc.) + + Returns: + JSONL: The JSONL dataclass instance. + """ + id_ = f"{source}-{metadata.get('filename', '')}".replace(" ", "-") + jsonl = JSONL( + id=id_, + text=text, + source=source, + added=str(datetime.now()), + created=f"{datetime(2000, 1, 1)!s}, {datetime.now()!s}", + metadata=metadata, + ) + return jsonl + + +def build_metadata(document: Union[InputDocument, Path, IO[bytes]]) -> dict: + """Helper function to build metadata from an input file. + + Args: + document: The document/file to build metadata from. + + Returns: + dict: A dictionary containing the metadata. + """ + if isinstance(document, InputDocument): + file_path = document.file + filename = document.file.name + filetype = document.format.name + filesize = document.filesize + page_count = document.page_count + elif isinstance(document, Path): + file_path = document + filename = document.name + filetype = "".join(document.suffixes) + filesize = document.stat().st_size + page_count = 0 + else: + file_path = document.name + filename = document.name + filetype = document.name.split(".")[-1] + document.seek(0) + filesize = len(document.read()) + page_count = 0 + + metadata = { + "filename": filename, + "filetype": filetype, + "filesize": filesize, + "page_count": page_count, + "file_path": str(file_path), + } + + return metadata + + +def find_near_duplicates( + df: pd.DataFrame, + threshold: float = 0.9, +) -> list[tuple]: + """ + Finds pairs of columns in a DataFrame that are near-duplicates, + based on a specified threshold of identical values. + + Args: + df: The DataFrame to check. + threshold: The minimum proportion of identical values to consider a pair of columns as near-duplicates. + + Returns: + A list of tuples, where each tuple contains the names of two near-duplicate columns. + """ + + near_duplicates = [] + for i in range(len(df.columns)): + for j in range(i + 1, len(df.columns)): + if (df.iloc[:, i] == df.iloc[:, j]).mean() >= threshold: + near_duplicates.append((df.columns[i], df.columns[j])) + return near_duplicates + + +def find_text_keys(data: dict, prefix: str = ""): + """Recursively finds all keys named 'text' in a JSON object and yields their full paths. + + Args: + data: The JSON object to search. + prefix: The prefix to be added to the path. + + Yields: + The full paths to the 'text' keys. + """ + + if isinstance(data, dict): + for key, value in data.items(): + if key == "text": + yield prefix + key + else: + yield from find_text_keys(value, prefix + key + ".") + elif isinstance(data, list): + for i, item in enumerate(data): + yield from find_text_keys(item, prefix + str(i) + ".") + + +def extract_text_values(data: dict, paths: list[str]) -> list[str]: + """Extracts text values from a JSON object using given paths. + + Args: + data: The JSON object. + paths: A list of paths to the 'text' keys. + + Returns: + A list of extracted text values. 
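To see what the table clean-up in `process_document` gets out of `find_near_duplicates` above: with the threshold of 0.8 used there, a column whose values match another column in at least 80 % of the rows is reported as a pair, and the second member of the pair is dropped. A toy example, assuming the package from this diff is importable:

```python
import pandas as pd

from dfm_data.document_processing.utils import find_near_duplicates

# Toy table with the kind of duplicated column this guards against:
# "Beløb" matches "Beløb DKK" in 4 of 5 rows (80 %).
table = pd.DataFrame(
    {
        "Post": ["Løn", "Husleje", "IT", "Rejser", "Andet"],
        "Beløb DKK": ["100", "200", "300", "400", "500"],
        "Beløb": ["100", "200", "300", "400", "999"],
    }
)

print(find_near_duplicates(table, threshold=0.8))
# [('Beløb DKK', 'Beløb')]  -> process_document keeps the first column of the pair
```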
+ """ + + text_values = [] + for path in paths: + value = data + for key in path.split("."): + try: + k = int(key) + except ValueError: + k = key + value = value[k] + + if value not in text_values and value != "": + text_values.append(value) + + return text_values + + +def remove_newlines(df: pd.DataFrame) -> pd.DataFrame: + """Removes newline characters from all string columns in a DataFrame. + + Args: + df: The DataFrame to process. + + Returns: + The DataFrame with newlines removed from string columns. + """ + + # Identify string columns + string_cols = df.select_dtypes(include=["object", "string"]).columns + # Remove newlines from string columns + for col in string_cols: + df[col] = df[col].astype(str).str.replace("\n", "", regex=False) + + return df + + +# Function to rename columns with unique suffixes +def make_unique(column_name: str, column_counts: dict[str, int]) -> str: + """Method to rename duplicate column name + + Args: + column_name (str): Name of the column + column_counts (dict[str, int]): Number of times we have seen this column name + + Returns: + str: The new column name + """ + if column_name in column_counts: + # Increment count and append the new suffix + column_counts[column_name] += 1 + return f"{column_name}_{column_counts[column_name]}" + # Initialize count for the first occurrence + column_counts[column_name] = 0 + return column_name + + +def generate_decode_url(link: str) -> Union[str, None]: + """Decode a SafeURL link, to extract original url. + + Args: + link: The SafeURL to decode + + Returns: + Union[str, None]: Returns the decoded url if possible else None + """ + try: + if "?" in link: + url_parts = link.split("?", 1)[1] + else: + return None + + if "&" in url_parts: + params = url_parts.split("&") + else: + return None + + for param in params: + name, value = param.split("=") + if name == "url": + target_url = urllib.parse.unquote(value) + return target_url + return None + except ValueError: + return None + + +def parallel_process_with_retries( + task_function: Callable, + data: list, + retries: int = 3, + n_workers: int = 2, +) -> Any: + """ + Runs joblib Parallel processing with a retry mechanism. + + Args: + task_function: Function to run in parallel + data: The input data to process. + retries: Number of retries if TerminatedWorkerError occurs. + n_workers: Number of parallel processes. + + Returns: + List of results from the parallel computation. + """ + attempt = 0 + while attempt <= retries: + try: + logger.info(f"Attempt {attempt + 1}") + results = Parallel(n_jobs=n_workers)( + delayed(task_function)(x) for x in data + ) + return results # If successful, return results + except TerminatedWorkerError as e: + attempt += 1 + logger.error( + f"Error occurred: {e}. Retrying {retries - attempt + 1} more times.", + ) + time.sleep(1) # Optional: Delay between retries + except Exception as e: + logger.error(f"Unexpected error: {e}") + break + raise RuntimeError(f"Failed after {retries} retries due to TerminatedWorkerError.")