Various utilities (#431)
### Pull Request Checklist:
- [ ] This PR addresses an already opened issue (for bug fixes / features)
    - This PR fixes #xyz
- [x] (If applicable) Documentation has been added / updated (for bug fixes / features).
- [x] (If applicable) Tests have been added.
- [x] This PR does not seem to break the templates.
- [x] CHANGELOG.rst has been updated (with summary of main changes).
- [x] Link to issue (:issue:`number`) and pull request (:pull:`number`) has been added.

### What kind of change does this PR introduce?

* `zip_directory` and `unzip_directory` do what their names suggest. Useful for working with zipped zarrs. I compared different tools, and Python's built-in `zipfile` was only slightly slower than subprocessing something like `7z` or `zip`, so I chose that module for maximum compatibility (not tested on Windows, though).
* `DataCatalog.copy_files`: copy everything from a catalog to a new root folder. It can unzip along the way, and can recreate the directory structure or not. At first I didn't want to do this in-place, but the result of `search_data_catalogs` is not easily copyable.
  - I use this to transfer my inputs from network storage to local storage on narval, because I know my workflow (indicators) will read the inputs multiple times.
* Allow a single string for `directories` in `parse_directory`. I really did wonder why my parsing was taking so much time: when passing a single string, it was still treated as a sequence, so my path starting with `/tank` first triggered a search through `/`...
* `indicators.get_indicator_outputs`: a helper for something we often do when checking whether indicators already exist in a catalog. It returns the output frequency and variable names of an indicator.
* Option `rechunk_input` for `compute_indicators` (and the corresponding `rechunk_for_resample` util). I'm not 100% sure how much this changes performance, but the idea is to have chunks that fit the resampling period before calling the indicator.
* `indicators.fix_semiannual` to fix the *?!&# problem with "2QS-OCT", where pandas doesn't want to respect the anchor (and pandas doesn't have a 6-month period).
* `xs.catutils.patterns_from_schema` to help when running `parse_directory` on a folder that was filled with `build_path`. Given the name of a schema as in the config file, it returns all possible patterns. A combined usage sketch follows this list.
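
The sketch below strings a few of these together. It is illustrative only: the paths, the catalog file and the `"original-non-sims"` schema name are made up, and `zip_directory`'s exact signature is assumed from this description.

```python
import xscen as xs

# Parse a folder that was filled with `build_path`, using the schema name
# from the config file (the name here is hypothetical).
patterns = xs.catutils.patterns_from_schema("original-non-sims")
df = xs.catutils.parse_directory("/tank/data", patterns)  # a single string now works

# Copy everything a catalog points to onto fast local storage,
# unzipping zipped zarrs along the way.
cat = xs.DataCatalog("project.json")  # hypothetical catalog file
local_cat = cat.copy_files("/localscratch/inputs", unzip=True)

# `xs.indicators.get_indicator_outputs` returns the output variable names and
# frequency of an indicator, to check against a catalog before computing.

# Zip a zarr store back up for archival (signature assumed).
xs.io.zip_directory("/localscratch/outputs/ds.zarr", "/archive/ds.zarr.zip")
```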

### Does this PR introduce a breaking change?
Shouldn't.

### Other information:
For those interested, see my use of these features in the `1_indicators.py` and `slurm_indicators.sh` scripts in my Portraits branch:
https://github.com/Ouranosinc/portraits_climatiques/tree/ind-in-pc/preparation
aulemahal authored Sep 11, 2024
2 parents c0c7114 + 2076a43 commit da5aeec
Showing 11 changed files with 419 additions and 73 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
@@ -17,6 +17,11 @@ New features and enhancements
* More calendars are now recognized in ``translate_time_chunk``. (:pull:`450`).
* `new_dim` in ``unstack_dates`` is now None by default and changes depending on the frequency. It becomes `month` if the data is exactly monthly, and keeps the old default of `season` otherwise. (:pull:`450`).
* Updated the list of libraries in `show_versions` to reflect our current environment. (:pull:`450`).
* New ``xscen.catutils.patterns_from_schema`` to generate all possible patterns from a given schema (or one of xscen's default), to use with :py:func:`parse_directory`. (:pull:`431`).
* New ``DataCatalog.copy_files`` to copy all files of a catalog to a new destination, unzipping if needed and returning a new catalog. (:pull:`431`).
* Convenience functions ``xs.io.zip_directory`` and ``xs.io.unzip_directory`` (for zarrs). (:pull:`431`).
* New argument ``rechunk_input`` for ``compute_indicators``: rechunk the inputs into resample-appropriate chunks before calling xclim. (:pull:`431`).
* New ``xs.indicators.get_indicator_outputs`` to retrieve what variable name(s) and frequency to expect from an xclim indicator. (:pull:`431`).

Bug fixes
^^^^^^^^^
@@ -29,9 +34,12 @@ Bug fixes
* ``unstack_fill_nan`` now works if given a dictionary that contains both dimensions and coordinates. (:pull:`450`).
* ``clean_up`` no longer modifies the original dataset. (:pull:`450`).
* ``unstack_dates`` now works correctly for yearly datasets when `winter_starts_year=True`, as well as multi-year datasets. (:pull:`450`).
* Fix ``xs.catalog.concat_data_catalogs`` for catalogs that have not been searched yet. (:pull:`431`).
* Fix indicator computation using ``freq=2Q*`` by assuming this means a semiannual frequency anchored at the given month (pandas assumes steps of 2 quarters, any of them anchored at the given month). (:pull:`431`).

Internal changes
^^^^^^^^^^^^^^^^
* ``DataCatalog.to_dataset`` can now accept a ``preprocess`` argument even if ``create_ensemble_on`` is given. Calendar handling is then the user's responsibility. (:pull:`431`).
* Include domain in `weight_location` in ``regrid_dataset``. (:pull:`414`).
* Added pins to `xarray`, `xclim`, `h5py`, and `netcdf4`. (:pull:`414`).
* Add ``.zip`` and ``.zarr.zip`` as possible file extensions for Zarr datasets. (:pull:`426`).
4 changes: 2 additions & 2 deletions Makefile
@@ -111,14 +111,14 @@ dev: clean ## install the package in editable mode with all development dependencies

findfrench: ## Extract phrases and update the French translation catalog (this doesn't translate)
pybabel extract -o src/xscen/data/messages.pot --omit-header --input-dirs=src/xscen/
pybabel update -l fr -D xscen -i src/xscen/data/messages.pot -d src/xscen/data/ --omit-header
pybabel update -l fr -D xscen -i src/xscen/data/messages.pot -d src/xscen/data/ --omit-header --no-location

translate: ## Compile the translation catalogs.
pybabel compile -f -D xscen -d src/xscen/data/

checkfrench: ## Error if the catalog could be updated or if the compilation is older than the catalog.
rm -f .check_messages.pot
pybabel extract -o .check_messages.pot --omit-header --input-dirs=src/xscen/
pybabel extract -o .check_messages.pot --omit-header --input-dirs=src/xscen/ --no-location
pybabel update -l fr -D xscen -i .check_messages.pot -d src/xscen/data/ --omit-header --check
rm -f .check_messages.pot
if [ src/xscen/data/fr/LC_MESSAGES/xscen.mo -ot src/xscen/data/fr/LC_MESSAGES/xscen.po ]; then echo "Compilation is older than translations. Please compile with 'make translate'."; exit 1; fi
124 changes: 101 additions & 23 deletions src/xscen/catalog.py
@@ -6,7 +6,7 @@
import logging
import os
import re
import warnings
import shutil as sh
from collections.abc import Mapping, Sequence
from copy import deepcopy
from functools import reduce
@@ -436,12 +436,12 @@ def to_dataset(
If None, this will be the same as `create_ensemble_on`.
The resulting coordinate must be unique.
calendar : str, optional
If `create_ensemble_on` is given, all datasets are converted to this calendar before concatenation.
Ignored otherwise (default). If None, no conversion is done.
`align_on` is always "date".
If `create_ensemble_on` is given but not `preprocess`, all datasets are converted to this calendar before concatenation.
Ignored otherwise (default). If None, no conversion is done. `align_on` is always "date".
If `preprocess` is given, it must do the needed calendar handling.
kwargs:
Any other arguments are passed to :py:meth:`~intake_esm.core.esm_datastore.to_dataset_dict`.
The `preprocess` argument cannot be used if `create_ensemble_on` is given.
The `preprocess` argument must convert calendars as needed if `create_ensemble_on` is given.
Returns
-------
@@ -493,10 +493,6 @@ def to_dataset(
)

if create_ensemble_on:
if kwargs.get("preprocess") is not None:
warnings.warn(
"Using `create_ensemble_on` will override the given `preprocess` function."
)
cat.df["realization"] = generate_id(cat.df, ensemble_name)
cat.esmcat.aggregation_control.aggregations.append(
intake_esm.cat.Aggregation(
@@ -506,15 +502,19 @@
)
xrfreq = cat.df["xrfreq"].unique()[0]

def preprocess(ds):
ds = ensure_correct_time(ds, xrfreq)
if calendar is not None:
ds = ds.convert_calendar(
calendar, use_cftime=(calendar != "default"), align_on="date"
)
return ds
if kwargs.get("preprocess") is None:

kwargs["preprocess"] = preprocess
def preprocess(ds):
ds = ensure_correct_time(ds, xrfreq)
if calendar is not None:
ds = ds.convert_calendar(
calendar,
use_cftime=(calendar != "default"),
align_on="date",
)
return ds

kwargs["preprocess"] = preprocess

if len(rm_from_id) > 1:
# Guess what the ID was and rebuild a new one, omitting the columns part of the aggregation
@@ -536,6 +536,81 @@ def preprocess(ds):
ds = cat.to_dask(**kwargs)
return ds
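
Editor's sketch (not part of the diff): with this change, a user-supplied `preprocess` is honoured even when `create_ensemble_on` is given; the caller then takes over calendar handling. A hypothetical call, with a made-up column name:

    def noleap_preprocess(ds):
        # The caller handles calendars when supplying `preprocess`.
        return ds.convert_calendar("noleap", use_cftime=True, align_on="date")

    ds = cat.to_dataset(create_ensemble_on=["member"], preprocess=noleap_preprocess)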

def copy_files(
self,
dest: Union[str, os.PathLike],
flat: bool = True,
unzip: bool = False,
inplace: bool = False,
):
"""Copy each file of the catalog to another location, unzipping datasets along the way if requested.
Parameters
----------
dest: str, path
The root directory of the destination.
flat: bool
If True (default), all dataset files are copied in the same directory.
Renaming with an integer suffix ("{name}_01.{ext}") is done in case of duplicate file names.
If False, :py:func:`xscen.catutils.build_path` (with default arguments) is used to generate the new path below the destination.
In that case, nothing is done about duplicate file names.
unzip: bool
If True, any datasets with a `.zip` suffix are unzipped during the copy (or rather instead of a copy).
inplace : bool
If True, the catalog is updated in place. If False (default), a copy is returned.
Returns
-------
If inplace is False, this returns a catalog similar to self except with updated filenames. Some special attributes are not preserved,
such as those added by :py:func:`xscen.extract.search_data_catalogs`. In this case, use `inplace=True`.
"""
# Local imports to avoid circular imports
from .catutils import build_path
from .io import unzip_directory

dest = Path(dest)
data = self.esmcat._df.copy()
if flat:
new_paths = []
for path in map(Path, data.path.values):
if unzip and path.suffix == ".zip":
new = dest / path.with_suffix("").name
else:
new = dest / path.name
if new in new_paths:
suffixes = "".join(new.suffixes)
name = new.name.removesuffix(suffixes)
i = 1
while new in new_paths:
new = dest / (name + f"_{i:02d}" + suffixes)
i += 1
new_paths.append(new)
data["new_path"] = new_paths
else:
data = build_path(data, root=dest).drop(columns=["new_path_type"])

logger.debug(f"Will copy {len(data)} files.")
for i, row in data.iterrows():
old = Path(row.path)
new = Path(row.new_path)
if unzip and old.suffix == ".zip":
logger.info(f"Unzipping {old} to {new}.")
unzip_directory(old, new)
elif old.is_dir():
logger.info(f"Copying directory tree {old} to {new}.")
sh.copytree(old, new)
else:
logger.info(f"Copying file {old} to {new}.")
sh.copy(old, new)
if inplace:
self.esmcat._df["path"] = data["new_path"]
return
data["path"] = data["new_path"]
data = data.drop(columns=["new_path"])
return self.__class__({"esmcat": self.esmcat.dict(), "df": data})
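
Editor's sketch (not part of the diff): minimal usage with hypothetical paths. With `flat=False`, the xscen directory structure is rebuilt under the destination via `build_path`:

    cat = DataCatalog("project.json")
    local_cat = cat.copy_files("/localscratch/data", flat=False, unzip=True)
    # To keep the special attributes set by `search_data_catalogs`, update in place:
    cat.copy_files("/localscratch/data", inplace=True)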


class ProjectCatalog(DataCatalog):
"""A DataCatalog with additional 'write' functionalities that can update and upload itself.
@@ -856,17 +931,20 @@ def concat_data_catalogs(*dcs):
registry.update(dc.derivedcat._registry)
catalogs.append(dc.df)
requested_variables.extend(dc._requested_variables)
requested_variables_true.extend(dc._requested_variables_true)
dependent_variables.extend(dc._dependent_variables)
requested_variable_freqs.extend(dc._requested_variable_freqs)
requested_variables_true.extend(getattr(dc, "_requested_variables_true", []))
dependent_variables.extend(getattr(dc, "_dependent_variables", []))
requested_variable_freqs.extend(getattr(dc, "_requested_variable_freqs", []))
df = pd.concat(catalogs, axis=0).drop_duplicates(ignore_index=True)
dvr = intake_esm.DerivedVariableRegistry()
dvr._registry.update(registry)
newcat = DataCatalog({"esmcat": dcs[0].esmcat.dict(), "df": df}, registry=dvr)
newcat._requested_variables = requested_variables
newcat._requested_variables_true = requested_variables_true
newcat._dependent_variables = dependent_variables
newcat._requested_variable_freqs = requested_variable_freqs
if requested_variables_true:
newcat._requested_variables_true = requested_variables_true
if dependent_variables:
newcat._dependent_variables = dependent_variables
if requested_variable_freqs:
newcat._requested_variable_freqs = requested_variable_freqs
return newcat


124 changes: 122 additions & 2 deletions src/xscen/catutils.py
@@ -12,6 +12,7 @@
from copy import deepcopy
from fnmatch import fnmatch
from functools import partial, reduce
from itertools import chain, combinations, product
from multiprocessing import Pool
from pathlib import Path
from typing import Any, Optional, Union
@@ -35,7 +36,13 @@
logger = logging.getLogger(__name__)


__all__ = ["build_path", "parse_directory", "parse_from_ds", "register_parse_type"]
__all__ = [
"build_path",
"parse_directory",
"parse_from_ds",
"patterns_from_schema",
"register_parse_type",
]
# ## File finding and path parsing ## #


@@ -440,7 +447,7 @@ def _parse_first_ds(

@parse_config
def parse_directory( # noqa: C901
directories: list[Union[str, os.PathLike]],
directories: Union[str, list[Union[str, os.PathLike]]],
patterns: list[str],
*,
id_columns: Optional[list[str]] = None,
@@ -543,6 +550,8 @@ def parse_directory( # noqa: C901
pd.DataFrame
Parsed directory files
"""
if isinstance(directories, (str, Path)):
directories = [directories]
homogenous_info = homogenous_info or {}
xr_open_kwargs = xr_open_kwargs or {}
if only_official_columns:
@@ -1173,3 +1182,114 @@ def build_path(
df["new_path_type"] = paths[1]
return df
return _build_path(data, schemas=schemas, root=root, get_type=False, **extra_facets)


def _as_template(a):
return "{" + a + "}"


def partial_format(template, **fmtargs):
"""Format a template only partially, leaving un-formatted templates intact."""

class PartialFormatDict(dict):
def __missing__(self, key):
return _as_template(key)

return template.format_map(PartialFormatDict(**fmtargs))


def patterns_from_schema(
schema: Union[str, dict], exts: Optional[Sequence[str]] = None
):
"""Generate all valid patterns for a given schema.
Generated patterns are meant for use with :py:func:`parse_directory`.
This hardcodes the rule that a facet can never contain an underscore ("_"), except for "variable".
File names are parsed leniently, except that the date bounds element, if present, must be at the end.
Parameters
----------
schema: dict or str
A dict with keys "with" (optional), "folders" and "filename", constructed as described
in the `xscen/data/file_schema.yml` file.
Or the name of a pattern group from that file.
exts: sequence of strings, optional
A list of file extensions to consider, with the leading dot.
Defaults to ``[".nc", ".zarr", ".zarr.zip"]``.
Returns
-------
list of patterns compatible with :py:func:`parse_directory`.
"""
if isinstance(schema, str):
schemas = Path(__file__).parent / "data" / "file_schema.yml"
with open(schemas) as f:
schema = yaml.safe_load(f)[schema]

# # Base folder patterns

# Index of optional folder parts
opt_idx = [
i
for i, k in enumerate(schema["folders"])
if isinstance(k, str) and k.startswith("(")
]

raw_folders = []
for skip in chain.from_iterable(
combinations(opt_idx, r) for r in range(len(opt_idx) + 1)
):
# skip contains index of levels to skip
# we go through every possible missing levels combinations
parts = []
for i, part in enumerate(schema["folders"]):
if i in skip:
continue
if isinstance(part, str):
if part.startswith("("):
part = part[1:-1]
parts.append(_as_template(part))
elif isinstance(part, dict):
parts.append(part["text"])
else:
parts.append("_".join(map(_as_template, part)))
raw_folders.append("/".join(parts))

# # Inject conditions
folders = raw_folders
for conditions in schema["with"]:
if "value" not in conditions:
# This means that the facet must be set.
# Not useful when parsing. Implicit with the facet in the pattern.
continue

# Ensure a list
if isinstance(conditions["value"], str):
value = [conditions["value"]]
else:
value = conditions["value"]

patterns = []
for patt in folders:
for val in value:
patterns.append(partial_format(patt, **{conditions["facet"]: val}))
folders = patterns

# # Inject parsing requirements (hardcoded :( )
folders = [folder.replace("{variable}", "{variable:_}") for folder in folders]

# # Filenames
if "DATES" in schema["filename"]:
if schema["filename"][-1] != "DATES":
raise ValueError(
"Reverse pattern generation is not supported for filenames with date bounds not at the end."
)
filename = "{?:_}_{DATES}"
else:
filename = "{?:_}"

exts = exts or [".nc", ".zarr", ".zarr.zip"]

patterns = [f"{fold}/{filename}{ext}" for fold, ext in product(folders, exts)]

return patterns
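
Editor's sketch (not part of the diff): a tiny inline schema traced through the function above. The facet names are invented, but the output follows directly from the code:

    schema = {
        "with": [{"facet": "type", "value": "simulation"}],
        "folders": ["type", ["institution", "source"], "(member)"],
        "filename": ["source", "DATES"],
    }
    patterns_from_schema(schema, exts=[".zarr"])
    # -> ['simulation/{institution}_{source}/{member}/{?:_}_{DATES}.zarr',
    #     'simulation/{institution}_{source}/{?:_}_{DATES}.zarr']
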
1 change: 1 addition & 0 deletions src/xscen/data/file_schema.yml
@@ -17,6 +17,7 @@
# - text: < value > # A fixed string
# filename: # The file name schema, a list of facet names. If a facet is empty, it will be skipped. Elements will be separated by "_".
# # The special "DATES" facet will be replaced by the most concise way found to define the temporal range covered by the file.
# # DATES should only appear at the end.
---
### Original / raw data
#
Binary file modified src/xscen/data/fr/LC_MESSAGES/xscen.mo
Binary file not shown.
