From 34afcd0ef7dc1a7f8bc2cee5989bb760bb1492ef Mon Sep 17 00:00:00 2001 From: kris927b Date: Fri, 25 Oct 2024 11:32:01 +0200 Subject: [PATCH 01/19] Updating dolma dependency as version on pypi is now up to date --- data-processing/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-processing/pyproject.toml b/data-processing/pyproject.toml index fc6190a1..f3bb6ada 100644 --- a/data-processing/pyproject.toml +++ b/data-processing/pyproject.toml @@ -22,7 +22,7 @@ dependencies = [ "requests>=2.31.0", "polars>0.19.1", # Data cleaning dependencies: - "dolma[pii,code]@git+https://github.com/allenai/dolma.git@476629dc4d8d804dd2123509dc48b549e6b49dfb", # Install from git until a 1.0.2 package is released + "dolma[pii,code]", # @git+https://github.com/allenai/dolma.git@476629dc4d8d804dd2123509dc48b549e6b49dfb", # Install from git until a 1.0.2 package is released "kenlm>=0.2.0", # Used for perplexity tagging "blingfire>=0.1.8", # Used for perplexity tagging "mosaicml-streaming", From 5f4dbda889b6529daae620a2b350b4f263878a86 Mon Sep 17 00:00:00 2001 From: kris927b Date: Fri, 25 Oct 2024 11:32:17 +0200 Subject: [PATCH 02/19] Adding processing script for DSK data --- data-processing/pyproject.toml | 6 ++ data-processing/scripts/dsk/README.md | 3 + .../scripts/dsk/convert_dsk_to_jsonl.py | 102 ++++++++++++++++++ 3 files changed, 111 insertions(+) create mode 100644 data-processing/scripts/dsk/README.md create mode 100644 data-processing/scripts/dsk/convert_dsk_to_jsonl.py diff --git a/data-processing/pyproject.toml b/data-processing/pyproject.toml index f3bb6ada..53baeffe 100644 --- a/data-processing/pyproject.toml +++ b/data-processing/pyproject.toml @@ -32,6 +32,12 @@ dependencies = [ "nlp_dedup", "datatrove>=0.2.0", "pyyaml", + "joblib", + "pdfminer.six", + "pypandoc-binary", + "trafilatura", + "typer", + "loguru" ] [project.optional-dependencies] diff --git a/data-processing/scripts/dsk/README.md b/data-processing/scripts/dsk/README.md new file mode 100644 index 00000000..ef26462c --- /dev/null +++ b/data-processing/scripts/dsk/README.md @@ -0,0 +1,3 @@ +# Data-processing Dansk Sprogmodel Konsortium (DSK) + +This directory contains scripts for processing the DSK data. Mainly converting from various formats (e.g. pdf and docx) to jsonl. 
\ No newline at end of file diff --git a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py new file mode 100644 index 00000000..850cce18 --- /dev/null +++ b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py @@ -0,0 +1,102 @@ +import json +import re +from pathlib import Path +from typing import Any + +from joblib import Parallel, delayed +from loguru import logger +from pdfminer.high_level import extract_text as extract_pdf_text +from pypandoc import convert_file +from tqdm import tqdm +from trafilatura import extract as extract_html_text +from typer import Typer + +APP = Typer(name="2JSONL Converter") + + +# @APP.command(name="PDF2JSONL", no_args_is_help=True, help="Convert PDF file to JSONL string", context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) +def process_pdf(file_path: Path, **kwargs: dict[str, Any]) -> str: + text = extract_pdf_text(file_path) + text = re.sub(r"(\n\s)+", "\n", text) + metadata = {**kwargs, "file_path": str(file_path)} + return json.dumps({"text": text, "metadata": metadata}, ensure_ascii=False) + + +# @APP.command(name="DOCX2JSONL", no_args_is_help=True, help="Convert DOCX file to JSONL string", context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) +def process_docx(file_path: Path, **kwargs: dict[str, Any]) -> str: + text = convert_file(file_path, to="plain", format="docx") + text = re.sub(r"(\n\s)+", "\n", text) + metadata = {**kwargs, "file_path": str(file_path)} + return json.dumps({"text": text, "metadata": metadata}, ensure_ascii=False) + + +# @APP.command(name="HTML2JSONL", no_args_is_help=True, help="Convert HTML file to JSONL string", context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) +def process_html(file_path: Path, **kwargs: dict[str, Any]) -> str: + text = extract_html_text(file_path.read_text()) + text = re.sub(r"(\n\s)+", "\n", text) + metadata = {**kwargs, "file_path": str(file_path)} + return json.dumps({"text": text, "metadata": metadata}, ensure_ascii=False) + + +# @APP.command(name="EPUB2JSONL", no_args_is_help=True, help="Convert EPUB file to JSONL string", context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) +def process_epub(file_path: Path, **kwargs: dict[str, Any]) -> str: + text = convert_file(file_path, to="plain", format="epub") + text = re.sub(r"(\n\s)+", "\n", text) + metadata = {**kwargs, "file_path": str(file_path)} + return json.dumps({"text": text, "metadata": metadata}, ensure_ascii=False) + + +def process_txt(file_path: Path, **kwargs: dict[str, Any]) -> str: + text = file_path.read_text() + text = re.sub(r"(\n\s)+", "\n", text) + metadata = {**kwargs, "file_path": str(file_path)} + return json.dumps({"text": text, "metadata": metadata}, ensure_ascii=False) + + +SUFFIX2METHOD = { + ".pdf": process_pdf, + ".html": process_html, + ".docx": process_docx, + ".epub": process_epub, + ".txt": process_txt, +} + + +@APP.command(name="process_file", help="Process a single file, returning a json string") +def process_file(file: Path) -> str | None: + if file.is_dir(): + return None + + suffix = file.suffix + name = file.name + method = SUFFIX2METHOD.get(suffix) + + if not method: + logger.warning(f"Unsupported file type: {suffix} - for file: {file!s}") + return None + + return method(file, file_type=suffix, filename=name) + + +@APP.command( + name="process_directory", + help="Crawl a directory and process files of different types", +) +def craw_directory( + top_level_path: Path, + 
output_path: Path, + output_name: str = "raw.jsonl", +): + files = top_level_path.glob("**/*") + + parallel = Parallel(n_jobs=2, return_as="generator_unordered") + + with (output_path / output_name).open("w+") as out_file: + for doc in parallel(delayed(process_file)(file) for file in tqdm(files)): + if doc is None: + continue + out_file.write(f"{doc}\n") + + +if __name__ == "__main__": + APP() From 9a31be0bfaf32e0a9cc617aa1c84e1ad3811402f Mon Sep 17 00:00:00 2001 From: kris927b Date: Fri, 25 Oct 2024 13:34:11 +0200 Subject: [PATCH 03/19] Adhering to the desired format --- .../scripts/dsk/convert_dsk_to_jsonl.py | 84 +++++++++++++++---- 1 file changed, 66 insertions(+), 18 deletions(-) diff --git a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py index 850cce18..05f7fe35 100644 --- a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py +++ b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py @@ -1,5 +1,25 @@ +""" +Convert from various file formats (pdf, docx, etc.). + +To this format: +{ + "id": "...", # MANDATORY: source-specific identifier + "text": "foo", # MANDATORY: textual content of the document + "source": "...", # MANDATORY: source of the data, such as peS2o, common-crawl, etc. + "added": "...", # OPTIONAL: timestamp we acquired this data (time file was created), specified as YYYY-MM-DDTHH:MM:SS.TIMEZONE e.g 2021-04-13T12:52:46.000Z + "created": "..." # OPTIONAL: timestamp when orig document was created (best-guess if not available), should be specified as a range; "YYYY-MM-DDTHH:MM:SS.TIMEZONE, YYYY-MM-DDTHH:MM:SS.TIMEZONE" + "metadata": { # OPTIONAL: source-specific metadata + "sub-source": "...", # OPTIONAL: E.g. "newspaper_ocr" + ... + } +} +""" + import json import re +import gzip +from dataclasses import asdict, dataclass +from datetime import datetime from pathlib import Path from typing import Any @@ -11,46 +31,70 @@ from trafilatura import extract as extract_html_text from typer import Typer + +@dataclass +class JSONL: + id: str + text: str + source: str + added: str + created: str + metadata: dict[str, Any] + + APP = Typer(name="2JSONL Converter") +def create_JSONL(text: str, source: str, metadata: dict[str, Any]) -> JSONL: + id_ = f"{source}-{metadata.get("file_path", "")}".replace(" ", "-") + jsonl = JSONL( + id=id_, + text=text, + source=source, + added=str(datetime.now()), + created=f"{datetime(2000, 1, 1)!s}, {datetime.now()!s}", + metadata=metadata, + ) + return jsonl + + # @APP.command(name="PDF2JSONL", no_args_is_help=True, help="Convert PDF file to JSONL string", context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) -def process_pdf(file_path: Path, **kwargs: dict[str, Any]) -> str: +def process_pdf(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: text = extract_pdf_text(file_path) text = re.sub(r"(\n\s)+", "\n", text) metadata = {**kwargs, "file_path": str(file_path)} - return json.dumps({"text": text, "metadata": metadata}, ensure_ascii=False) + return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) # @APP.command(name="DOCX2JSONL", no_args_is_help=True, help="Convert DOCX file to JSONL string", context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) -def process_docx(file_path: Path, **kwargs: dict[str, Any]) -> str: +def process_docx(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: text = convert_file(file_path, to="plain", format="docx") text = re.sub(r"(\n\s)+", "\n", text) metadata = {**kwargs, 
"file_path": str(file_path)} - return json.dumps({"text": text, "metadata": metadata}, ensure_ascii=False) + return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) # @APP.command(name="HTML2JSONL", no_args_is_help=True, help="Convert HTML file to JSONL string", context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) -def process_html(file_path: Path, **kwargs: dict[str, Any]) -> str: +def process_html(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: text = extract_html_text(file_path.read_text()) text = re.sub(r"(\n\s)+", "\n", text) metadata = {**kwargs, "file_path": str(file_path)} - return json.dumps({"text": text, "metadata": metadata}, ensure_ascii=False) + return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) # @APP.command(name="EPUB2JSONL", no_args_is_help=True, help="Convert EPUB file to JSONL string", context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) -def process_epub(file_path: Path, **kwargs: dict[str, Any]) -> str: +def process_epub(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: text = convert_file(file_path, to="plain", format="epub") text = re.sub(r"(\n\s)+", "\n", text) metadata = {**kwargs, "file_path": str(file_path)} - return json.dumps({"text": text, "metadata": metadata}, ensure_ascii=False) + return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) -def process_txt(file_path: Path, **kwargs: dict[str, Any]) -> str: +def process_txt(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: text = file_path.read_text() text = re.sub(r"(\n\s)+", "\n", text) metadata = {**kwargs, "file_path": str(file_path)} - return json.dumps({"text": text, "metadata": metadata}, ensure_ascii=False) + return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) SUFFIX2METHOD = { @@ -63,7 +107,7 @@ def process_txt(file_path: Path, **kwargs: dict[str, Any]) -> str: @APP.command(name="process_file", help="Process a single file, returning a json string") -def process_file(file: Path) -> str | None: +def process_file(file: Path, dsk_client: str) -> str | None: if file.is_dir(): return None @@ -75,7 +119,7 @@ def process_file(file: Path) -> str | None: logger.warning(f"Unsupported file type: {suffix} - for file: {file!s}") return None - return method(file, file_type=suffix, filename=name) + return method(file, dsk_client, file_type=suffix, filename=name) @APP.command( @@ -85,17 +129,21 @@ def process_file(file: Path) -> str | None: def craw_directory( top_level_path: Path, output_path: Path, - output_name: str = "raw.jsonl", + dsk_client: str, + output_name: str = "1.jsonl.gz", + n_workers: int = 2, ): files = top_level_path.glob("**/*") - parallel = Parallel(n_jobs=2, return_as="generator_unordered") - - with (output_path / output_name).open("w+") as out_file: - for doc in parallel(delayed(process_file)(file) for file in tqdm(files)): + parallel = Parallel(n_jobs=n_workers, return_as="generator_unordered") + with gzip.open(output_path / output_name) as out_file: + # with (output_path / output_name).open("w+") as out_file: + for doc in parallel( + delayed(process_file)(file, dsk_client) for file in tqdm(files) + ): if doc is None: continue - out_file.write(f"{doc}\n") + out_file.write(f"{doc}\n".encode()) if __name__ == "__main__": From 1bc65520c21c13a921a3e04704025fb5b5aa7211 Mon Sep 17 00:00:00 2001 From: kris927b Date: Fri, 25 Oct 2024 13:45:56 +0200 Subject: [PATCH 04/19] Minor adjustments to file 
handling --- data-processing/scripts/dsk/convert_dsk_to_jsonl.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py index 05f7fe35..84c26d29 100644 --- a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py +++ b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py @@ -15,9 +15,9 @@ } """ +import gzip import json import re -import gzip from dataclasses import asdict, dataclass from datetime import datetime from pathlib import Path @@ -136,7 +136,8 @@ def craw_directory( files = top_level_path.glob("**/*") parallel = Parallel(n_jobs=n_workers, return_as="generator_unordered") - with gzip.open(output_path / output_name) as out_file: + output_path.mkdir(parents=True, exist_ok=True) + with gzip.open(output_path / output_name, mode="wb") as out_file: # with (output_path / output_name).open("w+") as out_file: for doc in parallel( delayed(process_file)(file, dsk_client) for file in tqdm(files) From fd7fffe2e5c584c62bf037af5734417a325cda07 Mon Sep 17 00:00:00 2001 From: kris927b Date: Fri, 25 Oct 2024 14:20:55 +0200 Subject: [PATCH 05/19] Minor change to the entry id. Using filename rather than path --- data-processing/scripts/dsk/convert_dsk_to_jsonl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py index 84c26d29..be96f3e0 100644 --- a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py +++ b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py @@ -46,7 +46,7 @@ class JSONL: def create_JSONL(text: str, source: str, metadata: dict[str, Any]) -> JSONL: - id_ = f"{source}-{metadata.get("file_path", "")}".replace(" ", "-") + id_ = f"{source}-{metadata.get("filename", "")}".replace(" ", "-") jsonl = JSONL( id=id_, text=text, From 23e95376f712a99ac42c2e8b53532c2a98d77f5d Mon Sep 17 00:00:00 2001 From: kris927b Date: Thu, 14 Nov 2024 14:41:55 +0100 Subject: [PATCH 06/19] Extracting util into separate file --- .../scripts/dsk/convert_dsk_to_jsonl.py | 25 +--- data-processing/scripts/dsk/utils.py | 121 ++++++++++++++++++ 2 files changed, 122 insertions(+), 24 deletions(-) create mode 100644 data-processing/scripts/dsk/utils.py diff --git a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py index be96f3e0..bdd1f49e 100644 --- a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py +++ b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py @@ -30,34 +30,11 @@ from tqdm import tqdm from trafilatura import extract as extract_html_text from typer import Typer - - -@dataclass -class JSONL: - id: str - text: str - source: str - added: str - created: str - metadata: dict[str, Any] - +from utils import create_JSONL APP = Typer(name="2JSONL Converter") -def create_JSONL(text: str, source: str, metadata: dict[str, Any]) -> JSONL: - id_ = f"{source}-{metadata.get("filename", "")}".replace(" ", "-") - jsonl = JSONL( - id=id_, - text=text, - source=source, - added=str(datetime.now()), - created=f"{datetime(2000, 1, 1)!s}, {datetime.now()!s}", - metadata=metadata, - ) - return jsonl - - # @APP.command(name="PDF2JSONL", no_args_is_help=True, help="Convert PDF file to JSONL string", context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) def process_pdf(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: text = extract_pdf_text(file_path) diff --git a/data-processing/scripts/dsk/utils.py 
b/data-processing/scripts/dsk/utils.py new file mode 100644 index 00000000..63cae92d --- /dev/null +++ b/data-processing/scripts/dsk/utils.py @@ -0,0 +1,121 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import Any + +import pandas as pd + + +@dataclass +class JSONL: + id: str + text: str + source: str + added: str + created: str + metadata: dict[str, Any] + + +def create_JSONL(text: str, source: str, metadata: dict[str, Any]) -> JSONL: + id_ = f"{source}-{metadata.get('filename', '')}".replace(" ", "-") + jsonl = JSONL( + id=id_, + text=text, + source=source, + added=str(datetime.now()), + created=f"{datetime(2000, 1, 1)!s}, {datetime.now()!s}", + metadata=metadata, + ) + return jsonl + + +def find_near_duplicates( + df: pd.DataFrame, + threshold: float = 0.9, +) -> list[tuple]: + """ + Finds pairs of columns in a DataFrame that are near-duplicates, + based on a specified threshold of identical values. + + Args: + df: The DataFrame to check. + threshold: The minimum proportion of identical values to consider a pair of columns as near-duplicates. + + Returns: + A list of tuples, where each tuple contains the names of two near-duplicate columns. + """ + + near_duplicates = [] + for i in range(len(df.columns)): + for j in range(i + 1, len(df.columns)): + if (df.iloc[:, i] == df.iloc[:, j]).mean() >= threshold: + near_duplicates.append((df.columns[i], df.columns[j])) + return near_duplicates + + +def find_text_keys(data: dict, prefix: str = ""): + """Recursively finds all keys named 'text' in a JSON object and yields their full paths. + + Args: + data: The JSON object to search. + prefix: The prefix to be added to the path. + + Yields: + The full paths to the 'text' keys. + """ + + if isinstance(data, dict): + for key, value in data.items(): + if key == "text": + yield prefix + key + else: + yield from find_text_keys(value, prefix + key + ".") + elif isinstance(data, list): + for i, item in enumerate(data): + yield from find_text_keys(item, prefix + str(i) + ".") + + +def extract_text_values(data: dict, paths: list[str]) -> list[str]: + """Extracts text values from a JSON object using given paths. + + Args: + data: The JSON object. + paths: A list of paths to the 'text' keys. + + Returns: + A list of extracted text values. + """ + + text_values = [] + for path in paths: + value = data + for key in path.split("."): + try: + k = int(key) + except ValueError: + k = key + value = value[k] + + if value not in text_values and value != "": + text_values.append(value) + + return text_values + + +def remove_newlines(df: pd.DataFrame) -> pd.DataFrame: + """Removes newline characters from all string columns in a DataFrame. + + Args: + df: The DataFrame to process. + + Returns: + The DataFrame with newlines removed from string columns. 
+ """ + + # Identify string columns + string_cols = df.select_dtypes(include=["object", "string"]).columns + + # Remove newlines from string columns + for col in string_cols: + df[col] = df[col].astype(str).str.replace("\n", "", regex=False) + + return df From 3fcb91b556c30b8c720cde3feb693070b37ecbe5 Mon Sep 17 00:00:00 2001 From: kris927b Date: Thu, 14 Nov 2024 14:44:25 +0100 Subject: [PATCH 07/19] Testing extracting documents using Docling --- data-processing/scripts/dsk/test_docling.py | 109 ++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 data-processing/scripts/dsk/test_docling.py diff --git a/data-processing/scripts/dsk/test_docling.py b/data-processing/scripts/dsk/test_docling.py new file mode 100644 index 00000000..4860f793 --- /dev/null +++ b/data-processing/scripts/dsk/test_docling.py @@ -0,0 +1,109 @@ +import gzip +import json +from dataclasses import asdict +from pathlib import Path + +import pandas as pd +from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend +from docling.datamodel.base_models import InputFormat +from docling.datamodel.document import TableItem, TextItem +from docling.datamodel.pipeline_options import PdfPipelineOptions, TableFormerMode +from docling.document_converter import ( + DocumentConverter, + PdfFormatOption, + WordFormatOption, +) +from docling.pipeline.simple_pipeline import SimplePipeline +from utils import create_JSONL, find_near_duplicates, remove_newlines + +# previous `PipelineOptions` is now `PdfPipelineOptions` +pipeline_options = PdfPipelineOptions() +pipeline_options.do_ocr = False +pipeline_options.do_table_structure = True +pipeline_options.table_structure_options.do_cell_matching = False +pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE +# ... + +## Custom options are now defined per format. +doc_converter = DocumentConverter( # all of the below is optional, has internal defaults. + allowed_formats=[ + InputFormat.PDF, + InputFormat.MD, + InputFormat.IMAGE, + InputFormat.DOCX, + InputFormat.HTML, + InputFormat.PPTX, + InputFormat.ASCIIDOC, + ], # whitelist formats, non-matching files are ignored. + format_options={ + InputFormat.PDF: PdfFormatOption( + pipeline_options=pipeline_options, # pipeline options go here. 
+ backend=PyPdfiumDocumentBackend, # optional: pick an alternative backend + ), + InputFormat.DOCX: WordFormatOption( + pipeline_cls=SimplePipeline, # default for office formats and HTML + ), + }, +) + + +files = list( + Path( + "./data", + ).glob("**/*.docx"), +) +results = doc_converter.convert_all(files) +out_file = gzip.open("test.jsonl.gz", mode="wb") +for result in results: + file_path = result.input.file + filename = result.input.file.name + filetype = result.input.format.name + filesize = result.input.filesize + page_count = result.input.page_count + + metadata = { + "filename": filename, + "filetype": filetype, + "filesize": filesize, + "page_count": page_count, + "file_path": str(file_path), + } + + file_content = "" + ## Iterate the elements in reading order, including hierachy level: + for item, level in result.document.iterate_items(): + if isinstance(item, TextItem): + if item.text == "": + continue + file_content += item.text + elif isinstance(item, TableItem): + table_df: pd.DataFrame = item.export_to_dataframe() + dups = find_near_duplicates(table_df, 0.8) + # Choose one column from each near-duplicate pair to keep + columns_to_keep = table_df.columns.copy(deep=True) + print(columns_to_keep, dups) + for pair in dups: + # Keep the first column in the pair, remove the second + if pair[1] in columns_to_keep: + columns_to_keep = columns_to_keep.drop(pair[1]) + + # Create a new DataFrame with the selected columns + df_cleaned = table_df[list(columns_to_keep)] + df_cleaned = remove_newlines(df_cleaned) + file_content += df_cleaned.to_markdown(index=False, tablefmt="github") + file_content += "\n\n" + + line = json.dumps( + asdict(create_JSONL(file_content, "cBrain", metadata)), + ensure_ascii=False, + ) + + out_file.write(f"{line}\n".encode()) + +out_file.close() +""" +text_paths = find_text_keys(document) +text_values = extract_text_values(document, list(text_paths)) + +print("\n\n".join(text_values)) +""" From 819648c01152dab93bd301ba8d267d55fb15167c Mon Sep 17 00:00:00 2001 From: kris927b Date: Tue, 19 Nov 2024 11:15:13 +0100 Subject: [PATCH 08/19] Using docling to extract data from PDF, DOCX, PPTX, MD. 
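
Docling-based extraction: PDF, DOCX, PPTX and Markdown now go through a shared
Docling DocumentConverter (built once via build_document_converter() in utils.py
and reused for every file), while HTML, EPUB and TXT keep the existing
Trafilatura / Pypandoc / plain-text paths. Tables are exported as GitHub-flavoured
Markdown with near-duplicate columns dropped. A minimal usage sketch, not part of
the diff below; it assumes the commands are run from scripts/dsk/ so that
convert_dsk_to_jsonl.py and utils.py are importable, and that example.pdf exists:

```python
# Sketch only: shows how the pieces added in this patch fit together.
from pathlib import Path

from convert_dsk_to_jsonl import process_file   # dispatches on file suffix
from utils import build_document_converter      # configures Docling once

converter = build_document_converter()
# process_file returns one JSON line (id/text/source/added/created/metadata),
# or None for directories and unsupported file types.
doc = process_file(Path("example.pdf"), "example-dsk-client", converter=converter)
if doc is not None:
    print(doc)
```
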
--- data-processing/pyproject.toml | 22 ++- data-processing/scripts/dsk/README.md | 9 +- .../scripts/dsk/convert_dsk_to_jsonl.py | 157 ++++++++++++++---- data-processing/scripts/dsk/utils.py | 84 +++++++++- 4 files changed, 231 insertions(+), 41 deletions(-) diff --git a/data-processing/pyproject.toml b/data-processing/pyproject.toml index 53baeffe..168b0d08 100644 --- a/data-processing/pyproject.toml +++ b/data-processing/pyproject.toml @@ -22,9 +22,12 @@ dependencies = [ "requests>=2.31.0", "polars>0.19.1", # Data cleaning dependencies: - "dolma[pii,code]", # @git+https://github.com/allenai/dolma.git@476629dc4d8d804dd2123509dc48b549e6b49dfb", # Install from git until a 1.0.2 package is released - "kenlm>=0.2.0", # Used for perplexity tagging - "blingfire>=0.1.8", # Used for perplexity tagging + "dolma[pii,code]", + # @git+https://github.com/allenai/dolma.git@476629dc4d8d804dd2123509dc48b549e6b49dfb", # Install from git until a 1.0.2 package is released + "kenlm>=0.2.0", + # Used for perplexity tagging + "blingfire>=0.1.8", + # Used for perplexity tagging "mosaicml-streaming", "orjsonl", "tqdm", @@ -32,12 +35,13 @@ dependencies = [ "nlp_dedup", "datatrove>=0.2.0", "pyyaml", - "joblib", - "pdfminer.six", - "pypandoc-binary", - "trafilatura", - "typer", - "loguru" + "joblib", + "pdfminer.six", + "pypandoc-binary", + "trafilatura", + "typer", + "loguru", + "docling>=1.20.0", ] [project.optional-dependencies] diff --git a/data-processing/scripts/dsk/README.md b/data-processing/scripts/dsk/README.md index ef26462c..06e68950 100644 --- a/data-processing/scripts/dsk/README.md +++ b/data-processing/scripts/dsk/README.md @@ -1,3 +1,10 @@ # Data-processing Dansk Sprogmodel Konsortium (DSK) -This directory contains scripts for processing the DSK data. Mainly converting from various formats (e.g. pdf and docx) to jsonl. \ No newline at end of file +This directory contains scripts for processing the DSK data. Mainly converting from various formats (e.g. pdf and docx) to jsonl. 
+ +Run the conversion on an entire repo: + +```bash +$ python conver_dsk_to_jsonl.py $PATH_TO_DSK_DIR $OUTPUT_PATH $DSK_CLIENT +``` + diff --git a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py index bdd1f49e..93e135cb 100644 --- a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py +++ b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py @@ -18,11 +18,11 @@ import gzip import json import re -from dataclasses import asdict, dataclass -from datetime import datetime +from dataclasses import asdict from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any +from docling.datamodel.document import TableItem, TextItem from joblib import Parallel, delayed from loguru import logger from pdfminer.high_level import extract_text as extract_pdf_text @@ -30,94 +30,193 @@ from tqdm import tqdm from trafilatura import extract as extract_html_text from typer import Typer -from utils import create_JSONL +from utils import ( + build_document_converter, + build_metadata, + create_JSONL, + find_near_duplicates, + make_unique, + remove_newlines, +) + +if TYPE_CHECKING: + import pandas as pd APP = Typer(name="2JSONL Converter") -# @APP.command(name="PDF2JSONL", no_args_is_help=True, help="Convert PDF file to JSONL string", context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) +# UNUSED - USING DOCLING INSTEAD def process_pdf(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: text = extract_pdf_text(file_path) text = re.sub(r"(\n\s)+", "\n", text) - metadata = {**kwargs, "file_path": str(file_path)} + metadata = {**kwargs, **build_metadata(file_path)} return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) -# @APP.command(name="DOCX2JSONL", no_args_is_help=True, help="Convert DOCX file to JSONL string", context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) +# UNUSED - USING DOCLING INSTEAD def process_docx(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: text = convert_file(file_path, to="plain", format="docx") text = re.sub(r"(\n\s)+", "\n", text) - metadata = {**kwargs, "file_path": str(file_path)} + metadata = {**kwargs, **build_metadata(file_path)} return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) -# @APP.command(name="HTML2JSONL", no_args_is_help=True, help="Convert HTML file to JSONL string", context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) -def process_html(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: +def process_html(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: # noqa: ARG001 + """Read a single HTML file and build a JSONL object. Uses Trafilatura for the extraction. 
+ + Args: + file_path (Path): Path to the HTML file + source (str): What is the data source (most likely DSK client) + **kwargs (dict): Additional arguments + + Returns: + str: JSONL line with the file content + """ text = extract_html_text(file_path.read_text()) text = re.sub(r"(\n\s)+", "\n", text) - metadata = {**kwargs, "file_path": str(file_path)} + metadata = build_metadata(file_path) return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) -# @APP.command(name="EPUB2JSONL", no_args_is_help=True, help="Convert EPUB file to JSONL string", context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) -def process_epub(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: +def process_epub(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: # noqa: ARG001 + """Read a single EPUB file and build a JSONL object. Uses Pypandoc for the extraction. + + Args: + file_path (Path): Path to the EPUB file + source (str): What is the data source (most likely DSK client) + **kwargs (dict): Additional arguments + + Returns: + str: JSONL line with the file content + """ text = convert_file(file_path, to="plain", format="epub") text = re.sub(r"(\n\s)+", "\n", text) - metadata = {**kwargs, "file_path": str(file_path)} + metadata = build_metadata(file_path) return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) -def process_txt(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: +def process_txt(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: # noqa: ARG001 + """Read a single TXT file and build a JSONL object + + Args: + file_path (Path): Path to the TXT file + source (str): What is the data source (most likely DSK client) + **kwargs (dict): Additional arguments + + Returns: + str: JSONL line with the file content + """ text = file_path.read_text() text = re.sub(r"(\n\s)+", "\n", text) - metadata = {**kwargs, "file_path": str(file_path)} + metadata = build_metadata(file_path) return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) +def process_document( + file: Path, + source: str, + **kwargs: dict[str, Any], +) -> str | None: + """ + Process a single file, converting it into a JSONL string. + + Args: + file (Path): Path to the input file. + source (str): Source identifier for the data. + **kwargs (dict): Extra arguments + + Returns: + str | None: JSONL string if the file is processed successfully, else None. 
+ """ + doc_converter = kwargs.get("converter", build_document_converter()) + try: + result = doc_converter.convert(file) + + metadata = build_metadata(result.input) + + file_content = "" + # Iterate the elements in reading order, including hierarchy level + for item, _ in result.document.iterate_items(): + if isinstance(item, TextItem): + if item.text.strip(): + file_content += item.text + elif isinstance(item, TableItem): + table_df: pd.DataFrame = item.export_to_dataframe() + dups = find_near_duplicates(table_df, 0.8) + column_counts = {} + table_df.columns = [ + make_unique(col, column_counts) for col in table_df.columns + ] + columns_to_keep = table_df.columns.copy(deep=True) + for pair in dups: + if pair[1] in columns_to_keep: + columns_to_keep = columns_to_keep.drop(pair[1]) + + df_cleaned = table_df[list(columns_to_keep)] + df_cleaned = remove_newlines(df_cleaned) + file_content += df_cleaned.to_markdown(index=False, tablefmt="github") + file_content += "\n\n" + + # Create and return JSONL entry + return json.dumps( + asdict(create_JSONL(file_content, source, metadata)), + ensure_ascii=False, + ) + except Exception as e: + logger.error(f"Failed to process file {file}: {e}") + return None + + SUFFIX2METHOD = { - ".pdf": process_pdf, + ".pdf": process_document, ".html": process_html, - ".docx": process_docx, + ".docx": process_document, ".epub": process_epub, ".txt": process_txt, + ".pptx": process_document, + ".md": process_document, } -@APP.command(name="process_file", help="Process a single file, returning a json string") -def process_file(file: Path, dsk_client: str) -> str | None: +def process_file(file: Path, dsk_client: str, **kwargs: dict) -> str | None: if file.is_dir(): return None suffix = file.suffix - name = file.name method = SUFFIX2METHOD.get(suffix) if not method: logger.warning(f"Unsupported file type: {suffix} - for file: {file!s}") return None - return method(file, dsk_client, file_type=suffix, filename=name) + return method(file, dsk_client, **kwargs) @APP.command( name="process_directory", help="Crawl a directory and process files of different types", ) -def craw_directory( +def crawl_directory( top_level_path: Path, output_path: Path, dsk_client: str, - output_name: str = "1.jsonl.gz", - n_workers: int = 2, + output_suffix: str = ".jsonl.gz", + n_workers: int = 4, ): - files = top_level_path.glob("**/*") + files = list(top_level_path.glob("**/*.*")) + + save_file = output_path + if "".join(output_path.suffixes) != ".jsonl.gz": + save_file = output_path / (dsk_client + output_suffix) + converter = build_document_converter() parallel = Parallel(n_jobs=n_workers, return_as="generator_unordered") - output_path.mkdir(parents=True, exist_ok=True) - with gzip.open(output_path / output_name, mode="wb") as out_file: + save_file.parent.mkdir(parents=True, exist_ok=True) + with gzip.open(save_file, mode="wb") as out_file: # with (output_path / output_name).open("w+") as out_file: for doc in parallel( - delayed(process_file)(file, dsk_client) for file in tqdm(files) + delayed(process_file)(file, dsk_client, converter=converter) + for file in tqdm(files) ): if doc is None: continue diff --git a/data-processing/scripts/dsk/utils.py b/data-processing/scripts/dsk/utils.py index 63cae92d..3137e4df 100644 --- a/data-processing/scripts/dsk/utils.py +++ b/data-processing/scripts/dsk/utils.py @@ -1,8 +1,53 @@ from dataclasses import dataclass from datetime import datetime -from typing import Any +from pathlib import Path +from typing import Any, Union import pandas as pd +from 
docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend +from docling.datamodel.base_models import InputFormat +from docling.datamodel.document import InputDocument +from docling.datamodel.pipeline_options import PdfPipelineOptions, TableFormerMode +from docling.document_converter import ( + DocumentConverter, + PdfFormatOption, + WordFormatOption, +) +from docling.pipeline.simple_pipeline import SimplePipeline + +pd.options.mode.chained_assignment = None + + +def build_document_converter() -> DocumentConverter: + # previous `PipelineOptions` is now `PdfPipelineOptions` + pipeline_options = PdfPipelineOptions() + pipeline_options.do_ocr = False + pipeline_options.do_table_structure = True + pipeline_options.table_structure_options.do_cell_matching = False + pipeline_options.table_structure_options.mode = TableFormerMode.FAST + # ... + + ## Custom options are now defined per format. + doc_converter = DocumentConverter( # all of the below is optional, has internal defaults. + allowed_formats=[ + InputFormat.PDF, + InputFormat.MD, + InputFormat.DOCX, + InputFormat.HTML, + InputFormat.PPTX, + InputFormat.ASCIIDOC, + ], # whitelist formats, non-matching files are ignored. + format_options={ + InputFormat.PDF: PdfFormatOption( + pipeline_options=pipeline_options, # pipeline options go here. + backend=PyPdfiumDocumentBackend, # optional: pick an alternative backend + ), + InputFormat.DOCX: WordFormatOption( + pipeline_cls=SimplePipeline, # default for office formats and HTML + ), + }, + ) + return doc_converter @dataclass @@ -28,6 +73,31 @@ def create_JSONL(text: str, source: str, metadata: dict[str, Any]) -> JSONL: return jsonl +def build_metadata(document: Union[InputDocument, Path]) -> dict: + if isinstance(document, InputDocument): + file_path = document.file + filename = document.file.name + filetype = document.format.name + filesize = document.filesize + page_count = document.page_count + else: # TODO: build metadata from Path + file_path = document + filename = document.name + filetype = "".join(document.suffixes) + filesize = document.stat().st_size + page_count = 0 + + metadata = { + "filename": filename, + "filetype": filetype, + "filesize": filesize, + "page_count": page_count, + "file_path": str(file_path), + } + + return metadata + + def find_near_duplicates( df: pd.DataFrame, threshold: float = 0.9, @@ -113,9 +183,19 @@ def remove_newlines(df: pd.DataFrame) -> pd.DataFrame: # Identify string columns string_cols = df.select_dtypes(include=["object", "string"]).columns - # Remove newlines from string columns for col in string_cols: df[col] = df[col].astype(str).str.replace("\n", "", regex=False) return df + + +# Function to rename columns with unique suffixes +def make_unique(column_name: str, column_counts: dict[str, int]) -> str: + if column_name in column_counts: + # Increment count and append the new suffix + column_counts[column_name] += 1 + return f"{column_name}_{column_counts[column_name]}" + # Initialize count for the first occurrence + column_counts[column_name] = 0 + return column_name From 3e6c045ddcf3ba7782fb6e66f54318b1648c234d Mon Sep 17 00:00:00 2001 From: kris927b Date: Tue, 19 Nov 2024 11:16:50 +0100 Subject: [PATCH 09/19] Removing test files --- data-processing/scripts/dsk/test_docling.py | 109 -------------------- 1 file changed, 109 deletions(-) delete mode 100644 data-processing/scripts/dsk/test_docling.py diff --git a/data-processing/scripts/dsk/test_docling.py b/data-processing/scripts/dsk/test_docling.py deleted file mode 100644 index 
4860f793..00000000 --- a/data-processing/scripts/dsk/test_docling.py +++ /dev/null @@ -1,109 +0,0 @@ -import gzip -import json -from dataclasses import asdict -from pathlib import Path - -import pandas as pd -from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend -from docling.datamodel.base_models import InputFormat -from docling.datamodel.document import TableItem, TextItem -from docling.datamodel.pipeline_options import PdfPipelineOptions, TableFormerMode -from docling.document_converter import ( - DocumentConverter, - PdfFormatOption, - WordFormatOption, -) -from docling.pipeline.simple_pipeline import SimplePipeline -from utils import create_JSONL, find_near_duplicates, remove_newlines - -# previous `PipelineOptions` is now `PdfPipelineOptions` -pipeline_options = PdfPipelineOptions() -pipeline_options.do_ocr = False -pipeline_options.do_table_structure = True -pipeline_options.table_structure_options.do_cell_matching = False -pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE -# ... - -## Custom options are now defined per format. -doc_converter = DocumentConverter( # all of the below is optional, has internal defaults. - allowed_formats=[ - InputFormat.PDF, - InputFormat.MD, - InputFormat.IMAGE, - InputFormat.DOCX, - InputFormat.HTML, - InputFormat.PPTX, - InputFormat.ASCIIDOC, - ], # whitelist formats, non-matching files are ignored. - format_options={ - InputFormat.PDF: PdfFormatOption( - pipeline_options=pipeline_options, # pipeline options go here. - backend=PyPdfiumDocumentBackend, # optional: pick an alternative backend - ), - InputFormat.DOCX: WordFormatOption( - pipeline_cls=SimplePipeline, # default for office formats and HTML - ), - }, -) - - -files = list( - Path( - "./data", - ).glob("**/*.docx"), -) -results = doc_converter.convert_all(files) -out_file = gzip.open("test.jsonl.gz", mode="wb") -for result in results: - file_path = result.input.file - filename = result.input.file.name - filetype = result.input.format.name - filesize = result.input.filesize - page_count = result.input.page_count - - metadata = { - "filename": filename, - "filetype": filetype, - "filesize": filesize, - "page_count": page_count, - "file_path": str(file_path), - } - - file_content = "" - ## Iterate the elements in reading order, including hierachy level: - for item, level in result.document.iterate_items(): - if isinstance(item, TextItem): - if item.text == "": - continue - file_content += item.text - elif isinstance(item, TableItem): - table_df: pd.DataFrame = item.export_to_dataframe() - dups = find_near_duplicates(table_df, 0.8) - # Choose one column from each near-duplicate pair to keep - columns_to_keep = table_df.columns.copy(deep=True) - print(columns_to_keep, dups) - for pair in dups: - # Keep the first column in the pair, remove the second - if pair[1] in columns_to_keep: - columns_to_keep = columns_to_keep.drop(pair[1]) - - # Create a new DataFrame with the selected columns - df_cleaned = table_df[list(columns_to_keep)] - df_cleaned = remove_newlines(df_cleaned) - file_content += df_cleaned.to_markdown(index=False, tablefmt="github") - file_content += "\n\n" - - line = json.dumps( - asdict(create_JSONL(file_content, "cBrain", metadata)), - ensure_ascii=False, - ) - - out_file.write(f"{line}\n".encode()) - -out_file.close() -""" -text_paths = find_text_keys(document) -text_values = extract_text_values(document, list(text_paths)) - -print("\n\n".join(text_values)) -""" From e89f50f6004ff7a30da350d1bfcd80fda115e124 Mon Sep 17 00:00:00 2001 From: 
kris927b Date: Wed, 20 Nov 2024 11:01:47 +0100 Subject: [PATCH 10/19] Adding lower bounds on dependencies --- data-processing/pyproject.toml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/data-processing/pyproject.toml b/data-processing/pyproject.toml index 168b0d08..92989191 100644 --- a/data-processing/pyproject.toml +++ b/data-processing/pyproject.toml @@ -35,13 +35,13 @@ dependencies = [ "nlp_dedup", "datatrove>=0.2.0", "pyyaml", - "joblib", - "pdfminer.six", - "pypandoc-binary", - "trafilatura", - "typer", - "loguru", "docling>=1.20.0", + "joblib>=1.4.2", + "pdfminer-six>=20240706", + "pypandoc-binary>=1.14", + "trafilatura>=1.9.0", + "typer>=0.12.5", + "loguru>=0.7.2", ] [project.optional-dependencies] From 404abf3cee87e2edef791f4feda6d1ee2d7ed9f4 Mon Sep 17 00:00:00 2001 From: kris927b Date: Wed, 20 Nov 2024 11:02:06 +0100 Subject: [PATCH 11/19] Moving utils to dfm-data package --- data-processing/scripts/dsk/convert_dsk_to_jsonl.py | 3 ++- data-processing/{scripts/dsk => src/dfm_data}/utils.py | 0 2 files changed, 2 insertions(+), 1 deletion(-) rename data-processing/{scripts/dsk => src/dfm_data}/utils.py (100%) diff --git a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py index 93e135cb..506c9d69 100644 --- a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py +++ b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py @@ -30,7 +30,8 @@ from tqdm import tqdm from trafilatura import extract as extract_html_text from typer import Typer -from utils import ( + +from dfm_data.utils import ( build_document_converter, build_metadata, create_JSONL, diff --git a/data-processing/scripts/dsk/utils.py b/data-processing/src/dfm_data/utils.py similarity index 100% rename from data-processing/scripts/dsk/utils.py rename to data-processing/src/dfm_data/utils.py From 52ad08a145a9766f05f5df77515ee1f91f2b8d45 Mon Sep 17 00:00:00 2001 From: kris927b Date: Wed, 20 Nov 2024 11:03:04 +0100 Subject: [PATCH 12/19] Removing unused methods to clean up code --- .../scripts/dsk/convert_dsk_to_jsonl.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py index 506c9d69..f99f9984 100644 --- a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py +++ b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py @@ -46,22 +46,6 @@ APP = Typer(name="2JSONL Converter") -# UNUSED - USING DOCLING INSTEAD -def process_pdf(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: - text = extract_pdf_text(file_path) - text = re.sub(r"(\n\s)+", "\n", text) - metadata = {**kwargs, **build_metadata(file_path)} - return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) - - -# UNUSED - USING DOCLING INSTEAD -def process_docx(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: - text = convert_file(file_path, to="plain", format="docx") - text = re.sub(r"(\n\s)+", "\n", text) - metadata = {**kwargs, **build_metadata(file_path)} - return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) - - def process_html(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: # noqa: ARG001 """Read a single HTML file and build a JSONL object. Uses Trafilatura for the extraction. 
From dc2291d58af4b8aa1c0a639db252384eedabc86c Mon Sep 17 00:00:00 2001 From: kris927b Date: Wed, 20 Nov 2024 11:03:25 +0100 Subject: [PATCH 13/19] Fixing spelling error --- data-processing/scripts/dsk/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-processing/scripts/dsk/README.md b/data-processing/scripts/dsk/README.md index 06e68950..7f556b68 100644 --- a/data-processing/scripts/dsk/README.md +++ b/data-processing/scripts/dsk/README.md @@ -5,6 +5,6 @@ This directory contains scripts for processing the DSK data. Mainly converting f Run the conversion on an entire repo: ```bash -$ python conver_dsk_to_jsonl.py $PATH_TO_DSK_DIR $OUTPUT_PATH $DSK_CLIENT +$ python convert_dsk_to_jsonl.py $PATH_TO_DSK_DIR $OUTPUT_PATH $DSK_CLIENT ``` From bf11fccf0e898e6738983abb9e98d347deb152bf Mon Sep 17 00:00:00 2001 From: kris927b Date: Wed, 20 Nov 2024 12:20:15 +0100 Subject: [PATCH 14/19] Separating document processing in dfm-data. --- .../scripts/dsk/convert_dsk_to_jsonl.py | 149 +---------------- .../document_processing/processors.py | 150 ++++++++++++++++++ .../{ => document_processing}/utils.py | 0 3 files changed, 153 insertions(+), 146 deletions(-) create mode 100644 data-processing/src/dfm_data/document_processing/processors.py rename data-processing/src/dfm_data/{ => document_processing}/utils.py (100%) diff --git a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py index f99f9984..17844034 100644 --- a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py +++ b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py @@ -16,28 +16,16 @@ """ import gzip -import json -import re -from dataclasses import asdict from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING -from docling.datamodel.document import TableItem, TextItem from joblib import Parallel, delayed -from loguru import logger -from pdfminer.high_level import extract_text as extract_pdf_text -from pypandoc import convert_file from tqdm import tqdm -from trafilatura import extract as extract_html_text from typer import Typer -from dfm_data.utils import ( +from dfm_data.document_processing.processors import process_file +from dfm_data.document_processing.utils import ( build_document_converter, - build_metadata, - create_JSONL, - find_near_duplicates, - make_unique, - remove_newlines, ) if TYPE_CHECKING: @@ -46,137 +34,6 @@ APP = Typer(name="2JSONL Converter") -def process_html(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: # noqa: ARG001 - """Read a single HTML file and build a JSONL object. Uses Trafilatura for the extraction. - - Args: - file_path (Path): Path to the HTML file - source (str): What is the data source (most likely DSK client) - **kwargs (dict): Additional arguments - - Returns: - str: JSONL line with the file content - """ - text = extract_html_text(file_path.read_text()) - text = re.sub(r"(\n\s)+", "\n", text) - metadata = build_metadata(file_path) - return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) - - -def process_epub(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: # noqa: ARG001 - """Read a single EPUB file and build a JSONL object. Uses Pypandoc for the extraction. 
- - Args: - file_path (Path): Path to the EPUB file - source (str): What is the data source (most likely DSK client) - **kwargs (dict): Additional arguments - - Returns: - str: JSONL line with the file content - """ - text = convert_file(file_path, to="plain", format="epub") - text = re.sub(r"(\n\s)+", "\n", text) - metadata = build_metadata(file_path) - return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) - - -def process_txt(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: # noqa: ARG001 - """Read a single TXT file and build a JSONL object - - Args: - file_path (Path): Path to the TXT file - source (str): What is the data source (most likely DSK client) - **kwargs (dict): Additional arguments - - Returns: - str: JSONL line with the file content - """ - text = file_path.read_text() - text = re.sub(r"(\n\s)+", "\n", text) - metadata = build_metadata(file_path) - return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) - - -def process_document( - file: Path, - source: str, - **kwargs: dict[str, Any], -) -> str | None: - """ - Process a single file, converting it into a JSONL string. - - Args: - file (Path): Path to the input file. - source (str): Source identifier for the data. - **kwargs (dict): Extra arguments - - Returns: - str | None: JSONL string if the file is processed successfully, else None. - """ - doc_converter = kwargs.get("converter", build_document_converter()) - try: - result = doc_converter.convert(file) - - metadata = build_metadata(result.input) - - file_content = "" - # Iterate the elements in reading order, including hierarchy level - for item, _ in result.document.iterate_items(): - if isinstance(item, TextItem): - if item.text.strip(): - file_content += item.text - elif isinstance(item, TableItem): - table_df: pd.DataFrame = item.export_to_dataframe() - dups = find_near_duplicates(table_df, 0.8) - column_counts = {} - table_df.columns = [ - make_unique(col, column_counts) for col in table_df.columns - ] - columns_to_keep = table_df.columns.copy(deep=True) - for pair in dups: - if pair[1] in columns_to_keep: - columns_to_keep = columns_to_keep.drop(pair[1]) - - df_cleaned = table_df[list(columns_to_keep)] - df_cleaned = remove_newlines(df_cleaned) - file_content += df_cleaned.to_markdown(index=False, tablefmt="github") - file_content += "\n\n" - - # Create and return JSONL entry - return json.dumps( - asdict(create_JSONL(file_content, source, metadata)), - ensure_ascii=False, - ) - except Exception as e: - logger.error(f"Failed to process file {file}: {e}") - return None - - -SUFFIX2METHOD = { - ".pdf": process_document, - ".html": process_html, - ".docx": process_document, - ".epub": process_epub, - ".txt": process_txt, - ".pptx": process_document, - ".md": process_document, -} - - -def process_file(file: Path, dsk_client: str, **kwargs: dict) -> str | None: - if file.is_dir(): - return None - - suffix = file.suffix - method = SUFFIX2METHOD.get(suffix) - - if not method: - logger.warning(f"Unsupported file type: {suffix} - for file: {file!s}") - return None - - return method(file, dsk_client, **kwargs) - - @APP.command( name="process_directory", help="Crawl a directory and process files of different types", diff --git a/data-processing/src/dfm_data/document_processing/processors.py b/data-processing/src/dfm_data/document_processing/processors.py new file mode 100644 index 00000000..682fc68f --- /dev/null +++ b/data-processing/src/dfm_data/document_processing/processors.py @@ -0,0 +1,150 @@ 
+import json +import re +from dataclasses import asdict +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from docling.datamodel.document import TableItem, TextItem +from loguru import logger +from pypandoc import convert_file +from trafilatura import extract as extract_html_text + +from .utils import ( + build_document_converter, + build_metadata, + create_JSONL, + find_near_duplicates, + make_unique, + remove_newlines, +) + +if TYPE_CHECKING: + import pandas as pd + + +def process_html(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: # noqa: ARG001 + """Read a single HTML file and build a JSONL object. Uses Trafilatura for the extraction. + + Args: + file_path: Path to the HTML file + source: What is the data source (most likely DSK client) + **kwargs: Additional arguments + + Returns: + str: JSONL line with the file content + """ + text = extract_html_text(file_path.read_text()) + text = re.sub(r"(\n\s)+", "\n", text) + metadata = build_metadata(file_path) + return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) + + +def process_epub(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: # noqa: ARG001 + """Read a single EPUB file and build a JSONL object. Uses Pypandoc for the extraction. + + Args: + file_path: Path to the EPUB file + source: What is the data source (most likely DSK client) + **kwargs: Additional arguments + + Returns: + str: JSONL line with the file content + """ + text = convert_file(file_path, to="plain", format="epub") + text = re.sub(r"(\n\s)+", "\n", text) + metadata = build_metadata(file_path) + return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) + + +def process_txt(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str: # noqa: ARG001 + """Read a single TXT file and build a JSONL object + + Args: + file_path: Path to the TXT file + source: What is the data source (most likely DSK client) + **kwargs: Additional arguments + + Returns: + str: JSONL line with the file content + """ + text = file_path.read_text() + text = re.sub(r"(\n\s)+", "\n", text) + metadata = build_metadata(file_path) + return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False) + + +def process_document( + file: Path, + source: str, + **kwargs: dict[str, Any], +) -> str | None: + """ + Process a single file, converting it into a JSONL string. + + Args: + file: Path to the input file. + source: Source identifier for the data. + **kwargs: Extra arguments + + Returns: + str | None: JSONL string if the file is processed successfully, else None. 
+ """ + doc_converter = kwargs.get("converter", build_document_converter()) + try: + result = doc_converter.convert(file) + + metadata = build_metadata(result.input) + + file_content = "" + # Iterate the elements in reading order, including hierarchy level + for item, _ in result.document.iterate_items(): + if isinstance(item, TextItem): + if item.text.strip(): + file_content += item.text + elif isinstance(item, TableItem): + table_df: pd.DataFrame = item.export_to_dataframe() + dups = find_near_duplicates(table_df, 0.8) + column_counts = {} + table_df.columns = [ + make_unique(col, column_counts) for col in table_df.columns + ] + columns_to_keep = table_df.columns.copy(deep=True) + for pair in dups: + if pair[1] in columns_to_keep: + columns_to_keep = columns_to_keep.drop(pair[1]) + + df_cleaned = table_df[list(columns_to_keep)] + df_cleaned = remove_newlines(df_cleaned) + file_content += df_cleaned.to_markdown(index=False, tablefmt="github") + file_content += "\n\n" + + # Create and return JSONL entry + return json.dumps( + asdict(create_JSONL(file_content, source, metadata)), + ensure_ascii=False, + ) + except Exception as e: + logger.error(f"Failed to process file {file}: {e}") + return None + + +def process_file(file: Path, dsk_client: str, **kwargs: dict) -> str | None: + if file.is_dir(): + return None + + suffix = file.suffix + method = { + ".pdf": process_document, + ".html": process_html, + ".docx": process_document, + ".epub": process_epub, + ".txt": process_txt, + ".pptx": process_document, + ".md": process_document, + }.get(suffix) + + if not method: + logger.warning(f"Unsupported file type: {suffix} - for file: {file!s}") + return None + + return method(file, dsk_client, **kwargs) diff --git a/data-processing/src/dfm_data/utils.py b/data-processing/src/dfm_data/document_processing/utils.py similarity index 100% rename from data-processing/src/dfm_data/utils.py rename to data-processing/src/dfm_data/document_processing/utils.py From 79dcae4bf059ed03ffa965e0352943c65314b59a Mon Sep 17 00:00:00 2001 From: kris927b Date: Wed, 20 Nov 2024 12:32:18 +0100 Subject: [PATCH 15/19] Adding missing docstrings. --- .../dfm_data/document_processing/__init__.py | 0 .../document_processing/processors.py | 12 +++++++ .../src/dfm_data/document_processing/utils.py | 36 ++++++++++++++++++- 3 files changed, 47 insertions(+), 1 deletion(-) create mode 100644 data-processing/src/dfm_data/document_processing/__init__.py diff --git a/data-processing/src/dfm_data/document_processing/__init__.py b/data-processing/src/dfm_data/document_processing/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/data-processing/src/dfm_data/document_processing/processors.py b/data-processing/src/dfm_data/document_processing/processors.py index 682fc68f..d398314b 100644 --- a/data-processing/src/dfm_data/document_processing/processors.py +++ b/data-processing/src/dfm_data/document_processing/processors.py @@ -1,3 +1,5 @@ +"""This module contains processing methods for extracting text from various documents.""" + import json import re from dataclasses import asdict @@ -129,6 +131,16 @@ def process_document( def process_file(file: Path, dsk_client: str, **kwargs: dict) -> str | None: + """Generic method for processing a file. Will find the file type and use the right processing method. + + Args: + file: Path to the file to process + dsk_client: What DSK client have delivered the file + **kwargs: Extra arguments + + Returns: + str | None: Returns a JSONL line if the file type is supported, else None. 
+    """
     if file.is_dir():
         return None
 
diff --git a/data-processing/src/dfm_data/document_processing/utils.py b/data-processing/src/dfm_data/document_processing/utils.py
index 3137e4df..f38e01e8 100644
--- a/data-processing/src/dfm_data/document_processing/utils.py
+++ b/data-processing/src/dfm_data/document_processing/utils.py
@@ -1,3 +1,5 @@
+"""This module contains utilities for extracting text from documents."""
+
 from dataclasses import dataclass
 from datetime import datetime
 from pathlib import Path
@@ -19,6 +21,11 @@
 
 
 def build_document_converter() -> DocumentConverter:
+    """Create a Docling `DocumentConverter` instance. Used to convert PDF, DOCX, and PPTX.
+
+    Returns:
+        DocumentConverter: The `DocumentConverter` instance
+    """
     # previous `PipelineOptions` is now `PdfPipelineOptions`
     pipeline_options = PdfPipelineOptions()
     pipeline_options.do_ocr = False
@@ -61,6 +68,16 @@ class JSONL:
 
 
 def create_JSONL(text: str, source: str, metadata: dict[str, Any]) -> JSONL:
+    """Helper method to create a JSONL dataclass instance.
+
+    Args:
+        text: The text that should be part of the object
+        source: The source of the text
+        metadata: Metadata surrounding the text (e.g. filename, filetype, etc.)
+
+    Returns:
+        JSONL: The JSONL dataclass instance.
+    """
     id_ = f"{source}-{metadata.get('filename', '')}".replace(" ", "-")
     jsonl = JSONL(
         id=id_,
@@ -74,13 +91,21 @@ def create_JSONL(text: str, source: str, metadata: dict[str, Any]) -> JSONL:
 
 
 def build_metadata(document: Union[InputDocument, Path]) -> dict:
+    """Helper function to build metadata from an input file.
+
+    Args:
+        document: The document/file to build metadata from.
+
+    Returns:
+        dict: A dictionary containing the metadata.
+    """
     if isinstance(document, InputDocument):
         file_path = document.file
         filename = document.file.name
         filetype = document.format.name
         filesize = document.filesize
         page_count = document.page_count
-    else:  # TODO: build metadata from Path
+    else:
         file_path = document
         filename = document.name
         filetype = "".join(document.suffixes)
@@ -192,6 +217,15 @@ def remove_newlines(df: pd.DataFrame) -> pd.DataFrame:
 
 # Function to rename columns with unique suffixes
 def make_unique(column_name: str, column_counts: dict[str, int]) -> str:
+    """Method to rename duplicate column names.
+
+    Args:
+        column_name (str): Name of the column
+        column_counts (dict[str, int]): Number of times we have seen this column name
+
+    Returns:
+        str: The new column name
+    """
     if column_name in column_counts:
         # Increment count and append the new suffix
         column_counts[column_name] += 1

From 32768f31748ac09e356596360cce7b32ab8dd981 Mon Sep 17 00:00:00 2001
From: kris927b
Date: Thu, 21 Nov 2024 13:26:17 +0100
Subject: [PATCH 16/19] Adding .msg functionality

---
 .../document_processing/processors.py         | 31 +++++++++++++++++
 .../src/dfm_data/document_processing/utils.py | 33 ++++++++++++++++++-
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/data-processing/src/dfm_data/document_processing/processors.py b/data-processing/src/dfm_data/document_processing/processors.py
index d398314b..a98044e3 100644
--- a/data-processing/src/dfm_data/document_processing/processors.py
+++ b/data-processing/src/dfm_data/document_processing/processors.py
@@ -7,6 +7,7 @@
 from typing import TYPE_CHECKING, Any
 
 from docling.datamodel.document import TableItem, TextItem
+from extract_msg import openMsg
 from loguru import logger
 from pypandoc import convert_file
 from trafilatura import extract as extract_html_text
@@ -16,6 +17,7 @@
     build_metadata,
     create_JSONL,
     find_near_duplicates,
+    generate_decode_url,
     make_unique,
     remove_newlines,
 )
@@ -24,6 +26,34 @@
     import pandas as pd
 
 
+def process_msg(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str:  # noqa: ARG001
+    """Read a single MSG file and build a JSONL object. Uses extract-msg for the extraction.
+
+    Args:
+        file_path: Path to the MSG file
+        source: What is the data source (most likely DSK client)
+        **kwargs: Additional arguments
+
+    Returns:
+        str: JSONL line with the file content
+    """
+
+    def replace_url(match: re.Match) -> str:
+        url = match.group(0)
+        decoded_url = generate_decode_url(url)
+        if decoded_url:
+            return decoded_url
+        return url
+
+    text = openMsg(file_path).body
+    text = re.sub(r"(\n\s)+", "\n", text)
+    text = re.sub(r"\[.+?\]", "", text)
+    text = text.replace("\r", "")
+    text = re.sub(r"https?://[^>]+", replace_url, text)
+    metadata = build_metadata(file_path)
+    return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False)
+
+
 def process_html(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str:  # noqa: ARG001
     """Read a single HTML file and build a JSONL object. Uses Trafilatura for the extraction.
 
@@ -153,6 +183,7 @@
         ".txt": process_txt,
         ".pptx": process_document,
         ".md": process_document,
+        ".msg": process_msg,
     }.get(suffix)
 
     if not method:
diff --git a/data-processing/src/dfm_data/document_processing/utils.py b/data-processing/src/dfm_data/document_processing/utils.py
index f38e01e8..30701570 100644
--- a/data-processing/src/dfm_data/document_processing/utils.py
+++ b/data-processing/src/dfm_data/document_processing/utils.py
@@ -1,5 +1,6 @@
 """This module contains utilities for extracting text from documents."""
 
+import urllib.parse
 from dataclasses import dataclass
 from datetime import datetime
 from pathlib import Path
@@ -59,7 +60,7 @@ def build_document_converter() -> DocumentConverter:
 
 @dataclass
 class JSONL:
-    id: str
+    id: str  # noqa: A003
     text: str
     source: str
     added: str
@@ -233,3 +234,33 @@ def make_unique(column_name: str, column_counts: dict[str, int]) -> str:
         # Initialize count for the first occurrence
         column_counts[column_name] = 0
     return column_name
+
+
+def generate_decode_url(link: str) -> Union[str, None]:
+    """Decode a SafeURL link to extract the original URL.
+
+    Args:
+        link: The SafeURL to decode
+
+    Returns:
+        Union[str, None]: Returns the decoded URL if possible, else None
+    """
+    try:
+        if "?" in link:
+            url_parts = link.split("?", 1)[1]
+        else:
+            return None
+
+        if "&" in url_parts:
+            params = url_parts.split("&")
+        else:
+            return None
+
+        for param in params:
+            name, value = param.split("=")
+            if name == "url":
+                target_url = urllib.parse.unquote(value)
+                return target_url
+        return None
+    except ValueError:
+        return None

From 19244de3845a6f5fbd165df2953b8e031e865de0 Mon Sep 17 00:00:00 2001
From: kris927b
Date: Thu, 21 Nov 2024 13:32:12 +0100
Subject: [PATCH 17/19] Adding extract-msg to pyproject.
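
extract-msg provides the `openMsg` reader used by the new `process_msg` processor from
the previous commit. As a usage note, the accompanying `generate_decode_url` helper
unwraps redirect links of the Safe Links style often found in Outlook mail bodies; a
minimal sketch of the expected behaviour, using a made-up wrapped URL (not taken from
real data):

    from dfm_data.document_processing.utils import generate_decode_url

    # Made-up Safe Links style URL; real ones carry more query parameters.
    wrapped = (
        "https://eur02.safelinks.protection.outlook.com/"
        "?url=https%3A%2F%2Fexample.org%2Fnews%2Farticle&data=05%7C01"
    )
    assert generate_decode_url(wrapped) == "https://example.org/news/article"

Links without both a query string and a `url=...&...` parameter pair are left as-is by
`process_msg`, since the helper returns None in that case.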
--- data-processing/pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/data-processing/pyproject.toml b/data-processing/pyproject.toml index 92989191..5c8bb2ea 100644 --- a/data-processing/pyproject.toml +++ b/data-processing/pyproject.toml @@ -42,6 +42,7 @@ dependencies = [ "trafilatura>=1.9.0", "typer>=0.12.5", "loguru>=0.7.2", + "extract-msg>=0.52.0", ] [project.optional-dependencies] From 0710e136b91956ea922c30ec3fbc4d062ec20f51 Mon Sep 17 00:00:00 2001 From: kris927b Date: Wed, 4 Dec 2024 13:54:12 +0100 Subject: [PATCH 18/19] Adding a separate `process_files` method to handle the base logic of processing a list of file paths. --- .../document_processing/processors.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/data-processing/src/dfm_data/document_processing/processors.py b/data-processing/src/dfm_data/document_processing/processors.py index a98044e3..e0a75173 100644 --- a/data-processing/src/dfm_data/document_processing/processors.py +++ b/data-processing/src/dfm_data/document_processing/processors.py @@ -1,5 +1,6 @@ """This module contains processing methods for extracting text from various documents.""" +import gzip import json import re from dataclasses import asdict @@ -8,8 +9,10 @@ from docling.datamodel.document import TableItem, TextItem from extract_msg import openMsg +from joblib import Parallel, delayed from loguru import logger from pypandoc import convert_file +from tqdm import tqdm from trafilatura import extract as extract_html_text from .utils import ( @@ -191,3 +194,28 @@ def process_file(file: Path, dsk_client: str, **kwargs: dict) -> str | None: return None return method(file, dsk_client, **kwargs) + + +def process_files( + files: list[Path], + output_path: Path, + dsk_client: str, + output_suffix: str = ".jsonl.gz", + n_workers: int = 4, +): + save_file = output_path + if "".join(output_path.suffixes) != ".jsonl.gz": + save_file = output_path / (dsk_client + output_suffix) + + converter = build_document_converter() + parallel = Parallel(n_jobs=n_workers, return_as="generator_unordered") + save_file.parent.mkdir(parents=True, exist_ok=True) + with gzip.open(save_file, mode="wb") as out_file: + # with (output_path / output_name).open("w+") as out_file: + for doc in parallel( + delayed(process_file)(file, dsk_client, converter=converter) + for file in tqdm(files) + ): + if doc is None: + continue + out_file.write(f"{doc}\n".encode()) From faeca4fe824ce2d6977cc1203bbd72b8058601d7 Mon Sep 17 00:00:00 2001 From: kris927b Date: Wed, 4 Dec 2024 13:55:00 +0100 Subject: [PATCH 19/19] Refactoring `process_directory` command, and adding command to process dsk crawled data. 
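
The new `process_web_crawl` command greps the crawl log for lines starting with `--`
and takes the third whitespace-separated field, which assumes request lines of the form
`--<date> <time>--  <url>` (the format wget writes, for instance); the host segment of
that URL is then used as the top-level folder under `data_path`. A small illustration
of that extraction (the log line below is made up):

    # Sketch of the folder lookup done in process_web_crawl (illustrative log line).
    line = "--2024-11-28 10:15:02--  https://example.org/en/articles/page.html"
    url = line.split()[2]       # third whitespace-separated field is the URL
    folder = url.split("/")[2]  # "https:", "" and then the host -> "example.org"
    print(folder)               # crawled files are then read from data_path / folder

Both commands hand the collected file list to `process_files` (previous commit), which
writes one gzip-compressed JSONL file per DSK client.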
---
 .../scripts/dsk/convert_dsk_to_jsonl.py       | 98 ++++++++++++++-----
 1 file changed, 74 insertions(+), 24 deletions(-)

diff --git a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py
index 17844034..01b25065 100644
--- a/data-processing/scripts/dsk/convert_dsk_to_jsonl.py
+++ b/data-processing/scripts/dsk/convert_dsk_to_jsonl.py
@@ -15,19 +15,15 @@
 }
 """
 
-import gzip
+import logging
+import subprocess
 from pathlib import Path
 from typing import TYPE_CHECKING
 
-from joblib import Parallel, delayed
-from tqdm import tqdm
+import typer
+from dfm_data.document_processing.processors import process_files
 from typer import Typer
 
-from dfm_data.document_processing.processors import process_file
-from dfm_data.document_processing.utils import (
-    build_document_converter,
-)
-
 if TYPE_CHECKING:
     import pandas as pd
 
@@ -45,24 +41,78 @@
     output_suffix: str = ".jsonl.gz",
     n_workers: int = 4,
 ):
+    """Process a set of data delivered by a DSK organisation.
+
+    Args:
+        top_level_path: Path to a directory with the data delivered by the DSK organisation
+        output_path: Path to place the processed data
+        dsk_client: Which DSK organisation delivered the data
+        output_suffix: What suffix to use. Defaults to ".jsonl.gz".
+        n_workers: How many processes to run in parallel. Defaults to 4.
+    """
     files = list(top_level_path.glob("**/*.*"))
-    save_file = output_path
-    if "".join(output_path.suffixes) != ".jsonl.gz":
-        save_file = output_path / (dsk_client + output_suffix)
-
-    converter = build_document_converter()
-    parallel = Parallel(n_jobs=n_workers, return_as="generator_unordered")
-    save_file.parent.mkdir(parents=True, exist_ok=True)
-    with gzip.open(save_file, mode="wb") as out_file:
-        # with (output_path / output_name).open("w+") as out_file:
-        for doc in parallel(
-            delayed(process_file)(file, dsk_client, converter=converter)
-            for file in tqdm(files)
-        ):
-            if doc is None:
-                continue
-            out_file.write(f"{doc}\n".encode())
+    files = list(filter(lambda x: x.is_file(), files))
+
+    if len(files) == 0:
+        logging.error("Something went wrong. No files to process")
+        raise typer.Exit(code=1)
+
+    process_files(files, output_path, dsk_client, output_suffix, n_workers)
+
+
+@APP.command(
+    name="process_web_crawl",
+    help="Process output from a web crawl",
+)
+def process_web_crawl(
+    path_to_crawl_log: Path,
+    output_path: Path,
+    data_path: Path,
+    dsk_client: str,
+    output_suffix: str = ".jsonl.gz",
+    n_workers: int = 4,
+):
+    """Process a set of crawled data from a DSK organisation.
+
+    Args:
+        path_to_crawl_log: Path to a log file from the crawl
+        output_path: Path to place the processed data
+        data_path: Path where the crawled data is located
+        dsk_client: Which DSK organisation's pages have been crawled
+        output_suffix: What suffix to use. Defaults to ".jsonl.gz".
+        n_workers: How many processes to run in parallel. Defaults to 4.
+ """ + # Define the command as a list of strings + command = ["grep", "^--", path_to_crawl_log] + failed = False + # Run the command and capture the output + try: + result = subprocess.run(command, text=True, capture_output=True, check=True) + # Filter the third column using Python (equivalent to `awk '{print $3}'`) + main_folders = { + line.split()[2].split("/")[2] for line in result.stdout.splitlines() + } + except subprocess.CalledProcessError as e: + logging.error(f"Command failed with error: {e}") + failed = True + + if failed: + raise typer.Exit(code=1) + + files: list[Path] = [] + for main_folder in main_folders: + if not (data_path / main_folder).exists(): + continue + files.extend(list((data_path / main_folder).glob("**/*.*"))) + + files = list(filter(lambda x: x.is_file(), files)) + + if len(files) == 0: + logging.error("Something went wrong. No files to process") + raise typer.Exit(code=1) + + process_files(files, output_path, dsk_client, output_suffix, n_workers) if __name__ == "__main__":