diff --git a/src/spikeinterface/core/sortinganalyzer.py b/src/spikeinterface/core/sortinganalyzer.py index fdace37dd0..14b4f73eaf 100644 --- a/src/spikeinterface/core/sortinganalyzer.py +++ b/src/spikeinterface/core/sortinganalyzer.py @@ -46,6 +46,7 @@ def create_sorting_analyzer( sparsity=None, return_scaled=True, overwrite=False, + backend_options=None, **sparsity_kwargs, ) -> "SortingAnalyzer": """ @@ -64,24 +65,29 @@ def create_sorting_analyzer( recording : Recording The recording object folder : str or Path or None, default: None - The folder where waveforms are cached + The folder where the analyzer is cached format : "memory | "binary_folder" | "zarr", default: "memory" - The mode to store waveforms. If "folder", waveforms are stored on disk in the specified folder. + The mode to store the analyzer. If "folder", the analyzer is stored on disk in the specified folder. The "folder" argument must be specified in case of mode "folder". - If "memory" is used, the waveforms are stored in RAM. Use this option carefully! + If "memory" is used, the analyzer is stored in RAM. Use this option carefully! sparse : bool, default: True If True, then a sparsity mask is computed using the `estimate_sparsity()` function using a few spikes to get an estimate of dense templates to create a ChannelSparsity object. Then, the sparsity will be propagated to all ResultExtention that handle sparsity (like wavforms, pca, ...) You can control `estimate_sparsity()` : all extra arguments are propagated to it (included job_kwargs) sparsity : ChannelSparsity or None, default: None - The sparsity used to compute waveforms. If this is given, `sparse` is ignored. + The sparsity used to compute extensions. If this is given, `sparse` is ignored. return_scaled : bool, default: True All extensions that play with traces will use this global return_scaled : "waveforms", "noise_levels", "templates". This prevent return_scaled being differents from different extensions and having wrong snr for instance. 
overwrite: bool, default: False If True, overwrite the folder if it already exists. - + backend_options : dict | None, default: None + Keyword arguments for the backend specified by format. It can contain the following keys: + - storage_options: dict | None (fsspec storage options) + - saving_options: dict | None (additional saving options for creating and saving datasets, + e.g. compression/filters for zarr) + sparsity_kwargs : keyword arguments Returns ------- @@ -92,7 +98,7 @@ create_sorting_analyzer( -------- >>> import spikeinterface as si - >>> # Extract dense waveforms and save to disk with binary_folder format. + >>> # Create dense analyzer and save to disk with binary_folder format. >>> sorting_analyzer = si.create_sorting_analyzer(sorting, recording, format="binary_folder", folder="/path/to_my/result") >>> # Can be reload @@ -118,12 +124,14 @@ """ if format != "memory": if format == "zarr": - folder = clean_zarr_folder_name(folder) - if Path(folder).is_dir(): - if not overwrite: - raise ValueError(f"Folder already exists {folder}! Use overwrite=True to overwrite it.") - else: - shutil.rmtree(folder) + if not is_path_remote(folder): + folder = clean_zarr_folder_name(folder) + if not is_path_remote(folder): + if Path(folder).is_dir(): + if not overwrite: + raise ValueError(f"Folder already exists {folder}! 
Use overwrite=True to overwrite it.") + else: + shutil.rmtree(folder) # handle sparsity if sparsity is not None: @@ -145,27 +153,38 @@ create_sorting_analyzer( return_scaled = False sorting_analyzer = SortingAnalyzer.create( - sorting, recording, format=format, folder=folder, sparsity=sparsity, return_scaled=return_scaled + sorting, + recording, + format=format, + folder=folder, + sparsity=sparsity, + return_scaled=return_scaled, + backend_options=backend_options, ) return sorting_analyzer -def load_sorting_analyzer(folder, load_extensions=True, format="auto", storage_options=None) -> "SortingAnalyzer": +def load_sorting_analyzer(folder, load_extensions=True, format="auto", backend_options=None) -> "SortingAnalyzer": """ Load a SortingAnalyzer object from disk. Parameters ---------- folder : str or Path - The folder / zarr folder where the waveform extractor is stored + The folder / zarr folder where the analyzer is stored. If the folder is a remote path stored in the cloud, + the backend_options can be used to specify credentials. If the remote path is not accessible, + and backend_options is not provided, the function will try to load the object in anonymous mode (anon=True), + which enables loading data from open buckets. load_extensions : bool, default: True Load all extensions or not. format : "auto" | "binary_folder" | "zarr" The format of the folder. - storage_options : dict | None, default: None - The storage options to specify credentials to remote zarr bucket. - For open buckets, it doesn't need to be specified. + backend_options : dict | None, default: None + The backend options for the storage backend. 
+ The dictionary can contain the following keys: + - storage_options: dict | None (fsspec storage options) + - saving_options: dict | None (additional saving options for creating and saving datasets) Returns ------- @@ -173,7 +192,20 @@ def load_sorting_analyzer(folder, load_extensions=True, format="auto", storage_o The loaded SortingAnalyzer """ - return SortingAnalyzer.load(folder, load_extensions=load_extensions, format=format, storage_options=storage_options) + if is_path_remote(folder) and backend_options is None: + try: + return SortingAnalyzer.load( + folder, load_extensions=load_extensions, format=format, backend_options=backend_options + ) + except Exception as e: + backend_options = dict(storage_options=dict(anon=True)) + return SortingAnalyzer.load( + folder, load_extensions=load_extensions, format=format, backend_options=backend_options + ) + else: + return SortingAnalyzer.load( + folder, load_extensions=load_extensions, format=format, backend_options=backend_options + ) class SortingAnalyzer: @@ -206,7 +238,7 @@ def __init__( format=None, sparsity=None, return_scaled=True, - storage_options=None, + backend_options=None, ): # very fast init because checks are done in load and create self.sorting = sorting @@ -216,10 +248,18 @@ def __init__( self.format = format self.sparsity = sparsity self.return_scaled = return_scaled - self.storage_options = storage_options + # this is used to store temporary recording self._temporary_recording = None + # backend-specific kwargs for different formats, which can be used to + # set some parameters for saving (e.g., compression) + # + # - storage_options: dict | None (fsspec storage options) + # - saving_options: dict | None + # (additional saving options for creating and saving datasets, e.g. 
compression/filters for zarr) + self._backend_options = {} if backend_options is None else backend_options + # extensions are not loaded at init self.extensions = dict() @@ -229,13 +269,18 @@ def __repr__(self) -> str: nchan = self.get_num_channels() nunits = self.get_num_units() txt = f"{clsname}: {nchan} channels - {nunits} units - {nseg} segments - {self.format}" + if self.format != "memory": + if is_path_remote(str(self.folder)): + txt += f" (remote)" if self.is_sparse(): txt += " - sparse" if self.has_recording(): txt += " - has recording" if self.has_temporary_recording(): txt += " - has temporary recording" - ext_txt = f"Loaded {len(self.extensions)} extensions: " + ", ".join(self.extensions.keys()) + ext_txt = f"Loaded {len(self.extensions)} extensions" + if len(self.extensions) > 0: + ext_txt += f": {', '.join(self.extensions.keys())}" txt += "\n" + ext_txt return txt @@ -254,6 +299,7 @@ def create( folder=None, sparsity=None, return_scaled=True, + backend_options=None, ): assert recording is not None, "To create a SortingAnalyzer you need to specify the recording" # some checks @@ -279,22 +325,35 @@ def create( if format == "memory": sorting_analyzer = cls.create_memory(sorting, recording, sparsity, return_scaled, rec_attributes=None) elif format == "binary_folder": - cls.create_binary_folder(folder, sorting, recording, sparsity, return_scaled, rec_attributes=None) - sorting_analyzer = cls.load_from_binary_folder(folder, recording=recording) - sorting_analyzer.folder = Path(folder) + sorting_analyzer = cls.create_binary_folder( + folder, + sorting, + recording, + sparsity, + return_scaled, + rec_attributes=None, + backend_options=backend_options, + ) elif format == "zarr": assert folder is not None, "For format='zarr' folder must be provided" - folder = clean_zarr_folder_name(folder) - cls.create_zarr(folder, sorting, recording, sparsity, return_scaled, rec_attributes=None) - sorting_analyzer = cls.load_from_zarr(folder, recording=recording) - 
sorting_analyzer.folder = Path(folder) + if not is_path_remote(folder): + folder = clean_zarr_folder_name(folder) + sorting_analyzer = cls.create_zarr( + folder, + sorting, + recording, + sparsity, + return_scaled, + rec_attributes=None, + backend_options=backend_options, + ) else: raise ValueError("SortingAnalyzer.create: wrong format") return sorting_analyzer @classmethod - def load(cls, folder, recording=None, load_extensions=True, format="auto", storage_options=None): + def load(cls, folder, recording=None, load_extensions=True, format="auto", backend_options=None): """ Load folder or zarr. The recording can be given if the recording location has changed. @@ -308,18 +367,15 @@ def load(cls, folder, recording=None, load_extensions=True, format="auto", stora format = "binary_folder" if format == "binary_folder": - sorting_analyzer = SortingAnalyzer.load_from_binary_folder(folder, recording=recording) + sorting_analyzer = SortingAnalyzer.load_from_binary_folder( + folder, recording=recording, backend_options=backend_options + ) elif format == "zarr": sorting_analyzer = SortingAnalyzer.load_from_zarr( - folder, recording=recording, storage_options=storage_options + folder, recording=recording, backend_options=backend_options ) - if is_path_remote(str(folder)): - sorting_analyzer.folder = folder - # in this case we only load extensions when needed - else: - sorting_analyzer.folder = Path(folder) - + if not is_path_remote(str(folder)): if load_extensions: sorting_analyzer.load_all_saved_extension() @@ -351,7 +407,7 @@ def create_memory(cls, sorting, recording, sparsity, return_scaled, rec_attribut return sorting_analyzer @classmethod - def create_binary_folder(cls, folder, sorting, recording, sparsity, return_scaled, rec_attributes): + def create_binary_folder(cls, folder, sorting, recording, sparsity, return_scaled, rec_attributes, backend_options): # used by create and save_as folder = Path(folder) @@ -419,8 +475,10 @@ def create_binary_folder(cls, folder, sorting, 
recording, sparsity, return_scale with open(settings_file, mode="w") as f: json.dump(check_json(settings), f, indent=4) + return cls.load_from_binary_folder(folder, recording=recording, backend_options=backend_options) + @classmethod - def load_from_binary_folder(cls, folder, recording=None): + def load_from_binary_folder(cls, folder, recording=None, backend_options=None): folder = Path(folder) assert folder.is_dir(), f"This folder does not exists {folder}" @@ -491,34 +549,50 @@ def load_from_binary_folder(cls, folder, recording=None): format="binary_folder", sparsity=sparsity, return_scaled=return_scaled, + backend_options=backend_options, ) + sorting_analyzer.folder = folder return sorting_analyzer def _get_zarr_root(self, mode="r+"): import zarr - if is_path_remote(str(self.folder)): - mode = "r" + assert mode in ("r+", "a", "r"), "mode must be 'r+', 'a' or 'r'" + + storage_options = self._backend_options.get("storage_options", {}) # we open_consolidated only if we are in read mode if mode in ("r+", "a"): - zarr_root = zarr.open(str(self.folder), mode=mode, storage_options=self.storage_options) + try: + zarr_root = zarr.open(str(self.folder), mode=mode, storage_options=storage_options) + except Exception as e: + # this could happen in remote mode, and it's a way to check if the folder is still there + zarr_root = zarr.open_consolidated(self.folder, mode=mode, storage_options=storage_options) else: - zarr_root = zarr.open_consolidated(self.folder, mode=mode, storage_options=self.storage_options) + zarr_root = zarr.open_consolidated(self.folder, mode=mode, storage_options=storage_options) return zarr_root @classmethod - def create_zarr(cls, folder, sorting, recording, sparsity, return_scaled, rec_attributes): + def create_zarr(cls, folder, sorting, recording, sparsity, return_scaled, rec_attributes, backend_options): # used by create and save_as import zarr import numcodecs + from .zarrextractors import add_sorting_to_zarr_group - folder = 
clean_zarr_folder_name(folder) + if is_path_remote(folder): + remote = True + else: + remote = False + if not remote: + folder = clean_zarr_folder_name(folder) + if folder.is_dir(): + raise ValueError(f"Folder already exists {folder}") - if folder.is_dir(): - raise ValueError(f"Folder already exists {folder}") + backend_options = {} if backend_options is None else backend_options + storage_options = backend_options.get("storage_options", {}) + saving_options = backend_options.get("saving_options", {}) - zarr_root = zarr.open(folder, mode="w") + zarr_root = zarr.open(folder, mode="w", storage_options=storage_options) info = dict(version=spikeinterface.__version__, dev_mode=spikeinterface.DEV_MODE, object="SortingAnalyzer") zarr_root.attrs["spikeinterface_info"] = check_json(info) @@ -527,8 +601,9 @@ def create_zarr(cls, folder, sorting, recording, sparsity, return_scaled, rec_at zarr_root.attrs["settings"] = check_json(settings) # the recording + relative_to = folder if not remote else None if recording is not None: - rec_dict = recording.to_dict(relative_to=folder, recursive=True) + rec_dict = recording.to_dict(relative_to=relative_to, recursive=True) if recording.check_serializability("json"): # zarr_root.create_dataset("recording", data=rec_dict, object_codec=numcodecs.JSON()) zarr_rec = np.array([check_json(rec_dict)], dtype=object) @@ -544,7 +619,7 @@ def create_zarr(cls, folder, sorting, recording, sparsity, return_scaled, rec_at warnings.warn("Recording not provided, instntiating SortingAnalyzer in recordingless mode.") # sorting provenance - sort_dict = sorting.to_dict(relative_to=folder, recursive=True) + sort_dict = sorting.to_dict(relative_to=relative_to, recursive=True) if sorting.check_serializability("json"): zarr_sort = np.array([check_json(sort_dict)], dtype=object) zarr_root.create_dataset("sorting_provenance", data=zarr_sort, object_codec=numcodecs.JSON()) @@ -571,24 +646,23 @@ def create_zarr(cls, folder, sorting, recording, sparsity, 
return_scaled, rec_at recording_info.attrs["probegroup"] = check_json(probegroup.to_dict()) if sparsity is not None: - zarr_root.create_dataset("sparsity_mask", data=sparsity.mask) - - # write sorting copy - from .zarrextractors import add_sorting_to_zarr_group + zarr_root.create_dataset("sparsity_mask", data=sparsity.mask, **saving_options) - # Alessio : we need to find a way to propagate compressor for all steps. - # kwargs = dict(compressor=...) - zarr_kwargs = dict() - add_sorting_to_zarr_group(sorting, zarr_root.create_group("sorting"), **zarr_kwargs) + add_sorting_to_zarr_group(sorting, zarr_root.create_group("sorting"), **saving_options) recording_info = zarr_root.create_group("extensions") zarr.consolidate_metadata(zarr_root.store) + return cls.load_from_zarr(folder, recording=recording, backend_options=backend_options) + @classmethod - def load_from_zarr(cls, folder, recording=None, storage_options=None): + def load_from_zarr(cls, folder, recording=None, backend_options=None): import zarr + backend_options = {} if backend_options is None else backend_options + storage_options = backend_options.get("storage_options", {}) + zarr_root = zarr.open_consolidated(str(folder), mode="r", storage_options=storage_options) si_info = zarr_root.attrs["spikeinterface_info"] @@ -651,8 +725,9 @@ def load_from_zarr(cls, folder, recording=None, storage_options=None): format="zarr", sparsity=sparsity, return_scaled=return_scaled, - storage_options=storage_options, + backend_options=backend_options, ) + sorting_analyzer.folder = folder return sorting_analyzer @@ -694,6 +769,7 @@ def _save_or_select_or_merge( sparsity_overlap=0.75, verbose=False, new_unit_ids=None, + backend_options=None, **job_kwargs, ) -> "SortingAnalyzer": """ @@ -723,8 +799,13 @@ def _save_or_select_or_merge( The new unit ids for merged units. Required if `merge_unit_groups` is not None. verbose : bool, default: False If True, output is verbose. - job_kwargs : dict - Keyword arguments for parallelization. 
+ backend_options : dict | None, default: None + Keyword arguments for the backend specified by format. It can contain the: + - storage_options: dict | None (fsspec storage options) + - saving_options: dict | None (additional saving options for creating and saving datasets, + e.g. compression/filters for zarr) + job_kwargs : keyword arguments + Keyword arguments for the job parallelization. Returns ------- @@ -798,6 +879,8 @@ def _save_or_select_or_merge( # TODO: sam/pierre would create a curation field / curation.json with the applied merges. # What do you think? + backend_options = {} if backend_options is None else backend_options + if format == "memory": # This make a copy of actual SortingAnalyzer new_sorting_analyzer = SortingAnalyzer.create_memory( @@ -808,20 +891,28 @@ def _save_or_select_or_merge( # create a new folder assert folder is not None, "For format='binary_folder' folder must be provided" folder = Path(folder) - SortingAnalyzer.create_binary_folder( - folder, sorting_provenance, recording, sparsity, self.return_scaled, self.rec_attributes + new_sorting_analyzer = SortingAnalyzer.create_binary_folder( + folder, + sorting_provenance, + recording, + sparsity, + self.return_scaled, + self.rec_attributes, + backend_options=backend_options, ) - new_sorting_analyzer = SortingAnalyzer.load_from_binary_folder(folder, recording=recording) - new_sorting_analyzer.folder = folder elif format == "zarr": assert folder is not None, "For format='zarr' folder must be provided" folder = clean_zarr_folder_name(folder) - SortingAnalyzer.create_zarr( - folder, sorting_provenance, recording, sparsity, self.return_scaled, self.rec_attributes + new_sorting_analyzer = SortingAnalyzer.create_zarr( + folder, + sorting_provenance, + recording, + sparsity, + self.return_scaled, + self.rec_attributes, + backend_options=backend_options, ) - new_sorting_analyzer = SortingAnalyzer.load_from_zarr(folder, recording=recording) - new_sorting_analyzer.folder = folder else: raise 
ValueError(f"SortingAnalyzer.save: unsupported format: {format}") @@ -859,7 +950,7 @@ def _save_or_select_or_merge( return new_sorting_analyzer - def save_as(self, format="memory", folder=None) -> "SortingAnalyzer": + def save_as(self, format="memory", folder=None, backend_options=None) -> "SortingAnalyzer": """ Save SortingAnalyzer object into another format. Uselful for memory to zarr or memory to binary. @@ -874,10 +965,15 @@ def save_as(self, format="memory", folder=None) -> "SortingAnalyzer": The output folder if `format` is "zarr" or "binary_folder" format : "memory" | "binary_folder" | "zarr", default: "memory" The new backend format to use + backend_options : dict | None, default: None + Keyword arguments for the backend specified by format. It can contain the: + - storage_options: dict | None (fsspec storage options) + - saving_options: dict | None (additional saving options for creating and saving datasets, + e.g. compression/filters for zarr) """ if format == "zarr": folder = clean_zarr_folder_name(folder) - return self._save_or_select_or_merge(format=format, folder=folder) + return self._save_or_select_or_merge(format=format, folder=folder, backend_options=backend_options) def select_units(self, unit_ids, format="memory", folder=None) -> "SortingAnalyzer": """ @@ -1040,7 +1136,15 @@ def copy(self): def is_read_only(self) -> bool: if self.format == "memory": return False - return not os.access(self.folder, os.W_OK) + elif self.format == "binary_folder": + return not os.access(self.folder, os.W_OK) + else: + if not is_path_remote(str(self.folder)): + return not os.access(self.folder, os.W_OK) + else: + # in this case we don't know if the file is read only so an error + # will be raised if we try to save/append + return False ## map attribute and property zone @@ -2064,24 +2168,24 @@ def run(self, save=True, **kwargs): if save and not self.sorting_analyzer.is_read_only(): self._save_run_info() - self._save_data(**kwargs) + self._save_data() if self.format 
== "zarr": import zarr zarr.consolidate_metadata(self.sorting_analyzer._get_zarr_root().store) - def save(self, **kwargs): + def save(self): self._save_params() self._save_importing_provenance() self._save_run_info() - self._save_data(**kwargs) + self._save_data() if self.format == "zarr": import zarr zarr.consolidate_metadata(self.sorting_analyzer._get_zarr_root().store) - def _save_data(self, **kwargs): + def _save_data(self): if self.format == "memory": return @@ -2120,14 +2224,14 @@ def _save_data(self, **kwargs): except: raise Exception(f"Could not save {ext_data_name} as extension data") elif self.format == "zarr": - import zarr import numcodecs + saving_options = self.sorting_analyzer._backend_options.get("saving_options", {}) extension_group = self._get_zarr_extension_group(mode="r+") - compressor = kwargs.get("compressor", None) - if compressor is None: - compressor = get_default_zarr_compressor() + # if compression is not externally given, we use the default + if "compressor" not in saving_options: + saving_options["compressor"] = get_default_zarr_compressor() for ext_data_name, ext_data in self.data.items(): if ext_data_name in extension_group: @@ -2137,7 +2241,7 @@ def _save_data(self, **kwargs): name=ext_data_name, data=np.array([ext_data], dtype=object), object_codec=numcodecs.JSON() ) elif isinstance(ext_data, np.ndarray): - extension_group.create_dataset(name=ext_data_name, data=ext_data, compressor=compressor) + extension_group.create_dataset(name=ext_data_name, data=ext_data, **saving_options) elif HAS_PANDAS and isinstance(ext_data, pd.DataFrame): df_group = extension_group.create_group(ext_data_name) # first we save the index @@ -2200,7 +2304,7 @@ def delete(self): def reset(self): """ - Reset the waveform extension. + Reset the extension. Delete the sub folder and create a new empty one. 
""" self._reset_extension_folder() @@ -2215,7 +2319,8 @@ def set_params(self, save=True, **params): """ # this ensure data is also deleted and corresponds to params # this also ensure the group is created - self._reset_extension_folder() + if save: + self._reset_extension_folder() params = self._set_params(**params) self.params = params diff --git a/src/spikeinterface/core/tests/test_sortinganalyzer.py b/src/spikeinterface/core/tests/test_sortinganalyzer.py index 5c7e267cc6..35ab18b5f2 100644 --- a/src/spikeinterface/core/tests/test_sortinganalyzer.py +++ b/src/spikeinterface/core/tests/test_sortinganalyzer.py @@ -10,6 +10,7 @@ load_sorting_analyzer, get_available_analyzer_extensions, get_default_analyzer_extension_params, + get_default_zarr_compressor, ) from spikeinterface.core.sortinganalyzer import ( register_result_extension, @@ -99,16 +100,25 @@ def test_SortingAnalyzer_zarr(tmp_path, dataset): recording, sorting = dataset folder = tmp_path / "test_SortingAnalyzer_zarr.zarr" - if folder.exists(): - shutil.rmtree(folder) + default_compressor = get_default_zarr_compressor() sorting_analyzer = create_sorting_analyzer( - sorting, recording, format="zarr", folder=folder, sparse=False, sparsity=None + sorting, recording, format="zarr", folder=folder, sparse=False, sparsity=None, overwrite=True ) sorting_analyzer.compute(["random_spikes", "templates"]) sorting_analyzer = load_sorting_analyzer(folder, format="auto") _check_sorting_analyzers(sorting_analyzer, sorting, cache_folder=tmp_path) + # check that compression is applied + assert ( + sorting_analyzer._get_zarr_root()["extensions"]["random_spikes"]["random_spikes_indices"].compressor.codec_id + == default_compressor.codec_id + ) + assert ( + sorting_analyzer._get_zarr_root()["extensions"]["templates"]["average"].compressor.codec_id + == default_compressor.codec_id + ) + # test select_units see https://github.com/SpikeInterface/spikeinterface/issues/3041 # this bug requires that we have an info.json file so we 
calculate templates above select_units_sorting_analyer = sorting_analyzer.select_units(unit_ids=[1]) @@ -117,11 +127,45 @@ def test_SortingAnalyzer_zarr(tmp_path, dataset): assert len(remove_units_sorting_analyer.unit_ids) == len(sorting_analyzer.unit_ids) - 1 assert 1 not in remove_units_sorting_analyer.unit_ids - folder = tmp_path / "test_SortingAnalyzer_zarr.zarr" - if folder.exists(): - shutil.rmtree(folder) - sorting_analyzer = create_sorting_analyzer( - sorting, recording, format="zarr", folder=folder, sparse=False, sparsity=None, return_scaled=False + # test no compression + sorting_analyzer_no_compression = create_sorting_analyzer( + sorting, + recording, + format="zarr", + folder=folder, + sparse=False, + sparsity=None, + return_scaled=False, + overwrite=True, + backend_options={"saving_options": {"compressor": None}}, + ) + print(sorting_analyzer_no_compression._backend_options) + sorting_analyzer_no_compression.compute(["random_spikes", "templates"]) + assert ( + sorting_analyzer_no_compression._get_zarr_root()["extensions"]["random_spikes"][ + "random_spikes_indices" + ].compressor + is None + ) + assert sorting_analyzer_no_compression._get_zarr_root()["extensions"]["templates"]["average"].compressor is None + + # test a different compressor + from numcodecs import LZMA + + lzma_compressor = LZMA() + folder = tmp_path / "test_SortingAnalyzer_zarr_lzma.zarr" + sorting_analyzer_lzma = sorting_analyzer_no_compression.save_as( + format="zarr", folder=folder, backend_options={"saving_options": {"compressor": lzma_compressor}} + ) + assert ( + sorting_analyzer_lzma._get_zarr_root()["extensions"]["random_spikes"][ + "random_spikes_indices" + ].compressor.codec_id + == LZMA.codec_id + ) + assert ( + sorting_analyzer_lzma._get_zarr_root()["extensions"]["templates"]["average"].compressor.codec_id + == LZMA.codec_id ) @@ -326,7 +370,7 @@ def _check_sorting_analyzers(sorting_analyzer, original_sorting, cache_folder): else: folder = None sorting_analyzer5 = 
sorting_analyzer.merge_units( - merge_unit_groups=[[0, 1]], new_unit_ids=[50], format=format, folder=folder, mode="hard" + merge_unit_groups=[[0, 1]], new_unit_ids=[50], format=format, folder=folder, merging_mode="hard" ) # test compute with extension-specific params diff --git a/src/spikeinterface/core/zarrextractors.py b/src/spikeinterface/core/zarrextractors.py index 17f1ac08b3..ff552dfb54 100644 --- a/src/spikeinterface/core/zarrextractors.py +++ b/src/spikeinterface/core/zarrextractors.py @@ -12,6 +12,19 @@ from .core_tools import define_function_from_class, check_json from .job_tools import split_job_kwargs from .recording_tools import determine_cast_unsigned +from .core_tools import is_path_remote + + +def anononymous_zarr_open(folder_path: str | Path, mode: str = "r", storage_options: dict | None = None): + if is_path_remote(str(folder_path)) and storage_options is None: + try: + root = zarr.open(str(folder_path), mode="r", storage_options=storage_options) + except Exception as e: + storage_options = {"anon": True} + root = zarr.open(str(folder_path), mode="r", storage_options=storage_options) + else: + root = zarr.open(str(folder_path), mode="r", storage_options=storage_options) + return root class ZarrRecordingExtractor(BaseRecording): @@ -21,7 +34,11 @@ class ZarrRecordingExtractor(BaseRecording): Parameters ---------- folder_path : str or Path - Path to the zarr root folder + Path to the zarr root folder. This can be a local path or a remote path (s3:// or gcs://). + If the path is a remote path, the storage_options can be provided to specify credentials. + If the remote path is not accessible and backend_options is not provided, + the function will try to load the object in anonymous mode (anon=True), + which enables to load data from open buckets. storage_options : dict or None Storage options for zarr `store`. E.g., if "s3://" or "gcs://" they can provide authentication methods, etc. 
@@ -35,7 +52,7 @@ def __init__(self, folder_path: Path | str, storage_options: dict | None = None) folder_path, folder_path_kwarg = resolve_zarr_path(folder_path) - self._root = zarr.open(str(folder_path), mode="r", storage_options=storage_options) + self._root = anononymous_zarr_open(folder_path, mode="r", storage_options=storage_options) sampling_frequency = self._root.attrs.get("sampling_frequency", None) num_segments = self._root.attrs.get("num_segments", None) @@ -81,7 +98,10 @@ def __init__(self, folder_path: Path | str, storage_options: dict | None = None) nbytes_segment = self._root[trace_name].nbytes nbytes_stored_segment = self._root[trace_name].nbytes_stored - cr_by_segment[segment_index] = nbytes_segment / nbytes_stored_segment + if nbytes_stored_segment > 0: + cr_by_segment[segment_index] = nbytes_segment / nbytes_stored_segment + else: + cr_by_segment[segment_index] = np.nan total_nbytes += nbytes_segment total_nbytes_stored += nbytes_stored_segment @@ -105,7 +125,10 @@ def __init__(self, folder_path: Path | str, storage_options: dict | None = None) if annotations is not None: self.annotate(**annotations) # annotate compression ratios - cr = total_nbytes / total_nbytes_stored + if total_nbytes_stored > 0: + cr = total_nbytes / total_nbytes_stored + else: + cr = np.nan self.annotate(compression_ratio=cr, compression_ratio_segments=cr_by_segment) self._kwargs = {"folder_path": folder_path_kwarg, "storage_options": storage_options} @@ -150,7 +173,11 @@ class ZarrSortingExtractor(BaseSorting): Parameters ---------- folder_path : str or Path - Path to the zarr root file + Path to the zarr root file. This can be a local path or a remote path (s3:// or gcs://). + If the path is a remote path, the storage_options can be provided to specify credentials. + If the remote path is not accessible and backend_options is not provided, + the function will try to load the object in anonymous mode (anon=True), + which enables to load data from open buckets. 
storage_options : dict or None Storage options for zarr `store`. E.g., if "s3://" or "gcs://" they can provide authentication methods, etc. zarr_group : str or None, default: None @@ -165,7 +192,8 @@ def __init__(self, folder_path: Path | str, storage_options: dict | None = None, folder_path, folder_path_kwarg = resolve_zarr_path(folder_path) - zarr_root = self._root = zarr.open(str(folder_path), mode="r", storage_options=storage_options) + zarr_root = anononymous_zarr_open(folder_path, mode="r", storage_options=storage_options) + if zarr_group is None: self._root = zarr_root else: @@ -243,7 +271,7 @@ def read_zarr( """ # TODO @alessio : we should have something more explicit in our zarr format to tell which object it is. # for the futur SortingAnalyzer we will have this 2 fields!!! - root = zarr.open(str(folder_path), mode="r", storage_options=storage_options) + root = anononymous_zarr_open(folder_path, mode="r", storage_options=storage_options) if "channel_ids" in root.keys(): return read_zarr_recording(folder_path, storage_options=storage_options) elif "unit_ids" in root.keys(): @@ -329,8 +357,7 @@ def add_sorting_to_zarr_group(sorting: BaseSorting, zarr_group: zarr.hierarchy.G zarr_group.attrs["num_segments"] = int(num_segments) zarr_group.create_dataset(name="unit_ids", data=sorting.unit_ids, compressor=None) - if "compressor" not in kwargs: - compressor = get_default_zarr_compressor() + compressor = kwargs.get("compressor", get_default_zarr_compressor()) # save sub fields spikes_group = zarr_group.create_group(name="spikes")