diff --git a/examples/blockwise/dummy_worker.py b/examples/blockwise/dummy_worker.py index ebd7a8ef5..9bdb33e7e 100644 --- a/examples/blockwise/dummy_worker.py +++ b/examples/blockwise/dummy_worker.py @@ -70,7 +70,9 @@ def io_loop(): break # Do the blockwise process - print(f"processing block: {block.id}, with read_roi: {block.read_roi}, using arg: {arg}") + print( + f"processing block: {block.id}, with read_roi: {block.read_roi}, using arg: {arg}" + ) # DO SOMETHING WITH THE BLOCK if return_io_loop: diff --git a/examples/postprocessing/README.md b/examples/postprocessing/README.md new file mode 100644 index 000000000..436694ba4 --- /dev/null +++ b/examples/postprocessing/README.md @@ -0,0 +1,10 @@ +# Post processing example scripts for distribute blockwise processing of peroxisome data. + +The goal of the script is to : +- Gaussian filter the data +- Threshold the distance data to get binary data +- Apply watershed to get connected components +- Find the connected components +- Mask False Positives Mitochondria using Mitochondria data +- Merge crops +- Filter the connected components based on size diff --git a/examples/postprocessing/blockwise_postprocess_script.py b/examples/postprocessing/blockwise_postprocess_script.py new file mode 100644 index 000000000..5008e476d --- /dev/null +++ b/examples/postprocessing/blockwise_postprocess_script.py @@ -0,0 +1,50 @@ +from dacapo.blockwise.scheduler import run_blockwise +from funlib.geometry import Roi +from postprocessing.postprocess_worker import open_ds +import daisy +import numpy as np + +# Make the ROIs +path_to_worker = "postprocess_worker.py" +num_workers = 16 +overlap = 20 + +peroxi_container = "/path/to/peroxi_container.zarr" +peroxi_dataset = "peroxisomes" +mito_container = "/path/to/mito_container.zarr" +mito_dataset = "mitochondria" +threshold = "0.5" +gaussian_kernel = 2 + +array_in = open_ds(peroxi_container, peroxi_dataset) +total_roi = array_in.roi + +voxel_size = array_in.voxel_size +block_size = np.array(array_in.data.chunks) * np.array(voxel_size) + +write_size = daisy.Coordinate(block_size) +write_roi = daisy.Roi((0,) * len(write_size), write_size) + +context = np.array(voxel_size) * overlap + +read_roi = write_roi.grow(context, context) +total_roi = array_in.roi.grow(context, context) + + +# Run the script blockwise +success = run_blockwise( + worker_file=path_to_worker, + total_roi=total_roi, + read_roi=read_roi, + write_roi=write_roi, + num_workers=num_workers, +) + +# Print the success +if success: + print("Success") +else: + print("Failure") + +# example run command: +# bsub -n 4 python blockwise_postprocess_script.py diff --git a/examples/postprocessing/postprocess_worker.py b/examples/postprocessing/postprocess_worker.py new file mode 100644 index 000000000..b5183efd4 --- /dev/null +++ b/examples/postprocessing/postprocess_worker.py @@ -0,0 +1,210 @@ +from typing import Any, Optional +import sys +from dacapo.compute_context import create_compute_context + +import daisy + +import click + +import logging + +import skimage.measure +import skimage.filters +import skimage.morphology +from funlib.persistence import open_ds +import numpy as np +from skimage.segmentation import watershed +from scipy import ndimage as ndi + +logger = logging.getLogger(__file__) + +read_write_conflict: bool = False +fit: str = "valid" +path = __file__ + + +# OPTIONALLY DEFINE GLOBALS HERE + + +@click.group() +@click.option( + "--log-level", + type=click.Choice( + ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False + ), + default="INFO", +) +def cli(log_level): + """ + CLI for running the threshold worker. + + Args: + log_level (str): The log level to use. + """ + logging.basicConfig(level=getattr(logging, log_level.upper())) + + +@cli.command() +@click.option( + "-pc", + "--peroxi-container", + required=True, + type=str, + default=None, +) +@click.option( + "-pd", + "--peroxi-dataset", + required=True, + type=str, + default=None, +) +@click.option( + "-mc", + "--mito-container", + required=True, + type=str, + default=None, +) +@click.option( + "-md", + "--mito-dataset", + required=True, + type=str, + default=None, +) +@click.option( + "-t", + "--threshold", + required=False, + type=float, + default=0.5, +) +@click.option( + "-g", + "--gaussian-kernel", + required=False, + type=int, + default=2, +) +def start_worker( + peroxi_container, + peroxi_dataset, + mito_container, + mito_dataset, + threshold, + gaussian_kernel, + return_io_loop: Optional[bool] = False, +): + """ + Start the worker. + + Args: + peroxi_container (str): The container of the peroxisome predictions. + peroxi_dataset (str): The dataset of the peroxisome predictions. + mito_container (str): The container of the mitochondria predictions. + mito_dataset (str): The dataset of the mitochondria predictions. + threshold (float): The threshold to use for the peroxisome predictions. + gaussian_kernel (int): The kernel size to use for the gaussian filter. + + returns: + instance_peroxi (np.ndarray): The instance labels of the peroxisome predictions. + + """ + # Do something with the argument + # print(arg) + + def io_loop(): + # wait for blocks to run pipeline + client = daisy.Client() + peroxi_ds = open_ds(peroxi_container, peroxi_dataset) + mito_ds = open_ds(mito_container, mito_dataset) + + while True: + print("getting block") + with client.acquire_block() as block: + if block is None: + break + + # Do the blockwise process + peroxi = peroxi_ds.to_ndarray(block.read_roi) + mito = mito_ds.to_ndarray(block.read_roi) + + print(f"processing block: {block.id}, with read_roi: {block.read_roi}") + peroxi = skimage.filters.gaussian(peroxi, gaussian_kernel) + # threshold precictions + binary_peroxi = peroxi > threshold + # get instance labels + markers, _ = ndi.label(binary_peroxi) + # Apply Watershed + ws_labels = watershed(-peroxi, markers, mask=peroxi) + instance_peroxi = skimage.measure.label(ws_labels).astype(np.int64) + # relabel background to 0 + instance_peroxi[mito > 0] = 0 + # make mask of unwanted object class overlaps + return instance_peroxi.astype(np.uint64) + + if return_io_loop: + return io_loop + else: + io_loop() + + +def spawn_worker( + peroxi_container, + peroxi_dataset, + mito_container, + mito_dataset, + threshold, + gaussian_kernel, +): + """ + Spawn a worker. + + Args: + arg (Any): An example argument to use. + Returns: + Callable: The function to run the worker. + """ + compute_context = create_compute_context() + if not compute_context.distribute_workers: + return start_worker( + peroxi_container=peroxi_container, + peroxi_dataset=peroxi_dataset, + mito_container=mito_container, + mito_dataset=mito_dataset, + threshold=threshold, + gaussian_kernel=gaussian_kernel, + return_io_loop=True, + ) + + # Make the command for the worker to run + command = [ + sys.executable, + path, + "start-worker", + "--peroxi-container", + peroxi_container, + "--peroxi-dataset", + peroxi_dataset, + "--mito-container", + mito_container, + "--mito-dataset", + mito_dataset, + "--threshold", + str(threshold), + "--gaussian-kernel", + str(gaussian_kernel), + ] + + def run_worker(): + """ + Run the worker in the given compute context. + """ + compute_context.execute(command) + + return run_worker + + +if __name__ == "__main__": + cli()