Skip to content

Commit

Permalink
Merge branch 'main' into cellpose
Browse files Browse the repository at this point in the history
  • Loading branch information
mzouink authored May 9, 2024
2 parents fbfff32 + d81281f commit 880946c
Show file tree
Hide file tree
Showing 140 changed files with 11,823 additions and 689 deletions.
1 change: 1 addition & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ jobs:
# platform: [ubuntu-latest, windows-latest, macos-latest]
platform: [ubuntu-latest]
python-version: ['3.10', '3.11']
timeout-minutes: 20

steps:
- uses: actions/checkout@v3
Expand Down
8 changes: 8 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,11 @@ This will also be run automatically when a PR is made to master and a codecov re
- For a completely new feature, make a branch off of the `dev/main` branch of CellMap's fork of DaCapo with a name describing the feature. If you are collaborating on a feature that already has a branch, you can branch off that feature branch.
- Currently, you should make your PRs into the `dev/main` branch of CellMap's fork, or the feature branch you branched off of. PRs currently require one maintainer's approval before merging. Once the PR is merged, the feature branch should be deleted.
- `dev/main` will be regularly merged to `main` when new features are fully implemented and all tests are passing.


## Documentation
Documentation is built using Sphinx. To build the documentation locally, run
```bash
sphinx-build -M html docs/source docs/build
```
This will generate the html files in the `docs/build/html` directory.
2 changes: 1 addition & 1 deletion dacapo/apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
create_weights_store,
)

from pathlib import Path
from upath import UPath as Path

logger = logging.getLogger(__name__)

Expand Down
65 changes: 50 additions & 15 deletions dacapo/blockwise/argmax_worker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from pathlib import Path
from upath import UPath as Path
import sys
from dacapo.experiments.datasplits.datasets.arrays.zarr_array import ZarrArray
from dacapo.store.array_store import LocalArrayIdentifier
Expand Down Expand Up @@ -27,6 +27,12 @@
default="INFO",
)
def cli(log_level):
"""
CLI for running the threshold worker.
Args:
log_level (str): The log level to use.
"""
logging.basicConfig(level=getattr(logging, log_level.upper()))


Expand All @@ -47,7 +53,17 @@ def start_worker(
input_dataset: str,
output_container: Path | str,
output_dataset: str,
return_io_loop: bool = False,
):
"""
Start the threshold worker.
Args:
input_container (Path | str): The input container.
input_dataset (str): The input dataset.
output_container (Path | str): The output container.
output_dataset (str): The output dataset.
"""
# get arrays
input_array_identifier = LocalArrayIdentifier(Path(input_container), input_dataset)
input_array = ZarrArray.open_from_array_identifier(input_array_identifier)
Expand All @@ -57,34 +73,51 @@ def start_worker(
)
output_array = ZarrArray.open_from_array_identifier(output_array_identifier)

# wait for blocks to run pipeline
client = daisy.Client()
def io_loop():
# wait for blocks to run pipeline
client = daisy.Client()

while True:
print("getting block")
with client.acquire_block() as block:
if block is None:
break
while True:
print("getting block")
with client.acquire_block() as block:
if block is None:
break

# write to output array
output_array[block.write_roi] = np.argmax(
input_array[block.write_roi],
axis=input_array.axes.index("c"),
)
# write to output array
output_array[block.write_roi] = np.argmax(
input_array[block.write_roi],
axis=input_array.axes.index("c"),
)

if return_io_loop:
return io_loop
else:
io_loop()


def spawn_worker(
input_array_identifier: "LocalArrayIdentifier",
output_array_identifier: "LocalArrayIdentifier",
):
"""Spawn a worker to predict on a given dataset.
"""
Spawn a worker to predict on a given dataset.
Args:
model (Model): The model to use for prediction.
raw_array (Array): The raw data to predict on.
prediction_array_identifier (LocalArrayIdentifier): The identifier of the prediction array.
Returns:
Callable: The function to run the worker.
"""
compute_context = create_compute_context()
if not compute_context.distribute_workers:
return start_worker(
input_array_identifier.container,
input_array_identifier.dataset,
output_array_identifier.container,
output_array_identifier.dataset,
return_io_loop=True,
)

# Make the command for the worker to run
command = [
Expand All @@ -103,7 +136,9 @@ def spawn_worker(
]

def run_worker():
# Run the worker in the given compute context
"""
Run the worker in the given compute context.
"""
compute_context.execute(command)

return run_worker
Expand Down
41 changes: 40 additions & 1 deletion dacapo/blockwise/blockwise_task.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,32 @@
from datetime import datetime
from importlib.machinery import SourceFileLoader
from pathlib import Path
from upath import UPath as Path
from daisy import Task, Roi


class DaCapoBlockwiseTask(Task):
"""
A task to run a blockwise worker function. This task is used to run a
blockwise worker function on a given ROI.
Attributes:
worker_file (str | Path): The path to the worker file.
total_roi (Roi): The ROI to process.
read_roi (Roi): The ROI to read from for a block.
write_roi (Roi): The ROI to write to for a block.
num_workers (int): The number of workers to use.
max_retries (int): The maximum number of times a task will be retried if failed
(either due to failed post check or application crashes or network
failure)
timeout: The timeout for the task.
upstream_tasks: The upstream tasks.
*args: Additional positional arguments to pass to ``worker_function``.
**kwargs: Additional keyword arguments to pass to ``worker_function``.
Methods:
__init__:
Initialize the task.
"""

def __init__(
self,
worker_file: str | Path,
Expand All @@ -18,6 +40,23 @@ def __init__(
*args,
**kwargs,
):
"""
Initialize the task.
Args:
worker_file (str | Path): The path to the worker file.
total_roi (Roi): The ROI to process.
read_roi (Roi): The ROI to read from for a block.
write_roi (Roi): The ROI to write to for a block.
num_workers (int): The number of workers to use.
max_retries (int): The maximum number of times a task will be retried if failed
(either due to failed post check or application crashes or network
failure)
timeout: The timeout for the task.
upstream_tasks: The upstream tasks.
*args: Additional positional arguments to pass to ``worker_function``.
**kwargs: Additional keyword arguments to pass to ``worker_function``.
"""
# Load worker functions
worker_name = Path(worker_file).stem
worker = SourceFileLoader(worker_name, str(worker_file)).load_module()
Expand Down
Loading

0 comments on commit 880946c

Please sign in to comment.