Opening virtual datasets (dmr-adapter) #606
@@ -0,0 +1,198 @@
from __future__ import annotations

from typing import TYPE_CHECKING

import earthaccess

if TYPE_CHECKING:
    import xarray as xr


def open_virtual_mfdataset(
    granules: list[earthaccess.DataGranule],
    group: str | None = None,
    access: str = "indirect",

[Review comment on `access`] Great that you're explicitly exposing this; I think we are going to deprecate the "magic" of detecting the runtime environment.

    load: bool = False,
    preprocess: callable | None = None,  # type: ignore
    parallel: bool = True,
    **xr_combine_nested_kwargs,
) -> xr.Dataset:
    """Open multiple granules as a single virtual xarray Dataset.

    Uses DMR++ metadata files to create a virtual xarray dataset with ManifestArrays. This virtual dataset can be used to create zarr reference files. See https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets.

    Parameters
    ----------
    granules : list[earthaccess.DataGranule]
        The granules to open.
    group : str or None (default=None)
        The group to open in the DMR++. If groups are present in the DMR++ files, this will open the specified group; if None, the root group will be opened. If the DMR++ file does not have groups, this parameter is ignored.
    access : str (default="indirect")
        The access method to use, one of "direct" or "indirect". Use "direct" when running on AWS and "indirect" when running on a local machine.
    load : bool (default=False)
        Create an xarray dataset with indexes and lazily loaded data.

        When True, creates a lazily loaded, numpy/dask-backed xarray dataset with indexes. Note that with `load=True` all the data is available to access but not yet loaded into memory. When `load=False`, a virtual xarray dataset is created with ManifestArrays. This virtual dataset is a view over the underlying metadata and chunks that allows creation and concatenation of zarr reference files; it cannot load data on its own. See https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets.
    preprocess : callable (default=None)
        A function to apply to each virtual dataset before combining.
    parallel : bool (default=True)
        Open the virtual datasets in parallel (using dask.delayed).
    xr_combine_nested_kwargs : dict
        Keyword arguments for xarray.combine_nested.
        See https://docs.xarray.dev/en/stable/generated/xarray.combine_nested.html

    Returns
    -------
    xr.Dataset

    Examples
    --------
    >>> results = earthaccess.search_data(count=5, temporal=("2024"), short_name="MUR-JPL-L4-GLOB-v4.1")
    >>> vds = open_virtual_mfdataset(results, access="indirect", load=False, concat_dim="time", coords='minimal', compat='override', combine_attrs="drop_conflicts")
    >>> vds
    <xarray.Dataset> Size: 29GB
    Dimensions:           (time: 5, lat: 17999, lon: 36000)
    Coordinates:
        time              (time) int32 20B ManifestArray<shape=(5,), dtype=int32,...
        lat               (lat) float32 72kB ManifestArray<shape=(17999,), dtype=...
        lon               (lon) float32 144kB ManifestArray<shape=(36000,), dtype...
    Data variables:
        mask              (time, lat, lon) int8 3GB ManifestArray<shape=(5, 17999...
        sea_ice_fraction  (time, lat, lon) int8 3GB ManifestArray<shape=(5, 17999...
        dt_1km_data       (time, lat, lon) int8 3GB ManifestArray<shape=(5, 17999...
        analysed_sst      (time, lat, lon) int16 6GB ManifestArray<shape=(5, 1799...
        analysis_error    (time, lat, lon) int16 6GB ManifestArray<shape=(5, 1799...
        sst_anomaly       (time, lat, lon) int16 6GB ManifestArray<shape=(5, 1799...
    Attributes: (12/42)
        Conventions:   CF-1.7
        title:         Daily MUR SST, Final product

    >>> vds = open_virtual_mfdataset(results, access="indirect", load=True, concat_dim="time", coords='minimal', compat='override', combine_attrs="drop_conflicts")
    >>> vds
    <xarray.Dataset> Size: 143GB
    Dimensions:           (time: 5, lat: 17999, lon: 36000)
    Coordinates:
      * lat               (lat) float32 72kB -89.99 -89.98 -89.97 ... 89.98 89.99
      * lon               (lon) float32 144kB -180.0 -180.0 -180.0 ... 180.0 180.0
      * time              (time) datetime64[ns] 40B 2024-01-01T09:00:00 ... 2024-...
    Data variables:
        analysed_sst      (time, lat, lon) float64 26GB dask.array<chunksize=(1, 3600, 7200), meta=np.ndarray>
        analysis_error    (time, lat, lon) float64 26GB dask.array<chunksize=(1, 3600, 7200), meta=np.ndarray>
        dt_1km_data       (time, lat, lon) timedelta64[ns] 26GB dask.array<chunksize=(1, 4500, 9000), meta=np.ndarray>
        mask              (time, lat, lon) float32 13GB dask.array<chunksize=(1, 4500, 9000), meta=np.ndarray>
        sea_ice_fraction  (time, lat, lon) float64 26GB dask.array<chunksize=(1, 4500, 9000), meta=np.ndarray>
        sst_anomaly       (time, lat, lon) float64 26GB dask.array<chunksize=(1, 3600, 7200), meta=np.ndarray>
    Attributes: (12/42)
        Conventions:   CF-1.7
        title:         Daily MUR SST, Final product
    """
    import xarray as xr

    import virtualizarr as vz

    if access == "direct":
        fs = earthaccess.get_s3_filesystem(results=granules[0])
        fs.storage_options["anon"] = False  # type: ignore
    else:
        fs = earthaccess.get_fsspec_https_session()
    if parallel:
        import dask

        # wrap open_virtual_dataset and preprocess with dask.delayed
        open_ = dask.delayed(vz.open_virtual_dataset)  # type: ignore
        if preprocess is not None:
            preprocess = dask.delayed(preprocess)  # type: ignore
    else:
        open_ = vz.open_virtual_dataset
    vdatasets = []
    # Get the list of virtual datasets (or dask delayed objects)
    for g in granules:
        vdatasets.append(
            open_(
                filepath=g.data_links(access=access)[0] + ".dmrpp",

[Review comment on the `filepath` argument] the linter tells me that …

                filetype="dmrpp",  # type: ignore
                group=group,
                indexes={},
                reader_options={"storage_options": fs.storage_options},  # type: ignore
            )
        )
    if preprocess is not None:
        vdatasets = [preprocess(ds) for ds in vdatasets]

[Review comment] amazing!

    if parallel:
        vdatasets = dask.compute(vdatasets)[0]  # type: ignore
    if len(vdatasets) == 1:
        vds = vdatasets[0]
    else:
        vds = xr.combine_nested(vdatasets, **xr_combine_nested_kwargs)
    if load:
        refs = vds.virtualize.to_kerchunk(filepath=None, format="dict")
        return xr.open_dataset(
            "reference://",
            engine="zarr",
            chunks={},
            backend_kwargs={
                "consolidated": False,
                "storage_options": {
                    "fo": refs,
                    "remote_protocol": fs.protocol,
                    "remote_options": fs.storage_options,  # type: ignore
                },
            },
        )
    return vds
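The `parallel=True` path above wraps the per-granule open in `dask.delayed` and materializes everything with a single `dask.compute` call. A minimal, self-contained sketch of that fan-out pattern, using a made-up stand-in opener (`open_one` and `paths` are illustrative only, not earthaccess or virtualizarr APIs):

```python
import dask


# Stand-in for vz.open_virtual_dataset: any cheap per-item "opener".
def open_one(path: str) -> dict:
    return {"path": path, "size": len(path)}


paths = ["a.nc", "bb.nc", "ccc.nc"]

# Wrap the opener so each call builds a lazy task instead of running eagerly...
delayed_open = dask.delayed(open_one)
tasks = [delayed_open(p) for p in paths]

# ...then trigger all tasks at once; dask may schedule the opens concurrently.
results = dask.compute(tasks)[0]
print([r["size"] for r in results])  # → [4, 5, 6]
```

The function above applies the same `dask.delayed` wrapping to `preprocess`, so the user-supplied preprocessing also runs inside the task graph rather than eagerly on the driver.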


def open_virtual_dataset(
    granule: earthaccess.DataGranule,
    group: str | None = None,
    access: str = "indirect",
    load: bool = False,
) -> xr.Dataset:
    """Open a granule as a single virtual xarray Dataset.

    Uses DMR++ metadata files to create a virtual xarray dataset with ManifestArrays. This virtual dataset can be used to create zarr reference files. See https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets.

    Parameters
    ----------
    granule : earthaccess.DataGranule
        The granule to open.
    group : str or None (default=None)
        The group to open in the DMR++. If groups are present in the DMR++ files, this will open the specified group; if None, the root group will be opened. If the DMR++ file does not have groups, this parameter is ignored.
    access : str (default="indirect")
        The access method to use, one of "direct" or "indirect". Use "direct" when running on AWS and "indirect" when running on a local machine.
    load : bool (default=False)
        Create an xarray dataset with indexes and lazily loaded data.

        When True, creates a lazily loaded, numpy/dask-backed xarray dataset with indexes. Note that with `load=True` all the data is available to access but not yet loaded into memory. When `load=False`, a virtual xarray dataset is created with ManifestArrays. This virtual dataset is a view over the underlying metadata and chunks that allows creation and concatenation of zarr reference files; it cannot load data on its own. See https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets.

    Returns
    -------
    xr.Dataset

    Examples
    --------
    >>> results = earthaccess.search_data(count=2, temporal=("2023"), short_name="SWOT_L2_LR_SSH_Expert_2.0")
    >>> open_virtual_dataset(results[0], access="indirect", load=False)
    <xarray.Dataset> Size: 149MB
    Dimensions:                (num_lines: 9866, num_pixels: 69, num_sides: 2)
    Coordinates:
        longitude              (num_lines, num_pixels) int32 3MB ...
        latitude               (num_lines, num_pixels) int32 3MB ...
        latitude_nadir         (num_lines) int32 39kB ManifestArr...
        longitude_nadir        (num_lines) int32 39kB ManifestArr...
    Dimensions without coordinates: num_lines, num_pixels, num_sides
    Data variables: (12/98)
        height_cor_xover_qual  (num_lines, num_pixels) uint8 681kB ManifestArray<shape=(9866, 69), dtype=uint8, chunks=(9866, 69...
    """
    return open_virtual_mfdataset(
        granules=[granule],
        group=group,
        access=access,
        load=load,
        parallel=False,
        preprocess=None,
    )
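The `**xr_combine_nested_kwargs` accepted by `open_virtual_mfdataset` are forwarded verbatim to `xarray.combine_nested`, which stitches the per-granule datasets together along a record dimension. A small sketch of that combining step, assuming xarray and numpy are available (the tiny `sst` datasets are invented for illustration):

```python
import numpy as np
import xarray as xr

# Two single-timestep datasets standing in for per-granule (virtual) datasets.
ds1 = xr.Dataset({"sst": (("time", "lat"), np.zeros((1, 3)))})
ds2 = xr.Dataset({"sst": (("time", "lat"), np.ones((1, 3)))})

# combine_nested concatenates along the named dimension, which is why the
# docstring examples pass concat_dim="time" through open_virtual_mfdataset.
combined = xr.combine_nested([ds1, ds2], concat_dim="time")
print(combined.sizes["time"])  # → 2
```

The other forwarded keywords in the docstring examples (`coords='minimal'`, `compat='override'`, `combine_attrs="drop_conflicts"`) control how coordinates and attributes are reconciled during this same combining step.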
@@ -64,6 +64,9 @@ kerchunk = [
  "h5netcdf",
  "xarray",
]
virtualizarr = [
  "virtualizarr @ git+https://github.com/zarr-developers/VirtualiZarr.git"
]

[Review comment] Is this because we want to be up to date for now? Are we targeting an upcoming release?
[Review comment] I released like 2 days ago, so you probably want to just pin to …
[Review comment] Great, will update that!

dev = [
  "bump-my-version >=0.10.0",
  "nox",
@@ -82,6 +85,7 @@ test = [
  "types-setuptools >=0.1",
  "vcrpy >=6.0.1",
  "earthaccess[kerchunk]",
  "earthaccess[virtualizarr]",
]
docs = [
  "jupyterlab >=3",
@@ -122,6 +126,9 @@ docs = [
[tool.pytest]
filterwarnings = ["error::UserWarning"]

[tool.hatch.metadata]
allow-direct-references = true

[tool.mypy]
python_version = "3.10"
files = ["earthaccess", "tests"]
@@ -0,0 +1,57 @@
import logging
import os
import unittest

import earthaccess
import pytest

logger = logging.getLogger(__name__)
assertions = unittest.TestCase("__init__")

assertions.assertTrue("EARTHDATA_USERNAME" in os.environ)
assertions.assertTrue("EARTHDATA_PASSWORD" in os.environ)

logger.info(f"Current username: {os.environ['EARTHDATA_USERNAME']}")
logger.info(f"earthaccess version: {earthaccess.__version__}")


@pytest.fixture(scope="module", params=["MUR25-JPL-L4-GLOB-v04.2"])
def granule(request):
    granules = earthaccess.search_data(
        count=1, temporal=("2024"), short_name=request.param
    )
    return granules[0]


def test_dmrpp(granule):
    from virtualizarr import open_virtual_dataset

    fs = earthaccess.get_fsspec_https_session()
    data_path = granule.data_links(access="indirect")[0]
    dmrpp_path = data_path + ".dmrpp"

    result = open_virtual_dataset(

[Review comment] So the test of correctness here is assuming that virtualizarr has correct behaviour? You might want to add more tests, because virtualizarr definitely still has bugs in it (e.g. around CF coordinate decoding).
[Review comment] Yes, we are assuming that virtualizarr has the correct behavior. However, this is mostly an integration test checking that the function operates as expected. The actual tests for correctness are in virtualizarr (for the dmrpp backend).

        dmrpp_path,
        filetype="dmrpp",  # type: ignore
        indexes={},
        reader_options={"storage_options": fs.storage_options},  # type: ignore
    )

    expected = open_virtual_dataset(
        data_path,
        indexes={},
        reader_options={"storage_options": fs.storage_options},  # type: ignore
    )

    # TODO: replace with xr.testing when virtualizarr fill_val is fixed (https://github.com/zarr-developers/VirtualiZarr/issues/287)
    # and dmrpp deflateLevel (zlib compression level) is always present (https://github.com/OPENDAP/bes/issues/954)
    for var in result.variables:

[Review comment] Is this the replacement for "almost equal"?

        assert var in expected.variables
        assert result[var].dims == expected[var].dims
        assert result[var].shape == expected[var].shape
        assert result[var].dtype == expected[var].dtype
        assert result[var].data.manifest == expected[var].data.manifest
    assert set(result.coords) == set(expected.coords)
    assert result.attrs == expected.attrs
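The per-variable loop in the test is a stopgap for the linked upstream issues; once those are fixed, the TODO suggests collapsing the comparison into a single `xr.testing` call. A hedged sketch of what that looks like, on toy datasets rather than real granules (assuming xarray and numpy are installed; `ds_a`/`ds_b` are invented stand-ins for `result`/`expected`):

```python
import numpy as np
import xarray as xr

# Toy stand-ins for the `result` and `expected` datasets in the test above.
ds_a = xr.Dataset({"v": ("x", np.arange(3))}, attrs={"title": "demo"})
ds_b = xr.Dataset({"v": ("x", np.arange(3))}, attrs={"title": "demo"})

# assert_identical checks variable names, dims, values, and attrs in one
# call, replacing the manual dims/shape/dtype/attrs assertions; it raises
# AssertionError with a readable diff when the datasets differ.
xr.testing.assert_identical(ds_a, ds_b)
print("datasets identical")  # → datasets identical
```

Note that `assert_identical` compares loaded values, so for virtual datasets backed by ManifestArrays the manifest comparison above may still be needed separately.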
[Review comment] We should probably add a `virtualizarr.open_virtual_mfdataset` upstream in virtualizarr. Though I would like to be more confident about the best way to parallelize reference generation first: zarr-developers/VirtualiZarr#123, zarr-developers/VirtualiZarr#7
[Review comment] Tracked in zarr-developers/VirtualiZarr#345