Opening virtual datasets (dmr-adapter) #606

Merged
merged 18 commits on Dec 14, 2024
Changes from 10 commits
4 changes: 4 additions & 0 deletions earthaccess/__init__.py
@@ -26,6 +26,7 @@
from .services import DataServices
from .store import Store
from .system import PROD, UAT
from .virtualizarr import open_virtual_dataset, open_virtual_mfdataset

logger = logging.getLogger(__name__)

@@ -58,6 +59,9 @@
"Store",
# kerchunk
"consolidate_metadata",
# virtualizarr
"open_virtual_dataset",
"open_virtual_mfdataset",

Review comment: We should probably add a virtualizarr.open_virtual_mfdataset upstream in virtualizarr. Though I would like to be more confident about the best way to parallelize reference generation first.

zarr-developers/VirtualiZarr#123
zarr-developers/VirtualiZarr#7

"PROD",
"UAT",
]
198 changes: 198 additions & 0 deletions earthaccess/virtualizarr.py
@@ -0,0 +1,198 @@
from __future__ import annotations

from typing import TYPE_CHECKING

import earthaccess

if TYPE_CHECKING:
import xarray as xr


def open_virtual_mfdataset(
granules: list[earthaccess.DataGranule],
group: str | None = None,
access: str = "indirect",
Review comment (Member): Great that you're explicitly exposing this. I think we are going to deprecate the “magic” of detecting the runtime environment in-region vs out-of-region.

load: bool = False,
preprocess: callable | None = None, # type: ignore
parallel: bool = True,
**xr_combine_nested_kwargs,
) -> xr.Dataset:
"""Open multiple granules as a single virtual xarray Dataset.

Uses DMR++ metadata files to create a virtual xarray dataset with ManifestArrays. This virtual dataset can be used to create zarr reference files. See https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets.

Parameters
----------
granules : list[earthaccess.DataGranule]
The granules to open
group : str or None (default=None)
The group to open in the DMR++. If groups are present in the DMR++ files, this will open the specified group. If None, the root group will be opened.
If the DMR++ file does not have groups, this parameter is ignored.
access : str (default="indirect")
The access method to use. One of "direct" or "indirect". Use "direct" when running on AWS; use "indirect" when running on a local machine.
load : bool (default=False)
Create an xarray dataset with indexes and lazy loaded data.

When True, creates a lazily loaded, NumPy/Dask-backed xarray dataset with indexes. Note that with `load=True` all the data is accessible, but it is not loaded into memory. With `load=False`, a virtual xarray dataset is created with ManifestArrays. This virtual dataset is a view over the underlying metadata and chunks and allows creation and concatenation of zarr reference files. It cannot load data on its own; see https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets.
preprocess : callable (default=None)
A function to apply to each virtual dataset before combining
parallel : bool (default=True)
Open the virtual datasets in parallel (using dask.delayed)
xr_combine_nested_kwargs : dict
Keyword arguments for xarray.combine_nested.
See https://docs.xarray.dev/en/stable/generated/xarray.combine_nested.html

Returns
-------
xr.Dataset

Examples
--------
>>> results = earthaccess.search_data(count=5, temporal=("2024"), short_name="MUR-JPL-L4-GLOB-v4.1")
>>> vds = open_virtual_mfdataset(results, access="indirect", load=False, concat_dim="time", coords='minimal', compat='override', combine_attrs="drop_conflicts")
>>> vds
<xarray.Dataset> Size: 29GB
Dimensions: (time: 5, lat: 17999, lon: 36000)
Coordinates:
time (time) int32 20B ManifestArray<shape=(5,), dtype=int32,...
lat (lat) float32 72kB ManifestArray<shape=(17999,), dtype=...
lon (lon) float32 144kB ManifestArray<shape=(36000,), dtype...
Data variables:
mask (time, lat, lon) int8 3GB ManifestArray<shape=(5, 17999...
sea_ice_fraction (time, lat, lon) int8 3GB ManifestArray<shape=(5, 17999...
dt_1km_data (time, lat, lon) int8 3GB ManifestArray<shape=(5, 17999...
analysed_sst (time, lat, lon) int16 6GB ManifestArray<shape=(5, 1799...
analysis_error (time, lat, lon) int16 6GB ManifestArray<shape=(5, 1799...
sst_anomaly (time, lat, lon) int16 6GB ManifestArray<shape=(5, 1799...
Attributes: (12/42)
Conventions: CF-1.7
title: Daily MUR SST, Final product

>>> vds = open_virtual_mfdataset(results, access="indirect", load=True, concat_dim="time", coords='minimal', compat='override', combine_attrs="drop_conflicts")
>>> vds
<xarray.Dataset> Size: 143GB
Dimensions: (time: 5, lat: 17999, lon: 36000)
Coordinates:
* lat (lat) float32 72kB -89.99 -89.98 -89.97 ... 89.98 89.99
* lon (lon) float32 144kB -180.0 -180.0 -180.0 ... 180.0 180.0
* time (time) datetime64[ns] 40B 2024-01-01T09:00:00 ... 2024-...
Data variables:
analysed_sst (time, lat, lon) float64 26GB dask.array<chunksize=(1, 3600, 7200), meta=np.ndarray>
analysis_error (time, lat, lon) float64 26GB dask.array<chunksize=(1, 3600, 7200), meta=np.ndarray>
dt_1km_data (time, lat, lon) timedelta64[ns] 26GB dask.array<chunksize=(1, 4500, 9000), meta=np.ndarray>
mask (time, lat, lon) float32 13GB dask.array<chunksize=(1, 4500, 9000), meta=np.ndarray>
sea_ice_fraction (time, lat, lon) float64 26GB dask.array<chunksize=(1, 4500, 9000), meta=np.ndarray>
sst_anomaly (time, lat, lon) float64 26GB dask.array<chunksize=(1, 3600, 7200), meta=np.ndarray>
Attributes: (12/42)
Conventions: CF-1.7
title: Daily MUR SST, Final product
"""
import xarray as xr

import virtualizarr as vz

if access == "direct":
fs = earthaccess.get_s3_filesystem(results=granules[0])
fs.storage_options["anon"] = False # type: ignore
else:
fs = earthaccess.get_fsspec_https_session()
if parallel:
import dask

# wrap open_virtual_dataset and preprocess with dask.delayed
open_ = dask.delayed(vz.open_virtual_dataset) # type: ignore
if preprocess is not None:
preprocess = dask.delayed(preprocess) # type: ignore
else:
open_ = vz.open_virtual_dataset
vdatasets = []
# Get list of virtual datasets (or dask delayed objects)
for g in granules:
vdatasets.append(
open_(
filepath=g.data_links(access=access)[0] + ".dmrpp",
Review comment (Member): The linter tells me that filepath is not a thing, same with indexes, as this uses open_virtual_dataset() from down below in line 146. Maybe we need to refactor it?

filetype="dmrpp", # type: ignore
group=group,
indexes={},
reader_options={"storage_options": fs.storage_options}, # type: ignore
)
)
if preprocess is not None:
vdatasets = [preprocess(ds) for ds in vdatasets]
Review comment (Member): Amazing!

if parallel:
vdatasets = dask.compute(vdatasets)[0] # type: ignore
if len(vdatasets) == 1:
vds = vdatasets[0]
else:
vds = xr.combine_nested(vdatasets, **xr_combine_nested_kwargs)
if load:
refs = vds.virtualize.to_kerchunk(filepath=None, format="dict")
return xr.open_dataset(
"reference://",
engine="zarr",
chunks={},
backend_kwargs={
"consolidated": False,
"storage_options": {
"fo": refs,
"remote_protocol": fs.protocol,
"remote_options": fs.storage_options, # type: ignore
},
},
)
return vds


def open_virtual_dataset(
granule: earthaccess.DataGranule,
group: str | None = None,
access: str = "indirect",
load: bool = False,
) -> xr.Dataset:
"""Open a granule as a single virtual xarray Dataset.

Uses DMR++ metadata files to create a virtual xarray dataset with ManifestArrays. This virtual dataset can be used to create zarr reference files. See https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets.

Parameters
----------
granule : earthaccess.DataGranule
The granule to open
group : str or None (default=None)
The group to open in the DMR++. If groups are present in the DMR++ files, this will open the specified group. If None, the root group will be opened.
If the DMR++ file does not have groups, this parameter is ignored.
access : str (default="indirect")
The access method to use. One of "direct" or "indirect". Use "direct" when running on AWS; use "indirect" when running on a local machine.
load : bool (default=False)
Create an xarray dataset with indexes and lazy loaded data.

When True, creates a lazily loaded, NumPy/Dask-backed xarray dataset with indexes. Note that with `load=True` all the data is accessible, but it is not loaded into memory. With `load=False`, a virtual xarray dataset is created with ManifestArrays. This virtual dataset is a view over the underlying metadata and chunks and allows creation and concatenation of zarr reference files. It cannot load data on its own; see https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets.


Returns
-------
xr.Dataset

Examples
--------
>>> results = earthaccess.search_data(count=2, temporal=("2023"), short_name="SWOT_L2_LR_SSH_Expert_2.0")
>>> open_virtual_dataset(results[0], access="indirect", load=False)
<xarray.Dataset> Size: 149MB
Dimensions: (num_lines: 9866, num_pixels: 69,
num_sides: 2)
Coordinates:
longitude (num_lines, num_pixels) int32 3MB ...
latitude (num_lines, num_pixels) int32 3MB ...
latitude_nadir (num_lines) int32 39kB ManifestArr...
longitude_nadir (num_lines) int32 39kB ManifestArr...
Dimensions without coordinates: num_lines, num_pixels, num_sides
Data variables: (12/98)
height_cor_xover_qual (num_lines, num_pixels) uint8 681kB ManifestArray<shape=(9866, 69), dtype=uint8, chunks=(9866, 69...
"""
return open_virtual_mfdataset(
granules=[granule],
group=group,
access=access,
load=load,
parallel=False,
preprocess=None,
)
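
For orientation, a minimal end-to-end sketch of the new functions as exported above (the collection, date range, and the combined-refs.json output path are illustrative choices, not prescribed by this PR):

import earthaccess

earthaccess.login()
granules = earthaccess.search_data(
    short_name="MUR25-JPL-L4-GLOB-v04.2",
    temporal=("2024-01-01", "2024-01-05"),
    count=5,
)

# Build a virtual dataset: a metadata-only view backed by ManifestArrays
vds = earthaccess.open_virtual_mfdataset(
    granules,
    access="indirect",
    load=False,
    concat_dim="time",
    coords="minimal",
    compat="override",
    combine_attrs="drop_conflicts",
)

# Persist the combined references for later reuse with the kerchunk/zarr engine
vds.virtualize.to_kerchunk("combined-refs.json", format="json")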
7 changes: 7 additions & 0 deletions pyproject.toml
@@ -64,6 +64,9 @@ kerchunk = [
"h5netcdf",
"xarray",
]
virtualizarr = [
"virtualizarr @ git+https://github.com/zarr-developers/VirtualiZarr.git"
Review comment (Member): Is this because we want to be up to date for now? Are we targeting an upcoming release?

Review comment: I released like 2 days ago, so you probably want to just pin to >=1.2.0.

Review comment (Collaborator Author): Great, will update that!
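
If the pin suggested above is adopted, the extra would presumably become something like this (a sketch assuming the 1.2.0 release mentioned above, which would also make the allow-direct-references override below unnecessary):

virtualizarr = [
    "virtualizarr >=1.2.0",
]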

]
dev = [
"bump-my-version >=0.10.0",
"nox",
@@ -82,6 +85,7 @@ test = [
"types-setuptools >=0.1",
"vcrpy >=6.0.1",
"earthaccess[kerchunk]",
"earthaccess[virtualizarr]",
]
docs = [
"jupyterlab >=3",
@@ -122,6 +126,9 @@ docs = [
[tool.pytest]
filterwarnings = ["error::UserWarning"]

[tool.hatch.metadata]
allow-direct-references = true

[tool.mypy]
python_version = "3.10"
files = ["earthaccess", "tests"]
57 changes: 57 additions & 0 deletions tests/integration/test_virtualizarr.py
@@ -0,0 +1,57 @@
import logging
import os
import unittest

import earthaccess
import pytest

logger = logging.getLogger(__name__)
assertions = unittest.TestCase("__init__")

assertions.assertTrue("EARTHDATA_USERNAME" in os.environ)
assertions.assertTrue("EARTHDATA_PASSWORD" in os.environ)

logger.info(f"Current username: {os.environ['EARTHDATA_USERNAME']}")
logger.info(f"earthaccess version: {earthaccess.__version__}")


@pytest.fixture(
scope="module", params=["MUR25-JPL-L4-GLOB-v04.2"]
)
def granule(request):
granules = earthaccess.search_data(
count=1, temporal=("2024"), short_name=request.param
)
return granules[0]


def test_dmrpp(granule):
from virtualizarr import open_virtual_dataset

fs = earthaccess.get_fsspec_https_session()
data_path = granule.data_links(access="indirect")[0]
dmrpp_path = data_path + ".dmrpp"

result = open_virtual_dataset(

Review comment: So the test of correctness here is assuming that virtualizarr has correct behaviour? You might want to add more tests because virtualizarr definitely still has bugs in it (e.g. around CF coordinate decoding).

Review comment (Collaborator Author): Yes, we are assuming that virtualizarr has the correct behavior. However, this is mostly an integration test checking that the function operates as expected. The actual tests for correctness are in virtualizarr (for the dmrpp backend).

dmrpp_path,
filetype="dmrpp", # type: ignore
indexes={},
reader_options={"storage_options": fs.storage_options}, # type: ignore
)

expected = open_virtual_dataset(
data_path,
indexes={},
reader_options={"storage_options": fs.storage_options}, # type: ignore
)

# TODO: replace with xr.testing when virtualizarr fill_val is fixed (https://github.com/zarr-developers/VirtualiZarr/issues/287)
# and dmrpp deflateLevel (zlib compression level) is always present (https://github.com/OPENDAP/bes/issues/954)
for var in result.variables:
Review comment (Member): Is this the replacement for "almost equal"?

assert var in expected.variables
assert result[var].dims == expected[var].dims
assert result[var].shape == expected[var].shape
assert result[var].dtype == expected[var].dtype
assert result[var].data.manifest == expected[var].data.manifest
assert set(result.coords) == set(expected.coords)
assert result.attrs == expected.attrs
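
Once the two upstream issues referenced in the TODO are fixed, the per-variable assertions above could presumably be replaced by a single whole-dataset comparison; a sketch, not part of this PR:

import xarray as xr

xr.testing.assert_identical(result, expected)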