From a546cb85187135ad4bc55815db17d01411a74632 Mon Sep 17 00:00:00 2001 From: Jeroen Verstraelen Date: Tue, 11 Oct 2022 14:13:19 +0200 Subject: [PATCH] issue #76 replace merge_dict_values with merge_lists_skip_duplicates --- .../metadata/models/stac_summaries.py | 14 +++--- src/openeo_aggregator/metadata/utils.py | 45 +++++-------------- 2 files changed, 16 insertions(+), 43 deletions(-) diff --git a/src/openeo_aggregator/metadata/models/stac_summaries.py b/src/openeo_aggregator/metadata/models/stac_summaries.py index 0a1cce51..c661b369 100644 --- a/src/openeo_aggregator/metadata/models/stac_summaries.py +++ b/src/openeo_aggregator/metadata/models/stac_summaries.py @@ -5,7 +5,7 @@ import attr from openeo_aggregator.metadata.models.statistics import Statistics -from openeo_aggregator.metadata.utils import merge_dict_values +from openeo_aggregator.metadata.utils import merge_lists_skip_duplicates T = TypeVar("T", bound="StacSummaries") @@ -125,15 +125,13 @@ def merge_all( additional_properties = [(cid, x.additional_properties) for cid, x in summaries_list] # Calculate the unique summary names. unique_summary_names: Set[str] = functools.reduce(lambda a, b: a.union(b), (d.keys() for _, d in additional_properties), set()) - + merged_addition_properties = {} for summary_name in unique_summary_names: - if summary_name in ["constellation", "platform", "instruments"]: - merged_addition_properties[summary_name] = merge_dict_values( - additional_properties, summary_name, [list], report) - elif summary_name.startswith("sar:") or summary_name.startswith("sat:"): - merged_addition_properties[summary_name] = merge_dict_values( - additional_properties, summary_name, [list], report) + if (summary_name in ["constellation", "platform", "instruments"] or + summary_name.startswith("sar:") or summary_name.startswith("sat:")): + summary_lists = [d.get(summary_name, []) for _, d in additional_properties] + merged_addition_properties[summary_name] = merge_lists_skip_duplicates(summary_lists) else: backends = [cid for cid, d in additional_properties if summary_name in d] report(f"{backends}: Unhandled merging of StacSummaries for summary_name: {summary_name!r}", "warning") diff --git a/src/openeo_aggregator/metadata/utils.py b/src/openeo_aggregator/metadata/utils.py index 4651adac..17b106e4 100644 --- a/src/openeo_aggregator/metadata/utils.py +++ b/src/openeo_aggregator/metadata/utils.py @@ -9,40 +9,15 @@ def __bool__(self) -> bool: UNSET: Unset = Unset() -def merge_dict_values( - dictionaries: List[Tuple[str, Dict]], - key, expected_types, report: Callable[[str, str], None] -) -> Any: +def merge_lists_skip_duplicates(lists: List[List[Any]]) -> List[Any]: """ - Merge all values of a given key from a list of dictionaries, skipping any duplicates. - Args: - dictionaries: List of dictionaries, given as tuples of (collection_identifier, dictionary) - key: Key to concatenate - expected_types: List of expected types for the value of the key, if None, all types are allowed - When a value is not of the expected type, it is reported and ignored. - report: function to report inconsistencies - - Returns: - Merged value, can be a list, a dict, or an object that implements the merge method. + Merge multiple lists into one, but only keep unique values. + :param lists: list of lists to merge + :return: merged list """ - result = None - for cid, d in dictionaries: - if key in d: - if expected_types is not None: - if not isinstance(d[key], tuple(expected_types)): - caller = inspect.stack()[1] - report(f"[{cid}]: Unexpected type for {key!r}: {type(d[key])!r} instead of {expected_types!r} in {caller.filename}:{caller.lineno}") - if result is None: - result = d[key] - elif isinstance(result, list): - if d[key] in result: - continue - result = list(result + d[key]) - elif isinstance(result, dict): - result = {**result, **d[key]} - elif hasattr(result, 'merge'): - result = result.merge(d[key]) - else: - caller = inspect.stack()[1] - report(f"[{cid}]: Unhandled merging of {key!r} with {type(result)} and {type(d[key])} in {caller.filename}:{caller.lineno}") - return result + merged = [] + for l in lists: + for v in l: + if v not in merged: + merged.append(v) + return merged