Merge pull request #62 from EcoExtreML/fix_59
Move to new cds
SarahAlidoost authored Nov 25, 2024
2 parents 4f652d6 + 933b053 commit d95d0a5
Showing 63 changed files with 765 additions and 312 deletions.
23 changes: 15 additions & 8 deletions docs/configuration.md
@@ -23,15 +23,25 @@ The configuration file should contain the `working_directory`, for instance:
working_directory: /path_to_a_working_directory/ #for example: /home/bart/Zampy
```
If you need access to data on CDS or ADS server, you should add your CDS or ADS credentials to `zampy_config.yml`:
The old Climate Data Store (CDS) was shut down on 3 September 2024. For more
information, see
[the-new-climate-data-store-beta](https://forum.ecmwf.int/t/the-new-climate-data-store-beta-cds-beta-is-now-live/3315).
To use the new CDS/ADS, you need an ECMWF account; your existing CDS/ADS
credentials do not work.
If you need access to data on the CDS or ADS server, you should add your CDS/ADS
credentials to `zampy_config.yml`. To find your key, go to [CDS how to
api](https://cds.climate.copernicus.eu/how-to-api) or [ADS how to
api](https://ads.atmosphere.copernicus.eu/how-to-api). You can skip the steps
related to `.cdsapirc` and simply add the key to `zampy_config.yml`:

```yaml
cdsapi:
url: # for example https://cds.climate.copernicus.eu/api/v2
key: # for example 12345:xhashd-232jcsha-dsaj429-cdjajd29319
url: # for example https://cds.climate.copernicus.eu/api
key: # for example xhashd-232jcsha-dsaj429-cdjajd29319
adsapi:
url: # for example https://ads.atmosphere.copernicus.eu/api/v2
key: # for example 12345:xhashd-232jcsha-dsaj429-cdjajd29319
url: # for example https://ads.atmosphere.copernicus.eu/api
key: # for example xhashd-232jcsha-dsaj429-cdjajd29319
```
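`zampy` reads these credentials itself when it talks to the CDS/ADS, so the snippet
below is only an illustration of what that amounts to; the config path and the idea
of building the client by hand are assumptions, not part of this commit:

```python
from pathlib import Path

import cdsapi
import yaml

# Assumed location of the zampy config file; adjust to wherever yours lives.
config_path = Path.home() / ".config" / "zampy" / "zampy_config.yml"
config = yaml.safe_load(config_path.read_text())

# Build a CDS API client from the stored credentials (no `.cdsapirc` needed).
client = cdsapi.Client(
    url=config["cdsapi"]["url"],
    key=config["cdsapi"]["key"],
)
```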

## Instructions for CDS/ADS datasets
@@ -45,9 +55,6 @@ To download the following datasets, users need access to CDS/ADS via `cdsapi`/`adsapi`
- ADS
- CAMS EGG4 (e.g. co2)

To generate these API keys, you need to be a registered user on *CDS* via the [registration page](https://cds.climate.copernicus.eu/user/register?destination=%2F%23!%2Fhome), or on *ADS* via the [registration page](https://ads.atmosphere.copernicus.eu/user/register?destination=%2F%23!%2Fhome).

Before submitting any request with `zampy`, please put your `cdsapi`/`adsapi` credentials in `zampy_config.yml`. Here is a short [instruction](https://cds.climate.copernicus.eu/api-how-to) about how to find your CDS/ADS API key. You can skip the steps related to `.cdsapirc` and simply add the key to `zampy_config.yml`.

### Agree to the Terms of Use on CDS/ADS

12 changes: 11 additions & 1 deletion docs/index.md
@@ -52,7 +52,7 @@ download:
cams:
variables:
- co2_concentration

convert:
convention: ALMA
frequency: 1H # outputs at 1 hour frequency. Pandas-like freq-keyword.
@@ -67,6 +67,16 @@ When you have your recipe created and saved on your disk, you can execute your
zampy /path_to_recipe/sample_recipe.yml
```

!!! note

    You may receive an error message from CDS/ADS if not all the required
    licences have been accepted. Follow the instructions in the error message to
    accept the licences and run the recipe again.

Once the downloading process starts, you can also check the status of your requests
in your CDS/ADS profile.


### Interact with `zampy` in notebooks

It is possible to use `zampy` directly in Python via its Python API. This is not recommended, as it is more difficult to reproduce the workflow if there is no recipe.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -62,7 +62,7 @@ dependencies = [
"pint",
"cf_xarray", # required to auto-pint CF compliant datasets.
"pint-xarray",
"cdsapi",
"cdsapi>=0.7.2",
"xarray-regrid", # for regridding
]
dynamic = ["version"]
4 changes: 2 additions & 2 deletions recipes/STEMMUS_SCOPE_input.yml
@@ -2,7 +2,7 @@
name: "STEMMUS_SCOPE_input"

download:
time: ["2020-01-01", "2020-06-30"]
time: ["2020-01-01", "2020-02-15"]
bbox: [60, 10, 50, 0] # NESW
datasets:
era5_land:
@@ -37,5 +37,5 @@ download:

convert:
convention: ALMA
frequency: 1H # outputs at 1 hour frequency. Pandas-like freq-keyword.
frequency: 1h # outputs at 1 hour frequency. Pandas-like freq-keyword.
resolution: 0.25 # output resolution in degrees.
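The `1H` → `1h` change matches the lowercase frequency aliases that recent pandas
releases expect (pandas 2.2 deprecated the uppercase `H`; this motivation is an
inference, not stated in the commit). A standalone sketch of the accepted form:

```python
import numpy as np
import pandas as pd
import xarray as xr

# Toy half-hourly series; "30min" and "1h" are the non-deprecated aliases.
time = pd.date_range("2020-01-01", periods=48, freq="30min")
da = xr.DataArray(np.arange(48), coords={"time": time}, dims="time")

hourly_mean = da.resample(time="1h").mean()
print(hourly_mean.sizes["time"])  # 24 hourly values
```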
25 changes: 19 additions & 6 deletions src/zampy/datasets/cds_utils.py
@@ -35,10 +35,10 @@
"01", "02", "03", "04", "05", "06", "07", "08", "09", "10",
"11", "12", "13", "14", "15", "16", "17", "18", "19", "20",
"21", "22", "23", "24", "25", "26", "27", "28", "29", "30",
"31",
"31",
] # fmt: skip

ALL_HOURS = [
ALL_HOURS = [
"00:00", "01:00", "02:00", "03:00", "04:00", "05:00", "06:00",
"07:00", "08:00", "09:00", "10:00", "11:00", "12:00", "13:00",
"14:00", "15:00", "16:00", "17:00", "18:00", "19:00", "20:00",
@@ -97,11 +97,13 @@ def cds_request(

url, api_key = cds_api_key(fname)

# TODO: expose timeout, see issue 64
c = cdsapi.Client(
url=url,
key=api_key,
verify=True,
quiet=True,
timeout=300,
)
# choose retrieve function
retrieve_func = RETRIEVE_FUNCTION[fname]
@@ -124,7 +126,8 @@ def cds_request_land_cover(
dataset: str,
time_bounds: TimeBounds,
path: Path,
overwrite: bool,
spatial_bounds: SpatialBounds | None = None,
overwrite: bool = False,
) -> None:
"""Download land cover data via CDS API.
@@ -136,6 +139,7 @@
dataset: Dataset name for retrieval via `cdsapi`.
time_bounds: Zampy time bounds object.
path: File path to which the data should be saved.
spatial_bounds: Zampy spatial bounds object.
overwrite: If an existing file (of the same size!) should be overwritten.
"""
fname = PRODUCT_FNAME[dataset]
@@ -152,18 +156,27 @@
years_months = time_bounds_to_year_month(time_bounds)
years = {year for (year, _) in years_months}

if spatial_bounds is not None:
area = [
spatial_bounds.north,
spatial_bounds.west,
spatial_bounds.south,
spatial_bounds.east,
]

for year in tqdm(years):
if int(year) < 2016:
version = "v2.0.7cds"
version = "v2_0_7cds"
else:
version = "v2.1.1"
version = "v2_1_1"
r = c.retrieve(
dataset,
{
"variable": "all",
"format": "zip",
"year": year,
"version": version,
"area": area,
},
)
fpath = path / f"{fname}_LCCS_MAP_300m_{year}.zip"
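As an aside on the new `area` argument (illustrative only, not part of the diff):
zampy recipes give `bbox` in NESW order, while the CDS API expects
`[north, west, south, east]`. A minimal sketch of the reordering, using a
hypothetical stand-in for zampy's `SpatialBounds`:

```python
from dataclasses import dataclass


@dataclass
class Bounds:
    """Hypothetical stand-in for zampy's SpatialBounds; field order is an assumption."""

    north: float
    east: float
    south: float
    west: float


# Recipe bbox is NESW, e.g. [60, 10, 50, 0].
bounds = Bounds(north=60, east=10, south=50, west=0)

# The CDS "area" keyword wants [north, west, south, east].
area = [bounds.north, bounds.west, bounds.south, bounds.east]
print(area)  # [60, 0, 50, 10]
```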
@@ -348,7 +361,7 @@ def _check_and_download(

def time_bounds_to_year_month(time_bounds: TimeBounds) -> list[tuple[str, str]]:
"""Return year/month pairs."""
date_range = pd.date_range(start=time_bounds.start, end=time_bounds.end, freq="M")
date_range = pd.date_range(start=time_bounds.start, end=time_bounds.end, freq="ME")
year_month_pairs = [(str(date.year), str(date.month)) for date in date_range]
return year_month_pairs
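The `"M"` → `"ME"` switch tracks pandas 2.2, which renamed the month-end alias. A
short standalone sketch (not part of the commit) of what this function returns for
a three-month window:

```python
import pandas as pd

# Month-end dates between the bounds; "ME" is the pandas >= 2.2 month-end alias.
date_range = pd.date_range(start="2020-01-01", end="2020-03-31", freq="ME")
year_month_pairs = [(str(date.year), str(date.month)) for date in date_range]
print(year_month_pairs)  # [('2020', '1'), ('2020', '2'), ('2020', '3')]
```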

10 changes: 9 additions & 1 deletion src/zampy/datasets/ecmwf_dataset.py
@@ -120,11 +120,19 @@ def load(
files += (ingest_dir / self.name).glob(f"{self.name}_{var}*.nc")

ds = xr.open_mfdataset(files, chunks={"latitude": 200, "longitude": 200})
ds = ds.sel(time=slice(time_bounds.start, time_bounds.end))

# rename valid_time to time
if "valid_time" in ds.dims:
ds = ds.rename({"valid_time": "time"})

ds = ds.sel(time=slice(time_bounds.start, time_bounds.end))
grid = xarray_regrid.create_regridding_dataset(
make_grid(spatial_bounds, resolution)
)

# this is needed before regrid
ds = ds.unify_chunks()

ds = ds.regrid.linear(grid)

return ds
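For context (an observation about the new CDS output, not part of the diff): files
retrieved through the new API expose the time dimension as `valid_time`, so the
rename keeps downstream selection on `time` working. A toy illustration:

```python
import numpy as np
import pandas as pd
import xarray as xr

# Toy dataset mimicking a file whose time dimension is named "valid_time".
ds = xr.Dataset(
    {"t2m": ("valid_time", np.random.rand(24))},
    coords={"valid_time": pd.date_range("2020-01-01", periods=24, freq="1h")},
)

if "valid_time" in ds.dims:
    ds = ds.rename({"valid_time": "time"})

# Selection now works on the conventional "time" dimension.
subset = ds.sel(time=slice("2020-01-01T00:00", "2020-01-01T05:00"))
print(subset.sizes["time"])  # 6
```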
3 changes: 2 additions & 1 deletion src/zampy/datasets/eth_canopy_height.py
@@ -243,7 +243,8 @@ def convert_tiff_to_netcdf(
ds = parse_tiff_file(file, sd_file)

# Coarsen the data to be 1/100 deg resolution instead of 1/12000
ds = ds.coarsen({"latitude": 120, "longitude": 120}).mean() # type: ignore
if len(ds.latitude) >= 120 and len(ds.longitude) >= 120:
ds = ds.coarsen({"latitude": 120, "longitude": 120}).mean() # type: ignore
ds = ds.compute()
ds = ds.interpolate_na(dim="longitude", limit=1)
ds = ds.interpolate_na(dim="latitude", limit=1)
5 changes: 2 additions & 3 deletions src/zampy/datasets/fapar_lai.py
@@ -150,8 +150,7 @@ def load(
variable_names: list[str],
) -> xr.Dataset:
files = list((ingest_dir / self.name).glob("*.nc"))

ds = xr.open_mfdataset(files, parallel=True)
ds = xr.open_mfdataset(files) # see issue 65
ds = ds.sel(time=slice(time_bounds.start, time_bounds.end))

grid = xarray_regrid.create_regridding_dataset(
@@ -223,7 +222,7 @@ def download_fapar_lai(
"format": "zip",
"variable": "lai",
"horizontal_resolution": "1km",
"product_version": "V3",
"product_version": "v3",
"satellite": "spot" if year < 2014 else "proba",
"sensor": "vgt",
"month": f"{month:0>2}",
61 changes: 50 additions & 11 deletions src/zampy/datasets/land_cover.py
@@ -3,6 +3,7 @@
from pathlib import Path
from tempfile import TemporaryDirectory
from zipfile import ZipFile
import dask.array
import numpy as np
import xarray as xr
import xarray_regrid
@@ -82,6 +83,7 @@ def download(
cds_utils.cds_request_land_cover(
dataset=self.cds_dataset,
time_bounds=time_bounds,
spatial_bounds=spatial_bounds,
path=download_folder,
overwrite=overwrite,
)
@@ -134,16 +136,28 @@ def load(
)
raise ValueError(msg)
files = list((ingest_dir / self.name).glob(f"{self.name}_*.nc"))

ds = xr.open_mfdataset(files, chunks={"latitude": 200, "longitude": 200})
ds = ds.sel(time=slice(time_bounds.start, time_bounds.end))

grid = xarray_regrid.create_regridding_dataset(
utils.make_grid(spatial_bounds, resolution)
)
ds = ds.regrid.most_common(grid, time_dim="time", max_mem=1e9)

return ds
ds_regrid = {}
for variable in variable_names:
# select the variable to be regridded
da = ds[variable]

# get values for most common method
regrid_values = get_unique_values(da)

da_regrid = da.regrid.most_common(grid, values=regrid_values)

# make sure dtype is the same
# in xarray_regrid > v0.4.0, this might not be necessary
ds_regrid[variable] = da_regrid.astype(da.dtype)

return xr.Dataset(ds_regrid)

def convert(
self,
@@ -207,27 +221,37 @@ def extract_netcdf_to_zampy(file: Path) -> xr.Dataset:

# only keep land cover class variable
with xr.open_dataset(unzip_folder / zipped_file_name) as ds:
var_list = [var for var in ds.data_vars]
var_list = list(ds.data_vars)
raw_variable = "lccs_class"
var_list.remove(raw_variable)
ds = ds.drop_vars(var_list) # noqa: PLW2901

ds = ds.sortby(["lat", "lon"]) # noqa: PLW2901
ds = ds.rename({"lat": "latitude", "lon": "longitude"}) # noqa: PLW2901
new_grid = xarray_regrid.Grid(
north=90,
east=180,
south=-90,
west=-180,
north=ds["latitude"].max().item(),
east=ds["longitude"].max().item(),
south=ds["latitude"].min().item(),
west=ds["longitude"].min().item(),
resolution_lat=0.05,
resolution_lon=0.05,
)

target_dataset = xarray_regrid.create_regridding_dataset(new_grid)

ds_regrid = ds.regrid.most_common(
target_dataset, time_dim="time", max_mem=1e9
)
# select the variable to be regridded
da = ds[raw_variable]

# get values for most common method
regrid_values = get_unique_values(da)

da_regrid = da.regrid.most_common(target_dataset, values=regrid_values)

# make sure dtype is the same
da_regrid = da_regrid.astype(da.dtype)

# convert dataarray to dataset
ds_regrid = da_regrid.to_dataset()

# rename variable to follow the zampy convention
variable_name = "land_cover"
Expand All @@ -240,3 +264,18 @@ def extract_netcdf_to_zampy(file: Path) -> xr.Dataset:
].desc

return ds_regrid


def get_unique_values(da: xr.DataArray) -> np.ndarray:
"""Get unique values of a land cover DataArray."""
if "flag_values" in da.attrs:
unique_values = da.attrs["flag_values"]
else:
# Convert to Dask array if not already
if not isinstance(da.data, dask.array.Array):
dask_array = dask.array.from_array(da.values, chunks="auto")
else:
dask_array = da.data
# Use Dask's unique function
unique_values = dask.array.unique(dask_array).compute()
return unique_values
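A toy illustration (not part of the commit) of both branches of `get_unique_values`:
a DataArray that advertises its classes via the CF `flag_values` attribute, and one
without metadata that falls back to Dask's `unique`:

```python
import dask.array
import numpy as np
import xarray as xr

from zampy.datasets.land_cover import get_unique_values

data = np.array([[10, 10, 20], [20, 30, 30]], dtype=np.uint8)

# Branch 1: classes advertised via the CF "flag_values" attribute.
da_with_flags = xr.DataArray(data, dims=("latitude", "longitude"))
da_with_flags.attrs["flag_values"] = np.array([10, 20, 30], dtype=np.uint8)
print(get_unique_values(da_with_flags))  # [10 20 30]

# Branch 2: no metadata, so unique classes are computed with dask.array.unique.
da_plain = xr.DataArray(
    dask.array.from_array(data, chunks=(1, 3)), dims=("latitude", "longitude")
)
print(get_unique_values(da_plain))  # [10 20 30]
```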
7 changes: 5 additions & 2 deletions src/zampy/datasets/utils.py
@@ -43,10 +43,13 @@ def download_url(url: str, fpath: Path, overwrite: bool) -> None:
print(f"File '{fpath.name}' already exists, skipping...")


def get_url_size(url: str) -> int:
def get_url_size(url: str) -> int | None:
"""Return the size (bytes) of a given URL."""
response = requests.head(url)
return int(response.headers["Content-Length"])
content_length = response.headers.get("Content-Length")
if content_length:
return int(content_length)
return None


def get_file_size(fpath: Path) -> int:
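A short usage sketch (illustrative only; the URL is hypothetical) of how a caller
can handle the new `None` return when a server omits the `Content-Length` header:

```python
from zampy.datasets.utils import get_url_size

size = get_url_size("https://example.com/some_file.nc")  # hypothetical URL
if size is None:
    print("Server did not report a Content-Length header; skipping size check.")
else:
    print(f"Remote file is {size} bytes.")
```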
14 changes: 10 additions & 4 deletions src/zampy/recipe.py
@@ -137,13 +137,19 @@ def run(self) -> None:
ds = converter.convert(ds, dataset, convention=self.convention)

if "time" in ds.dims: # Dataset with only DEM (e.g.) has no time dim.
freq = xr.infer_freq(ds["time"])
if freq is None: # fallback:
freq = (
data_freq = None

if len(ds["time"]) == 1:
data_freq = pd.Timedelta(self.frequency)
elif len(ds["time"]) > 3: # see pandas _FrequencyInferer
freq = xr.infer_freq(ds["time"])
data_freq = pd.to_timedelta(pd.tseries.frequencies.to_offset(freq))

if data_freq is None: # fallback:
data_freq = pd.Timedelta(
ds["time"].isel(time=1).to_numpy()
- ds["time"].isel(time=0).to_numpy()
)
data_freq = pd.to_timedelta(pd.tseries.frequencies.to_offset(freq))

if data_freq < pd.Timedelta(self.frequency):
ds = ds.resample(time=self.frequency).mean()
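To illustrate the inference logic above (a standalone sketch, not zampy code):
`xr.infer_freq` needs several timestamps to work, which is why the single-timestep
and very short series are handled separately, with a first-difference fallback:

```python
import numpy as np
import pandas as pd
import xarray as xr

ds = xr.Dataset(
    {"precip": ("time", np.zeros(24))},
    coords={"time": pd.date_range("2020-01-01", periods=24, freq="1h")},
)

freq = xr.infer_freq(ds["time"])     # "h" (or "H" on older pandas/xarray)
offset = pd.tseries.frequencies.to_offset(freq)
data_freq = pd.to_timedelta(offset)  # Timedelta("0 days 01:00:00")

# Resample only when the data are finer than the requested output frequency.
if data_freq < pd.Timedelta("3h"):
    ds = ds.resample(time="3h").mean()
print(ds.sizes["time"])  # 8
```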