From 4423290b980a4289b65ae22f4483ae6a3563102f Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Mon, 30 Dec 2024 20:53:47 -0700 Subject: [PATCH] Accept set of keys instead of partition key ranges --- .../dagster_graphql/implementation/utils.py | 6 ++-- .../dagster/_core/instance/__init__.py | 35 +++++++++++-------- .../daemon_tests/test_backfill.py | 4 +-- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/utils.py b/python_modules/dagster-graphql/dagster_graphql/implementation/utils.py index 2511ae29732f0..d1a80a5bde484 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/utils.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/utils.py @@ -179,17 +179,17 @@ def assert_valid_asset_partition_backfill( if not asset_backfill_data: return - partition_subset_by_asset_key = ( + partitions_subsets_by_asset_key = ( asset_backfill_data.target_subset.partitions_subsets_by_asset_key ) - for asset_key, partition_subset in partition_subset_by_asset_key.items(): + for asset_key, partitions_subset in partitions_subsets_by_asset_key.items(): partitions_def = asset_graph.get(asset_key).partitions_def if not partitions_def: continue - invalid_keys = set(partition_subset.get_partition_keys()) - set( + invalid_keys = set(partitions_subset.get_partition_keys()) - set( partitions_def.get_partition_keys(backfill_datetime, dynamic_partitions_store) ) diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index a27f3c0363684..f2a7fcdbc2748 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -3169,9 +3169,7 @@ def launch_backfill( self, asset_graph: "BaseAssetGraph", asset_selection: Optional[Sequence[AssetKey]] = None, - partitions_by_assets: Optional[ - Sequence[Tuple[AssetKey, Optional[Sequence[Tuple[str, str]]]]] - ] = None, + partitions_by_assets: Optional[Mapping[AssetKey, Optional[AbstractSet[str]]]] = None, partition_names: Optional[Sequence[str]] = None, tags: Mapping[str, str] = {}, all_partitions: bool = False, @@ -3185,8 +3183,8 @@ def launch_backfill( Args: asset_graph (BaseAssetGraph): The asset graph for the backfill. asset_selection (Optional[Sequence[AssetKey]]): List of asset keys to backfill. - partitions_by_assets (Optional[Sequence[Tuple[AssetKey, Optional[Sequence[Tuple[str, str]]]]]]): - List of partitions for each asset key to backfill. + partitions_by_assets (Optional[Mapping[AssetKey, Optional[AbstractSet[str]]]]): Set of + partitions for each asset key to backfill. partition_names (Optional[Sequence[str]]): List of partition names to backfill. Only valid if `asset_selection` is provided. @@ -3194,7 +3192,10 @@ def launch_backfill( str: The ID of the backfill. """ # TODO(deepyaman): Abstract logic shared with `dagster-graphql`. - from dagster._core.definitions.partition import CachingDynamicPartitionsLoader + from dagster._core.definitions.partition import ( + CachingDynamicPartitionsLoader, + DefaultPartitionsSubset, + ) from dagster._core.definitions.selector import ( PartitionRangeSelector, PartitionsByAssetSelector, @@ -3232,11 +3233,17 @@ def launch_backfill( ) elif partitions_by_assets is not None: partitions_by_asset_selectors = [] - for asset_key, partitions in partitions_by_assets: - partitions_selector = ( - PartitionsSelector(ranges=[PartitionRangeSelector(*r) for r in partitions]) - if partitions is not None - else None + for asset_key, subset in partitions_by_assets.items(): + partitions_def = asset_graph.get(asset_key).partitions_def + partitions_subset = DefaultPartitionsSubset(subset) + partition_key_ranges = partitions_subset.get_partition_key_ranges( + partitions_def, dynamic_partitions_store=dynamic_partitions_store + ) + partitions_selector = PartitionsSelector( + ranges=[ + PartitionRangeSelector(partition_key_range.start, partition_key_range.end) + for partition_key_range in partition_key_ranges + ] ) partitions_by_asset_selectors.append( PartitionsByAssetSelector(asset_key=asset_key, partitions=partitions_selector) @@ -3261,15 +3268,15 @@ def launch_backfill( # TODO(deepyaman): Abstract logic shared with `dagster-graphql`. asset_backfill_data = backfill.asset_backfill_data if asset_backfill_data: - partition_subset_by_asset_key = ( + partitions_subsets_by_asset_key = ( asset_backfill_data.target_subset.partitions_subsets_by_asset_key ) - for asset_key, partition_subset in partition_subset_by_asset_key.items(): + for asset_key, partitions_subset in partitions_subsets_by_asset_key.items(): partitions_def = asset_graph.get(asset_key).partitions_def if not partitions_def: continue - invalid_keys = set(partition_subset.get_partition_keys()) - set( + invalid_keys = set(partitions_subset.get_partition_keys()) - set( partitions_def.get_partition_keys(backfill_datetime, dynamic_partitions_store) ) if invalid_keys: diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index 5d6efda7218b0..717397ef33be3 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -2497,9 +2497,7 @@ def test_asset_backfill_with_single_run_backfill_policy_using_python_api( backfill_id = instance.launch_backfill( asset_graph=asset_graph, - partitions_by_assets=[ - (asset_with_single_run_backfill_policy.key, [(partitions[0], partitions[-1])]) - ], + partitions_by_assets={asset_with_single_run_backfill_policy.key: set(partitions)}, tags={}, title=None, description=None,