From 0d7fcf27c0aafed445815b93f968f211ce465e1e Mon Sep 17 00:00:00 2001 From: Daniel Gibson Date: Thu, 26 Dec 2024 13:51:02 -0600 Subject: [PATCH] Use AssetGraphSubset instead of AssetKeyPartitionKey collections in asset backfill Add AssetGraphView.get_entity_subset_in_range and use it in asset_backfill.py so that the target-root and failed-partition helpers return AssetGraphSubset objects rather than lists or sets of AssetKeyPartitionKey; the helpers are renamed accordingly and the tests are updated to match. --- .../asset_graph_view/asset_graph_view.py | 15 ++ .../dagster/_core/execution/asset_backfill.py | 147 +++++++----------- .../execution_tests/test_asset_backfill.py | 18 +-- 3 files changed, 78 insertions(+), 102 deletions(-) diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index fe537d4140fc1..2f7aea4502074 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -45,6 +45,7 @@ ) from dagster._core.definitions.definitions_class import Definitions from dagster._core.definitions.partition import PartitionsDefinition + from dagster._core.definitions.partition_key_range import PartitionKeyRange from dagster._core.instance import DagsterInstance from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionResolvedStatus from dagster._core.storage.dagster_run import RunRecord @@ -195,6 +196,20 @@ def get_empty_subset(self, *, key: T_EntityKey) -> EntitySubset[T_EntityKey]: value = partitions_def.empty_subset() if partitions_def else False return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) + def get_entity_subset_in_range( + self, asset_key: AssetKey, partition_key_range: "PartitionKeyRange" + ) -> EntitySubset[AssetKey]: + partitions_def = check.not_none( + self._get_partitions_def(asset_key), "Must have partitions def" + ) + partition_subset_in_range = partitions_def.get_subset_in_range( + partition_key_range=partition_key_range, + dynamic_partitions_store=self._queryer, + ) + return EntitySubset( + self, key=asset_key, value=_ValidatedEntitySubsetValue(partition_subset_in_range) + ) + def 
get_entity_subset_from_asset_graph_subset( self, asset_graph_subset: AssetGraphSubset, key: AssetKey ) -> EntitySubset[AssetKey]: diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 69119e6499d50..926fb57afd9e4 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -12,7 +12,6 @@ NamedTuple, Optional, Sequence, - Set, Tuple, Union, cast, @@ -206,9 +205,9 @@ def with_run_requests_submitted( return self.replace_requested_subset(submitted_partitions) - def get_target_root_asset_partitions( + def get_target_root_asset_graph_subset( self, instance_queryer: CachingInstanceQueryer - ) -> Iterable[AssetKeyPartitionKey]: + ) -> AssetGraphSubset: def _get_self_and_downstream_targeted_subset( initial_subset: AssetGraphSubset, ) -> AssetGraphSubset: @@ -280,7 +279,7 @@ def _get_self_and_downstream_targeted_subset( " This is likely a system error. Please report this issue to the Dagster team." 
) - return list(root_subset.iterate_asset_partitions()) + return root_subset def get_target_partitions_subset(self, asset_key: AssetKey) -> PartitionsSubset: # Return the targeted partitions for the root partitioned asset keys @@ -669,7 +668,6 @@ def _get_requested_asset_graph_subset_from_run_requests( asset_graph_view: AssetGraphView, ) -> AssetGraphSubset: asset_graph = asset_graph_view.asset_graph - instance_queryer = asset_graph_view.get_inner_queryer_for_back_compat() requested_subset = AssetGraphSubset.empty() for run_request in run_requests: # Run request targets a range of partitions @@ -680,26 +678,9 @@ def _get_requested_asset_graph_subset_from_run_requests( # have the same partitions def selected_assets = cast(Sequence[AssetKey], run_request.asset_selection) check.invariant(len(selected_assets) > 0) - partitions_defs = set( - asset_graph.get(asset_key).partitions_def for asset_key in selected_assets - ) - check.invariant( - len(partitions_defs) == 1, - "Expected all assets selected in partition range run request to have the same" - " partitions def", - ) - - partitions_def = cast(PartitionsDefinition, next(iter(partitions_defs))) - partition_subset_in_range = partitions_def.get_subset_in_range( - partition_key_range=PartitionKeyRange(range_start, range_end), - dynamic_partitions_store=instance_queryer, - ) + partition_range = PartitionKeyRange(range_start, range_end) entity_subsets = [ - check.not_none( - asset_graph_view.get_subset_from_serializable_subset( - SerializableEntitySubset(key=asset_key, value=partition_subset_in_range) - ) - ) + asset_graph_view.get_entity_subset_in_range(asset_key, partition_range) for asset_key in selected_assets ] requested_subset = requested_subset | AssetGraphSubset.from_entity_subsets( @@ -745,7 +726,6 @@ def _submit_runs_and_update_backfill_in_chunks( asset_backfill_iteration_result: AssetBackfillIterationResult, logger: logging.Logger, run_tags: Mapping[str, str], - instance_queryer: CachingInstanceQueryer, ) -> 
Iterable[None]: from dagster._core.execution.backfill import BulkActionStatus @@ -1110,7 +1090,6 @@ def execute_asset_backfill_iteration( result, logger, run_tags=updated_backfill.tags, - instance_queryer=instance_queryer, ) updated_backfill = cast( @@ -1194,8 +1173,7 @@ def execute_asset_backfill_iteration( for updated_asset_backfill_data in get_canceling_asset_backfill_iteration_data( backfill.backfill_id, previous_asset_backfill_data, - instance_queryer, - asset_graph, + asset_graph_view, backfill.backfill_timestamp, ): yield None @@ -1246,37 +1224,32 @@ def execute_asset_backfill_iteration( def get_canceling_asset_backfill_iteration_data( backfill_id: str, asset_backfill_data: AssetBackfillData, - instance_queryer: CachingInstanceQueryer, - asset_graph: RemoteWorkspaceAssetGraph, + asset_graph_view: AssetGraphView, backfill_start_timestamp: float, ) -> Iterable[Optional[AssetBackfillData]]: """For asset backfills in the "canceling" state, fetch the asset backfill data with the updated materialized and failed subsets. 
""" + asset_graph = cast(RemoteWorkspaceAssetGraph, asset_graph_view.asset_graph) + instance_queryer = asset_graph_view.get_inner_queryer_for_back_compat() updated_materialized_subset = None - for updated_materialized_subset in get_asset_backfill_iteration_materialized_partitions( + for updated_materialized_subset in get_asset_backfill_iteration_materialized_subset( backfill_id, asset_backfill_data, asset_graph, instance_queryer ): yield None if not isinstance(updated_materialized_subset, AssetGraphSubset): check.failed( - "Expected get_asset_backfill_iteration_materialized_partitions to return an" + "Expected get_asset_backfill_iteration_materialized_subset to return an" " AssetGraphSubset object" ) - # TODO: Make this use subsets throughout instead of a sequence of AssetKeyPartitionKeys - failed_subset = AssetGraphSubset.from_asset_partition_set( - set( - _get_failed_asset_partitions( - instance_queryer, - backfill_id, - asset_graph, - materialized_subset=updated_materialized_subset, - ) - ), - asset_graph, + failed_subset = _get_failed_asset_graph_subset( + asset_graph_view, + backfill_id, + materialized_subset=updated_materialized_subset, ) + # we fetch the failed_subset to get any new assets that have failed and add that to the set of # assets we already know failed and their downstreams. 
However we need to remove any assets in # updated_materialized_subset to account for the case where a run retry successfully @@ -1297,7 +1270,7 @@ def get_canceling_asset_backfill_iteration_data( yield updated_backfill_data -def get_asset_backfill_iteration_materialized_partitions( +def get_asset_backfill_iteration_materialized_subset( backfill_id: str, asset_backfill_data: AssetBackfillData, asset_graph: RemoteWorkspaceAssetGraph, @@ -1370,24 +1343,16 @@ def _get_subset_in_target_subset( return AssetGraphSubset.from_entity_subsets([subset_in_target_subset]) -def _get_failed_and_downstream_asset_partitions( +def _get_failed_and_downstream_asset_graph_subset( backfill_id: str, asset_backfill_data: AssetBackfillData, asset_graph_view: AssetGraphView, materialized_subset: AssetGraphSubset, ) -> AssetGraphSubset: - asset_graph = cast(RemoteWorkspaceAssetGraph, asset_graph_view.asset_graph) - - failed_asset_graph_subset = AssetGraphSubset.from_asset_partition_set( - set( - _get_failed_asset_partitions( - asset_graph_view.get_inner_queryer_for_back_compat(), - backfill_id, - asset_graph, - materialized_subset, - ) - ), - asset_graph, + failed_asset_graph_subset = _get_failed_asset_graph_subset( + asset_graph_view, + backfill_id, + materialized_subset, ) failed_and_downstream_subset = bfs_filter_asset_graph_view( @@ -1475,16 +1440,16 @@ def execute_asset_backfill_iteration_inner( RemoteWorkspaceAssetGraph, asset_graph_view.asset_graph ) - initial_candidates: Set[AssetKeyPartitionKey] = set() + initial_asset_graph_subset: AssetGraphSubset = AssetGraphSubset.empty() request_roots = not asset_backfill_data.requested_runs_for_target_roots if request_roots: logger.info( "Not all root assets (assets in backfill that do not have parents in the backill) have been requested, finding root assets." 
) - target_roots = asset_backfill_data.get_target_root_asset_partitions(instance_queryer) - initial_candidates.update(target_roots) + target_roots = asset_backfill_data.get_target_root_asset_graph_subset(instance_queryer) + initial_asset_graph_subset = initial_asset_graph_subset | target_roots logger.info( - f"Root assets that have not yet been requested:\n {_asset_graph_subset_to_str(AssetGraphSubset.from_asset_partition_set(set(target_roots), asset_graph), asset_graph)}" + f"Root assets that have not yet been requested:\n {_asset_graph_subset_to_str(target_roots, asset_graph)}" ) yield None @@ -1502,14 +1467,14 @@ def execute_asset_backfill_iteration_inner( time.sleep(cursor_delay_time) updated_materialized_subset = None - for updated_materialized_subset in get_asset_backfill_iteration_materialized_partitions( + for updated_materialized_subset in get_asset_backfill_iteration_materialized_subset( backfill_id, asset_backfill_data, asset_graph, instance_queryer ): yield None if not isinstance(updated_materialized_subset, AssetGraphSubset): check.failed( - "Expected get_asset_backfill_iteration_materialized_partitions to return an" + "Expected get_asset_backfill_iteration_materialized_subset to return an" " AssetGraphSubset" ) @@ -1531,11 +1496,16 @@ def execute_asset_backfill_iteration_inner( for asset_key in asset_backfill_data.target_subset.asset_keys ) ) - initial_candidates.update(parent_materialized_asset_partitions) + initial_asset_graph_subset = ( + initial_asset_graph_subset + | AssetGraphSubset.from_asset_partition_set( + parent_materialized_asset_partitions, asset_graph + ) + ) yield None - failed_and_downstream_subset = _get_failed_and_downstream_asset_partitions( + failed_and_downstream_subset = _get_failed_and_downstream_asset_graph_subset( backfill_id, asset_backfill_data, asset_graph_view, @@ -1544,10 +1514,6 @@ def execute_asset_backfill_iteration_inner( yield None - initial_asset_graph_subset = AssetGraphSubset.from_asset_partition_set( - 
initial_candidates, asset_graph - ) - asset_subset_to_request, not_requested_and_reasons = bfs_filter_asset_graph_view( asset_graph_view, lambda candidate_asset_graph_subset, @@ -2007,13 +1973,12 @@ def _should_backfill_atomic_asset_graph_subset_unit( ) -def _get_failed_asset_partitions( - instance_queryer: CachingInstanceQueryer, +def _get_failed_asset_graph_subset( + asset_graph_view: AssetGraphView, backfill_id: str, - asset_graph: RemoteAssetGraph, materialized_subset: AssetGraphSubset, -) -> Sequence[AssetKeyPartitionKey]: - """Returns asset partitions that materializations were requested for as part of the backfill, but were +) -> AssetGraphSubset: + """Returns asset subset that materializations were requested for as part of the backfill, but were not successfully materialized. This function gets a list of all runs for the backfill that have failed and extracts the asset partitions @@ -2025,6 +1990,8 @@ def _get_failed_asset_partitions( Includes canceled asset partitions. Implementation assumes that successful runs won't have any failed partitions. 
""" + instance_queryer = asset_graph_view.get_inner_queryer_for_back_compat() + runs = instance_queryer.instance.get_runs( filters=RunsFilter( tags={BACKFILL_ID_TAG: backfill_id}, @@ -2032,8 +1999,7 @@ def _get_failed_asset_partitions( ) ) - result: List[AssetKeyPartitionKey] = [] - + result: AssetGraphSubset = AssetGraphSubset.empty() for run in runs: planned_asset_keys = instance_queryer.get_planned_materializations_for_run( run_id=run.run_id @@ -2053,25 +2019,22 @@ def _get_failed_asset_partitions( start=run.tags[ASSET_PARTITION_RANGE_START_TAG], end=run.tags[ASSET_PARTITION_RANGE_END_TAG], ) - asset_partition_candidates = [] - for asset_key in failed_asset_keys: - asset_partition_candidates.extend( - asset_graph.get_partitions_in_range( - asset_key, partition_range, instance_queryer - ) - ) + candidate_subset = AssetGraphSubset.from_entity_subsets( + [ + asset_graph_view.get_entity_subset_in_range(asset_key, partition_range) + for asset_key in failed_asset_keys + ] + ) + else: # a regular backfill run that run on a single partition partition_key = run.tags.get(PARTITION_NAME_TAG) - asset_partition_candidates = [ - AssetKeyPartitionKey(asset_key, partition_key) for asset_key in failed_asset_keys - ] + candidate_subset = AssetGraphSubset.from_asset_partition_set( + {AssetKeyPartitionKey(asset_key, partition_key) for asset_key in failed_asset_keys}, + asset_graph_view.asset_graph, + ) - asset_partitions_still_failed = [ - asset_partition - for asset_partition in asset_partition_candidates - if asset_partition not in materialized_subset - ] - result.extend(asset_partitions_still_failed) + asset_subset_still_failed = candidate_subset - materialized_subset + result = result | asset_subset_still_failed return result diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index 521ecc7e58a45..e3ad97a955adb 100644 --- 
a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -1198,14 +1198,15 @@ def downstream_daily_partitioned_asset( assert len(instance.get_runs()) == 1 - instance_queryer = _get_instance_queryer(instance, asset_graph, backfill_start_datetime) - canceling_backfill_data = None for canceling_backfill_data in get_canceling_asset_backfill_iteration_data( backfill_id, asset_backfill_data, - instance_queryer, - asset_graph, + _get_asset_graph_view( + instance, + asset_graph, + backfill_start_datetime, + ), backfill_start_datetime.timestamp(), ): pass @@ -1283,14 +1284,11 @@ def downstream_daily_partitioned_asset( in asset_backfill_data.failed_and_downstream_subset ) - instance_queryer = _get_instance_queryer(instance, asset_graph, backfill_start_datetime) - canceling_backfill_data = None for canceling_backfill_data in get_canceling_asset_backfill_iteration_data( backfill_id, asset_backfill_data, - instance_queryer, - asset_graph, + _get_asset_graph_view(instance, asset_graph, backfill_start_datetime), backfill_start_datetime.timestamp(), ): pass @@ -1516,8 +1514,8 @@ def foo_grandchild(foo_child): ], ) - target_root_partitions = asset_backfill_data.get_target_root_asset_partitions(instance_queryer) - assert set(target_root_partitions) == { + target_root_subset = asset_backfill_data.get_target_root_asset_graph_subset(instance_queryer) + assert set(target_root_subset.iterate_asset_partitions()) == { AssetKeyPartitionKey(asset_key=AssetKey(["foo"]), partition_key="2023-10-05"), AssetKeyPartitionKey(asset_key=AssetKey(["foo"]), partition_key="2023-10-03"), AssetKeyPartitionKey(asset_key=AssetKey(["foo"]), partition_key="2023-10-04"),