
MultiZarrToZarr combination associativity; issues in map/reduce workflow #416

Closed

moradology opened this issue Feb 7, 2024 · 8 comments

moradology commented Feb 7, 2024

This may end up being a super obvious mistake for someone who's a bit more familiar with the expectations of the kerchunk API, so forgive me if this issue is being lodged out of ignorance!

The workflow I'm dealing with combines kerchunk datasets via map/reduce. The reduce step assumes that combination is associative: with different numbers of workers on the same job, different groupings get created as each worker's kerchunk refs are combined via MultiZarrToZarr.translate(), and the results are then combined again on the driver node, hopefully yielding a single ref for all underlying datasets.

Assumption:

MultiZarrToZarr combination followed by translation is associative: refs can be combined in any order or grouping and then combined again at the end of the process without changing the result.
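
A minimal sketch of that assumed invariant (not runnable as-is: a, b, c, d stand for per-file reference dicts, and the combine helper is shorthand for the MultiZarrToZarr-plus-translate step used below):

from kerchunk.combine import MultiZarrToZarr

def combine(refs: list[dict]) -> dict:
    # shorthand for the combine + translate step used throughout this issue
    return MultiZarrToZarr(refs, concat_dims=["time"], identical_dims=["lat", "lon"]).translate()

one_pass = combine([a, b, c, d])                      # single worker
tree = combine([combine([a, b]), combine([c, d])])    # two workers, then a final reduce
assert one_pass == tree  # the assumption; the rest of this issue shows where it breaks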

Reality:

I'm seeing radically different results depending on how many workers are used, suggesting that associativity of this combination is not a safe assumption!

This is roughly what that workflow looks like. Take a list of netCDF files (all with the same dimensions) and translate each:

from kerchunk.netCDF3 import NetCDF3ToZarr

# one reference dict per input netCDF file
chunks = NetCDF3ToZarr(
    url,
    inline_threshold=inline_threshold,
    storage_options=storage_options,
    **(kerchunk_open_kwargs or {}),
)
refs = [chunks.translate()]

These dictionaries are distributed across workers, each of which builds a MultiZarrToZarr instance (map):

# list[dict] -> MultiZarrToZarr
MultiZarrToZarr(refs)

Each worker's MultiZarrToZarr is then translated and merged via another MultiZarrToZarr (reduce):

# Sequence[MultiZarrToZarr] -> MultiZarrToZarr
refs = [a.translate() for a in multizarrtozarr]
accumulator = MultiZarrToZarr(refs)

Finally, the results are written out as a single ref:

# MultiZarrToZarr -> dict
accumulator.translate()

At this point, the results differ. Here are some statistics I've pulled from the resulting ref files. Note, especially, the different non-NaN value count:

Stats for analysis_error (single worker):
Mean: 0.3497554361820221
Median: 0.3499999940395355
Standard Deviation: 0.004802505951374769
Non-NaN Count: 137376947

Stats for analysis_error (4 workers):
Mean: 0.3497870862483978
Median: 0.3499999940395355
Standard Deviation: 0.004866031929850578
Non-NaN Count: 109904479
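
For context, stats like these can be pulled from a translated reference file with fsspec's reference filesystem plus xarray. A minimal sketch, assuming the refs point at anonymously readable s3 data; the combined.json path is hypothetical:

import fsspec
import xarray as xr

fs = fsspec.filesystem(
    "reference",
    fo="combined.json",            # hypothetical path to the translated refs
    remote_protocol="s3",          # wherever the underlying chunks live
    remote_options={"anon": True},
)
ds = xr.open_dataset(fs.get_mapper(""), engine="zarr", backend_kwargs={"consolidated": False})
var = ds["analysis_error"]
print(float(var.mean()), float(var.median()), float(var.std()), int(var.count()))  # mean, median, std, non-NaN count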

moradology commented Feb 7, 2024

Looking through some of the other issues, I noticed this, which seems likely to be relevant: #377

I'm also seeing that people have had success with exactly this kind of fan-out-and-reduce strategy (I've seen it referred to as a 'tree' combine), so perhaps others here will have wisdom to share about things to look out for?

martindurant (Member) commented

I think there is an assumption that the concatenated chunks at least form a continuous block in coordinates, so tree reductions might only work when you do pairs/groups of neighbouring inputs. It should be noted that most (all?) of the big combines mentioned concatenate over exactly one dimension, so that could be a factor. I can't tell if that's the case for you, but if not, you could do the tree by first concatenating on one dimension, then another.
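
A minimal sketch of the two-stage combine suggested here, assuming the inputs tile a grid over two dimensions (the 'time' dim from this issue plus a hypothetical 'ensemble' dim), with rows_of_refs grouping neighbouring inputs together:

from kerchunk.combine import MultiZarrToZarr

def combine(refs, dims):
    # combine one group of reference dicts along the given dimension(s)
    return MultiZarrToZarr(refs, concat_dims=dims, identical_dims=["lat", "lon"]).translate()

# rows_of_refs[i] holds the refs for one ensemble member, already ordered in time
per_row = [combine(row, ["time"]) for row in rows_of_refs]   # stage 1: concat along time
combined = combine(per_row, ["ensemble"])                    # stage 2: concat along the other dim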

moradology (Author) commented

I'll see if I can pin down the exact combination of factors that gives rise to the issue. So far, the small attempts I've constructed locally via multiprocessing have all worked perfectly, which is good but also kind of annoying in terms of pinning down what's going on.

TomNicholas commented

This would be yet another reason for kerchunk to expose an array-like API with a concat method defined - array concatenation is associative, so then you could safely make this assumption and hence use whatever map-reduce strategy you like.
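
A toy illustration (plain NumPy, not kerchunk) of why an explicit concat primitive makes the grouping irrelevant: concatenation along a single axis is associative, so any reduction tree gives the same result.

import numpy as np

a, b, c = np.arange(3), np.arange(3, 7), np.arange(7, 10)
left = np.concatenate([np.concatenate([a, b]), c])    # ((a ++ b) ++ c)
right = np.concatenate([a, np.concatenate([b, c])])   # (a ++ (b ++ c))
assert (left == right).all()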

moradology (Author) commented

Perhaps you have some ideas about what that would look like? I saw the issue mentioning disambiguation of 'merge' semantics, but with a bit more direction maybe this would be a suitable community submission.

martindurant (Member) commented

> Perhaps you have some ideas about what that would look like?

Yes, I think it might be time to flesh out how this looks. The "divide MultiZarrToZarr" and "make an array-compliant API" ideas are sort of the same thing, but all of the cases and logic currently in MultiZarrToZarr would still have to find a place in there somewhere. Perhaps ensuring associativity is easier when it only happens during the one type of combine operation, but it would still need to be done; I don't think it would be automatic.

moradology (Author) commented

Alright, so I've reproduced the issue here should anyone be interested in comparing notes.

#!/usr/bin/env python
import json
import multiprocessing
from typing import Sequence

from kerchunk.combine import MultiZarrToZarr

CONCAT_DIMS = ['time']
IDENTICAL_DIMS = ['lat', 'lon']

def load_refs(ldjson_file: str) -> list[dict]:
    refs = []
    with open(ldjson_file, "r") as f:
        for line in f:
            refs.append(json.loads(line)[0])
    return refs

def mzz(refs):
    return MultiZarrToZarr(
        refs,
        concat_dims=CONCAT_DIMS,
        identical_dims=IDENTICAL_DIMS,
        target_options={"anon": True},
        remote_options={"anon": True},
        remote_protocol=None
    )

def merge_refs(refs: list[dict]) -> dict:
    return mzz(refs).translate()

# Distributed workflow
def worker_func(refs: list[dict]) -> MultiZarrToZarr:
    def create_accumulator():
        return None
    def add_input(accumulator: MultiZarrToZarr, item: dict) -> MultiZarrToZarr:
        if accumulator is None:
            references = [item]
        else:
            references = [accumulator.translate(), item]
        return mzz(references)
    acc = create_accumulator()
    for ref in refs:
        acc = add_input(acc, ref)
    return acc

def distributed_merge(refs: list[list[dict]]) -> dict:
    def merge_accumulators(accumulators: Sequence[MultiZarrToZarr]) -> MultiZarrToZarr:
        references = [a.translate() for a in accumulators]
        return mzz(references)

    def extract_output(accumulator: MultiZarrToZarr) -> dict:
        return accumulator.translate()

    with multiprocessing.Pool(4) as p:
        accumulators: list[MultiZarrToZarr] = p.map(worker_func, refs)
    merged = merge_accumulators(accumulators)
    return extract_output(merged)

def compare_merge_size(single_dict, multi_dict):
    single_json = json.dumps(single_dict)
    multi_json = json.dumps(multi_dict)
    single_bytes = len(single_json.encode("utf-8"))
    multi_bytes = len(multi_json.encode("utf-8"))
    with open("single_test.json", "w") as f:
        f.write(single_json)
    with open("multi_test.json", "w") as f:
        f.write(multi_json)

    print(f"The single process dict is {single_bytes}")
    print(f"The multi process dict is {multi_bytes}")

def main():
    # you can access the zipped kerchunk refs that this wants to load in the attached zip
    refs = load_refs("inputs_raw_15286.json")

    # Expected merge results
    single_merge = merge_refs(refs)

    multi_refs = [[refs[0]], [refs[4], refs[2]], [refs[3]], [refs[1]]]
    # this fails: multi_refs = [[refs[0]], [refs[4], refs[2]], [refs[3]], [refs[1]]]
    # this works: multi_refs = [[refs[0]], [refs[1], refs[2]], [refs[3]], [refs[4]]]
    multi_merge = distributed_merge(multi_refs)

    # If things are working, the outputs should be the same
    compare_merge_size(single_merge, multi_merge)

if __name__ == "__main__":
    main()

kerchunk_refs.zip

moradology (Author) commented

I'm going to close this issue, as it appears to be a well-known limitation of the merging behavior in MultiZarrToZarr. I ended up resolving it by more consciously controlling the grouping of items to be merged by kerchunk. Perhaps this PR will be instructive for others who face similar issues: pangeo-forge/pangeo-forge-recipes#689
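
For anyone hitting the same wall, a minimal sketch of what "more consciously controlling the grouping" can look like: order the per-file refs along the concat dimension and split them into contiguous batches, so both the per-worker combines and the final reduce only ever see neighbouring inputs. The url_of helper and the batch size are hypothetical, and the sketch assumes each ref's target URLs sort in time order (it reuses the mzz helper from the repro script above):

def url_of(ref: dict) -> str:
    # first chunk target URL in the reference dict; adjust for your refs' layout
    return next(v[0] for v in ref["refs"].values() if isinstance(v, list))

refs_sorted = sorted(refs, key=url_of)  # time order, assuming URLs encode timestamps
batch_size = 4
batches = [refs_sorted[i:i + batch_size] for i in range(0, len(refs_sorted), batch_size)]

partials = [mzz(b).translate() for b in batches]  # each batch is a contiguous time block
final = mzz(partials).translate()                 # partials reduced in time order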
