-
Notifications
You must be signed in to change notification settings - Fork 92
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Opening virtual datasets (dmr-adapter) #606
Changes from 1 commit
06592f1
c3eafb1
90f296a
dd006a3
b626a41
b09c3f0
ed4a98b
abe2c28
dde9c57
a9a9234
760f340
8fb0140
2c28278
c3e43ac
d3d6e7c
512e89c
67dbbe7
61afb95
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
from __future__ import annotations | ||
|
||
import fsspec | ||
import xarray as xr | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. xarray is not a core dependency; I think we need to add it as an optional dependency the same way consolidate_metadata uses Dask (see the pyproject.toml), and then the failing tests should pass! |
||
|
||
import earthaccess | ||
|
||
|
||
def _parse_dmr(
    fs: fsspec.AbstractFileSystem,
    data_path: str,
    dmr_path: str | None = None,
) -> xr.Dataset:
    """
    Parse a granule's DMR++ file and return a virtual xarray dataset.

    Parameters
    ----------
    fs : fsspec.AbstractFileSystem
        The file system to use to open the DMR++
    data_path : str
        Path to the granule's data file; recorded in the manifest so chunk
        references point back at the original file
    dmr_path : str, optional
        Path to the DMR++ metadata file. Defaults to ``data_path + ".dmrpp"``,
        the conventional sidecar location.

    Returns
    ----------
    xr.Dataset
        The virtual dataset (with virtualizarr ManifestArrays)

    Raises
    ----------
    Exception
        If the DMR++ file is not found or if there is an error parsing the DMR++
    """
    # Imported lazily so virtualizarr remains an optional dependency of the
    # package; only callers of this feature pay the import cost.
    from virtualizarr.readers.dmrpp import DMRParser

    dmr_path = data_path + ".dmrpp" if dmr_path is None else dmr_path
    with fs.open(dmr_path) as f:
        parser = DMRParser(f.read(), data_filepath=data_path)
        return parser.parse()
|
||
|
||
def open_virtual_mfdataset(
    granules: list[earthaccess.results.DataGranule],
    access: str = "indirect",
    preprocess: callable | None = None,
    parallel: bool = True,
    **xr_combine_nested_kwargs,
) -> xr.Dataset:
    """
    Open multiple granules as a single virtual xarray Dataset.

    Parameters
    ----------
    granules : list[earthaccess.results.DataGranule]
        The granules to open
    access : str
        The access method to use. One of "direct" or "indirect". Direct is for
        S3/cloud access, indirect is for HTTPS access.
    preprocess : callable, optional
        Function applied to each per-granule virtual dataset before combining
    parallel : bool
        If True (default), parse and preprocess granules concurrently with
        ``dask.delayed``; requires the optional dask dependency
    xr_combine_nested_kwargs : dict
        Keyword arguments for xarray.combine_nested.
        See https://docs.xarray.dev/en/stable/generated/xarray.combine_nested.html

    Returns
    ----------
    xr.Dataset
        The virtual dataset
    """
    if access == "direct":
        fs = earthaccess.get_s3fs_session(results=granules)
    else:
        fs = earthaccess.get_fsspec_https_session()
    if parallel:
        # Wrap _parse_dmr and preprocess with delayed so each granule's DMR++
        # parse runs concurrently; dask is imported lazily because it is an
        # optional dependency.
        import dask

        open_ = dask.delayed(_parse_dmr)
        if preprocess is not None:
            preprocess = dask.delayed(preprocess)
    else:
        open_ = _parse_dmr
    # First data link per granule is the primary data file for this access mode.
    vdatasets = [
        open_(fs=fs, data_path=g.data_links(access=access)[0]) for g in granules
    ]
    if preprocess is not None:
        vdatasets = [preprocess(ds) for ds in vdatasets]
    if parallel:
        # dask.compute returns a one-element tuple holding the list of results.
        vdatasets = dask.compute(vdatasets)[0]
    if len(vdatasets) == 1:
        # Skip combine_nested for a single granule (and its required kwargs).
        vds = vdatasets[0]
    else:
        vds = xr.combine_nested(vdatasets, **xr_combine_nested_kwargs)
    return vds
|
||
|
||
def open_virtual_dataset(
    granule: earthaccess.results.DataGranule, access: str = "indirect"
) -> xr.Dataset:
    """
    Open a granule as a single virtual xarray Dataset.

    Parameters
    ----------
    granule : earthaccess.results.DataGranule
        The granule to open
    access : str
        The access method to use. One of "direct" or "indirect". Direct is for
        S3/cloud access, indirect is for HTTPS access.

    Returns
    ----------
    xr.Dataset
        The virtual dataset
    """
    # Delegate to the multi-granule opener with a one-element list; a single
    # granule never needs preprocessing or parallelism.
    return open_virtual_mfdataset(
        granules=[granule],
        access=access,
        parallel=False,
        preprocess=None,
    )
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
import logging
import os
import unittest

import earthaccess
import pytest

# Skip this whole module when the optional virtual-dataset dependencies are
# not installed.
pytest.importorskip("virtualizarr")
pytest.importorskip("dask")

logger = logging.getLogger(__name__)
assertions = unittest.TestCase("__init__")

# These are integration tests: Earthdata credentials must be present in the
# environment before anything below can authenticate.
assertions.assertTrue("EARTHDATA_USERNAME" in os.environ)
assertions.assertTrue("EARTHDATA_PASSWORD" in os.environ)

# Lazy %-style args so logging formats only when the record is emitted.
logger.info("Current username: %s", os.environ["EARTHDATA_USERNAME"])
logger.info("earthaccess version: %s", earthaccess.__version__)
|
||
|
||
@pytest.fixture(scope="module")
def granules():
    """Two cloud-hosted MUR SST granules shared by every test in this module."""
    return earthaccess.search_data(
        short_name="MUR-JPL-L4-GLOB-v4.1",
        cloud_hosted=True,
        count=2,
    )
|
||
|
||
# BUG FIX: parametrize must receive a list of values — the bare string
# "memory" is iterated character by character, producing six bogus runs
# with output="m", "e", ...
@pytest.mark.parametrize("output", ["memory"])
def test_open_virtual_mfdataset(tmp_path, granules, output):
    """Virtual multi-file open matches an eager xr.open_mfdataset baseline."""
    xr = pytest.importorskip("xarray")
    # Baseline: open the same granules directly with `earthaccess.open`.
    expected = xr.open_mfdataset(
        earthaccess.open(granules),
        concat_dim="time",
        combine="nested",
        combine_attrs="drop_conflicts",
    )

    # BUG FIX: was `concat_dime="time"` — the typo would be forwarded to
    # xarray.combine_nested and raise a TypeError.
    result = earthaccess.open_virtual_mfdataset(
        granules=granules,
        access="indirect",
        concat_dim="time",
        parallel=True,
        preprocess=None,
    )

    # dimensions
    assert result.sizes == expected.sizes
    # variable names, variable dimensions
    assert result.variables.keys() == expected.variables.keys()
    # attributes
    assert result.attrs == expected.attrs
    # coordinates
    assert result.coords.keys() == expected.coords.keys()
    # chunks
    assert result.chunks == expected.chunks
    # encoding
    assert result.encoding == expected.encoding
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably add a
virtualizarr.open_virtual_mfdataset
upstream in virtualizarr. Though I would like to be more confident about the best way to parallelize reference generation firstzarr-developers/VirtualiZarr#123
zarr-developers/VirtualiZarr#7
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tracked in zarr-developers/VirtualiZarr#345