Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Failing Test Case #676

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pangeo_forge_recipes/injections.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ def get_injection_specs():
"WriteReference": {
"target_root": "TARGET_STORAGE",
},
"ConsolidateMetadata": {
"target_root": "TARGET_STORAGE",
},
"WriteCombinedReference": {
"target_root": "TARGET_STORAGE",
},
Expand Down
12 changes: 10 additions & 2 deletions pangeo_forge_recipes/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,18 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
@dataclass
class ConsolidateMetadata(beam.PTransform):
    """Calls Zarr Python consolidate_metadata on an existing Zarr store or Kerchunk reference
    (https://zarr.readthedocs.io/en/stable/_modules/zarr/convenience.html#consolidate_metadata)

    :param target_root: Root path the Zarr store will be created inside;
        `store_name` will be appended to this prefix to create a full path.
    """

    target_root: Union[str, FSSpecTarget, RequiredAtRuntimeDefault] = field(
        default_factory=RequiredAtRuntimeDefault
    )

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        # Forward the target's fsspec options so consolidate_metadata can re-open
        # Kerchunk references with the same filesystem configuration.
        # NOTE(review): `.fsspec_kwargs` exists only on FSSpecTarget — a plain str
        # target_root, or the RequiredAtRuntimeDefault sentinel left unset at
        # runtime, would raise AttributeError here. Confirm injection always
        # supplies an FSSpecTarget before this transform expands.
        return pcoll | beam.Map(consolidate_metadata, fsspec_kwargs=self.target_root.fsspec_kwargs)


@dataclass
Expand Down
6 changes: 4 additions & 2 deletions pangeo_forge_recipes/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,13 @@ def _is_first_in_merge_dim(index):
return True


def consolidate_metadata(store: MutableMapping) -> MutableMapping:
def consolidate_metadata(store: MutableMapping, fsspec_kwargs: dict) -> zarr.hierarchy.Group:
"""Consolidate metadata for a Zarr store or Kerchunk reference

:param store: Input Store for Zarr or Kerchunk reference
:type store: MutableMapping
:param fsspec_kwargs: all optional fsspec kwargs
:type fsspec_kwargs: dict
:return: Output Store
:rtype: MutableMapping
"""
Expand All @@ -80,7 +82,7 @@ def consolidate_metadata(store: MutableMapping) -> MutableMapping:

if isinstance(store, fsspec.FSMap) and isinstance(store.fs, ReferenceFileSystem):
ref_path = store.fs.storage_args[0]
path = fsspec.get_mapper("reference://", fo=ref_path)
path = fsspec.get_mapper("reference://", fo=ref_path, **fsspec_kwargs)
if isinstance(store, zarr.storage.FSStore):
path = store.path

Expand Down
32 changes: 30 additions & 2 deletions tests/test_writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ def test_reference_netcdf(
netcdf_local_file_pattern_sequential,
pipeline,
tmp_target,
# why are we not using tmp_target?
output_file_name,
):
pattern = netcdf_local_file_pattern_sequential
Expand All @@ -203,6 +202,35 @@ def test_reference_netcdf(
)

full_path = os.path.join(tmp_target.root_path, store_name, output_file_name)

mapper = fsspec.get_mapper("reference://", fo=full_path)
assert zarr.open_consolidated(mapper)


@pytest.mark.parametrize("output_file_name", ["reference.json", "reference.parquet"])
def test_reference_netcdf_with_xarray_read(
    netcdf_local_file_pattern_sequential,
    pipeline,
    tmp_target,
    output_file_name,
):
    """Write a combined Kerchunk reference, consolidate its metadata, then attempt a
    consolidated xarray read of the result.

    The read is expected to raise KeyError — this is the WIP failing-test case.
    """
    file_pattern = netcdf_local_file_pattern_sequential
    store_name = "daily-xarray-dataset"

    with pipeline as p:
        # Open each source file as a Kerchunk reference.
        refs = (
            p
            | beam.Create(file_pattern.items())
            | OpenWithKerchunk(file_type=file_pattern.file_type)
        )
        # Combine the per-file references into one store under tmp_target.
        combined = refs | WriteCombinedReference(
            identical_dims=["lat", "lon"],
            target_root=tmp_target,
            store_name=store_name,
            concat_dims=["time"],
            output_file_name=output_file_name,
        )
        combined | ConsolidateMetadata()

    ref_path = os.path.join(tmp_target.root_path, store_name, output_file_name)
    mapper = fsspec.get_mapper("reference://", fo=ref_path)
    # Consolidated read currently fails on the consolidated reference store.
    with pytest.raises(KeyError):
        xr.open_dataset(mapper, engine="zarr", consolidated=True)
Loading