Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opening virtual datasets (dmr-adapter) #606

Merged
merged 18 commits into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions earthaccess/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from .search import DataCollections, DataGranules
from .store import Store
from .system import PROD, UAT
from .virtualizarr import open_virtual_dataset, open_virtual_mfdataset

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -49,6 +50,9 @@
"Store",
# kerchunk
"consolidate_metadata",
# virtualizarr
"open_virtual_dataset",
"open_virtual_mfdataset",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably add a virtualizarr.open_virtual_mfdataset upstream in virtualizarr. Though I would like to be more confident about the best way to parallelize reference generation first

zarr-developers/VirtualiZarr#123

zarr-developers/VirtualiZarr#7

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"PROD",
"UAT",
]
Expand Down
112 changes: 112 additions & 0 deletions earthaccess/virtualizarr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from __future__ import annotations

import fsspec
import xarray as xr
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

xarray is not a core dependency; I think we need to add it as an optional dependency the same way consolidate_metadata uses Dask (see the pyproject.toml), and then the tests that are failing should pass!


import earthaccess


def _parse_dmr(
    fs: fsspec.AbstractFileSystem,
    data_path: str,
    dmr_path: str | None = None,
) -> xr.Dataset:
    """Parse a granule's DMR++ file and return a virtual xarray dataset.

    Parameters
    ----------
    fs : fsspec.AbstractFileSystem
        The file system to use to open the DMR++ file.
    data_path : str
        Path to the granule's data file; embedded in the generated
        virtual references so readers can locate the actual bytes.
    dmr_path : str, optional
        Path to the DMR++ metadata file. Defaults to
        ``data_path + ".dmrpp"``, the conventional sidecar location.

    Returns
    -------
    xr.Dataset
        The virtual dataset (with virtualizarr ManifestArrays).

    Raises
    ------
    Exception
        If the DMR++ file is not found or there is an error parsing it.
    """
    # Imported lazily: virtualizarr is an optional dependency of earthaccess.
    from virtualizarr.readers.dmrpp import DMRParser

    if dmr_path is None:
        dmr_path = data_path + ".dmrpp"
    with fs.open(dmr_path) as f:
        parser = DMRParser(f.read(), data_filepath=data_path)
        return parser.parse()


def open_virtual_mfdataset(
    granules: list[earthaccess.results.DataGranule],
    access: str = "indirect",
    preprocess: callable | None = None,
    parallel: bool = True,
    **xr_combine_nested_kwargs,
) -> xr.Dataset:
    """Open multiple granules as a single virtual xarray Dataset.

    Parameters
    ----------
    granules : list[earthaccess.results.DataGranule]
        The granules to open. Must be non-empty.
    access : str
        The access method to use. One of "direct" or "indirect".
        Direct is for S3/cloud access, indirect is for HTTPS access.
    preprocess : callable, optional
        Function applied to each per-granule virtual dataset before
        the datasets are combined.
    parallel : bool
        If True, parse DMR++ files (and run ``preprocess``) lazily via
        dask.delayed and compute them in one batch. Requires dask.
    xr_combine_nested_kwargs : dict
        Keyword arguments for xarray.combine_nested.
        See https://docs.xarray.dev/en/stable/generated/xarray.combine_nested.html

    Returns
    -------
    xr.Dataset
        The virtual dataset.

    Raises
    ------
    ValueError
        If ``granules`` is empty — there is nothing to combine.
    """
    if not granules:
        raise ValueError("granules must contain at least one granule")
    if access == "direct":
        fs = earthaccess.get_s3fs_session(results=granules)
    else:
        fs = earthaccess.get_fsspec_https_session()
    if parallel:
        # Wrap _parse_dmr and preprocess with dask.delayed so all
        # reference generation happens in a single dask.compute call.
        import dask  # optional dependency, only needed for parallel=True

        open_ = dask.delayed(_parse_dmr)
        if preprocess is not None:
            preprocess = dask.delayed(preprocess)
    else:
        open_ = _parse_dmr
    vdatasets = [
        open_(fs=fs, data_path=g.data_links(access=access)[0]) for g in granules
    ]
    if preprocess is not None:
        vdatasets = [preprocess(ds) for ds in vdatasets]
    if parallel:
        vdatasets = dask.compute(vdatasets)[0]
    if len(vdatasets) == 1:
        return vdatasets[0]
    return xr.combine_nested(vdatasets, **xr_combine_nested_kwargs)


def open_virtual_dataset(
    granule: earthaccess.results.DataGranule, access: str = "indirect"
) -> xr.Dataset:
    """Open a single granule as a virtual xarray Dataset.

    Thin convenience wrapper around :func:`open_virtual_mfdataset` for
    the one-granule case; parsing is done serially (no dask needed).

    Parameters
    ----------
    granule : earthaccess.results.DataGranule
        The granule to open.
    access : str
        The access method to use. One of "direct" or "indirect".
        Direct is for S3/cloud access, indirect is for HTTPS access.

    Returns
    -------
    xr.Dataset
        The virtual dataset.
    """
    single = [granule]
    return open_virtual_mfdataset(
        granules=single,
        access=access,
        parallel=False,
        preprocess=None,
    )

49 changes: 49 additions & 0 deletions tests/integration/test_virtualizarr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import logging
import os
import unittest

import earthaccess
import pytest

pytest.importorskip("virtualizarr")
pytest.importorskip("dask")

logger = logging.getLogger(__name__)
assertions = unittest.TestCase("__init__")

assertions.assertTrue("EARTHDATA_USERNAME" in os.environ)
assertions.assertTrue("EARTHDATA_PASSWORD" in os.environ)

logger.info(f"Current username: {os.environ['EARTHDATA_USERNAME']}")
logger.info(f"earthaccess version: {earthaccess.__version__}")


@pytest.fixture(scope="module")
def granules():
    # Two cloud-hosted MUR SST granules; module-scoped so the CMR search
    # runs once for all tests in this file.
    return earthaccess.search_data(
        short_name="MUR-JPL-L4-GLOB-v4.1",
        cloud_hosted=True,
        count=2,
    )


# BUG FIXES: parametrize values must be an iterable of parameter sets —
# the bare string "memory" would parametrize over its characters
# ('m', 'e', ...); and `concat_dime` was a typo for `concat_dim`,
# which xarray.combine_nested would reject as an unexpected keyword.
@pytest.mark.parametrize("output", ["memory"])
def test_open_virtual_mfdataset(tmp_path, granules, output):
    """Virtual (DMR++-backed) dataset matches one opened via earthaccess.open."""
    xr = pytest.importorskip("xarray")
    # Reference: open the granules directly with `earthaccess.open`.
    expected = xr.open_mfdataset(
        earthaccess.open(granules),
        concat_dim="time",
        combine="nested",
        combine_attrs="drop_conflicts",
    )

    result = earthaccess.open_virtual_mfdataset(
        granules=granules,
        access="indirect",
        concat_dim="time",
        parallel=True,
        preprocess=None,
    )
    # dimensions
    assert result.sizes == expected.sizes
    # variable names, variable dimensions
    assert result.variables.keys() == expected.variables.keys()
    # attributes
    assert result.attrs == expected.attrs
    # coordinates
    assert result.coords.keys() == expected.coords.keys()
    # chunks
    assert result.chunks == expected.chunks
    # encoding
    assert result.encoding == expected.encoding
Loading