Skip to content

Commit

Permalink
Process data per newspaper in compute_manifest()
Browse files Browse the repository at this point in the history
  • Loading branch information
piconti committed May 29, 2024
1 parent d5d0169 commit cfe3aa4
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 31 deletions.
109 changes: 79 additions & 30 deletions impresso_commons/versioning/compute_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import dask.bag as db
from dask.distributed import Client
from impresso_commons.path.path_s3 import list_newspapers
from impresso_commons.utils.s3 import fixed_s3fs_glob, IMPRESSO_STORAGEOPT
from impresso_commons.utils.utils import init_logger
from impresso_commons.versioning.helpers import (
Expand Down Expand Up @@ -57,15 +58,35 @@
]


def get_files_to_consider(config: dict[str, Any]) -> Union[list[str], None]:
def extract_np_key(s3_key: str, bucket: str) -> str:
"""Extract the newspaper an s3:key corresponds to given the bucket and partition
eg. s3_key is in format:
- s3_key: 's3://31-passim-rebuilt-staging/passim/indeplux/indeplux-1889.jsonl.bz2'
- bucket: '31-passim-rebuilt-staging/passim'
--> returns 'indeplux'
Args:
s3_key (str): Full S3 path of a file (as returned by fixed_s3fs_glob).
bucket (str): S3 bucket, including partition, in which the newspaper dirs are.
Returns:
str: Name of the corresponding newspaper, extracted form the s3 path.
"""
# in format: 's3://31-passim-rebuilt-staging/passim/indeplux/indeplux-1889.jsonl.bz2'
return s3_key.replace(f"s3://{bucket}/", "").split("/")[0]


def get_files_to_consider(config: dict[str, Any]) -> Union[dict[str, list[str]], None]:
"""Get the list of S3 files to consider based on the provided configuration.
Args:
config (dict[str, Any]): Configuration parameters with the s3 bucket, titles,
and file extensions
Returns:
list[str] | None: List of the s3 files to consider, or None if no files found.
dict[str, list[str]] | None: Dict mapping each newspaper to the s3 files to
consider, or None if no files found.
Raises:
ValueError: If `file_extensions` in the config is empty or None.
Expand All @@ -79,15 +100,24 @@ def get_files_to_consider(config: dict[str, Any]) -> Union[list[str], None]:
# if newspapers is empty, include all newspapers
if config["newspapers"] is None or len(config["newspapers"]) == 0:
logger.info("Fetching the files to consider for all titles...")
# TODO update list_newspapers to include possibility of partition, and unify both cases
# return all filenames in the given bucket partition with the correct extension
return fixed_s3fs_glob(os.path.join(config["output_bucket"], extension_filter))
files = fixed_s3fs_glob(os.path.join(config["output_bucket"], extension_filter))
s3_files = {}
for s3_key in files:
np = extract_np_key(s3_key, config["output_bucket"])
if np in s3_files:
s3_files[np].append(s3_key)
else:
s3_files[np] = [s3_key]
return s3_files

# here list newspapers instead and s3_files becomes a dict np -> liest of files
logger.info("Fetching the files to consider for titles %s...", config["newspapers"])
s3_files = []
s3_files = {}
for np in config["newspapers"]:
s3_files.extend(
fixed_s3fs_glob(os.path.join(config["output_bucket"], np, extension_filter))
s3_files[np] = fixed_s3fs_glob(
os.path.join(config["output_bucket"], np, extension_filter)
)

return s3_files
Expand Down Expand Up @@ -158,6 +188,10 @@ def create_manifest(
) -> None:
"""Given its configuration, generate the manifest for a given s3 bucket partition.
TODO: add option to agg for all titles together if desired
TODO: add iptions to exclude NP for all agg types
TODO: separate further into functions
Note:
The contents of the configuration file (or dict) are given in markdown file
`impresso_commons/data/manifest_config/manifest.config.example.md``
Expand All @@ -177,19 +211,14 @@ def create_manifest(

logger.info("Starting to generate the manifest for DataStage: '%s'", stage)

# fetch the names of the files to consider
# fetch the names of the files to consider separated per title
s3_files = get_files_to_consider(config_dict)
# in format: 's3://31-passim-rebuilt-staging/passim/indeplux/indeplux-1889.jsonl.bz2'
# --> sep per title?

logger.info("Collected a total of %s files, reading them...", len(s3_files))
logger.debug("The list of files selected is: %s", s3_files)
# load the selected files in dask bags
processed_files = db.read_text(s3_files, storage_options=IMPRESSO_STORAGEOPT).map(
json.loads
logger.info(
"Collected a total of %s files, reading them...", len(s3_files.values())
)

logger.info("Files loaded successfully, initialising the manifest.")
logger.info("Files identified successfully, initialising the manifest.")
# init the git repo object for the processing's repository.
repo = git.Repo(config_dict["git_repository"])

Expand Down Expand Up @@ -217,22 +246,42 @@ def create_manifest(
only_counting=only_counting,
)

logger.info("Starting to compute the statistics on the fetched files...")
computed_stats = compute_stats_for_stage(processed_files, stage, client)
# processing newspapers one at a time
for np_title, np_s3_files in s3_files.items():

logger.info(
"Populating the manifest with the resulting %s yearly statistics found...",
len(computed_stats),
)
logger.debug("computed_stats: %s", computed_stats)

for stats in computed_stats:
title = stats["np_id"]
year = stats["year"]
del stats["np_id"]
del stats["year"]
logger.debug("Adding %s to %s-%s", stats, title, year)
manifest.add_by_title_year(title, year, stats)
logger.debug(
"The list of files selected for %s is: %s",
np_title,
list(np_s3_files.values()),
)
# load the selected files in dask bags
processed_files = db.read_text(
np_s3_files, storage_options=IMPRESSO_STORAGEOPT
).map(json.loads)

logger.info(
"%s - Starting to compute the statistics on the fetched files...",
np_title,
)
computed_stats = compute_stats_for_stage(processed_files, stage, client)

logger.info(
"%s - Populating the manifest with the resulting %s yearly statistics found...",
np_title,
len(computed_stats),
)
logger.debug("%s - computed_stats: %s", np_title, computed_stats)

for stats in computed_stats:
title = stats["np_id"]
year = stats["year"]
del stats["np_id"]
del stats["year"]
logger.debug("Adding %s to %s-%s", stats, title, year)
manifest.add_by_title_year(title, year, stats)

logger.info("%s - Finished adding stats, going to the next title...", np_title)
logger.info("-" * 10)

logger.info("Finalizing the manifest, and computing the result...")
# Add the note to the manifest
Expand Down
1 change: 0 additions & 1 deletion impresso_commons/versioning/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,6 @@ def counts_for_rebuilt(
}
)

# print(f"{strftime('%Y-%m-%d %H:%M:%S')} – id: {rebuilt_ci['id']}")
return counts


Expand Down

0 comments on commit cfe3aa4

Please sign in to comment.