Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metadata consolidation utility #278

Merged
merged 13 commits into from
Dec 1, 2023
1 change: 1 addition & 0 deletions earthaccess/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .auth import Auth
from .search import DataCollections, DataGranules
from .store import Store
from .kerchunk import consolidate_metadata

__all__ = [
"login",
Expand Down
65 changes: 65 additions & 0 deletions earthaccess/kerchunk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from __future__ import annotations

from dask.base import flatten
from dask.distributed import default_client, progress, Client, Worker, WorkerPlugin
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr

import earthaccess
from .auth import Auth


def get_chunk_metadata(
    granuale: earthaccess.results.DataGranule, access: str
) -> list[dict]:
    """Translate every data link of one granule with kerchunk.

    Args:
        granuale: The granule whose data links are opened and translated.
        access: ``"direct"`` opens the links through an S3 filesystem session
            for the granule's provider; any other value uses the fsspec
            HTTPS session.

    Returns:
        A list holding the ``SingleHdf5ToZarr(...).translate()`` result for
        each link, in the order ``data_links`` yields them.
    """
    if access != "direct":
        session = earthaccess.get_fsspec_https_session()
    else:
        session = earthaccess.get_s3fs_session(
            provider=granuale["meta"]["provider-id"]
        )

    def _translate(link):
        # Keep the file open only for as long as kerchunk needs to scan it.
        with session.open(link) as handle:
            return SingleHdf5ToZarr(handle, link).translate()

    return [_translate(link) for link in granuale.data_links(access=access)]


class EarthAccessAuth(WorkerPlugin):
    """Worker plugin that carries an ``Auth`` object onto each Dask worker."""

    name = "earthaccess-auth"

    def __init__(self, auth: Auth):
        """Store the ``Auth`` instance to be installed on workers."""
        self.auth = auth

    def setup(self, worker: Worker) -> None:
        """Install the stored auth and log in if the worker is not authenticated."""
        if earthaccess.__auth__.authenticated:
            # Already authenticated in this process; nothing to do.
            return
        earthaccess.__auth__ = self.auth
        earthaccess.login()


def consolidate_metadata(
    granuales: list[earthaccess.results.DataGranule],
    outfile: str,
    storage_options: dict | None = None,
    kerchunk_options: dict | None = None,
    access: str = "direct",
    client: Client | None = None,
) -> str:
    """Build one combined kerchunk reference file for a list of granules.

    Per-granule metadata is extracted on the Dask cluster with
    ``get_chunk_metadata``, gathered back to the caller, combined with
    ``MultiZarrToZarr`` and written to ``outfile``.

    Args:
        granuales: Granules whose data links should be consolidated.
        outfile: Destination passed to ``MultiZarrToZarr.translate``.
        storage_options: Forwarded to ``translate`` as ``storage_options``;
            ``None`` is treated as an empty dict.
        kerchunk_options: Extra keyword arguments for ``MultiZarrToZarr``;
            ``None`` is treated as an empty dict.
        access: Access mode forwarded to ``get_chunk_metadata``
            (``"direct"`` or anything else for HTTPS).
        client: Dask client to run on; when ``None`` the result of
            ``default_client()`` is used.

    Returns:
        ``outfile``, unchanged.

    NOTE(review): in the review of this change a maintainer reported that
    kerchunk fails on non-uniform datasets, and that combining a single
    granule fails unless a variable mapping is supplied, suggesting
    ``kerchunk_options={"coo_map": {}}`` for that case. Skipping the combine
    step for a single input was also floated. Neither is handled here.
    """
    if client is None:
        client = default_client()

    # Make sure cluster is authenticated
    client.register_worker_plugin(EarthAccessAuth(earthaccess.__auth__))

    # Extract chunk metadata for each granule on the cluster
    futures = client.map(get_chunk_metadata, granuales, access=access)
    progress(futures)
    # Each task returns a list (one entry per data link); flatten to one list.
    chunks = list(flatten(client.gather(futures)))

    # Write combined metadata file. Both option dicts default to None, so
    # guard them before unpacking / forwarding.
    mzz = MultiZarrToZarr(chunks, **(kerchunk_options or {}))
    mzz.translate(outfile, storage_options=storage_options or {})

    return outfile
Loading