diff --git a/python_modules/dagster/dagster/_core/definitions/automation_tick_evaluation_context.py b/python_modules/dagster/dagster/_core/definitions/automation_tick_evaluation_context.py index 416851a12540e..3eaef667c956a 100644 --- a/python_modules/dagster/dagster/_core/definitions/automation_tick_evaluation_context.py +++ b/python_modules/dagster/dagster/_core/definitions/automation_tick_evaluation_context.py @@ -6,7 +6,6 @@ AbstractSet, Any, Dict, - FrozenSet, Iterable, List, Mapping, @@ -36,8 +35,8 @@ from dagster._core.definitions.declarative_automation.serialized_objects import ( AutomationConditionEvaluation, ) -from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey -from dagster._core.definitions.partition import PartitionsDefinition +from dagster._core.definitions.events import AssetKey +from dagster._core.definitions.partition import PartitionsDefinition, PartitionsSubset from dagster._core.definitions.run_request import RunRequest from dagster._core.instance import DynamicPartitionsStore from dagster._core.storage.tags import ( @@ -344,44 +343,53 @@ def _build_run_requests_from_partitions_def_mapping( def build_run_requests_with_backfill_policies( - asset_partitions: Iterable[AssetKeyPartitionKey], + asset_subset_to_request: AssetGraphSubset, asset_graph: BaseAssetGraph, dynamic_partitions_store: DynamicPartitionsStore, ) -> Sequence[RunRequest]: """Build run requests for a selection of asset partitions based on the associated BackfillPolicies.""" run_requests = [] - asset_partition_keys: Mapping[AssetKey, Set[str]] = { - asset_key_partition.asset_key: set() for asset_key_partition in asset_partitions - } - for asset_partition in asset_partitions: - if asset_partition.partition_key: - asset_partition_keys[asset_partition.asset_key].add(asset_partition.partition_key) - assets_to_reconcile_by_partitions_def_partition_keys_backfill_policy: Mapping[ - Tuple[Optional[PartitionsDefinition], Optional[FrozenSet[str]], Optional[BackfillPolicy]], + Tuple[ + Optional[PartitionsDefinition], + Optional[Tuple[PartitionKeyRange]], + Optional[BackfillPolicy], + ], Set[AssetKey], ] = defaultdict(set) - # here we are grouping assets by their partitions def, selected partition keys, and backfill policy. - for asset_key, partition_keys in asset_partition_keys.items(): + # here we are grouping assets by their partitions def, selected partition ranges, and backfill policy. + for serializable_entity_subset in asset_subset_to_request.iterate_asset_subsets(asset_graph): + asset_key = serializable_entity_subset.key + partitions_def = asset_graph.get(asset_key).partitions_def + partition_key_ranges = ( + tuple( # allows the value to be hashed + serializable_entity_subset.value.get_partition_key_ranges( + partitions_def, dynamic_partitions_store=dynamic_partitions_store + ) + ) + if isinstance(serializable_entity_subset.value, PartitionsSubset) + else None + ) + assets_to_reconcile_by_partitions_def_partition_keys_backfill_policy[ - asset_graph.get(asset_key).partitions_def, - frozenset(partition_keys) if partition_keys else None, + partitions_def, + partition_key_ranges, asset_graph.get(asset_key).backfill_policy, ].add(asset_key) for ( partitions_def, - partition_keys, + partition_key_ranges, backfill_policy, ), asset_keys in assets_to_reconcile_by_partitions_def_partition_keys_backfill_policy.items(): asset_check_keys = asset_graph.get_check_keys_for_assets(asset_keys) - if partitions_def is None and partition_keys is not None: - check.failed("Partition key provided for unpartitioned asset") - elif partitions_def is not None and partition_keys is None: - check.failed("Partition key missing for partitioned asset") - elif partitions_def is None and partition_keys is None: + if partitions_def is None and partition_key_ranges is not None: + check.failed("Partition key range provided for unpartitioned asset") + elif partitions_def is not None and partition_key_ranges is None: + check.failed("Partition key range missing for partitioned asset") + elif partitions_def is None and partition_key_ranges is None: # non partitioned assets will be backfilled in a single run run_requests.append( RunRequest( @@ -392,6 +400,14 @@ def build_run_requests_with_backfill_policies( ) elif backfill_policy is None: # just use the normal single-partition behavior + partition_keys = set() + for partition_key_range in check.not_none(partition_key_ranges): + partition_keys = partition_keys.union( + check.not_none(partitions_def).get_partition_keys_in_range( + partition_key_range, + dynamic_partitions_store=dynamic_partitions_store, + ) + ) entity_keys = cast(Set[EntityKey], asset_keys) mapping: _PartitionsDefKeyMapping = { (partitions_def, pk): entity_keys for pk in (partition_keys or [None]) @@ -405,7 +421,7 @@ def build_run_requests_with_backfill_policies( list(asset_keys), list(asset_check_keys), check.not_none(backfill_policy), - check.not_none(partition_keys), + check.not_none(partition_key_ranges), check.not_none(partitions_def), tags={}, dynamic_partitions_store=dynamic_partitions_store, @@ -418,16 +434,12 @@ def _build_run_requests_with_backfill_policy( asset_keys: Sequence[AssetKey], asset_check_keys: Sequence[AssetCheckKey], backfill_policy: BackfillPolicy, - partition_keys: FrozenSet[str], + partition_key_ranges: Sequence[PartitionKeyRange], partitions_def: PartitionsDefinition, tags: Dict[str, Any], dynamic_partitions_store: DynamicPartitionsStore, ) -> Sequence[RunRequest]: run_requests = [] - partition_subset = partitions_def.subset_with_partition_keys(partition_keys) - partition_key_ranges = partition_subset.get_partition_key_ranges( - partitions_def, dynamic_partitions_store=dynamic_partitions_store - ) for partition_key_range in partition_key_ranges: # We might resolve more than one partition key range for the given partition keys. # We can only apply chunking on individual partition key ranges. @@ -470,6 +482,9 @@ def _build_run_requests_for_partition_key_range( """Builds multiple run requests for the given partition key range. Each run request will have at most max_partitions_per_run partitions. """ + # TODO ass a method to PartitionsDefinition that lets us return a list of ranges + # with at most max_partitions_per_run partitions in each range, so that we don't + # have to enumerate every partition key here partition_keys = partitions_def.get_partition_keys_in_range( partition_key_range, dynamic_partitions_store=dynamic_partitions_store ) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 926fb57afd9e4..73ebadbf7ccda 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -1536,8 +1536,6 @@ def execute_asset_backfill_iteration_inner( else "No asset partitions to request." ) - asset_partitions_to_request = set(asset_subset_to_request.iterate_asset_partitions()) - if len(not_requested_and_reasons) > 0: def _format_graph_subset(asset_graph_subset: AssetGraphSubset): @@ -1559,7 +1557,7 @@ def _format_graph_subset(asset_graph_subset: AssetGraphSubset): ) run_requests = build_run_requests_with_backfill_policies( - asset_partitions=asset_partitions_to_request, + asset_subset_to_request=asset_subset_to_request, asset_graph=asset_graph, dynamic_partitions_store=instance_queryer, )