DSK Processing #299

Open · wants to merge 20 commits into main
17 changes: 14 additions & 3 deletions data-processing/pyproject.toml
@@ -22,16 +22,27 @@ dependencies = [
"requests>=2.31.0",
"polars>0.19.1",
# Data cleaning dependencies:
"dolma[pii,code]@git+https://github.com/allenai/dolma.git@476629dc4d8d804dd2123509dc48b549e6b49dfb", # Install from git until a 1.0.2 package is released
"kenlm>=0.2.0", # Used for perplexity tagging
"blingfire>=0.1.8", # Used for perplexity tagging
"dolma[pii,code]",
# @git+https://github.com/allenai/dolma.git@476629dc4d8d804dd2123509dc48b549e6b49dfb", # Install from git until a 1.0.2 package is released
"kenlm>=0.2.0",
# Used for perplexity tagging
"blingfire>=0.1.8",
# Used for perplexity tagging
"mosaicml-streaming",
"orjsonl",
"tqdm",
"zstandard",
"nlp_dedup",
"datatrove>=0.2.0",
"pyyaml",
"docling>=1.20.0",
"joblib>=1.4.2",
"pdfminer-six>=20240706",
"pypandoc-binary>=1.14",
"trafilatura>=1.9.0",
"typer>=0.12.5",
"loguru>=0.7.2",
"extract-msg>=0.52.0",
]

[project.optional-dependencies]
10 changes: 10 additions & 0 deletions data-processing/scripts/dsk/README.md
@@ -0,0 +1,10 @@
# Data-processing Dansk Sprogmodel Konsortium (DSK)

This directory contains scripts for processing the DSK data, mainly converting documents from various formats (e.g. PDF and DOCX) to JSONL.

Run the conversion on an entire data directory:

```bash
$ python convert_dsk_to_jsonl.py process_directory $PATH_TO_DSK_DIR $OUTPUT_PATH $DSK_CLIENT
```
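
The script also exposes a `process_web_crawl` command for data coming from a web crawl. A hypothetical invocation (the shell variables below are placeholders, not defined by this PR) could look like:

```bash
$ python convert_dsk_to_jsonl.py process_web_crawl $PATH_TO_CRAWL_LOG $OUTPUT_PATH $PATH_TO_CRAWL_DATA $DSK_CLIENT
```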

119 changes: 119 additions & 0 deletions data-processing/scripts/dsk/convert_dsk_to_jsonl.py
@@ -0,0 +1,119 @@
"""
Convert from various file formats (pdf, docx, etc.).

To this format:
{
"id": "...", # MANDATORY: source-specific identifier
"text": "foo", # MANDATORY: textual content of the document
"source": "...", # MANDATORY: source of the data, such as peS2o, common-crawl, etc.
"added": "...", # OPTIONAL: timestamp we acquired this data (time file was created), specified as YYYY-MM-DDTHH:MM:SS.TIMEZONE e.g 2021-04-13T12:52:46.000Z
"created": "..." # OPTIONAL: timestamp when orig document was created (best-guess if not available), should be specified as a range; "YYYY-MM-DDTHH:MM:SS.TIMEZONE, YYYY-MM-DDTHH:MM:SS.TIMEZONE"
"metadata": { # OPTIONAL: source-specific metadata
"sub-source": "...", # OPTIONAL: E.g. "newspaper_ocr"
...
}
}
"""

import logging
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING

import typer
from dfm_data.document_processing.processors import process_files
from typer import Typer

if TYPE_CHECKING:
    import pandas as pd

APP = Typer(name="2JSONL Converter")


@APP.command(
    name="process_directory",
    help="Crawl a directory and process files of different types",
)
def crawl_directory(
    top_level_path: Path,
    output_path: Path,
    dsk_client: str,
    output_suffix: str = ".jsonl.gz",
    n_workers: int = 4,
):
    """Process a set of data delivered from a DSK organisation.

    Args:
        top_level_path: Path to a directory with the data delivered by the DSK organisation
        output_path: Path to place the processed data
        dsk_client: Which DSK organisation delivered the data
        output_suffix: What suffix to use. Defaults to ".jsonl.gz".
        n_workers: How many processes to run in parallel. Defaults to 4.
    """
    files = list(top_level_path.glob("**/*.*"))

    files = list(filter(lambda x: x.is_file(), files))

    if len(files) == 0:
        logging.error("Something went wrong. No files to process")
        raise typer.Exit(code=1)

    process_files(files, output_path, dsk_client, output_suffix, n_workers)


@APP.command(
    name="process_web_crawl",
    help="Process output from a web crawl",
)
def process_web_crawl(
    path_to_crawl_log: Path,
    output_path: Path,
    data_path: Path,
    dsk_client: str,
    output_suffix: str = ".jsonl.gz",
    n_workers: int = 4,
):
    """Process a set of crawled data from a DSK organisation.

    Args:
        path_to_crawl_log: Path to a log file from the crawl
        output_path: Path to place the processed data
        data_path: Path where the crawled data is located
        dsk_client: Which DSK organisation's pages have been crawled
        output_suffix: What suffix to use. Defaults to ".jsonl.gz".
        n_workers: How many processes to run in parallel. Defaults to 4.
    """
    # Define the command as a list of strings
    command = ["grep", "^--", path_to_crawl_log]
    failed = False
    # Run the command and capture the output
    try:
        result = subprocess.run(command, text=True, capture_output=True, check=True)
        # Take the third column (equivalent to `awk '{print $3}'`) and keep the host part of each URL
        main_folders = {
            line.split()[2].split("/")[2] for line in result.stdout.splitlines()
        }
    except subprocess.CalledProcessError as e:
        logging.error(f"Command failed with error: {e}")
        failed = True

    if failed:
        raise typer.Exit(code=1)

    files: list[Path] = []
    for main_folder in main_folders:
        if not (data_path / main_folder).exists():
            continue
        files.extend(list((data_path / main_folder).glob("**/*.*")))

    files = list(filter(lambda x: x.is_file(), files))

    if len(files) == 0:
        logging.error("Something went wrong. No files to process")
        raise typer.Exit(code=1)

    process_files(files, output_path, dsk_client, output_suffix, n_workers)


if __name__ == "__main__":
    APP()
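
For illustration only (not part of this diff), a single output record following the schema in the module docstring might look like the sketch below; the field values are invented placeholders.

```python
import json

# Hypothetical example of one JSONL line produced by the conversion.
# Only the field names follow the documented schema; the values are made up.
example_record = {
    "id": "some_dsk_org/reports/annual_report_2023.pdf",
    "text": "Årsrapport 2023 ...",
    "source": "some_dsk_org",
    "added": "2024-10-01T12:00:00.000Z",
    "created": "2023-01-01T00:00:00.000Z, 2023-12-31T23:59:59.000Z",
    "metadata": {"sub-source": "annual_reports"},
}
print(json.dumps(example_record, ensure_ascii=False))
```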
221 changes: 221 additions & 0 deletions data-processing/src/dfm_data/document_processing/processors.py
@@ -0,0 +1,221 @@
"""This module contains processing methods for extracting text from various documents."""

import gzip
import json
import re
from dataclasses import asdict
from pathlib import Path
from typing import TYPE_CHECKING, Any

from docling.datamodel.document import TableItem, TextItem
from extract_msg import openMsg
from joblib import Parallel, delayed
from loguru import logger
from pypandoc import convert_file
from tqdm import tqdm
from trafilatura import extract as extract_html_text

from .utils import (
    build_document_converter,
    build_metadata,
    create_JSONL,
    find_near_duplicates,
    generate_decode_url,
    make_unique,
    remove_newlines,
)

if TYPE_CHECKING:
    import pandas as pd


def process_msg(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str:  # noqa: ARG001
    """Read a single MSG file and build a JSONL object. Uses extract_msg for the extraction.

    Args:
        file_path: Path to the MSG file
        source: What is the data source (most likely DSK client)
        **kwargs: Additional arguments

    Returns:
        str: JSONL line with the file content
    """

    def replace_url(match: re.Match) -> str:
        # Replace a matched URL with its decoded form, if one can be derived
        url = match.group(0)
        decoded_url = generate_decode_url(url)
        if decoded_url:
            return decoded_url
        return url

    text = openMsg(file_path).body
    text = re.sub(r"(\n\s)+", "\n", text)  # collapse runs of blank lines
    text = re.sub(r"\[.+?\]", "", text)  # drop bracketed segments ("[...]")
    text = text.replace("\r", "")
    text = re.sub(r"https?://[^>]+", replace_url, text)
    metadata = build_metadata(file_path)
    return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False)


def process_html(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str:  # noqa: ARG001
    """Read a single HTML file and build a JSONL object. Uses Trafilatura for the extraction.

    Args:
        file_path: Path to the HTML file
        source: What is the data source (most likely DSK client)
        **kwargs: Additional arguments

    Returns:
        str: JSONL line with the file content
    """
    text = extract_html_text(file_path.read_text())
    text = re.sub(r"(\n\s)+", "\n", text)
    metadata = build_metadata(file_path)
    return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False)


def process_epub(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str:  # noqa: ARG001
    """Read a single EPUB file and build a JSONL object. Uses Pypandoc for the extraction.

    Args:
        file_path: Path to the EPUB file
        source: What is the data source (most likely DSK client)
        **kwargs: Additional arguments

    Returns:
        str: JSONL line with the file content
    """
    text = convert_file(file_path, to="plain", format="epub")
    text = re.sub(r"(\n\s)+", "\n", text)
    metadata = build_metadata(file_path)
    return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False)


def process_txt(file_path: Path, source: str, **kwargs: dict[str, Any]) -> str:  # noqa: ARG001
    """Read a single TXT file and build a JSONL object.

    Args:
        file_path: Path to the TXT file
        source: What is the data source (most likely DSK client)
        **kwargs: Additional arguments

    Returns:
        str: JSONL line with the file content
    """
    text = file_path.read_text()
    text = re.sub(r"(\n\s)+", "\n", text)
    metadata = build_metadata(file_path)
    return json.dumps(asdict(create_JSONL(text, source, metadata)), ensure_ascii=False)


def process_document(
    file: Path,
    source: str,
    **kwargs: dict[str, Any],
) -> str | None:
    """
    Process a single file, converting it into a JSONL string.

    Args:
        file: Path to the input file.
        source: Source identifier for the data.
        **kwargs: Extra arguments

    Returns:
        str | None: JSONL string if the file is processed successfully, else None.
    """
    doc_converter = kwargs.get("converter", build_document_converter())
    try:
        result = doc_converter.convert(file)

        metadata = build_metadata(result.input)

        file_content = ""
        # Iterate the elements in reading order, including hierarchy level
        for item, _ in result.document.iterate_items():
            if isinstance(item, TextItem):
                if item.text.strip():
                    file_content += item.text
            elif isinstance(item, TableItem):
                table_df: pd.DataFrame = item.export_to_dataframe()
                # Find near-duplicate columns, make the column names unique,
                # then drop the duplicated columns before rendering the table
                dups = find_near_duplicates(table_df, 0.8)
                column_counts = {}
                table_df.columns = [
                    make_unique(col, column_counts) for col in table_df.columns
                ]
                columns_to_keep = table_df.columns.copy(deep=True)
                for pair in dups:
                    if pair[1] in columns_to_keep:
                        columns_to_keep = columns_to_keep.drop(pair[1])

                df_cleaned = table_df[list(columns_to_keep)]
                df_cleaned = remove_newlines(df_cleaned)
                # Tables are serialised as GitHub-flavoured markdown
                file_content += df_cleaned.to_markdown(index=False, tablefmt="github")
                file_content += "\n\n"

        # Create and return JSONL entry
        return json.dumps(
            asdict(create_JSONL(file_content, source, metadata)),
            ensure_ascii=False,
        )
    except Exception as e:
        logger.error(f"Failed to process file {file}: {e}")
        return None


def process_file(file: Path, dsk_client: str, **kwargs: dict) -> str | None:
    """Generic method for processing a file. Finds the file type and dispatches to the right processing method.

    Args:
        file: Path to the file to process
        dsk_client: Which DSK client delivered the file
        **kwargs: Extra arguments

    Returns:
        str | None: Returns a JSONL line if the file type is supported, else None.
    """
    if file.is_dir():
        return None

    suffix = file.suffix
    method = {
        ".pdf": process_document,
        ".html": process_html,
        ".docx": process_document,
        ".epub": process_epub,
        ".txt": process_txt,
        ".pptx": process_document,
        ".md": process_document,
        ".msg": process_msg,
    }.get(suffix)

    if not method:
        logger.warning(f"Unsupported file type: {suffix} - for file: {file!s}")
        return None

    return method(file, dsk_client, **kwargs)


def process_files(
    files: list[Path],
    output_path: Path,
    dsk_client: str,
    output_suffix: str = ".jsonl.gz",
    n_workers: int = 4,
):
    """Process a list of files in parallel and write the results to a gzipped JSONL file.

    Args:
        files: Files to process
        output_path: Either the target ".jsonl.gz" file or a directory in which "<dsk_client><output_suffix>" is created
        dsk_client: Which DSK client delivered the files
        output_suffix: What suffix to use. Defaults to ".jsonl.gz".
        n_workers: How many processes to run in parallel. Defaults to 4.
    """
    # If output_path is not already a .jsonl.gz file, treat it as a directory
    save_file = output_path
    if "".join(output_path.suffixes) != ".jsonl.gz":
        save_file = output_path / (dsk_client + output_suffix)

    converter = build_document_converter()
    parallel = Parallel(n_jobs=n_workers, return_as="generator_unordered")
    save_file.parent.mkdir(parents=True, exist_ok=True)
    with gzip.open(save_file, mode="wb") as out_file:
        for doc in parallel(
            delayed(process_file)(file, dsk_client, converter=converter)
            for file in tqdm(files)
        ):
            if doc is None:
                continue
            out_file.write(f"{doc}\n".encode())
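
As a usage illustration (not part of the PR), `process_files` could be driven directly from Python roughly as sketched below; the paths and client name are placeholders.

```python
from pathlib import Path

from dfm_data.document_processing.processors import process_files

# Hypothetical delivery directory and client name, for illustration only.
delivery_dir = Path("deliveries/some_dsk_org")
output_dir = Path("processed")

files = [p for p in delivery_dir.glob("**/*.*") if p.is_file()]
process_files(files, output_dir, dsk_client="some_dsk_org", n_workers=8)
# Expected result: processed/some_dsk_org.jsonl.gz with one JSON document per line.
```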