Miljoeportal #302

Open
wants to merge 34 commits into base: main
Commits (34)
34afcd0
Updating dolma dependency as version on pypi is now up to date
kris927b Oct 25, 2024
5f4dbda
Adding processing script for DSK data
kris927b Oct 25, 2024
9a31be0
Adhering to the desired format
kris927b Oct 25, 2024
1bc6552
Minor adjustments to file handling
kris927b Oct 25, 2024
fd7fffe
Minor change to the entry id. Using filename rather than path
kris927b Oct 25, 2024
23e9537
Extracting util into separate file
kris927b Nov 14, 2024
3fcb91b
Testing extracting documents using Docling
kris927b Nov 14, 2024
819648c
Using docling to extract data from PDF, DOCX, PPTX, MD.
kris927b Nov 19, 2024
3e6c045
Removing test files
kris927b Nov 19, 2024
e89f50f
Adding lower bounds on dependencies
kris927b Nov 20, 2024
404abf3
Moving utils to dfm-data package
kris927b Nov 20, 2024
52ad08a
Removing unused methods to clean up code
kris927b Nov 20, 2024
dc2291d
Fixing spelling error
kris927b Nov 20, 2024
bf11fcc
Separating document processing in dfm-data.
kris927b Nov 20, 2024
79dcae4
Adding missing docstrings.
kris927b Nov 20, 2024
32768f3
Adding .msg functionality
kris927b Nov 21, 2024
19244de
Adding extract-msg to pyproject.
kris927b Nov 21, 2024
0710e13
Adding a separate `process_files` method to handle the base logic of …
kris927b Dec 4, 2024
faeca4f
Refactoring `process_directory` command, and adding command to proces…
kris927b Dec 4, 2024
b169b34
Merge branch 'main' into dsk_processing
kris927b Dec 4, 2024
39a8693
Adding support for bytes streams in document processing.
kris927b Dec 5, 2024
b6207f0
Preliminary scripts for pulling data from Cellar
kris927b Dec 5, 2024
69e09ac
Minor changes
kris927b Dec 6, 2024
dd26b94
Adding parallelization
kris927b Dec 6, 2024
69bf7fb
Handling connection errors
kris927b Dec 9, 2024
7b1e5de
Fixing a problem with ordering of mtypes
kris927b Dec 9, 2024
154f8ca
Updating logger
kris927b Dec 9, 2024
ff52bc4
Adding extra error handling
kris927b Dec 9, 2024
e8f7893
Adding method to run a task in parallel using Joblib. With retries in…
kris927b Dec 13, 2024
b929d75
Using new parallel processing method with retries
kris927b Dec 13, 2024
01578b1
Adding extra fallback
kris927b Dec 13, 2024
720e2c3
Adding decode error handling
kris927b Dec 13, 2024
393feb9
A sequential download option of the miljoeportal data.
kris927b Dec 16, 2024
8988271
Adding the possibility to process json files
kris927b Dec 17, 2024
17 changes: 14 additions & 3 deletions data-processing/pyproject.toml
@@ -22,16 +22,27 @@ dependencies = [
"requests>=2.31.0",
"polars>0.19.1",
# Data cleaning dependencies:
"dolma[pii,code]@git+https://github.com/allenai/dolma.git@476629dc4d8d804dd2123509dc48b549e6b49dfb", # Install from git until a 1.0.2 package is released
"kenlm>=0.2.0", # Used for perplexity tagging
"blingfire>=0.1.8", # Used for perplexity tagging
"dolma[pii,code]",
# @git+https://github.com/allenai/dolma.git@476629dc4d8d804dd2123509dc48b549e6b49dfb", # Install from git until a 1.0.2 package is released
"kenlm>=0.2.0",
# Used for perplexity tagging
"blingfire>=0.1.8",
# Used for perplexity tagging
"mosaicml-streaming",
"orjsonl",
"tqdm",
"zstandard",
"nlp_dedup",
"datatrove>=0.2.0",
"pyyaml",
"docling>=1.20.0",
"joblib>=1.4.2",
"pdfminer-six>=20240706",
"pypandoc-binary>=1.14",
"trafilatura>=1.9.0",
"typer>=0.12.5",
"loguru>=0.7.2",
"extract-msg>=0.52.0",
]

[project.optional-dependencies]
5 changes: 5 additions & 0 deletions data-processing/scripts/cellar/README.md
@@ -0,0 +1,5 @@
# European Union Publications Office - Cellar

The data is pulled in two steps:
1. `catalog.py` - Pull a list of all publications from `start_date` to `end_date`
2. `ingest.py` - Download the individual publications and process them.
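
For orientation, a minimal sketch of inspecting the intermediate catalog produced by step 1 before running step 2. The file name `catalog.jsonl` is a placeholder for whatever `output_file` was passed to `catalog.py`; the record keys match what it writes.

```python
import json
from collections import Counter
from pathlib import Path

# Placeholder path: catalog.py writes one JSON object per line to the
# output_file given on the command line.
catalog_path = Path("catalog.jsonl")

languages = Counter()
with catalog_path.open() as handle:
    for line in handle:
        record = json.loads(line)
        # Each record carries "language", "title", "type", "subjects",
        # "work" and "timestamp".
        languages[record["language"]] += 1

print(languages.most_common(10))
```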
134 changes: 134 additions & 0 deletions data-processing/scripts/cellar/catalog.py
@@ -0,0 +1,134 @@
"""
Pull a list of publications from the European Union Publications Office.
"""
import collections
import datetime
import json
import logging
import re
import sys
from pathlib import Path
from string import Template
from typing import Any

import requests
from typer import Typer

APP = Typer(name="Cellar CLI")


def parse_response(content: bytes):
"""
Parses the JSON response from the European Union Publications Office API.

Args:
content: The raw JSON response from the API.

Yields:
dict: A dictionary containing information about each publication.
"""
for row in json.loads(content)["results"]["bindings"]:
# Split the langs, types, and titles by semicolons
data = {k: v["value"] for k, v in row.items()}
langs = data["langs"].split(";")
titles = data["titles"]

pattern = "^" + ";".join([f"{lang}:(?P<{lang}>.*)" for lang in langs]) + "$"
titles = re.match(pattern, titles, re.DOTALL).groupdict()
titles = {k: v.strip() for k, v in titles.items()}

types = collections.defaultdict(list)
for spec in data["types"].split(";"):
lang, mtype = spec.split(":")
types[lang].append(mtype)

subjects = [
subj for subject in data["subjects"].split(";") if (subj := subject.strip())
]

for lang in langs:
ret = {
"language": lang,
"title": titles[lang],
"type": types[lang],
"subjects": subjects,
"work": data["work"],
"timestamp": data["timestamp"],
}
yield ret


def sparql_query(
start: str,
stop: str,
query: Template,
) -> bytes | Any | None:
"""
Execute a SPARQL query on the European Union Publications Office API.

Args:
start: The starting date for the query.
stop: The ending date for the query.
query: A template string containing the SPARQL query.

Returns:
bytes | Any | None: The response content if the query is successful, otherwise None.
"""
logging.info(f"querying {start}-{stop}")
params = {"query": query.substitute(start=start, stop=stop)}
headers = {"accept": "application/json"}
resp = requests.get(
"http://publications.europa.eu/webapi/rdf/sparql",
params=params,
headers=headers,
)
if resp.status_code == 200:
return resp.content

logging.warning(f"Query failed {resp.status_code}")
return None


@APP.command(
name="fetch_cellar",
help="Fetch publications from the European Union Publications Office.",
)
def main(
output_file: Path,
query_template: Path,
start_date: datetime.datetime = "2016-07-21",
end_date: datetime.datetime = "2024-12-31",
delta: int = 5,
):
"""
Fetch publications from the European Union Publications Office API and write them to an output file.

Args:
output_file: The path to the output file where the publications will be written.
query_template: The path to a template file containing the SPARQL query.
start_date (optional): The starting date for the query. Defaults to "2016-07-21".
end_date (optional): The ending date for the query. Defaults to "2024-12-31".
delta (optional): The number of days between each query. Defaults to 5.
"""
delta = datetime.timedelta(delta)
with query_template.open() as handle:
query = Template(handle.read())

lb = start_date

logging.basicConfig(level=logging.INFO)
with output_file.open("w") as f:
while lb < end_date:
ub = min(lb + delta, end_date)
result = sparql_query(lb.isoformat(), ub.isoformat(), query)
if not result:
lb = ub
continue
for doc in parse_response(result):
f.write(json.dumps(doc, ensure_ascii=False) + "\n")
lb = ub


if __name__ == "__main__":
APP()
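
For illustration, a self-contained sketch of the title-splitting technique used in `parse_response` above: one named group is built per language, so a single regular expression recovers the per-language titles. The record values below are invented.

```python
import re

# Invented fields in the shape parse_response receives.
langs = "ENG;DAN".split(";")
titles = "ENG:Annual report;DAN:Årsrapport"

# One named group per language, exactly as parse_response builds it.
pattern = "^" + ";".join(f"{lang}:(?P<{lang}>.*)" for lang in langs) + "$"
parsed = re.match(pattern, titles, re.DOTALL).groupdict()
print({k: v.strip() for k, v in parsed.items()})
# {'ENG': 'Annual report', 'DAN': 'Årsrapport'}
```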
201 changes: 201 additions & 0 deletions data-processing/scripts/cellar/ingest.py
@@ -0,0 +1,201 @@
"""
Convert from various file formats (pdf, docx, etc.).

To this format:
{
"id": "...", # MANDATORY: source-specific identifier
"text": "foo", # MANDATORY: textual content of the document
"source": "...", # MANDATORY: source of the data, such as peS2o, common-crawl, etc.
"added": "...", # OPTIONAL: timestamp we acquired this data (time file was created), specified as YYYY-MM-DDTHH:MM:SS.TIMEZONE e.g 2021-04-13T12:52:46.000Z
"created": "..." # OPTIONAL: timestamp when orig document was created (best-guess if not available), should be specified as a range; "YYYY-MM-DDTHH:MM:SS.TIMEZONE, YYYY-MM-DDTHH:MM:SS.TIMEZONE"
"metadata": { # OPTIONAL: source-specific metadata
"sub-source": "...", # OPTIONAL: E.g. "newspaper_ocr"
...
}
}
"""

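# For illustration only, a record in the format above might look like this
# (all values are invented examples, not real Cellar data):
#
# {
#     "id": "cellar-example-0001",
#     "text": "Extracted document text ...",
#     "source": "cellar",
#     "added": "2024-12-05T10:00:00.000Z",
#     "created": "2016-07-21T00:00:00.000Z, 2016-07-21T00:00:00.000Z",
#     "metadata": {"sub-source": "EU Publications Office"}
# }
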
import gzip
import io
import itertools
import json
import time
import zipfile
from datetime import date, datetime
from pathlib import Path
from typing import TextIO

import requests
from dfm_data.document_processing.processors import process_file
from dfm_data.document_processing.utils import parallel_process_with_retries
from loguru import logger
from requests.exceptions import ConnectionError, ReadTimeout
from typer import Typer

APP = Typer(name="Cellar CLI")


# Match available formats to the preference order
def ordered_match(options: list[str]) -> list[str]:
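    """Return the formats from `options` that appear in the preference list, in preference order."""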
# Preferred file types order
preference = [
"txt",
"html",
"xhtml",
"epub",
"docx",
"doc",
"pdf",
"pdf1x",
"pdfa1a",
"pdfa1b",
"pdfa2a",
"pdfx",
"pdfx4",
"fmx4",
"xml",
"RDF/XML",
"mobi",
]
opts = set(options)
return [p for p in preference if p in opts]


def process_documents(lang: str, docs: list[dict], path: Path):
"""
Process a list of documents and save the results in a compressed file.

Args:
lang: The language to fetch the document for.
docs: A list of dictionaries representing the documents to process. Each dictionary must contain at least a "work" key with the URI of the document, and a "type" key with a list of available formats.
path: The path where the compressed file will be saved.
"""
logger.info(f"{path} - {len(docs)}")
with gzip.open(
path,
"wt",
encoding="utf-8",
) as gzfile, requests.Session() as session:
for doc in docs:
uri = doc["work"]
types = doc["type"]
name = uri.split("/")[-1]
logger.info(f"Ingesting {uri}")

# Fetch document content in the preferred format for each language
try:
if not fetch_and_process_doc(lang, gzfile, session, uri, types, name):
logger.warning(f"No valid format for {uri} {lang}")
except ConnectionError:
logger.error(f"Connection error for {uri} - {lang}")
except ReadTimeout:
                logger.error(f"Read timeout for {uri} - {lang}")


def fetch_and_process_doc(
lang: str,
gzfile: TextIO,
session: requests.Session,
uri: str,
types: list[str],
name: str,
) -> bool:
"""
Fetch and process a document in the preferred format.

Args:
lang: The language to fetch the document for.
gzfile: A text file where the processed content will be written.
session: An instance of `requests.Session` that is used to make HTTP requests.
uri: The URI of the document.
types: A list of available formats for the document.
name: The name of the document.

Returns:
True if the document was successfully fetched and processed, False otherwise.
"""
for mtype in ordered_match(types):
header = {
"accept": f"application/zip;mtype={mtype}",
"accept-language": lang.lower(),
}
time.sleep(1.0)
response = session.get(uri, headers=header)
if response.status_code == 200:
with zipfile.ZipFile(io.BytesIO(response.content)) as z:
process_zip_content(gzfile, f"Cellar-{lang}-{name}", z)
return True
return False


def process_zip_content(gzfile: TextIO, name: str, z: zipfile.ZipFile):
"""
Process the contents of a ZIP file and write the processed text to a gzip file.

Args:
gzfile: A text file where the processed content will be written.
name: The name of the document being processed.
z: An instance of `zipfile.ZipFile` containing the contents of the ZIP file.
"""
for zip_info in z.infolist():
with z.open(zip_info) as file:
text = process_file(file, name)
if text:
gzfile.write(text + "\n")


# Group documents by date
def get_groups(infile: Path) -> list[tuple[tuple[str, date], list[dict]]]:
"""
Generate groups of documents based on language and date.

Args:
infile: The path to the input file containing document data in JSON format.

    Returns:
        A list of tuples, each pairing a key (language, date) with the list of documents in that group.
"""

def get_group(doc: dict) -> tuple[str, date]:
return (doc["language"], datetime.fromisoformat(doc["timestamp"]).date())

with infile.open() as handle:
docs = (json.loads(line) for line in handle)
docs = sorted(docs, key=get_group)
grouped = itertools.groupby(docs, get_group)
return [(key, list(group)) for key, group in grouped]


@APP.command(
name="process_cellar",
help="Process documents and save the results in a compressed file.",
)
def main(infile: Path, outdir: Path, workers: int = 2):
"""
Process documents from the given input file and save the results in compressed JSONL files.

Args:
infile: The path to the input file containing document metadata.
outdir: The directory where the compressed JSONL files will be saved.
workers: Number of parallel workers. Defaults to 2.
"""

    def process_group(group: tuple[tuple[str, date], list[dict]]):
(lang, date), docs = group
path = outdir / f"{lang}-{date.year}-{date.month}-{date.day}.jsonl.gz"

if path.exists():
logger.warning(f"{path} already exists! Skipping")
return

path.parent.mkdir(parents=True, exist_ok=True)

process_documents(lang, list(docs), path)

groups = get_groups(infile)
parallel_process_with_retries(process_group, groups, retries=10, n_workers=workers)


# Main script execution
if __name__ == "__main__":
APP()
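
For illustration, a self-contained sketch of the sort-then-group sharding performed by `get_groups` and `process_group`: records are keyed by (language, date) and each group ends up in one `{lang}-{year}-{month}-{day}.jsonl.gz` shard. The records below are invented.

```python
import itertools
from datetime import datetime

# Invented catalog records in the shape produced by catalog.py.
docs = [
    {"language": "DAN", "timestamp": "2016-07-21T00:00:00", "work": "w1"},
    {"language": "DAN", "timestamp": "2016-07-21T12:00:00", "work": "w2"},
    {"language": "ENG", "timestamp": "2016-07-22T00:00:00", "work": "w3"},
]

def get_group(doc: dict) -> tuple:
    return (doc["language"], datetime.fromisoformat(doc["timestamp"]).date())

# itertools.groupby only merges adjacent items, hence the sort by the same key.
for (lang, day), group in itertools.groupby(sorted(docs, key=get_group), get_group):
    shard = f"{lang}-{day.year}-{day.month}-{day.day}.jsonl.gz"
    print(shard, [d["work"] for d in group])
# DAN-2016-7-21.jsonl.gz ['w1', 'w2']
# ENG-2016-7-22.jsonl.gz ['w3']
```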