diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 9d7cc3cd..ec4154a8 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -17,6 +17,11 @@ New features and enhancements
 * More calendars are now recognized in ``translate_time_chunk``. (:pull:`450`).
 * `new_dim` in ``unstack_dates`` is now None by default and changes depending on the frequency. It becomes `month` if the data is exactly monthly, and keeps the old default of `season` otherwise. (:pull:`450`).
 * Updated the list of libraries in `show_versions` to reflect our current environment. (:pull:`450`).
+* New ``xscen.catutils.patterns_from_schema`` to generate all possible patterns from a given schema (or one of xscen's defaults), for use with :py:func:`parse_directory`. (:pull:`431`).
+* New ``DataCatalog.copy_files`` to copy all files of a catalog to a new destination, unzipping them if needed and returning a new catalog. (:pull:`431`).
+* Convenience functions ``xs.io.zip_directory`` and ``xs.io.unzip_directory`` (for zarrs). (:pull:`431`).
+* New argument ``rechunk_input`` to ``compute_indicators``, to rechunk the inputs to resample-appropriate chunks before calling xclim. (:pull:`431`).
+* New ``xs.indicators.get_indicator_outputs`` to retrieve the variable name(s) and frequency to expect from an xclim indicator. (:pull:`431`).
 
 Bug fixes
 ^^^^^^^^^
@@ -29,9 +34,12 @@ Bug fixes
 * ``unstack_fill_nan`` now works if given a dictionary that contains both dimensions and coordinates. (:pull:`450`).
 * ``clean_up`` no longer modifies the original dataset. (:pull:`450`).
 * ``unstack_dates`` now works correctly for yearly datasets when `winter_starts_year=True`, as well as multi-year datasets. (:pull:`450`).
+* Fix ``xs.catalog.concat_data_catalogs`` for catalogs that have not been searched yet. (:pull:`431`).
+* Fix indicator computation using ``freq=2Q*`` by assuming this means a semiannual frequency anchored at the given month (pandas instead interprets it as two quarter-long steps, each anchored at the given month). (:pull:`431`).
 
 Internal changes
 ^^^^^^^^^^^^^^^^
+* ``DataCatalog.to_dataset`` can now accept a ``preprocess`` argument even if ``create_ensemble_on`` is given. In that case, the ``preprocess`` function is responsible for calendar handling. (:pull:`431`).
 * Include domain in `weight_location` in ``regrid_dataset``. (:pull:`414`).
 * Added pins to `xarray`, `xclim`, `h5py`, and `netcdf4`. (:pull:`414`).
 * Add ``.zip`` and ``.zarr.zip`` as possible file extensions for Zarr datasets. (:pull:`426`).
diff --git a/Makefile b/Makefile
index 7beb153c..ee70daaf 100644
--- a/Makefile
+++ b/Makefile
@@ -111,14 +111,14 @@ dev: clean ## install the package in editable mode with all development dependencies
 
 findfrench: ## Extract phrases and update the French translation catalog (this doesn't translate)
 	pybabel extract -o src/xscen/data/messages.pot --omit-header --input-dirs=src/xscen/
-	pybabel update -l fr -D xscen -i src/xscen/data/messages.pot -d src/xscen/data/ --omit-header
+	pybabel update -l fr -D xscen -i src/xscen/data/messages.pot -d src/xscen/data/ --omit-header --no-location
 
 translate: ## Compile the translation catalogs.
 	pybabel compile -f -D xscen -d src/xscen/data/
 
 checkfrench: ## Error if the catalog could be updated or if the compilation is older than the catalog.
 	rm -f .check_messages.pot
-	pybabel extract -o .check_messages.pot --omit-header --input-dirs=src/xscen/
+	pybabel extract -o .check_messages.pot --omit-header --input-dirs=src/xscen/ --no-location
 	pybabel update -l fr -D xscen -i .check_messages.pot -d src/xscen/data/ --omit-header --check
 	rm -f .check_messages.pot
 	if [ src/xscen/data/fr/LC_MESSAGES/xscen.mo -ot src/xscen/data/fr/LC_MESSAGES/xscen.po ]; then echo "Compilation is older than translations. Please compile with 'make translate'."; exit 1; fi
diff --git a/src/xscen/catalog.py b/src/xscen/catalog.py
index 6a300d12..20e88568 100644
--- a/src/xscen/catalog.py
+++ b/src/xscen/catalog.py
@@ -6,7 +6,7 @@
 import logging
 import os
 import re
-import warnings
+import shutil as sh
 from collections.abc import Mapping, Sequence
 from copy import deepcopy
 from functools import reduce
@@ -436,12 +436,12 @@ def to_dataset(
             If None, this will be the same as `create_ensemble_on`.
             The resulting coordinate must be unique.
         calendar : str, optional
-            If `create_ensemble_on` is given, all datasets are converted to this calendar before concatenation.
-            Ignored otherwise (default). If None, no conversion is done.
-            `align_on` is always "date".
+            If `create_ensemble_on` is given but not `preprocess`, all datasets are converted to this calendar before concatenation.
+            Ignored otherwise (default). If None, no conversion is done. `align_on` is always "date".
+            If `preprocess` is given, it must do the needed calendar handling itself.
         kwargs:
             Any other arguments are passed to :py:meth:`~intake_esm.core.esm_datastore.to_dataset_dict`.
-            The `preprocess` argument cannot be used if `create_ensemble_on` is given.
+            The `preprocess` argument must convert calendars as needed if `create_ensemble_on` is given.
 
         Returns
         -------
@@ -493,10 +493,6 @@ def to_dataset(
         )
 
         if create_ensemble_on:
-            if kwargs.get("preprocess") is not None:
-                warnings.warn(
-                    "Using `create_ensemble_on` will override the given `preprocess` function."
-                )
             cat.df["realization"] = generate_id(cat.df, ensemble_name)
             cat.esmcat.aggregation_control.aggregations.append(
                 intake_esm.cat.Aggregation(
@@ -506,15 +502,19 @@ def to_dataset(
             )
             xrfreq = cat.df["xrfreq"].unique()[0]
 
-            def preprocess(ds):
-                ds = ensure_correct_time(ds, xrfreq)
-                if calendar is not None:
-                    ds = ds.convert_calendar(
-                        calendar, use_cftime=(calendar != "default"), align_on="date"
-                    )
-                return ds
+            if kwargs.get("preprocess") is None:
 
-            kwargs["preprocess"] = preprocess
+                def preprocess(ds):
+                    ds = ensure_correct_time(ds, xrfreq)
+                    if calendar is not None:
+                        ds = ds.convert_calendar(
+                            calendar,
+                            use_cftime=(calendar != "default"),
+                            align_on="date",
+                        )
+                    return ds
+
+                kwargs["preprocess"] = preprocess
 
         if len(rm_from_id) > 1:
             # Guess what the ID was and rebuild a new one, omitting the columns part of the aggregation
@@ -536,6 +536,79 @@ def preprocess(ds):
         ds = cat.to_dask(**kwargs)
         return ds
 
+    def copy_files(
+        self,
+        dest: Union[str, os.PathLike],
+        flat: bool = True,
+        unzip: bool = False,
+        inplace: bool = False,
+    ):
+        """Copy each file of the catalog to another location, unzipping datasets along the way if requested.
+
+        Parameters
+        ----------
+        dest : str or os.PathLike
+            The root directory of the destination.
+        flat : bool
+            If True (default), all dataset files are copied to the same directory.
+            Renaming with an integer suffix ("{name}_01.{ext}") is done in case of duplicate file names.
+            If False, :py:func:`xscen.catutils.build_path` (with default arguments) is used to generate the new path below the destination.
+            Duplicate file names are not renamed in that case.
+        unzip : bool
+            If True, any dataset with a `.zip` suffix is unzipped during the copy (or rather, instead of being copied).
+        inplace : bool
+            If True, the catalog is updated in place. If False (default), a copy is returned.
+
+        Returns
+        -------
+        If `inplace` is False, this returns a catalog similar to self, except with updated filenames. Some special attributes are not preserved,
+        such as those added by :py:func:`xscen.extract.search_data_catalogs`. In that case, use `inplace=True`.
+        """
+        # Local imports to avoid circular imports
+        from .catutils import build_path
+        from .io import unzip_directory
+
+        dest = Path(dest)
+        data = self.esmcat._df.copy()
+        if flat:
+            new_paths = []
+            for path in map(Path, data.path.values):
+                if unzip and path.suffix == ".zip":
+                    new = dest / path.with_suffix("").name
+                else:
+                    new = dest / path.name
+                if new in new_paths:
+                    suffixes = "".join(new.suffixes)
+                    name = new.name.removesuffix(suffixes)
+                    i = 1
+                    while new in new_paths:
+                        new = dest / (name + f"_{i:02d}" + suffixes)
+                        i += 1
+                new_paths.append(new)
+            data["new_path"] = new_paths
+        else:
+            data = build_path(data, root=dest).drop(columns=["new_path_type"])
+
+        logger.debug(f"Will copy {len(data)} files.")
+        for i, row in data.iterrows():
+            old = Path(row.path)
+            new = Path(row.new_path)
+            if unzip and old.suffix == ".zip":
+                logger.info(f"Unzipping {old} to {new}.")
+                unzip_directory(old, new)
+            elif old.is_dir():
+                logger.info(f"Copying directory tree {old} to {new}.")
+                sh.copytree(old, new)
+            else:
+                logger.info(f"Copying file {old} to {new}.")
+                sh.copy(old, new)
+        if inplace:
+            self.esmcat._df["path"] = data["new_path"]
+            return
+        data["path"] = data["new_path"]
+        data = data.drop(columns=["new_path"])
+        return self.__class__({"esmcat": self.esmcat.dict(), "df": data})
+
 
 class ProjectCatalog(DataCatalog):
     """A DataCatalog with additional 'write' functionalities that can update and upload itself.
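A minimal usage sketch of the new method; the catalog file and destination paths are hypothetical:

    import xscen as xs

    cat = xs.DataCatalog("project-catalog.json")
    # Copy every file to scratch space, unzipping zipped zarr stores on the
    # way; a new catalog pointing at the copies is returned.
    local_cat = cat.copy_files("/scratch/me/data", flat=True, unzip=True)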
@@ -856,17 +931,20 @@ def concat_data_catalogs(*dcs):
         registry.update(dc.derivedcat._registry)
         catalogs.append(dc.df)
         requested_variables.extend(dc._requested_variables)
-        requested_variables_true.extend(dc._requested_variables_true)
-        dependent_variables.extend(dc._dependent_variables)
-        requested_variable_freqs.extend(dc._requested_variable_freqs)
+        requested_variables_true.extend(getattr(dc, "_requested_variables_true", []))
+        dependent_variables.extend(getattr(dc, "_dependent_variables", []))
+        requested_variable_freqs.extend(getattr(dc, "_requested_variable_freqs", []))
     df = pd.concat(catalogs, axis=0).drop_duplicates(ignore_index=True)
     dvr = intake_esm.DerivedVariableRegistry()
     dvr._registry.update(registry)
     newcat = DataCatalog({"esmcat": dcs[0].esmcat.dict(), "df": df}, registry=dvr)
     newcat._requested_variables = requested_variables
-    newcat._requested_variables_true = requested_variables_true
-    newcat._dependent_variables = dependent_variables
-    newcat._requested_variable_freqs = requested_variable_freqs
+    if requested_variables_true:
+        newcat._requested_variables_true = requested_variables_true
+    if dependent_variables:
+        newcat._dependent_variables = dependent_variables
+    if requested_variable_freqs:
+        newcat._requested_variable_freqs = requested_variable_freqs
     return newcat
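The `to_dataset` change above means a user-supplied `preprocess` now survives `create_ensemble_on`, but must then handle calendars itself. A minimal sketch, with a hypothetical catalog path and hypothetical column names:

    import xscen as xs

    cat = xs.DataCatalog("project-catalog.json")

    def align_calendars(ds):
        # With create_ensemble_on, calendar handling is now our job.
        return ds.convert_calendar("standard", align_on="date")

    ens = cat.to_dataset(
        create_ensemble_on=["experiment", "member"],  # hypothetical columns
        preprocess=align_calendars,
    )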
diff --git a/src/xscen/catutils.py b/src/xscen/catutils.py
index 9be103fd..86d24c63 100644
--- a/src/xscen/catutils.py
+++ b/src/xscen/catutils.py
@@ -12,6 +12,7 @@
 from copy import deepcopy
 from fnmatch import fnmatch
 from functools import partial, reduce
+from itertools import chain, combinations, product
 from multiprocessing import Pool
 from pathlib import Path
 from typing import Any, Optional, Union
@@ -35,7 +36,13 @@
 logger = logging.getLogger(__name__)
 
 
-__all__ = ["build_path", "parse_directory", "parse_from_ds", "register_parse_type"]
+__all__ = [
+    "build_path",
+    "parse_directory",
+    "parse_from_ds",
+    "patterns_from_schema",
+    "register_parse_type",
+]
 
 # ## File finding and path parsing ## #
 
@@ -440,7 +447,7 @@ def _parse_first_ds(
 
 @parse_config
 def parse_directory(  # noqa: C901
-    directories: list[Union[str, os.PathLike]],
+    directories: Union[str, list[Union[str, os.PathLike]]],
     patterns: list[str],
     *,
     id_columns: Optional[list[str]] = None,
@@ -543,6 +550,8 @@ def parse_directory(  # noqa: C901
     pd.DataFrame
         Parsed directory files
     """
+    if isinstance(directories, (str, Path)):
+        directories = [directories]
     homogenous_info = homogenous_info or {}
     xr_open_kwargs = xr_open_kwargs or {}
     if only_official_columns:
@@ -1173,3 +1182,114 @@ def build_path(
         df["new_path_type"] = paths[1]
         return df
     return _build_path(data, schemas=schemas, root=root, get_type=False, **extra_facets)
+
+
+def _as_template(a):
+    return "{" + a + "}"
+
+
+def partial_format(template, **fmtargs):
+    """Format a template only partially: fields without a given value are left as templates."""
+
+    class PartialFormatDict(dict):
+        def __missing__(self, key):
+            return _as_template(key)
+
+    return template.format_map(PartialFormatDict(**fmtargs))
+
+
+def patterns_from_schema(
+    schema: Union[str, dict], exts: Optional[Sequence[str]] = None
+):
+    """Generate all valid patterns for a given schema.
+
+    Generated patterns are meant for use with :py:func:`parse_directory`.
+    This hardcodes the rule that a facet can never contain an underscore ("_"), except for "variable".
+    File names are not parsed strictly, except for the date bounds element, which must be at the end if present.
+
+    Parameters
+    ----------
+    schema : dict or str
+        A dict with keys "with" (optional), "folders" and "filename", constructed as described
+        in the `xscen/data/file_schema.yml` file.
+        Or the name of a pattern group from that file.
+    exts : sequence of strings, optional
+        A list of file extensions to consider, with the leading dot.
+        Defaults to ``[".nc", ".zarr", ".zarr.zip"]``.
+
+    Returns
+    -------
+    list of str
+        Patterns compatible with :py:func:`parse_directory`.
+    """
+    if isinstance(schema, str):
+        schemas = Path(__file__).parent / "data" / "file_schema.yml"
+        with open(schemas) as f:
+            schema = yaml.safe_load(f)[schema]
+
+    # # Base folder patterns
+
+    # Indexes of the optional folder parts
+    opt_idx = [
+        i
+        for i, k in enumerate(schema["folders"])
+        if isinstance(k, str) and k.startswith("(")
+    ]
+
+    raw_folders = []
+    for skip in chain.from_iterable(
+        combinations(opt_idx, r) for r in range(len(opt_idx) + 1)
+    ):
+        # `skip` contains the indexes of the levels to skip.
+        # We go through every possible combination of missing levels.
+        parts = []
+        for i, part in enumerate(schema["folders"]):
+            if i in skip:
+                continue
+            if isinstance(part, str):
+                if part.startswith("("):
+                    part = part[1:-1]
+                parts.append(_as_template(part))
+            elif isinstance(part, dict):
+                parts.append(part["text"])
+            else:
+                parts.append("_".join(map(_as_template, part)))
+        raw_folders.append("/".join(parts))
+
+    # # Inject conditions
+    folders = raw_folders
+    for conditions in schema.get("with", []):
+        if "value" not in conditions:
+            # This means that the facet must be set.
+            # Not useful when parsing. Implicit with the facet in the pattern.
+            continue
+
+        # Ensure a list
+        if isinstance(conditions["value"], str):
+            value = [conditions["value"]]
+        else:
+            value = conditions["value"]
+
+        patterns = []
+        for patt in folders:
+            for val in value:
+                patterns.append(partial_format(patt, **{conditions["facet"]: val}))
+        folders = patterns
+
+    # # Inject parsing requirements (hardcoded :( )
+    folders = [folder.replace("{variable}", "{variable:_}") for folder in folders]
+
+    # # Filenames
+    if "DATES" in schema["filename"]:
+        if schema["filename"][-1] != "DATES":
+            raise ValueError(
+                "Reverse pattern generation is not supported for filenames with date bounds not at the end."
+            )
+        filename = "{?:_}_{DATES}"
+    else:
+        filename = "{?:_}"
+
+    exts = exts or [".nc", ".zarr", ".zarr.zip"]
+
+    patterns = [f"{fold}/{filename}{ext}" for fold, ext in product(folders, exts)]
+
+    return patterns
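A minimal sketch of the intended round trip, pairing `patterns_from_schema` with `parse_directory` ("original-sims-raw" is one of the default schemas; the data root is hypothetical):

    from xscen.catutils import parse_directory, patterns_from_schema

    patterns = patterns_from_schema("original-sims-raw")
    # `directories` may now be a single path instead of a list.
    df = parse_directory("/data/raw-sims", patterns=patterns)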
diff --git a/src/xscen/data/file_schema.yml b/src/xscen/data/file_schema.yml
index 62b29660..8ebe3c6f 100644
--- a/src/xscen/data/file_schema.yml
+++ b/src/xscen/data/file_schema.yml
@@ -17,6 +17,7 @@
 #   - text: < value >             # A fixed string
 # filename:                       # The file name schema, a list of facet names. If a facet is empty, it will be skipped. Elements will be separated by "_".
 #                                 # The special "DATES" facet will be replaced by the most concise way found to define the temporal range covered by the file.
+#                                 # DATES should only appear at the end.
 ---
 ### Original / raw data
 #
diff --git a/src/xscen/data/fr/LC_MESSAGES/xscen.mo b/src/xscen/data/fr/LC_MESSAGES/xscen.mo
index 0c1c2619..617067d3 100644
Binary files a/src/xscen/data/fr/LC_MESSAGES/xscen.mo and b/src/xscen/data/fr/LC_MESSAGES/xscen.mo differ
diff --git a/src/xscen/data/fr/LC_MESSAGES/xscen.po b/src/xscen/data/fr/LC_MESSAGES/xscen.po
index 027dae3f..68277795 100644
--- a/src/xscen/data/fr/LC_MESSAGES/xscen.po
+++ b/src/xscen/data/fr/LC_MESSAGES/xscen.po
@@ -1,32 +1,23 @@
-#: src/xscen/aggregate.py:435
-#, fuzzy
 msgid "{window}-year climatological {operation} of {attr}."
 msgstr "Moyenne {window} ans de {attr}."
 
-#: src/xscen/aggregate.py:649
 msgid "{attr1}: {kind} delta compared to {refhoriz}."
 msgstr "{attr1}: Delta {kind} comparé à {refhoriz}."
 
-#: src/xscen/diagnostics.py:522
 msgid "Ranking of measure performance"
 msgstr "Classement de performance de la mesure"
 
-#: src/xscen/diagnostics.py:581
 msgid "Fraction of improved grid cells"
 msgstr "Fraction de points de grille améliorés"
 
-#: src/xscen/io.py:761 src/xscen/io.py:769
 msgid "Variable"
 msgstr "Variable"
 
-#: src/xscen/io.py:762
 msgid "Description"
 msgstr "Description"
 
-#: src/xscen/io.py:765
 msgid "Units"
 msgstr "Unités"
 
-#: src/xscen/io.py:770
 msgid "Content"
 msgstr "Contenu"
diff --git a/src/xscen/indicators.py b/src/xscen/indicators.py
index f47b2b12..af866800 100644
--- a/src/xscen/indicators.py
+++ b/src/xscen/indicators.py
@@ -8,16 +8,18 @@
 from types import ModuleType
 from typing import Optional, Union
 
+import pandas as pd
 import xarray as xr
 import xclim as xc
 from intake_esm import DerivedVariableRegistry
+from xclim.core.calendar import construct_offset, parse_offset
 from xclim.core.indicator import Indicator
 from yaml import safe_load
 
 from xscen.config import parse_config
 
 from .catutils import parse_from_ds
-from .utils import CV, standardize_periods
+from .utils import CV, rechunk_for_resample, standardize_periods
 
 logger = logging.getLogger(__name__)
 
@@ -62,6 +64,42 @@ def load_xclim_module(
     return xc.build_indicator_module_from_yaml(filename)
 
 
+def get_indicator_outputs(ind: xc.core.indicator.Indicator, in_freq: str):
+    """Return the variable names and resampling frequency of a given indicator.
+
+    CAUTION: Some indicators build the variable name on the fly, according to their arguments.
+    For these, this function returns the template string (with "{}").
+
+    Parameters
+    ----------
+    ind : Indicator
+        An xclim indicator.
+    in_freq : str
+        The data's sampling frequency.
+
+    Returns
+    -------
+    var_names : list
+        List of variable names.
+    freq : str
+        Indicator resampling frequency. "fx" for a time-reducing indicator.
+    """
+    if isinstance(ind, xc.core.indicator.ReducingIndicator):
+        frq = "fx"
+    elif not isinstance(ind, xc.core.indicator.ResamplingIndicator):
+        frq = in_freq
+    else:
+        frq = (
+            ind.injected_parameters["freq"]
+            if "freq" in ind.injected_parameters
+            else ind.parameters["freq"].default
+        )
+        if frq == "YS":
+            frq = "YS-JAN"
+    var_names = [cfa["var_name"] for cfa in ind.cf_attrs]
+    return var_names, frq
+
+
 @parse_config
 def compute_indicators(  # noqa: C901
     ds: xr.Dataset,
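A quick sketch of the new helper, on a stock xclim indicator:

    import xclim
    from xscen.indicators import get_indicator_outputs

    # tg_mean resamples yearly by default, so this should yield
    # (["tg_mean"], "YS-JAN").
    var_names, freq = get_indicator_outputs(xclim.atmos.tg_mean, in_freq="D")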
@@ -76,6 +114,7 @@ def compute_indicators(  # noqa: C901
     periods: Optional[Union[list[str], list[list[str]]]] = None,
     restrict_years: bool = True,
     to_level: Optional[str] = "indicators",
+    rechunk_input: bool = False,
 ) -> dict:
     """Calculate variables and indicators based on a YAML call to xclim.
 
@@ -105,6 +144,10 @@ def compute_indicators(  # noqa: C901
     to_level : str, optional
         The processing level to assign to the output.
         If None, the processing level of the inputs is preserved.
+    rechunk_input : bool
+        If True, the dataset is rechunked with :py:func:`flox.xarray.rechunk_for_blockwise`,
+        according to the resampling frequency of the indicator. The rechunking is only done
+        once per frequency, through :py:func:`xscen.utils.rechunk_for_resample`.
 
     Returns
     -------
@@ -129,18 +172,9 @@ def compute_indicators(  # noqa: C901
     else:
         logger.info(f"Computing {N} indicators.")
 
-    def _infer_freq_from_meta(ind):
-        return (
-            ind.injected_parameters["freq"]
-            if "freq" in ind.injected_parameters
-            else (
-                ind.parameters["freq"].default
-                if "freq" in ind.parameters
-                else ind.src_freq
-            )
-        )
-
     periods = standardize_periods(periods)
+    in_freq = xr.infer_freq(ds.time) if "time" in ds.dims else "fx"
+    dss_rechunked = {}
 
     out_dict = dict()
     for i, ind in enumerate(indicators, 1):
@@ -150,9 +184,27 @@ def _infer_freq_from_meta(ind):
             iden = ind.identifier
         logger.info(f"{i} - Computing {iden}.")
 
+        _, freq = get_indicator_outputs(ind, in_freq)
+
+        if rechunk_input and freq not in ["fx", in_freq]:
+            if freq not in dss_rechunked:
+                logger.debug(f"Rechunking with flox for freq {freq}")
+                dss_rechunked[freq] = rechunk_for_resample(ds, time=freq)
+            else:
+                logger.debug(f"Using rechunked for freq {freq}")
+            ds_in = dss_rechunked[freq]
+        else:
+            ds_in = ds
+
         if periods is None:
+            # Pandas has no semiannual frequency and 2Q is capricious
+            if freq.startswith("2Q"):
+                logger.debug(
+                    "Dropping start of timeseries to ensure semiannual frequency works."
+                )
+                ds_in = fix_semiannual(ds_in, freq)
             # Make the call to xclim
-            out = ind(ds=ds)
+            out = ind(ds=ds_in)
 
             # In the case of multiple outputs, merge them into a single dataset
             if isinstance(out, tuple):
@@ -161,23 +213,18 @@ def _infer_freq_from_meta(ind):
             else:
                 out = out.to_dataset()
 
-            # Infer the indicator's frequency
-            if "time" in out.dims:
-                if len(out.time) < 3:
-                    freq = _infer_freq_from_meta(ind)
-                else:
-                    freq = xr.infer_freq(out.time)
-            else:
-                freq = "fx"
-            if freq == "YS":
-                freq = "YS-JAN"
-
         else:
             # Multiple time periods to concatenate
             concats = []
             for period in periods:
                 # Make the call to xclim
-                ds_subset = ds.sel(time=slice(period[0], period[1]))
+                ds_subset = ds_in.sel(time=slice(period[0], period[1]))
+                # Pandas has no semiannual frequency and 2Q is capricious
+                if freq.startswith("2Q"):
+                    logger.debug(
+                        "Dropping start of timeseries to ensure semiannual frequency works."
+                    )
+                    ds_subset = fix_semiannual(ds_subset, freq)
                 tmp = ind(ds=ds_subset)
 
                 # In the case of multiple outputs, merge them into a single dataset
@@ -187,21 +234,9 @@ def _infer_freq_from_meta(ind):
                 else:
                     tmp = tmp.to_dataset()
 
-                # Infer the indicator's frequency
-                if "time" in tmp.dims:
-                    if len(tmp.time) < 3:
-                        freq = _infer_freq_from_meta(ind)
-                    else:
-                        freq = xr.infer_freq(tmp.time)
-                else:
-                    freq = "fx"
-
-                if freq == "YS":
-                    freq = "YS-JAN"
-
                 # In order to concatenate time periods, the indicator still needs a time dimension
                 if freq == "fx":
                     tmp = tmp.assign_coords({"time": ds_subset.time[0]})
-
                 concats.append(tmp)
             out = xr.concat(concats, dim="time")
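A sketch of the new `rechunk_input` option, assuming a dask-backed daily dataset `ds` and a hypothetical indicator module file "indicators.yml":

    import xscen as xs

    module = xs.indicators.load_xclim_module("indicators.yml")
    # Inputs are rechunked once per output frequency before calling xclim;
    # the result maps output frequencies (e.g. "YS-JAN") to datasets.
    outputs = xs.compute_indicators(ds, indicators=module, rechunk_input=True)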
@@ -351,3 +386,48 @@ def select_inds_for_avail_vars(
     return xc.core.indicator.build_indicator_module(
         "inds_for_avail_vars", available_inds, reload=True
     )
+
+
+def _wrap_month(m):
+    # Ensure the month number is between 1 and 12.
+    # Modulo returns 0 if m is a multiple of 12; 0 is falsy and we want 12.
+    return (m % 12) or 12
+
+
+def fix_semiannual(ds, freq):
+    """Avoid wrong start dates for the semiannual frequency.
+
+    Resampling with offsets that are multiples of a base frequency (e.g. 2QS-OCT) is broken in pandas
+    (https://github.com/pandas-dev/pandas/issues/51563).
+    This cuts the beginning of the dataset so that it starts exactly at the beginning of a resampling period.
+    """
+    # I hate that we have to do that
+    mul, b, s, anc = parse_offset(freq)
+    if mul != 2 or b != "Q":
+        raise NotImplementedError("This only fixes 2Q frequencies.")
+    # Get MONTH: N mapping (invert xarray's)
+    months_inv = xr.coding.cftime_offsets._MONTH_ABBREVIATIONS
+    months = dict(zip(months_inv.values(), months_inv.keys()))
+
+    if s:
+        m1 = months[anc]
+    else:
+        m1 = _wrap_month(months[anc] + 1)
+    freq = construct_offset(mul, b, True, months_inv[m1])
+    m2 = _wrap_month(m1 + 6)
+
+    time = ds.indexes["time"]
+    if isinstance(time, xr.CFTimeIndex):
+        offset = xr.coding.cftime_offsets.to_offset(freq)
+        is_on_offset = offset.onOffset
+    else:
+        offset = pd.tseries.frequencies.to_offset(freq)
+        is_on_offset = offset.is_on_offset
+
+    if is_on_offset(time[0]) and time[0].month in (m1, m2):
+        # Wow, already correct!
+        return ds
+
+    for t in time:
+        if is_on_offset(t) and t.month in (m1, m2):
+            return ds.sel(time=(time >= t))
+    raise ValueError(f"Can't find a start date that fits with frequency {freq}.")
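A minimal sketch of what `fix_semiannual` does, on a dummy monthly dataset. For `freq="2QS-OCT"`, valid period starts are April and October, so leading time steps are dropped until the first one:

    import pandas as pd
    import xarray as xr
    from xscen.indicators import fix_semiannual

    time = pd.date_range("2000-01-01", periods=48, freq="MS")
    ds = xr.Dataset(coords={"time": time})
    fixed = fix_semiannual(ds, "2QS-OCT")
    assert fixed.time.dt.month[0] == 4  # first kept timestamp is 2000-04-01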
+ """ + root = Path(root) + root.mkdir(parents=True, exist_ok=True) + + with ZipFile(zipfile, "r") as zf: + zf.extractall(root) diff --git a/src/xscen/utils.py b/src/xscen/utils.py index 08428cfd..b6469589 100644 --- a/src/xscen/utils.py +++ b/src/xscen/utils.py @@ -1600,3 +1600,11 @@ def _xarray_defaults(**kwargs): "data_vars", "minimal" ) return kwargs + + +def rechunk_for_resample(obj: Union[xr.DataArray, xr.Dataset], **resample_kwargs): + if not uses_dask(obj): + return obj + + res = obj.resample(**resample_kwargs) + return flox.xarray.rechunk_for_blockwise(obj, res._dim, res._codes) diff --git a/tests/test_catutils.py b/tests/test_catutils.py index 6748c3c2..3f05827c 100644 --- a/tests/test_catutils.py +++ b/tests/test_catutils.py @@ -228,6 +228,14 @@ def test_build_path(samplecat): ) in df.new_path.values +def test_pattern_from_schema(samplecat): + df = cu.build_path(samplecat, mip_era="CMIP5") + patts = cu.patterns_from_schema("original-sims-raw") + for p in df.new_path.values: + res = [cu._compile_pattern(patt).parse(p) for patt in patts] + assert any(res) + + def test_build_path_ds(): ds = xr.tutorial.open_dataset("air_temperature") ds = ds.assign(time=xr.cftime_range("0001-01-01", freq="6h", periods=ds.time.size))