diff --git a/lollipop/cli/deconvolute.py b/lollipop/cli/deconvolute.py index 48ff27f..26e6efb 100755 --- a/lollipop/cli/deconvolute.py +++ b/lollipop/cli/deconvolute.py @@ -68,6 +68,8 @@ class DeconvBootstrapsArgsNoSeed(TypedDict): n_cores : int The number of cores to use for parallel processing, (Only used for all locastion progress bar parr/sequc) + par_bar: int + None zero: display progress bars in sub-processes location : str The location to deconvolute. lod_df: pd.DataFrame @@ -108,6 +110,7 @@ class DeconvBootstrapsArgsNoSeed(TypedDict): """ n_cores: int + par_bar: int location: str loc_df: pd.DataFrame bootstrap: int @@ -174,6 +177,7 @@ def _deconvolute_bootstrap(args: DeconvBootstrapsArgsNoSeed) -> List[pd.DataFram # Unpack the arguments n_cores = args["n_cores"] + par_bar = args["par_bar"] location = args["location"] loc_df = args["loc_df"] bootstrap = args["bootstrap"] @@ -216,11 +220,15 @@ def _deconvolute_bootstrap(args: DeconvBootstrapsArgsNoSeed) -> List[pd.DataFram trange(bootstrap, desc=location, leave=(len(locations_list) > 1)) if n_cores == 1 # Progress bar for parallel bootstrapping (at fixed per core position) - else trange( - bootstrap, - desc=f"core{workerid}: {location}", - leave=False, - position=workerid + 1, + else ( + trange( + bootstrap, + desc=f"core{workerid}: {location}", + leave=False, + position=workerid + 1, + ) + if par_bar + else range(bootstrap) ) ) if bootstrap > 1 @@ -230,6 +238,8 @@ def _deconvolute_bootstrap(args: DeconvBootstrapsArgsNoSeed) -> List[pd.DataFram logging.info(f"bootstrap: {b}") start_time_b = time.time() if bootstrap > 1: + if n_cores > 1 and par_bar == 0: + print(f"core{workerid}: {location} - {b}/{bootstrap}") # resample if we're doing bootstrapping assert ( namefield in loc_df.columns @@ -419,6 +429,15 @@ def _deconvolute_bootstrap(args: DeconvBootstrapsArgsNoSeed) -> List[pd.DataFram type=int, help="Cores for parallel processing of location, default 1 for sequential processing.", ) +@click.option( + "--par-bar", + "--pb", + metavar="BAR", + required=False, + default=2, + type=int, + help="Progress bar when parallel processing: 0-Disable, 1-Display, 2-Fix Windows/Mac OS npn-pickleable RLock error in tqdm", +) @click.option( "--namefield", "-nf", @@ -437,6 +456,7 @@ def deconvolute( filters, seed, n_cores, + par_bar, output, fmt_columns, out_json, @@ -719,6 +739,7 @@ def deconvolute( args_list: List[DeconvBootstrapsArgs] = [ { "n_cores": n_cores, + "par_bar": par_bar,, "location": location, "loc_df": loc_df, "bootstrap": bootstrap, @@ -747,13 +768,19 @@ def deconvolute( all_deconv = [_deconvolute_bootstrap_wrapper(args) for args in args_list] # Run the deconvolution in parallel else: - multiprocessing.freeze_support() - tqdm.set_lock(multiprocessing.Manager().Lock()) + # enable parallel bars + bar_lock = {} + if par_bar: + if par_bar == 2: + # this fixes error message about not pickleable _threads.RLock in some versions of tqdm on Windows and Mac OS + multiprocessing.freeze_support() + tqdm.set_lock(multiprocessing.Manager().Lock()) + + # enable multiprocessing in tqdm + bar_lock = {"initializer": tqdm.set_lock, "initargs": (tqdm.get_lock(),)} # Create a pool of workers - with multiprocessing.Pool( - processes=n_cores, initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),) - ) as pool: + with multiprocessing.Pool(processes=n_cores, **bar_lock) as pool: # Map the function to the arguments in parallel - choosing imap as objects are large instaed of pool.map all_deconv = list( tqdm( diff --git a/tests/test_integration.py b/tests/test_integration.py index 9bbdd99..c265533 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -12,6 +12,8 @@ def test_workflow(): [ "lollipop", "deconvolute", + "--n-cores=2", + "--par-bar=1", "--output=test_results.csv", "--out-json=test_results.json", "--fmt-columns", @@ -19,6 +21,7 @@ def test_workflow(): "--deconv-config=presets/deconv_linear.yaml", "--filters=filters_preprint.yaml", "--location=Zürich (ZH)", + "--location=Genève (GE)", "--seed=42", "preprint/data/tallymut_line_full.tsv.zst", ]