diff --git a/iohub/cli/cli.py b/iohub/cli/cli.py index 901b92a9..ed70585e 100644 --- a/iohub/cli/cli.py +++ b/iohub/cli/cli.py @@ -1,9 +1,11 @@ +import csv import pathlib import click from iohub._version import __version__ from iohub.convert import TIFFConverter +from iohub.ngff import open_ome_zarr from iohub.reader import print_info VERSION = __version__ @@ -87,3 +89,60 @@ def convert(input, output, grid_layout, chunks): chunks=chunks, ) converter() + + +@click.command() +@click.help_option("-h", "--help") +@click.argument( + "csvfile", + type=click.File("r"), +) +@click.argument( + "zarrfile", + type=click.Path( + exists=True, + file_okay=True, + dir_okay=False, + resolve_path=True, + path_type=pathlib.Path, + ), +) +def rename_wells_cli(csvfile, zarrfile): + """Rename wells based on CSV file + + The CSV file should have two columns: old_well_path and new_well_path. + """ + names = [] + + csvreader = csv.reader(csvfile) + for row in csvreader: + if len(row) != 2: + raise ValueError( + f"Invalid row format: {row}." + f"Each row must have two columns." + ) + names.append([row[0], row[1]]) + + plate = open_ome_zarr(zarrfile, mode="a") + + modified = {} + modified["wells"] = [] + modified["rows"] = [] + modified["columns"] = [] + + well_paths = [ + plate.metadata.wells[i].path for i in range(len(plate.metadata.wells)) + ] + + for old_well_path, new_well_path in names: + if old_well_path not in well_paths: + raise ValueError( + f"Old well path '{old_well_path}' not found " + f"in the plate metadata." + ) + for well in plate.metadata.wells: + if well.path == old_well_path and well not in modified["wells"]: + plate.rename_well( + plate, well, old_well_path, new_well_path, modified, False + ) + modified["wells"].append(well) diff --git a/iohub/ngff.py b/iohub/ngff.py index d5a307e0..698215c5 100644 --- a/iohub/ngff.py +++ b/iohub/ngff.py @@ -1565,6 +1565,50 @@ def positions(self) -> Generator[tuple[str, Position], None, None]: for _, position in well.positions(): yield position.zgroup.path, position + def rename_well( + self, + well, + old_well_path: str, + new_well_path: str, + modified={}, + single_well=True, + ): + """Rename a well. + + Parameters + ---------- + old_well_path : str + Old name of well + new_well_path : str + New name of well + modified : dict, optional + Tracks modified wells + single well: bool, optional + True: renaming one well, False: renaming multiple wells + """ + old_row, old_column = old_well_path.split("/") + new_row, new_column = new_well_path.split("/") + + if well.path == old_well_path: + well.path = new_well_path # update well metadata + zarr.storage.rename( + self.zgroup._store, old_well_path, new_well_path + ) # update well paths + + for column in self.metadata.columns: + if column.name == old_column and column not in modified["columns"]: + column.name = new_column # update column metadata + if not single_well: + modified["columns"].append(column) + + for row in self.metadata.rows: + if row.name == old_row and row not in modified["rows"]: + row.name = new_row # update row metadata + if not single_well: + modified["rows"].append(row) + + self.dump_meta() + def open_ome_zarr( store_path: StrOrBytesPath,