Skip to content

Commit

Permalink
format adjustments; docstrings; more helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
e-maud committed Aug 27, 2024
1 parent dd30fbc commit 173dc6e
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 26 deletions.
21 changes: 13 additions & 8 deletions impresso_commons/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@
# created on 2018.03.27 using PyCharm
# project impresso-image-acquisition

import datetime
import logging
import multiprocessing
import sys
import time
import datetime
from datetime import timedelta

import dask
from dask import compute, delayed
from dask import compute
from dask.diagnostics import ProgressBar
from dask.multiprocessing import get as mp_get
import multiprocessing

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -51,7 +51,7 @@ def init_logger(logger, log_level, log_file):
ch.setFormatter(formatter)

logger.addHandler(ch)
logger.info("Logger successfully initialised")
logger.info("LOGGER - Logger successfully initialised")

return logger

Expand Down Expand Up @@ -87,14 +87,18 @@ def user_confirmation(question, default=None):
sys.stdout.write("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")



def user_question(variable_to_confirm):
answer = user_confirmation(f"Is [{variable_to_confirm}] the correct one to work with?", None)
answer = user_confirmation(
f"\tIs the following the correct item to work with?\n"
f"{variable_to_confirm}",
None
)

if not answer:
logger.info(f"Variable {variable_to_confirm} not confirmed, exiting.")
logger.info("Variable not confirmed, exiting.")
sys.exit()
else:
logger.info(f"Variable {variable_to_confirm} confirmed.")
logger.info("Variable confirmed.")


def timestamp():
Expand All @@ -111,6 +115,7 @@ def timestamp():

class Timer:
""" Basic timer"""

def __init__(self):
self.start = time.time()
self.intermediate = time.time()
Expand Down
46 changes: 30 additions & 16 deletions impresso_commons/utils/daskutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,42 +11,56 @@
--config-file=<cf> json configuration dict specifying various arguments
"""

import os
import logging
import docopt
import os

from dask.diagnostics import ProgressBar
import dask.bag as db
import docopt
import numpy as np
from dask.bag import Bag
from dask.diagnostics import ProgressBar

from impresso_commons.utils import init_logger
from impresso_commons.utils import Timer
from impresso_commons.path.path_s3 import s3_filter_archives
from impresso_commons.utils.s3 import get_bucket, read_jsonlines, readtext_jsonlines
from impresso_commons.utils.s3 import IMPRESSO_STORAGEOPT
from impresso_commons.utils import Timer
from impresso_commons.utils import init_logger
from impresso_commons.utils.config_loader import PartitionerConfig
from impresso_commons.utils.s3 import IMPRESSO_STORAGEOPT
from impresso_commons.utils.s3 import get_bucket, read_jsonlines, readtext_jsonlines

__author__ = "maudehrmann"

logger = logging.getLogger(__name__)


def partitioner(bag, path, nbpart):
"""Partition a bag into n partitions and write each partition in a file"""
grouped_items = bag.groupby(lambda x: np.random.randint(500), npartitions=nbpart)
def partitioner(bag: Bag,
path: str,
nb_partitions: int) -> None:
"""
Partition a Dask bag into n partitions and write each to a separate file.
Args:
bag (dask.bag.Bag): The Dask bag to be partitioned.
path (str): Directory path where partitioned files will be saved.
nb_partitions (int): Number of partitions to create.
Returns:
None: The function writes partitioned files to the specified path.
"""
grouped_items = bag.groupby(lambda x: np.random.randint(500),
npartitions=nb_partitions)
items = grouped_items.map(lambda x: x[1]).flatten()
path = os.path.join(path, "*.jsonl.bz2")
with ProgressBar():
items.to_textfiles(path)


def create_even_partitions(
bucket,
config_newspapers,
output_dir,
local_fs=False,
keep_full=False,
nb_partition=500,
bucket,
config_newspapers,
output_dir,
local_fs=False,
keep_full=False,
nb_partition=500,
):
"""Convert yearly bz2 archives to even bz2 archives, i.e. partitions.
Expand Down
5 changes: 3 additions & 2 deletions impresso_commons/utils/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from smart_open.s3 import iter_bucket

from impresso_commons.utils import _get_cores
from utils.utils import bytes_to
from impresso_commons.utils.utils import bytes_to

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -557,6 +557,7 @@ def list_s3_directories(bucket_name, prefix=''):
list: A list of 'directory' names found in the specified bucket
and prefix.
"""
logger.info(f"Listing 'folders'' of '{bucket_name}' under prefix '{prefix}'")
s3 = get_s3_client()
result = s3.list_objects_v2(
Bucket=bucket_name, Prefix=prefix, Delimiter='/'
Expand All @@ -568,7 +569,7 @@ def list_s3_directories(bucket_name, prefix=''):
prefix['Prefix'][:-1].split("/")[-1]
for prefix in result['CommonPrefixes']
]

logger.info(f"Returning {len(directories)} directories.")
return directories


Expand Down

0 comments on commit 173dc6e

Please sign in to comment.