From c1df94f96fd3e7f5fcae22ae225d8345f693e94e Mon Sep 17 00:00:00 2001 From: James Byrne Date: Mon, 1 Jul 2024 19:32:37 +0100 Subject: [PATCH] Dev #186: removing key elements of preprocessing functionality --- icenet/data/process.py | 613 ----------------------------- icenet/data/processors/__init__.py | 0 icenet/data/processors/cmip.py | 62 --- icenet/data/processors/era5.py | 40 -- icenet/data/processors/hres.py | 40 -- icenet/data/processors/meta.py | 120 ------ icenet/data/processors/oras5.py | 37 -- icenet/data/processors/osi.py | 68 ---- icenet/data/processors/utils.py | 175 -------- icenet/data/sic/__init__.py | 0 icenet/data/sic/mask.py | 331 ---------------- pyproject.toml | 7 +- 12 files changed, 1 insertion(+), 1492 deletions(-) delete mode 100644 icenet/data/process.py delete mode 100644 icenet/data/processors/__init__.py delete mode 100644 icenet/data/processors/cmip.py delete mode 100644 icenet/data/processors/era5.py delete mode 100644 icenet/data/processors/hres.py delete mode 100644 icenet/data/processors/meta.py delete mode 100644 icenet/data/processors/oras5.py delete mode 100644 icenet/data/processors/osi.py delete mode 100644 icenet/data/processors/utils.py delete mode 100644 icenet/data/sic/__init__.py delete mode 100644 icenet/data/sic/mask.py diff --git a/icenet/data/process.py b/icenet/data/process.py deleted file mode 100644 index 224a2fc9..00000000 --- a/icenet/data/process.py +++ /dev/null @@ -1,613 +0,0 @@ -import datetime as dt -import json -import logging -import os - -import dask -import numpy as np -import pandas as pd -import xarray as xr - -from icenet.data.producers import Processor -from icenet.data.sic.mask import Masks -from icenet.model.models import linear_trend_forecast -""" - -""" - - -class IceNetPreProcessor(Processor): - """ - - :param abs_vars: - :param anom_vars: - :param name: - :param train_dates: - :param val_dates: - :param test_dates: - :param *args: - :param data_shape: - :param dtype: - :param exclude_vars: - :param file_filters: - :param identifier: - :param linear_trends: - :param linear_trend_days: - :param meta_vars: - :param missing_dates: - :param minmax: - :param no_normalise: - :param path: - :param parallel_opens: - :param ref_procdir: - :param source_data: - :param update_key: - :param update_loader: - """ - - DATE_FORMAT = "%Y_%m_%d" - - def __init__( - self, - abs_vars, - anom_vars, - name, - # FIXME: the preprocessors don't need to have the concept of - # train, test, val: they only need to output daily files - # that either are, or are not, part of normalisation / - # climatology calculations. Not a problem, just fix - train_dates, - val_dates, - test_dates, - *args, - data_shape=(432, 432), - dtype=np.float32, - exclude_vars=(), - file_filters=tuple(["latlon_"]), - identifier=None, - linear_trends=tuple(["siconca"]), - linear_trend_steps=7, - meta_vars=tuple(), - missing_dates=tuple(), - minmax=True, - no_normalise=tuple(["siconca"]), - path=os.path.join(".", "processed"), - parallel_opens=False, - ref_procdir=None, - source_data=os.path.join(".", "data"), - update_key=None, - update_loader=True, - **kwargs): - super().__init__(identifier, - source_data, - *args, - file_filters=file_filters, - path=os.path.join(path, name), - train_dates=train_dates, - val_dates=val_dates, - test_dates=test_dates, - **kwargs) - - self._abs_vars = abs_vars if abs_vars else [] - self._anom_vars = anom_vars if anom_vars else [] - # TODO: Ugh, this should not be here any longer - self._meta_vars = list(meta_vars) - - self._name = name - - self._data_shape = data_shape - self._dtype = dtype - self._exclude_vars = exclude_vars - self._linear_trends = linear_trends - self._missing_dates = list(missing_dates) - self._no_normalise = no_normalise - self._normalise = self._normalise_array_mean \ - if not minmax else self._normalise_array_scaling - self._parallel = parallel_opens - self._refdir = ref_procdir - self._update_key = self.identifier if not update_key else update_key - self._update_loader = os.path.join(".", - "loader.{}.json".format(name)) \ - if update_loader else None - - if type(linear_trend_steps) == int: - logging.debug( - "Setting range for linear trend steps based on {}".format( - linear_trend_steps)) - self._linear_trend_steps = list(range(1, linear_trend_steps + 1)) - else: - self._linear_trend_steps = [int(el) for el in linear_trend_steps] - - def process(self): - """ - - """ - var_suffixes = ["abs", "anom"] - var_lists = [self._abs_vars, self._anom_vars] - for var_suffix, var_list in zip(var_suffixes, var_lists): - for var_name in var_list: - if var_name not in self._var_files.keys(): - logging.warning("{} does not exist".format(var_name)) - continue - self._save_variable(var_name, var_suffix) - - if self._update_loader: - self.update_loader_config() - - def pre_normalisation(self, var_name: str, da: object): - """ - - :param var_name: - :param da: - :return: - """ - logging.debug( - "No pre normalisation implemented for {}".format(var_name)) - return da - - def post_normalisation(self, var_name: str, da: object): - """ - - :param var_name: - :param da: - :return: - """ - logging.debug( - "No post normalisation implemented for {}".format(var_name)) - return da - - # TODO: update this to store parameters, if appropriate - def update_loader_config(self): - """ - - :return: - """ - - def _serialize(x): - if x is dt.date: - return x.strftime(IceNetPreProcessor.DATE_FORMAT) - return str(x) - - # We have to be explicit with "dates" as the properties will not be - # caught by _serialize - source = { - "name": self._name, - "implementation": self.__class__.__name__, - "anom": self._anom_vars, - "abs": self._abs_vars, - "dates": { - "train": [ - d.strftime(IceNetPreProcessor.DATE_FORMAT) - for d in self._dates.train - ], - "val": [ - d.strftime(IceNetPreProcessor.DATE_FORMAT) - for d in self._dates.val - ], - "test": [ - d.strftime(IceNetPreProcessor.DATE_FORMAT) - for d in self._dates.test - ], - }, - "linear_trends": self._linear_trends, - "linear_trend_steps": self._linear_trend_steps, - "meta": self._meta_vars, - # TODO: intention should perhaps be to strip these from - # other date sets, this is just an indicative placeholder - # for the mo - "var_files": self._processed_files, - } - - configuration = {"sources": {}} - - if os.path.exists(self._update_loader): - logging.info("Loading configuration {}".format( - self._update_loader)) - with open(self._update_loader, "r") as fh: - obj = json.load(fh) - configuration.update(obj) - - configuration["sources"][self._update_key] = source - - # Ideally should always be in together - if "dtype" in configuration: - assert configuration["dtype"] == self._dtype.__name__ - - if "shape" in configuration: - assert configuration["shape"] == list(self._data_shape) - - configuration["dtype"] = self._dtype.__name__ - configuration["shape"] = list(self._data_shape) - - if "missing_dates" not in configuration: - configuration["missing_dates"] = [] - - # Conversion required one way or another, so perhaps more efficient - # than a union - for d in sorted(self._missing_dates): - date_str = d.strftime(IceNetPreProcessor.DATE_FORMAT) - - if date_str not in configuration["missing_dates"]: - configuration["missing_dates"].append(date_str) - - logging.info("Writing configuration to {}".format(self._update_loader)) - - with open(self._update_loader, "w") as fh: - json.dump(configuration, fh, indent=4, default=_serialize) - - def _save_variable(self, var_name: str, var_suffix: str): - """ - - :param var_name: - :param var_suffix: - """ - with dask.config.set(**{'array.slicing.split_large_chunks': True}): - da = self._open_dataarray_from_files(var_name) - - # FIXME: we should ideally store train dates against the - # normalisation and climatology, to ensure recalculation on - # reprocess. All this need be is in the path, to be honest - - if var_name in self._anom_vars: - if self._refdir: - logging.info("Loading climatology from alternate " - "directory: {}".format(self._refdir)) - clim_path = os.path.join(self._refdir, "params", - "climatology.{}".format(var_name)) - else: - clim_path = os.path.join( - self.get_data_var_folder("params"), - "climatology.{}".format(var_name)) - - if not os.path.exists(clim_path): - logging.info("Generating climatology {}".format(clim_path)) - - if self._dates.train: - climatology = da.sel(time=self._dates.train).\ - groupby('time.month', restore_coord_dims=True).\ - mean() - - climatology.to_netcdf(clim_path) - else: - raise RuntimeError( - "{} does not exist and no " - "training data is supplied".format(clim_path)) - else: - logging.info("Reusing climatology {}".format(clim_path)) - climatology = xr.open_dataarray(clim_path) - - if not set(da.groupby("time.month").all().month.values).\ - issubset(set(climatology.month.values)): - logging.warning( - "We don't have a full climatology ({}) " - "compared with data ({})".format( - ",".join( - [str(i) for i in climatology.month.values]), - ",".join([ - str(i) for i in da.groupby( - "time.month").all().month.values - ]))) - da = da - climatology.mean() - else: - da = da.groupby("time.month") - climatology - - # FIXME: this is not the way to reconvert underlying data on - # dask arrays - da.data = np.asarray(da.data, dtype=self._dtype) - - da = self.pre_normalisation(var_name, da) - # We don't do this (https://github.com/tom-andersson/icenet2/blob/ - # 4ca0f1300fbd82335d8bb000c85b1e71855630fa/icenet/utils.py#L520) - # any more - - if var_name in self._linear_trends and var_suffix == "abs": - # TODO: verify, this used to be da = , but we should not be - # overwriting the abs da with linear trend da - ref_da = None - - if self._refdir: - logging.info( - "We have a reference {}, so will load " - "and supply abs from that for linear trend of " - "{}".format(self._refdir, var_name)) - ref_da = xr.open_dataarray( - os.path.join(self._refdir, var_name, - "{}_{}.nc".format(var_name, var_suffix))) - - self._build_linear_trend_da(da, var_name, ref_da=ref_da) - - elif var_name in self._linear_trends \ - and var_name not in self._abs_vars: - raise NotImplementedError( - "You've asked for linear trend " - "without an absolute value var: {}".format(var_name)) - - if var_name in self._no_normalise: - logging.info("No normalisation for {}".format(var_name)) - else: - logging.info("Normalising {}".format(var_name)) - da = self._normalise(var_name, da) - - da = self.post_normalisation(var_name, da) - - self.save_processed_file( - var_name, "{}_{}.nc".format(var_name, var_suffix), - da.rename("_".join([var_name, var_suffix]))) - - def _open_dataarray_from_files(self, var_name: str): - """ - Open the yearly xarray files, accounting for some ERA5 variables that - have erroneous 'unknown' NetCDF variable names which prevents - concatentation. - - :param var_name: - :return: - """ - - logging.info("Opening files for {}".format(var_name)) - logging.debug("Files: {}".format(self._var_files[var_name])) - ds = xr.open_mfdataset( - self._var_files[var_name], - # Solves issue with inheriting files without - # time dimension (only having coordinate) - combine="nested", - concat_dim="time", - coords="minimal", - compat="override", - drop_variables=("lat", "lon"), - parallel=self._parallel) - - # For processing one file, we're going to assume a single non-lambert - # variable exists at the start and rename all of them - var_names = [ - name for name in list(ds.data_vars.keys()) - if not name.startswith("lambert_") - ] - - var_names = set(var_names) - logging.debug( - "Files have var names {} which will be renamed to {}".format( - ", ".join(var_names), var_name)) - - ds = ds.rename({k: var_name for k in var_names}) - da = getattr(ds, var_name) - - # all_dates = self.dates.train + self.dates.val + self.dates.test - # logging.debug("{} dates in total".format(len(all_dates))) - - da_dates = [pd.to_datetime(d).date() for d in da.time.values] - logging.debug("{} dates in da".format(len(da_dates))) - - # search = sorted(list(set([el for el in all_dates - # if pd.to_datetime(el).date() in da_dates]))) - # logging.debug("Selecting {} dates from da".format(len(search))) - - # We no longer select on all_dates, as it destroys lag/lead processed - # selections from the dataset - try: - da = da.sel(time=da_dates) - except KeyError: - # There is likely non-resampled data being used - # TODO: we could use nearest neighbour on this coordinate, - # but this feels more reliable to dodgy input data when - # transferring - logging.warning("Data selection failed, likely not daily sampled " - "data so will give that a try") - da = da.resample(time="1D").mean().sel( - time=da_dates).sortby("time") - logging.info("Filtered to {} units long based on configuration " - "requirements".format(len(da.time))) - - for coord in ["yc", "xc"]: - if getattr(da, coord).attrs['units'] == "km": - da[coord] = da[coord] * 1000 - da[coord].attrs['units'] = "meters" - return da - - @staticmethod - def mean_and_std(array: object): - """ - Return the mean and standard deviation of an array-like object (intended - use case is for normalising a raw satellite data array based on a list - of samples used for training). - :param array: - :return: - """ - - mean = np.nanmean(array) - std = np.nanstd(array) - - logging.info("Mean: {:.3f}, std: {:.3f}".format( - mean.item(), std.item())) - - return mean, std - - def _normalise_array_mean(self, var_name: str, da: object): - """ - Using the *training* data only, compute the mean and - standard deviation of the input raw satellite DataArray (`da`) - and return a normalised version. If minmax=True, - instead normalise to lie between min and max of the elements of `array`. - - If min, max, mean, or std are given values other than None, - those values are used rather than being computed from the training - months. - - :param var_name: - :param da: - :return: - """ - - if self._refdir: - logging.info("Using alternate processing directory {} for " - "mean".format(self._refdir)) - proc_dir = os.path.join(self._refdir, "normalisation.mean") - else: - proc_dir = self.get_data_var_folder("normalisation.mean") - - mean_path = os.path.join(proc_dir, "{}".format(var_name)) - - if os.path.exists(mean_path): - logging.debug( - "Loading norm-average mean-std from {}".format(mean_path)) - mean, std = tuple([ - self._dtype(el) - for el in open(mean_path, "r").read().split(",") - ]) - elif self._dates.train: - logging.debug("Generating norm-average mean-std from {} training " - "dates".format(len(self._dates.train))) - training_samples = da.sel(time=self._dates.train).data - training_samples = training_samples.ravel() - - mean, std = IceNetPreProcessor.mean_and_std(training_samples) - else: - raise RuntimeError("Either a normalisation file or training data " - "must be supplied") - - new_da = (da - mean) / std - - if not self._refdir: - open(mean_path, "w").write(",".join([str(f) for f in [mean, std]])) - return new_da - - def _normalise_array_scaling(self, var_name: str, da: object): - """ - - :param var_name: - :param da: - :return: - """ - if self._refdir: - logging.info("Using alternate processing directory {} for " - "scaling".format(self._refdir)) - proc_dir = os.path.join(self._refdir, "normalisation.scale") - else: - proc_dir = self.get_data_var_folder("normalisation.scale") - - scale_path = os.path.join(proc_dir, "{}".format(var_name)) - - if os.path.exists(scale_path): - logging.debug( - "Loading norm-scaling min-max from {}".format(scale_path)) - minimum, maximum = tuple([ - self._dtype(el) - for el in open(scale_path, "r").read().split(",") - ]) - elif self._dates.train: - logging.debug("Generating norm-scaling min-max from {} training " - "dates".format(len(self._dates.train))) - training_samples = da.sel(time=self._dates.train).data - training_samples = training_samples.ravel() - - minimum = np.nanmin(training_samples).astype(self._dtype) - maximum = np.nanmax(training_samples).astype(self._dtype) - else: - raise RuntimeError("Either a normalisation file or training data " - "must be supplied") - - new_da = (da - minimum) / (maximum - minimum) - if not self._refdir: - open(scale_path, - "w").write(",".join([str(f) for f in [minimum, maximum]])) - return new_da - - def _build_linear_trend_da(self, - input_da: object, - var_name: str, - max_years: int = 35, - ref_da: object = None): - """ - Construct a DataArray `linear_trend_da` containing the linear trend - forecasts based on the input DataArray `input_da`. - - :param input_da: - :param var_name: - :param max_years: - :param ref_da: - :return: - """ - - if ref_da is None: - ref_da = input_da - data_dates = sorted( - [pd.Timestamp(date) for date in input_da.time.values]) - - trend_dates = set() - trend_steps = max(self._linear_trend_steps) - logging.info( - "Generating trend data up to {} steps ahead for {} dates".format( - trend_steps, len(data_dates))) - - for dat_date in data_dates: - trend_dates = trend_dates.union([ - dat_date + pd.DateOffset(days=d) - for d in self._linear_trend_steps - ]) - - trend_dates = list(sorted(trend_dates)) - logging.info("Generating {} trend dates".format(len(trend_dates))) - - linear_trend_da = \ - xr.broadcast(input_da, xr.DataArray(pd.date_range( - data_dates[0], - data_dates[-1] + pd.DateOffset(days=trend_steps)), - dims="time"))[0] - linear_trend_da = linear_trend_da.sel(time=trend_dates) - linear_trend_da.data = np.zeros(linear_trend_da.shape) - - land_mask = Masks(north=self.north, south=self.south).get_land_mask() - - # Could use shelve, but more likely we'll run into concurrency issues - # pickleshare might be an option but a little over-engineery - trend_cache_path = os.path.join(self.get_data_var_folder(var_name), - "{}_linear_trend.nc".format(var_name)) - trend_cache = linear_trend_da.copy() - trend_cache.data = np.full_like(linear_trend_da.data, np.nan) - - if os.path.exists(trend_cache_path): - trend_cache = xr.open_dataarray(trend_cache_path) - logging.info("Loaded {} entries from {}".format( - len(trend_cache.time), trend_cache_path)) - - def data_selector(da, processing_date, missing_dates=tuple()): - target_date = pd.to_datetime(processing_date) - - date_da = da[(da.time['time.month'] == target_date.month) & - (da.time['time.day'] == target_date.day) & - (da.time <= target_date) & - ~da.time.isin(missing_dates)].\ - isel(time=slice(0, max_years)) - return date_da - - for forecast_date in sorted(trend_dates, reverse=True): - if not trend_cache.sel(time=forecast_date).isnull().all(): - output_map = trend_cache.sel(time=forecast_date) - else: - output_map = linear_trend_forecast( - data_selector, - forecast_date, - ref_da, - land_mask, - missing_dates=self._missing_dates, - shape=self._data_shape) - - linear_trend_da.loc[dict(time=forecast_date)] = output_map - - logging.info("Writing new trend cache for {}".format(var_name)) - trend_cache.close() - linear_trend_da = linear_trend_da.rename( - "{}_linear_trend".format(var_name)) - self.save_processed_file(var_name, - "{}_linear_trend.nc".format(var_name), - linear_trend_da) - - return linear_trend_da - - @property - def missing_dates(self): - return self._missing_dates - - @missing_dates.setter - def missing_dates(self, arr): - self._missing_dates = arr diff --git a/icenet/data/processors/__init__.py b/icenet/data/processors/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/icenet/data/processors/cmip.py b/icenet/data/processors/cmip.py deleted file mode 100644 index 1326d42e..00000000 --- a/icenet/data/processors/cmip.py +++ /dev/null @@ -1,62 +0,0 @@ -from icenet.data.cli import process_args, process_date_args -from icenet.data.process import IceNetPreProcessor -from icenet.data.sic.mask import Masks -from icenet.data.processors.utils import sic_interpolate -""" - -""" - - -class IceNetCMIPPreProcessor(IceNetPreProcessor): - """ - - :param source: - :param member: - """ - - def __init__(self, source: str, member: str, *args, **kwargs): - cmip_source = "{}.{}".format(source, member) - super().__init__(*args, - identifier="cmip6.{}".format(cmip_source), - **kwargs) - - def pre_normalisation(self, var_name: str, da: object): - """ - - :param var_name: - :param da: - :return: - """ - if var_name == "siconca": - masks = Masks(north=self.north, south=self.south) - return sic_interpolate(da, masks) - - return da - - -def main(): - args = process_args(extra_args=[ - (["source"], dict(type=str)), - (["member"], dict(type=str)), - ],) - dates = process_date_args(args) - - cmip = IceNetCMIPPreProcessor( - args.source, - args.member, - args.abs, - args.anom, - args.name, - dates["train"], - dates["val"], - dates["test"], - linear_trends=args.trends, - linear_trend_days=args.trend_lead, - north=args.hemisphere == "north", - parallel_opens=args.parallel_opens, - ref_procdir=args.ref, - south=args.hemisphere == "south", - update_key=args.update_key, - ) - cmip.init_source_data(lag_days=args.lag,) - cmip.process() diff --git a/icenet/data/processors/era5.py b/icenet/data/processors/era5.py deleted file mode 100644 index 0f45810a..00000000 --- a/icenet/data/processors/era5.py +++ /dev/null @@ -1,40 +0,0 @@ -from icenet.data.cli import process_args, process_date_args -from icenet.data.process import IceNetPreProcessor -""" - -""" - - -class IceNetERA5PreProcessor(IceNetPreProcessor): - """ - - """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, - file_filters=["latlon_"], - identifier="era5", - **kwargs) - - -def main(): - args = process_args() - dates = process_date_args(args) - - era5 = IceNetERA5PreProcessor( - args.abs, - args.anom, - args.name, - dates["train"], - dates["val"], - dates["test"], - linear_trends=args.trends, - linear_trend_days=args.trend_lead, - north=args.hemisphere == "north", - parallel_opens=args.parallel_opens, - ref_procdir=args.ref, - south=args.hemisphere == "south", - update_key=args.update_key, - ) - era5.init_source_data(lag_days=args.lag,) - era5.process() diff --git a/icenet/data/processors/hres.py b/icenet/data/processors/hres.py deleted file mode 100644 index ee7f1db1..00000000 --- a/icenet/data/processors/hres.py +++ /dev/null @@ -1,40 +0,0 @@ -from icenet.data.cli import process_args, process_date_args -from icenet.data.process import IceNetPreProcessor -""" - -""" - - -class IceNetHRESPreProcessor(IceNetPreProcessor): - """ - - """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, - file_filters=["latlon_"], - identifier="mars.hres", - **kwargs) - - -def main(): - args = process_args() - dates = process_date_args(args) - - hres = IceNetHRESPreProcessor( - args.abs, - args.anom, - args.name, - dates["train"], - dates["val"], - dates["test"], - linear_trends=args.trends, - linear_trend_steps=args.trend_lead, - north=args.hemisphere == "north", - parallel_opens=args.parallel_opens, - ref_procdir=args.ref, - south=args.hemisphere == "south", - update_key=args.update_key, - ) - hres.init_source_data(lag_days=args.lag,) - hres.process() diff --git a/icenet/data/processors/meta.py b/icenet/data/processors/meta.py deleted file mode 100644 index 5d1b6f46..00000000 --- a/icenet/data/processors/meta.py +++ /dev/null @@ -1,120 +0,0 @@ -import numpy as np -import pandas as pd -import xarray as xr - -from icenet.data.cli import process_args -from icenet.data.process import IceNetPreProcessor -from icenet.data.sic.mask import Masks -""" - -""" - - -class IceNetMetaPreProcessor(IceNetPreProcessor): - """ - - :param name: - :param include_circday: - :param include_land: - """ - - def __init__(self, - name: str, - include_circday: bool = True, - include_land: bool = True, - **kwargs): - super().__init__(abs_vars=[], - anom_vars=[], - identifier="meta", - linear_trends=tuple(), - name=name, - test_dates=[], - train_dates=[], - val_dates=[], - **kwargs) - - self._include_circday = include_circday - self._include_land = include_land - - def init_source_data(self, - lag_days: object = None, - lead_days: object = None): - """ - - :param lag_days: - :param lead_days: - """ - raise NotImplementedError("No need to execute implementation for meta") - - def process(self): - """ - - """ - if self._include_circday: - self._save_circday() - - if self._include_land: - self._save_land() - - # FIXME: this is a bit messy, clarify meta_vars and interface between - # processing and dataset generation - if self._update_loader: - self.update_loader_config() - - def _save_land(self): - """ - - """ - land_mask = Masks(north=self.north, south=self.south).get_land_mask() - land_map = np.ones(self._data_shape, dtype=self._dtype) - land_map[~land_mask] = -1. - - if "land" not in self._meta_vars: - self._meta_vars.append("land") - - da = xr.DataArray(data=land_map, - dims=["yc", "xc"], - attrs=dict(description="IceNet land mask metadata")) - land_path = self.save_processed_file("land", "land.nc", da) - return land_path - - def _save_circday(self): - """ - - """ - cos = [] - sin = [] - paths = [] - - for date in pd.date_range(start='2012-1-1', end='2012-12-31'): - if self.north: - circday = date.dayofyear - else: - circday = date.dayofyear + 365.25 / 2 - - cos.append(np.cos(2 * np.pi * circday / 366, dtype=self._dtype)) - sin.append(np.sin(2 * np.pi * circday / 366, dtype=self._dtype)) - - for var_name in ["sin", "cos"]: - if var_name not in self._meta_vars: - self._meta_vars.append(var_name) - - da = xr.DataArray( - data=eval(var_name), - dims=["time"], - coords=dict( - time=pd.date_range(start='2012-1-1', end='2012-12-31')), - attrs=dict( - description="IceNet {} mask metadata".format(var_name))) - paths.append( - self.save_processed_file(var_name, "{}.nc".format(var_name), - da)) - return paths - - -def main(): - args = process_args(dates=False, ref_option=False) - - IceNetMetaPreProcessor(args.name, - north=args.hemisphere == "north", - south=args.hemisphere == "south").process() diff --git a/icenet/data/processors/oras5.py b/icenet/data/processors/oras5.py deleted file mode 100644 index 6f6a15a2..00000000 --- a/icenet/data/processors/oras5.py +++ /dev/null @@ -1,37 +0,0 @@ -from icenet.data.cli import process_args, process_date_args -from icenet.data.process import IceNetPreProcessor - - -class IceNetORAS5PreProcessor(IceNetPreProcessor): - """ - - """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, - file_filters=["latlon_"], - identifier="oras5", - **kwargs) - - -def main(): - args = process_args() - dates = process_date_args(args) - - oras5 = IceNetORAS5PreProcessor( - args.abs, - args.anom, - args.name, - dates["train"], - dates["val"], - dates["test"], - linear_trends=args.trends, - linear_trend_days=args.trend_lead, - north=args.hemisphere == "north", - parallel_opens=args.parallel_opens, - ref_procdir=args.ref, - south=args.hemisphere == "south", - update_key=args.update_key, - ) - oras5.init_source_data(lag_days=args.lag,) - oras5.process() diff --git a/icenet/data/processors/osi.py b/icenet/data/processors/osi.py deleted file mode 100644 index 4be1d433..00000000 --- a/icenet/data/processors/osi.py +++ /dev/null @@ -1,68 +0,0 @@ -import datetime as dt -import os - -from icenet.data.cli import process_args, process_date_args -from icenet.data.process import IceNetPreProcessor -from icenet.data.sic.mask import Masks -from icenet.data.processors.utils import sic_interpolate -""" - -""" - - -class IceNetOSIPreProcessor(IceNetPreProcessor): - """ - - :param missing_dates: - """ - - def __init__(self, *args, missing_dates: object = None, **kwargs): - super().__init__(*args, identifier="osisaf", **kwargs) - - missing_dates_path = os.path.join(self._source_data, "siconca", - "missing_days.csv") - - missing_dates = [] if missing_dates is None else missing_dates - assert type(missing_dates) is list - - with open(missing_dates_path, "r") as fh: - missing_dates += [ - dt.date(*[int(s) - for s in line.strip().split(",")]) - for line in fh.readlines() - ] - self.missing_dates = list(set(missing_dates)) - - def pre_normalisation(self, var_name: str, da: object): - """ - - :param var_name: - :param da: - :return: - """ - if var_name != "siconca": - raise RuntimeError("OSISAF SIC implementation should be dealing " - "with siconca, ") - else: - masks = Masks(north=self.north, south=self.south) - return sic_interpolate(da, masks) - - -def main(): - args = process_args() - dates = process_date_args(args) - - osi = IceNetOSIPreProcessor(args.abs, - args.anom, - args.name, - dates["train"], - dates["val"], - dates["test"], - linear_trends=args.trends, - linear_trend_steps=args.trend_lead, - north=args.hemisphere == "north", - parallel_opens=args.parallel_opens, - ref_procdir=args.ref, - south=args.hemisphere == "south") - osi.init_source_data(lag_days=args.lag,) - osi.process() diff --git a/icenet/data/processors/utils.py b/icenet/data/processors/utils.py deleted file mode 100644 index 9ae36064..00000000 --- a/icenet/data/processors/utils.py +++ /dev/null @@ -1,175 +0,0 @@ -import argparse -import glob -import logging -import os - -import numpy as np -import pandas as pd -import xarray as xr - -from icenet.utils import Hemisphere -from icenet.data.producers import DataProducer - -from scipy import interpolate -from scipy.spatial.qhull import QhullError -""" - -""" - - -def sic_interpolate(da: object, masks: object) -> object: - """ - - :param da: - :param masks: - :return: - """ - for date in da.time.values: - polarhole_mask = masks.get_polarhole_mask(pd.to_datetime(date).date()) - - da_day = da.sel(time=date) - xx, yy = np.meshgrid(np.arange(432), np.arange(432)) - - # Grid cells outside of polar hole or NaN regions - valid = ~np.isnan(da_day.data) - - # Interpolate polar hole - if type(polarhole_mask) is np.ndarray: - valid = valid & ~polarhole_mask - - # Interpolate if there is more than one missing grid cell - if np.sum(~valid) >= 1: - # Find grid cell locations surrounding NaN regions for bilinear - # interpolation - nan_mask = np.ma.masked_array(np.full((432, 432), 0.)) - nan_mask[~valid] = np.ma.masked - - nan_neighbour_arrs = {} - for order in 'C', 'F': - # starts and ends indexes of masked element chunks - slice_ends = np.ma.clump_masked(nan_mask.ravel(order=order)) - - nan_neighbour_idxs = [] - nan_neighbour_idxs.extend([s.start for s in slice_ends]) - nan_neighbour_idxs.extend([s.stop - 1 for s in slice_ends]) - - nan_neighbour_arr_i = np.array(np.full((432, 432), False), - order=order) - nan_neighbour_arr_i.ravel(order=order)[nan_neighbour_idxs] \ - = True - nan_neighbour_arrs[order] = nan_neighbour_arr_i - - nan_neighbour_arr = nan_neighbour_arrs['C'] + \ - nan_neighbour_arrs['F'] - # Remove artefacts along edge of the grid - nan_neighbour_arr[:, 0] = \ - nan_neighbour_arr[0, :] = \ - nan_neighbour_arr[:, -1] = \ - nan_neighbour_arr[-1, :] = False - - if np.sum(nan_neighbour_arr) == 1: - res = np.where( - np.array(nan_neighbour_arr) == True) # noqa: E712 - logging.warning( - "Not enough nans for interpolation, extending {}".format( - res)) - x_idx, y_idx = res[0][0], res[1][0] - nan_neighbour_arr[x_idx - 1:x_idx + 2, y_idx] = True - nan_neighbour_arr[x_idx, y_idx - 1:y_idx + 2] = True - logging.debug( - np.where(np.array(nan_neighbour_arr) == True)) # noqa: E712 - - # Perform bilinear interpolation - x_valid = xx[nan_neighbour_arr] - y_valid = yy[nan_neighbour_arr] - values = da_day.data[nan_neighbour_arr] - - x_interp = xx[~valid] - y_interp = yy[~valid] - - try: - if len(x_valid) or len(y_valid): - interp_vals = interpolate.griddata((x_valid, y_valid), - values, - (x_interp, y_interp), - method='linear') - da.sel(time=date).data[~valid] = interp_vals - else: - logging.warning("No valid values to interpolate with on " - "{}".format(date)) - except QhullError: - logging.exception( - "Geometrical degeneracy from QHull, interpolation failed") - - return da - - -def condense_main(): - ap = argparse.ArgumentParser() - ap.add_argument("identifier") - ap.add_argument("hemisphere", choices=("north", "south")) - ap.add_argument("variable") - - ap.add_argument("-n", "--numpy", action="store_true", default=False) - ap.add_argument("-v", "--verbose", action="store_true", default=False) - args = ap.parse_args() - logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO) - condense_data(args.identifier, args.hemisphere, args.variable) - - -def condense_data(identifier: str, hemisphere: str, variable: str): - """Takes existing daily files and creates yearly files - - Previous early versions of the pipeline were storing files day by day, which - is pretty wasteful. This allows us to create the yearly files and avoid - all that nasty re-downloading business - - :param identifier: - :param hemisphere: - :param variable: - """ - logging.info("Condensing data into singular file") - - dp = DataProducer(identifier=identifier, - north=getattr(Hemisphere, - hemisphere.upper()) == Hemisphere.NORTH, - south=getattr(Hemisphere, - hemisphere.upper()) == Hemisphere.SOUTH) - - data_path = dp.get_data_var_folder(variable, missing_error=True) - - logging.debug("Collecting files from {}".format(data_path)) - dfs = glob.glob(os.path.join(data_path, "**", "*.nc")) - - def year_batch(filenames): - df_years = set([ - os.path.split(os.path.dirname(f_year))[-1] for f_year in filenames - ]) - - for year_el in df_years: - year_dfs = [ - el for el in filenames - if os.path.split(os.path.dirname(el))[-1] == year_el and - not os.path.split(el)[1].startswith("latlon") - ] - logging.debug("{} has {} files".format(year_el, len(year_dfs))) - yield year_el, year_dfs - - if len(dfs): - logging.debug("Got {} files, collecting to {}...".format( - len(dfs), data_path)) - - for year, year_files in year_batch(dfs): - year_path = os.path.join(data_path, "{}.nc".format(year)) - - if not os.path.exists(year_path): - logging.info("Loading {}".format(year)) - ds = xr.open_mfdataset(year_files, parallel=True) - years, datasets = zip(*ds.groupby("time.year")) - if len(years) > 1: - raise RuntimeError( - "Too many years in one file {}".format(years)) - logging.info("Saving to {}".format(year_path)) - xr.save_mfdataset(datasets, [year_path]) - else: - logging.info("No valid files found.") diff --git a/icenet/data/sic/__init__.py b/icenet/data/sic/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/icenet/data/sic/mask.py b/icenet/data/sic/mask.py deleted file mode 100644 index 0704b2d3..00000000 --- a/icenet/data/sic/mask.py +++ /dev/null @@ -1,331 +0,0 @@ -import datetime as dt -import logging -import os -import shutil - -import numpy as np -import pandas as pd -import xarray as xr - -from icenet.data.cli import download_args -from icenet.data.producers import Generator -from icenet.utils import run_command -from download_toolbox.utils import SIC_HEMI_STR -"""Sea Ice Masks - -""" - - -class Masks(Generator): - """Masking of regions to include/omit in dataset. - - TODO: Add example usage. - """ - - LAND_MASK_FILENAME = "land_mask.npy" - # FIXME: nh/sh? - POLARHOLE_RADII = (28, 11, 3) - POLARHOLE_DATES = ( - dt.date(1987, 6, 1), - dt.date(2005, 10, 1), - dt.date(2015, 12, 1), - ) - - def __init__(self, - *args, - polarhole_dates: object = POLARHOLE_DATES, - polarhole_radii: object = POLARHOLE_RADII, - data_shape: object = (432, 432), - dtype: object = np.float32, - **kwargs): - """Initialises Masks across specified hemispheres. - - Args: - polarhole_dates: Dates for polar hole (missing data) in data. - polarhole_radii: Radii of polar hole. - data_shape: Shape of input dataset. - dtype: Store mask as this type. - """ - super().__init__(*args, identifier="masks", **kwargs) - - self._polarhole_dates = polarhole_dates - self._polarhole_radii = polarhole_radii - self._dtype = dtype - self._shape = data_shape - self._region = (slice(None, None), slice(None, None)) - - self.init_params() - - def init_params(self): - """Initialises the parameters of the Masks class. - - This method will create a `masks.params` file if it does not exist. - And, stores the polar_radii and polar_dates instance variables into it. - If it already exists, it will read and store the values to the instance - variables - """ - - params_path = os.path.join(self.get_data_var_folder("masks"), - "masks.params") - - if not os.path.exists(params_path): - with open(params_path, "w") as fh: - for i, polarhole in enumerate(self._polarhole_radii): - fh.write("{}\n".format(",".join([ - str(polarhole), - self._polarhole_dates[i].strftime("%Y%m%d") - ]))) - else: - lines = [ - el.strip().split(",") - for el in open(params_path, "r").readlines() - ] - radii, dates = zip(*lines) - self._polarhole_dates = [ - dt.datetime.strptime(el, "%Y%m%d").date() for el in dates - ] - self._polarhole_radii = [int(r) for r in radii] - - def generate(self, - year: int = 2000, - save_land_mask: bool = True, - save_polarhole_masks: bool = True, - remove_temp_files: bool = False): - """Generate a set of data masks. - - Args: - year (optional): Which year to use for generate masks from. - Defaults to 2000. - save_land_mask (optional): Whether to output land mask. - Defaults to True. - save_polarhole_masks (optional): Whether to output polar hole masks. - Defaults to True. - remove_temp_files (optional): Whether to remove temporary directory. - Defaults to False. - """ - siconca_folder = self.get_data_var_folder("siconca") - - # FIXME: cut-dirs can be change intolerant, better use -O, changed - # from 4 to 5 - retrieve_cmd_template_osi450 = \ - "wget -m -nH --cut-dirs=4 -P {} " \ - "ftp://osisaf.met.no/reprocessed/ice/conc/v2p0/{:04d}/{:02d}/{}" - filename_template_osi450 = \ - 'ice_conc_{}_ease2-250_cdr-v2p0_{:04d}{:02d}021200.nc' - - # Generate the land-lake-sea mask using the second day from each month - # of the year 2000 (chosen arbitrarily as the mask is fixed within - # month) - for month in range(1, 13): - mask_path = os.path.join( - self.get_data_var_folder("masks"), - "active_grid_cell_mask_{:02d}.npy".format(month)) - if not os.path.exists(mask_path): - # Download the data if not already downloaded - filename_osi450 = filename_template_osi450.format( - SIC_HEMI_STR[self.hemisphere_str[0]], year, month) - - month_str = '{:02d}'.format(month) - month_folder = os.path.join(siconca_folder, str(year), month_str) - month_path = os.path.join(month_folder, filename_osi450) - - if not os.path.exists(month_path): - run_command( - retrieve_cmd_template_osi450.format( - siconca_folder, year, month, filename_osi450)) - else: - logging.info( - "siconca {} already exists".format(filename_osi450)) - - with xr.open_dataset(month_path) as ds: - status_flag = ds['status_flag'] - status_flag = np.array(status_flag.data).astype(np.uint8) - status_flag = status_flag.reshape(*self._shape) - - binary = np.unpackbits(status_flag, axis=1).\ - reshape(*self._shape, 8) - - # TODO: Add source/explanation for these magic numbers (index slicing nos.). - # Mask out: land, lake, and 'outside max climatology' (open sea) - max_extent_mask = np.sum(binary[:, :, [7, 6, 0]], - axis=2).reshape(*self._shape) >= 1 - max_extent_mask = ~max_extent_mask - - # FIXME: Remove Caspian and Black seas - should we do this sh? - if self.north: - # TODO: Add source/explanation for these indices. - max_extent_mask[325:386, 317:380] = False - - logging.info("Saving {}".format(mask_path)) - - np.save(mask_path, max_extent_mask) - - land_mask_path = os.path.join(self.get_data_var_folder("masks"), - Masks.LAND_MASK_FILENAME) - - if save_land_mask and \ - not os.path.exists(land_mask_path) \ - and month == 1: - land_mask = np.sum(binary[:, :, [7, 6]], axis=2).\ - reshape(*self._shape) >= 1 - - logging.info("Saving {}".format(land_mask_path)) - np.save(land_mask_path, land_mask) - else: - logging.info("Skipping {}, already exists".format(mask_path)) - - # Delete the data/siconca/2000 folder holding the temporary daily files - if remove_temp_files: - logging.info("Removing {}".format(siconca_folder)) - shutil.rmtree(siconca_folder) - - if save_polarhole_masks and not self.south: - # Generate the polar hole masks - x = np.tile(np.arange(0, self._shape[1]). - reshape(self._shape[0], 1), (1, self._shape[1])).\ - astype(self._dtype) - 215.5 - y = np.tile(np.arange(0, self._shape[1]). - reshape(1, self._shape[1]), (self._shape[0], 1)).\ - astype(self._dtype) - 215.5 - squaresum = np.square(x) + np.square(y) - - for i, radius in enumerate(self._polarhole_radii): - polarhole = np.full(self._shape, False) - polarhole[squaresum < radius**2] = True - - polarhole_path = os.path.join( - self.get_data_var_folder("masks"), - "polarhole{}_mask.npy".format(i + 1)) - logging.info("Saving polarhole {}".format(polarhole_path)) - np.save(polarhole_path, polarhole) - - def get_active_cell_mask(self, month: object) -> object: - """Loads an active grid cell mask from numpy file. - - Also, checks if a mask file exists for input month, and raises an error if it does not. - - Args: - month: Month index representing the month for which the mask file is being checked. - - Returns: - Active cell mask boolean(s) for corresponding month and pre-defined `self._region`. - - Raises: - RuntimeError: If the mask file for the input month does not exist. - """ - mask_path = os.path.join( - self.get_data_var_folder("masks"), - "active_grid_cell_mask_{:02d}.npy".format(month)) - - if not os.path.exists(mask_path): - raise RuntimeError("Active cell masks have not been generated, " - "this is not done automatically so you might " - "want to address this!") - - # logging.debug("Loading active cell mask {}".format(mask_path)) - return np.load(mask_path)[self._region] - - def get_active_cell_da(self, src_da: object) -> object: - """Generate an xarray.DataArray object containing the active cell masks - for each timestamp in a given source DataArray. - - Args: - src_da: Source xarray.DataArray object containing time, xc, yc - coordinates. - - Returns: - An xarray.DataArray containing active cell masks for each time - in source DataArray. - """ - return xr.DataArray( - [ - self.get_active_cell_mask(pd.to_datetime(date).month) - for date in src_da.time.values - ], - dims=('time', 'yc', 'xc'), - coords={ - 'time': src_da.time.values, - 'yc': src_da.yc.values, - 'xc': src_da.xc.values, - }) - - def get_land_mask(self, - land_mask_filename: str = LAND_MASK_FILENAME) -> object: - """Generate an xarray.DataArray object containing the active cell masks - for each timestamp in a given source DataArray. - - Args: - land_mask_filename (optional): Land mask output filename. - Defaults to `Masks.LAND_MASK_FILENAME`. - - Returns: - An numpy array of land mask flag(s) for corresponding month and - pre-defined `self._region`. - """ - mask_path = os.path.join(self.get_data_var_folder("masks"), - land_mask_filename) - - if not os.path.exists(mask_path): - raise RuntimeError("Land mask has not been generated, this is " - "not done automatically so you might want to " - "address this!") - - # logging.debug("Loading land mask {}".format(mask_path)) - return np.load(mask_path)[self._region] - - def get_polarhole_mask(self, date: object) -> object: - """Get mask of polar hole region. - - TODO: - Explain date literals as class instance for POLARHOLE_DATES - and POLARHOLE_RADII - """ - if self.south: - return None - - for i, r in enumerate(self._polarhole_radii): - if date <= self._polarhole_dates[i]: - polarhole_path = os.path.join( - self.get_data_var_folder("masks"), - "polarhole{}_mask.npy".format(i + 1)) - # logging.debug("Loading polarhole {}".format(polarhole_path)) - return np.load(polarhole_path)[self._region] - return None - - def get_blank_mask(self) -> object: - """Returns an empty mask. - - Returns: - A numpy array of flags set to false for pre-defined `self._region` - of shape `self._shape` (the `data_shape` instance initialisation - value). - """ - return np.full(self._shape, False)[self._region] - - def __getitem__(self, item): - """Sets slice of region wanted for masking, and allows method chaining. - - This might be a semantically dodgy thing to do, but it works for the mo - - Args: - item: Index/slice to extract. - """ - logging.info("Mask region set to: {}".format(item)) - self._region = item - return self - - def reset_region(self): - """Resets the mask region and logs a message indicating that the whole mask will be returned.""" - logging.info("Mask region reset, whole mask will be returned") - self._region = (slice(None, None), slice(None, None)) - - -def main(): - """Entry point of Masks class - used to create executable that calls it.""" - args = download_args(dates=False, var_specs=False) - - north = args.hemisphere == "north" - south = args.hemisphere == "south" - - Masks(north=north, south=south).\ - generate(save_polarhole_masks=north) diff --git a/pyproject.toml b/pyproject.toml index ba61135c..02707c26 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,12 +56,7 @@ classifiers = [ ] [project.scripts] -icenet_process_cmip = "icenet.data.processors.cmip:main" -icenet_process_era5 = "icenet.data.processors.era5:main" -icenet_process_oras5 = "icenet.data.processors.oras5:main" -icenet_process_hres = "icenet.data.processors.hres:main" -icenet_process_sic = "icenet.data.processors.osi:main" -icenet_process_metadata = "icenet.data.processors.meta:main" +# TODO: review condense, not necessarily required icenet_process_condense = "icenet.data.processors.utils:condense_main" icenet_dataset_check = "icenet.data.dataset:check_dataset" icenet_dataset_create = "icenet.data.loader:create"