From bf39e31df1a0a58053b9726966054a236d7db16e Mon Sep 17 00:00:00 2001 From: Daniel Gibson Date: Fri, 15 Nov 2024 20:14:44 -0600 Subject: [PATCH] Make asset backfills subset-aware refactor > Insert changelog entry or delete this section. moar refactor > Insert changelog entry or delete this section. subsets instead of values > Insert changelog entry or delete this section. more progress on subsets instead of values > Insert changelog entry or delete this section. more progress > Insert changelog entry or delete this section. yet more progress > Insert changelog entry or delete this section. more progress > Insert changelog entry or delete this section. more > Insert changelog entry or delete this section. renames > Insert changelog entry or delete this section. final names? > Insert changelog entry or delete this section. owen feedback > Insert changelog entry or delete this section. --- .../graphql/test_partition_backfill.py | 118 ++-- .../asset_graph_view/asset_graph_view.py | 30 + .../dagster/_core/asset_graph_view/bfs.py | 221 +++++++ .../_core/definitions/asset_graph_subset.py | 4 + .../_core/definitions/base_asset_graph.py | 143 +---- .../dagster/_core/execution/asset_backfill.py | 571 +++++++++++++----- .../asset_graph_view_tests/test_bfs.py | 253 ++++++++ .../execution_tests/test_asset_backfill.py | 82 ++- 8 files changed, 1028 insertions(+), 394 deletions(-) create mode 100644 python_modules/dagster/dagster/_core/asset_graph_view/bfs.py create mode 100644 python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_bfs.py diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index 5ae64fce18d1f..58cdd36fd17b7 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -10,9 +10,9 @@ asset, define_asset_job, ) +from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView, TemporalContext from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.partition_key_range import PartitionKeyRange -from dagster._core.definitions.remote_asset_graph import RemoteWorkspaceAssetGraph from dagster._core.execution.asset_backfill import ( AssetBackfillIterationResult, execute_asset_backfill_iteration, @@ -34,7 +34,6 @@ from dagster._core.utils import make_new_backfill_id from dagster._seven import get_system_temp_directory from dagster._utils import safe_tempfile_path -from dagster._utils.caching_instance_queryer import CachingInstanceQueryer from dagster_graphql.client.query import ( LAUNCH_PARTITION_BACKFILL_MUTATION, LAUNCH_PIPELINE_EXECUTION_MUTATION, @@ -241,27 +240,25 @@ def _get_run_stats(partition_statuses): } -def _execute_asset_backfill_iteration_no_side_effects( - graphql_context, backfill_id: str, asset_graph: RemoteWorkspaceAssetGraph -) -> None: +def _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id: str) -> None: """Executes an asset backfill iteration and updates the serialized asset backfill data. However, does not execute side effects i.e. launching runs. """ backfill = graphql_context.instance.get_backfill(backfill_id) asset_backfill_data = backfill.asset_backfill_data result = None - instance_queryer = CachingInstanceQueryer( - graphql_context.instance, - asset_graph, - graphql_context, - asset_backfill_data.backfill_start_datetime, + asset_graph_view = AssetGraphView( + temporal_context=TemporalContext( + effective_dt=asset_backfill_data.backfill_start_datetime, last_event_id=None + ), + instance=graphql_context.instance, + asset_graph=graphql_context.asset_graph, ) with environ({"ASSET_BACKFILL_CURSOR_DELAY_TIME": "0"}): for result in execute_asset_backfill_iteration_inner( backfill_id=backfill_id, asset_backfill_data=asset_backfill_data, - instance_queryer=instance_queryer, - asset_graph=asset_graph, + asset_graph_view=asset_graph_view, backfill_start_timestamp=asset_backfill_data.backfill_start_timestamp, logger=logging.getLogger("fake_logger"), ): @@ -275,10 +272,12 @@ def _execute_asset_backfill_iteration_no_side_effects( updated_backfill = backfill.with_asset_backfill_data( result.backfill_data.with_run_requests_submitted( - result.run_requests, asset_graph=asset_graph, instance_queryer=instance_queryer + result.run_requests, + asset_graph=graphql_context.asset_graph, + instance_queryer=asset_graph_view.get_inner_queryer_for_back_compat(), ), dynamic_partitions_store=graphql_context.instance, - asset_graph=asset_graph, + asset_graph=graphql_context.asset_graph, ) graphql_context.instance.update_backfill(updated_backfill) @@ -312,12 +311,11 @@ def _execute_job_backfill_iteration_with_side_effects(graphql_context, backfill_ def _mock_asset_backfill_runs( graphql_context, asset_key: AssetKey, - asset_graph: RemoteWorkspaceAssetGraph, backfill_id: str, status: DagsterRunStatus, partition_key: Optional[str], ): - partitions_def = asset_graph.get(asset_key).partitions_def + partitions_def = graphql_context.asset_graph.get(asset_key).partitions_def @asset( partitions_def=partitions_def, @@ -751,10 +749,7 @@ def test_cancel_asset_backfill(self, graphql_context): # Update asset backfill data to contain requested partition, but does not execute side effects, # since launching the run will cause test process will hang forever. - code_location = graphql_context.get_code_location(main_repo_location_name()) - repository = code_location.get_repository("test_repo") - asset_graph = repository.asset_graph - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) # Launch the run that runs forever selector = infer_job_selector(graphql_context, "hanging_partition_asset_job") @@ -830,10 +825,8 @@ def test_cancel_then_retry_asset_backfill(self, graphql_context): # Update asset backfill data to contain requested partition, but does not execute side effects, # since launching the run will cause test process will hang forever. - code_location = graphql_context.get_code_location(main_repo_location_name()) - repository = code_location.get_repository("test_repo") - asset_graph = repository.asset_graph - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) # Launch the run that runs forever selector = infer_job_selector(graphql_context, "hanging_partition_asset_job") @@ -1144,11 +1137,7 @@ def test_asset_backfill_partition_stats(self, graphql_context): assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" backfill_id = result.data["launchPartitionBackfill"]["backfillId"] - code_location = graphql_context.get_code_location(main_repo_location_name()) - repository = code_location.get_repository("test_repo") - asset_graph = repository.asset_graph - - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) for partition, status in [ ("a", DagsterRunStatus.SUCCESS), @@ -1157,11 +1146,9 @@ def test_asset_backfill_partition_stats(self, graphql_context): ("e", DagsterRunStatus.SUCCESS), ("f", DagsterRunStatus.FAILURE), ]: - _mock_asset_backfill_runs( - graphql_context, asset_key, asset_graph, backfill_id, status, partition - ) + _mock_asset_backfill_runs(graphql_context, asset_key, backfill_id, status, partition) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) result = execute_dagster_graphql( graphql_context, @@ -1187,10 +1174,6 @@ def test_asset_backfill_partition_stats(self, graphql_context): assert asset_partition_status_counts[0]["numPartitionsFailed"] == 2 def test_asset_backfill_status_with_upstream_failure(self, graphql_context): - code_location = graphql_context.get_code_location(main_repo_location_name()) - repository = code_location.get_repository("test_repo") - asset_graph = repository.asset_graph - asset_keys = [ AssetKey("unpartitioned_upstream_of_partitioned"), AssetKey("upstream_daily_partitioned_asset"), @@ -1213,25 +1196,23 @@ def test_asset_backfill_status_with_upstream_failure(self, graphql_context): assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" backfill_id = result.data["launchPartitionBackfill"]["backfillId"] - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("unpartitioned_upstream_of_partitioned"), - asset_graph, backfill_id, DagsterRunStatus.SUCCESS, None, ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("upstream_daily_partitioned_asset"), - asset_graph, backfill_id, DagsterRunStatus.FAILURE, "2023-01-09", ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) result = execute_dagster_graphql( graphql_context, @@ -1658,10 +1639,6 @@ def test_launch_backfill_with_all_partitions_flag(self, graphql_context): assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 10 def test_reexecute_asset_backfill_from_failure(self, graphql_context): - code_location = graphql_context.get_code_location(main_repo_location_name()) - repository = code_location.get_repository("test_repo") - asset_graph = repository.asset_graph - asset_keys = [ AssetKey("unpartitioned_upstream_of_partitioned"), AssetKey("upstream_daily_partitioned_asset"), @@ -1684,25 +1661,23 @@ def test_reexecute_asset_backfill_from_failure(self, graphql_context): assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" backfill_id = result.data["launchPartitionBackfill"]["backfillId"] - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("unpartitioned_upstream_of_partitioned"), - asset_graph, backfill_id, DagsterRunStatus.SUCCESS, None, ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("upstream_daily_partitioned_asset"), - asset_graph, backfill_id, DagsterRunStatus.FAILURE, "2023-01-09", ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) # mark backfill as completed so we can retry it backfill = graphql_context.instance.get_backfill(backfill_id) @@ -1755,10 +1730,6 @@ def test_reexecute_asset_backfill_from_failure(self, graphql_context): assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id def test_reexecute_successful_asset_backfill(self, graphql_context): - code_location = graphql_context.get_code_location(main_repo_location_name()) - repository = code_location.get_repository("test_repo") - asset_graph = repository.asset_graph - asset_keys = [ AssetKey("unpartitioned_upstream_of_partitioned"), AssetKey("upstream_daily_partitioned_asset"), @@ -1780,25 +1751,23 @@ def test_reexecute_successful_asset_backfill(self, graphql_context): assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" backfill_id = result.data["launchPartitionBackfill"]["backfillId"] - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("unpartitioned_upstream_of_partitioned"), - asset_graph, backfill_id, DagsterRunStatus.SUCCESS, None, ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("upstream_daily_partitioned_asset"), - asset_graph, backfill_id, DagsterRunStatus.SUCCESS, "2023-01-09", ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) # mark backfill as complete so we can retry it backfill = graphql_context.instance.get_backfill(backfill_id) @@ -1842,10 +1811,6 @@ def test_reexecute_successful_asset_backfill(self, graphql_context): assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id def test_reexecute_asset_backfill_still_in_progress(self, graphql_context): - code_location = graphql_context.get_code_location(main_repo_location_name()) - repository = code_location.get_repository("test_repo") - asset_graph = repository.asset_graph - asset_keys = [ AssetKey("unpartitioned_upstream_of_partitioned"), AssetKey("upstream_daily_partitioned_asset"), @@ -1868,25 +1833,23 @@ def test_reexecute_asset_backfill_still_in_progress(self, graphql_context): assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" backfill_id = result.data["launchPartitionBackfill"]["backfillId"] - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("unpartitioned_upstream_of_partitioned"), - asset_graph, backfill_id, DagsterRunStatus.SUCCESS, None, ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("upstream_daily_partitioned_asset"), - asset_graph, backfill_id, DagsterRunStatus.FAILURE, "2023-01-09", ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) # try to retry the backfill while it is still in progress result = execute_dagster_graphql( @@ -1966,10 +1929,6 @@ def test_reexecute_asset_backfill_still_in_progress(self, graphql_context): assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id def test_reexecute_asset_backfill_twice(self, graphql_context): - code_location = graphql_context.get_code_location(main_repo_location_name()) - repository = code_location.get_repository("test_repo") - asset_graph = repository.asset_graph - asset_keys = [ AssetKey("unpartitioned_upstream_of_partitioned"), AssetKey("upstream_daily_partitioned_asset"), @@ -1992,25 +1951,23 @@ def test_reexecute_asset_backfill_twice(self, graphql_context): assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" backfill_id = result.data["launchPartitionBackfill"]["backfillId"] - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("unpartitioned_upstream_of_partitioned"), - asset_graph, backfill_id, DagsterRunStatus.SUCCESS, None, ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("upstream_daily_partitioned_asset"), - asset_graph, backfill_id, DagsterRunStatus.FAILURE, "2023-01-09", ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) # mark backfill as completed so we can retry it backfill = graphql_context.instance.get_backfill(backfill_id) @@ -2040,19 +1997,18 @@ def test_reexecute_asset_backfill_twice(self, graphql_context): assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id _execute_asset_backfill_iteration_no_side_effects( - graphql_context, retried_backfill.backfill_id, asset_graph + graphql_context, retried_backfill.backfill_id ) # mark some partitions failed so we can retry again _mock_asset_backfill_runs( graphql_context, AssetKey("upstream_daily_partitioned_asset"), - asset_graph, retried_backfill.backfill_id, DagsterRunStatus.FAILURE, "2023-01-09", ) _execute_asset_backfill_iteration_no_side_effects( - graphql_context, retried_backfill.backfill_id, asset_graph + graphql_context, retried_backfill.backfill_id ) # refetch the backfill to get the updated statuses of all assets diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index 4c2c6917a0d2c..fe537d4140fc1 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -6,6 +6,7 @@ Awaitable, Callable, Dict, + Iterable, Literal, NamedTuple, Optional, @@ -17,6 +18,7 @@ from dagster import _check as check from dagster._core.asset_graph_view.entity_subset import EntitySubset, _ValidatedEntitySubsetValue from dagster._core.asset_graph_view.serializable_entity_subset import SerializableEntitySubset +from dagster._core.definitions.asset_graph_subset import AssetGraphSubset from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey, EntityKey, T_EntityKey from dagster._core.definitions.events import AssetKeyPartitionKey from dagster._core.definitions.multi_dimensional_partitions import ( @@ -193,6 +195,34 @@ def get_empty_subset(self, *, key: T_EntityKey) -> EntitySubset[T_EntityKey]: value = partitions_def.empty_subset() if partitions_def else False return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) + def get_entity_subset_from_asset_graph_subset( + self, asset_graph_subset: AssetGraphSubset, key: AssetKey + ) -> EntitySubset[AssetKey]: + check.invariant( + self.asset_graph.has(key), f"Asset graph does not contain {key.to_user_string()}" + ) + + serializable_subset = asset_graph_subset.get_asset_subset(key, self.asset_graph) + check.invariant( + serializable_subset.is_compatible_with_partitions_def( + self._get_partitions_def(key), + ), + f"Partitions definition for {key.to_user_string()} is not compatible with the passed in AssetGraphSubset", + ) + + return EntitySubset( + self, key=key, value=_ValidatedEntitySubsetValue(serializable_subset.value) + ) + + def iterate_asset_subsets( + self, asset_graph_subset: AssetGraphSubset + ) -> Iterable[EntitySubset[AssetKey]]: + """Returns an Iterable of EntitySubsets representing the subset of each asset that this + AssetGraphSubset contains. + """ + for asset_key in asset_graph_subset.asset_keys: + yield self.get_entity_subset_from_asset_graph_subset(asset_graph_subset, asset_key) + def get_subset_from_serializable_subset( self, serializable_subset: SerializableEntitySubset[T_EntityKey] ) -> Optional[EntitySubset[T_EntityKey]]: diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/bfs.py b/python_modules/dagster/dagster/_core/asset_graph_view/bfs.py new file mode 100644 index 0000000000000..b0717250c414e --- /dev/null +++ b/python_modules/dagster/dagster/_core/asset_graph_view/bfs.py @@ -0,0 +1,221 @@ +from functools import total_ordering +from heapq import heapify, heappop, heappush +from typing import TYPE_CHECKING, Callable, Iterable, NamedTuple, Optional, Sequence, Tuple + +import dagster._check as check +from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView +from dagster._core.asset_graph_view.serializable_entity_subset import SerializableEntitySubset +from dagster._core.definitions.asset_graph_subset import AssetGraphSubset +from dagster._core.definitions.base_asset_graph import BaseAssetGraph +from dagster._core.definitions.time_window_partitions import get_time_partitions_def + +if TYPE_CHECKING: + from dagster._core.asset_graph_view.entity_subset import EntitySubset + from dagster._core.definitions.asset_key import AssetKey + + +class AssetGraphViewBfsFilterConditionResult(NamedTuple): + passed_asset_graph_subset: AssetGraphSubset + excluded_asset_graph_subsets_and_reasons: Sequence[Tuple[AssetGraphSubset, str]] + + +def bfs_filter_asset_graph_view( + asset_graph_view: AssetGraphView, + condition_fn: Callable[ + ["AssetGraphSubset", "AssetGraphSubset"], + AssetGraphViewBfsFilterConditionResult, + ], + initial_asset_graph_subset: "AssetGraphSubset", + include_full_execution_set: bool, +) -> Tuple[AssetGraphSubset, Sequence[Tuple[AssetGraphSubset, str]]]: + """Returns the subset of the graph that satisfy supplied criteria. + + - Are >= initial_asset_graph_subset + - Match the condition_fn + - Any of their ancestors >= initial_asset_graph_subset match the condition_fn + + Also returns a list of tuples, where each tuple is an asset subset that did not + satisfy the condition and the reason they were filtered out. + + The condition_fn takes in: + - a subset of the asset graph to evaluate the condition for. If include_full_execution_set=True, + the asset keys are all part of the same execution set (i.e. non-subsettable multi-asset). If + include_full_execution_set=False, only a single asset key will be in the subset. + + - An AssetGraphSubset for the portion of the graph that has so far been visited and passed + the condition. + + The condition_fn should return a object with an AssetGraphSubset indicating the portion + of the subset that passes the condition, and a list of (AssetGraphSubset, str) + tuples with more information about why certain subsets were excluded. + + Visits parents before children. + """ + initial_subsets = list(asset_graph_view.iterate_asset_subsets(initial_asset_graph_subset)) + + # invariant: we never consider an asset partition before considering its ancestors + queue = ToposortedPriorityQueue( + asset_graph_view, initial_subsets, include_full_execution_set=include_full_execution_set + ) + + visited_graph_subset = initial_asset_graph_subset + + result: AssetGraphSubset = AssetGraphSubset.empty() + failed_reasons: Sequence[Tuple[AssetGraphSubset, str]] = [] + + asset_graph = asset_graph_view.asset_graph + + while len(queue) > 0: + candidate_subset = queue.dequeue() + condition_result = condition_fn(candidate_subset, result) + + subset_that_meets_condition = condition_result.passed_asset_graph_subset + failed_reasons.extend(condition_result.excluded_asset_graph_subsets_and_reasons) + + result = result | subset_that_meets_condition + + for matching_entity_subset in asset_graph_view.iterate_asset_subsets( + subset_that_meets_condition + ): + # Add any child subsets that have not yet been visited to the queue + for child_key in asset_graph.get(matching_entity_subset.key).child_keys: + child_subset = asset_graph_view.compute_child_subset( + child_key, matching_entity_subset + ) + unvisited_child_subset = child_subset.compute_difference( + asset_graph_view.get_entity_subset_from_asset_graph_subset( + visited_graph_subset, child_key + ) + ) + if not unvisited_child_subset.is_empty: + queue.enqueue(unvisited_child_subset) + visited_graph_subset = ( + visited_graph_subset + | AssetGraphSubset.from_entity_subsets([unvisited_child_subset]) + ) + + return result, failed_reasons + + +def sort_key_for_asset_key(asset_graph: BaseAssetGraph, asset_key: "AssetKey") -> float: + """Returns an integer sort key such that asset partition ranges are sorted in + the order in which they should be materialized. For assets without a time + window partition dimension, this is always 0. + Assets with a time window partition dimension will be sorted from newest to oldest, unless they + have a self-dependency, in which case they are sorted from oldest to newest. + """ + partitions_def = asset_graph.get(asset_key).partitions_def + time_partitions_def = get_time_partitions_def(partitions_def) + if time_partitions_def is None: + return 0 + + # A sort key such that time window partitions are sorted from oldest start time to newest start time + start_timestamp = time_partitions_def.start_ts.timestamp + + if asset_graph.get(asset_key).has_self_dependency: + # sort self dependencies from oldest to newest, as older partitions must exist before + # new ones can execute + return start_timestamp + else: + # sort non-self dependencies from newest to oldest, as newer partitions are more relevant + # than older ones + return -1 * start_timestamp + + +class ToposortedPriorityQueue: + """Queue that returns parents before their children.""" + + @total_ordering + class QueueItem(NamedTuple): + level: int + partition_sort_key: Optional[float] + asset_graph_subset: AssetGraphSubset + + def __eq__(self, other: object) -> bool: + if isinstance(other, ToposortedPriorityQueue.QueueItem): + return ( + self.level == other.level + and self.partition_sort_key == other.partition_sort_key + ) + return False + + def __lt__(self, other: object) -> bool: + if isinstance(other, ToposortedPriorityQueue.QueueItem): + return self.level < other.level or ( + self.level == other.level + and self.partition_sort_key is not None + and other.partition_sort_key is not None + and self.partition_sort_key < other.partition_sort_key + ) + raise TypeError() + + def __init__( + self, + asset_graph_view: AssetGraphView, + items: Iterable["EntitySubset"], + include_full_execution_set: bool, + ): + self._asset_graph_view = asset_graph_view + self._include_full_execution_set = include_full_execution_set + + self._toposort_level_by_asset_key = { + asset_key: i + for i, asset_keys in enumerate( + asset_graph_view.asset_graph.toposorted_asset_keys_by_level + ) + for asset_key in asset_keys + } + self._heap = [self._queue_item(entity_subset) for entity_subset in items] + heapify(self._heap) + + def enqueue(self, entity_subset: "EntitySubset") -> None: + heappush(self._heap, self._queue_item(entity_subset)) + + def dequeue(self) -> AssetGraphSubset: + # For multi-assets, will include all required multi-asset keys if + # include_full_execution_set is set to True, or just the passed in + # asset key if it was not. If there are multiple assets in the subset + # the subset will have the same partitions included for each asset. + heap_value = heappop(self._heap) + return heap_value.asset_graph_subset + + def _queue_item(self, entity_subset: "EntitySubset") -> "ToposortedPriorityQueue.QueueItem": + asset_key = entity_subset.key + + if self._include_full_execution_set: + execution_set_keys = self._asset_graph_view.asset_graph.get( + asset_key + ).execution_set_asset_keys + else: + execution_set_keys = {asset_key} + + level = max( + self._toposort_level_by_asset_key[asset_key] for asset_key in execution_set_keys + ) + + serializable_entity_subset = entity_subset.convert_to_serializable_subset() + + serializable_entity_subsets = [ + SerializableEntitySubset(key=asset_key, value=serializable_entity_subset.value) + for asset_key in execution_set_keys + ] + + entity_subsets = [ + check.not_none( + self._asset_graph_view.get_subset_from_serializable_subset( + serializable_entity_subset + ) + ) + for serializable_entity_subset in serializable_entity_subsets + ] + + asset_graph_subset = AssetGraphSubset.from_entity_subsets(entity_subsets) + + return ToposortedPriorityQueue.QueueItem( + level, + sort_key_for_asset_key(self._asset_graph_view.asset_graph, asset_key), + asset_graph_subset=asset_graph_subset, + ) + + def __len__(self) -> int: + return len(self._heap) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py index dd184e4c42b0b..c746ddd45428f 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py @@ -268,6 +268,10 @@ def from_asset_partition_set( non_partitioned_asset_keys=non_partitioned_asset_keys, ) + @classmethod + def empty(cls) -> "AssetGraphSubset": + return AssetGraphSubset({}, set()) + @classmethod def from_entity_subsets( cls, entity_subsets: Iterable[EntitySubset[AssetKey]] diff --git a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py index 9c936179d747f..f84306bbf7561 100644 --- a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py @@ -1,8 +1,7 @@ from abc import ABC, abstractmethod from collections import defaultdict, deque from datetime import datetime -from functools import cached_property, total_ordering -from heapq import heapify, heappop, heappush +from functools import cached_property from typing import ( TYPE_CHECKING, AbstractSet, @@ -11,13 +10,11 @@ Generic, Iterable, Iterator, - List, Mapping, NamedTuple, Optional, Sequence, Set, - Tuple, TypeVar, Union, cast, @@ -754,67 +751,6 @@ def bfs_filter_subsets( return result - def bfs_filter_asset_partitions( - self, - dynamic_partitions_store: DynamicPartitionsStore, - condition_fn: Callable[ - [Iterable[AssetKeyPartitionKey], AbstractSet[AssetKeyPartitionKey]], - Tuple[bool, str], - ], - initial_asset_partitions: Iterable[AssetKeyPartitionKey], - evaluation_time: datetime, - ) -> Tuple[ - AbstractSet[AssetKeyPartitionKey], - Sequence[Tuple[Iterable[AssetKeyPartitionKey], str]], - ]: - """Returns asset partitions within the graph that satisfy supplied criteria. - - - Are >= initial_asset_partitions - - Match the condition_fn - - Any of their ancestors >= initial_asset_partitions match the condition_fn - - Also returns a list of tuples, where each tuple is a candidated_unit (list of - AssetKeyPartitionKeys that must be materialized together - ie multi_asset) that do not - satisfy the criteria and the reason they were filtered out. - - The condition_fn should return a tuple of a boolean indicating whether the asset partition meets - the condition and a string explaining why it does not meet the condition, if applicable. - - Visits parents before children. - - When asset partitions are part of the same execution set (non-subsettable multi-asset), - they're provided all at once to the condition_fn. - """ - all_nodes = set(initial_asset_partitions) - - # invariant: we never consider an asset partition before considering its ancestors - queue = ToposortedPriorityQueue(self, all_nodes, include_full_execution_set=True) - - result: Set[AssetKeyPartitionKey] = set() - failed_reasons: List[Tuple[Iterable[AssetKeyPartitionKey], str]] = [] - - while len(queue) > 0: - candidates_unit = queue.dequeue() - - meets_condition, fail_reason = condition_fn(candidates_unit, result) - if meets_condition: - result.update(candidates_unit) - - for candidate in candidates_unit: - for child in self.get_children_partitions( - dynamic_partitions_store, - evaluation_time, - candidate.asset_key, - candidate.partition_key, - ): - if child not in all_nodes: - queue.enqueue(child) - all_nodes.add(child) - else: - failed_reasons.append((candidates_unit, fail_reason)) - - return result, failed_reasons - def split_entity_keys_by_repository( self, keys: AbstractSet[EntityKey] ) -> Sequence[AbstractSet[EntityKey]]: @@ -854,80 +790,3 @@ def sort_key_for_asset_partition( # sort non-self dependencies from newest to oldest, as newer partitions are more relevant # than older ones return -1 * partition_timestamp - - -class ToposortedPriorityQueue: - """Queue that returns parents before their children.""" - - @total_ordering - class QueueItem(NamedTuple): - level: int - partition_sort_key: Optional[float] - asset_partitions: Iterable[AssetKeyPartitionKey] - - def __eq__(self, other: object) -> bool: - if isinstance(other, ToposortedPriorityQueue.QueueItem): - return ( - self.level == other.level - and self.partition_sort_key == other.partition_sort_key - ) - return False - - def __lt__(self, other: object) -> bool: - if isinstance(other, ToposortedPriorityQueue.QueueItem): - return self.level < other.level or ( - self.level == other.level - and self.partition_sort_key is not None - and other.partition_sort_key is not None - and self.partition_sort_key < other.partition_sort_key - ) - raise TypeError() - - def __init__( - self, - asset_graph: BaseAssetGraph, - items: Iterable[AssetKeyPartitionKey], - include_full_execution_set: bool, - ): - self._asset_graph = asset_graph - self._include_full_execution_set = include_full_execution_set - - self._toposort_level_by_asset_key = { - asset_key: i - for i, asset_keys in enumerate(asset_graph.toposorted_asset_keys_by_level) - for asset_key in asset_keys - } - self._heap = [self._queue_item(asset_partition) for asset_partition in items] - heapify(self._heap) - - def enqueue(self, asset_partition: AssetKeyPartitionKey) -> None: - heappush(self._heap, self._queue_item(asset_partition)) - - def dequeue(self) -> Iterable[AssetKeyPartitionKey]: - # For multi-assets, will include all required multi-asset keys if - # include_full_execution_set is set to True, or a list of size 1 with just the passed in - # asset key if it was not. - return heappop(self._heap).asset_partitions - - def _queue_item( - self, asset_partition: AssetKeyPartitionKey - ) -> "ToposortedPriorityQueue.QueueItem": - asset_key = asset_partition.asset_key - - if self._include_full_execution_set: - execution_set_keys = self._asset_graph.get(asset_key).execution_set_asset_keys - else: - execution_set_keys = {asset_key} - - level = max( - self._toposort_level_by_asset_key[asset_key] for asset_key in execution_set_keys - ) - - return ToposortedPriorityQueue.QueueItem( - level, - sort_key_for_asset_partition(self._asset_graph, asset_partition), - [AssetKeyPartitionKey(ak, asset_partition.partition_key) for ak in execution_set_keys], - ) - - def __len__(self) -> int: - return len(self._heap) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index cf882c2e85aa7..3d9a20ddac6c4 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -2,13 +2,11 @@ import logging import os import time -from collections import defaultdict from datetime import datetime from enum import Enum from typing import ( TYPE_CHECKING, AbstractSet, - Dict, Iterable, List, Mapping, @@ -22,6 +20,16 @@ ) import dagster._check as check +from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView, TemporalContext +from dagster._core.asset_graph_view.bfs import ( + AssetGraphViewBfsFilterConditionResult, + bfs_filter_asset_graph_view, +) +from dagster._core.asset_graph_view.entity_subset import EntitySubset +from dagster._core.asset_graph_view.serializable_entity_subset import ( + EntitySubsetValue, + SerializableEntitySubset, +) from dagster._core.definitions.asset_graph_subset import AssetGraphSubset from dagster._core.definitions.asset_selection import KeysAssetSelection from dagster._core.definitions.automation_tick_evaluation_context import ( @@ -1009,13 +1017,18 @@ def execute_asset_backfill_iteration( check.failed("Backfill must be an asset backfill") backfill_start_datetime = datetime_from_timestamp(backfill.backfill_timestamp) - instance_queryer = CachingInstanceQueryer( + + asset_graph_view = AssetGraphView( + temporal_context=TemporalContext( + effective_dt=backfill_start_datetime, + last_event_id=None, + ), instance=instance, asset_graph=asset_graph, - loading_context=workspace_context, - evaluation_time=backfill_start_datetime, ) + instance_queryer = asset_graph_view.get_inner_queryer_for_back_compat() + previous_asset_backfill_data = _check_validity_and_deserialize_asset_backfill_data( workspace_context, backfill, asset_graph, instance_queryer, logger ) @@ -1047,8 +1060,7 @@ def execute_asset_backfill_iteration( for result in execute_asset_backfill_iteration_inner( backfill_id=backfill.backfill_id, asset_backfill_data=previous_asset_backfill_data, - instance_queryer=instance_queryer, - asset_graph=asset_graph, + asset_graph_view=asset_graph_view, backfill_start_timestamp=backfill.backfill_timestamp, logger=logger, ): @@ -1245,6 +1257,7 @@ def get_canceling_asset_backfill_iteration_data( " AssetGraphSubset object" ) + # TODO: Make this use subsets throughout instead of a sequence of AssetKeyPartitionKeys failed_subset = AssetGraphSubset.from_asset_partition_set( set( _get_failed_asset_partitions( @@ -1325,31 +1338,66 @@ def get_asset_backfill_iteration_materialized_partitions( yield updated_materialized_subset +def _get_subset_in_target_subset( + asset_graph_view: AssetGraphView, + candidate_asset_graph_subset: AssetGraphSubset, + target_subset: AssetGraphSubset, +) -> "AssetGraphSubset": + candidate_entity_subsets = list( + asset_graph_view.iterate_asset_subsets(candidate_asset_graph_subset) + ) + + assert ( + len(candidate_entity_subsets) == 1 + ), "Since include_execution_set=False, there should be exactly one candidate entity subset" + + candidate_entity_subset = next(iter(candidate_entity_subsets)) + + subset_in_target_subset: EntitySubset[AssetKey] = candidate_entity_subset.compute_intersection( + asset_graph_view.get_entity_subset_from_asset_graph_subset( + target_subset, candidate_entity_subset.key + ) + ) + + return AssetGraphSubset.from_entity_subsets([subset_in_target_subset]) + + def _get_failed_and_downstream_asset_partitions( backfill_id: str, asset_backfill_data: AssetBackfillData, - asset_graph: RemoteWorkspaceAssetGraph, - instance_queryer: CachingInstanceQueryer, - backfill_start_timestamp: float, + asset_graph_view: AssetGraphView, materialized_subset: AssetGraphSubset, ) -> AssetGraphSubset: - failed_and_downstream_subset = AssetGraphSubset.from_asset_partition_set( - asset_graph.bfs_filter_asset_partitions( - instance_queryer, - lambda asset_partitions, _: ( - any( - asset_partition in asset_backfill_data.target_subset - for asset_partition in asset_partitions - ), - "", - ), + asset_graph = cast(RemoteWorkspaceAssetGraph, asset_graph_view.asset_graph) + + failed_asset_graph_subset = AssetGraphSubset.from_asset_partition_set( + set( _get_failed_asset_partitions( - instance_queryer, backfill_id, asset_graph, materialized_subset - ), - evaluation_time=datetime_from_timestamp(backfill_start_timestamp), - )[0], + asset_graph_view.get_inner_queryer_for_back_compat(), + backfill_id, + asset_graph, + materialized_subset, + ) + ), asset_graph, ) + + failed_and_downstream_subset = bfs_filter_asset_graph_view( + asset_graph_view, + lambda candidate_asset_graph_subset, _: ( + AssetGraphViewBfsFilterConditionResult( + passed_asset_graph_subset=_get_subset_in_target_subset( + asset_graph_view, + candidate_asset_graph_subset, + asset_backfill_data.target_subset, + ), + excluded_asset_graph_subsets_and_reasons=[], + ) + ), + initial_asset_graph_subset=failed_asset_graph_subset, + include_full_execution_set=False, + )[0] + return failed_and_downstream_subset @@ -1402,8 +1450,7 @@ def _asset_graph_subset_to_str( def execute_asset_backfill_iteration_inner( backfill_id: str, asset_backfill_data: AssetBackfillData, - asset_graph: RemoteWorkspaceAssetGraph, - instance_queryer: CachingInstanceQueryer, + asset_graph_view: AssetGraphView, backfill_start_timestamp: float, logger: logging.Logger, ) -> Iterable[Optional[AssetBackfillIterationResult]]: @@ -1415,6 +1462,11 @@ def execute_asset_backfill_iteration_inner( This is a generator so that we can return control to the daemon and let it heartbeat during expensive operations. """ + instance_queryer = asset_graph_view.get_inner_queryer_for_back_compat() + asset_graph: RemoteWorkspaceAssetGraph = cast( + RemoteWorkspaceAssetGraph, asset_graph_view.asset_graph + ) + initial_candidates: Set[AssetKeyPartitionKey] = set() request_roots = not asset_backfill_data.requested_runs_for_target_roots if request_roots: @@ -1478,54 +1530,54 @@ def execute_asset_backfill_iteration_inner( failed_and_downstream_subset = _get_failed_and_downstream_asset_partitions( backfill_id, asset_backfill_data, - asset_graph, - instance_queryer, - backfill_start_timestamp, + asset_graph_view, updated_materialized_subset, ) yield None - backfill_start_datetime = datetime_from_timestamp(backfill_start_timestamp) + initial_asset_graph_subset = AssetGraphSubset.from_asset_partition_set( + initial_candidates, asset_graph + ) - asset_partitions_to_request, not_requested_and_reasons = ( - asset_graph.bfs_filter_asset_partitions( - instance_queryer, - lambda unit, visited: should_backfill_atomic_asset_partitions_unit( - candidates_unit=unit, - asset_partitions_to_request=visited, - asset_graph=asset_graph, - materialized_subset=updated_materialized_subset, - requested_subset=asset_backfill_data.requested_subset, - target_subset=asset_backfill_data.target_subset, - failed_and_downstream_subset=failed_and_downstream_subset, - dynamic_partitions_store=instance_queryer, - current_time=backfill_start_datetime, - ), - initial_asset_partitions=initial_candidates, - evaluation_time=backfill_start_datetime, - ) + asset_subset_to_request, not_requested_and_reasons = bfs_filter_asset_graph_view( + asset_graph_view, + lambda candidate_asset_graph_subset, + visited: _should_backfill_atomic_asset_graph_subset_unit( + asset_graph_view=asset_graph_view, + candidate_asset_graph_subset_unit=candidate_asset_graph_subset, + asset_graph_subset_matched_so_far=visited, + materialized_subset=updated_materialized_subset, + requested_subset=asset_backfill_data.requested_subset, + target_subset=asset_backfill_data.target_subset, + failed_and_downstream_subset=failed_and_downstream_subset, + ), + initial_asset_graph_subset=initial_asset_graph_subset, + include_full_execution_set=True, ) logger.info( - f"Asset partitions to request:\n {_asset_graph_subset_to_str(AssetGraphSubset.from_asset_partition_set(asset_partitions_to_request, asset_graph), asset_graph)}" - if asset_partitions_to_request + f"Asset partitions to request:\n {_asset_graph_subset_to_str(asset_subset_to_request, asset_graph)}" + if asset_subset_to_request else "No asset partitions to request." ) + + asset_partitions_to_request = set(asset_subset_to_request.iterate_asset_partitions()) + if len(not_requested_and_reasons) > 0: - def _format_keys(keys: Iterable[AssetKeyPartitionKey]): + def _format_graph_subset(asset_graph_subset: AssetGraphSubset): return ", ".join( - f"({key.asset_key.to_user_string()}, {key.partition_key})" - if key.partition_key - else key.asset_key.to_user_string() - for key in keys + f"({entity_subset.key.to_user_string()}, {entity_subset.value}" + if isinstance(entity_subset.value, PartitionsSubset) + else entity_subset.key.to_user_string() + for entity_subset in asset_graph_subset.iterate_asset_subsets(asset_graph) ) not_requested_str = "\n".join( [ - f"[{_format_keys(keys)}] - Reason: {reason}." - for keys, reason in not_requested_and_reasons + f"[{_format_graph_subset(asset_graph_subset)}] - Reason: {reason}." + for asset_graph_subset, reason in not_requested_and_reasons ] ) logger.info( @@ -1561,27 +1613,165 @@ def _format_keys(keys: Iterable[AssetKeyPartitionKey]): ) -def can_run_with_parent( - parent: AssetKeyPartitionKey, - candidate: AssetKeyPartitionKey, - candidates_unit: Iterable[AssetKeyPartitionKey], - asset_graph: RemoteWorkspaceAssetGraph, +def _should_backfill_atomic_asset_subset_unit( + asset_graph_view: AssetGraphView, + entity_subset_to_filter: EntitySubset[AssetKey], + candidate_asset_graph_subset_unit: AssetGraphSubset, + asset_graph_subset_matched_so_far: AssetGraphSubset, target_subset: AssetGraphSubset, - asset_partitions_to_request_map: Mapping[AssetKey, AbstractSet[Optional[str]]], -) -> Tuple[bool, str]: - """Returns if a given candidate can be materialized in the same run as a given parent on - this tick, and the reason it cannot be materialized, if applicable. - """ - parent_target_subset = target_subset.get_asset_subset(parent.asset_key, asset_graph) - candidate_target_subset = target_subset.get_asset_subset(candidate.asset_key, asset_graph) + requested_subset: AssetGraphSubset, + materialized_subset: AssetGraphSubset, + failed_and_downstream_subset: AssetGraphSubset, +) -> Tuple[SerializableEntitySubset[AssetKey], Iterable[Tuple[EntitySubsetValue, str]]]: + failure_subsets_with_reasons: List[Tuple[EntitySubsetValue, str]] = [] + asset_graph = asset_graph_view.asset_graph + asset_key = entity_subset_to_filter.key + + missing_in_target_partitions = entity_subset_to_filter.compute_difference( + asset_graph_view.get_entity_subset_from_asset_graph_subset(target_subset, asset_key) + ) + if not missing_in_target_partitions.is_empty: + failure_subsets_with_reasons.append( + ( + missing_in_target_partitions.get_internal_value(), + f"{missing_in_target_partitions} not targeted by backfill", + ) + ) + entity_subset_to_filter = entity_subset_to_filter.compute_difference( + missing_in_target_partitions + ) + + failed_and_downstream_partitions = entity_subset_to_filter.compute_intersection( + asset_graph_view.get_entity_subset_from_asset_graph_subset( + failed_and_downstream_subset, asset_key + ) + ) + if not failed_and_downstream_partitions.is_empty: + failure_subsets_with_reasons.append( + ( + failed_and_downstream_partitions.get_internal_value(), + f"{failed_and_downstream_partitions} has failed or is downstream of a failed asset", + ) + ) + entity_subset_to_filter = entity_subset_to_filter.compute_difference( + failed_and_downstream_partitions + ) + + materialized_partitions = entity_subset_to_filter.compute_intersection( + asset_graph_view.get_entity_subset_from_asset_graph_subset(materialized_subset, asset_key) + ) + if not materialized_partitions.is_empty: + failure_subsets_with_reasons.append( + ( + materialized_partitions.get_internal_value(), + f"{materialized_partitions} already materialized by backfill", + ) + ) + entity_subset_to_filter = entity_subset_to_filter.compute_difference( + materialized_partitions + ) + + requested_partitions = entity_subset_to_filter.compute_intersection( + asset_graph_view.get_entity_subset_from_asset_graph_subset(requested_subset, asset_key) + ) + + if not requested_partitions.is_empty: + failure_subsets_with_reasons.append( + ( + requested_partitions.get_internal_value(), + f"{requested_partitions} already requested by backfill", + ) + ) + entity_subset_to_filter = entity_subset_to_filter.compute_difference(requested_partitions) + + for parent_key in asset_graph.get(asset_key).parent_keys: + if entity_subset_to_filter.is_empty: + break + + parent_subset, required_but_nonexistent_subset = ( + asset_graph_view.compute_parent_subset_and_required_but_nonexistent_subset( + parent_key, + entity_subset_to_filter, + ) + ) + + if not required_but_nonexistent_subset.is_empty: + raise DagsterInvariantViolationError( + f"Asset partition subset {entity_subset_to_filter}" + f" depends on invalid partitions {required_but_nonexistent_subset}" + ) + + # Children with parents that are targeted but not materialized are eligible + # to be filtered out if the parent has not run yet + targeted_but_not_materialized_parent_subset: EntitySubset[AssetKey] = ( + parent_subset.compute_intersection( + asset_graph_view.get_entity_subset_from_asset_graph_subset( + target_subset, parent_key + ) + ) + ).compute_difference( + asset_graph_view.get_entity_subset_from_asset_graph_subset( + materialized_subset, parent_key + ) + ) + + possibly_waiting_for_parent_subset = ( + asset_graph_view.compute_child_subset( + asset_key, targeted_but_not_materialized_parent_subset + ) + ).compute_intersection(entity_subset_to_filter) + + not_waiting_for_parent_subset = entity_subset_to_filter.compute_difference( + possibly_waiting_for_parent_subset + ) + + if not possibly_waiting_for_parent_subset.is_empty: + can_run_with_parent_subset, parent_failure_subsets_with_reasons = ( + get_can_run_with_parent_subsets( + targeted_but_not_materialized_parent_subset, + possibly_waiting_for_parent_subset, + asset_graph_view, + target_subset, + asset_graph_subset_matched_so_far, + candidate_asset_graph_subset_unit, + ) + ) + if parent_failure_subsets_with_reasons: + failure_subsets_with_reasons.extend(parent_failure_subsets_with_reasons) + + entity_subset_to_filter = not_waiting_for_parent_subset.compute_union( + can_run_with_parent_subset + ) + + return ( + entity_subset_to_filter.convert_to_serializable_subset(), + failure_subsets_with_reasons, + ) + + +def get_can_run_with_parent_subsets( + parent_subset: EntitySubset[AssetKey], + entity_subset_to_filter: EntitySubset[AssetKey], + asset_graph_view: AssetGraphView, + target_subset: AssetGraphSubset, + asset_graph_subset_matched_so_far: AssetGraphSubset, + candidate_asset_graph_subset_unit: AssetGraphSubset, +) -> Tuple[EntitySubset[AssetKey], Iterable[Tuple[EntitySubsetValue, str]]]: + candidate_asset_key = entity_subset_to_filter.key + parent_asset_key = parent_subset.key + + assert isinstance(asset_graph_view.asset_graph, RemoteWorkspaceAssetGraph) + asset_graph: RemoteWorkspaceAssetGraph = asset_graph_view.asset_graph + + parent_node = asset_graph.get(parent_asset_key) + candidate_node = asset_graph.get(candidate_asset_key) partition_mapping = asset_graph.get_partition_mapping( - candidate.asset_key, parent_asset_key=parent.asset_key + candidate_asset_key, parent_asset_key=parent_asset_key ) - is_self_dependency = parent.asset_key == candidate.asset_key + # First filter out cases where even if the parent was requested this iteration, it wouldn't + # matter, because the parent and child can't execute in the same run - parent_node = asset_graph.get(parent.asset_key) - candidate_node = asset_graph.get(candidate.asset_key) # checks if there is a simple partition mapping between the parent and the child has_identity_partition_mapping = ( # both unpartitioned @@ -1598,31 +1788,53 @@ def can_run_with_parent( ) if parent_node.backfill_policy != candidate_node.backfill_policy: return ( - False, - f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} have different backfill policies so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key} is materialized.", + asset_graph_view.get_empty_subset(key=candidate_asset_key), + [ + ( + entity_subset_to_filter.get_internal_value(), + f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} have different backfill policies so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key} is materialized.", + ) + ], ) if ( parent_node.resolve_to_singular_repo_scoped_node().repository_handle != candidate_node.resolve_to_singular_repo_scoped_node().repository_handle ): return ( - False, - f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} are in different code locations so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized.", + asset_graph_view.get_empty_subset(key=candidate_asset_key), + [ + ( + entity_subset_to_filter.get_internal_value(), + f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} are in different code locations so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized.", + ) + ], ) + if parent_node.partitions_def != candidate_node.partitions_def: return ( - False, - f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} have different partitions definitions so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized.", + asset_graph_view.get_empty_subset(key=candidate_asset_key), + [ + ( + entity_subset_to_filter.get_internal_value(), + f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} have different partitions definitions so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized.", + ) + ], ) - if ( - parent.partition_key not in asset_partitions_to_request_map[parent.asset_key] - and parent not in candidates_unit - ): - return ( - False, - f"parent {parent.asset_key.to_user_string()} with partition key {parent.partition_key} is not requested in this iteration", + + parent_target_subset = target_subset.get_asset_subset(parent_asset_key, asset_graph) + candidate_target_subset = target_subset.get_asset_subset(candidate_asset_key, asset_graph) + + parent_being_requested_this_tick_subset = ( + asset_graph_view.get_entity_subset_from_asset_graph_subset( + asset_graph_subset_matched_so_far, parent_asset_key ) - if ( + ) + + num_parent_partitions_being_requested_this_tick = parent_being_requested_this_tick_subset.size + + is_self_dependency = parent_asset_key == candidate_asset_key + + if not ( # if there is a simple mapping between the parent and the child, then # with the parent has_identity_partition_mapping @@ -1638,17 +1850,15 @@ def can_run_with_parent( parent_node.backfill_policy.max_partitions_per_run is None # a single run can materialize all requested parent partitions or parent_node.backfill_policy.max_partitions_per_run - > len(asset_partitions_to_request_map[parent.asset_key]) + > num_parent_partitions_being_requested_this_tick ) - # all targeted parents are being requested this tick, or its a self dependency + # all targeted parents are being requested this tick, or its a self depdendancy and ( - len(asset_partitions_to_request_map[parent.asset_key]) == parent_target_subset.size + num_parent_partitions_being_requested_this_tick == parent_target_subset.size or is_self_dependency ) ) ): - return True, "" - else: failed_reason = ( f"partition mapping between {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} is not simple and " f"{parent_node.key.to_user_string()} does not meet requirements of: targeting the same partitions as " @@ -1656,80 +1866,137 @@ def can_run_with_parent( "a backfill policy, and that backfill policy size limit is not exceeded by adding " f"{candidate_node.key.to_user_string()} to the run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized." ) - return False, failed_reason + return ( + asset_graph_view.get_empty_subset(key=candidate_asset_key), + [ + ( + entity_subset_to_filter.get_internal_value(), + failed_reason, + ) + ], + ) + # We now know that the parent and child are eligible to happen in the same run, so pass + # any children of parents that actually are being requested in this iteration (by + # being in either the asset_graph_subset_matched_so_far subset, or more rarely in + # candidate_asset_graph_subset_unit if they are part of a non-subsettable multi-asset) + failure_subsets_with_reasons = [] -def should_backfill_atomic_asset_partitions_unit( - asset_graph: RemoteWorkspaceAssetGraph, - candidates_unit: Iterable[AssetKeyPartitionKey], - asset_partitions_to_request: AbstractSet[AssetKeyPartitionKey], + candidate_subset = asset_graph_view.get_entity_subset_from_asset_graph_subset( + candidate_asset_graph_subset_unit, parent_asset_key + ) + + not_yet_requested_parent_subset = parent_subset.compute_difference( + parent_being_requested_this_tick_subset.compute_union(candidate_subset) + ) + + children_of_not_yet_requested_parents = asset_graph_view.compute_child_subset( + candidate_asset_key, not_yet_requested_parent_subset + ).compute_intersection(entity_subset_to_filter) + + if not children_of_not_yet_requested_parents.is_empty: + failure_subsets_with_reasons.append( + ( + children_of_not_yet_requested_parents.get_internal_value(), + f"Parent subset {not_yet_requested_parent_subset} is not requested in this iteration", + ) + ) + entity_subset_to_filter = entity_subset_to_filter.compute_difference( + children_of_not_yet_requested_parents + ) + + return ( + entity_subset_to_filter, + failure_subsets_with_reasons, + ) + + +def _should_backfill_atomic_asset_graph_subset_unit( + asset_graph_view: AssetGraphView, + candidate_asset_graph_subset_unit: AssetGraphSubset, + asset_graph_subset_matched_so_far: AssetGraphSubset, target_subset: AssetGraphSubset, requested_subset: AssetGraphSubset, materialized_subset: AssetGraphSubset, failed_and_downstream_subset: AssetGraphSubset, - dynamic_partitions_store: DynamicPartitionsStore, - current_time: datetime, -) -> Tuple[bool, str]: - """Args: - candidates_unit: A set of asset partitions that must all be materialized if any is - materialized. - - returns if the candidates_unit can be materialized in this tick of the backfill, and the reason - it cannot, if applicable - """ - for candidate in candidates_unit: - candidate_asset_partition_string = f"Asset {candidate.asset_key.to_user_string()} {f'with partition {candidate.partition_key}' if candidate.partition_key is not None else ''}" - if candidate not in target_subset: - return ( - False, - f"{candidate_asset_partition_string} is not targeted by backfill", - ) - elif candidate in failed_and_downstream_subset: - return ( - False, - f"{candidate_asset_partition_string} has failed or is downstream of a failed asset", - ) - elif candidate in materialized_subset: - return ( - False, - f"{candidate_asset_partition_string} was already materialized by backfill", - ) - elif candidate in requested_subset: - return ( - False, - f"{candidate_asset_partition_string} was already requested by backfill", - ) +) -> AssetGraphViewBfsFilterConditionResult: + failure_subset_values_with_reasons: List[Tuple[EntitySubsetValue, str]] = [] - parent_partitions_result = asset_graph.get_parents_partitions( - dynamic_partitions_store, current_time, *candidate + candidate_entity_subsets = list( + asset_graph_view.iterate_asset_subsets(candidate_asset_graph_subset_unit) + ) + + # this value is the same for all passed in asset keys since they are always part of the same + # execution set + passed_subset_value = candidate_entity_subsets[0].get_internal_value() + + candidate_asset_keys = [ + candidate_entity_subset.key for candidate_entity_subset in candidate_entity_subsets + ] + + for candidate_asset_key in candidate_asset_keys: + # filter down the set of matching values for each asset key + passed_serializable_entity_subset = SerializableEntitySubset( + candidate_asset_key, + passed_subset_value, + ) + entity_subset_to_filter = check.not_none( + asset_graph_view.get_subset_from_serializable_subset(passed_serializable_entity_subset) ) - if parent_partitions_result.required_but_nonexistent_parents_partitions: - raise DagsterInvariantViolationError( - f"Asset partition {candidate}" - " depends on invalid partition keys" - f" {parent_partitions_result.required_but_nonexistent_parents_partitions}" - ) - asset_partitions_to_request_map: Dict[AssetKey, Set[Optional[str]]] = defaultdict(set) - for asset_partition in asset_partitions_to_request: - asset_partitions_to_request_map[asset_partition.asset_key].add( - asset_partition.partition_key + if entity_subset_to_filter.is_empty: + break + + entity_subset_to_filter, new_failure_subset_values_with_reasons = ( + _should_backfill_atomic_asset_subset_unit( + asset_graph_view, + entity_subset_to_filter=entity_subset_to_filter, + candidate_asset_graph_subset_unit=candidate_asset_graph_subset_unit, + asset_graph_subset_matched_so_far=asset_graph_subset_matched_so_far, + target_subset=target_subset, + requested_subset=requested_subset, + materialized_subset=materialized_subset, + failed_and_downstream_subset=failed_and_downstream_subset, ) + ) + passed_subset_value = entity_subset_to_filter.value + failure_subset_values_with_reasons.extend(new_failure_subset_values_with_reasons) + + passed_entity_subsets = [] + for candidate_entity_subset in candidate_entity_subsets: + passed_entity_subsets.append( + check.not_none( + asset_graph_view.get_subset_from_serializable_subset( + SerializableEntitySubset(candidate_entity_subset.key, passed_subset_value) + ) + ) + ) - for parent in parent_partitions_result.parent_partitions: - if parent in target_subset and parent not in materialized_subset: - can_run, failed_reason = can_run_with_parent( - parent, - candidate, - candidates_unit, - asset_graph, - target_subset, - asset_partitions_to_request_map, + failure_asset_graph_subsets_with_reasons = [] + # Any failure partition values apply to all candidate asset keys, so construct a subset + # graph with that partition subset value for each key + for failure_subset_value, reason in failure_subset_values_with_reasons: + failure_entity_subsets = [ + check.not_none( + asset_graph_view.get_subset_from_serializable_subset( + SerializableEntitySubset(candidate_entity_subset.key, failure_subset_value) ) - if not can_run: - return False, failed_reason + ) + for candidate_entity_subset in candidate_entity_subsets + ] + failure_asset_graph_subsets_with_reasons.append( + ( + AssetGraphSubset.from_entity_subsets( + entity_subsets=failure_entity_subsets, + ), + reason, + ) + ) - return True, "" + return AssetGraphViewBfsFilterConditionResult( + passed_asset_graph_subset=AssetGraphSubset.from_entity_subsets(passed_entity_subsets), + excluded_asset_graph_subsets_and_reasons=failure_asset_graph_subsets_with_reasons, + ) def _get_failed_asset_partitions( diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_bfs.py b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_bfs.py new file mode 100644 index 0000000000000..75943325517cb --- /dev/null +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_bfs.py @@ -0,0 +1,253 @@ +from dagster import AssetKey, AssetSpec, Definitions, asset, multi_asset +from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView +from dagster._core.asset_graph_view.bfs import ( + AssetGraphViewBfsFilterConditionResult, + bfs_filter_asset_graph_view, +) +from dagster._core.definitions.asset_graph_subset import AssetGraphSubset +from dagster._core.definitions.events import AssetKeyPartitionKey +from dagster._core.definitions.partition_mapping import IdentityPartitionMapping + + +def _get_subset_with_keys(graph_view, keys): + return AssetGraphSubset.from_entity_subsets( + [graph_view.get_full_subset(key=key) for key in keys] + ) + + +def test_bfs_filter_empty_graph(): + """Test BFS filter on empty graph returns empty result.""" + graph_view = AssetGraphView.for_test(Definitions()) + initial_subset = AssetGraphSubset.empty() + + def condition_fn(subset, _visited): + return AssetGraphViewBfsFilterConditionResult(subset, []) + + result, failed = bfs_filter_asset_graph_view( + graph_view, condition_fn, initial_subset, include_full_execution_set=False + ) + + assert result == AssetGraphSubset.empty() + assert failed == [] + + +def test_bfs_filter_dependency_chain(): + """Test BFS filter on linear dependency chain.""" + + @asset + def asset1(): + return 1 + + @asset + def asset2(asset1): + return asset1 + 1 + + @asset + def asset3(asset2): + return asset2 + 1 + + @asset + def asset4(asset3): + return asset3 + 1 + + graph_view = AssetGraphView.for_test(Definitions(assets=[asset1, asset2, asset3, asset4])) + initial_subset = AssetGraphSubset.from_entity_subsets( + [graph_view.get_full_subset(key=asset1.key)] + ) + + def condition_fn(subset, visited): + # Only allow asset1 and asset3 + if AssetKey("asset3") in subset.asset_keys: + assert visited == AssetGraphSubset.from_entity_subsets( + [ + graph_view.get_full_subset(key=asset1.key), + graph_view.get_full_subset(key=asset2.key), + ] + ) + + return AssetGraphViewBfsFilterConditionResult( + AssetGraphSubset.empty(), + [(subset, "asset3 not allowed")], + ) + return AssetGraphViewBfsFilterConditionResult(subset, []) + + result, failed = bfs_filter_asset_graph_view( + graph_view, condition_fn, initial_subset, include_full_execution_set=False + ) + + assert result == AssetGraphSubset.from_entity_subsets( + [graph_view.get_full_subset(key=asset1.key), graph_view.get_full_subset(key=asset2.key)] + ) + assert len(failed) == 1 + assert failed[0][0] == AssetGraphSubset.from_entity_subsets( + [graph_view.get_full_subset(key=asset3.key)] + ) + assert failed[0][1] == "asset3 not allowed" + + +def test_bfs_filter_multi_asset(): + """Test BFS filter with multi-asset.""" + + @asset + def a(): + return 1 + + @asset + def b(): + return 2 + + @multi_asset( + specs=[AssetSpec("c", deps=["a"]), AssetSpec("d", deps=["b"])] + ) # d is level 1, e is level 3, gets assigned 3 + def my_multi_asset(): + pass + + @asset + def e(d): + pass + + graph_view = AssetGraphView.for_test(Definitions(assets=[a, b, my_multi_asset, e])) + initial_subset = _get_subset_with_keys(graph_view, [a.key]) + + def condition_fn(subset, _visited): + return AssetGraphViewBfsFilterConditionResult(subset, []) + + result_without_full_execution_set, _failed = bfs_filter_asset_graph_view( + graph_view, condition_fn, initial_subset, include_full_execution_set=False + ) + + assert result_without_full_execution_set == _get_subset_with_keys( + graph_view, [a.key, AssetKey(["c"])] + ) + + result_with_full_execution_set, _failed = bfs_filter_asset_graph_view( + graph_view, condition_fn, initial_subset, include_full_execution_set=True + ) + + assert result_with_full_execution_set == _get_subset_with_keys( + graph_view, [a.key, AssetKey(["c"]), AssetKey(["d"]), e.key] + ) + + +def test_bfs_filter_diamond(): + """Test BFS filter with a diamond-shaped graph to ensure bottom node is visited once.""" + + @asset + def top(): + return 1 + + @asset + def left(top): + return top + 1 + + @asset + def right(top): + return top + 2 + + @asset + def bottom(left, right): + return left + right + + graph_view = AssetGraphView.for_test(Definitions(assets=[top, left, right, bottom])) + initial_subset = _get_subset_with_keys(graph_view, [AssetKey("top")]) + + visit_count = {} + + def condition_fn(subset, _visited): + for key in subset.asset_keys: + visit_count[key] = visit_count.get(key, 0) + 1 + return AssetGraphViewBfsFilterConditionResult(subset, []) + + result, failed = bfs_filter_asset_graph_view( + graph_view, condition_fn, initial_subset, include_full_execution_set=True + ) + + # Each node should be visited exactly once + assert visit_count == { + AssetKey("bottom"): 1, + AssetKey("left"): 1, + AssetKey("right"): 1, + AssetKey("top"): 1, + } + assert result == _get_subset_with_keys( + graph_view, [AssetKey("top"), AssetKey("left"), AssetKey("right"), AssetKey("bottom")] + ) + assert failed == [] + + +from dagster import AssetIn, StaticPartitionsDefinition + + +def test_bfs_filter_with_partitions(): + """Test BFS filter with partitioned assets where condition filters some partitions.""" + + @asset(partitions_def=StaticPartitionsDefinition(["a", "b", "c"])) + def upstream(): + return 1 + + @asset( + partitions_def=StaticPartitionsDefinition(["a", "b", "c"]), + ins={"upstream": AssetIn(partition_mapping=IdentityPartitionMapping())}, + ) + def downstream(upstream): + return upstream + 1 + + graph_view = AssetGraphView.for_test(Definitions(assets=[upstream, downstream])) + initial_subset = AssetGraphSubset.from_asset_partition_set( + { + AssetKeyPartitionKey(AssetKey(["upstream"]), "a"), + AssetKeyPartitionKey(AssetKey(["upstream"]), "b"), + }, + graph_view.asset_graph, + ) + + def condition_fn(subset, _visited): + # Only allow partition "a" for upstream and corresponding mapped partition "x" for downstream + included = set() + excluded = set() + + for entity_subset in graph_view.iterate_asset_subsets(subset): + for asset_partition in entity_subset.expensively_compute_asset_partitions(): + if asset_partition.partition_key == "b": + excluded.add(asset_partition) + else: + included.add(asset_partition) + + return AssetGraphViewBfsFilterConditionResult( + AssetGraphSubset.from_asset_partition_set(included, graph_view.asset_graph), + ( + [ + ( + AssetGraphSubset.from_asset_partition_set(excluded, graph_view.asset_graph), + "b is not welcome here", + ) + ] + if excluded + else [] + ), + ) + + result, failed = bfs_filter_asset_graph_view( + graph_view, condition_fn, initial_subset, include_full_execution_set=True + ) + + # Should only include partition "a" for upstream and "x" for downstream + assert result == AssetGraphSubset.from_asset_partition_set( + { + AssetKeyPartitionKey(AssetKey(["upstream"]), "a"), + AssetKeyPartitionKey(AssetKey(["downstream"]), "a"), + }, + graph_view.asset_graph, + ) + + assert failed == [ + ( + AssetGraphSubset.from_asset_partition_set( + { + AssetKeyPartitionKey(AssetKey(["upstream"]), "b"), + }, + graph_view.asset_graph, + ), + "b is not welcome here", + ), + ] diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index 765204ab65ebe..9031226b543bc 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -44,6 +44,10 @@ multi_asset, ) from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView, TemporalContext +from dagster._core.asset_graph_view.bfs import ( + AssetGraphViewBfsFilterConditionResult, + bfs_filter_asset_graph_view, +) from dagster._core.definitions.asset_graph_subset import AssetGraphSubset from dagster._core.definitions.base_asset_graph import BaseAssetGraph from dagster._core.definitions.decorators.repository_decorator import repository @@ -248,15 +252,25 @@ def test_from_asset_partitions_target_subset( ) -def _get_instance_queryer( - instance: DagsterInstance, asset_graph: BaseAssetGraph, evaluation_time: datetime.datetime -) -> CachingInstanceQueryer: +def _get_asset_graph_view( + instance: DagsterInstance, + asset_graph: BaseAssetGraph, + evaluation_time: Optional[datetime.datetime] = None, +) -> AssetGraphView: return AssetGraphView( temporal_context=TemporalContext( effective_dt=evaluation_time or get_current_datetime(), last_event_id=None ), instance=instance, asset_graph=asset_graph, + ) + + +def _get_instance_queryer( + instance: DagsterInstance, asset_graph: BaseAssetGraph, evaluation_time: datetime.datetime +) -> CachingInstanceQueryer: + return _get_asset_graph_view( + instance, asset_graph, evaluation_time ).get_inner_queryer_for_back_compat() @@ -522,11 +536,21 @@ def make_random_subset( if i % 2 == 0: root_asset_partitions.add(AssetKeyPartitionKey(root_asset_key, None)) - target_asset_partitions, _ = asset_graph.bfs_filter_asset_partitions( - instance, lambda _a, _b: (True, ""), root_asset_partitions, evaluation_time=evaluation_time - ) + asset_graph_view = _get_asset_graph_view(instance, asset_graph, evaluation_time=evaluation_time) - return AssetGraphSubset.from_asset_partition_set(target_asset_partitions, asset_graph) + return bfs_filter_asset_graph_view( + asset_graph_view=asset_graph_view, + condition_fn=lambda candidate_asset_graph_subset, _: ( + AssetGraphViewBfsFilterConditionResult( + passed_asset_graph_subset=candidate_asset_graph_subset, + excluded_asset_graph_subsets_and_reasons=[], + ) + ), + initial_asset_graph_subset=AssetGraphSubset.from_asset_partition_set( + root_asset_partitions, asset_graph + ), + include_full_execution_set=True, + )[0] def make_subset_from_partition_keys( @@ -545,11 +569,21 @@ def make_subset_from_partition_keys( else: root_asset_partitions.add(AssetKeyPartitionKey(root_asset_key, None)) - target_asset_partitions, _ = asset_graph.bfs_filter_asset_partitions( - instance, lambda _a, _b: (True, ""), root_asset_partitions, evaluation_time=evaluation_time - ) + asset_graph_view = _get_asset_graph_view(instance, asset_graph, evaluation_time=evaluation_time) - return AssetGraphSubset.from_asset_partition_set(target_asset_partitions, asset_graph) + return bfs_filter_asset_graph_view( + asset_graph_view=asset_graph_view, + condition_fn=lambda candidate_asset_graph_subset, _: ( + AssetGraphViewBfsFilterConditionResult( + passed_asset_graph_subset=candidate_asset_graph_subset, + excluded_asset_graph_subsets_and_reasons=[], + ) + ), + initial_asset_graph_subset=AssetGraphSubset.from_asset_partition_set( + root_asset_partitions, asset_graph + ), + include_full_execution_set=True, + )[0] def get_asset_graph( @@ -590,10 +624,9 @@ def execute_asset_backfill_iteration_consume_generator( for result in execute_asset_backfill_iteration_inner( backfill_id=backfill_id, asset_backfill_data=asset_backfill_data, - instance_queryer=_get_instance_queryer( + asset_graph_view=_get_asset_graph_view( instance, asset_graph, asset_backfill_data.backfill_start_datetime ), - asset_graph=asset_graph, backfill_start_timestamp=asset_backfill_data.backfill_start_timestamp, logger=logging.getLogger("fake_logger"), ): @@ -618,11 +651,22 @@ def run_backfill_to_completion( # assert each asset partition only targeted once requested_asset_partitions: Set[AssetKeyPartitionKey] = set() - fail_and_downstream_asset_partitions, _ = asset_graph.bfs_filter_asset_partitions( - instance, - lambda _a, _b: (True, ""), - fail_asset_partitions, - evaluation_time=backfill_data.backfill_start_datetime, + asset_graph_view = _get_asset_graph_view(instance, asset_graph) + + fail_and_downstream_asset_graph_subset, _ = bfs_filter_asset_graph_view( + asset_graph_view=asset_graph_view, + condition_fn=lambda candidate_asset_graph_subset, _: AssetGraphViewBfsFilterConditionResult( + passed_asset_graph_subset=candidate_asset_graph_subset, + excluded_asset_graph_subsets_and_reasons=[], + ), + initial_asset_graph_subset=AssetGraphSubset.from_asset_partition_set( + set(fail_asset_partitions), asset_graph + ), + include_full_execution_set=True, + ) + + fail_and_downstream_asset_partitions = set( + fail_and_downstream_asset_graph_subset.iterate_asset_partitions() ) while not backfill_is_complete( @@ -1112,7 +1156,7 @@ def may_asset( ) instance = DagsterInstance.ephemeral() - with pytest.raises(DagsterInvariantViolationError, match="depends on invalid partition keys"): + with pytest.raises(DagsterInvariantViolationError, match="depends on invalid partitions"): run_backfill_to_completion(asset_graph, assets_by_repo_name, backfill_data, [], instance)