diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py
index 5ae64fce18d1f..663e0e9a1d29d 100644
--- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py
+++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py
@@ -10,9 +10,9 @@
     asset,
     define_asset_job,
 )
+from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView, TemporalContext
 from dagster._core.definitions.asset_graph import AssetGraph
 from dagster._core.definitions.partition_key_range import PartitionKeyRange
-from dagster._core.definitions.remote_asset_graph import RemoteWorkspaceAssetGraph
 from dagster._core.execution.asset_backfill import (
     AssetBackfillIterationResult,
     execute_asset_backfill_iteration,
@@ -34,7 +34,6 @@
 from dagster._core.utils import make_new_backfill_id
 from dagster._seven import get_system_temp_directory
 from dagster._utils import safe_tempfile_path
-from dagster._utils.caching_instance_queryer import CachingInstanceQueryer
 from dagster_graphql.client.query import (
     LAUNCH_PARTITION_BACKFILL_MUTATION,
     LAUNCH_PIPELINE_EXECUTION_MUTATION,
@@ -241,27 +240,25 @@ def _get_run_stats(partition_statuses):
     }
 
 
-def _execute_asset_backfill_iteration_no_side_effects(
-    graphql_context, backfill_id: str, asset_graph: RemoteWorkspaceAssetGraph
-) -> None:
+def _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id: str) -> None:
     """Executes an asset backfill iteration and updates the serialized asset backfill data.
 
     However, it does not execute side effects, i.e. launching runs.
     """
     backfill = graphql_context.instance.get_backfill(backfill_id)
     asset_backfill_data = backfill.asset_backfill_data
     result = None
-    instance_queryer = CachingInstanceQueryer(
-        graphql_context.instance,
-        asset_graph,
-        graphql_context,
-        asset_backfill_data.backfill_start_datetime,
+    asset_graph_view = AssetGraphView(
+        temporal_context=TemporalContext(
+            effective_dt=asset_backfill_data.backfill_start_datetime, last_event_id=None
+        ),
+        instance=graphql_context.instance,
+        asset_graph=graphql_context.asset_graph,
     )
     with environ({"ASSET_BACKFILL_CURSOR_DELAY_TIME": "0"}):
         for result in execute_asset_backfill_iteration_inner(
             backfill_id=backfill_id,
             asset_backfill_data=asset_backfill_data,
-            instance_queryer=instance_queryer,
-            asset_graph=asset_graph,
+            asset_graph_view=asset_graph_view,
             backfill_start_timestamp=asset_backfill_data.backfill_start_timestamp,
             logger=logging.getLogger("fake_logger"),
         ):
@@ -275,10 +272,11 @@ def _execute_asset_backfill_iteration_no_side_effects(
 
     updated_backfill = backfill.with_asset_backfill_data(
         result.backfill_data.with_run_requests_submitted(
-            result.run_requests, asset_graph=asset_graph, instance_queryer=instance_queryer
+            result.run_requests,
+            asset_graph_view=asset_graph_view,
         ),
         dynamic_partitions_store=graphql_context.instance,
-        asset_graph=asset_graph,
+        asset_graph=graphql_context.asset_graph,
     )
     graphql_context.instance.update_backfill(updated_backfill)
 
@@ -312,12 +310,11 @@ def _execute_job_backfill_iteration_with_side_effects(graphql_context, backfill_
 def _mock_asset_backfill_runs(
     graphql_context,
     asset_key: AssetKey,
-    asset_graph: RemoteWorkspaceAssetGraph,
     backfill_id: str,
     status: DagsterRunStatus,
     partition_key: Optional[str],
 ):
-    partitions_def = asset_graph.get(asset_key).partitions_def
+    partitions_def = graphql_context.asset_graph.get(asset_key).partitions_def
 
     @asset(
         partitions_def=partitions_def,
@@ -751,10 +748,7 @@ def test_cancel_asset_backfill(self, graphql_context):
 
         # Update asset backfill data to contain requested partition, but does not execute side effects,
        # since launching the run would cause the test process to hang forever.
-        code_location = graphql_context.get_code_location(main_repo_location_name())
-        repository = code_location.get_repository("test_repo")
-        asset_graph = repository.asset_graph
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
 
         # Launch the run that runs forever
         selector = infer_job_selector(graphql_context, "hanging_partition_asset_job")
@@ -830,10 +824,8 @@ def test_cancel_then_retry_asset_backfill(self, graphql_context):
 
         # Update asset backfill data to contain requested partition, but does not execute side effects,
        # since launching the run would cause the test process to hang forever.
-        code_location = graphql_context.get_code_location(main_repo_location_name())
-        repository = code_location.get_repository("test_repo")
-        asset_graph = repository.asset_graph
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
 
         # Launch the run that runs forever
         selector = infer_job_selector(graphql_context, "hanging_partition_asset_job")
@@ -1144,11 +1136,7 @@ def test_asset_backfill_partition_stats(self, graphql_context):
         assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
         backfill_id = result.data["launchPartitionBackfill"]["backfillId"]
 
-        code_location = graphql_context.get_code_location(main_repo_location_name())
-        repository = code_location.get_repository("test_repo")
-        asset_graph = repository.asset_graph
-
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
 
         for partition, status in [
             ("a", DagsterRunStatus.SUCCESS),
@@ -1157,11 +1145,9 @@
             ("e", DagsterRunStatus.SUCCESS),
             ("f", DagsterRunStatus.FAILURE),
         ]:
-            _mock_asset_backfill_runs(
-                graphql_context, asset_key, asset_graph, backfill_id, status, partition
-            )
+            _mock_asset_backfill_runs(graphql_context, asset_key, backfill_id, status, partition)
 
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
 
         result = execute_dagster_graphql(
             graphql_context,
@@ -1187,10 +1173,6 @@
         assert asset_partition_status_counts[0]["numPartitionsFailed"] == 2
 
     def test_asset_backfill_status_with_upstream_failure(self, graphql_context):
-        code_location = graphql_context.get_code_location(main_repo_location_name())
-        repository = code_location.get_repository("test_repo")
-        asset_graph = repository.asset_graph
-
         asset_keys = [
             AssetKey("unpartitioned_upstream_of_partitioned"),
             AssetKey("upstream_daily_partitioned_asset"),
@@ -1213,25 +1195,23 @@
         assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
         backfill_id = result.data["launchPartitionBackfill"]["backfillId"]
 
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
         _mock_asset_backfill_runs(
             graphql_context,
             AssetKey("unpartitioned_upstream_of_partitioned"),
-            asset_graph,
             backfill_id,
             DagsterRunStatus.SUCCESS,
             None,
         )
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
         _mock_asset_backfill_runs(
             graphql_context,
             AssetKey("upstream_daily_partitioned_asset"),
-            asset_graph,
             backfill_id,
             DagsterRunStatus.FAILURE,
             "2023-01-09",
         )
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
 
         result = execute_dagster_graphql(
             graphql_context,
@@ -1658,10 +1638,6 @@ def test_launch_backfill_with_all_partitions_flag(self, graphql_context):
         assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 10
 
     def test_reexecute_asset_backfill_from_failure(self, graphql_context):
-        code_location = graphql_context.get_code_location(main_repo_location_name())
-        repository = code_location.get_repository("test_repo")
-        asset_graph = repository.asset_graph
-
         asset_keys = [
             AssetKey("unpartitioned_upstream_of_partitioned"),
             AssetKey("upstream_daily_partitioned_asset"),
@@ -1684,25 +1660,23 @@
         assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
         backfill_id = result.data["launchPartitionBackfill"]["backfillId"]
 
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
         _mock_asset_backfill_runs(
             graphql_context,
             AssetKey("unpartitioned_upstream_of_partitioned"),
-            asset_graph,
             backfill_id,
             DagsterRunStatus.SUCCESS,
             None,
         )
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
         _mock_asset_backfill_runs(
             graphql_context,
             AssetKey("upstream_daily_partitioned_asset"),
-            asset_graph,
             backfill_id,
             DagsterRunStatus.FAILURE,
             "2023-01-09",
         )
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
 
         # mark backfill as completed so we can retry it
         backfill = graphql_context.instance.get_backfill(backfill_id)
@@ -1755,10 +1729,6 @@
         assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id
 
     def test_reexecute_successful_asset_backfill(self, graphql_context):
-        code_location = graphql_context.get_code_location(main_repo_location_name())
-        repository = code_location.get_repository("test_repo")
-        asset_graph = repository.asset_graph
-
         asset_keys = [
             AssetKey("unpartitioned_upstream_of_partitioned"),
             AssetKey("upstream_daily_partitioned_asset"),
@@ -1780,25 +1750,23 @@
         assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
         backfill_id = result.data["launchPartitionBackfill"]["backfillId"]
 
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
         _mock_asset_backfill_runs(
             graphql_context,
             AssetKey("unpartitioned_upstream_of_partitioned"),
-            asset_graph,
             backfill_id,
             DagsterRunStatus.SUCCESS,
             None,
         )
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
         _mock_asset_backfill_runs(
             graphql_context,
             AssetKey("upstream_daily_partitioned_asset"),
-            asset_graph,
             backfill_id,
             DagsterRunStatus.SUCCESS,
             "2023-01-09",
         )
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
 
         # mark backfill as complete so we can retry it
         backfill = graphql_context.instance.get_backfill(backfill_id)
@@ -1842,10 +1810,6 @@
         assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id
 
     def test_reexecute_asset_backfill_still_in_progress(self, graphql_context):
-        code_location = graphql_context.get_code_location(main_repo_location_name())
-        repository = code_location.get_repository("test_repo")
-        asset_graph = repository.asset_graph
-
         asset_keys = [
             AssetKey("unpartitioned_upstream_of_partitioned"),
             AssetKey("upstream_daily_partitioned_asset"),
@@ -1868,25 +1832,23 @@
         assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
         backfill_id = result.data["launchPartitionBackfill"]["backfillId"]
 
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
         _mock_asset_backfill_runs(
             graphql_context,
             AssetKey("unpartitioned_upstream_of_partitioned"),
-            asset_graph,
             backfill_id,
             DagsterRunStatus.SUCCESS,
             None,
         )
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
         _mock_asset_backfill_runs(
             graphql_context,
             AssetKey("upstream_daily_partitioned_asset"),
-            asset_graph,
             backfill_id,
             DagsterRunStatus.FAILURE,
             "2023-01-09",
         )
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
 
         # try to retry the backfill while it is still in progress
         result = execute_dagster_graphql(
@@ -1966,10 +1928,6 @@
         assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id
 
     def test_reexecute_asset_backfill_twice(self, graphql_context):
-        code_location = graphql_context.get_code_location(main_repo_location_name())
-        repository = code_location.get_repository("test_repo")
-        asset_graph = repository.asset_graph
-
         asset_keys = [
             AssetKey("unpartitioned_upstream_of_partitioned"),
             AssetKey("upstream_daily_partitioned_asset"),
@@ -1992,25 +1950,23 @@
         assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
         backfill_id = result.data["launchPartitionBackfill"]["backfillId"]
 
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
         _mock_asset_backfill_runs(
             graphql_context,
             AssetKey("unpartitioned_upstream_of_partitioned"),
-            asset_graph,
             backfill_id,
             DagsterRunStatus.SUCCESS,
             None,
         )
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
         _mock_asset_backfill_runs(
             graphql_context,
             AssetKey("upstream_daily_partitioned_asset"),
-            asset_graph,
             backfill_id,
             DagsterRunStatus.FAILURE,
             "2023-01-09",
         )
-        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
+        _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
 
         # mark backfill as completed so we can retry it
         backfill = graphql_context.instance.get_backfill(backfill_id)
@@ -2040,19 +1996,18 @@
         assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id
 
         _execute_asset_backfill_iteration_no_side_effects(
-            graphql_context, retried_backfill.backfill_id, asset_graph
+            graphql_context, retried_backfill.backfill_id
         )
 
         # mark some partitions failed so we can retry again
         _mock_asset_backfill_runs(
             graphql_context,
             AssetKey("upstream_daily_partitioned_asset"),
-            asset_graph,
             retried_backfill.backfill_id,
             DagsterRunStatus.FAILURE,
             "2023-01-09",
         )
 
         _execute_asset_backfill_iteration_no_side_effects(
-            graphql_context, retried_backfill.backfill_id, asset_graph
+            graphql_context, retried_backfill.backfill_id
         )
 
         # refetch the backfill to get the updated statuses of all assets
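Note (outside the patch): the test helper above now builds an AssetGraphView pinned to the backfill's start time instead of hand-constructing a CachingInstanceQueryer. A minimal sketch of the pattern, assuming a DagsterInstance `instance` and an asset graph `asset_graph` are already in hand (those variable names are illustrative, not from the patch):

    from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView, TemporalContext

    asset_graph_view = AssetGraphView(
        temporal_context=TemporalContext(
            effective_dt=backfill_start_datetime,  # evaluate the graph as of the backfill start
            last_event_id=None,  # no event-log high-water mark
        ),
        instance=instance,
        asset_graph=asset_graph,
    )

All reads that previously went through the queryer are served by this view at a fixed effective_dt, which is what lets the helper drop its explicit asset_graph parameter.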
diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py
index 4c2c6917a0d2c..2f7aea4502074 100644
--- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py
+++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py
@@ -6,6 +6,7 @@
     Awaitable,
     Callable,
     Dict,
+    Iterable,
     Literal,
     NamedTuple,
     Optional,
@@ -17,6 +18,7 @@
 from dagster import _check as check
 from dagster._core.asset_graph_view.entity_subset import EntitySubset, _ValidatedEntitySubsetValue
 from dagster._core.asset_graph_view.serializable_entity_subset import SerializableEntitySubset
+from dagster._core.definitions.asset_graph_subset import AssetGraphSubset
 from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey, EntityKey, T_EntityKey
 from dagster._core.definitions.events import AssetKeyPartitionKey
 from dagster._core.definitions.multi_dimensional_partitions import (
@@ -43,6 +45,7 @@
     )
     from dagster._core.definitions.definitions_class import Definitions
     from dagster._core.definitions.partition import PartitionsDefinition
+    from dagster._core.definitions.partition_key_range import PartitionKeyRange
     from dagster._core.instance import DagsterInstance
     from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionResolvedStatus
     from dagster._core.storage.dagster_run import RunRecord
@@ -193,6 +196,48 @@ def get_empty_subset(self, *, key: T_EntityKey) -> EntitySubset[T_EntityKey]:
         value = partitions_def.empty_subset() if partitions_def else False
         return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))
 
+    def get_entity_subset_in_range(
+        self, asset_key: AssetKey, partition_key_range: "PartitionKeyRange"
+    ) -> EntitySubset[AssetKey]:
+        partitions_def = check.not_none(
+            self._get_partitions_def(asset_key), "Must have partitions def"
+        )
+        partition_subset_in_range = partitions_def.get_subset_in_range(
+            partition_key_range=partition_key_range,
+            dynamic_partitions_store=self._queryer,
+        )
+        return EntitySubset(
+            self, key=asset_key, value=_ValidatedEntitySubsetValue(partition_subset_in_range)
+        )
+
+    def get_entity_subset_from_asset_graph_subset(
+        self, asset_graph_subset: AssetGraphSubset, key: AssetKey
+    ) -> EntitySubset[AssetKey]:
+        check.invariant(
+            self.asset_graph.has(key), f"Asset graph does not contain {key.to_user_string()}"
+        )
+
+        serializable_subset = asset_graph_subset.get_asset_subset(key, self.asset_graph)
+        check.invariant(
+            serializable_subset.is_compatible_with_partitions_def(
+                self._get_partitions_def(key),
+            ),
+            f"Partitions definition for {key.to_user_string()} is not compatible with the passed in AssetGraphSubset",
+        )
+
+        return EntitySubset(
+            self, key=key, value=_ValidatedEntitySubsetValue(serializable_subset.value)
+        )
+
+    def iterate_asset_subsets(
+        self, asset_graph_subset: AssetGraphSubset
+    ) -> Iterable[EntitySubset[AssetKey]]:
+        """Returns an Iterable of EntitySubsets, one for the subset of each asset that the
+        given AssetGraphSubset contains.
+        """
+        for asset_key in asset_graph_subset.asset_keys:
+            yield self.get_entity_subset_from_asset_graph_subset(asset_graph_subset, asset_key)
+
     def get_subset_from_serializable_subset(
         self, serializable_subset: SerializableEntitySubset[T_EntityKey]
     ) -> Optional[EntitySubset[T_EntityKey]]:
diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/bfs.py b/python_modules/dagster/dagster/_core/asset_graph_view/bfs.py
new file mode 100644
index 0000000000000..a143bd8d4163f
--- /dev/null
+++ b/python_modules/dagster/dagster/_core/asset_graph_view/bfs.py
@@ -0,0 +1,221 @@
+from functools import total_ordering
+from heapq import heapify, heappop, heappush
+from typing import TYPE_CHECKING, Callable, Iterable, NamedTuple, Optional, Sequence, Tuple
+
+import dagster._check as check
+from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView
+from dagster._core.asset_graph_view.serializable_entity_subset import SerializableEntitySubset
+from dagster._core.definitions.asset_graph_subset import AssetGraphSubset
+from dagster._core.definitions.base_asset_graph import BaseAssetGraph
+from dagster._core.definitions.time_window_partitions import get_time_partitions_def
+
+if TYPE_CHECKING:
+    from dagster._core.asset_graph_view.entity_subset import EntitySubset
+    from dagster._core.definitions.asset_key import AssetKey
+
+
+class AssetGraphViewBfsFilterConditionResult(NamedTuple):
+    passed_asset_graph_subset: AssetGraphSubset
+    excluded_asset_graph_subsets_and_reasons: Sequence[Tuple[AssetGraphSubset, str]]
+
+
+def bfs_filter_asset_graph_view(
+    asset_graph_view: AssetGraphView,
+    condition_fn: Callable[
+        ["AssetGraphSubset", "AssetGraphSubset"],
+        AssetGraphViewBfsFilterConditionResult,
+    ],
+    initial_asset_graph_subset: "AssetGraphSubset",
+    include_full_execution_set: bool,
+) -> Tuple[AssetGraphSubset, Sequence[Tuple[AssetGraphSubset, str]]]:
+    """Returns the subsets of the graph that satisfy the supplied criteria:
+
+    - Are >= initial_asset_graph_subset
+    - Match the condition_fn
+    - Any of their ancestors >= initial_asset_graph_subset match the condition_fn
+
+    Also returns a list of tuples, where each tuple contains an asset subset that did not
+    satisfy the condition and the reason it was filtered out.
+
+    The condition_fn takes in:
+    - a subset of the asset graph to evaluate the condition for. If include_full_execution_set=True,
+    the asset keys are all part of the same execution set (i.e. non-subsettable multi-asset). If
+    include_full_execution_set=False, only a single asset key will be in the subset.
+
+    - An AssetGraphSubset for the portion of the graph that has so far been visited and passed
+    the condition.
+
+    The condition_fn should return an object with an AssetGraphSubset indicating the portion
+    of the subset that passes the condition, and a list of (AssetGraphSubset, str)
+    tuples with more information about why certain subsets were excluded.
+
+    Visits parents before children.
+    """
+    initial_subsets = list(asset_graph_view.iterate_asset_subsets(initial_asset_graph_subset))
+
+    # invariant: we never consider an asset partition before considering its ancestors
+    queue = ToposortedPriorityQueue(
+        asset_graph_view, initial_subsets, include_full_execution_set=include_full_execution_set
+    )
+
+    visited_graph_subset = initial_asset_graph_subset
+
+    result: AssetGraphSubset = AssetGraphSubset.empty()
+    failed_reasons: Sequence[Tuple[AssetGraphSubset, str]] = []
+
+    asset_graph = asset_graph_view.asset_graph
+
+    while len(queue) > 0:
+        candidate_subset = queue.dequeue()
+        condition_result = condition_fn(candidate_subset, result)
+
+        subset_that_meets_condition = condition_result.passed_asset_graph_subset
+        # accumulate exclusion reasons across iterations rather than overwriting them
+        failed_reasons = [
+            *failed_reasons,
+            *condition_result.excluded_asset_graph_subsets_and_reasons,
+        ]
+
+        result = result | subset_that_meets_condition
+
+        for matching_entity_subset in asset_graph_view.iterate_asset_subsets(
+            subset_that_meets_condition
+        ):
+            # Add any child subsets that have not yet been visited to the queue
+            for child_key in asset_graph.get(matching_entity_subset.key).child_keys:
+                child_subset = asset_graph_view.compute_child_subset(
+                    child_key, matching_entity_subset
+                )
+                unvisited_child_subset = child_subset.compute_difference(
+                    asset_graph_view.get_entity_subset_from_asset_graph_subset(
+                        visited_graph_subset, child_key
+                    )
+                )
+                if not unvisited_child_subset.is_empty:
+                    queue.enqueue(unvisited_child_subset)
+                    visited_graph_subset = (
+                        visited_graph_subset
+                        | AssetGraphSubset.from_entity_subsets([unvisited_child_subset])
+                    )
+
+    return result, failed_reasons
+
+
+def sort_key_for_asset_key(asset_graph: BaseAssetGraph, asset_key: "AssetKey") -> float:
+    """Returns a numeric sort key such that asset partition ranges are sorted in
+    the order in which they should be materialized. For assets without a time
+    window partition dimension, this is always 0.
+    Assets with a time window partition dimension will be sorted from newest to oldest, unless they
+    have a self-dependency, in which case they are sorted from oldest to newest.
+    """
+    partitions_def = asset_graph.get(asset_key).partitions_def
+    time_partitions_def = get_time_partitions_def(partitions_def)
+    if time_partitions_def is None:
+        return 0
+
+    # A sort key such that time window partitions are sorted from oldest start time to newest start time
+    start_timestamp = time_partitions_def.start_ts.timestamp
+
+    if asset_graph.get(asset_key).has_self_dependency:
+        # sort self dependencies from oldest to newest, as older partitions must exist before
+        # new ones can execute
+        return start_timestamp
+    else:
+        # sort non-self dependencies from newest to oldest, as newer partitions are more relevant
+        # than older ones
+        return -1 * start_timestamp
+
+
+class ToposortedPriorityQueue:
+    """Queue that returns parents before their children."""
+
+    @total_ordering
+    class QueueItem(NamedTuple):
+        level: int
+        partition_sort_key: Optional[float]
+        asset_graph_subset: AssetGraphSubset
+
+        def __eq__(self, other: object) -> bool:
+            if isinstance(other, ToposortedPriorityQueue.QueueItem):
+                return (
+                    self.level == other.level
+                    and self.partition_sort_key == other.partition_sort_key
+                )
+            return False
+
+        def __lt__(self, other: object) -> bool:
+            if isinstance(other, ToposortedPriorityQueue.QueueItem):
+                return self.level < other.level or (
+                    self.level == other.level
+                    and self.partition_sort_key is not None
+                    and other.partition_sort_key is not None
+                    and self.partition_sort_key < other.partition_sort_key
+                )
+            raise TypeError()
+
+    def __init__(
+        self,
+        asset_graph_view: AssetGraphView,
+        items: Iterable["EntitySubset"],
+        include_full_execution_set: bool,
+    ):
+        self._asset_graph_view = asset_graph_view
+        self._include_full_execution_set = include_full_execution_set
+
+        self._toposort_level_by_asset_key = {
+            asset_key: i
+            for i, asset_keys in enumerate(
+                asset_graph_view.asset_graph.toposorted_asset_keys_by_level
+            )
+            for asset_key in asset_keys
+        }
+        self._heap = [self._queue_item(entity_subset) for entity_subset in items]
+        heapify(self._heap)
+
+    def enqueue(self, entity_subset: "EntitySubset") -> None:
+        heappush(self._heap, self._queue_item(entity_subset))
+
+    def dequeue(self) -> AssetGraphSubset:
+        # For multi-assets, will include all required multi-asset keys if
+        # include_full_execution_set is set to True, or just the passed-in
+        # asset key if it was not. If there are multiple assets in the subset,
+        # the subset will have the same partitions included for each asset.
+        heap_value = heappop(self._heap)
+        return heap_value.asset_graph_subset
+
+    def _queue_item(self, entity_subset: "EntitySubset") -> "ToposortedPriorityQueue.QueueItem":
+        asset_key = entity_subset.key
+
+        if self._include_full_execution_set:
+            execution_set_keys = self._asset_graph_view.asset_graph.get(
+                asset_key
+            ).execution_set_asset_keys
+        else:
+            execution_set_keys = {asset_key}
+
+        level = max(
+            self._toposort_level_by_asset_key[asset_key] for asset_key in execution_set_keys
+        )
+
+        serializable_entity_subset = entity_subset.convert_to_serializable_subset()
+
+        serializable_entity_subsets = [
+            SerializableEntitySubset(key=asset_key, value=serializable_entity_subset.value)
+            for asset_key in execution_set_keys
+        ]
+
+        entity_subsets = [
+            check.not_none(
+                self._asset_graph_view.get_subset_from_serializable_subset(
+                    serializable_entity_subset
+                )
+            )
+            for serializable_entity_subset in serializable_entity_subsets
+        ]
+
+        asset_graph_subset = AssetGraphSubset.from_entity_subsets(entity_subsets)
+
+        return ToposortedPriorityQueue.QueueItem(
+            level,
+            sort_key_for_asset_key(self._asset_graph_view.asset_graph, asset_key),
+            asset_graph_subset=asset_graph_subset,
+        )
+
+    def __len__(self) -> int:
+        return len(self._heap)
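Note (outside the patch): a hedged sketch of how the new traversal entry point is meant to be called. The pass-everything condition_fn below is hypothetical; real callers (see asset_backfill.py later in this patch) intersect each candidate with target/materialized/failed subsets:

    from dagster._core.asset_graph_view.bfs import (
        AssetGraphViewBfsFilterConditionResult,
        bfs_filter_asset_graph_view,
    )

    def condition_fn(candidate, visited):
        # accept every candidate subset; report nothing as excluded
        return AssetGraphViewBfsFilterConditionResult(
            passed_asset_graph_subset=candidate,
            excluded_asset_graph_subsets_and_reasons=[],
        )

    passed, excluded_with_reasons = bfs_filter_asset_graph_view(
        asset_graph_view,
        condition_fn,
        initial_asset_graph_subset=initial_subset,  # an AssetGraphSubset built by the caller
        include_full_execution_set=True,
    )

Because the queue is toposorted by graph level, condition_fn only ever sees a candidate after every ancestor reachable from the initial subset has been evaluated.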
diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py
index dd184e4c42b0b..c746ddd45428f 100644
--- a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py
+++ b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py
@@ -268,6 +268,10 @@ def from_asset_partition_set(
             non_partitioned_asset_keys=non_partitioned_asset_keys,
         )
 
+    @classmethod
+    def empty(cls) -> "AssetGraphSubset":
+        return AssetGraphSubset({}, set())
+
     @classmethod
     def from_entity_subsets(
         cls, entity_subsets: Iterable[EntitySubset[AssetKey]]
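Note (outside the patch): AssetGraphSubset.empty() exists so the new code can fold entity subsets together with the existing set-algebra operators. A sketch of the accumulator pattern used by _get_requested_asset_graph_subset_from_run_requests later in this patch (`batches` here is a hypothetical iterable of EntitySubset lists obtained from an AssetGraphView):

    requested = AssetGraphSubset.empty()
    for entity_subsets in batches:
        requested = requested | AssetGraphSubset.from_entity_subsets(entity_subsets)

The same algebra drives the rewritten all_requested_partitions_marked_as_materialized_or_failed, which becomes a single subtraction -- requested minus materialized minus failed-and-downstream -- checked for emptiness.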
diff --git a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py
index 9c936179d747f..f84306bbf7561 100644
--- a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py
+++ b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py
@@ -1,8 +1,7 @@
 from abc import ABC, abstractmethod
 from collections import defaultdict, deque
 from datetime import datetime
-from functools import cached_property, total_ordering
-from heapq import heapify, heappop, heappush
+from functools import cached_property
 from typing import (
     TYPE_CHECKING,
     AbstractSet,
@@ -11,13 +10,11 @@
     Generic,
     Iterable,
     Iterator,
-    List,
     Mapping,
     NamedTuple,
     Optional,
     Sequence,
     Set,
-    Tuple,
     TypeVar,
     Union,
     cast,
@@ -754,67 +751,6 @@ def bfs_filter_subsets(
 
         return result
 
-    def bfs_filter_asset_partitions(
-        self,
-        dynamic_partitions_store: DynamicPartitionsStore,
-        condition_fn: Callable[
-            [Iterable[AssetKeyPartitionKey], AbstractSet[AssetKeyPartitionKey]],
-            Tuple[bool, str],
-        ],
-        initial_asset_partitions: Iterable[AssetKeyPartitionKey],
-        evaluation_time: datetime,
-    ) -> Tuple[
-        AbstractSet[AssetKeyPartitionKey],
-        Sequence[Tuple[Iterable[AssetKeyPartitionKey], str]],
-    ]:
-        """Returns asset partitions within the graph that satisfy supplied criteria.
-
-        - Are >= initial_asset_partitions
-        - Match the condition_fn
-        - Any of their ancestors >= initial_asset_partitions match the condition_fn
-
-        Also returns a list of tuples, where each tuple is a candidated_unit (list of
-        AssetKeyPartitionKeys that must be materialized together - ie multi_asset) that do not
-        satisfy the criteria and the reason they were filtered out.
-
-        The condition_fn should return a tuple of a boolean indicating whether the asset partition meets
-        the condition and a string explaining why it does not meet the condition, if applicable.
-
-        Visits parents before children.
-
-        When asset partitions are part of the same execution set (non-subsettable multi-asset),
-        they're provided all at once to the condition_fn.
-        """
-        all_nodes = set(initial_asset_partitions)
-
-        # invariant: we never consider an asset partition before considering its ancestors
-        queue = ToposortedPriorityQueue(self, all_nodes, include_full_execution_set=True)
-
-        result: Set[AssetKeyPartitionKey] = set()
-        failed_reasons: List[Tuple[Iterable[AssetKeyPartitionKey], str]] = []
-
-        while len(queue) > 0:
-            candidates_unit = queue.dequeue()
-
-            meets_condition, fail_reason = condition_fn(candidates_unit, result)
-            if meets_condition:
-                result.update(candidates_unit)
-
-                for candidate in candidates_unit:
-                    for child in self.get_children_partitions(
-                        dynamic_partitions_store,
-                        evaluation_time,
-                        candidate.asset_key,
-                        candidate.partition_key,
-                    ):
-                        if child not in all_nodes:
-                            queue.enqueue(child)
-                            all_nodes.add(child)
-            else:
-                failed_reasons.append((candidates_unit, fail_reason))
-
-        return result, failed_reasons
-
     def split_entity_keys_by_repository(
         self, keys: AbstractSet[EntityKey]
     ) -> Sequence[AbstractSet[EntityKey]]:
@@ -854,80 +790,3 @@ def sort_key_for_asset_partition(
     # sort non-self dependencies from newest to oldest, as newer partitions are more relevant
     # than older ones
     return -1 * partition_timestamp
-
-
-class ToposortedPriorityQueue:
-    """Queue that returns parents before their children."""
-
-    @total_ordering
-    class QueueItem(NamedTuple):
-        level: int
-        partition_sort_key: Optional[float]
-        asset_partitions: Iterable[AssetKeyPartitionKey]
-
-        def __eq__(self, other: object) -> bool:
-            if isinstance(other, ToposortedPriorityQueue.QueueItem):
-                return (
-                    self.level == other.level
-                    and self.partition_sort_key == other.partition_sort_key
-                )
-            return False
-
-        def __lt__(self, other: object) -> bool:
-            if isinstance(other, ToposortedPriorityQueue.QueueItem):
-                return self.level < other.level or (
-                    self.level == other.level
-                    and self.partition_sort_key is not None
-                    and other.partition_sort_key is not None
-                    and self.partition_sort_key < other.partition_sort_key
-                )
-            raise TypeError()
-
-    def __init__(
-        self,
-        asset_graph: BaseAssetGraph,
-        items: Iterable[AssetKeyPartitionKey],
-        include_full_execution_set: bool,
-    ):
-        self._asset_graph = asset_graph
-        self._include_full_execution_set = include_full_execution_set
-
-        self._toposort_level_by_asset_key = {
-            asset_key: i
-            for i, asset_keys in enumerate(asset_graph.toposorted_asset_keys_by_level)
-            for asset_key in asset_keys
-        }
-        self._heap = [self._queue_item(asset_partition) for asset_partition in items]
-        heapify(self._heap)
-
-    def enqueue(self, asset_partition: AssetKeyPartitionKey) -> None:
-        heappush(self._heap, self._queue_item(asset_partition))
-
-    def dequeue(self) -> Iterable[AssetKeyPartitionKey]:
-        # For multi-assets, will include all required multi-asset keys if
-        # include_full_execution_set is set to True, or a list of size 1 with just the passed in
-        # asset key if it was not.
-        return heappop(self._heap).asset_partitions
-
-    def _queue_item(
-        self, asset_partition: AssetKeyPartitionKey
-    ) -> "ToposortedPriorityQueue.QueueItem":
-        asset_key = asset_partition.asset_key
-
-        if self._include_full_execution_set:
-            execution_set_keys = self._asset_graph.get(asset_key).execution_set_asset_keys
-        else:
-            execution_set_keys = {asset_key}
-
-        level = max(
-            self._toposort_level_by_asset_key[asset_key] for asset_key in execution_set_keys
-        )
-
-        return ToposortedPriorityQueue.QueueItem(
-            level,
-            sort_key_for_asset_partition(self._asset_graph, asset_partition),
-            [AssetKeyPartitionKey(ak, asset_partition.partition_key) for ak in execution_set_keys],
-        )
-
-    def __len__(self) -> int:
-        return len(self._heap)
diff --git a/python_modules/dagster/dagster/_core/definitions/partition.py b/python_modules/dagster/dagster/_core/definitions/partition.py
index a676cb83040d5..0b84a298d0783 100644
--- a/python_modules/dagster/dagster/_core/definitions/partition.py
+++ b/python_modules/dagster/dagster/_core/definitions/partition.py
@@ -171,6 +171,17 @@ def get_first_partition_key(
         partition_keys = self.get_partition_keys(current_time, dynamic_partitions_store)
         return partition_keys[0] if partition_keys else None
 
+    def get_subset_in_range(
+        self,
+        partition_key_range: PartitionKeyRange,
+        dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
+    ) -> "PartitionsSubset":
+        return self.empty_subset().with_partition_key_range(
+            partitions_def=self,
+            partition_key_range=partition_key_range,
+            dynamic_partitions_store=dynamic_partitions_store,
+        )
+
     def get_partition_keys_in_range(
         self,
         partition_key_range: PartitionKeyRange,
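Note (outside the patch): get_subset_in_range is a small convenience over the existing empty_subset()/with_partition_key_range machinery. A sketch with a static partitions definition (the definition itself is illustrative):

    from dagster import StaticPartitionsDefinition
    from dagster._core.definitions.partition_key_range import PartitionKeyRange

    partitions_def = StaticPartitionsDefinition(["a", "b", "c", "d"])
    subset = partitions_def.get_subset_in_range(PartitionKeyRange(start="a", end="c"))
    assert set(subset.get_partition_keys()) == {"a", "b", "c"}

For dynamic partitions a dynamic_partitions_store must be supplied, which is why AssetGraphView.get_entity_subset_in_range passes its internal queryer.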
+ ) def with_run_requests_submitted( self, run_requests: Sequence[RunRequest], - asset_graph: RemoteAssetGraph, - instance_queryer: CachingInstanceQueryer, + asset_graph_view: AssetGraphView, ) -> "AssetBackfillData": - requested_partitions = get_requested_asset_partitions_from_run_requests( + requested_subset = _get_requested_asset_graph_subset_from_run_requests( run_requests, - asset_graph, - instance_queryer, + asset_graph_view, ) - submitted_partitions = self.requested_subset | AssetGraphSubset.from_asset_partition_set( - set(requested_partitions), asset_graph=asset_graph - ) + submitted_partitions = self.requested_subset | requested_subset return self.replace_requested_subset(submitted_partitions) - def get_target_root_asset_partitions( + def get_target_root_asset_graph_subset( self, instance_queryer: CachingInstanceQueryer - ) -> Iterable[AssetKeyPartitionKey]: + ) -> AssetGraphSubset: def _get_self_and_downstream_targeted_subset( initial_subset: AssetGraphSubset, ) -> AssetGraphSubset: @@ -275,7 +279,7 @@ def _get_self_and_downstream_targeted_subset( " This is likely a system error. Please report this issue to the Dagster team." ) - return list(root_subset.iterate_asset_partitions()) + return root_subset def get_target_partitions_subset(self, asset_key: AssetKey) -> PartitionsSubset: # Return the targeted partitions for the root partitioned asset keys @@ -659,12 +663,12 @@ class AssetBackfillIterationResult(NamedTuple): reserved_run_ids: Sequence[str] -def get_requested_asset_partitions_from_run_requests( +def _get_requested_asset_graph_subset_from_run_requests( run_requests: Sequence[RunRequest], - asset_graph: RemoteAssetGraph, - instance_queryer: CachingInstanceQueryer, -) -> AbstractSet[AssetKeyPartitionKey]: - requested_partitions = set() + asset_graph_view: AssetGraphView, +) -> AssetGraphSubset: + asset_graph = asset_graph_view.asset_graph + requested_subset = AssetGraphSubset.empty() for run_request in run_requests: # Run request targets a range of partitions range_start = run_request.tags.get(ASSET_PARTITION_RANGE_START_TAG) @@ -674,31 +678,24 @@ def get_requested_asset_partitions_from_run_requests( # have the same partitions def selected_assets = cast(Sequence[AssetKey], run_request.asset_selection) check.invariant(len(selected_assets) > 0) - partitions_defs = set( - asset_graph.get(asset_key).partitions_def for asset_key in selected_assets - ) - check.invariant( - len(partitions_defs) == 1, - "Expected all assets selected in partition range run request to have the same" - " partitions def", - ) - - partitions_def = cast(PartitionsDefinition, next(iter(partitions_defs))) - partitions_in_range = partitions_def.get_partition_keys_in_range( - PartitionKeyRange(range_start, range_end), instance_queryer - ) - requested_partitions = requested_partitions | { - AssetKeyPartitionKey(asset_key, partition_key) + partition_range = PartitionKeyRange(range_start, range_end) + entity_subsets = [ + asset_graph_view.get_entity_subset_in_range(asset_key, partition_range) for asset_key in selected_assets - for partition_key in partitions_in_range - } + ] + requested_subset = requested_subset | AssetGraphSubset.from_entity_subsets( + entity_subsets + ) else: - requested_partitions = requested_partitions | { - AssetKeyPartitionKey(asset_key, run_request.partition_key) - for asset_key in cast(Sequence[AssetKey], run_request.asset_selection) - } + requested_subset = requested_subset | AssetGraphSubset.from_asset_partition_set( + { + AssetKeyPartitionKey(asset_key, run_request.partition_key) + 
for asset_key in cast(Sequence[AssetKey], run_request.asset_selection) + }, + asset_graph, + ) - return requested_partitions + return requested_subset def _write_updated_backfill_data( @@ -723,17 +720,18 @@ def _write_updated_backfill_data( def _submit_runs_and_update_backfill_in_chunks( - instance: DagsterInstance, + asset_graph_view: AssetGraphView, workspace_process_context: IWorkspaceProcessContext, backfill_id: str, asset_backfill_iteration_result: AssetBackfillIterationResult, - asset_graph: RemoteWorkspaceAssetGraph, logger: logging.Logger, run_tags: Mapping[str, str], - instance_queryer: CachingInstanceQueryer, ) -> Iterable[None]: from dagster._core.execution.backfill import BulkActionStatus + asset_graph = cast(RemoteWorkspaceAssetGraph, asset_graph_view.asset_graph) + instance = asset_graph_view.instance + run_requests = asset_backfill_iteration_result.run_requests # Iterate through runs to request, submitting runs in chunks. @@ -790,8 +788,7 @@ def _submit_runs_and_update_backfill_in_chunks( updated_backfill_data: AssetBackfillData = ( updated_backfill_data.with_run_requests_submitted( [run_request], - asset_graph, - instance_queryer, + asset_graph_view, ) ) @@ -1009,13 +1006,18 @@ def execute_asset_backfill_iteration( check.failed("Backfill must be an asset backfill") backfill_start_datetime = datetime_from_timestamp(backfill.backfill_timestamp) - instance_queryer = CachingInstanceQueryer( + + asset_graph_view = AssetGraphView( + temporal_context=TemporalContext( + effective_dt=backfill_start_datetime, + last_event_id=None, + ), instance=instance, asset_graph=asset_graph, - loading_context=workspace_context, - evaluation_time=backfill_start_datetime, ) + instance_queryer = asset_graph_view.get_inner_queryer_for_back_compat() + previous_asset_backfill_data = _check_validity_and_deserialize_asset_backfill_data( workspace_context, backfill, asset_graph, instance_queryer, logger ) @@ -1047,8 +1049,7 @@ def execute_asset_backfill_iteration( for result in execute_asset_backfill_iteration_inner( backfill_id=backfill.backfill_id, asset_backfill_data=previous_asset_backfill_data, - instance_queryer=instance_queryer, - asset_graph=asset_graph, + asset_graph_view=asset_graph_view, backfill_start_timestamp=backfill.backfill_timestamp, logger=logger, ): @@ -1083,14 +1084,12 @@ def execute_asset_backfill_iteration( if result.run_requests: yield from _submit_runs_and_update_backfill_in_chunks( - instance, + asset_graph_view, workspace_process_context, updated_backfill.backfill_id, result, - asset_graph, logger, run_tags=updated_backfill.tags, - instance_queryer=instance_queryer, ) updated_backfill = cast( @@ -1174,8 +1173,7 @@ def execute_asset_backfill_iteration( for updated_asset_backfill_data in get_canceling_asset_backfill_iteration_data( backfill.backfill_id, previous_asset_backfill_data, - instance_queryer, - asset_graph, + asset_graph_view, backfill.backfill_timestamp, ): yield None @@ -1226,36 +1224,32 @@ def execute_asset_backfill_iteration( def get_canceling_asset_backfill_iteration_data( backfill_id: str, asset_backfill_data: AssetBackfillData, - instance_queryer: CachingInstanceQueryer, - asset_graph: RemoteWorkspaceAssetGraph, + asset_graph_view: AssetGraphView, backfill_start_timestamp: float, ) -> Iterable[Optional[AssetBackfillData]]: """For asset backfills in the "canceling" state, fetch the asset backfill data with the updated materialized and failed subsets. 
""" + asset_graph = cast(RemoteWorkspaceAssetGraph, asset_graph_view.asset_graph) + instance_queryer = asset_graph_view.get_inner_queryer_for_back_compat() updated_materialized_subset = None - for updated_materialized_subset in get_asset_backfill_iteration_materialized_partitions( + for updated_materialized_subset in get_asset_backfill_iteration_materialized_subset( backfill_id, asset_backfill_data, asset_graph, instance_queryer ): yield None if not isinstance(updated_materialized_subset, AssetGraphSubset): check.failed( - "Expected get_asset_backfill_iteration_materialized_partitions to return an" + "Expected get_asset_backfill_iteration_materialized_subset to return an" " AssetGraphSubset object" ) - failed_subset = AssetGraphSubset.from_asset_partition_set( - set( - _get_failed_asset_partitions( - instance_queryer, - backfill_id, - asset_graph, - materialized_subset=updated_materialized_subset, - ) - ), - asset_graph, + failed_subset = _get_failed_asset_graph_subset( + asset_graph_view, + backfill_id, + materialized_subset=updated_materialized_subset, ) + # we fetch the failed_subset to get any new assets that have failed and add that to the set of # assets we already know failed and their downstreams. However we need to remove any assets in # updated_materialized_subset to account for the case where a run retry successfully @@ -1276,7 +1270,7 @@ def get_canceling_asset_backfill_iteration_data( yield updated_backfill_data -def get_asset_backfill_iteration_materialized_partitions( +def get_asset_backfill_iteration_materialized_subset( backfill_id: str, asset_backfill_data: AssetBackfillData, asset_graph: RemoteWorkspaceAssetGraph, @@ -1325,31 +1319,58 @@ def get_asset_backfill_iteration_materialized_partitions( yield updated_materialized_subset -def _get_failed_and_downstream_asset_partitions( +def _get_subset_in_target_subset( + asset_graph_view: AssetGraphView, + candidate_asset_graph_subset: AssetGraphSubset, + target_subset: AssetGraphSubset, +) -> "AssetGraphSubset": + candidate_entity_subsets = list( + asset_graph_view.iterate_asset_subsets(candidate_asset_graph_subset) + ) + + assert ( + len(candidate_entity_subsets) == 1 + ), "Since include_execution_set=False, there should be exactly one candidate entity subset" + + candidate_entity_subset = next(iter(candidate_entity_subsets)) + + subset_in_target_subset: EntitySubset[AssetKey] = candidate_entity_subset.compute_intersection( + asset_graph_view.get_entity_subset_from_asset_graph_subset( + target_subset, candidate_entity_subset.key + ) + ) + + return AssetGraphSubset.from_entity_subsets([subset_in_target_subset]) + + +def _get_failed_and_downstream_asset_graph_subset( backfill_id: str, asset_backfill_data: AssetBackfillData, - asset_graph: RemoteWorkspaceAssetGraph, - instance_queryer: CachingInstanceQueryer, - backfill_start_timestamp: float, + asset_graph_view: AssetGraphView, materialized_subset: AssetGraphSubset, ) -> AssetGraphSubset: - failed_and_downstream_subset = AssetGraphSubset.from_asset_partition_set( - asset_graph.bfs_filter_asset_partitions( - instance_queryer, - lambda asset_partitions, _: ( - any( - asset_partition in asset_backfill_data.target_subset - for asset_partition in asset_partitions - ), - "", - ), - _get_failed_asset_partitions( - instance_queryer, backfill_id, asset_graph, materialized_subset - ), - evaluation_time=datetime_from_timestamp(backfill_start_timestamp), - )[0], - asset_graph, + failed_asset_graph_subset = _get_failed_asset_graph_subset( + asset_graph_view, + backfill_id, + 
materialized_subset, ) + + failed_and_downstream_subset = bfs_filter_asset_graph_view( + asset_graph_view, + lambda candidate_asset_graph_subset, _: ( + AssetGraphViewBfsFilterConditionResult( + passed_asset_graph_subset=_get_subset_in_target_subset( + asset_graph_view, + candidate_asset_graph_subset, + asset_backfill_data.target_subset, + ), + excluded_asset_graph_subsets_and_reasons=[], + ) + ), + initial_asset_graph_subset=failed_asset_graph_subset, + include_full_execution_set=False, + )[0] + return failed_and_downstream_subset @@ -1402,8 +1423,7 @@ def _asset_graph_subset_to_str( def execute_asset_backfill_iteration_inner( backfill_id: str, asset_backfill_data: AssetBackfillData, - asset_graph: RemoteWorkspaceAssetGraph, - instance_queryer: CachingInstanceQueryer, + asset_graph_view: AssetGraphView, backfill_start_timestamp: float, logger: logging.Logger, ) -> Iterable[Optional[AssetBackfillIterationResult]]: @@ -1415,16 +1435,20 @@ def execute_asset_backfill_iteration_inner( This is a generator so that we can return control to the daemon and let it heartbeat during expensive operations. """ - initial_candidates: Set[AssetKeyPartitionKey] = set() + instance_queryer = asset_graph_view.get_inner_queryer_for_back_compat() + asset_graph: RemoteWorkspaceAssetGraph = cast( + RemoteWorkspaceAssetGraph, asset_graph_view.asset_graph + ) + request_roots = not asset_backfill_data.requested_runs_for_target_roots if request_roots: logger.info( "Not all root assets (assets in backfill that do not have parents in the backill) have been requested, finding root assets." ) - target_roots = asset_backfill_data.get_target_root_asset_partitions(instance_queryer) - initial_candidates.update(target_roots) + target_roots = asset_backfill_data.get_target_root_asset_graph_subset(instance_queryer) + candidate_asset_graph_subset = target_roots logger.info( - f"Root assets that have not yet been requested:\n {_asset_graph_subset_to_str(AssetGraphSubset.from_asset_partition_set(set(target_roots), asset_graph), asset_graph)}" + f"Root assets that have not yet been requested:\n {_asset_graph_subset_to_str(target_roots, asset_graph)}" ) yield None @@ -1442,14 +1466,14 @@ def execute_asset_backfill_iteration_inner( time.sleep(cursor_delay_time) updated_materialized_subset = None - for updated_materialized_subset in get_asset_backfill_iteration_materialized_partitions( + for updated_materialized_subset in get_asset_backfill_iteration_materialized_subset( backfill_id, asset_backfill_data, asset_graph, instance_queryer ): yield None if not isinstance(updated_materialized_subset, AssetGraphSubset): check.failed( - "Expected get_asset_backfill_iteration_materialized_partitions to return an" + "Expected get_asset_backfill_iteration_materialized_subset to return an" " AssetGraphSubset" ) @@ -1471,61 +1495,59 @@ def execute_asset_backfill_iteration_inner( for asset_key in asset_backfill_data.target_subset.asset_keys ) ) - initial_candidates.update(parent_materialized_asset_partitions) + candidate_asset_graph_subset = AssetGraphSubset.from_asset_partition_set( + parent_materialized_asset_partitions, asset_graph + ) yield None - failed_and_downstream_subset = _get_failed_and_downstream_asset_partitions( + failed_and_downstream_subset = _get_failed_and_downstream_asset_graph_subset( backfill_id, asset_backfill_data, - asset_graph, - instance_queryer, - backfill_start_timestamp, + asset_graph_view, updated_materialized_subset, ) yield None - backfill_start_datetime = datetime_from_timestamp(backfill_start_timestamp) - - 
asset_partitions_to_request, not_requested_and_reasons = ( - asset_graph.bfs_filter_asset_partitions( - instance_queryer, - lambda unit, visited: should_backfill_atomic_asset_partitions_unit( - candidates_unit=unit, - asset_partitions_to_request=visited, - asset_graph=asset_graph, - materialized_subset=updated_materialized_subset, - requested_subset=asset_backfill_data.requested_subset, - target_subset=asset_backfill_data.target_subset, - failed_and_downstream_subset=failed_and_downstream_subset, - dynamic_partitions_store=instance_queryer, - current_time=backfill_start_datetime, - ), - initial_asset_partitions=initial_candidates, - evaluation_time=backfill_start_datetime, - ) + asset_subset_to_request, not_requested_and_reasons = bfs_filter_asset_graph_view( + asset_graph_view, + lambda candidate_asset_graph_subset, + visited: _should_backfill_atomic_asset_graph_subset_unit( + asset_graph_view=asset_graph_view, + candidate_asset_graph_subset_unit=candidate_asset_graph_subset, + asset_graph_subset_matched_so_far=visited, + materialized_subset=updated_materialized_subset, + requested_subset=asset_backfill_data.requested_subset, + target_subset=asset_backfill_data.target_subset, + failed_and_downstream_subset=failed_and_downstream_subset, + ), + initial_asset_graph_subset=candidate_asset_graph_subset, + include_full_execution_set=True, ) logger.info( - f"Asset partitions to request:\n {_asset_graph_subset_to_str(AssetGraphSubset.from_asset_partition_set(asset_partitions_to_request, asset_graph), asset_graph)}" - if asset_partitions_to_request + f"Asset partitions to request:\n {_asset_graph_subset_to_str(asset_subset_to_request, asset_graph)}" + if asset_subset_to_request else "No asset partitions to request." ) + + asset_partitions_to_request = set(asset_subset_to_request.iterate_asset_partitions()) + if len(not_requested_and_reasons) > 0: - def _format_keys(keys: Iterable[AssetKeyPartitionKey]): + def _format_graph_subset(asset_graph_subset: AssetGraphSubset): return ", ".join( - f"({key.asset_key.to_user_string()}, {key.partition_key})" - if key.partition_key - else key.asset_key.to_user_string() - for key in keys + f"({entity_subset.key.to_user_string()}, {entity_subset.value}" + if isinstance(entity_subset.value, PartitionsSubset) + else entity_subset.key.to_user_string() + for entity_subset in asset_graph_subset.iterate_asset_subsets(asset_graph) ) not_requested_str = "\n".join( [ - f"[{_format_keys(keys)}] - Reason: {reason}." - for keys, reason in not_requested_and_reasons + f"[{_format_graph_subset(asset_graph_subset)}] - Reason: {reason}." + for asset_graph_subset, reason in not_requested_and_reasons ] ) logger.info( @@ -1561,27 +1583,165 @@ def _format_keys(keys: Iterable[AssetKeyPartitionKey]): ) -def can_run_with_parent( - parent: AssetKeyPartitionKey, - candidate: AssetKeyPartitionKey, - candidates_unit: Iterable[AssetKeyPartitionKey], - asset_graph: RemoteWorkspaceAssetGraph, +def _should_backfill_atomic_asset_subset_unit( + asset_graph_view: AssetGraphView, + entity_subset_to_filter: EntitySubset[AssetKey], + candidate_asset_graph_subset_unit: AssetGraphSubset, + asset_graph_subset_matched_so_far: AssetGraphSubset, target_subset: AssetGraphSubset, - asset_partitions_to_request_map: Mapping[AssetKey, AbstractSet[Optional[str]]], -) -> Tuple[bool, str]: - """Returns if a given candidate can be materialized in the same run as a given parent on - this tick, and the reason it cannot be materialized, if applicable. 
- """ - parent_target_subset = target_subset.get_asset_subset(parent.asset_key, asset_graph) - candidate_target_subset = target_subset.get_asset_subset(candidate.asset_key, asset_graph) + requested_subset: AssetGraphSubset, + materialized_subset: AssetGraphSubset, + failed_and_downstream_subset: AssetGraphSubset, +) -> Tuple[SerializableEntitySubset[AssetKey], Iterable[Tuple[EntitySubsetValue, str]]]: + failure_subsets_with_reasons: List[Tuple[EntitySubsetValue, str]] = [] + asset_graph = asset_graph_view.asset_graph + asset_key = entity_subset_to_filter.key + + missing_in_target_partitions = entity_subset_to_filter.compute_difference( + asset_graph_view.get_entity_subset_from_asset_graph_subset(target_subset, asset_key) + ) + if not missing_in_target_partitions.is_empty: + failure_subsets_with_reasons.append( + ( + missing_in_target_partitions.get_internal_value(), + f"{missing_in_target_partitions} not targeted by backfill", + ) + ) + entity_subset_to_filter = entity_subset_to_filter.compute_difference( + missing_in_target_partitions + ) + + failed_and_downstream_partitions = entity_subset_to_filter.compute_intersection( + asset_graph_view.get_entity_subset_from_asset_graph_subset( + failed_and_downstream_subset, asset_key + ) + ) + if not failed_and_downstream_partitions.is_empty: + failure_subsets_with_reasons.append( + ( + failed_and_downstream_partitions.get_internal_value(), + f"{failed_and_downstream_partitions} has failed or is downstream of a failed asset", + ) + ) + entity_subset_to_filter = entity_subset_to_filter.compute_difference( + failed_and_downstream_partitions + ) + + materialized_partitions = entity_subset_to_filter.compute_intersection( + asset_graph_view.get_entity_subset_from_asset_graph_subset(materialized_subset, asset_key) + ) + if not materialized_partitions.is_empty: + failure_subsets_with_reasons.append( + ( + materialized_partitions.get_internal_value(), + f"{materialized_partitions} already materialized by backfill", + ) + ) + entity_subset_to_filter = entity_subset_to_filter.compute_difference( + materialized_partitions + ) + + requested_partitions = entity_subset_to_filter.compute_intersection( + asset_graph_view.get_entity_subset_from_asset_graph_subset(requested_subset, asset_key) + ) + + if not requested_partitions.is_empty: + failure_subsets_with_reasons.append( + ( + requested_partitions.get_internal_value(), + f"{requested_partitions} already requested by backfill", + ) + ) + entity_subset_to_filter = entity_subset_to_filter.compute_difference(requested_partitions) + + for parent_key in asset_graph.get(asset_key).parent_keys: + if entity_subset_to_filter.is_empty: + break + + parent_subset, required_but_nonexistent_subset = ( + asset_graph_view.compute_parent_subset_and_required_but_nonexistent_subset( + parent_key, + entity_subset_to_filter, + ) + ) + + if not required_but_nonexistent_subset.is_empty: + raise DagsterInvariantViolationError( + f"Asset partition subset {entity_subset_to_filter}" + f" depends on invalid partitions {required_but_nonexistent_subset}" + ) + + # Children with parents that are targeted but not materialized are eligible + # to be filtered out if the parent has not run yet + targeted_but_not_materialized_parent_subset: EntitySubset[AssetKey] = ( + parent_subset.compute_intersection( + asset_graph_view.get_entity_subset_from_asset_graph_subset( + target_subset, parent_key + ) + ) + ).compute_difference( + asset_graph_view.get_entity_subset_from_asset_graph_subset( + materialized_subset, parent_key + ) + ) + + 
possibly_waiting_for_parent_subset = ( + asset_graph_view.compute_child_subset( + asset_key, targeted_but_not_materialized_parent_subset + ) + ).compute_intersection(entity_subset_to_filter) + + not_waiting_for_parent_subset = entity_subset_to_filter.compute_difference( + possibly_waiting_for_parent_subset + ) + + if not possibly_waiting_for_parent_subset.is_empty: + can_run_with_parent_subset, parent_failure_subsets_with_reasons = ( + get_can_run_with_parent_subsets( + targeted_but_not_materialized_parent_subset, + possibly_waiting_for_parent_subset, + asset_graph_view, + target_subset, + asset_graph_subset_matched_so_far, + candidate_asset_graph_subset_unit, + ) + ) + if parent_failure_subsets_with_reasons: + failure_subsets_with_reasons.extend(parent_failure_subsets_with_reasons) + + entity_subset_to_filter = not_waiting_for_parent_subset.compute_union( + can_run_with_parent_subset + ) + + return ( + entity_subset_to_filter.convert_to_serializable_subset(), + failure_subsets_with_reasons, + ) + + +def get_can_run_with_parent_subsets( + parent_subset: EntitySubset[AssetKey], + entity_subset_to_filter: EntitySubset[AssetKey], + asset_graph_view: AssetGraphView, + target_subset: AssetGraphSubset, + asset_graph_subset_matched_so_far: AssetGraphSubset, + candidate_asset_graph_subset_unit: AssetGraphSubset, +) -> Tuple[EntitySubset[AssetKey], Iterable[Tuple[EntitySubsetValue, str]]]: + candidate_asset_key = entity_subset_to_filter.key + parent_asset_key = parent_subset.key + + assert isinstance(asset_graph_view.asset_graph, RemoteWorkspaceAssetGraph) + asset_graph: RemoteWorkspaceAssetGraph = asset_graph_view.asset_graph + + parent_node = asset_graph.get(parent_asset_key) + candidate_node = asset_graph.get(candidate_asset_key) partition_mapping = asset_graph.get_partition_mapping( - candidate.asset_key, parent_asset_key=parent.asset_key + candidate_asset_key, parent_asset_key=parent_asset_key ) - is_self_dependency = parent.asset_key == candidate.asset_key + # First filter out cases where even if the parent was requested this iteration, it wouldn't + # matter, because the parent and child can't execute in the same run - parent_node = asset_graph.get(parent.asset_key) - candidate_node = asset_graph.get(candidate.asset_key) # checks if there is a simple partition mapping between the parent and the child has_identity_partition_mapping = ( # both unpartitioned @@ -1598,31 +1758,53 @@ def can_run_with_parent( ) if parent_node.backfill_policy != candidate_node.backfill_policy: return ( - False, - f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} have different backfill policies so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key} is materialized.", + asset_graph_view.get_empty_subset(key=candidate_asset_key), + [ + ( + entity_subset_to_filter.get_internal_value(), + f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} have different backfill policies so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key} is materialized.", + ) + ], ) if ( parent_node.resolve_to_singular_repo_scoped_node().repository_handle != candidate_node.resolve_to_singular_repo_scoped_node().repository_handle ): return ( - False, - f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} are in different code locations so they cannot be materialized in the same run. 
{candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized.",
+            asset_graph_view.get_empty_subset(key=candidate_asset_key),
+            [
+                (
+                    entity_subset_to_filter.get_internal_value(),
+                    f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} are in different code locations so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized.",
+                )
+            ],
         )
+
     if parent_node.partitions_def != candidate_node.partitions_def:
         return (
-            False,
-            f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} have different partitions definitions so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized.",
+            asset_graph_view.get_empty_subset(key=candidate_asset_key),
+            [
+                (
+                    entity_subset_to_filter.get_internal_value(),
+                    f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} have different partitions definitions so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized.",
+                )
+            ],
         )
-    if (
-        parent.partition_key not in asset_partitions_to_request_map[parent.asset_key]
-        and parent not in candidates_unit
-    ):
-        return (
-            False,
-            f"parent {parent.asset_key.to_user_string()} with partition key {parent.partition_key} is not requested in this iteration",
+
+    parent_target_subset = target_subset.get_asset_subset(parent_asset_key, asset_graph)
+    candidate_target_subset = target_subset.get_asset_subset(candidate_asset_key, asset_graph)
+
+    parent_being_requested_this_tick_subset = (
+        asset_graph_view.get_entity_subset_from_asset_graph_subset(
+            asset_graph_subset_matched_so_far, parent_asset_key
         )
-    if (
+    )
+
+    num_parent_partitions_being_requested_this_tick = parent_being_requested_this_tick_subset.size
+
+    is_self_dependency = parent_asset_key == candidate_asset_key
+
+    if not (
         # if there is a simple mapping between the parent and the child, then
         # with the parent
         has_identity_partition_mapping
@@ -1638,17 +1820,15 @@ def can_run_with_parent(
             parent_node.backfill_policy.max_partitions_per_run is None
             # a single run can materialize all requested parent partitions
             or parent_node.backfill_policy.max_partitions_per_run
-            > len(asset_partitions_to_request_map[parent.asset_key])
+            > num_parent_partitions_being_requested_this_tick
         )
-        # all targeted parents are being requested this tick, or its a self dependency
+        # all targeted parents are being requested this tick, or it's a self dependency
         and (
-            len(asset_partitions_to_request_map[parent.asset_key]) == parent_target_subset.size
+            num_parent_partitions_being_requested_this_tick == parent_target_subset.size
             or is_self_dependency
         )
     )
 ):
-        return True, ""
-    else:
         failed_reason = (
             f"partition mapping between {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} is not simple and "
             f"{parent_node.key.to_user_string()} does not meet requirements of: targeting the same partitions as "
             "a backfill policy, and that backfill policy size limit is not exceeded by adding "
             f"{candidate_node.key.to_user_string()} to the run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized."
         )
-        return False, failed_reason
+        return (
+            asset_graph_view.get_empty_subset(key=candidate_asset_key),
+            [
+                (
+                    entity_subset_to_filter.get_internal_value(),
+                    failed_reason,
+                )
+            ],
+        )

+    # We now know that the parent and child are eligible to execute in the same run, so pass
+    # through any children whose parents actually are being requested in this iteration (by
+    # being in either the asset_graph_subset_matched_so_far subset or, more rarely, in
+    # candidate_asset_graph_subset_unit if they are part of a non-subsettable multi-asset)
+    failure_subsets_with_reasons = []

-def should_backfill_atomic_asset_partitions_unit(
-    asset_graph: RemoteWorkspaceAssetGraph,
-    candidates_unit: Iterable[AssetKeyPartitionKey],
-    asset_partitions_to_request: AbstractSet[AssetKeyPartitionKey],
+    candidate_subset = asset_graph_view.get_entity_subset_from_asset_graph_subset(
+        candidate_asset_graph_subset_unit, parent_asset_key
+    )
+
+    not_yet_requested_parent_subset = parent_subset.compute_difference(
+        parent_being_requested_this_tick_subset.compute_union(candidate_subset)
+    )
+
+    children_of_not_yet_requested_parents = asset_graph_view.compute_child_subset(
+        candidate_asset_key, not_yet_requested_parent_subset
+    ).compute_intersection(entity_subset_to_filter)
+
+    if not children_of_not_yet_requested_parents.is_empty:
+        failure_subsets_with_reasons.append(
+            (
+                children_of_not_yet_requested_parents.get_internal_value(),
+                f"Parent subset {not_yet_requested_parent_subset} is not requested in this iteration",
+            )
+        )
+        entity_subset_to_filter = entity_subset_to_filter.compute_difference(
+            children_of_not_yet_requested_parents
+        )
+
+    return (
+        entity_subset_to_filter,
+        failure_subsets_with_reasons,
+    )
+
+
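Both helpers above lean on one recurring subset-algebra move: intersect the remaining candidate subset with a disqualifying subset, record the overlap together with a human-readable reason, then subtract it before continuing. A hedged sketch of that step; `peel_off` and its arguments are illustrative names, not identifiers from this PR:

```python
def peel_off(subset, disqualifying_subset, reason, failures):
    # Record the disqualified overlap with a human-readable reason...
    overlap = subset.compute_intersection(disqualifying_subset)
    if not overlap.is_empty:
        failures.append((overlap.get_internal_value(), reason))
        # ...and keep filtering only the partitions that survive.
        subset = subset.compute_difference(overlap)
    return subset
```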
+def _should_backfill_atomic_asset_graph_subset_unit(
+    asset_graph_view: AssetGraphView,
+    candidate_asset_graph_subset_unit: AssetGraphSubset,
+    asset_graph_subset_matched_so_far: AssetGraphSubset,
     target_subset: AssetGraphSubset,
     requested_subset: AssetGraphSubset,
     materialized_subset: AssetGraphSubset,
     failed_and_downstream_subset: AssetGraphSubset,
-    dynamic_partitions_store: DynamicPartitionsStore,
-    current_time: datetime,
-) -> Tuple[bool, str]:
-    """Args:
-        candidates_unit: A set of asset partitions that must all be materialized if any is materialized.
- - returns if the candidates_unit can be materialized in this tick of the backfill, and the reason - it cannot, if applicable - """ - for candidate in candidates_unit: - candidate_asset_partition_string = f"Asset {candidate.asset_key.to_user_string()} {f'with partition {candidate.partition_key}' if candidate.partition_key is not None else ''}" - if candidate not in target_subset: - return ( - False, - f"{candidate_asset_partition_string} is not targeted by backfill", - ) - elif candidate in failed_and_downstream_subset: - return ( - False, - f"{candidate_asset_partition_string} has failed or is downstream of a failed asset", - ) - elif candidate in materialized_subset: - return ( - False, - f"{candidate_asset_partition_string} was already materialized by backfill", - ) - elif candidate in requested_subset: - return ( - False, - f"{candidate_asset_partition_string} was already requested by backfill", - ) +) -> AssetGraphViewBfsFilterConditionResult: + failure_subset_values_with_reasons: List[Tuple[EntitySubsetValue, str]] = [] - parent_partitions_result = asset_graph.get_parents_partitions( - dynamic_partitions_store, current_time, *candidate + candidate_entity_subsets = list( + asset_graph_view.iterate_asset_subsets(candidate_asset_graph_subset_unit) + ) + + # this value is the same for all passed in asset keys since they are always part of the same + # execution set + passed_subset_value = candidate_entity_subsets[0].get_internal_value() + + candidate_asset_keys = [ + candidate_entity_subset.key for candidate_entity_subset in candidate_entity_subsets + ] + + for candidate_asset_key in candidate_asset_keys: + # filter down the set of matching values for each asset key + passed_serializable_entity_subset = SerializableEntitySubset( + candidate_asset_key, + passed_subset_value, + ) + entity_subset_to_filter = check.not_none( + asset_graph_view.get_subset_from_serializable_subset(passed_serializable_entity_subset) ) - if parent_partitions_result.required_but_nonexistent_parents_partitions: - raise DagsterInvariantViolationError( - f"Asset partition {candidate}" - " depends on invalid partition keys" - f" {parent_partitions_result.required_but_nonexistent_parents_partitions}" - ) - asset_partitions_to_request_map: Dict[AssetKey, Set[Optional[str]]] = defaultdict(set) - for asset_partition in asset_partitions_to_request: - asset_partitions_to_request_map[asset_partition.asset_key].add( - asset_partition.partition_key + if entity_subset_to_filter.is_empty: + break + + entity_subset_to_filter, new_failure_subset_values_with_reasons = ( + _should_backfill_atomic_asset_subset_unit( + asset_graph_view, + entity_subset_to_filter=entity_subset_to_filter, + candidate_asset_graph_subset_unit=candidate_asset_graph_subset_unit, + asset_graph_subset_matched_so_far=asset_graph_subset_matched_so_far, + target_subset=target_subset, + requested_subset=requested_subset, + materialized_subset=materialized_subset, + failed_and_downstream_subset=failed_and_downstream_subset, ) + ) + passed_subset_value = entity_subset_to_filter.value + failure_subset_values_with_reasons.extend(new_failure_subset_values_with_reasons) + + passed_entity_subsets = [] + for candidate_entity_subset in candidate_entity_subsets: + passed_entity_subsets.append( + check.not_none( + asset_graph_view.get_subset_from_serializable_subset( + SerializableEntitySubset(candidate_entity_subset.key, passed_subset_value) + ) + ) + ) - for parent in parent_partitions_result.parent_partitions: - if parent in target_subset and parent not in 
materialized_subset:
-                can_run, failed_reason = can_run_with_parent(
-                    parent,
-                    candidate,
-                    candidates_unit,
-                    asset_graph,
-                    target_subset,
-                    asset_partitions_to_request_map,
+    failure_asset_graph_subsets_with_reasons = []
+    # Any failure partition values apply to all candidate asset keys, so construct a graph
+    # subset with that partition subset value for each key
+    for failure_subset_value, reason in failure_subset_values_with_reasons:
+        failure_entity_subsets = [
+            check.not_none(
+                asset_graph_view.get_subset_from_serializable_subset(
+                    SerializableEntitySubset(candidate_entity_subset.key, failure_subset_value)
                 )
-                if not can_run:
-                    return False, failed_reason
+            )
+            for candidate_entity_subset in candidate_entity_subsets
+        ]
+        failure_asset_graph_subsets_with_reasons.append(
+            (
+                AssetGraphSubset.from_entity_subsets(
+                    entity_subsets=failure_entity_subsets,
+                ),
+                reason,
+            )
+        )

-    return True, ""
+    return AssetGraphViewBfsFilterConditionResult(
+        passed_asset_graph_subset=AssetGraphSubset.from_entity_subsets(passed_entity_subsets),
+        excluded_asset_graph_subsets_and_reasons=failure_asset_graph_subsets_with_reasons,
+    )

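`_get_failed_asset_graph_subset` below accumulates its result with `AssetGraphSubset` set operators (`-` and `|`) rather than list concatenation. A sketch of that fold; `runs`, `candidates_for`, and `materialized` are illustrative stand-ins, not identifiers from this PR:

```python
from dagster._core.definitions.asset_graph_subset import AssetGraphSubset

def fold_still_failed(runs, candidates_for, materialized) -> AssetGraphSubset:
    result = AssetGraphSubset.empty()
    for run in runs:
        # Keep only candidates that were never subsequently materialized.
        result = result | (candidates_for(run) - materialized)
    return result
```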
""" + instance_queryer = asset_graph_view.get_inner_queryer_for_back_compat() + runs = instance_queryer.instance.get_runs( filters=RunsFilter( tags={BACKFILL_ID_TAG: backfill_id}, @@ -1757,8 +1995,7 @@ def _get_failed_asset_partitions( ) ) - result: List[AssetKeyPartitionKey] = [] - + result: AssetGraphSubset = AssetGraphSubset.empty() for run in runs: planned_asset_keys = instance_queryer.get_planned_materializations_for_run( run_id=run.run_id @@ -1778,25 +2015,22 @@ def _get_failed_asset_partitions( start=run.tags[ASSET_PARTITION_RANGE_START_TAG], end=run.tags[ASSET_PARTITION_RANGE_END_TAG], ) - asset_partition_candidates = [] - for asset_key in failed_asset_keys: - asset_partition_candidates.extend( - asset_graph.get_partitions_in_range( - asset_key, partition_range, instance_queryer - ) - ) + candidate_subset = AssetGraphSubset.from_entity_subsets( + [ + asset_graph_view.get_entity_subset_in_range(asset_key, partition_range) + for asset_key in failed_asset_keys + ] + ) + else: # a regular backfill run that run on a single partition partition_key = run.tags.get(PARTITION_NAME_TAG) - asset_partition_candidates = [ - AssetKeyPartitionKey(asset_key, partition_key) for asset_key in failed_asset_keys - ] + candidate_subset = AssetGraphSubset.from_asset_partition_set( + {AssetKeyPartitionKey(asset_key, partition_key) for asset_key in failed_asset_keys}, + asset_graph_view.asset_graph, + ) - asset_partitions_still_failed = [ - asset_partition - for asset_partition in asset_partition_candidates - if asset_partition not in materialized_subset - ] - result.extend(asset_partitions_still_failed) + asset_subset_still_failed = candidate_subset - materialized_subset + result = result | asset_subset_still_failed return result diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index 765204ab65ebe..e3ad97a955adb 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -44,6 +44,10 @@ multi_asset, ) from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView, TemporalContext +from dagster._core.asset_graph_view.bfs import ( + AssetGraphViewBfsFilterConditionResult, + bfs_filter_asset_graph_view, +) from dagster._core.definitions.asset_graph_subset import AssetGraphSubset from dagster._core.definitions.base_asset_graph import BaseAssetGraph from dagster._core.definitions.decorators.repository_decorator import repository @@ -248,15 +252,25 @@ def test_from_asset_partitions_target_subset( ) -def _get_instance_queryer( - instance: DagsterInstance, asset_graph: BaseAssetGraph, evaluation_time: datetime.datetime -) -> CachingInstanceQueryer: +def _get_asset_graph_view( + instance: DagsterInstance, + asset_graph: BaseAssetGraph, + evaluation_time: Optional[datetime.datetime] = None, +) -> AssetGraphView: return AssetGraphView( temporal_context=TemporalContext( effective_dt=evaluation_time or get_current_datetime(), last_event_id=None ), instance=instance, asset_graph=asset_graph, + ) + + +def _get_instance_queryer( + instance: DagsterInstance, asset_graph: BaseAssetGraph, evaluation_time: datetime.datetime +) -> CachingInstanceQueryer: + return _get_asset_graph_view( + instance, asset_graph, evaluation_time ).get_inner_queryer_for_back_compat() @@ -292,10 +306,7 @@ def _single_backfill_iteration( return 
backfill_data.with_run_requests_submitted( result.run_requests, - asset_graph, - instance_queryer=_get_instance_queryer( - instance, asset_graph, backfill_data.backfill_start_datetime - ), + _get_asset_graph_view(instance, asset_graph, backfill_data.backfill_start_datetime), ) @@ -522,11 +533,21 @@ def make_random_subset( if i % 2 == 0: root_asset_partitions.add(AssetKeyPartitionKey(root_asset_key, None)) - target_asset_partitions, _ = asset_graph.bfs_filter_asset_partitions( - instance, lambda _a, _b: (True, ""), root_asset_partitions, evaluation_time=evaluation_time - ) + asset_graph_view = _get_asset_graph_view(instance, asset_graph, evaluation_time=evaluation_time) - return AssetGraphSubset.from_asset_partition_set(target_asset_partitions, asset_graph) + return bfs_filter_asset_graph_view( + asset_graph_view=asset_graph_view, + condition_fn=lambda candidate_asset_graph_subset, _: ( + AssetGraphViewBfsFilterConditionResult( + passed_asset_graph_subset=candidate_asset_graph_subset, + excluded_asset_graph_subsets_and_reasons=[], + ) + ), + initial_asset_graph_subset=AssetGraphSubset.from_asset_partition_set( + root_asset_partitions, asset_graph + ), + include_full_execution_set=True, + )[0] def make_subset_from_partition_keys( @@ -545,11 +566,21 @@ def make_subset_from_partition_keys( else: root_asset_partitions.add(AssetKeyPartitionKey(root_asset_key, None)) - target_asset_partitions, _ = asset_graph.bfs_filter_asset_partitions( - instance, lambda _a, _b: (True, ""), root_asset_partitions, evaluation_time=evaluation_time - ) + asset_graph_view = _get_asset_graph_view(instance, asset_graph, evaluation_time=evaluation_time) - return AssetGraphSubset.from_asset_partition_set(target_asset_partitions, asset_graph) + return bfs_filter_asset_graph_view( + asset_graph_view=asset_graph_view, + condition_fn=lambda candidate_asset_graph_subset, _: ( + AssetGraphViewBfsFilterConditionResult( + passed_asset_graph_subset=candidate_asset_graph_subset, + excluded_asset_graph_subsets_and_reasons=[], + ) + ), + initial_asset_graph_subset=AssetGraphSubset.from_asset_partition_set( + root_asset_partitions, asset_graph + ), + include_full_execution_set=True, + )[0] def get_asset_graph( @@ -590,10 +621,9 @@ def execute_asset_backfill_iteration_consume_generator( for result in execute_asset_backfill_iteration_inner( backfill_id=backfill_id, asset_backfill_data=asset_backfill_data, - instance_queryer=_get_instance_queryer( + asset_graph_view=_get_asset_graph_view( instance, asset_graph, asset_backfill_data.backfill_start_datetime ), - asset_graph=asset_graph, backfill_start_timestamp=asset_backfill_data.backfill_start_timestamp, logger=logging.getLogger("fake_logger"), ): @@ -618,11 +648,22 @@ def run_backfill_to_completion( # assert each asset partition only targeted once requested_asset_partitions: Set[AssetKeyPartitionKey] = set() - fail_and_downstream_asset_partitions, _ = asset_graph.bfs_filter_asset_partitions( - instance, - lambda _a, _b: (True, ""), - fail_asset_partitions, - evaluation_time=backfill_data.backfill_start_datetime, + asset_graph_view = _get_asset_graph_view(instance, asset_graph) + + fail_and_downstream_asset_graph_subset, _ = bfs_filter_asset_graph_view( + asset_graph_view=asset_graph_view, + condition_fn=lambda candidate_asset_graph_subset, _: AssetGraphViewBfsFilterConditionResult( + passed_asset_graph_subset=candidate_asset_graph_subset, + excluded_asset_graph_subsets_and_reasons=[], + ), + initial_asset_graph_subset=AssetGraphSubset.from_asset_partition_set( + 
set(fail_asset_partitions), asset_graph + ), + include_full_execution_set=True, + ) + + fail_and_downstream_asset_partitions = set( + fail_and_downstream_asset_graph_subset.iterate_asset_partitions() ) while not backfill_is_complete( @@ -642,14 +683,11 @@ def run_backfill_to_completion( assert result1.backfill_data != backfill_data - instance_queryer = _get_instance_queryer( - instance, asset_graph, evaluation_time=backfill_data.backfill_start_datetime - ) - backfill_data_with_submitted_runs = result1.backfill_data.with_run_requests_submitted( result1.run_requests, - asset_graph, - instance_queryer, + _get_asset_graph_view( + instance, asset_graph, evaluation_time=backfill_data.backfill_start_datetime + ), ) # once everything that was requested is added to the requested subset, nothing should change if the iteration repeats @@ -1112,7 +1150,7 @@ def may_asset( ) instance = DagsterInstance.ephemeral() - with pytest.raises(DagsterInvariantViolationError, match="depends on invalid partition keys"): + with pytest.raises(DagsterInvariantViolationError, match="depends on invalid partitions"): run_backfill_to_completion(asset_graph, assets_by_repo_name, backfill_data, [], instance) @@ -1160,14 +1198,15 @@ def downstream_daily_partitioned_asset( assert len(instance.get_runs()) == 1 - instance_queryer = _get_instance_queryer(instance, asset_graph, backfill_start_datetime) - canceling_backfill_data = None for canceling_backfill_data in get_canceling_asset_backfill_iteration_data( backfill_id, asset_backfill_data, - instance_queryer, - asset_graph, + _get_asset_graph_view( + instance, + asset_graph, + backfill_start_datetime, + ), backfill_start_datetime.timestamp(), ): pass @@ -1245,14 +1284,11 @@ def downstream_daily_partitioned_asset( in asset_backfill_data.failed_and_downstream_subset ) - instance_queryer = _get_instance_queryer(instance, asset_graph, backfill_start_datetime) - canceling_backfill_data = None for canceling_backfill_data in get_canceling_asset_backfill_iteration_data( backfill_id, asset_backfill_data, - instance_queryer, - asset_graph, + _get_asset_graph_view(instance, asset_graph, backfill_start_datetime), backfill_start_datetime.timestamp(), ): pass @@ -1478,8 +1514,8 @@ def foo_grandchild(foo_child): ], ) - target_root_partitions = asset_backfill_data.get_target_root_asset_partitions(instance_queryer) - assert set(target_root_partitions) == { + target_root_subset = asset_backfill_data.get_target_root_asset_graph_subset(instance_queryer) + assert set(target_root_subset.iterate_asset_partitions()) == { AssetKeyPartitionKey(asset_key=AssetKey(["foo"]), partition_key="2023-10-05"), AssetKeyPartitionKey(asset_key=AssetKey(["foo"]), partition_key="2023-10-03"), AssetKeyPartitionKey(asset_key=AssetKey(["foo"]), partition_key="2023-10-04"),