MultiZarrToZarr combination associativity; issues in map/reduce workflow #416
Looking through some of the other issues, I noticed this, which seems likely to be relevant: #377. I'm also seeing that people have had success with exactly this kind of fan-out-and-reduce strategy (I've seen it referred to as a 'tree' combine), so perhaps others here will have wisdom to share about things to look out for?
I think there is an assumption that the concatenated chunks at least form a contiguous block in coordinates, so tree reductions might only work when you combine pairs/groups of neighbouring inputs. It should be noted that most (all?) of the big combines mentioned concatenate over exactly one dimension, so that could be a factor. I can't tell if that's the case for you, but if not, you could do the tree by first concatenating on one dimension, then another.
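For what it's worth, a neighbour-preserving tree reduction of the kind described above might look like the following. This is a minimal sketch, assuming inputs are already sorted along a single concat dimension ('time' here, matching the reproduction script below); the helper names are mine:

```python
from kerchunk.combine import MultiZarrToZarr


def combine_pair(refs: list[dict]) -> dict:
    # Combine a small group of references that are adjacent
    # along the concat dimension (assumed to be "time" here).
    return MultiZarrToZarr(
        refs,
        concat_dims=["time"],
        identical_dims=["lat", "lon"],
    ).translate()


def tree_combine(refs: list[dict]) -> dict:
    # Repeatedly combine neighbouring pairs, preserving input order,
    # until a single reference set remains. Inputs must already be
    # sorted along the concat dimension for this to be safe.
    level = refs
    while len(level) > 1:
        level = [combine_pair(level[i:i + 2]) for i in range(0, len(level), 2)]
    return level[0]
```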
I'll see if I can pin down the exact combination of factors that gives rise to the issue. So far, the small attempts I've constructed locally via multiprocessing have all worked perfectly, which is good but also kind of annoying in terms of pinning down what's going on.
This would be yet another reason for kerchunk to expose an array-like API with a […]
Perhaps you have some ideas about what that would look like? I saw the issue mentioning disambiguation of 'merge' semantics, but with a bit more direction maybe that would be a suitable community submission.
Yes, I think it might be time to flesh out how this looks. "Divide MultiZarrToZarr" and "make an array-compliant API" are sort of the same thing, but all of the cases and logic currently in MultiZarrToZarr would still have to find a place in there somewhere. Perhaps ensuring associativity is easier when it only happens during the one type of combine operation, but it would still need to be done; I don't think it would be automatic.
Alright, so I've reproduced the issue here, should anyone be interested in comparing notes.

```python
#!/usr/bin/env python
import json
import multiprocessing
from typing import Sequence

from kerchunk.combine import MultiZarrToZarr

CONCAT_DIMS = ['time']
IDENTICAL_DIMS = ['lat', 'lon']


def load_refs(ldjson_file: str) -> list[dict]:
    refs = []
    with open(ldjson_file, "r") as f:
        for line in f:
            refs.append(json.loads(line)[0])
    return refs


def mzz(refs):
    return MultiZarrToZarr(
        refs,
        concat_dims=CONCAT_DIMS,
        identical_dims=IDENTICAL_DIMS,
        target_options={"anon": True},
        remote_options={"anon": True},
        remote_protocol=None,
    )


def merge_refs(refs: list[dict]) -> dict:
    return mzz(refs).translate()


# Distributed workflow
def worker_func(refs: list[dict]) -> MultiZarrToZarr:
    def create_accumulator():
        return None

    def add_input(accumulator: MultiZarrToZarr, item: dict) -> MultiZarrToZarr:
        # Fold each incoming ref into the running accumulator by
        # translating the accumulator and re-combining with the new item.
        if not accumulator:
            references = [item]
        else:
            references = [accumulator.translate(), item]
        return mzz(references)

    acc = create_accumulator()
    for ref in refs:
        acc = add_input(acc, ref)
    return acc


def distributed_merge(refs: list[list[dict]]) -> dict:
    def merge_accumulators(accumulators: Sequence[MultiZarrToZarr]) -> MultiZarrToZarr:
        references = [a.translate() for a in accumulators]
        return mzz(references)

    def extract_output(accumulator: MultiZarrToZarr) -> dict:
        return accumulator.translate()

    with multiprocessing.Pool(4) as p:
        accumulators: list[MultiZarrToZarr] = p.map(worker_func, refs)
    merged = merge_accumulators(accumulators)
    return extract_output(merged)


def compare_merge_size(single_dict, multi_dict):
    single_json = json.dumps(single_dict)
    multi_json = json.dumps(multi_dict)
    single_bytes = len(single_json.encode("utf-8"))
    multi_bytes = len(multi_json.encode("utf-8"))
    with open("single_test.json", "w") as f:
        f.write(single_json)
    with open("multi_test.json", "w") as f:
        f.write(multi_json)
    print(f"The single process dict is {single_bytes}")
    print(f"The multi process dict is {multi_bytes}")


def main():
    # you can access the kerchunk refs that this wants to load in the attached zip
    refs = load_refs("inputs_raw_15286.json")
    # Expected merge results
    single_merge = merge_refs(refs)
    # this fails: multi_refs = [[refs[0]], [refs[4], refs[2]], [refs[3]], [refs[1]]]
    # this works: multi_refs = [[refs[0]], [refs[1], refs[2]], [refs[3]], [refs[4]]]
    multi_refs = [[refs[0]], [refs[4], refs[2]], [refs[3]], [refs[1]]]
    multi_merge = distributed_merge(multi_refs)
    # If things are working, the outputs should be the same
    compare_merge_size(single_merge, multi_merge)


if __name__ == "__main__":
    main()
```
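Note the two groupings in main(): the grouping that fails places refs out of time order ([refs[4], refs[2]] ahead of [refs[1]]), while the grouping that works keeps every group contiguous and in order along the concat dimension, which is consistent with the neighbouring-inputs point made above.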
I'm going to close this issue as it appears to be a well-known limitation of the merging behavior in MultiZarrToZarr. I ended up resolving the issue by more deliberately controlling the grouping of items to be merged by kerchunk. Perhaps this PR will be instructive for others who face similar issues: pangeo-forge/pangeo-forge-recipes#689
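For reference, the "controlled grouping" fix amounts to splitting the time-ordered refs into contiguous runs before handing them to workers. A minimal sketch, with a helper name that is mine rather than from the linked PR:

```python
def contiguous_groups(refs: list[dict], n_groups: int) -> list[list[dict]]:
    # Split refs, already sorted along the concat dimension, into
    # contiguous groups so every worker merges only neighbouring inputs.
    size = -(-len(refs) // n_groups)  # ceiling division
    return [refs[i:i + size] for i in range(0, len(refs), size)]
```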
This may end up being a super obvious mistake for someone who's a bit more familiar with the expectations of the kerchunk API, so forgive me if this issue is being lodged out of ignorance!

The workflow I'm dealing with attempts to combine kerchunk datasets via map/reduce. The reduce step assumes associativity among the things being combined, since different numbers of workers for the same job mean different groupings: each worker's kerchunk refs are combined via MultiZarrToZarr.translate(), and the results are combined again on the driver node, hopefully yielding a single ref for all underlying datasets.

Assumption: MultiZarrToZarr combination followed by translation is associative; refs can be combined in any grouping and then combined again at the end of the process without changing the result.

Reality: I'm seeing radically different results depending on how many workers are used, suggesting that associativity of this combination is not a safe assumption!
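Stated as code, the assumption under test looks roughly like this. A sketch only; combine is a hypothetical helper using the same settings as the reproduction script above:

```python
from kerchunk.combine import MultiZarrToZarr


def combine(refs: list[dict]) -> dict:
    # Assumed settings, matching the reproduction script above.
    return MultiZarrToZarr(
        refs, concat_dims=["time"], identical_dims=["lat", "lon"]
    ).translate()


def is_associative(a: dict, b: dict, c: dict) -> bool:
    # If combination were associative, grouping would not matter.
    left = combine([combine([a, b]), c])
    right = combine([a, combine([b, c])])
    return left == right  # the results reported here suggest this can be False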
This is roughly what that workflow looks like (a sketch follows the list):

1. Take a list of netcdf files (with the same dimensions) and translate each into a reference dict.
2. Distribute these dictionaries across workers, each of which builds a MultiZarrToZarr instance (map).
3. Translate each worker's MultiZarrToZarr and merge the results via another MultiZarrToZarr (reduce).
4. Finally, write the results out as a single ref.
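A minimal sketch of that pipeline, assuming SingleHdf5ToZarr for the initial translation and the same concat/identical dims as the reproduction script earlier in the thread; the paths and the splitting strategy are illustrative only:

```python
import json
import multiprocessing

import fsspec
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr


def translate_one(url: str) -> dict:
    # Step 1: translate each netcdf file into a kerchunk reference dict.
    with fsspec.open(url, "rb") as f:
        return SingleHdf5ToZarr(f, url).translate()


def combine(refs: list[dict]) -> dict:
    # Steps 2-3: each worker combines its refs; the driver then
    # combines the per-worker results the same way.
    return MultiZarrToZarr(
        refs, concat_dims=["time"], identical_dims=["lat", "lon"]
    ).translate()


def run(urls: list[str], n_workers: int = 4) -> None:
    with multiprocessing.Pool(n_workers) as p:
        refs = p.map(translate_one, urls)
        # Note: an interleaved split like this is exactly the kind of
        # non-contiguous grouping the issue is about.
        chunks = [refs[i::n_workers] for i in range(n_workers)]
        partials = p.map(combine, chunks)
    merged = combine(partials)
    # Step 4: write the single combined reference out.
    with open("combined.json", "w") as f:
        json.dump(merged, f)
```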
At this point, the results differ. Some statistics I've pulled from the resulting ref files show, especially, different non-NaN value counts.