From efc40d3eb8a334c95985495226626e780ec27f94 Mon Sep 17 00:00:00 2001 From: Daniel Gibson Date: Fri, 15 Nov 2024 20:14:44 -0600 Subject: [PATCH] Make asset backfills subset-aware refactor > Insert changelog entry or delete this section. moar refactor > Insert changelog entry or delete this section. subsets instead of values > Insert changelog entry or delete this section. more progress on subsets instead of values > Insert changelog entry or delete this section. more progress > Insert changelog entry or delete this section. yet more progress > Insert changelog entry or delete this section. more progress > Insert changelog entry or delete this section. more > Insert changelog entry or delete this section. renames > Insert changelog entry or delete this section. final names? > Insert changelog entry or delete this section. owen feedback > Insert changelog entry or delete this section. --- .../graphql/test_partition_backfill.py | 118 ++-- .../_core/definitions/base_asset_graph.py | 141 +---- .../dagster/_core/execution/asset_backfill.py | 599 +++++++++++++----- .../execution_tests/test_asset_backfill.py | 110 +++- .../scenarios/partition_scenarios.py | 8 + 5 files changed, 567 insertions(+), 409 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index c06f5e61b843a..29aefded58b3d 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -10,9 +10,9 @@ asset, define_asset_job, ) +from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView, TemporalContext from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.partition_key_range import PartitionKeyRange -from dagster._core.definitions.remote_asset_graph import 
RemoteWorkspaceAssetGraph from dagster._core.execution.asset_backfill import ( AssetBackfillIterationResult, execute_asset_backfill_iteration, @@ -34,7 +34,6 @@ from dagster._core.utils import make_new_backfill_id from dagster._seven import get_system_temp_directory from dagster._utils import safe_tempfile_path -from dagster._utils.caching_instance_queryer import CachingInstanceQueryer from dagster_graphql.client.query import ( LAUNCH_PARTITION_BACKFILL_MUTATION, LAUNCH_PIPELINE_EXECUTION_MUTATION, @@ -241,27 +240,25 @@ def _get_run_stats(partition_statuses): } -def _execute_asset_backfill_iteration_no_side_effects( - graphql_context, backfill_id: str, asset_graph: RemoteWorkspaceAssetGraph -) -> None: +def _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id: str) -> None: """Executes an asset backfill iteration and updates the serialized asset backfill data. However, does not execute side effects i.e. launching runs. """ backfill = graphql_context.instance.get_backfill(backfill_id) asset_backfill_data = backfill.asset_backfill_data result = None - instance_queryer = CachingInstanceQueryer( - graphql_context.instance, - asset_graph, - graphql_context, - asset_backfill_data.backfill_start_datetime, + asset_graph_view = AssetGraphView( + temporal_context=TemporalContext( + effective_dt=asset_backfill_data.backfill_start_datetime, last_event_id=None + ), + instance=graphql_context.instance, + asset_graph=graphql_context.asset_graph, ) with environ({"ASSET_BACKFILL_CURSOR_DELAY_TIME": "0"}): for result in execute_asset_backfill_iteration_inner( backfill_id=backfill_id, asset_backfill_data=asset_backfill_data, - instance_queryer=instance_queryer, - asset_graph=asset_graph, + asset_graph_view=asset_graph_view, backfill_start_timestamp=asset_backfill_data.backfill_start_timestamp, logger=logging.getLogger("fake_logger"), ): @@ -275,10 +272,12 @@ def _execute_asset_backfill_iteration_no_side_effects( updated_backfill = 
backfill.with_asset_backfill_data( result.backfill_data.with_run_requests_submitted( - result.run_requests, asset_graph=asset_graph, instance_queryer=instance_queryer + result.run_requests, + asset_graph=graphql_context.asset_graph, + instance_queryer=asset_graph_view.get_inner_queryer_for_back_compat(), ), dynamic_partitions_store=graphql_context.instance, - asset_graph=asset_graph, + asset_graph=graphql_context.asset_graph, ) graphql_context.instance.update_backfill(updated_backfill) @@ -312,12 +311,11 @@ def _execute_job_backfill_iteration_with_side_effects(graphql_context, backfill_ def _mock_asset_backfill_runs( graphql_context, asset_key: AssetKey, - asset_graph: RemoteWorkspaceAssetGraph, backfill_id: str, status: DagsterRunStatus, partition_key: Optional[str], ): - partitions_def = asset_graph.get(asset_key).partitions_def + partitions_def = graphql_context.asset_graph.get(asset_key).partitions_def @asset( partitions_def=partitions_def, @@ -751,10 +749,7 @@ def test_cancel_asset_backfill(self, graphql_context): # Update asset backfill data to contain requested partition, but does not execute side effects, # since launching the run will cause test process will hang forever. - code_location = graphql_context.get_code_location(main_repo_location_name()) - repository = code_location.get_repository("test_repo") - asset_graph = repository.asset_graph - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) # Launch the run that runs forever selector = infer_job_selector(graphql_context, "hanging_partition_asset_job") @@ -830,10 +825,8 @@ def test_cancel_then_retry_asset_backfill(self, graphql_context): # Update asset backfill data to contain requested partition, but does not execute side effects, # since launching the run will cause test process will hang forever. 
- code_location = graphql_context.get_code_location(main_repo_location_name()) - repository = code_location.get_repository("test_repo") - asset_graph = repository.asset_graph - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) # Launch the run that runs forever selector = infer_job_selector(graphql_context, "hanging_partition_asset_job") @@ -1144,11 +1137,7 @@ def test_asset_backfill_partition_stats(self, graphql_context): assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" backfill_id = result.data["launchPartitionBackfill"]["backfillId"] - code_location = graphql_context.get_code_location(main_repo_location_name()) - repository = code_location.get_repository("test_repo") - asset_graph = repository.asset_graph - - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) for partition, status in [ ("a", DagsterRunStatus.SUCCESS), @@ -1157,11 +1146,9 @@ def test_asset_backfill_partition_stats(self, graphql_context): ("e", DagsterRunStatus.SUCCESS), ("f", DagsterRunStatus.FAILURE), ]: - _mock_asset_backfill_runs( - graphql_context, asset_key, asset_graph, backfill_id, status, partition - ) + _mock_asset_backfill_runs(graphql_context, asset_key, backfill_id, status, partition) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) result = execute_dagster_graphql( graphql_context, @@ -1187,10 +1174,6 @@ def test_asset_backfill_partition_stats(self, graphql_context): assert asset_partition_status_counts[0]["numPartitionsFailed"] == 2 def test_asset_backfill_status_with_upstream_failure(self, graphql_context): - code_location = 
graphql_context.get_code_location(main_repo_location_name()) - repository = code_location.get_repository("test_repo") - asset_graph = repository.asset_graph - asset_keys = [ AssetKey("unpartitioned_upstream_of_partitioned"), AssetKey("upstream_daily_partitioned_asset"), @@ -1213,25 +1196,23 @@ def test_asset_backfill_status_with_upstream_failure(self, graphql_context): assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" backfill_id = result.data["launchPartitionBackfill"]["backfillId"] - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("unpartitioned_upstream_of_partitioned"), - asset_graph, backfill_id, DagsterRunStatus.SUCCESS, None, ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("upstream_daily_partitioned_asset"), - asset_graph, backfill_id, DagsterRunStatus.FAILURE, "2023-01-09", ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) result = execute_dagster_graphql( graphql_context, @@ -1658,10 +1639,6 @@ def test_launch_backfill_with_all_partitions_flag(self, graphql_context): assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 10 def test_reexecute_asset_backfill_from_failure(self, graphql_context): - code_location = graphql_context.get_code_location(main_repo_location_name()) - repository = code_location.get_repository("test_repo") - asset_graph = repository.asset_graph - asset_keys = [ AssetKey("unpartitioned_upstream_of_partitioned"), AssetKey("upstream_daily_partitioned_asset"), @@ -1684,25 +1661,23 @@ def 
test_reexecute_asset_backfill_from_failure(self, graphql_context): assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" backfill_id = result.data["launchPartitionBackfill"]["backfillId"] - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("unpartitioned_upstream_of_partitioned"), - asset_graph, backfill_id, DagsterRunStatus.SUCCESS, None, ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("upstream_daily_partitioned_asset"), - asset_graph, backfill_id, DagsterRunStatus.FAILURE, "2023-01-09", ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) # mark backfill as completed so we can retry it backfill = graphql_context.instance.get_backfill(backfill_id) @@ -1755,10 +1730,6 @@ def test_reexecute_asset_backfill_from_failure(self, graphql_context): assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id def test_reexecute_successful_asset_backfill(self, graphql_context): - code_location = graphql_context.get_code_location(main_repo_location_name()) - repository = code_location.get_repository("test_repo") - asset_graph = repository.asset_graph - asset_keys = [ AssetKey("unpartitioned_upstream_of_partitioned"), AssetKey("upstream_daily_partitioned_asset"), @@ -1780,25 +1751,23 @@ def test_reexecute_successful_asset_backfill(self, graphql_context): assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" backfill_id = result.data["launchPartitionBackfill"]["backfillId"] - 
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("unpartitioned_upstream_of_partitioned"), - asset_graph, backfill_id, DagsterRunStatus.SUCCESS, None, ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("upstream_daily_partitioned_asset"), - asset_graph, backfill_id, DagsterRunStatus.SUCCESS, "2023-01-09", ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) # mark backfill as complete so we can retry it backfill = graphql_context.instance.get_backfill(backfill_id) @@ -1842,10 +1811,6 @@ def test_reexecute_successful_asset_backfill(self, graphql_context): assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id def test_reexecute_asset_backfill_still_in_progress(self, graphql_context): - code_location = graphql_context.get_code_location(main_repo_location_name()) - repository = code_location.get_repository("test_repo") - asset_graph = repository.asset_graph - asset_keys = [ AssetKey("unpartitioned_upstream_of_partitioned"), AssetKey("upstream_daily_partitioned_asset"), @@ -1868,25 +1833,23 @@ def test_reexecute_asset_backfill_still_in_progress(self, graphql_context): assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" backfill_id = result.data["launchPartitionBackfill"]["backfillId"] - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, 
AssetKey("unpartitioned_upstream_of_partitioned"), - asset_graph, backfill_id, DagsterRunStatus.SUCCESS, None, ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("upstream_daily_partitioned_asset"), - asset_graph, backfill_id, DagsterRunStatus.FAILURE, "2023-01-09", ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) # try to retry the backfill while it is still in progress result = execute_dagster_graphql( @@ -1966,10 +1929,6 @@ def test_reexecute_asset_backfill_still_in_progress(self, graphql_context): assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id def test_reexecute_asset_backfill_twice(self, graphql_context): - code_location = graphql_context.get_code_location(main_repo_location_name()) - repository = code_location.get_repository("test_repo") - asset_graph = repository.asset_graph - asset_keys = [ AssetKey("unpartitioned_upstream_of_partitioned"), AssetKey("upstream_daily_partitioned_asset"), @@ -1992,25 +1951,23 @@ def test_reexecute_asset_backfill_twice(self, graphql_context): assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" backfill_id = result.data["launchPartitionBackfill"]["backfillId"] - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) _mock_asset_backfill_runs( graphql_context, AssetKey("unpartitioned_upstream_of_partitioned"), - asset_graph, backfill_id, DagsterRunStatus.SUCCESS, None, ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) 
_mock_asset_backfill_runs( graphql_context, AssetKey("upstream_daily_partitioned_asset"), - asset_graph, backfill_id, DagsterRunStatus.FAILURE, "2023-01-09", ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id) # mark backfill as completed so we can retry it backfill = graphql_context.instance.get_backfill(backfill_id) @@ -2040,19 +1997,18 @@ def test_reexecute_asset_backfill_twice(self, graphql_context): assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id _execute_asset_backfill_iteration_no_side_effects( - graphql_context, retried_backfill.backfill_id, asset_graph + graphql_context, retried_backfill.backfill_id ) # mark some partitions failed so we can retry again _mock_asset_backfill_runs( graphql_context, AssetKey("upstream_daily_partitioned_asset"), - asset_graph, retried_backfill.backfill_id, DagsterRunStatus.FAILURE, "2023-01-09", ) _execute_asset_backfill_iteration_no_side_effects( - graphql_context, retried_backfill.backfill_id, asset_graph + graphql_context, retried_backfill.backfill_id ) # refetch the backfill to get the updated statuses of all assets diff --git a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py index 8245d2629bd38..b8feec764e5d6 100644 --- a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py @@ -2,8 +2,7 @@ from collections import defaultdict, deque from collections.abc import Iterable, Iterator, Mapping, Sequence from datetime import datetime -from functools import cached_property, total_ordering -from heapq import heapify, heappop, heappush +from functools import cached_property from typing import ( # noqa: UP035 TYPE_CHECKING, AbstractSet, @@ -747,67 +746,6 @@ def bfs_filter_subsets( return result - def 
bfs_filter_asset_partitions( - self, - dynamic_partitions_store: DynamicPartitionsStore, - condition_fn: Callable[ - [Iterable[AssetKeyPartitionKey], AbstractSet[AssetKeyPartitionKey]], - tuple[bool, str], - ], - initial_asset_partitions: Iterable[AssetKeyPartitionKey], - evaluation_time: datetime, - ) -> tuple[ - AbstractSet[AssetKeyPartitionKey], - Sequence[tuple[Iterable[AssetKeyPartitionKey], str]], - ]: - """Returns asset partitions within the graph that satisfy supplied criteria. - - - Are >= initial_asset_partitions - - Match the condition_fn - - Any of their ancestors >= initial_asset_partitions match the condition_fn - - Also returns a list of tuples, where each tuple is a candidated_unit (list of - AssetKeyPartitionKeys that must be materialized together - ie multi_asset) that do not - satisfy the criteria and the reason they were filtered out. - - The condition_fn should return a tuple of a boolean indicating whether the asset partition meets - the condition and a string explaining why it does not meet the condition, if applicable. - - Visits parents before children. - - When asset partitions are part of the same execution set (non-subsettable multi-asset), - they're provided all at once to the condition_fn. 
- """ - all_nodes = set(initial_asset_partitions) - - # invariant: we never consider an asset partition before considering its ancestors - queue = ToposortedPriorityQueue(self, all_nodes, include_full_execution_set=True) - - result: set[AssetKeyPartitionKey] = set() - failed_reasons: list[tuple[Iterable[AssetKeyPartitionKey], str]] = [] - - while len(queue) > 0: - candidates_unit = queue.dequeue() - - meets_condition, fail_reason = condition_fn(candidates_unit, result) - if meets_condition: - result.update(candidates_unit) - - for candidate in candidates_unit: - for child in self.get_children_partitions( - dynamic_partitions_store, - evaluation_time, - candidate.asset_key, - candidate.partition_key, - ): - if child not in all_nodes: - queue.enqueue(child) - all_nodes.add(child) - else: - failed_reasons.append((candidates_unit, fail_reason)) - - return result, failed_reasons - def split_entity_keys_by_repository( self, keys: AbstractSet[EntityKey] ) -> Sequence[AbstractSet[EntityKey]]: @@ -847,80 +785,3 @@ def sort_key_for_asset_partition( # sort non-self dependencies from newest to oldest, as newer partitions are more relevant # than older ones return -1 * partition_timestamp - - -class ToposortedPriorityQueue: - """Queue that returns parents before their children.""" - - @total_ordering - class QueueItem(NamedTuple): - level: int - partition_sort_key: Optional[float] - asset_partitions: Iterable[AssetKeyPartitionKey] - - def __eq__(self, other: object) -> bool: - if isinstance(other, ToposortedPriorityQueue.QueueItem): - return ( - self.level == other.level - and self.partition_sort_key == other.partition_sort_key - ) - return False - - def __lt__(self, other: object) -> bool: - if isinstance(other, ToposortedPriorityQueue.QueueItem): - return self.level < other.level or ( - self.level == other.level - and self.partition_sort_key is not None - and other.partition_sort_key is not None - and self.partition_sort_key < other.partition_sort_key - ) - raise TypeError() 
- - def __init__( - self, - asset_graph: BaseAssetGraph, - items: Iterable[AssetKeyPartitionKey], - include_full_execution_set: bool, - ): - self._asset_graph = asset_graph - self._include_full_execution_set = include_full_execution_set - - self._toposort_level_by_asset_key = { - asset_key: i - for i, asset_keys in enumerate(asset_graph.toposorted_asset_keys_by_level) - for asset_key in asset_keys - } - self._heap = [self._queue_item(asset_partition) for asset_partition in items] - heapify(self._heap) - - def enqueue(self, asset_partition: AssetKeyPartitionKey) -> None: - heappush(self._heap, self._queue_item(asset_partition)) - - def dequeue(self) -> Iterable[AssetKeyPartitionKey]: - # For multi-assets, will include all required multi-asset keys if - # include_full_execution_set is set to True, or a list of size 1 with just the passed in - # asset key if it was not. - return heappop(self._heap).asset_partitions - - def _queue_item( - self, asset_partition: AssetKeyPartitionKey - ) -> "ToposortedPriorityQueue.QueueItem": - asset_key = asset_partition.asset_key - - if self._include_full_execution_set: - execution_set_keys = self._asset_graph.get(asset_key).execution_set_asset_keys - else: - execution_set_keys = {asset_key} - - level = max( - self._toposort_level_by_asset_key[asset_key] for asset_key in execution_set_keys - ) - - return ToposortedPriorityQueue.QueueItem( - level, - sort_key_for_asset_partition(self._asset_graph, asset_partition), - [AssetKeyPartitionKey(ak, asset_partition.partition_key) for ak in execution_set_keys], - ) - - def __len__(self) -> int: - return len(self._heap) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index a8ddf1037086a..75a4c746ba203 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -2,13 +2,22 @@ import logging import os import 
time -from collections import defaultdict from collections.abc import Iterable, Mapping, Sequence from datetime import datetime from enum import Enum from typing import TYPE_CHECKING, AbstractSet, NamedTuple, Optional, Union, cast # noqa: UP035 import dagster._check as check +from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView, TemporalContext +from dagster._core.asset_graph_view.bfs import ( + AssetGraphViewBfsFilterConditionResult, + bfs_filter_asset_graph_view, +) +from dagster._core.asset_graph_view.entity_subset import EntitySubset +from dagster._core.asset_graph_view.serializable_entity_subset import ( + EntitySubsetValue, + SerializableEntitySubset, +) from dagster._core.definitions.asset_graph_subset import AssetGraphSubset from dagster._core.definitions.asset_selection import KeysAssetSelection from dagster._core.definitions.automation_tick_evaluation_context import ( @@ -996,13 +1005,18 @@ def execute_asset_backfill_iteration( check.failed("Backfill must be an asset backfill") backfill_start_datetime = datetime_from_timestamp(backfill.backfill_timestamp) - instance_queryer = CachingInstanceQueryer( + + asset_graph_view = AssetGraphView( + temporal_context=TemporalContext( + effective_dt=backfill_start_datetime, + last_event_id=None, + ), instance=instance, asset_graph=asset_graph, - loading_context=workspace_context, - evaluation_time=backfill_start_datetime, ) + instance_queryer = asset_graph_view.get_inner_queryer_for_back_compat() + previous_asset_backfill_data = _check_validity_and_deserialize_asset_backfill_data( workspace_context, backfill, asset_graph, instance_queryer, logger ) @@ -1034,8 +1048,7 @@ def execute_asset_backfill_iteration( for result in execute_asset_backfill_iteration_inner( backfill_id=backfill.backfill_id, asset_backfill_data=previous_asset_backfill_data, - instance_queryer=instance_queryer, - asset_graph=asset_graph, + asset_graph_view=asset_graph_view, backfill_start_timestamp=backfill.backfill_timestamp, 
logger=logger, ): @@ -1232,6 +1245,7 @@ def get_canceling_asset_backfill_iteration_data( " AssetGraphSubset object" ) + # TODO: Make this use subsets throughout instead of a sequence of AssetKeyPartitionKeys failed_subset = AssetGraphSubset.from_asset_partition_set( set( _get_failed_asset_partitions( @@ -1312,31 +1326,66 @@ def get_asset_backfill_iteration_materialized_partitions( yield updated_materialized_subset +def _get_subset_in_target_subset( + asset_graph_view: AssetGraphView, + candidate_asset_graph_subset: AssetGraphSubset, + target_subset: AssetGraphSubset, +) -> "AssetGraphSubset": + candidate_entity_subsets = list( + asset_graph_view.iterate_asset_subsets(candidate_asset_graph_subset) + ) + + assert ( + len(candidate_entity_subsets) == 1 + ), "Since include_execution_set=False, there should be exactly one candidate entity subset" + + candidate_entity_subset = next(iter(candidate_entity_subsets)) + + subset_in_target_subset: EntitySubset[AssetKey] = candidate_entity_subset.compute_intersection( + asset_graph_view.get_entity_subset_from_asset_graph_subset( + target_subset, candidate_entity_subset.key + ) + ) + + return AssetGraphSubset.from_entity_subsets([subset_in_target_subset]) + + def _get_failed_and_downstream_asset_partitions( backfill_id: str, asset_backfill_data: AssetBackfillData, - asset_graph: RemoteWorkspaceAssetGraph, - instance_queryer: CachingInstanceQueryer, - backfill_start_timestamp: float, + asset_graph_view: AssetGraphView, materialized_subset: AssetGraphSubset, ) -> AssetGraphSubset: - failed_and_downstream_subset = AssetGraphSubset.from_asset_partition_set( - asset_graph.bfs_filter_asset_partitions( - instance_queryer, - lambda asset_partitions, _: ( - any( - asset_partition in asset_backfill_data.target_subset - for asset_partition in asset_partitions - ), - "", - ), + asset_graph = cast(RemoteWorkspaceAssetGraph, asset_graph_view.asset_graph) + + failed_asset_graph_subset = AssetGraphSubset.from_asset_partition_set( + set( 
_get_failed_asset_partitions( - instance_queryer, backfill_id, asset_graph, materialized_subset - ), - evaluation_time=datetime_from_timestamp(backfill_start_timestamp), - )[0], + asset_graph_view.get_inner_queryer_for_back_compat(), + backfill_id, + asset_graph, + materialized_subset, + ) + ), asset_graph, ) + + failed_and_downstream_subset = bfs_filter_asset_graph_view( + asset_graph_view, + lambda candidate_asset_graph_subset, _: ( + AssetGraphViewBfsFilterConditionResult( + passed_asset_graph_subset=_get_subset_in_target_subset( + asset_graph_view, + candidate_asset_graph_subset, + asset_backfill_data.target_subset, + ), + excluded_asset_graph_subsets_and_reasons=[], + ) + ), + initial_asset_graph_subset=failed_asset_graph_subset, + include_full_execution_set=False, + )[0] + return failed_and_downstream_subset @@ -1361,10 +1410,21 @@ def _partition_subset_str( if isinstance(partition_subset, TimeWindowPartitionsSubset) and isinstance( partitions_def, TimeWindowPartitionsDefinition ): - return ", ".join( - f"{partitions_def.get_num_partitions_in_window(time_window.to_public_time_window())} partitions: {time_window.start} -> {time_window.end}" - for time_window in partition_subset.included_time_windows - ) + time_window_strs = [] + for time_window in partition_subset.included_time_windows: + partition_key_range = partitions_def.get_partition_key_range_for_time_window( + time_window.to_public_time_window(), respect_bounds=False + ) + num_partitions = partitions_def.get_num_partitions_in_window( + time_window.to_public_time_window() + ) + if num_partitions == 1: + time_window_strs.append(f"1 partition: {partition_key_range.start}") + else: + time_window_strs.append( + f"{num_partitions} partitions: {partition_key_range.start} -> {partition_key_range.end}" + ) + return ", ".join(time_window_strs) return ", ".join(partition_subset.get_partition_keys()) @@ -1373,24 +1433,24 @@ def _asset_graph_subset_to_str( asset_graph_subset: AssetGraphSubset, asset_graph: 
BaseAssetGraph, ) -> str: - return_str = "" + return_strs = [] asset_subsets = asset_graph_subset.iterate_asset_subsets(asset_graph) + for subset in asset_subsets: if subset.is_partitioned: partitions_def = asset_graph.get(subset.key).partitions_def partition_ranges_str = _partition_subset_str(subset.subset_value, partitions_def) - return_str += f"- {subset.key.to_user_string()}: {{{partition_ranges_str}}}\n" + return_strs.append(f"- {subset.key.to_user_string()}: {{{partition_ranges_str}}}") else: - return_str += f"- {subset.key.to_user_string()}\n" + return_strs.append(f"- {subset.key.to_user_string()}") - return return_str + return "\n".join(return_strs) def execute_asset_backfill_iteration_inner( backfill_id: str, asset_backfill_data: AssetBackfillData, - asset_graph: RemoteWorkspaceAssetGraph, - instance_queryer: CachingInstanceQueryer, + asset_graph_view: AssetGraphView, backfill_start_timestamp: float, logger: logging.Logger, ) -> Iterable[Optional[AssetBackfillIterationResult]]: @@ -1402,6 +1462,11 @@ def execute_asset_backfill_iteration_inner( This is a generator so that we can return control to the daemon and let it heartbeat during expensive operations. 
""" + instance_queryer = asset_graph_view.get_inner_queryer_for_back_compat() + asset_graph: RemoteWorkspaceAssetGraph = cast( + RemoteWorkspaceAssetGraph, asset_graph_view.asset_graph + ) + initial_candidates: set[AssetKeyPartitionKey] = set() request_roots = not asset_backfill_data.requested_runs_for_target_roots if request_roots: @@ -1411,7 +1476,7 @@ def execute_asset_backfill_iteration_inner( target_roots = asset_backfill_data.get_target_root_asset_partitions(instance_queryer) initial_candidates.update(target_roots) logger.info( - f"Root assets that have not yet been requested:\n {_asset_graph_subset_to_str(AssetGraphSubset.from_asset_partition_set(set(target_roots), asset_graph), asset_graph)}" + f"Root assets that have not yet been requested:\n{_asset_graph_subset_to_str(AssetGraphSubset.from_asset_partition_set(set(target_roots), asset_graph), asset_graph)}" ) yield None @@ -1444,8 +1509,8 @@ def execute_asset_backfill_iteration_inner( updated_materialized_subset - asset_backfill_data.materialized_subset ) logger.info( - f"Assets materialized since last tick:\n {_asset_graph_subset_to_str(materialized_since_last_tick, asset_graph)}" - if materialized_since_last_tick + f"Assets materialized since last tick:\n{_asset_graph_subset_to_str(materialized_since_last_tick, asset_graph)}" + if not materialized_since_last_tick.empty else "No relevant assets materialized since last tick." 
) @@ -1465,54 +1530,45 @@ def execute_asset_backfill_iteration_inner( failed_and_downstream_subset = _get_failed_and_downstream_asset_partitions( backfill_id, asset_backfill_data, - asset_graph, - instance_queryer, - backfill_start_timestamp, + asset_graph_view, updated_materialized_subset, ) yield None - backfill_start_datetime = datetime_from_timestamp(backfill_start_timestamp) + initial_asset_graph_subset = AssetGraphSubset.from_asset_partition_set( + initial_candidates, asset_graph + ) - asset_partitions_to_request, not_requested_and_reasons = ( - asset_graph.bfs_filter_asset_partitions( - instance_queryer, - lambda unit, visited: should_backfill_atomic_asset_partitions_unit( - candidates_unit=unit, - asset_partitions_to_request=visited, - asset_graph=asset_graph, - materialized_subset=updated_materialized_subset, - requested_subset=asset_backfill_data.requested_subset, - target_subset=asset_backfill_data.target_subset, - failed_and_downstream_subset=failed_and_downstream_subset, - dynamic_partitions_store=instance_queryer, - current_time=backfill_start_datetime, - ), - initial_asset_partitions=initial_candidates, - evaluation_time=backfill_start_datetime, - ) + asset_subset_to_request, not_requested_and_reasons = bfs_filter_asset_graph_view( + asset_graph_view, + lambda candidate_asset_graph_subset, + visited: _should_backfill_atomic_asset_graph_subset_unit( + asset_graph_view=asset_graph_view, + candidate_asset_graph_subset_unit=candidate_asset_graph_subset, + asset_graph_subset_matched_so_far=visited, + materialized_subset=updated_materialized_subset, + requested_subset=asset_backfill_data.requested_subset, + target_subset=asset_backfill_data.target_subset, + failed_and_downstream_subset=failed_and_downstream_subset, + ), + initial_asset_graph_subset=initial_asset_graph_subset, + include_full_execution_set=True, ) logger.info( - f"Asset partitions to request:\n {_asset_graph_subset_to_str(AssetGraphSubset.from_asset_partition_set(asset_partitions_to_request, 
asset_graph), asset_graph)}" - if asset_partitions_to_request + f"Asset partitions to request:\n{_asset_graph_subset_to_str(asset_subset_to_request, asset_graph)}" + if not asset_subset_to_request.empty else "No asset partitions to request." ) - if len(not_requested_and_reasons) > 0: - def _format_keys(keys: Iterable[AssetKeyPartitionKey]): - return ", ".join( - f"({key.asset_key.to_user_string()}, {key.partition_key})" - if key.partition_key - else key.asset_key.to_user_string() - for key in keys - ) + asset_partitions_to_request = set(asset_subset_to_request.iterate_asset_partitions()) - not_requested_str = "\n".join( + if len(not_requested_and_reasons) > 0: + not_requested_str = "\n\n".join( [ - f"[{_format_keys(keys)}] - Reason: {reason}." - for keys, reason in not_requested_and_reasons + f"{_asset_graph_subset_to_str(asset_graph_subset, asset_graph)}\nReason: {reason}" + for asset_graph_subset, reason in not_requested_and_reasons ] ) logger.info( @@ -1548,27 +1604,161 @@ def _format_keys(keys: Iterable[AssetKeyPartitionKey]): ) -def can_run_with_parent( - parent: AssetKeyPartitionKey, - candidate: AssetKeyPartitionKey, - candidates_unit: Iterable[AssetKeyPartitionKey], - asset_graph: RemoteWorkspaceAssetGraph, +def _should_backfill_atomic_asset_subset_unit( + asset_graph_view: AssetGraphView, + entity_subset_to_filter: EntitySubset[AssetKey], + candidate_asset_graph_subset_unit: AssetGraphSubset, + asset_graph_subset_matched_so_far: AssetGraphSubset, target_subset: AssetGraphSubset, - asset_partitions_to_request_map: Mapping[AssetKey, AbstractSet[Optional[str]]], -) -> tuple[bool, str]: - """Returns if a given candidate can be materialized in the same run as a given parent on - this tick, and the reason it cannot be materialized, if applicable. 
- """ - parent_target_subset = target_subset.get_asset_subset(parent.asset_key, asset_graph) - candidate_target_subset = target_subset.get_asset_subset(candidate.asset_key, asset_graph) + requested_subset: AssetGraphSubset, + materialized_subset: AssetGraphSubset, + failed_and_downstream_subset: AssetGraphSubset, +) -> tuple[SerializableEntitySubset[AssetKey], Iterable[tuple[EntitySubsetValue, str]]]: + failure_subsets_with_reasons: list[tuple[EntitySubsetValue, str]] = [] + asset_graph = asset_graph_view.asset_graph + asset_key = entity_subset_to_filter.key + + missing_in_target_partitions = entity_subset_to_filter.compute_difference( + asset_graph_view.get_entity_subset_from_asset_graph_subset(target_subset, asset_key) + ) + if not missing_in_target_partitions.is_empty: + # Don't include a failure reason for this subset since it is unlikely to be + # useful to know that an untargeted subset was not included + entity_subset_to_filter = entity_subset_to_filter.compute_difference( + missing_in_target_partitions + ) + + failed_and_downstream_partitions = entity_subset_to_filter.compute_intersection( + asset_graph_view.get_entity_subset_from_asset_graph_subset( + failed_and_downstream_subset, asset_key + ) + ) + if not failed_and_downstream_partitions.is_empty: + failure_subsets_with_reasons.append( + ( + failed_and_downstream_partitions.get_internal_value(), + f"{failed_and_downstream_partitions} has failed or is downstream of a failed asset", + ) + ) + entity_subset_to_filter = entity_subset_to_filter.compute_difference( + failed_and_downstream_partitions + ) + + materialized_partitions = entity_subset_to_filter.compute_intersection( + asset_graph_view.get_entity_subset_from_asset_graph_subset(materialized_subset, asset_key) + ) + if not materialized_partitions.is_empty: + failure_subsets_with_reasons.append( + ( + materialized_partitions.get_internal_value(), + f"{materialized_partitions} already materialized by backfill", + ) + ) + entity_subset_to_filter = 
entity_subset_to_filter.compute_difference( + materialized_partitions + ) + + requested_partitions = entity_subset_to_filter.compute_intersection( + asset_graph_view.get_entity_subset_from_asset_graph_subset(requested_subset, asset_key) + ) + + if not requested_partitions.is_empty: + failure_subsets_with_reasons.append( + ( + requested_partitions.get_internal_value(), + f"{requested_partitions} already requested by backfill", + ) + ) + entity_subset_to_filter = entity_subset_to_filter.compute_difference(requested_partitions) + + for parent_key in asset_graph.get(asset_key).parent_keys: + if entity_subset_to_filter.is_empty: + break + + parent_subset, required_but_nonexistent_subset = ( + asset_graph_view.compute_parent_subset_and_required_but_nonexistent_subset( + parent_key, + entity_subset_to_filter, + ) + ) + + if not required_but_nonexistent_subset.is_empty: + raise DagsterInvariantViolationError( + f"Asset partition subset {entity_subset_to_filter}" + f" depends on invalid partitions {required_but_nonexistent_subset}" + ) + + # Children with parents that are targeted but not materialized are eligible + # to be filtered out if the parent has not run yet + targeted_but_not_materialized_parent_subset: EntitySubset[AssetKey] = ( + parent_subset.compute_intersection( + asset_graph_view.get_entity_subset_from_asset_graph_subset( + target_subset, parent_key + ) + ) + ).compute_difference( + asset_graph_view.get_entity_subset_from_asset_graph_subset( + materialized_subset, parent_key + ) + ) + + possibly_waiting_for_parent_subset = ( + asset_graph_view.compute_child_subset( + asset_key, targeted_but_not_materialized_parent_subset + ) + ).compute_intersection(entity_subset_to_filter) + + not_waiting_for_parent_subset = entity_subset_to_filter.compute_difference( + possibly_waiting_for_parent_subset + ) + + if not possibly_waiting_for_parent_subset.is_empty: + can_run_with_parent_subset, parent_failure_subsets_with_reasons = ( + get_can_run_with_parent_subsets( + 
targeted_but_not_materialized_parent_subset, + possibly_waiting_for_parent_subset, + asset_graph_view, + target_subset, + asset_graph_subset_matched_so_far, + candidate_asset_graph_subset_unit, + ) + ) + if parent_failure_subsets_with_reasons: + failure_subsets_with_reasons.extend(parent_failure_subsets_with_reasons) + + entity_subset_to_filter = not_waiting_for_parent_subset.compute_union( + can_run_with_parent_subset + ) + + return ( + entity_subset_to_filter.convert_to_serializable_subset(), + failure_subsets_with_reasons, + ) + + +def get_can_run_with_parent_subsets( + parent_subset: EntitySubset[AssetKey], + entity_subset_to_filter: EntitySubset[AssetKey], + asset_graph_view: AssetGraphView, + target_subset: AssetGraphSubset, + asset_graph_subset_matched_so_far: AssetGraphSubset, + candidate_asset_graph_subset_unit: AssetGraphSubset, +) -> tuple[EntitySubset[AssetKey], Iterable[tuple[EntitySubsetValue, str]]]: + candidate_asset_key = entity_subset_to_filter.key + parent_asset_key = parent_subset.key + + assert isinstance(asset_graph_view.asset_graph, RemoteWorkspaceAssetGraph) + asset_graph: RemoteWorkspaceAssetGraph = asset_graph_view.asset_graph + + parent_node = asset_graph.get(parent_asset_key) + candidate_node = asset_graph.get(candidate_asset_key) partition_mapping = asset_graph.get_partition_mapping( - candidate.asset_key, parent_asset_key=parent.asset_key + candidate_asset_key, parent_asset_key=parent_asset_key ) - is_self_dependency = parent.asset_key == candidate.asset_key + # First filter out cases where even if the parent was requested this iteration, it wouldn't + # matter, because the parent and child can't execute in the same run - parent_node = asset_graph.get(parent.asset_key) - candidate_node = asset_graph.get(candidate.asset_key) # checks if there is a simple partition mapping between the parent and the child has_identity_partition_mapping = ( # both unpartitioned @@ -1585,31 +1775,53 @@ def can_run_with_parent( ) if 
parent_node.backfill_policy != candidate_node.backfill_policy: return ( - False, - f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} have different backfill policies so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key} is materialized.", + asset_graph_view.get_empty_subset(key=candidate_asset_key), + [ + ( + entity_subset_to_filter.get_internal_value(), + f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} have different backfill policies so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key} is materialized.", + ) + ], ) if ( parent_node.resolve_to_singular_repo_scoped_node().repository_handle != candidate_node.resolve_to_singular_repo_scoped_node().repository_handle ): return ( - False, - f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} are in different code locations so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized.", + asset_graph_view.get_empty_subset(key=candidate_asset_key), + [ + ( + entity_subset_to_filter.get_internal_value(), + f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} are in different code locations so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized.", + ) + ], ) + if parent_node.partitions_def != candidate_node.partitions_def: return ( - False, - f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} have different partitions definitions so they cannot be materialized in the same run. 
{candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized.", + asset_graph_view.get_empty_subset(key=candidate_asset_key), + [ + ( + entity_subset_to_filter.get_internal_value(), + f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} have different partitions definitions so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized.", + ) + ], ) - if ( - parent.partition_key not in asset_partitions_to_request_map[parent.asset_key] - and parent not in candidates_unit - ): - return ( - False, - f"parent {parent.asset_key.to_user_string()} with partition key {parent.partition_key} is not requested in this iteration", + + parent_target_subset = target_subset.get_asset_subset(parent_asset_key, asset_graph) + candidate_target_subset = target_subset.get_asset_subset(candidate_asset_key, asset_graph) + + parent_being_requested_this_tick_subset = ( + asset_graph_view.get_entity_subset_from_asset_graph_subset( + asset_graph_subset_matched_so_far, parent_asset_key ) - if ( + ) + + num_parent_partitions_being_requested_this_tick = parent_being_requested_this_tick_subset.size + + is_self_dependency = parent_asset_key == candidate_asset_key + + if not ( # if there is a simple mapping between the parent and the child, then # with the parent has_identity_partition_mapping @@ -1625,17 +1837,15 @@ def can_run_with_parent( parent_node.backfill_policy.max_partitions_per_run is None # a single run can materialize all requested parent partitions or parent_node.backfill_policy.max_partitions_per_run - > len(asset_partitions_to_request_map[parent.asset_key]) + > num_parent_partitions_being_requested_this_tick ) - # all targeted parents are being requested this tick, or its a self dependency + # all targeted parents are being requested this tick, or it's a self dependency and ( - 
len(asset_partitions_to_request_map[parent.asset_key]) == parent_target_subset.size + num_parent_partitions_being_requested_this_tick == parent_target_subset.size or is_self_dependency ) ) ): - return True, "" - else: failed_reason = ( f"partition mapping between {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} is not simple and " f"{parent_node.key.to_user_string()} does not meet requirements of: targeting the same partitions as " @@ -1643,80 +1853,137 @@ def can_run_with_parent( "a backfill policy, and that backfill policy size limit is not exceeded by adding " f"{candidate_node.key.to_user_string()} to the run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized." ) - return False, failed_reason + return ( + asset_graph_view.get_empty_subset(key=candidate_asset_key), + [ + ( + entity_subset_to_filter.get_internal_value(), + failed_reason, + ) + ], + ) + # We now know that the parent and child are eligible to happen in the same run, so pass + # any children of parents that actually are being requested in this iteration (by + # being in either the asset_graph_subset_matched_so_far subset, or more rarely in + # candidate_asset_graph_subset_unit if they are part of a non-subsettable multi-asset) + failure_subsets_with_reasons = [] -def should_backfill_atomic_asset_partitions_unit( - asset_graph: RemoteWorkspaceAssetGraph, - candidates_unit: Iterable[AssetKeyPartitionKey], - asset_partitions_to_request: AbstractSet[AssetKeyPartitionKey], + candidate_subset = asset_graph_view.get_entity_subset_from_asset_graph_subset( + candidate_asset_graph_subset_unit, parent_asset_key + ) + + not_yet_requested_parent_subset = parent_subset.compute_difference( + parent_being_requested_this_tick_subset.compute_union(candidate_subset) + ) + + children_of_not_yet_requested_parents = asset_graph_view.compute_child_subset( + candidate_asset_key, not_yet_requested_parent_subset + 
).compute_intersection(entity_subset_to_filter) + + if not children_of_not_yet_requested_parents.is_empty: + failure_subsets_with_reasons.append( + ( + children_of_not_yet_requested_parents.get_internal_value(), + f"Parent subset {not_yet_requested_parent_subset} is not requested in this iteration", + ) + ) + entity_subset_to_filter = entity_subset_to_filter.compute_difference( + children_of_not_yet_requested_parents + ) + + return ( + entity_subset_to_filter, + failure_subsets_with_reasons, + ) + + +def _should_backfill_atomic_asset_graph_subset_unit( + asset_graph_view: AssetGraphView, + candidate_asset_graph_subset_unit: AssetGraphSubset, + asset_graph_subset_matched_so_far: AssetGraphSubset, target_subset: AssetGraphSubset, requested_subset: AssetGraphSubset, materialized_subset: AssetGraphSubset, failed_and_downstream_subset: AssetGraphSubset, - dynamic_partitions_store: DynamicPartitionsStore, - current_time: datetime, -) -> tuple[bool, str]: - """Args: - candidates_unit: A set of asset partitions that must all be materialized if any is - materialized. 
- - returns if the candidates_unit can be materialized in this tick of the backfill, and the reason - it cannot, if applicable - """ - for candidate in candidates_unit: - candidate_asset_partition_string = f"Asset {candidate.asset_key.to_user_string()} {f'with partition {candidate.partition_key}' if candidate.partition_key is not None else ''}" - if candidate not in target_subset: - return ( - False, - f"{candidate_asset_partition_string} is not targeted by backfill", - ) - elif candidate in failed_and_downstream_subset: - return ( - False, - f"{candidate_asset_partition_string} has failed or is downstream of a failed asset", - ) - elif candidate in materialized_subset: - return ( - False, - f"{candidate_asset_partition_string} was already materialized by backfill", - ) - elif candidate in requested_subset: - return ( - False, - f"{candidate_asset_partition_string} was already requested by backfill", - ) +) -> AssetGraphViewBfsFilterConditionResult: + failure_subset_values_with_reasons: list[tuple[EntitySubsetValue, str]] = [] + + candidate_entity_subsets = list( + asset_graph_view.iterate_asset_subsets(candidate_asset_graph_subset_unit) + ) + + # this value is the same for all passed in asset keys since they are always part of the same + # execution set + passed_subset_value = candidate_entity_subsets[0].get_internal_value() + + candidate_asset_keys = [ + candidate_entity_subset.key for candidate_entity_subset in candidate_entity_subsets + ] - parent_partitions_result = asset_graph.get_parents_partitions( - dynamic_partitions_store, current_time, *candidate + for candidate_asset_key in candidate_asset_keys: + # filter down the set of matching values for each asset key + passed_serializable_entity_subset = SerializableEntitySubset( + candidate_asset_key, + passed_subset_value, + ) + entity_subset_to_filter = check.not_none( + asset_graph_view.get_subset_from_serializable_subset(passed_serializable_entity_subset) ) - if 
parent_partitions_result.required_but_nonexistent_parents_partitions: - raise DagsterInvariantViolationError( - f"Asset partition {candidate}" - " depends on invalid partition keys" - f" {parent_partitions_result.required_but_nonexistent_parents_partitions}" - ) - asset_partitions_to_request_map: dict[AssetKey, set[Optional[str]]] = defaultdict(set) - for asset_partition in asset_partitions_to_request: - asset_partitions_to_request_map[asset_partition.asset_key].add( - asset_partition.partition_key + if entity_subset_to_filter.is_empty: + break + + entity_subset_to_filter, new_failure_subset_values_with_reasons = ( + _should_backfill_atomic_asset_subset_unit( + asset_graph_view, + entity_subset_to_filter=entity_subset_to_filter, + candidate_asset_graph_subset_unit=candidate_asset_graph_subset_unit, + asset_graph_subset_matched_so_far=asset_graph_subset_matched_so_far, + target_subset=target_subset, + requested_subset=requested_subset, + materialized_subset=materialized_subset, + failed_and_downstream_subset=failed_and_downstream_subset, + ) + ) + passed_subset_value = entity_subset_to_filter.value + failure_subset_values_with_reasons.extend(new_failure_subset_values_with_reasons) + + passed_entity_subsets = [] + for candidate_entity_subset in candidate_entity_subsets: + passed_entity_subsets.append( + check.not_none( + asset_graph_view.get_subset_from_serializable_subset( + SerializableEntitySubset(candidate_entity_subset.key, passed_subset_value) + ) ) + ) - for parent in parent_partitions_result.parent_partitions: - if parent in target_subset and parent not in materialized_subset: - can_run, failed_reason = can_run_with_parent( - parent, - candidate, - candidates_unit, - asset_graph, - target_subset, - asset_partitions_to_request_map, + failure_asset_graph_subsets_with_reasons = [] + # Any failure partition values apply to all candidate asset keys, so construct a subset + # graph with that partition subset value for each key + for failure_subset_value, reason in 
failure_subset_values_with_reasons: + failure_entity_subsets = [ + check.not_none( + asset_graph_view.get_subset_from_serializable_subset( + SerializableEntitySubset(candidate_entity_subset.key, failure_subset_value) ) - if not can_run: - return False, failed_reason + ) + for candidate_entity_subset in candidate_entity_subsets + ] + failure_asset_graph_subsets_with_reasons.append( + ( + AssetGraphSubset.from_entity_subsets( + entity_subsets=failure_entity_subsets, + ), + reason, + ) + ) - return True, "" + return AssetGraphViewBfsFilterConditionResult( + passed_asset_graph_subset=AssetGraphSubset.from_entity_subsets(passed_entity_subsets), + excluded_asset_graph_subsets_and_reasons=failure_asset_graph_subsets_with_reasons, + ) def _get_failed_asset_partitions( diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index ae1fe934f46b6..775f47bbbe983 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -34,6 +34,10 @@ multi_asset, ) from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView, TemporalContext +from dagster._core.asset_graph_view.bfs import ( + AssetGraphViewBfsFilterConditionResult, + bfs_filter_asset_graph_view, +) from dagster._core.definitions.asset_graph_subset import AssetGraphSubset from dagster._core.definitions.base_asset_graph import BaseAssetGraph from dagster._core.definitions.decorators.repository_decorator import repository @@ -79,6 +83,7 @@ two_assets_in_sequence_fan_out_partitions, ) from dagster_tests.declarative_automation_tests.legacy_tests.scenarios.partition_scenarios import ( + daily_to_hourly_partitions, hourly_to_daily_partitions, non_partitioned_after_partitioned, one_asset_one_partition, @@ -160,7 +165,7 @@ def scenario( ), 
"unpartitioned_after_dynamic_asset": scenario(unpartitioned_after_dynamic_asset), "two_dynamic_assets": scenario(two_dynamic_assets), - "hourly_to_daily_partiitons": scenario( + "hourly_to_daily_partitions": scenario( hourly_to_daily_partitions, create_datetime(year=2013, month=1, day=7, hour=0), target_root_partition_keys=[ @@ -170,6 +175,14 @@ def scenario( "2013-01-06-01:00", ], ), + "daily_to_hourly_partitions_non_contiguous": scenario( + daily_to_hourly_partitions, + create_datetime(year=2013, month=1, day=8, hour=0), + target_root_partition_keys=[ + "2013-01-05", + "2013-01-07", + ], + ), "root_assets_different_partitions": scenario(root_assets_different_partitions_same_downstream), "hourly_with_nonexistent_downstream_daily_partition": scenario( hourly_to_daily_partitions, @@ -236,15 +249,25 @@ def test_from_asset_partitions_target_subset( ) -def _get_instance_queryer( - instance: DagsterInstance, asset_graph: BaseAssetGraph, evaluation_time: datetime.datetime -) -> CachingInstanceQueryer: +def _get_asset_graph_view( + instance: DagsterInstance, + asset_graph: BaseAssetGraph, + evaluation_time: Optional[datetime.datetime] = None, +) -> AssetGraphView: return AssetGraphView( temporal_context=TemporalContext( effective_dt=evaluation_time or get_current_datetime(), last_event_id=None ), instance=instance, asset_graph=asset_graph, + ) + + +def _get_instance_queryer( + instance: DagsterInstance, asset_graph: BaseAssetGraph, evaluation_time: datetime.datetime +) -> CachingInstanceQueryer: + return _get_asset_graph_view( + instance, asset_graph, evaluation_time ).get_inner_queryer_for_back_compat() @@ -300,8 +323,21 @@ def _single_backfill_iteration_create_but_do_not_submit_runs( ) -@pytest.mark.parametrize("some_or_all", ["all", "some"]) -@pytest.mark.parametrize("failures", ["no_failures", "root_failures", "random_half_failures"]) +@pytest.mark.parametrize( + "some_or_all", + [ + "all", + "some", + ], +) +@pytest.mark.parametrize( + "failures", + [ + 
"no_failures", + "root_failures", + "random_half_failures", + ], +) @pytest.mark.parametrize("scenario", list(scenarios.values()), ids=list(scenarios.keys())) def test_scenario_to_completion(scenario: AssetBackfillScenario, failures: str, some_or_all: str): with ( @@ -510,11 +546,21 @@ def make_random_subset( if i % 2 == 0: root_asset_partitions.add(AssetKeyPartitionKey(root_asset_key, None)) - target_asset_partitions, _ = asset_graph.bfs_filter_asset_partitions( - instance, lambda _a, _b: (True, ""), root_asset_partitions, evaluation_time=evaluation_time - ) + asset_graph_view = _get_asset_graph_view(instance, asset_graph, evaluation_time=evaluation_time) - return AssetGraphSubset.from_asset_partition_set(target_asset_partitions, asset_graph) + return bfs_filter_asset_graph_view( + asset_graph_view=asset_graph_view, + condition_fn=lambda candidate_asset_graph_subset, _: ( + AssetGraphViewBfsFilterConditionResult( + passed_asset_graph_subset=candidate_asset_graph_subset, + excluded_asset_graph_subsets_and_reasons=[], + ) + ), + initial_asset_graph_subset=AssetGraphSubset.from_asset_partition_set( + root_asset_partitions, asset_graph + ), + include_full_execution_set=True, + )[0] def make_subset_from_partition_keys( @@ -533,11 +579,21 @@ def make_subset_from_partition_keys( else: root_asset_partitions.add(AssetKeyPartitionKey(root_asset_key, None)) - target_asset_partitions, _ = asset_graph.bfs_filter_asset_partitions( - instance, lambda _a, _b: (True, ""), root_asset_partitions, evaluation_time=evaluation_time - ) + asset_graph_view = _get_asset_graph_view(instance, asset_graph, evaluation_time=evaluation_time) - return AssetGraphSubset.from_asset_partition_set(target_asset_partitions, asset_graph) + return bfs_filter_asset_graph_view( + asset_graph_view=asset_graph_view, + condition_fn=lambda candidate_asset_graph_subset, _: ( + AssetGraphViewBfsFilterConditionResult( + passed_asset_graph_subset=candidate_asset_graph_subset, + 
excluded_asset_graph_subsets_and_reasons=[], + ) + ), + initial_asset_graph_subset=AssetGraphSubset.from_asset_partition_set( + root_asset_partitions, asset_graph + ), + include_full_execution_set=True, + )[0] def get_asset_graph( @@ -578,10 +634,9 @@ def execute_asset_backfill_iteration_consume_generator( for result in execute_asset_backfill_iteration_inner( backfill_id=backfill_id, asset_backfill_data=asset_backfill_data, - instance_queryer=_get_instance_queryer( + asset_graph_view=_get_asset_graph_view( instance, asset_graph, asset_backfill_data.backfill_start_datetime ), - asset_graph=asset_graph, backfill_start_timestamp=asset_backfill_data.backfill_start_timestamp, logger=logging.getLogger("fake_logger"), ): @@ -606,11 +661,22 @@ def run_backfill_to_completion( # assert each asset partition only targeted once requested_asset_partitions: set[AssetKeyPartitionKey] = set() - fail_and_downstream_asset_partitions, _ = asset_graph.bfs_filter_asset_partitions( - instance, - lambda _a, _b: (True, ""), - fail_asset_partitions, - evaluation_time=backfill_data.backfill_start_datetime, + asset_graph_view = _get_asset_graph_view(instance, asset_graph) + + fail_and_downstream_asset_graph_subset, _ = bfs_filter_asset_graph_view( + asset_graph_view=asset_graph_view, + condition_fn=lambda candidate_asset_graph_subset, _: AssetGraphViewBfsFilterConditionResult( + passed_asset_graph_subset=candidate_asset_graph_subset, + excluded_asset_graph_subsets_and_reasons=[], + ), + initial_asset_graph_subset=AssetGraphSubset.from_asset_partition_set( + set(fail_asset_partitions), asset_graph + ), + include_full_execution_set=True, + ) + + fail_and_downstream_asset_partitions = set( + fail_and_downstream_asset_graph_subset.iterate_asset_partitions() ) while not backfill_is_complete( @@ -1100,7 +1166,7 @@ def may_asset( ) instance = DagsterInstance.ephemeral() - with pytest.raises(DagsterInvariantViolationError, match="depends on invalid partition keys"): + with 
pytest.raises(DagsterInvariantViolationError, match="depends on invalid partitions"): run_backfill_to_completion(asset_graph, assets_by_repo_name, backfill_data, [], instance) diff --git a/python_modules/dagster/dagster_tests/declarative_automation_tests/legacy_tests/scenarios/partition_scenarios.py b/python_modules/dagster/dagster_tests/declarative_automation_tests/legacy_tests/scenarios/partition_scenarios.py index 2597b66d1a61b..cddd02796eb5b 100644 --- a/python_modules/dagster/dagster_tests/declarative_automation_tests/legacy_tests/scenarios/partition_scenarios.py +++ b/python_modules/dagster/dagster_tests/declarative_automation_tests/legacy_tests/scenarios/partition_scenarios.py @@ -62,6 +62,14 @@ ), ] +daily_to_hourly_partitions = [ + asset_def( + "daily", + partitions_def=daily_partitions_def, + ), + asset_def("hourly", ["daily"], partitions_def=hourly_partitions_def), +] + unpartitioned_after_dynamic_asset = [ asset_def("asset1"), asset_def("asset2", ["asset1"], partitions_def=DynamicPartitionsDefinition(name="foo")),