Skip to content

Commit

Permalink
Support in-memory consolidated metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
jrbourbeau committed Aug 17, 2023
1 parent 079f385 commit cd260a5
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 19 deletions.
20 changes: 11 additions & 9 deletions earthaccess/kerchunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ def _get_chunk_metadata(

def consolidate_metadata(
granuales: list[earthaccess.results.DataGranule],
outfile: str,
storage_options: dict | None = None,
kerchunk_options: dict | None = None,
access: str = "direct",
) -> str:
outfile: str | None = None,
storage_options: dict | None = None,
) -> str | dict:
try:
import dask
from kerchunk.combine import MultiZarrToZarr
Expand All @@ -41,14 +41,16 @@ def consolidate_metadata(
else:
fs = earthaccess.get_fsspec_https_session()

# Write out metadata file for each granule
# Get metadata for each granule
get_chunk_metadata = dask.delayed(_get_chunk_metadata)
chunks = dask.compute(*[get_chunk_metadata(g, fs) for g in granuales])
chunks = sum(chunks, start=[])

# Write combined metadata file
# Get combined metadata object
mzz = MultiZarrToZarr(chunks, **(kerchunk_options or {}))
outfile = fsspec.utils.stringify_path(outfile)
mzz.translate(outfile, storage_options=storage_options or {})

return outfile
if outfile is not None:
output = fsspec.utils.stringify_path(outfile)
mzz.translate(outfile, storage_options=storage_options or {})
return output
else:
return mzz.translate()
30 changes: 20 additions & 10 deletions tests/integration/test_kerchunk.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import logging
import os
import unittest

import earthaccess
import pytest
from fsspec.core import strip_protocol

pytest.importorskip("kerchunk")
kerchunk = pytest.importorskip("kerchunk")
pytest.importorskip("dask")

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -43,17 +41,29 @@ def test_consolidate_metadata_outfile(tmp_path, granuales, protocol):
assert result == outfile


def test_consolidate_metadata(tmp_path, granuales):
def test_consolidate_metadata_memory(tmp_path, granuales):
result = earthaccess.consolidate_metadata(
granuales,
access="indirect",
kerchunk_options={"concat_dims": "Time"},
)
assert isinstance(result, dict)
assert "refs" in result


@pytest.mark.parametrize("output", ["file", "memory"])
def test_consolidate_metadata(tmp_path, granuales, output):
xr = pytest.importorskip("xarray")
# Open directly with `earthaccess.open`
expected = xr.open_mfdataset(earthaccess.open(granuales))

# Open with kerchunk consolidated metadata file
metadata_file = earthaccess.consolidate_metadata(
granuales,
outfile=tmp_path / "metadata.json",
access="indirect",
kerchunk_options={"concat_dims": "Time"},
if output == "file":
kwargs = {"outfile": tmp_path / "metadata.json"}
else:
kwargs = {}
metadata = earthaccess.consolidate_metadata(
granuales, access="indirect", kerchunk_options={"concat_dims": "Time"}, **kwargs
)

fs = earthaccess.get_fsspec_https_session()
Expand All @@ -64,7 +74,7 @@ def test_consolidate_metadata(tmp_path, granuales):
backend_kwargs={
"consolidated": False,
"storage_options": {
"fo": metadata_file,
"fo": metadata,
"remote_protocol": "https",
"remote_options": fs.storage_options,
},
Expand Down

0 comments on commit cd260a5

Please sign in to comment.