Skip to content

Commit

Permalink
Option for parallel processing progress bar
Browse files Browse the repository at this point in the history
- disable progress bar
- option to fix error message about non-pickleable _threads.RLock
  in some versions of tqdm on Windows and Mac OS
- (test now exercises progress bar and n-cores)
  • Loading branch information
DrYak committed Nov 22, 2024
1 parent bc9a65b commit b4f0a05
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 10 deletions.
47 changes: 37 additions & 10 deletions lollipop/cli/deconvolute.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class DeconvBootstrapsArgsNoSeed(TypedDict):
n_cores : int
The number of cores to use for parallel processing,
(Only used for the all-locations progress bar: parallel/sequential)
par_bar: int
Non-zero: display progress bars in sub-processes
location : str
The location to deconvolute.
lod_df: pd.DataFrame
Expand Down Expand Up @@ -108,6 +110,7 @@ class DeconvBootstrapsArgsNoSeed(TypedDict):
"""

n_cores: int
par_bar: int
location: str
loc_df: pd.DataFrame
bootstrap: int
Expand Down Expand Up @@ -174,6 +177,7 @@ def _deconvolute_bootstrap(args: DeconvBootstrapsArgsNoSeed) -> List[pd.DataFram

# Unpack the arguments
n_cores = args["n_cores"]
par_bar = args["par_bar"]
location = args["location"]
loc_df = args["loc_df"]
bootstrap = args["bootstrap"]
Expand Down Expand Up @@ -216,11 +220,15 @@ def _deconvolute_bootstrap(args: DeconvBootstrapsArgsNoSeed) -> List[pd.DataFram
trange(bootstrap, desc=location, leave=(len(locations_list) > 1))
if n_cores == 1
# Progress bar for parallel bootstrapping (at fixed per core position)
else trange(
bootstrap,
desc=f"core{workerid}: {location}",
leave=False,
position=workerid + 1,
else (
trange(
bootstrap,
desc=f"core{workerid}: {location}",
leave=False,
position=workerid + 1,
)
if par_bar
else range(bootstrap)
)
)
if bootstrap > 1
Expand All @@ -230,6 +238,8 @@ def _deconvolute_bootstrap(args: DeconvBootstrapsArgsNoSeed) -> List[pd.DataFram
logging.info(f"bootstrap: {b}")
start_time_b = time.time()
if bootstrap > 1:
if n_cores > 1 and par_bar == 0:
print(f"core{workerid}: {location} - {b}/{bootstrap}")
# resample if we're doing bootstrapping
assert (
namefield in loc_df.columns
Expand Down Expand Up @@ -419,6 +429,15 @@ def _deconvolute_bootstrap(args: DeconvBootstrapsArgsNoSeed) -> List[pd.DataFram
type=int,
help="Cores for parallel processing of location, default 1 for sequential processing.",
)
@click.option(
"--par-bar",
"--pb",
metavar="BAR",
required=False,
default=2,
type=int,
help="Progress bar when parallel processing: 0-Disable, 1-Display, 2-Fix Windows/Mac OS npn-pickleable RLock error in tqdm",
)
@click.option(
"--namefield",
"-nf",
Expand All @@ -437,6 +456,7 @@ def deconvolute(
filters,
seed,
n_cores,
par_bar,
output,
fmt_columns,
out_json,
Expand Down Expand Up @@ -719,6 +739,7 @@ def deconvolute(
args_list: List[DeconvBootstrapsArgs] = [
{
"n_cores": n_cores,
"par_bar": par_bar,
"location": location,
"loc_df": loc_df,
"bootstrap": bootstrap,
Expand Down Expand Up @@ -747,13 +768,19 @@ def deconvolute(
all_deconv = [_deconvolute_bootstrap_wrapper(args) for args in args_list]
# Run the deconvolution in parallel
else:
multiprocessing.freeze_support()
tqdm.set_lock(multiprocessing.Manager().Lock())
# enable parallel bars
bar_lock = {}
if par_bar:
if par_bar == 2:
# this fixes error message about not pickleable _threads.RLock in some versions of tqdm on Windows and Mac OS
multiprocessing.freeze_support()
tqdm.set_lock(multiprocessing.Manager().Lock())

# enable multiprocessing in tqdm
bar_lock = {"initializer": tqdm.set_lock, "initargs": (tqdm.get_lock(),)}

# Create a pool of workers
with multiprocessing.Pool(
processes=n_cores, initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),)
) as pool:
with multiprocessing.Pool(processes=n_cores, **bar_lock) as pool:
# Map the function to the arguments in parallel - choosing imap instead of pool.map as objects are large
all_deconv = list(
tqdm(
Expand Down
3 changes: 3 additions & 0 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ def test_workflow():
[
"lollipop",
"deconvolute",
"--n-cores=2",
"--par-bar=1",
"--output=test_results.csv",
"--out-json=test_results.json",
"--fmt-columns",
"--variants-config=config_preprint.yaml",
"--deconv-config=presets/deconv_linear.yaml",
"--filters=filters_preprint.yaml",
"--location=Zürich (ZH)",
"--location=Genève (GE)",
"--seed=42",
"preprint/data/tallymut_line_full.tsv.zst",
]
Expand Down

0 comments on commit b4f0a05

Please sign in to comment.