diff --git a/icenet/data/interfaces/__init__.py b/icenet/data/interfaces/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/icenet/data/interfaces/cmems.py b/icenet/data/interfaces/cmems.py deleted file mode 100644 index 6b8a8874..00000000 --- a/icenet/data/interfaces/cmems.py +++ /dev/null @@ -1,194 +0,0 @@ -import configparser -import logging -import os -import time - -import numpy as np -import pandas as pd -import xarray as xr - -from icenet.data.cli import download_args -from icenet.data.interfaces.downloader import ClimateDownloader -from icenet.utils import run_command -""" -DATASET: global-reanalysis-phy-001-031-grepv2-daily -FTP ENDPOINT: ftp://my.cmems-du.eu/Core/GLOBAL_REANALYSIS_PHY_001_031/global-reanalysis-phy-001-031-grepv2-daily/1993/01/ - -""" - - -class ORAS5Downloader(ClimateDownloader): - """Climate downloader to provide ORAS5 reanalysis data from CMEMS API - - These aren't available for CMIP training at daily frequencies - - :param identifier: how to identify this dataset - :param var_map: override the default ERA5Downloader.CDI_MAP variable map - """ - ENDPOINTS = { - # TODO: See #49 - not yet used - "cas": "https://cmems-cas.cls.fr/cas/login", - "dap": "https://my.cmems-du.eu/thredds/dodsC/{dataset}", - "motu": "https://my.cmems-du.eu/motu-web/Motu", - } - - VAR_MAP = { - "thetao": "thetao_oras", # sea_water_potential_temperature - "so": "so_oras", # sea_water_salinity - "uo": "uo_oras", # eastward_sea_water_velocity - "vo": "vo_oras", # northward_sea_water_velocity - "zos": "zos_oras", # sea_surface_height_above_geoid - "mlotst": - "mlotst_oras", # ocean_mixed_layer_thickness_defined_by_sigma_theta - } - - def __init__(self, - *args, - cred_file: str = os.path.expandvars("$HOME/.cmems.creds"), - dataset: str = "global-reanalysis-phy-001-031-grepv2-daily", - identifier: str = "oras5", - max_failures: int = 3, - service: str = "GLOBAL_REANALYSIS_PHY_001_031-TDS", - var_map: object = None, - **kwargs): - super().__init__(*args, - drop_vars=["lambert_azimuthal_equal_area"], - identifier=identifier, - **kwargs) - - cp = configparser.ConfigParser(default_section="auth") - cp.read(cred_file) - self._creds = dict(cp["auth"]) - self._dataset = dataset - self._max_failures = max_failures - self._service = service - self._var_map = var_map if var_map else ORAS5Downloader.VAR_MAP - - assert self._max_threads <= 8, "Too many request threads for ORAS5 " \ - "(max.8)" - - for var_name in self._var_names: - assert var_name in self._var_map.keys(), \ - "{} not in ORAS5 var map".format(var_name) - - self.download_method = self._single_motu_download - - def postprocess(self, var: str, download_path: object): - """ - - :param var: - :param download_path: - """ - logging.info("Postprocessing {} to {}".format(var, download_path)) - ds = xr.open_dataset(download_path) - - da = getattr(ds, self._var_map[var]).rename(var) - da = da.mean("depth").compute() - da.to_netcdf(download_path) - - def _single_motu_download(self, var: str, level: object, req_dates: int, - download_path: object): - """Implements a single download from ... server - :param var: - :param level: - :param req_dates: - :param download_path: - :return: - - """ - attempts = 1 - success = False - - cmd = \ - """motuclient --quiet --motu {} \ - --service-id {} \ - --product-id {} \ - --longitude-min -180 \ - --longitude-max 179.75 \ - --latitude-min {} \ - --latitude-max {} \ - --date-min "{} 00:00:00" \ - --date-max "{} 00:00:00" \ - --depth-min 0.5056 \ - --depth-max 0.5059 \ - --variable {} \ - --out-dir {} \ - --out-name {} \ - --user {} \ - --pwd '{}' \ - """.format(self.ENDPOINTS['motu'], - self._service, - self._dataset, - self.hemisphere_loc[2], - self.hemisphere_loc[0], - req_dates[0].strftime("%Y-%m-%d"), - req_dates[-1].strftime("%Y-%m-%d"), - self._var_map[var], - os.path.split(download_path)[0], - os.path.split(download_path)[1], - self._creds['username'], - self._creds['password']) - - tic = time.time() - while not success: - logging.debug("Attempt {}".format(attempts)) - - ret = run_command(cmd) - if ret.returncode != 0 or not os.path.exists(download_path): - attempts += 1 - if attempts > self._max_failures: - logging.error( - "Couldn't download {} between {} and {}".format( - var, req_dates[0], req_dates[-1])) - break - time.sleep(30) - else: - success = True - - if success: - dur = time.time() - tic - logging.debug("Done in {}m:{:.0f}s. ".format( - np.floor(dur / 60), dur % 60)) - return success - - def additional_regrid_processing(self, datafile: object, - cube_ease: object) -> object: - """ - - :param datafile: - :param cube_ease: - :return: - """ - cube_ease.data = np.ma.filled(cube_ease.data, fill_value=0.) - return cube_ease - - -def main(): - args = download_args(workers=True, - extra_args=((("-n", "--do-not-download"), - dict(dest="download", - action="store_false", - default=True)), - (("-p", "--do-not-postprocess"), - dict(dest="postprocess", - action="store_false", - default=True)))) - - logging.info("ORAS5 Data Downloading") - oras5 = ORAS5Downloader( - var_names=args.vars, - # TODO: currently hardcoded - dates=[ - pd.to_datetime(date).date() - for date in pd.date_range(args.start_date, args.end_date, freq="D") - ], - delete_tempfiles=args.delete, - download=args.delete, - levels=[None for _ in args.vars], - max_threads=args.workers, - postprocess=args.postprocess, - north=args.hemisphere == "north", - south=args.hemisphere == "south", - ) - oras5.download() - oras5.regrid() diff --git a/icenet/data/interfaces/downloader.py b/icenet/data/interfaces/downloader.py deleted file mode 100644 index aa0d53f7..00000000 --- a/icenet/data/interfaces/downloader.py +++ /dev/null @@ -1,633 +0,0 @@ -import concurrent -import logging -import os -import re -import shutil -import tempfile - -from abc import abstractmethod -from concurrent.futures import ThreadPoolExecutor -from itertools import product - -from icenet.data.sic.mask import Masks -from icenet.data.sic.utils import SIC_HEMI_STR -from icenet.data.producers import Downloader -from icenet.data.utils import assign_lat_lon_coord_system, \ - gridcell_angles_from_dim_coords, \ - invert_gridcell_angles, \ - rotate_grid_vectors -from icenet.data.interfaces.utils import batch_requested_dates -from icenet.utils import run_command - -import iris -import iris.analysis -import iris.cube -import iris.exceptions -import numpy as np -import pandas as pd -import xarray as xr -""" - -""" - - -def filter_dates_on_data(latlon_path: str, - regridded_name: str, - req_dates: object, - check_latlon: bool = True, - check_regridded: bool = True, - drop_vars: list = None): - """Reduces request dates and target files based on existing data - - To avoid what is potentially significant resource expense downloading - extant data, downloaders should call this method to reduce the request - dates only to that data not already present. This is a fairly naive - implementation, in that if the data is present in either the latlon - intermediate file OR the target regridded file, we'll not bother - downloading again. This can be overridden via the method arguments. - - :param latlon_path: - :param regridded_name: - :param req_dates: - :param check_latlon: - :param check_regridded: - :param drop_vars: - :return: req_dates(list) - """ - - latlon_dates = list() - regridded_dates = list() - drop_vars = list() if drop_vars is None else drop_vars - - # Latlon files should in theory be aggregated and singular arrays - # meaning we can naively open and interrogate the dates - if check_latlon and os.path.exists(latlon_path): - try: - latlon_dates = xr.open_dataset(latlon_path, - drop_variables=drop_vars).time.values - logging.debug("{} latlon dates already available in {}".format( - len(latlon_dates), latlon_path)) - except ValueError: - logging.warning("Latlon {} dates not readable, ignoring file") - - if check_regridded and os.path.exists(regridded_name): - regridded_dates = xr.open_dataset(regridded_name, - drop_variables=drop_vars).time.values - logging.debug("{} regridded dates already available in {}".format( - len(regridded_dates), regridded_name)) - - exclude_dates = list(set(latlon_dates).union(set(regridded_dates))) - logging.debug("Excluding {} dates already existing from {} dates " - "requested.".format(len(exclude_dates), len(req_dates))) - - return sorted( - list( - pd.to_datetime(req_dates).difference( - pd.to_datetime(exclude_dates)))) - - -def merge_files(new_datafile: str, - other_datafile: str, - drop_variables: object = None): - """ - - :param new_datafile: - :param other_datafile: - :param drop_variables: - """ - drop_variables = list() if drop_variables is None else drop_variables - - if other_datafile is not None: - (datafile_path, new_filename) = os.path.split(new_datafile) - moved_new_datafile = \ - os.path.join(datafile_path, "new.{}".format(new_filename)) - os.rename(new_datafile, moved_new_datafile) - d1 = xr.open_dataarray(moved_new_datafile, - drop_variables=drop_variables) - - logging.info( - "Concatenating with previous data {}".format(other_datafile)) - d2 = xr.open_dataarray(other_datafile, drop_variables=drop_variables) - new_ds = xr.concat([d1, d2], dim="time").\ - sortby("time").\ - drop_duplicates("time", keep="first") - - logging.info("Saving merged data to {}... ".format(new_datafile)) - new_ds.to_netcdf(new_datafile) - os.unlink(other_datafile) - os.unlink(moved_new_datafile) - - -class ClimateDownloader(Downloader): - """Climate downloader base class - - :param dates: - :param delete_tempfiles: - :param download: - :param group_dates_by: - :param max_threads: - :param postprocess: - :param pregrid_prefix: - :param levels: - :param var_name_idx: - :param var_names: - """ - - def __init__(self, - *args, - dates: object = (), - delete_tempfiles: bool = True, - download: bool = True, - drop_vars: list = None, - group_dates_by: str = "year", - levels: object = (), - max_threads: int = 1, - postprocess: bool = True, - pregrid_prefix: str = "latlon_", - var_name_idx: int = -1, - var_names: object = (), - **kwargs): - super().__init__(*args, **kwargs) - - self._dates = list(dates) - self._delete = delete_tempfiles - self._download = download - self._drop_vars = list() if drop_vars is None else drop_vars - self._files_downloaded = [] - self._group_dates_by = group_dates_by - self._levels = list(levels) - self._masks = Masks(north=self.north, south=self.south) - self._max_threads = max_threads - self._postprocess = postprocess - self._pregrid_prefix = pregrid_prefix - self._rotatable_files = [] - self._sic_ease_cubes = dict() - self._var_name_idx = var_name_idx - self._var_names = list(var_names) - - assert len(self._var_names), "No variables requested" - assert len(self._levels) == len(self._var_names), \ - "# of levels must match # vars" - - if not self._delete: - logging.warning("!!! Deletions of temp files are switched off: be " - "careful with this, you need to manage your " - "files manually") - self._download_method = None - - self._validate_config() - - def _validate_config(self): - """ - - """ - if self.hemisphere_str in os.path.split(self.base_path): - raise RuntimeError("Don't include hemisphere string {} in " - "base path".format(self.hemisphere_str)) - - def download(self): - """Handles concurrent (threaded) downloading for variables - - This takes dates, variables and levels as configured, batches them into - requests and submits those via a ThreadPoolExecutor for concurrent - downloading. Returns nothing, relies on _single_download to implement - appropriate updates to this object to record state changes arising from - downloading. - """ - - logging.info("Building request(s), downloading and daily averaging " - "from {} API".format(self.identifier.upper())) - - requests = list() - - for idx, var_name in enumerate(self.var_names): - levels = [None] if not self.levels[idx] else self.levels[idx] - - dates_per_request = \ - batch_requested_dates(self._dates, - attribute=self._group_dates_by) - - for var_prefix, level, req_date in \ - product([var_name], levels, dates_per_request): - requests.append((var_prefix, level, req_date)) - - with ThreadPoolExecutor(max_workers=min(len(requests), - self._max_threads)) \ - as executor: - futures = [] - - for var_prefix, level, req_date in requests: - future = executor.submit(self._single_download, var_prefix, - level, req_date) - futures.append(future) - - for future in concurrent.futures.as_completed(futures): - try: - future.result() - except Exception as e: - logging.exception("Thread failure: {}".format(e)) - - logging.info("{} daily files downloaded".format( - len(self._files_downloaded))) - - def _single_download(self, var_prefix: str, level: object, - req_dates: object): - """Implements a single download based on configured download_method - - This allows delegation of downloading logic in a consistent manner to - the configured download_method, ensuring a guarantee of adherence to - naming and processing flow within ClimateDownloader implementations. - - :param var_prefix: the icenet variable name - :param level: the height to download - :param req_dates: the request date - """ - - logging.info( - "Processing single download for {} @ {} with {} dates".format( - var_prefix, level, len(req_dates))) - var = var_prefix if not level else \ - "{}{}".format(var_prefix, level) - var_folder = self.get_data_var_folder(var) - - latlon_path, regridded_name = \ - self.get_req_filenames(var_folder, req_dates[0]) - - req_dates = filter_dates_on_data(latlon_path, - regridded_name, - req_dates, - drop_vars=self._drop_vars) - - if len(req_dates): - if self._download: - with tempfile.TemporaryDirectory() as tmpdir: - tmp_latlon_path = os.path.join( - tmpdir, - os.path.basename("{}.download".format(latlon_path))) - - self.download_method(var, level, req_dates, tmp_latlon_path) - - if os.path.exists(latlon_path): - (ll_path, ll_file) = os.path.split(latlon_path) - rename_latlon_path = os.path.join( - ll_path, - "{}_old{}".format(*os.path.splitext(ll_file))) - os.rename(latlon_path, rename_latlon_path) - old_da = xr.open_dataarray( - rename_latlon_path, drop_variables=self._drop_vars) - tmp_da = xr.open_dataarray( - tmp_latlon_path, drop_variables=self._drop_vars) - - logging.debug("Input (old): \n{}".format(old_da)) - logging.debug("Input (dl): \n{}".format(tmp_da)) - - da = xr.concat([old_da, tmp_da], dim="time") - logging.debug("Output: \n{}".format(da)) - - da.to_netcdf(latlon_path) - old_da.close() - tmp_da.close() - os.unlink(rename_latlon_path) - else: - shutil.move(tmp_latlon_path, latlon_path) - - logging.info("Downloaded to {}".format(latlon_path)) - else: - logging.info( - "Skipping actual download to {}".format(latlon_path)) - else: - logging.info("No requested dates remain, likely already present") - - if self._postprocess and os.path.exists(latlon_path): - self.postprocess(var, latlon_path) - - if os.path.exists(latlon_path): - self._files_downloaded.append(latlon_path) - - def postprocess(self, var, download_path): - logging.debug("No postprocessing in place for {}: {}".format( - var, download_path)) - - def save_temporal_files(self, var, da, date_format=None, freq=None): - """ - - :param var: - :param da: - :param date_format: - :param freq: - """ - var_folder = self.get_data_var_folder(var) - group_by = "time.{}".format(self._group_dates_by) if not freq else freq - - for dt, dt_da in da.groupby(group_by): - req_date = pd.to_datetime(dt_da.time.values[0]) - latlon_path, regridded_name = \ - self.get_req_filenames(var_folder, - req_date, - date_format=date_format) - - logging.info("Retrieving and saving {}".format(latlon_path)) - dt_da.compute() - dt_da.to_netcdf(latlon_path) - - if not os.path.exists(regridded_name): - self._files_downloaded.append(latlon_path) - - @property - def sic_ease_cube(self): - """ - - :return sic_cube: - """ - if self._hemisphere not in self._sic_ease_cubes: - sic_day_fname = 'ice_conc_{}_ease2-250_cdr-v2p0_197901021200.nc'. \ - format(SIC_HEMI_STR[self.hemisphere_str[0]]) - sic_day_path = os.path.join(self.get_data_var_folder("siconca"), - sic_day_fname) - if not os.path.exists(sic_day_path): - logging.info("Downloading single daily SIC netCDF file for " - "regridding ERA5 data to EASE grid...") - - retrieve_sic_day_cmd = 'wget -m -nH --cut-dirs=6 -P {} ' \ - 'ftp://osisaf.met.no/reprocessed/ice/' \ - 'conc/v2p0/1979/01/{}'.\ - format(self.get_data_var_folder("siconca"), sic_day_fname) - - run_command(retrieve_sic_day_cmd) - - # Load a single SIC map to obtain the EASE grid for - # regridding ERA data - self._sic_ease_cubes[self._hemisphere] = \ - iris.load_cube(sic_day_path, 'sea_ice_area_fraction') - - # Convert EASE coord units to metres for regridding - self._sic_ease_cubes[self._hemisphere].coord( - 'projection_x_coordinate').convert_units('meters') - self._sic_ease_cubes[self._hemisphere].coord( - 'projection_y_coordinate').convert_units('meters') - return self._sic_ease_cubes[self._hemisphere] - - def regrid(self, files: object = None, rotate_wind: bool = True): - """ - - :param files: - """ - filelist = self._files_downloaded if not files else files - batches = [filelist[b:b + 1000] for b in range(0, len(filelist), 1000)] - - max_workers = min(len(batches), self._max_threads) - regrid_results = list() - - if max_workers > 0: - with ThreadPoolExecutor(max_workers=max_workers) \ - as executor: - futures = [] - - for files in batches: - future = executor.submit(self._batch_regrid, files) - futures.append(future) - - for future in concurrent.futures.as_completed(futures): - try: - fut_results = future.result() - - for res in fut_results: - logging.debug( - "Future result -> regrid_results: {}".format( - res)) - regrid_results.append(res) - except Exception as e: - logging.exception("Thread failure: {}".format(e)) - else: - logging.info("No regrid batches to processing, moving on...") - - if rotate_wind: - logging.info("Rotating wind data prior to merging") - self.rotate_wind_data() - - for new_datafile, moved_datafile in regrid_results: - merge_files(new_datafile, moved_datafile, self._drop_vars) - - def _batch_regrid(self, files: object): - """ - - :param files: - """ - results = list() - - for datafile in files: - (datafile_path, datafile_name) = os.path.split(datafile) - - new_filename = re.sub(r'^{}'.format(self.pregrid_prefix), '', - datafile_name) - new_datafile = os.path.join(datafile_path, new_filename) - - moved_datafile = None - - if os.path.exists(new_datafile): - moved_filename = "moved.{}".format(new_filename) - moved_datafile = os.path.join(datafile_path, moved_filename) - os.rename(new_datafile, moved_datafile) - - logging.info("{} already existed, moved to {}".format( - new_filename, moved_filename)) - - logging.debug("Regridding {}".format(datafile)) - - try: - cube = iris.load_cube(datafile) - cube = self.convert_cube(cube) - - cube_ease = cube.regrid(self.sic_ease_cube, - iris.analysis.Linear()) - - except iris.exceptions.CoordinateNotFoundError: - logging.warning( - "{} has no coordinates...".format(datafile_name)) - if self.delete: - logging.debug( - "Deleting failed file {}...".format(datafile_name)) - os.unlink(datafile) - continue - - self.additional_regrid_processing(datafile, cube_ease) - - logging.info("Saving regridded data to {}... ".format(new_datafile)) - iris.save(cube_ease, new_datafile, fill_value=np.nan) - results.append((new_datafile, moved_datafile)) - - if self.delete: - logging.info("Removing {}".format(datafile)) - os.remove(datafile) - - return results - - def convert_cube(self, cube: object): - """Converts Iris cube to be fit for regrid - - :param cube: the cube requiring alteration - :return cube: the altered cube - """ - - cube = assign_lat_lon_coord_system(cube) - return cube - - @abstractmethod - def additional_regrid_processing(self, datafile: str, cube_ease: object): - """ - - :param datafile: - :param cube_ease: - """ - pass - - def rotate_wind_data(self, - apply_to: object = ("uas", "vas"), - manual_files: object = None): - """ - - :param apply_to: - :param manual_files: - """ - assert len(apply_to) == 2, "Too many wind variables supplied: {}, " \ - "there should only be two.".\ - format(", ".join(apply_to)) - - angles = gridcell_angles_from_dim_coords(self.sic_ease_cube) - invert_gridcell_angles(angles) - - logging.info("Rotating wind data in {}".format(" ".join( - [self.get_data_var_folder(v) for v in apply_to]))) - - wind_files = {} - - for var in apply_to: - source = self.get_data_var_folder(var) - - file_source = self._files_downloaded \ - if not manual_files else manual_files - - latlon_files = [df for df in file_source if source in df] - wind_files[var] = sorted([ - re.sub(r'{}'.format(self.pregrid_prefix), '', df) - for df in latlon_files - if os.path.dirname(df).split(os.sep)[self._var_name_idx] == var - ], - key=lambda x: int( - re.search(r'^(?:\w+_)?(\d+).nc', - os.path.basename(x)).group(1) - )) - logging.info("{} files for {}".format(len(wind_files[var]), var)) - - # NOTE: we're relying on apply_to having equal datasets - assert len(wind_files[apply_to[0]]) == len(wind_files[apply_to[1]]), \ - "The wind file datasets are unequal in length" - - # validation - for idx, wind_file_0 in enumerate(wind_files[apply_to[0]]): - wind_file_1 = wind_files[apply_to[1]][idx] - - wd0 = re.sub(r'^{}_'.format(apply_to[0]), '', - os.path.basename(wind_file_0)) - - if not wind_file_1.endswith(wd0): - logging.error("Wind file array is not valid: {}".format( - zip(wind_files))) - raise RuntimeError("{} is not at the end of {}, something is " - "wrong".format(wd0, wind_file_1)) - - for idx, wind_file_0 in enumerate(wind_files[apply_to[0]]): - wind_file_1 = wind_files[apply_to[1]][idx] - - logging.info("Rotating {} and {}".format(wind_file_0, wind_file_1)) - - wind_cubes = dict() - wind_cubes_r = dict() - - wind_cubes[apply_to[0]] = iris.load_cube(wind_file_0) - wind_cubes[apply_to[1]] = iris.load_cube(wind_file_1) - - try: - wind_cubes_r[apply_to[0]], wind_cubes_r[apply_to[1]] = \ - rotate_grid_vectors( - wind_cubes[apply_to[0]], - wind_cubes[apply_to[1]], - angles, - ) - except iris.exceptions.CoordinateNotFoundError: - logging.exception("Failure to rotate due to coordinate issues. " - "moving onto next file") - continue - - # Original implementation is in danger of lost updates - # due to potential lazy loading - for i, name in enumerate([wind_file_0, wind_file_1]): - # NOTE: implementation with tempfile caused problems on NFS - # mounted filesystem, so avoiding in place of letting iris do it - temp_name = os.path.join( - os.path.split(name)[0], - "temp.{}".format(os.path.basename(name))) - logging.debug("Writing {}".format(temp_name)) - - iris.save(wind_cubes_r[apply_to[i]], temp_name) - os.replace(temp_name, name) - logging.debug("Overwritten {}".format(name)) - - def get_req_filenames(self, - var_folder: str, - req_date: object, - date_format: str = None): - """ - - :param var_folder: - :param req_date: - :param date_format: - :return: - """ - - filename_date = getattr(req_date, self._group_dates_by) \ - if not date_format else req_date.strftime(date_format) - - latlon_path = os.path.join( - var_folder, "{}{}.nc".format(self.pregrid_prefix, filename_date)) - regridded_name = os.path.join(var_folder, "{}.nc".format(filename_date)) - - logging.debug("Got {} filenames: {} and {}".format( - self._group_dates_by, latlon_path, regridded_name)) - - return latlon_path, regridded_name - - @property - def dates(self): - return self._dates - - @property - def delete(self): - return self._delete - - @property - def download_method(self) -> callable: - if not self._download_method: - raise RuntimeError("Downloader has no method set, " - "implementation error") - return self._download_method - - @download_method.setter - def download_method(self, method: callable): - self._download_method = method - - @property - def group_dates_by(self): - return self._group_dates_by - - @property - def levels(self): - return self._levels - - @property - def pregrid_prefix(self): - return self._pregrid_prefix - - @property - def var_names(self): - return self._var_names diff --git a/icenet/data/interfaces/esgf.py b/icenet/data/interfaces/esgf.py deleted file mode 100644 index dd9a3f7e..00000000 --- a/icenet/data/interfaces/esgf.py +++ /dev/null @@ -1,273 +0,0 @@ -import logging -import os -import warnings - -import numpy as np -import pandas as pd -import xarray as xr - -from icenet.data.interfaces.downloader import ClimateDownloader -from icenet.data.cli import download_args -from icenet.data.utils import esgf_search -""" - -""" - - -class CMIP6Downloader(ClimateDownloader): - """Climate downloader to provide CMIP6 reanalysis data from ESGF APIs - - Useful CMIP6 guidance: https://pcmdi.llnl.gov/CMIP6/Guide/dataUsers.html - - :param identifier: how to identify this dataset - :param source: source ID in ESGF node - :param member: member ID in ESGF node - :param nodes: list of ESGF nodes to query - :param experiments: experiment IDs to download - :param frequency: query parameter frequency - :param table_map: table map for - :param grid_map: - :param grid_override: - :param exclude_nodes: - - "MRI-ESM2-0", "r1i1p1f1", None - "MRI-ESM2-0", "r2i1p1f1", None - "MRI-ESM2-0", "r3i1p1f1", None - "MRI-ESM2-0", "r4i1p1f1", None - "MRI-ESM2-0", "r5i1p1f1", None - "EC-Earth3", "r2i1p1f1", "gr" - "EC-Earth3", "r7i1p1f1", "gr" - "EC-Earth3", "r10i1p1f1", "gr" - "EC-Earth3", "r12i1p1f1", "gr" - "EC-Earth3", "r14i1p1f1", "gr" - - """ - - TABLE_MAP = { - 'siconca': 'SIday', - 'tas': 'day', - 'ta': 'day', - 'tos': 'Oday', - 'hus': 'day', - 'psl': 'day', - 'rlds': 'day', - 'rsus': 'day', - 'rsds': 'day', - 'zg': 'day', - 'uas': 'day', - 'vas': 'day', - 'ua': 'day', - } - - GRID_MAP = { - 'siconca': 'gn', - 'tas': 'gn', - 'ta': 'gn', - 'tos': 'gr', - 'hus': 'gn', - 'psl': 'gn', - 'rlds': 'gn', - 'rsus': 'gn', # Surface Upwelling Shortwave Radiation - 'rsds': 'gn', # Surface Downwelling Shortwave Radiation - 'zg': 'gn', - 'uas': 'gn', - 'vas': 'gn', - 'ua': 'gn', - } - - # Prioritise European first, US last, avoiding unnecessary queries - # against nodes further afield (all traffic has a cost, and the coverage - # of local nodes is more than enough) - ESGF_NODES = ( - "esgf.ceda.ac.uk", - "esg1.umr-cnrm.fr", - "vesg.ipsl.upmc.fr", - "esgf3.dkrz.de", - "esgf.bsc.es", - "esgf-data.csc.fi", - "noresg.nird.sigma2.no", - "esgf-data.ucar.edu", - "esgf-data2.diasjp.net", - ) - - def __init__(self, - *args, - source: str, - member: str, - nodes: object = ESGF_NODES, - experiments: object = ('historical', 'ssp245'), - frequency: str = "day", - table_map: object = None, - grid_map: object = None, - grid_override: object = None, - exclude_nodes: object = None, - **kwargs): - super().__init__(*args, - identifier="cmip6.{}.{}".format(source, member), - **kwargs) - - self._source = source - self._member = member - self._frequency = frequency - self._experiments = experiments - - self._nodes = nodes if not exclude_nodes else \ - [n for n in nodes if n not in exclude_nodes] - - self._table_map = table_map if table_map else CMIP6Downloader.TABLE_MAP - self._grid_map = grid_map if grid_map else CMIP6Downloader.GRID_MAP - self._grid_map_override = grid_override - - def _single_download(self, var_prefix: str, level: object, - req_dates: object): - """Overridden CMIP implementation for downloading from DAP server - - Due to the size of the CMIP set and the fact that we don't want to make - 1850-2100 yearly requests for all downloads, we have a bespoke and - overridden download implementation for this. - - :param var_prefix: - :param level: - :param req_dates: - """ - - query = { - 'source_id': - self._source, - 'member_id': - self._member, - 'frequency': - self._frequency, - 'variable_id': - var_prefix, - 'table_id': - self._table_map[var_prefix], - 'grid_label': - self._grid_map_override - if self._grid_map_override else self._grid_map[var_prefix], - } - - var = var_prefix if not level else "{}{}".format(var_prefix, level) - - logging.info("Querying ESGF") - results = [] - - for experiment_id in self._experiments: - query['experiment_id'] = experiment_id - - for data_node in self._nodes: - query['data_node'] = data_node - - # FIXME: inefficient, we can strip redundant results files - # based on WCRP data management standards for file naming, - # such as based on date. Refactor/rewrite this impl... - node_results = esgf_search(**query) - - if len(node_results): - logging.debug("Query: {}".format(query)) - logging.debug("Found {}: {}".format(experiment_id, - node_results)) - results.extend(node_results) - break - - logging.info("Found {} {} results from ESGF search".format( - len(results), var_prefix)) - - try: - # http://xarray.pydata.org/en/stable/user-guide/io.html?highlight=opendap#opendap - # Avoid 500MB DAP request limit - cmip6_da = xr.open_mfdataset(results, - combine='by_coords', - chunks={'time': '499MB'})[var_prefix] - - cmip6_da = cmip6_da.sel(time=slice(req_dates[0], req_dates[-1])) - - # TODO: possibly other attributes, especially with ocean vars - if level: - cmip6_da = cmip6_da.sel(plev=int(level) * 100) - - cmip6_da = cmip6_da.sel( - lat=slice(self.hemisphere_loc[2], self.hemisphere_loc[0])) - self.save_temporal_files(var, cmip6_da) - except OSError as e: - logging.exception("Error encountered: {}".format(e), exc_info=False) - - def additional_regrid_processing(self, datafile: str, cube_ease: object): - """ - - :param datafile: - :param cube_ease: - """ - (datafile_path, datafile_name) = os.path.split(datafile) - var_name = datafile_path.split(os.sep)[self._var_name_idx] - - # TODO: regrid fixes need better implementations - if var_name == "siconca": - cube_ease.data[cube_ease.data.mask] = 0. - cube_ease.data[:, self._masks.get_land_mask()] = 0. - - if self._source == 'MRI-ESM2-0': - cube_ease.data = cube_ease.data / 100. - cube_ease.data = cube_ease.data.data - elif var_name in ["tos", "hus1000"]: - cube_ease.data[cube_ease.data.mask] = 0. - cube_ease.data[:, self._masks.get_land_mask()] = 0. - - cube_ease.data = cube_ease.data.data - - if cube_ease.data.dtype != np.float32: - logging.info("Regrid processing, data type not float: {}".format( - cube_ease.data.dtype)) - cube_ease.data = cube_ease.data.astype(np.float32) - - def convert_cube(self, cube: object) -> object: - """Converts Iris cube to be fit for CMIP regrid - - :param cube: the cube requiring alteration - :return cube: the altered cube - """ - - cs = self.sic_ease_cube.coord_system().ellipsoid - - for coord in ['longitude', 'latitude']: - cube.coord(coord).coord_system = cs - return cube - - -def main(): - args = download_args(dates=True, - extra_args=[ - (["source"], dict(type=str)), - (["member"], dict(type=str)), - (("-xs", "--exclude-server"), - dict(default=[], nargs="*")), - (("-o", "--override"), dict(required=None, - type=str)), - ], - workers=True) - - logging.info("CMIP6 Data Downloading") - - downloader = CMIP6Downloader( - source=args.source, - member=args.member, - var_names=args.vars, - dates=[ - pd.to_datetime(date).date() - for date in pd.date_range(args.start_date, args.end_date, freq="D") - ], - delete_tempfiles=args.delete, - grid_override=args.override, - levels=args.levels, - north=args.hemisphere == "north", - south=args.hemisphere == "south", - max_threads=args.workers, - exclude_nodes=args.exclude_server, - ) - logging.info("CMIP downloading: {} {}".format(args.source, args.member)) - downloader.download() - - logging.info("CMIP regridding: {} {}".format(args.source, args.member)) - with warnings.catch_warnings(): - warnings.simplefilter("ignore", category=UserWarning) - downloader.regrid() diff --git a/icenet/data/interfaces/mars.py b/icenet/data/interfaces/mars.py deleted file mode 100644 index 74a353b1..00000000 --- a/icenet/data/interfaces/mars.py +++ /dev/null @@ -1,420 +0,0 @@ -import datetime -import logging -import os -import sys - -from itertools import product - -import ecmwfapi -import pandas as pd -import xarray as xr - -from icenet.data.cli import download_args -from icenet.data.interfaces.downloader import ClimateDownloader -from icenet.data.interfaces.utils import batch_requested_dates -""" - -""" - - -class HRESDownloader(ClimateDownloader): - """Climate downloader to provide CMIP6 reanalysis data from ESGF APIs - - :param identifier: how to identify this dataset - - """ - - PARAM_TABLE = 128 - - # Background on the use of forecast and observational data - # https://confluence.ecmwf.int/pages/viewpage.action?pageId=85402030 - # https://confluence.ecmwf.int/display/CKB/ERA5%3A+data+documentation#ERA5:datadocumentation-Dateandtimespecification - HRES_PARAMS = { - "siconca": (31, "siconc"), # sea_ice_area_fraction - "tos": (34, "sst"), # sea surface temperature (actually - # sst?) - "zg": (129, "z"), # geopotential - "ta": (130, "t"), # air_temperature (t) - "hus": (133, "q"), # specific_humidity - "psl": (134, "sp"), # surface_pressure - "uas": (165, "u10"), # 10m_u_component_of_wind - "vas": (166, "v10"), # 10m_v_component_of_wind - "tas": (167, "t2m"), # 2m_temperature (t2m) - # https://confluence.ecmwf.int/display/CKB/ERA5%3A+data+documentation#ERA5:datadocumentation-Meanrates/fluxesandaccumulations - # https://apps.ecmwf.int/codes/grib/param-db/?id=175 - # https://confluence.ecmwf.int/pages/viewpage.action?pageId=197702790 - # - # Mean rate/flux parameters in ERA5 (e.g. Table 4 for surface and - # single levels) provide similar information to accumulations (e.g. - # Table 3 for surface and single levels), except they are expressed as - # temporal means, over the same processing periods, and so have units - # of "per second". - "rlds": (175, "strd"), - "rsds": (169, "ssrd"), - - # plev 129.128 / 130.128 / 133.128 - # sfc 31.128 / 34.128 / 134.128 / - # 165.128 / 166.128 / 167.128 / 169.128 / 177.128 - - # ORAS5 variables in param-db (need to consider depth) - # "thetao": (151129, "thetao"), - # "so": (151130, "so"), - # Better matches than equivalent X / Y parameters in param-db - # "uo": (151131, "uo"), - # "vo": (151132, "vo"), - } - - # https://confluence.ecmwf.int/display/UDOC/Keywords+in+MARS+and+Dissemination+requests - MARS_TEMPLATE = """ -retrieve, - class=od, - date={date}, - expver=1, - levtype={levtype}, - {levlist}param={params}, - step={step}, - stream=oper, - time=12:00:00, - type=fc, - area={area}, - grid=0.25/0.25, - target="{target}", - format=netcdf - """ - - def __init__(self, *args, identifier: str = "mars.hres", **kwargs): - super().__init__(*args, identifier=identifier, **kwargs) - - self._server = ecmwfapi.ECMWFService("mars") - - def _single_download(self, var_names: object, pressures: object, - req_dates: object): - """ - - :param var_names: - :param pressures: - :param req_dates: - :return: - """ - - for dt in req_dates: - assert dt.year == req_dates[0].year - - downloads = [] - levtype = "plev" if pressures else "sfc" - - for req_batch in batch_requested_dates(req_dates, attribute="month"): - req_batch = sorted(req_batch) - request_month = req_batch[0].strftime("%Y%m") - - logging.info("Downloading month file {}".format(request_month)) - - if req_batch[-1] - datetime.datetime.utcnow().date() == \ - datetime.timedelta(days=-1): - logging.warning("Not allowing partial requests at present, " - "removing {}".format(req_batch[-1])) - req_batch = req_batch[:-1] - - request_target = os.path.join( - self.base_path, self.hemisphere_str[0], - "{}.{}.nc".format(levtype, request_month)) - - os.makedirs(os.path.dirname(request_target), exist_ok=True) - - request = self.mars_template.format( - area="/".join([str(s) for s in self.hemisphere_loc]), - date="/".join([el.strftime("%Y%m%d") for el in req_batch]), - levtype=levtype, - levlist="levelist={},\n ".format(pressures) - if pressures else "", - params="/".join([ - "{}.{}".format(self.params[v][0], self.param_table) - for v in var_names - ]), - target=request_target, - # We are only allowed date prior to -24 hours ago, dynamically - # retrieve if date is today - # TODO: too big - step="/".join([str(i) for i in range(24)]), - step=0, - ) - - if not os.path.exists(request_target): - logging.debug("MARS REQUEST: \n{}\n".format(request)) - - try: - self._server.execute(request, request_target) - except ecmwfapi.api.APIException: - logging.exception("Could not complete ECMWF request: {}") - else: - downloads.append(request_target) - else: - logging.debug("Already have {}".format(request_target)) - downloads.append(request_target) - - logging.debug("Files downloaded: {}".format(downloads)) - - ds = xr.open_mfdataset(downloads) - ds = ds.resample(time='1D', keep_attrs=True).mean(keep_attrs=True) - - for var_name, pressure in product( - var_names, - pressures.split('/') if pressures else [None]): - var = var_name if not pressure else \ - "{}{}".format(var_name, pressure) - - da = getattr(ds, self.params[var_name][1]) - - if pressure: - da = da.sel(level=int(pressure)) - - self.save_temporal_files(var, da) - - ds.close() - - if self.delete: - for downloaded_file in downloads: - if os.path.exists(downloaded_file): - logging.info("Removing {}".format(downloaded_file)) - os.unlink(downloaded_file) - - def download(self): - """ - - """ - logging.info("Building request(s), downloading and daily averaging " - "from {} API".format(self.identifier.upper())) - - sfc_vars = [ - var for idx, var in enumerate(self.var_names) - if not self.levels[idx] - ] - level_vars = [ - var for idx, var in enumerate(self.var_names) if self.levels[idx] - ] - levels = "/".join([ - str(s) - for s in sorted(set([p for ps in self.levels if ps for p in ps])) - ]) - - # req_dates = self.filter_dates_on_data() - - dates_per_request = \ - batch_requested_dates(self._dates, - attribute=self.group_dates_by) - - for req_batch in dates_per_request: - if len(sfc_vars) > 0: - self._single_download(sfc_vars, None, req_batch) - - if len(level_vars) > 0: - self._single_download(level_vars, levels, req_batch) - - logging.info("{} daily files downloaded".format( - len(self._files_downloaded))) - - def additional_regrid_processing(self, datafile: str, cube_ease: object): - """ - - :param datafile: - :param cube_ease: - """ - (datafile_path, datafile_name) = os.path.split(datafile) - var_name = datafile_path.split(os.sep)[self._var_name_idx] - - if var_name == 'tos': - # Overwrite maksed values with zeros - logging.debug("MARS additional regrid: {}".format(var_name)) - cube_ease.data[cube_ease.data.mask] = 0. - cube_ease.data[:, self._masks.get_land_mask()] = 0. - cube_ease.data = cube_ease.data.data - - if var_name in ['rlds', 'rsds']: - # FIXME: We're taking the mean across the hourly samples for the - # day in fc which needs to be comparative with the analysis product - # from ERA5. My interpretation is that this should be /24, but of - # course it doesn't work like that thanks to orbital rotation. - # We need to verify the exact mechanism for converting forecast - # values to reanalysis equivalents, but this rudimentary divisor - # should work in the meantime - # - # FIXME FIXME FIXME - cube_ease /= 12. - - if var_name.startswith("zg"): - # https://apps.ecmwf.int/codes/grib/param-db/?id=129 - # - # We want the geopotential height as per ERA5 - cube_ease /= 9.80665 - - @property - def mars_template(self): - return getattr(self, "MARS_TEMPLATE") - - @property - def params(self): - return getattr(self, "HRES_PARAMS") - - @property - def param_table(self): - return getattr(self, "PARAM_TABLE") - - -class SEASDownloader(HRESDownloader): - # TODO: step should be configurable for this downloader - # TODO: unsure why, but multiple dates break the download with - # ERROR 89 (MARS_EXPECTED_FIELDS): Expected 4700, got 2350 - MARS_TEMPLATE = """ -retrieve, - class=od, - date={date}, - expver=1, - levtype={levtype}, - method=1, - number=0/1/2/3/4/5/6/7/8/9/10/11/12/13/14/15/16/17/18/19/20/21/22/23/24, - origin=ecmf, - {levlist}param={params}, - step=0/to/2232/by/24, - stream=mmsf, - system=5, - time=00:00:00, - type=fc, - target="{target}", - format=netcdf, - grid=0.25/0.25, - area={area}""" - - def _single_download(self, var_names: object, pressures: object, - req_dates: object): - """ - - :param var_names: - :param pressures: - :param req_dates: - :return: - """ - - for dt in req_dates: - assert dt.year == req_dates[0].year - - downloads = [] - levtype = "plev" if pressures else "sfc" - - for req_date in req_dates: - request_day = req_date.strftime("%Y%m%d") - - logging.info("Downloading daily file {}".format(request_day)) - - request_target = os.path.join( - self.base_path, self.hemisphere_str[0], - "{}.{}.nc".format(levtype, request_day)) - os.makedirs(os.path.dirname(request_target), exist_ok=True) - - request = self.mars_template.format( - area="/".join([str(s) for s in self.hemisphere_loc]), - date=req_date.strftime("%Y-%m-%d"), - levtype=levtype, - levlist="levelist={},\n ".format(pressures) - if pressures else "", - params="/".join([ - "{}.{}".format(self.params[v][0], self.param_table) - for v in var_names - ]), - target=request_target, - ) - - if not os.path.exists(request_target): - logging.debug("MARS REQUEST: \n{}\n".format(request)) - - try: - self._server.execute(request, request_target) - except ecmwfapi.api.APIException: - logging.exception("Could not complete ECMWF request: {}") - else: - downloads.append(request_target) - else: - logging.debug("Already have {}".format(request_target)) - downloads.append(request_target) - - logging.debug("Files downloaded: {}".format(downloads)) - - for download_filename in downloads: - logging.info("Processing {}".format(download_filename)) - ds = xr.open_dataset(download_filename) - ds = ds.mean("number") - - for var_name, pressure in product( - var_names, - pressures.split('/') if pressures else [None]): - var = var_name if not pressure else \ - "{}{}".format(var_name, pressure) - - da = getattr(ds, self.params[var_name][1]) - - if pressure: - da = da.sel(level=int(pressure)) - - self.save_temporal_files(var, da, date_format="%Y%m%d") - - ds.close() - - if self.delete: - for downloaded_file in downloads: - if os.path.exists(downloaded_file): - logging.info("Removing {}".format(downloaded_file)) - os.unlink(downloaded_file) - - def save_temporal_files(self, var, da, date_format=None, freq=None): - """ - - :param var: - :param da: - :param date_format: - :param freq: - """ - var_folder = self.get_data_var_folder(var) - - req_date = pd.to_datetime(da.time.values[0]) - latlon_path, regridded_name = \ - self.get_req_filenames(var_folder, - req_date, - date_format=date_format) - - logging.info("Retrieving and saving {}".format(latlon_path)) - da.compute() - da.to_netcdf(latlon_path) - - if not os.path.exists(regridded_name): - self._files_downloaded.append(latlon_path) - - -def main(identifier, extra_kwargs=None): - args = download_args() - - logging.info("ECMWF {} Data Downloading".format(identifier)) - cls = getattr(sys.modules[__name__], "{}Downloader".format(identifier)) - - if extra_kwargs is None: - extra_kwargs = dict() - - instance = cls( - identifier="mars.{}".format(identifier.lower()), - var_names=args.vars, - dates=[ - pd.to_datetime(date).date() - for date in pd.date_range(args.start_date, args.end_date, freq="D") - ], - delete_tempfiles=args.delete, - levels=args.levels, - north=args.hemisphere == "north", - south=args.hemisphere == "south", - **extra_kwargs) - instance.download() - instance.regrid() - - -def seas_main(): - main("SEAS", dict(group_dates_by="day",)) - - -def hres_main(): - main("HRES") diff --git a/icenet/data/interfaces/utils.py b/icenet/data/interfaces/utils.py deleted file mode 100644 index 050f9b11..00000000 --- a/icenet/data/interfaces/utils.py +++ /dev/null @@ -1,251 +0,0 @@ -import argparse -import collections -import glob -import logging -import os - -import pandas as pd -import xarray as xr - -from icenet.utils import setup_logging - - -def batch_requested_dates(dates: object, attribute: str = "month") -> object: - """ - - TODO: should be using Pandas DatetimeIndexes / Periods for this, but the - need to refactor slightly, and this is working for the moment - - :param dates: - :param attribute: - :return: - """ - dates = collections.deque(sorted(dates)) - - batched_dates = [] - batch = [] - - while len(dates): - if not len(batch): - batch.append(dates.popleft()) - else: - if getattr(batch[-1], attribute) == getattr(dates[0], attribute): - batch.append(dates.popleft()) - else: - batched_dates.append(batch) - batch = [] - - if len(batch): - batched_dates.append(batch) - - if len(dates) > 0: - raise RuntimeError("Batching didn't work!") - - return batched_dates - - -def reprocess_monthlies(source: str, - hemisphere: str, - identifier: str, - output_base: str, - dry: bool = False, - var_names: object = None): - """ - - :param source: - :param hemisphere: - :param identifier: - :param output_base: - :param dry: - :param var_names: - """ - if not var_names: - var_names = [] - - for var_name in var_names: - var_path = os.path.join(source, hemisphere, var_name) - files = glob.glob("{}/{}_*.nc".format(var_path, var_name)) - - for file in files: - _, year = os.path.basename(os.path.splitext(file)[0]).\ - split("_")[0:2] - - try: - year = int(year) - - if not (1900 < year < 2200): - logging.warning("File is not between 1900-2200, probably " - "not something to convert: {}".format(year)) - except ValueError: - logging.warning("Cannot derive year from {}".format(year)) - continue - - destination = os.path.join(output_base, identifier, hemisphere, - var_name, str(year)) - - if not os.path.exists(destination): - os.makedirs(destination, exist_ok=True) - - logging.info("Processing {} from {} to {}".format( - var_name, year, destination)) - - ds = xr.open_dataset(file) - - var_names = [ - name for name in list(ds.data_vars.keys()) - if not name.startswith("lambert_") - ] - - var_names = set(var_names) - logging.debug( - "Files have var names {} which will be renamed to {}".format( - ", ".join(var_names), var_name)) - - ds = ds.rename({k: var_name for k in var_names}) - da = getattr(ds, var_name) - - for date in da.time.values: - date = pd.Timestamp(date) - fname = '{:04d}_{:02d}_{:02d}.nc'. \ - format(date.year, date.month, date.day) - daily = da.sel(time=slice(date, date)) - - output_path = os.path.join(destination, fname) - - if dry or os.path.exists(output_path): - continue - else: - daily.to_netcdf(output_path) - - -def add_time_dim(source: str, - hemisphere: str, - identifier: str, - dry: bool = False, - var_names: object = []): - """ - - :param source: - :param hemisphere: - :param identifier: - :param dry: - :param var_names: - """ - files = {} - - for var_name in var_names: - var_path = os.path.join(source, identifier, hemisphere, var_name) - - if var_name not in files: - files[var_name] = {} - - file_list = glob.glob("{}/*/*.nc".format(var_path)) - - for path, filename in [os.path.split(el) for el in file_list]: - if filename.startswith("{}_".format(var_name)): - raise RuntimeError( - "{} starts with var name, we only want " - "correctly named files to convert".format(filename)) - year = str(path.split(os.sep)[-1]) - - if year not in files[var_name]: - files[var_name][year] = [] - - src = os.path.join(path, filename) - dest = os.path.join(path, "{}_{}".format(var_name, filename)) - - if not dry: - try: - os.rename(src, dest) - except OSError as e: - logging.exception("Not able to move file to temporary" - "destination {}".format(dest)) - raise e - else: - logging.info("{} -> {}".format(src, dest)) - - files[var_name][year].append(dest) - - for year_files in [files[var][el] for var in files for el in files[var]]: - if not dry: - ds = xr.open_mfdataset(year_files, - combine="nested", - concat_dim="time", - parallel=True) - - if "siconca" in year_files[0]: - ds = ds.rename_vars({"siconca": "ice_conc"}) - ds = ds.sortby("time") - ds['time'] = [ - pd.Timestamp(el) for el in ds.indexes['time'].normalize() - ] - - for d in ds.time.values: - dt = pd.to_datetime(d) - date_str = dt.strftime("%Y_%m_%d") - fpath = os.path.join( - os.path.split(year_files[0])[0], "{}.nc".format(date_str)) - - if not os.path.exists(fpath): - dw = ds.sel(time=slice(dt, dt)) - - logging.info("Writing {}".format(fpath)) - dw.to_netcdf(fpath) - else: - raise RuntimeError("Already exists: {}".format(fpath)) - - ds.close() - - for orig_file in year_files: - logging.info("Removing {}".format(orig_file)) - os.unlink(orig_file) - else: - logging.info("Would process out: {}".format(year_files)) - - -@setup_logging -def get_args(): - """ - - :return: - """ - a = argparse.ArgumentParser() - a.add_argument("-d", "--dry", default=False, action="store_true") - a.add_argument("-o", "--output", default="./data") - a.add_argument("-v", "--verbose", default=False, action="store_true") - a.add_argument("source") - a.add_argument("hemisphere", choices=["nh", "sh"]) - a.add_argument("identifier") - a.add_argument("vars", nargs='+') - return a.parse_args() - - -def add_time_dim_main(): - """CLI stub to sort out missing time dimensions in daily data - """ - - args = get_args() - logging.info("Temporary solution for sorting missing time dim in files") - - if args.output != "./data": - raise RuntimeError("output is not used for this command: {}".format( - args.output)) - - add_time_dim(args.source, - args.hemisphere, - args.identifier, - dry=args.dry, - var_names=args.vars) - - -def reprocess_main(): - """CLI stub solution for reprocessing monthly files - """ - args = get_args() - logging.info("Temporary solution for reprocessing monthly files") - reprocess_monthlies(args.source, - args.hemisphere, - args.identifier, - output_base=args.output, - dry=args.dry, - var_names=args.vars) diff --git a/icenet/data/sic/osisaf.py b/icenet/data/sic/osisaf.py deleted file mode 100644 index 8684802d..00000000 --- a/icenet/data/sic/osisaf.py +++ /dev/null @@ -1,808 +0,0 @@ -import copy -import csv -import fnmatch -import ftplib -import logging -import os - -import datetime as dt -from ftplib import FTP - -import dask -from distributed import Client, LocalCluster -import numpy as np -import pandas as pd -import xarray as xr - -from icenet.data.cli import download_args -from icenet.data.producers import Downloader -from icenet.data.sic.mask import Masks -from icenet.utils import Hemisphere, run_command -from icenet.data.sic.utils import SIC_HEMI_STR -""" - -""" - -invalid_sic_days = { - Hemisphere.NORTH: [ - *[ - d.date() - for d in pd.date_range(dt.date(1979, 5, 21), dt.date(1979, 6, 4)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1979, 6, 10), dt.date(1979, 6, 26)) - ], - dt.date(1979, 7, 1), - *[ - d.date() - for d in pd.date_range(dt.date(1979, 7, 24), dt.date(1979, 7, 28)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1980, 1, 4), dt.date(1980, 1, 10)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1980, 2, 27), dt.date(1980, 3, 4)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1980, 3, 16), dt.date(1980, 3, 22)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1980, 4, 9), dt.date(1980, 4, 15)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1981, 2, 27), dt.date(1981, 3, 5)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1984, 8, 12), dt.date(1984, 8, 24)) - ], - dt.date(1984, 9, 14), - *[ - d.date() - for d in pd.date_range(dt.date(1985, 9, 22), dt.date(1985, 9, 28)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1986, 3, 29), dt.date(1986, 7, 1)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1987, 1, 3), dt.date(1987, 1, 19)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1987, 1, 29), dt.date(1987, 2, 2)) - ], - dt.date(1987, 2, 23), - *[ - d.date() - for d in pd.date_range(dt.date(1987, 2, 26), dt.date(1987, 3, 2)) - ], - dt.date(1987, 3, 13), - *[ - d.date() - for d in pd.date_range(dt.date(1987, 3, 22), dt.date(1987, 3, 26)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1987, 4, 3), dt.date(1987, 4, 17)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1987, 12, 1), dt.date(1988, 1, 12)) - ], - dt.date(1989, 1, 3), - *[ - d.date() - for d in pd.date_range(dt.date(1990, 12, 21), dt.date(1990, 12, 26)) - ], - dt.date(1979, 5, 28), - dt.date(1979, 5, 30), - dt.date(1979, 6, 1), - dt.date(1979, 6, 3), - dt.date(1979, 6, 11), - dt.date(1979, 6, 13), - dt.date(1979, 6, 15), - dt.date(1979, 6, 17), - dt.date(1979, 6, 19), - dt.date(1979, 6, 21), - dt.date(1979, 6, 23), - dt.date(1979, 6, 25), - dt.date(1979, 7, 1), - dt.date(1979, 7, 25), - dt.date(1979, 7, 27), - dt.date(1984, 9, 14), - dt.date(1987, 1, 16), - dt.date(1987, 1, 18), - dt.date(1987, 1, 30), - dt.date(1987, 2, 1), - dt.date(1987, 2, 23), - dt.date(1987, 2, 27), - dt.date(1987, 3, 1), - dt.date(1987, 3, 13), - dt.date(1987, 3, 23), - dt.date(1987, 3, 25), - dt.date(1987, 4, 4), - dt.date(1987, 4, 6), - dt.date(1987, 4, 10), - dt.date(1987, 4, 12), - dt.date(1987, 4, 14), - dt.date(1987, 4, 16), - dt.date(1987, 4, 4), - dt.date(1990, 1, 26), - dt.date(2022, 11, 9), - ], - Hemisphere.SOUTH: [ - dt.date(1979, 2, 5), - dt.date(1979, 2, 25), - dt.date(1979, 3, 23), - *[ - d.date() - for d in pd.date_range(dt.date(1979, 3, 26), dt.date(1979, 3, 30)) - ], - dt.date(1979, 4, 12), - dt.date(1979, 5, 16), - *[ - d.date() - for d in pd.date_range(dt.date(1979, 5, 21), dt.date(1979, 5, 27)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1979, 7, 10), dt.date(1979, 7, 18)) - ], - dt.date(1979, 8, 10), - dt.date(1979, 9, 3), - *[ - d.date() - for d in pd.date_range(dt.date(1980, 1, 4), dt.date(1980, 1, 10)) - ], - dt.date(1980, 2, 16), - *[ - d.date() - for d in pd.date_range(dt.date(1980, 2, 27), dt.date(1980, 3, 4)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1980, 3, 14), dt.date(1980, 3, 22)) - ], - dt.date(1980, 3, 31), - *[ - d.date() - for d in pd.date_range(dt.date(1980, 4, 9), dt.date(1980, 4, 15)) - ], - dt.date(1980, 4, 22), - *[ - d.date() - for d in pd.date_range(dt.date(1981, 2, 27), dt.date(1981, 3, 5)) - ], - dt.date(1981, 6, 10), - *[ - d.date() - for d in pd.date_range(dt.date(1981, 8, 3), dt.date(1982, 8, 9)) - ], - dt.date(1982, 8, 6), - *[ - d.date() - for d in pd.date_range(dt.date(1983, 7, 7), dt.date(1983, 7, 11)) - ], - dt.date(1983, 7, 22), - dt.date(1984, 6, 12), - *[ - d.date() - for d in pd.date_range(dt.date(1984, 8, 12), dt.date(1984, 8, 24)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1984, 9, 13), dt.date(1984, 9, 17)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1984, 10, 3), dt.date(1984, 10, 9)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1984, 11, 18), dt.date(1984, 11, 22)) - ], - dt.date(1985, 7, 23), - *[ - d.date() - for d in pd.date_range(dt.date(1985, 9, 22), dt.date(1985, 9, 28)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1986, 3, 29), dt.date(1986, 11, 2)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1987, 1, 3), dt.date(1987, 1, 15)) - ], - *[ - d.date() - for d in pd.date_range(dt.date(1987, 12, 1), dt.date(1988, 1, 12)) - ], - dt.date(1990, 8, 14), - dt.date(1990, 8, 15), - dt.date(1990, 8, 24), - *[ - d.date() - for d in pd.date_range(dt.date(1990, 12, 22), dt.date(1990, 12, 26)) - ], - dt.date(1979, 2, 5), - dt.date(1979, 2, 25), - dt.date(1979, 3, 23), - dt.date(1979, 3, 27), - dt.date(1979, 3, 29), - dt.date(1979, 4, 12), - dt.date(1979, 5, 16), - dt.date(1979, 7, 11), - dt.date(1979, 7, 13), - dt.date(1979, 7, 15), - dt.date(1979, 7, 17), - dt.date(1979, 8, 10), - dt.date(1979, 9, 3), - dt.date(1980, 2, 16), - dt.date(1980, 3, 15), - dt.date(1980, 3, 31), - dt.date(1980, 4, 22), - dt.date(1981, 6, 10), - dt.date(1982, 8, 6), - dt.date(1983, 7, 8), - dt.date(1983, 7, 10), - dt.date(1983, 7, 22), - dt.date(1984, 6, 12), - dt.date(1984, 9, 14), - dt.date(1984, 9, 16), - dt.date(1984, 10, 4), - dt.date(1984, 10, 6), - dt.date(1984, 10, 8), - dt.date(1984, 11, 19), - dt.date(1984, 11, 21), - dt.date(1985, 7, 23), - *[ - d.date() - for d in pd.date_range(dt.date(1986, 7, 2), dt.date(1986, 11, 1)) - ], - dt.date(1990, 8, 14), - dt.date(1990, 8, 15), - dt.date(1990, 8, 24), - dt.date(2022, 11, 9), - ] -} - -var_remove_list = [ - 'time_bnds', 'raw_ice_conc_values', 'total_standard_error', - 'smearing_standard_error', 'algorithm_standard_error', 'status_flag', - 'Lambert_Azimuthal_Grid' -] - - -# This is adapted from the data/loaders implementations -class DaskWrapper: - """ - - :param dask_port: - :param dask_timeouts: - :param dask_tmp_dir: - :param workers: - """ - - def __init__(self, - dask_port: int = 8888, - dask_timeouts: int = 60, - dask_tmp_dir: object = "/tmp", - workers: int = 8): - - self._dashboard_port = dask_port - self._timeout = dask_timeouts - self._tmp_dir = dask_tmp_dir - self._workers = workers - - def dask_process(self, *args, method: callable, **kwargs): - """ - - :param method: - """ - dashboard = "localhost:{}".format(self._dashboard_port) - - with dask.config.set({ - "temporary_directory": self._tmp_dir, - "distributed.comm.timeouts.connect": self._timeout, - "distributed.comm.timeouts.tcp": self._timeout, - }): - cluster = LocalCluster( - dashboard_address=dashboard, - n_workers=self._workers, - threads_per_worker=1, - scheduler_port=0, - ) - logging.info("Dashboard at {}".format(dashboard)) - - with Client(cluster) as client: - logging.info("Using dask client {}".format(client)) - ret = method(*args, **kwargs) - return ret - - -class SICDownloader(Downloader): - """Downloads OSI-SAF SIC data from 1979-present using OpenDAP. - - The dataset comprises OSI-450 (1979-2015) and OSI-430-b (2016-ownards) - Monthly averages are-computed on the server-side. - This script can take about an hour to run. - - The query URLs were obtained from the following sites: - - OSI-450 (1979-2016): https://thredds.met.no/thredds/dodsC/osisaf/ - met.no/reprocessed/ice/conc_v2p0_nh_agg.html - - OSI-430-b (2016-present): https://thredds.met.no/thredds/dodsC/osisaf/ - met.no/reprocessed/ice/conc_crb_nh_agg.html - - OSI-430-a (2022-present): https://osi-saf.eumetsat.int/products/osi-430-as - - :param additional_invalid_dates: - :param chunk_size: - :param dates: - :param delete_tempfiles: - :param download: - :param dtype: - """ - - def __init__(self, - *args, - additional_invalid_dates: object = (), - chunk_size: int = 10, - dates: object = (), - delete_tempfiles: bool = True, - download: bool = True, - dtype: object = np.float32, - parallel_opens: bool = True, - **kwargs): - super().__init__(*args, identifier="osisaf", **kwargs) - - self._chunk_size = chunk_size - self._dates = dates - self._delete = delete_tempfiles - self._download = download - self._dtype = dtype - self._parallel_opens = parallel_opens - self._invalid_dates = invalid_sic_days[self.hemisphere] + \ - list(additional_invalid_dates) - self._masks = Masks(north=self.north, south=self.south) - - self._ftp_osi450 = "/reprocessed/ice/conc/v2p0/{:04d}/{:02d}/" - self._ftp_osi430b = "/reprocessed/ice/conc-cont-reproc/v2p0/{:04d}/{:02d}/" - self._ftp_osi430a = "/reprocessed/ice/conc-cont-reproc/v3p0/{:04d}/{:02d}/" - - self._mask_dict = { - month: self._masks.get_active_cell_mask(month) - for month in np.arange(1, 12 + 1) - } - - # Load dates that previously had a file size of zero. - # To recheck they haven't been fixed since last download. - zero_dates_path = os.path.join(self.get_data_var_folder("siconca"), - "zero_size_days.csv") - - self._zero_dates_path = zero_dates_path - self._zero_dates = [] - if os.path.exists(zero_dates_path): - with open(zero_dates_path, "r") as fh: - self._zero_dates = [ - pd.to_datetime("-".join(date)).date() - for date in csv.reader(fh) - ] - - def download(self): - """ - - """ - hs = SIC_HEMI_STR[self.hemisphere_str[0]] - data_files = [] - ftp = None - var = "siconca" - - logging.info("Not downloading SIC files, (re)processing NC files in " - "existence already" if not self._download else - "Downloading SIC datafiles to .temp intermediates...") - - cache = {} - osi430b_start = dt.date(2016, 1, 1) - osi430a_start = dt.date(2021, 1, 1) - - dt_arr = list(reversed(sorted(copy.copy(self._dates)))) - - # Filtering dates based on existing data - filter_years = sorted(set([d.year for d in dt_arr])) - extant_paths = [ - os.path.join(self.get_data_var_folder(var), - "{}.nc".format(filter_ds)) - for filter_ds in filter_years - ] - extant_paths = [df for df in extant_paths if os.path.exists(df)] - - if len(extant_paths) > 0: - extant_ds = xr.open_mfdataset(extant_paths) - exclude_dates = [ - pd.to_datetime(date).date() for date in extant_ds.time.values - ] - - # Do not exclude dates that previously had a file size of 0 - exclude_dates = set(exclude_dates).difference(self._zero_dates) - - logging.info("Excluding {} dates already existing from {} dates " - "requested.".format(len(exclude_dates), len(dt_arr))) - - dt_arr = sorted(list(set(dt_arr).difference(exclude_dates))) - dt_arr.reverse() - - # We won't hold onto an active dataset during network I/O - extant_ds.close() - - # End filtering - - while len(dt_arr): - el = dt_arr.pop() - - if el in self._invalid_dates: - logging.warning("Date {} is in invalid list".format(el)) - continue - - date_str = el.strftime("%Y_%m_%d") - temp_path = os.path.join( - self.get_data_var_folder(var, append=[str(el.year)]), - "{}.temp".format(date_str)) - nc_path = os.path.join( - self.get_data_var_folder(var, append=[str(el.year)]), - "{}.nc".format(date_str)) - - if not self._download: - if os.path.exists(nc_path): - reproc_path = os.path.join( - self.get_data_var_folder(var, append=[str(el.year)]), - "{}.reproc.nc".format(date_str)) - - logging.debug("{} exists, becoming {}".format( - nc_path, reproc_path)) - os.rename(nc_path, reproc_path) - data_files.append(reproc_path) - else: - if os.path.exists(temp_path): - logging.info("Using existing {}".format(temp_path)) - data_files.append(temp_path) - else: - logging.debug("{} does not exist".format(nc_path)) - continue - else: - if not os.path.isdir(os.path.dirname(temp_path)): - os.makedirs(os.path.dirname(temp_path), exist_ok=True) - - if os.path.exists(temp_path) or os.path.exists(nc_path): - logging.debug("{} file exists, skipping".format(date_str)) - if not os.path.exists(nc_path): - data_files.append(temp_path) - continue - - if not ftp: - logging.info("FTP opening") - ftp = FTP('osisaf.met.no') - ftp.login() - - chdir_path = self._ftp_osi450 \ - if el < osi430b_start else self._ftp_osi430b \ - if el < osi430a_start else self._ftp_osi430a - - chdir_path = chdir_path.format(el.year, el.month) - - try: - ftp.cwd(chdir_path) - - if chdir_path not in cache: - cache[chdir_path] = ftp.nlst() - - cache_match = "ice_conc_{}_ease*_{:04d}{:02d}{:02d}*.nc".\ - format(hs, el.year, el.month, el.day) - ftp_files = [ - el for el in cache[chdir_path] - if fnmatch.fnmatch(el, cache_match) - ] - - if len(ftp_files) > 1: - raise ValueError( - "More than a single file found: {}".format( - ftp_files)) - elif not len(ftp_files): - logging.warning( - "File is not available: {}".format(cache_match)) - continue - except ftplib.error_perm: - logging.warning("FTP error, possibly missing month chdir " - "for {}".format(date_str)) - continue - - # Check if remote file size is too small, if so, render date invalid - # and continue. - file_size = ftp.size(ftp_files[0]) - - # Check remote file size in bytes - if file_size < 100: - logging.warning( - f"Date {el} is in invalid list, as file size too small") - self._zero_dates.append(el) - self._invalid_dates.append(el) - continue - else: - # Removing missing date file if it was created for a file with zero size before - if el in self._zero_dates: - self._zero_dates.remove(el) - fpath = os.path.join( - self.get_data_var_folder( - "siconca", - append=[str(pd.to_datetime(el).year)]), - "missing.{}.nc".format(date_str)) - if os.path.exists(fpath): - os.unlink(fpath) - - with open(temp_path, "wb") as fh: - ftp.retrbinary("RETR {}".format(ftp_files[0]), fh.write) - - logging.debug("Downloaded {}".format(temp_path)) - data_files.append(temp_path) - - self._zero_dates = set(self._zero_dates) - self.zero_dates() - - if ftp: - ftp.quit() - - logging.debug("Files being processed: {}".format(data_files)) - - if len(data_files): - ds = xr.open_mfdataset(data_files, - combine="nested", - concat_dim="time", - data_vars=["ice_conc"], - drop_variables=var_remove_list, - engine="netcdf4", - chunks=dict(time=self._chunk_size,), - parallel=self._parallel_opens) - - logging.debug("Processing out extraneous data") - - ds = ds.drop_vars(var_remove_list, errors="ignore") - da = ds.resample(time="1D").mean().ice_conc - - da = da.where(da < 9.9e+36, 0.) # Missing values - da /= 100. # Convert from SIC % to fraction - - for coord in ['lat', 'lon']: - if coord not in da.coords: - logging.warning("Adding {} vals to coords, as missing in " - "this the combined dataset".format(coord)) - da.coords[coord] = self._get_missing_coordinates( - var, hs, coord) - - # In experimenting, I don't think this is actually required - for month, mask in self._mask_dict.items(): - da.loc[dict( - time=(da['time.month'] == month))].values[:, ~mask] = 0. - - for date in da.time.values: - day_da = da.sel(time=slice(date, date)) - - if np.sum(np.isnan(day_da.data)) > 0: - logging.warning("NaNs detected, adding to invalid " - "list: {}".format(date)) - self._invalid_dates.append(pd.to_datetime(date)) - - var_folder = self.get_data_var_folder(var) - group_by = "time.year" - - for year, year_da in da.groupby(group_by): - req_date = pd.to_datetime(year_da.time.values[0]) - - year_path = os.path.join( - var_folder, "{}.nc".format(getattr(req_date, "year"))) - old_year_path = os.path.join( - var_folder, "old.{}.nc".format(getattr(req_date, "year"))) - - if os.path.exists(year_path): - logging.info( - "Existing file needs concatenating: {} -> {}".format( - year_path, old_year_path)) - os.rename(year_path, old_year_path) - old_da = xr.open_dataarray(old_year_path) - year_da = year_da.drop_sel(time=old_da.time, - errors="ignore") - year_da = xr.concat([old_da, year_da], - dim="time").sortby("time") - old_da.close() - os.unlink(old_year_path) - - logging.info("Saving {}".format(year_path)) - year_da.compute() - year_da.to_netcdf(year_path) - - self.missing_dates() - - if self._delete: - for fpath in data_files: - os.unlink(fpath) - - def zero_dates(self): - """ - Write out any dates that have zero file size on the ftp server to csv - """ - if not self._zero_dates and os.path.exists(self._zero_dates_path): - os.unlink(self._zero_dates_path) - elif self._zero_dates: - logging.info(f"Processing {len(self._zero_dates)} zero dates") - with open(self._zero_dates_path, "w") as fh: - for date in self._zero_dates: - # FIXME: slightly unusual format for Ymd dates - fh.write(date.strftime("%Y,%m,%d\n")) - - def missing_dates(self): - """ - - :return: - """ - filenames = set([ - os.path.join(self.get_data_var_folder("siconca"), - "{}.nc".format(el.strftime("%Y"))) - for el in self._dates - ]) - filenames = [f for f in filenames if os.path.exists(f)] - - logging.info("Opening for interpolation: {}".format(filenames)) - ds = xr.open_mfdataset(filenames, - combine="nested", - concat_dim="time", - chunks=dict(time=self._chunk_size,), - parallel=self._parallel_opens) - return self._missing_dates(ds.ice_conc) - - def _missing_dates(self, da: object) -> object: - """ - - :param da: - :return: - """ - if pd.Timestamp(1979, 1, 2) in da.time.values \ - and dt.date(1979, 1, 1) in self._dates\ - and pd.Timestamp(1979, 1, 1) not in da.time.values: - da_1979_01_01 = da.sel( - time=[pd.Timestamp(1979, 1, 2)]).copy().assign_coords( - {'time': [pd.Timestamp(1979, 1, 1)]}) - da = xr.concat([da, da_1979_01_01], dim='time') - da = da.sortby('time') - - dates_obs = [pd.to_datetime(date).date() for date in da.time.values] - dates_all = [ - pd.to_datetime(date).date() - for date in pd.date_range(min(self._dates), max(self._dates)) - ] - - # Weirdly, we were getting future warnings for timestamps, but unsure - # where from - invalid_dates = [pd.to_datetime(d).date() for d in self._invalid_dates] - missing_dates = [ - date for date in dates_all - if date not in dates_obs or date in invalid_dates - ] - - logging.info("Processing {} missing dates".format(len(missing_dates))) - - missing_dates_path = os.path.join(self.get_data_var_folder("siconca"), - "missing_days.csv") - - with open(missing_dates_path, "a") as fh: - for date in missing_dates: - # FIXME: slightly unusual format for Ymd dates - fh.write(date.strftime("%Y,%m,%d\n")) - - logging.debug("Interpolating {} missing dates".format( - len(missing_dates))) - - for date in missing_dates: - if pd.Timestamp(date) not in da.time.values: - logging.info("Interpolating {}".format(date)) - da = xr.concat([da, da.interp(time=pd.to_datetime(date))], - dim='time') - - logging.debug("Finished interpolation") - - da = da.sortby('time') - da.data = np.array(da.data, dtype=self._dtype) - - for date in missing_dates: - date_str = pd.to_datetime(date).strftime("%Y_%m_%d") - fpath = os.path.join( - self.get_data_var_folder( - "siconca", append=[str(pd.to_datetime(date).year)]), - "missing.{}.nc".format(date_str)) - - if not os.path.exists(fpath): - day_da = da.sel(time=slice(date, date)) - mask = self._mask_dict[pd.to_datetime(date).month] - - day_da.data[0][~mask] = 0. - - logging.info("Writing missing date file {}".format(fpath)) - day_da.to_netcdf(fpath) - - return da - - def _get_missing_coordinates(self, var, hs, coord): - """ - - :param var: - :param hs: - :param coord: - """ - missing_coord_file = os.path.join(self.get_data_var_folder(var), - "missing_coord_data.nc") - - if not os.path.exists(missing_coord_file): - ftp_source_path = self._ftp_osi450.format(2000, 1) - - retrieve_cmd_template_osi450 = \ - "wget -m -nH -nd -O {} " \ - "ftp://osisaf.met.no{}/{}" - filename_osi450 = \ - "ice_conc_{}_ease2-250_cdr-v2p0_200001011200.nc".format(hs) - - run_command( - retrieve_cmd_template_osi450.format(missing_coord_file, - ftp_source_path, - filename_osi450)) - else: - logging.info( - "Coordinate path {} already exists".format(missing_coord_file)) - - ds = xr.open_dataset(missing_coord_file, - drop_variables=var_remove_list, - engine="netcdf4").load() - try: - coord_data = getattr(ds, coord) - except AttributeError as e: - logging.exception( - "{} does not exist in coord reference file {}".format( - coord, missing_coord_file)) - raise RuntimeError(e) - return coord_data - - -def main(): - args = download_args(var_specs=False, - workers=True, - extra_args=[(("-u", "--use-dask"), - dict(action="store_true", default=False)), - (("-c", "--sic-chunking-size"), - dict(type=int, default=10)), - (("-dt", "--dask-timeouts"), - dict(type=int, default=120)), - (("-dp", "--dask-port"), - dict(type=int, default=8888))]) - - logging.info("OSASIF-SIC Data Downloading") - sic = SICDownloader( - chunk_size=args.sic_chunking_size, - dates=[ - pd.to_datetime(date).date() - for date in pd.date_range(args.start_date, args.end_date, freq="D") - ], - delete_tempfiles=args.delete, - north=args.hemisphere == "north", - south=args.hemisphere == "south", - parallel_opens=args.parallel_opens, - ) - if args.use_dask: - logging.warning("Attempting to use dask client for SIC processing") - dw = DaskWrapper(workers=args.workers) - dw.dask_process(method=sic.download) - else: - sic.download() diff --git a/icenet/data/sic/utils.py b/icenet/data/sic/utils.py deleted file mode 100644 index ec75ec3f..00000000 --- a/icenet/data/sic/utils.py +++ /dev/null @@ -1,5 +0,0 @@ -""" - -""" - -SIC_HEMI_STR = dict(north="nh", south="sh") diff --git a/requirements.txt b/requirements.txt index 899b469e..6481736a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,7 @@ cdsapi cfgrib dask distributed +download-toolbox @ git+https://github.com/antarctica/download-toolbox@main eccodes ecmwf-api-client h5py>2.10 diff --git a/setup.py b/setup.py index 333a4b30..8abf042f 100644 --- a/setup.py +++ b/setup.py @@ -40,18 +40,6 @@ def get_content(filename): "console_scripts": [ "icenet_data_masks = icenet.data.sic.mask:main", - "icenet_data_cmip = icenet.data.interfaces.esgf:main", - "icenet_data_era5 = icenet.data.interfaces.cds:main", - "icenet_data_oras5 = icenet.data.interfaces.cmems:main", - "icenet_data_hres = icenet.data.interfaces.mars:hres_main", - "icenet_data_seas = icenet.data.interfaces.mars:seas_main", - "icenet_data_sic = icenet.data.sic.osisaf:main", - - "icenet_data_reproc_monthly = " - "icenet.data.interfaces.utils:reprocess_main", - "icenet_data_add_time_dim = " - "icenet.data.interfaces.utils:add_time_dim_main", - "icenet_process_cmip = icenet.data.processors.cmip:main", "icenet_process_era5 = icenet.data.processors.era5:main", "icenet_process_oras5 = icenet.data.processors.oras5:main",