
Add metadata consolidation utility #278

Merged: 13 commits into nsidc:main (Dec 1, 2023)

Conversation

@jrbourbeau (Collaborator)

Reading metadata for datasets accessed through earthaccess can be prohibitively slow for a few reasons (e.g. datasets are stored as NetCDF instead of Zarr, HTTPS access is slow). @betolink has a nice description here: https://discourse.pangeo.io/t/avoid-metadata-reads-when-loading-many-similar-netcdf-files/3594/5.

Kerchunk helps the situation by doing a single pass over the dataset and creating a single, consolidated metadata file that can be used to quickly access the metadata for the full dataset in subsequent reads. Additionally, the initial pass over the dataset metadata can be parallelized, which helps further speed things up.

This PR adds a new earthaccess.consolidate_metadata(...) utility that integrates earthaccess and kerchunk, making it straightforward for users to go through this metadata consolidation process for the datasets they're interested in.

Here's an example of what this looks like in practice:

import earthaccess
import xarray as xr
from dask.distributed import LocalCluster

if __name__ == "__main__":

    # Authenticate my machine with `earthaccess`
    earthaccess.login()

    # Retrieve data files for the dataset I'm interested in
    short_name = "SEA_SURFACE_HEIGHT_ALT_GRIDS_L4_2SATS_5DAY_6THDEG_V_JPL2205"
    granuales = earthaccess.search_data(
        short_name=short_name,
        cloud_hosted=True,
        temporal=("1990", "2019"),
        count=10,  # For demo purposes
    )

    # Create a local Dask cluster for parallel metadata consolidation
    # (but works with any Dask cluster)
    cluster = LocalCluster()
    client = cluster.get_client()

    # Save consolidated metadata file
    outfile = earthaccess.consolidate_metadata(
        granuales,
        outfile=f"./{short_name}-metadata.json",    # Writing to a local file for demo purposes
        # outfile=f"s3://my-bucket/{short_name}-metadata.json",   # We could also write to a remote file
        access="indirect",
        kerchunk_options={"concat_dims": "Time"}
    )
    print(f"Consolidated metadata written to {outfile}")

    # Load the dataset using the consolidated metadata file
    fs = earthaccess.get_fsspec_https_session()
    ds = xr.open_dataset(
        "reference://",
        engine="zarr",
        chunks={},
        backend_kwargs={
            "consolidated": False,
            "storage_options": {
                "fo": outfile,
                "remote_protocol": "https",
                "remote_options": fs.storage_options,
            }
        },
    )

    result = ds.SLA.mean({"Latitude", "Longitude"}).compute()
    print(f"{result = }")

cc @betolink for thoughts

@github-actions bot commented Aug 15, 2023

Binder 👈 Launch a binder notebook on this branch for commit cd62af2

I will automatically update this comment whenever this PR is modified

pyproject.toml Outdated
@@ -37,6 +37,8 @@ s3fs = ">=2021.11, <2024"
fsspec = ">=2022.1"
tinynetrc = "^1.3.1"
multimethod = ">=1.8"
kerchunk = ">=0.1.2"
dask = {extras = ["complete"], version = ">=2023.5.0"}
@MattF-NSIDC commented Aug 15, 2023

What do you think of making these extra (optional) dependencies? I don't use Poetry very much; maybe it looks like this?

[tool.poetry.extras]
distributed = ["kerchunk >=0.1.2", "dask[complete] >=2023.5.0"]

We'll need to think a little bit about user messaging if they attempt to use consolidate_metadata but don't have them installed.
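
For illustration, one possible shape for that user messaging: a guarded import inside consolidate_metadata that raises a helpful error when the optional dependencies are missing. This is a sketch, not the PR's implementation, and the suggested install command in the message is only an example.

def consolidate_metadata(granules, **kwargs):
    try:
        import dask  # noqa: F401
        import kerchunk.combine  # noqa: F401
        import kerchunk.hdf  # noqa: F401
    except ImportError as err:
        raise ImportError(
            "earthaccess.consolidate_metadata requires the optional dependencies "
            "kerchunk and dask; install them, e.g. `pip install kerchunk 'dask[complete]'`."
        ) from err
    ...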

@jrbourbeau (Collaborator, Author)

Great point. I'd be happy to move these to optional dependencies, only use Dask-specific code if Dask is installed, etc. I just added them as hard dependencies to start since I wasn't sure what you and @betolink would prefer.

@MattF-NSIDC commented Aug 15, 2023

Good thinking! @betolink is far more suited to speak to overall direction than me, so I'll quiet down and let him take it from here :)

Thanks for another wonderful contribution! ❤️

@betolink (Member)

This is awesome @jrbourbeau! I think what you and @MattF-NSIDC are describing makes sense: optional dependencies, and only using Dask-specific code if Dask is installed (if that's not too much of a hassle).

@mrocklin (Contributor)

FWIW, my general recommendation in cases like these is to depend on core dask (it's trivially lightweight) and then use dask.delayed and dask.compute. This will pick up a dask.distributed cluster if one is available, and otherwise use a local thread pool.
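
For illustration, a minimal sketch of that pattern. The helper below is hypothetical (not earthaccess or PR code), and urls/storage_options are assumed inputs.

import dask
import fsspec
from kerchunk.hdf import SingleHdf5ToZarr

@dask.delayed
def single_reference(url, storage_options):
    # one pass over one granule's metadata
    with fsspec.open(url, **storage_options) as f:
        return SingleHdf5ToZarr(f, url).translate()

def all_references(urls, storage_options):
    tasks = [single_reference(u, storage_options) for u in urls]
    # dask.compute uses a dask.distributed cluster if a Client is active,
    # otherwise the local threaded scheduler
    return dask.compute(*tasks)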

@jrbourbeau changed the title from "Add metadata consolidation utility" to "[WIP] Add metadata consolidation utility" on Aug 15, 2023
@jrbourbeau (Collaborator, Author)

I should have mentioned originally that, while this works, I'm sure there are additional improvements we can iterate on (e.g. making kerchunk, dask, etc. optional dependencies). I'm happy to make those changes, but first wanted to get some feedback on whether this sort of functionality is something you think fits into earthaccess generally.

@MattF-NSIDC

Hey James, I'm looking for docs on the backend_kwargs passed to the zarr engine to understand your example a bit better. Could you link to some reading material? Thanks!

@jrbourbeau (Collaborator, Author)

Yeah, no problem. From https://docs.xarray.dev/en/stable/generated/xarray.open_dataset.html, backend_kwargs are forwarded to the open function for the corresponding backend engine. I didn't see that this was well documented in the Xarray docs, but here is the open method for the Zarr engine (see, for example, the consolidated= and storage_options= kwargs).

@jrbourbeau changed the title from "[WIP] Add metadata consolidation utility" to "Add metadata consolidation utility" on Aug 15, 2023
@MattF-NSIDC

I didn't see that this was well documented in the Xarray docs

I certainly failed to find it on my own :) Thanks! I haven't made time yet to dig, but I'm specifically curious about storage_options.fo; everything else has a reasonably descriptive name!

@jrbourbeau (Collaborator, Author)

Ah, I see. fo is getting passed to fsspec's ReferenceFileSystem implementation. That said, it's not immediately clear to me what fo means. File object maybe? File to open? Not sure.
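
For context, fo ends up as the first argument to fsspec's ReferenceFileSystem: either a path/URL to a reference JSON file or an in-memory dict of references. A minimal sketch, where the file name and remote options are placeholders:

import fsspec

fs = fsspec.filesystem(
    "reference",
    fo="./SEA_SURFACE_HEIGHT-metadata.json",  # or a dict of references
    remote_protocol="https",
    remote_options={},  # e.g. auth/session options for the referenced files
)
mapper = fs.get_mapper("")  # zarr-readable store backed by the references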

@betolink (Member)

Thanks for this PR @jrbourbeau, this is an incredible addition to earthaccess and I'm looking forward to having it on main! I'll review the PR and test some of the notoriously slow datasets from different DAACs; we may even have some benchmarking to back up why kerchunk helps. Related thread: #251, based on preliminary results from some work we started last week.

Regarding the dependencies, I like what @mrocklin said about having dask core as extras (the same way you introduced kerchunk) and using distributed or LocalCluster when available.

@martindurant

For those interested, this was given a demo at the monthly dask meeting, recording to come shortly. Thanks for putting it together, @jrbourbeau !

The main function seems to be a specialised version of https://github.com/fsspec/kerchunk/blob/main/kerchunk/combine.py#L649 , which also supports tree reduction of the single-file reference sets.

It would be nice if there were an obvious way to point to an existing reference file when someone else has already made one and stashed it somewhere. Of course, users can always save their own copy, but it would be one less barrier when available. Here is one dataset on the Planetary Computer that has a kerchunk reference file alongside the data: https://planetarycomputer.microsoft.com/dataset/deltares-water-availability

@jrbourbeau (Collaborator, Author)

Earlier today @martindurant mentioned that kerchunk also supports an in-memory representation of the consolidated metadata. This is nice in that it makes consolidated metadata more accessible for users who, for example, may not have easy write access to a cloud bucket. I've pushed a small commit to make outfile= optional; when an outfile isn't specified, we use the in-memory representation.
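
For illustration, a sketch of the in-memory path, assuming consolidate_metadata returns the kerchunk reference dict when no outfile= is given and that granules comes from an earlier earthaccess.search_data(...) call:

import earthaccess
import xarray as xr

refs = earthaccess.consolidate_metadata(granules, access="indirect")  # no outfile=
fs = earthaccess.get_fsspec_https_session()
ds = xr.open_dataset(
    "reference://",
    engine="zarr",
    chunks={},
    backend_kwargs={
        "consolidated": False,
        "storage_options": {
            "fo": refs,  # fsspec accepts an in-memory dict as well as a path
            "remote_protocol": "https",
            "remote_options": fs.storage_options,
        },
    },
)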

It would be nice if there were an obvious way to point to an existing reference file when someone else has already made one and stashed it somewhere. Of course, users can always save their own copy, but it would be one less barrier when available. Here is one dataset on the Planetary Computer that has a kerchunk reference file alongside the data: https://planetarycomputer.microsoft.com/dataset/deltares-water-availability

Yeah, I agree this would be nice. I'm not sure how easy uploading to Earthdata is; my guess is @betolink has the most context there.

@martindurant

I'm not sure how easy uploading to Earthdata is

I don't know how practical it is, but of course the reference files can be at any other accessible location, so deciding that location and getting the metadata to users is the hard part. In kerchunk workflows we typically use intake (of course) to hide away the many arguments it takes to get xarray to load such data.

@betolink (Member) commented Aug 23, 2023

@jrbourbeau I just started testing this, and I wonder if we can include two features in the consolidate_metadata(...) method:

  • "Warning Message": kerchunking on non gridded data is supported but has issues, e.g. trying to consolidate on a time dimension for granules that don't align.
  • Progress bar: to give an idea of how long does it take to consolidate the metadata, I think Dask has a nice progress bar context? I'm not sure if kerchunk can report back on individual progress.
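
For illustration, a sketch of the progress-bar idea, not part of this PR. With the local threaded scheduler, dask.diagnostics.ProgressBar works as a context manager; with a distributed Client, distributed.progress(futures) is the usual equivalent. Here tasks is assumed to be the list of delayed per-granule kerchunk jobs.

import dask
from dask.diagnostics import ProgressBar

with ProgressBar():
    refs = dask.compute(*tasks)  # tasks: list of dask.delayed reference jobs (assumed)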

I started consolidating big ATL03 files, the same ones we used at the IS2 hackweek, and ran into:

distributed.nanny - WARNING - Worker process still alive after 3.1999 seconds, killing

These files are very complex, with nested groups and hundreds of variables. We managed to kerchunk them at the hackweek, but each took around 20 minutes, and for some reason we had to run kerchunk sequentially; when I used Dask it never finished. Here are the params I used for this test, in case that helps.

import earthaccess

# we use the file pattern to grab the same orbit
short_name = "ATL03"
track = "0811"
region = "12"

params = {
    "short_name": short_name,
    "version": "006",
    "cloud_hosted": True,
    "temporal": ("2018-11-01", "2023-08-01"),
    "granule_name": f"ATL*_{track}??{region}*.h5",
    "count": 4,
}
# Retrieve data files for the dataset I'm interested in
granules = earthaccess.search_data(**params)
# each granule is ~5GB
outfile = earthaccess.consolidate_metadata(
    granules,
    outfile=f"./direct-{short_name}-metadata.json",    # Writing to a local file for demo purposes
    # outfile=f"s3://my-bucket/{short_name}-metadata.json",   # We could also write to a remote file
    access="direct",
    # kerchunk_options={}  # not concatenating because this is swath ~LIDAR data
)

In the case of the SST dataset it worked like a charm! I can see some of these global gridded datasets being worked on at scale with this approach. It's like magic!

@martindurant

it's like magic!

Can we quote you? :)

assertions.assertTrue("EARTHDATA_USERNAME" in os.environ)
assertions.assertTrue("EARTHDATA_PASSWORD" in os.environ)

logger.info(f"Current username: {os.environ['EARTHDATA_USERNAME']}")
@betolink (Member)

Nice! Way better than just printing all this info!

@betolink (Member)

This is more into the Kerchunk side of things but now that we have @martindurant here...

Currently SingleHdf5ToZarr(...) takes a whole file as input. I wonder how complicated it would be to specify which groups we want to make a reference for? For example, in ATL03 we have 6 nested groups, each representing one of the satellite lasers; if we go one level down, xarray can figure out the coordinates. Something like this:

ref = SingleHdf5ToZarr(file, group='/gt1l/heights', ...) 

would potentially generate something that xarray can read more easily.

This is just me thinking out loud, and it's a little related to what @jrbourbeau and @martindurant discussed about where to store the reference files:

Down the road it would be nice to have a document store serving all these sidecar files, so users won't have to generate the references each time they want to use a given dataset. Although it wouldn't be totally serverless, it would be faster. I wonder if reference files like kerchunk or dmrpp could be incorporated into, say, the STAC spec at the asset level; NASA already does it for dmrpp but not for kerchunk.

@martindurant

I wonder how complicated it would be to specify which groups we want to make a reference for?

I'm not certain how you can persuade the h5py tree visitor to only scan part of the file, but in SingleHdf5ToZarr._translator we could certainly prevent storing keys outside of a given group, or apply key renamings to make some group appear to be the root of the generated zarr.

However, there may be easier ways:

  • the _fss attribute of a MultiZarrToZarr instance holds filesystem instances (access .fss to generate it). Replacing the ReferenceFS there with a dirFS wrapping it would apply the right kind of remapping.
  • the reference system is pretty simple, so the reference set of a single file could be reprocessed like:
def keep_group(refs, group):
    # keep only keys under `group` and strip the prefix so that group becomes the root
    return {k[len(group) + 1:]: v for k, v in refs.items() if k.startswith(group)}

(where group would be a "/" delimited string)
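
For illustration, hypothetical usage of keep_group on a single-file reference set, assuming f is an open HDF5 file object, url its location, and that the references returned by SingleHdf5ToZarr(...).translate() live under the "refs" key:

from kerchunk.hdf import SingleHdf5ToZarr

single = SingleHdf5ToZarr(f, url).translate()
single["refs"] = keep_group(single["refs"], "gt1l/heights")
# `single` should now present /gt1l/heights as the root of the generated zarr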

@lsterzinger

@betolink pinged me on Slack to take a look at this. Something I wanted to mention (for either this PR or future kerchunk integration) is that for larger datasets, doing a tree reduction from individual references to a combined reference can drastically improve performance: https://fsspec.github.io/kerchunk/advanced.html#tree-reduction. Kerchunk provides kerchunk.combine.auto_dask() as a convenience function that takes a list of data URLs and wraps Dask to do the individual processing and a tree reduction to the final combined JSON. Here's an example of doing this manually: https://gist.github.com/peterm790/5f901453ed7ac75ac28ed21a7138dcf8
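
For illustration, a rough sketch of that two-stage reduction, assuming single_refs is a list of per-granule reference dicts (e.g. from SingleHdf5ToZarr) and "Time" is the concat dimension. The exact combine options depend on the dataset; see the linked gist and kerchunk's auto_dask helper.

import dask
from kerchunk.combine import MultiZarrToZarr

@dask.delayed
def combine(refsets):
    # coo_map / remote_protocol / remote_options may also be needed so kerchunk
    # can read coordinate values from the referenced files
    return MultiZarrToZarr(list(refsets), concat_dims=["Time"]).translate()

batch_size = 10
batches = [combine(single_refs[i:i + batch_size])
           for i in range(0, len(single_refs), batch_size)]
final_refs = combine(dask.compute(*batches)).compute()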

@martindurant

What is the status here?

@MattF-NSIDC

Chatted with @betolink about where we're at, and he identified the following needs:

  • Pick five popular datasets where kerchunk can make an impact, run this on them, and benchmark the results
  • Implement a progress bar with Dask diagnostics
  • Explore the tree reduction Lucas is talking about (are we implementing it?) https://gist.github.com/peterm790/5f901453ed7ac75ac28ed21a7138dcf8
  • Warn users: using kerchunk can lead to incomplete references, as it is not totally compatible with all the features HDF has

Luis and I agree the work in this PR shouldn't depend on these, and that we should break these concerns off into separate issues/discussions/PRs. @jrbourbeau, what do you think?

OT: We want to avoid NSIDC being a blocker to this library moving forward as much as possible. We still don't have a very robust governance plan for this library, but it's something we're thinking about. I started a discussion thread: #321

@jrbourbeau (Collaborator, Author)

What is the status here?

I've shifted attention to other things for a bit, but this PR just needs a little cleanup, and possibly offloading some logic to auto_dask, for it to be ready for folks to start playing with. @martindurant did a use case pop up where this would be useful? I don't think getting this into a mergeable state will take much effort.

@martindurant

did a use case pop up where this would be useful?

Not from me, I'm not too familiar with earthaccess at all, just trying to help out

@martindurant

I wonder, is there a foolproof way to know that a particular earthaccess granule is netCDF4, as is assumed here, or some other particular type? As far as I can see, the granule objects have a data_links() method whose URLs we could analyse by name, but that's all I can find.

@betolink (Member)

@martindurant I think the URL will be the way, as the metadata is sometimes incomplete. I know kerchunk has improved since we last tested it with some very nested datasets; I don't think we need to cover all use cases to start experimenting with this.

@jrbourbeau is there anything we can do to help merge this one?

@mrocklin (Contributor) commented Nov 22, 2023 via email

@betolink previously approved these changes on Dec 1, 2023
@betolink (Member) left a comment

I think this is ready to be merged and tested. We would probably need an example notebook and a warning that kerchunk support is experimental, etc., but I'm definitely looking forward to starting the kerchunking.



def consolidate_metadata(
    granuales: list[earthaccess.results.DataGranule],
@betolink (Member)

If we try to kerchunk and consolidate non-uniform datasets, kerchunk will fail. If we try to kerchunk a single granule, kerchunk succeeds, but MultiZarrToZarr requires a variable mapping and fails if we don't pass one; for one granule we should use kerchunk_options={"coo_map": {}} and then kerchunk stays happy.

Is that the same as not doing any combination at all?

@betolink (Member)

I guess? Or maybe we can just skip the MultiZarrToZarr step when the input is only one file. I wanted to do a quick test on a granule from https://podaac.jpl.nasa.gov/MEaSUREs-MUR and ran into this issue.
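
For illustration, a sketch of the single-granule special case being discussed, using a hypothetical helper (not the PR's implementation); single_refs is assumed to be the list of per-granule reference dicts.

from kerchunk.combine import MultiZarrToZarr

def combine_refs(single_refs, **kerchunk_options):
    if len(single_refs) == 1:
        # nothing to combine; the lone reference set is already usable as `fo`
        return single_refs[0]
    return MultiZarrToZarr(single_refs, **kerchunk_options).translate()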

@jrbourbeau (Collaborator, Author)

Thanks for reviewing @betolink. Like I mentioned offline, I think there are a few additional things we can add here (e.g. auto_dask, handling the single-granule case better), but I don't want those to block getting an initial implementation out that works decently.

@martindurant

👍

@jrbourbeau merged commit 2a36080 into nsidc:main on Dec 1, 2023
7 checks passed
@jrbourbeau deleted the kerchunk branch on December 1, 2023 at 22:19