-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Separate processing steps into distinct units
Each processing step is represented by its own module: segment, filter, sample. The config entries refer to the method names in these modules. For each method, we provide additional arguments in its own yaml section.
- Loading branch information
Showing
8 changed files
with
384 additions
and
420 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ dependencies = [ | |
"confuse", | ||
"rich", | ||
"scikit-image", | ||
"tqdm", | ||
"typer" | ||
] | ||
dynamic = ["version"] | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
# SPDX-FileCopyrightText: 2023 Friedrich Miescher Institute for Biomedical Research (FMI), Basel (Switzerland) | ||
# | ||
# SPDX-License-Identifier: MIT | ||
|
||
"""Collection of methods to filter a label image. | ||
Each method must accept a file path and a label image as first two arguments, | ||
and must modify the label image inplace. | ||
The file path can be used to find related files for more complex object filtering, | ||
e.g. by intensity in a different channel. | ||
""" | ||
|
||
import re | ||
from pathlib import Path | ||
|
||
from numpy import ndarray | ||
from skimage.measure import regionprops | ||
from tifffile import imread | ||
|
||
|
||
def bounding_box( | ||
tif_file: Path, | ||
labels, | ||
min_x: int, | ||
min_y: int, | ||
max_x: int, | ||
max_y: int | ||
): | ||
"""Modify 'labels' to set everything outside the bounding box to zero.""" | ||
labels[0:max(min_y, 0), :] = 0 | ||
labels[:, 0:max(min_x, 0)] = 0 | ||
y, x = labels.shape | ||
labels[min(max_y, y):y, :] = 0 | ||
labels[:, min(max_x, x):x] = 0 | ||
|
||
|
||
def area( | ||
tif_file: Path, | ||
labels: ndarray, | ||
min_area: int, | ||
max_area: int, | ||
): | ||
"""Modify 'labels' to only keep objects within range.""" | ||
regions = regionprops(labels) | ||
for region in regions: | ||
if not min_area <= region.area <= max_area: | ||
labels[labels == region.label] = 0 | ||
|
||
|
||
def solidity( | ||
tif_file: Path, | ||
labels: ndarray, | ||
min_solidity: int, | ||
max_solidity: int, | ||
): | ||
"""Modify 'labels' to only keep objects within range.""" | ||
regions = regionprops(labels) | ||
for region in regions: | ||
if not min_solidity <= region.solidity <= max_solidity: | ||
labels[labels == region.label] = 0 | ||
|
||
|
||
def intensity( | ||
tif_file: Path, | ||
labels: ndarray, | ||
target_channel: str, | ||
min_intensity: int, | ||
): | ||
"""Filter objects in 'labels' using the provided function.""" | ||
intensity_image = imread(_get_other_channel_file(tif_file, target_channel)) | ||
_filter_objects_by_intensity(labels, intensity_image, min_intensity) | ||
|
||
|
||
def _get_other_channel_file(tif_file: Path, target_channel: str) -> Path: | ||
"""Detect the file of target channel with the same well and field as the given 'tif_file'.""" | ||
pattern = re.compile(r"(.*_[A-Z]\d{2}_T\d{4}F\d{3}L\d{2})(A\d{2})(Z\d{2})(C\d{2})\.tif") | ||
m = pattern.fullmatch(tif_file.name) | ||
assert m is not None | ||
candidate_files = tif_file.parent.glob("*" + target_channel + ".[Tt][Ii][Ff]") | ||
for candidate in candidate_files: | ||
n = pattern.fullmatch(candidate.name) | ||
if (n is not None) and (n.group(4) == target_channel) and (m.group(1) == n.group(1)): | ||
return candidate | ||
raise FileNotFoundError(f"No matching file for channel {target_channel}.") | ||
|
||
|
||
def _filter_objects_by_intensity(labels, img, min_intensity): | ||
"""Filter objects in 'labels' by intensity in 'img'. | ||
Apply changes inplace in 'labels'. | ||
""" | ||
regions = regionprops(labels, img) | ||
for region in regions: | ||
if region.intensity_mean < min_intensity: | ||
labels[labels == region.label] = 0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
# SPDX-FileCopyrightText: 2023 Friedrich Miescher Institute for Biomedical Research (FMI), Basel (Switzerland) | ||
# | ||
# SPDX-License-Identifier: MIT | ||
|
||
"""Process images of a Wako SearchFirst first pass acquisition. | ||
The processing workflow consists of three parts: | ||
* Segment the input image. | ||
* Optionally filter objects. | ||
* Sample the resulting mask to write hit coordinates into csv file. | ||
""" | ||
|
||
import logging | ||
from datetime import datetime | ||
from pathlib import Path | ||
from typing import List, Union | ||
|
||
import confuse | ||
from skimage import img_as_float, img_as_ubyte | ||
from skimage.color import label2rgb | ||
from skimage.exposure import rescale_intensity | ||
from skimage.io import imread, imsave | ||
from tqdm.rich import tqdm | ||
|
||
from faim_wako_searchfirst import filter, sample, segment | ||
|
||
|
||
def run( | ||
folder: Union[str, Path], | ||
configfile: Union[str, Path] | ||
): | ||
"""Analyse first pass of a Wako SearchFirst experiment.""" | ||
# Check if folder_path is valid | ||
folder_path = Path(folder) | ||
if not folder_path.is_dir(): | ||
raise ValueError(f"Invalid input folder: {folder}") | ||
|
||
# Setup logging | ||
logging.basicConfig( | ||
filename=folder_path / (__name__ + ".log"), | ||
format='%(asctime)s - %(name)s - [%(levelname)s] %(message)s', | ||
level=logging.INFO, | ||
# encoding="utf-8", | ||
) | ||
logger = logging.getLogger(__name__) | ||
|
||
# Read config | ||
config_path = Path(configfile) | ||
# source = confuse.YamlSource(config_path, base_for_paths=True) | ||
# config = confuse.RootView(sources=[source]) | ||
config = confuse.Configuration("faim-wako-searchfirst", read=False) | ||
config.set_file(config_path, base_for_paths=True) | ||
|
||
# Copy config file to destination | ||
config_filename = datetime.now().strftime("%Y%m%d_%H%M_") + __name__.replace(".", "_") + "_config.yml" | ||
config_copy = folder_path / config_filename | ||
config_copy.write_text(config.dump()) | ||
|
||
# Select files | ||
tif_files = _select_files( | ||
folder=folder_path, | ||
**(config["file_selection"].get()), | ||
) | ||
|
||
logger.info(f"Found {len(tif_files)} matching files.") | ||
|
||
# Setup | ||
# Segment | ||
segment_method = config["process"]["segment"].get(str) | ||
segment_config = config[segment_method].get() | ||
logger.info(f"Segment using '{segment_method}'.") | ||
segment_fn = getattr(segment, segment_method) | ||
|
||
# Filter | ||
filter_methods = config["process"]["filter"].as_str_seq() | ||
filter_funcs = {f: getattr(filter, f) for f in filter_methods} | ||
|
||
# Sample | ||
sample_method = config["process"]["sample"].get(str) | ||
sample_config = config[sample_method].get(confuse.Optional(dict, default={})) | ||
sample_fn = getattr(sample, sample_method) | ||
|
||
# Process | ||
for tif_file in tqdm(tif_files): | ||
# Read image | ||
img = imread(tif_file) | ||
|
||
# Segment | ||
labels = segment_fn( | ||
img, | ||
**segment_config, | ||
logger=logger, | ||
) | ||
|
||
# Filter | ||
for name, func in filter_funcs.items(): | ||
conf = config[name].get(confuse.Optional(dict, default={})) | ||
func( | ||
tif_file, | ||
labels, | ||
**conf, | ||
) | ||
|
||
|
||
# Sample | ||
# mask -> csv | ||
csv_path = tif_file.parent / (tif_file.stem + ".csv") | ||
sample_fn( | ||
labels, | ||
csv_path, | ||
**sample_config, | ||
) | ||
|
||
# mask + image -> preview | ||
_save_segmentation_image(tif_file.parent, tif_file.name, img, labels) | ||
|
||
def _select_files( | ||
folder: Path, | ||
channel: str = "C01", | ||
) -> List[Path]: | ||
"""Filter all TIFs in folder starting with folder name - and containing channel ID.""" | ||
return sorted(folder.rglob(folder.name + "*" + channel + ".tif")) | ||
|
||
|
||
def _save_segmentation_image(folder_path, filename, img, labels): | ||
"""Save segmentation overlay as RGB image into separate folder.""" | ||
destination_folder = folder_path.parent / (folder_path.name + "_segmentation") | ||
destination_folder.mkdir(exist_ok=True) | ||
rescaled = rescale_intensity(img_as_float(img)) | ||
preview = label2rgb(labels, image=rescaled) | ||
imsave(destination_folder / (Path(filename).stem + '.png'), img_as_ubyte(preview)) | ||
|
||
|
||
# def process( | ||
# folder: Path, | ||
# file_selection_params: dict, | ||
# segmentation_params: dict, | ||
# bounding_box_params: dict, | ||
# additional_analysis_params: dict, | ||
# output_params: dict, | ||
# grid_sampling_params: dict, | ||
# logger=logging, | ||
# ) -> None: | ||
# """Segment images with the provided segmentation parameters.""" | ||
# logger.info("File selection parameters: " + json.dumps(file_selection_params, indent=4)) | ||
# logger.info("Segmentation parameters: " + json.dumps(segmentation_params, indent=4)) | ||
# logger.info("Additional analysis parameters: " + json.dumps(additional_analysis_params, indent=4)) | ||
# logger.info("Output parameters: " + json.dumps(output_params, indent=4)) | ||
# logger.info("Grid sampling parameters: " + json.dumps(grid_sampling_params, indent=4)) | ||
|
||
# logger.info(f"Finished processing {len(tif_files)} image(s).") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
# SPDX-FileCopyrightText: 2023 Friedrich Miescher Institute for Biomedical Research (FMI), Basel (Switzerland) | ||
# | ||
# SPDX-License-Identifier: MIT | ||
|
||
"""Collection of methods to sample a label image and write coordinates into a csv file. | ||
Each method must accept a label image and an output file path as first two arguments. | ||
""" | ||
|
||
import csv | ||
from pathlib import Path | ||
|
||
import numpy as np | ||
from numpy import ndarray | ||
from skimage.measure import regionprops | ||
|
||
|
||
def dense_grid( | ||
labels: ndarray, | ||
output_path: Path, | ||
factor: int = 50, | ||
): | ||
"""Save densely sampled grid positions for object hits.""" | ||
|
||
|
||
def grid_overlap( | ||
labeled_img: ndarray, | ||
path, | ||
mag_first_pass, | ||
mag_second_pass, | ||
overlap_ratio: float = 0.0, | ||
): | ||
"""Save grid positions of the tiles that contain objects.""" | ||
factor = mag_first_pass / mag_second_pass | ||
shift_percent = 1.0 - overlap_ratio | ||
tile_size_y = labeled_img.shape[0] * factor * shift_percent | ||
tile_size_x = labeled_img.shape[1] * factor * shift_percent | ||
|
||
with open(path, "w", newline="") as csv_file: | ||
c = csv.writer(csv_file) | ||
count = 0 | ||
for y in np.arange(0, labeled_img.shape[0], tile_size_y): # TODO: use np.linspace | ||
for x in np.arange(0, labeled_img.shape[1], tile_size_x): | ||
if np.max( | ||
labeled_img[ | ||
int(np.floor(y)):int(np.ceil(y + tile_size_y)), | ||
int(np.floor(x)):int(np.ceil(x + tile_size_x)) | ||
] | ||
) > 0: | ||
c.writerow([count, x + tile_size_x / 2, y + tile_size_y / 2]) | ||
count += 1 | ||
|
||
|
||
def centers(labeled_img, path): | ||
"""Save center position of each object in 'labeled_img'.""" | ||
regions = regionprops(labeled_img) | ||
with open(path, "w", newline="") as csv_file: | ||
c = csv.writer(csv_file) | ||
for region in regions: | ||
c.writerow([region.label, *reversed(region.centroid)]) |
Oops, something went wrong.