Skip to content

Commit

Permalink
moar
Browse files Browse the repository at this point in the history
> Insert changelog entry or delete this section.
  • Loading branch information
gibsondan committed Dec 26, 2024
1 parent 59ef83c commit 0d7fcf2
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
)
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.instance import DagsterInstance
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionResolvedStatus
from dagster._core.storage.dagster_run import RunRecord
Expand Down Expand Up @@ -195,6 +196,20 @@ def get_empty_subset(self, *, key: T_EntityKey) -> EntitySubset[T_EntityKey]:
value = partitions_def.empty_subset() if partitions_def else False
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

def get_entity_subset_in_range(
self, asset_key: AssetKey, partition_key_range: "PartitionKeyRange"
) -> EntitySubset[AssetKey]:
partitions_def = check.not_none(
self._get_partitions_def(asset_key), "Must have partitions def"
)
partition_subset_in_range = partitions_def.get_subset_in_range(
partition_key_range=partition_key_range,
dynamic_partitions_store=self._queryer,
)
return EntitySubset(
self, key=asset_key, value=_ValidatedEntitySubsetValue(partition_subset_in_range)
)

def get_entity_subset_from_asset_graph_subset(
self, asset_graph_subset: AssetGraphSubset, key: AssetKey
) -> EntitySubset[AssetKey]:
Expand Down
147 changes: 55 additions & 92 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
Expand Down Expand Up @@ -206,9 +205,9 @@ def with_run_requests_submitted(

return self.replace_requested_subset(submitted_partitions)

def get_target_root_asset_partitions(
def get_target_root_asset_graph_subset(
self, instance_queryer: CachingInstanceQueryer
) -> Iterable[AssetKeyPartitionKey]:
) -> AssetGraphSubset:
def _get_self_and_downstream_targeted_subset(
initial_subset: AssetGraphSubset,
) -> AssetGraphSubset:
Expand Down Expand Up @@ -280,7 +279,7 @@ def _get_self_and_downstream_targeted_subset(
" This is likely a system error. Please report this issue to the Dagster team."
)

return list(root_subset.iterate_asset_partitions())
return root_subset

def get_target_partitions_subset(self, asset_key: AssetKey) -> PartitionsSubset:
# Return the targeted partitions for the root partitioned asset keys
Expand Down Expand Up @@ -669,7 +668,6 @@ def _get_requested_asset_graph_subset_from_run_requests(
asset_graph_view: AssetGraphView,
) -> AssetGraphSubset:
asset_graph = asset_graph_view.asset_graph
instance_queryer = asset_graph_view.get_inner_queryer_for_back_compat()
requested_subset = AssetGraphSubset.empty()
for run_request in run_requests:
# Run request targets a range of partitions
Expand All @@ -680,26 +678,9 @@ def _get_requested_asset_graph_subset_from_run_requests(
# have the same partitions def
selected_assets = cast(Sequence[AssetKey], run_request.asset_selection)
check.invariant(len(selected_assets) > 0)
partitions_defs = set(
asset_graph.get(asset_key).partitions_def for asset_key in selected_assets
)
check.invariant(
len(partitions_defs) == 1,
"Expected all assets selected in partition range run request to have the same"
" partitions def",
)

partitions_def = cast(PartitionsDefinition, next(iter(partitions_defs)))
partition_subset_in_range = partitions_def.get_subset_in_range(
partition_key_range=PartitionKeyRange(range_start, range_end),
dynamic_partitions_store=instance_queryer,
)
partition_range = PartitionKeyRange(range_start, range_end)
entity_subsets = [
check.not_none(
asset_graph_view.get_subset_from_serializable_subset(
SerializableEntitySubset(key=asset_key, value=partition_subset_in_range)
)
)
asset_graph_view.get_entity_subset_in_range(asset_key, partition_range)
for asset_key in selected_assets
]
requested_subset = requested_subset | AssetGraphSubset.from_entity_subsets(
Expand Down Expand Up @@ -745,7 +726,6 @@ def _submit_runs_and_update_backfill_in_chunks(
asset_backfill_iteration_result: AssetBackfillIterationResult,
logger: logging.Logger,
run_tags: Mapping[str, str],
instance_queryer: CachingInstanceQueryer,
) -> Iterable[None]:
from dagster._core.execution.backfill import BulkActionStatus

Expand Down Expand Up @@ -1110,7 +1090,6 @@ def execute_asset_backfill_iteration(
result,
logger,
run_tags=updated_backfill.tags,
instance_queryer=instance_queryer,
)

updated_backfill = cast(
Expand Down Expand Up @@ -1194,8 +1173,7 @@ def execute_asset_backfill_iteration(
for updated_asset_backfill_data in get_canceling_asset_backfill_iteration_data(
backfill.backfill_id,
previous_asset_backfill_data,
instance_queryer,
asset_graph,
asset_graph_view,
backfill.backfill_timestamp,
):
yield None
Expand Down Expand Up @@ -1246,37 +1224,32 @@ def execute_asset_backfill_iteration(
def get_canceling_asset_backfill_iteration_data(
backfill_id: str,
asset_backfill_data: AssetBackfillData,
instance_queryer: CachingInstanceQueryer,
asset_graph: RemoteWorkspaceAssetGraph,
asset_graph_view: AssetGraphView,
backfill_start_timestamp: float,
) -> Iterable[Optional[AssetBackfillData]]:
"""For asset backfills in the "canceling" state, fetch the asset backfill data with the updated
materialized and failed subsets.
"""
asset_graph = cast(RemoteWorkspaceAssetGraph, asset_graph_view.asset_graph)
instance_queryer = asset_graph_view.get_inner_queryer_for_back_compat()
updated_materialized_subset = None
for updated_materialized_subset in get_asset_backfill_iteration_materialized_partitions(
for updated_materialized_subset in get_asset_backfill_iteration_materialized_subset(
backfill_id, asset_backfill_data, asset_graph, instance_queryer
):
yield None

if not isinstance(updated_materialized_subset, AssetGraphSubset):
check.failed(
"Expected get_asset_backfill_iteration_materialized_partitions to return an"
"Expected get_asset_backfill_iteration_materialized_subset to return an"
" AssetGraphSubset object"
)

# TODO: Make this use subsets throughout instead of a sequence of AssetKeyPartitionKeys
failed_subset = AssetGraphSubset.from_asset_partition_set(
set(
_get_failed_asset_partitions(
instance_queryer,
backfill_id,
asset_graph,
materialized_subset=updated_materialized_subset,
)
),
asset_graph,
failed_subset = _get_failed_asset_graph_subset(
asset_graph_view,
backfill_id,
materialized_subset=updated_materialized_subset,
)

# we fetch the failed_subset to get any new assets that have failed and add that to the set of
# assets we already know failed and their downstreams. However we need to remove any assets in
# updated_materialized_subset to account for the case where a run retry successfully
Expand All @@ -1297,7 +1270,7 @@ def get_canceling_asset_backfill_iteration_data(
yield updated_backfill_data


def get_asset_backfill_iteration_materialized_partitions(
def get_asset_backfill_iteration_materialized_subset(
backfill_id: str,
asset_backfill_data: AssetBackfillData,
asset_graph: RemoteWorkspaceAssetGraph,
Expand Down Expand Up @@ -1370,24 +1343,16 @@ def _get_subset_in_target_subset(
return AssetGraphSubset.from_entity_subsets([subset_in_target_subset])


def _get_failed_and_downstream_asset_partitions(
def _get_failed_and_downstream_asset_graph_subset(
backfill_id: str,
asset_backfill_data: AssetBackfillData,
asset_graph_view: AssetGraphView,
materialized_subset: AssetGraphSubset,
) -> AssetGraphSubset:
asset_graph = cast(RemoteWorkspaceAssetGraph, asset_graph_view.asset_graph)

failed_asset_graph_subset = AssetGraphSubset.from_asset_partition_set(
set(
_get_failed_asset_partitions(
asset_graph_view.get_inner_queryer_for_back_compat(),
backfill_id,
asset_graph,
materialized_subset,
)
),
asset_graph,
failed_asset_graph_subset = _get_failed_asset_graph_subset(
asset_graph_view,
backfill_id,
materialized_subset,
)

failed_and_downstream_subset = bfs_filter_asset_graph_view(
Expand Down Expand Up @@ -1475,16 +1440,16 @@ def execute_asset_backfill_iteration_inner(
RemoteWorkspaceAssetGraph, asset_graph_view.asset_graph
)

initial_candidates: Set[AssetKeyPartitionKey] = set()
initial_asset_graph_subset: AssetGraphSubset = AssetGraphSubset.empty()
request_roots = not asset_backfill_data.requested_runs_for_target_roots
if request_roots:
logger.info(
"Not all root assets (assets in backfill that do not have parents in the backill) have been requested, finding root assets."
)
target_roots = asset_backfill_data.get_target_root_asset_partitions(instance_queryer)
initial_candidates.update(target_roots)
target_roots = asset_backfill_data.get_target_root_asset_graph_subset(instance_queryer)
initial_asset_graph_subset = initial_asset_graph_subset | target_roots
logger.info(
f"Root assets that have not yet been requested:\n {_asset_graph_subset_to_str(AssetGraphSubset.from_asset_partition_set(set(target_roots), asset_graph), asset_graph)}"
f"Root assets that have not yet been requested:\n {_asset_graph_subset_to_str(target_roots, asset_graph)}"
)

yield None
Expand All @@ -1502,14 +1467,14 @@ def execute_asset_backfill_iteration_inner(
time.sleep(cursor_delay_time)

updated_materialized_subset = None
for updated_materialized_subset in get_asset_backfill_iteration_materialized_partitions(
for updated_materialized_subset in get_asset_backfill_iteration_materialized_subset(
backfill_id, asset_backfill_data, asset_graph, instance_queryer
):
yield None

if not isinstance(updated_materialized_subset, AssetGraphSubset):
check.failed(
"Expected get_asset_backfill_iteration_materialized_partitions to return an"
"Expected get_asset_backfill_iteration_materialized_subset to return an"
" AssetGraphSubset"
)

Expand All @@ -1531,11 +1496,16 @@ def execute_asset_backfill_iteration_inner(
for asset_key in asset_backfill_data.target_subset.asset_keys
)
)
initial_candidates.update(parent_materialized_asset_partitions)
initial_asset_graph_subset = (
initial_asset_graph_subset
| AssetGraphSubset.from_asset_partition_set(
parent_materialized_asset_partitions, asset_graph
)
)

yield None

failed_and_downstream_subset = _get_failed_and_downstream_asset_partitions(
failed_and_downstream_subset = _get_failed_and_downstream_asset_graph_subset(
backfill_id,
asset_backfill_data,
asset_graph_view,
Expand All @@ -1544,10 +1514,6 @@ def execute_asset_backfill_iteration_inner(

yield None

initial_asset_graph_subset = AssetGraphSubset.from_asset_partition_set(
initial_candidates, asset_graph
)

asset_subset_to_request, not_requested_and_reasons = bfs_filter_asset_graph_view(
asset_graph_view,
lambda candidate_asset_graph_subset,
Expand Down Expand Up @@ -2007,13 +1973,12 @@ def _should_backfill_atomic_asset_graph_subset_unit(
)


def _get_failed_asset_partitions(
instance_queryer: CachingInstanceQueryer,
def _get_failed_asset_graph_subset(
asset_graph_view: AssetGraphView,
backfill_id: str,
asset_graph: RemoteAssetGraph,
materialized_subset: AssetGraphSubset,
) -> Sequence[AssetKeyPartitionKey]:
"""Returns asset partitions that materializations were requested for as part of the backfill, but were
) -> AssetGraphSubset:
"""Returns asset subset that materializations were requested for as part of the backfill, but were
not successfully materialized.
This function gets a list of all runs for the backfill that have failed and extracts the asset partitions
Expand All @@ -2025,15 +1990,16 @@ def _get_failed_asset_partitions(
Includes canceled asset partitions. Implementation assumes that successful runs won't have any
failed partitions.
"""
instance_queryer = asset_graph_view.get_inner_queryer_for_back_compat()

runs = instance_queryer.instance.get_runs(
filters=RunsFilter(
tags={BACKFILL_ID_TAG: backfill_id},
statuses=[DagsterRunStatus.CANCELED, DagsterRunStatus.FAILURE],
)
)

result: List[AssetKeyPartitionKey] = []

result: AssetGraphSubset = AssetGraphSubset.empty()
for run in runs:
planned_asset_keys = instance_queryer.get_planned_materializations_for_run(
run_id=run.run_id
Expand All @@ -2053,25 +2019,22 @@ def _get_failed_asset_partitions(
start=run.tags[ASSET_PARTITION_RANGE_START_TAG],
end=run.tags[ASSET_PARTITION_RANGE_END_TAG],
)
asset_partition_candidates = []
for asset_key in failed_asset_keys:
asset_partition_candidates.extend(
asset_graph.get_partitions_in_range(
asset_key, partition_range, instance_queryer
)
)
candidate_subset = AssetGraphSubset.from_entity_subsets(
[
asset_graph_view.get_entity_subset_in_range(asset_key, partition_range)
for asset_key in failed_asset_keys
]
)

else:
# a regular backfill run that run on a single partition
partition_key = run.tags.get(PARTITION_NAME_TAG)
asset_partition_candidates = [
AssetKeyPartitionKey(asset_key, partition_key) for asset_key in failed_asset_keys
]
candidate_subset = AssetGraphSubset.from_asset_partition_set(
{AssetKeyPartitionKey(asset_key, partition_key) for asset_key in failed_asset_keys},
asset_graph_view.asset_graph,
)

asset_partitions_still_failed = [
asset_partition
for asset_partition in asset_partition_candidates
if asset_partition not in materialized_subset
]
result.extend(asset_partitions_still_failed)
asset_subset_still_failed = candidate_subset - materialized_subset
result = result | asset_subset_still_failed

return result
Original file line number Diff line number Diff line change
Expand Up @@ -1198,14 +1198,15 @@ def downstream_daily_partitioned_asset(

assert len(instance.get_runs()) == 1

instance_queryer = _get_instance_queryer(instance, asset_graph, backfill_start_datetime)

canceling_backfill_data = None
for canceling_backfill_data in get_canceling_asset_backfill_iteration_data(
backfill_id,
asset_backfill_data,
instance_queryer,
asset_graph,
_get_asset_graph_view(
instance,
asset_graph,
backfill_start_datetime,
),
backfill_start_datetime.timestamp(),
):
pass
Expand Down Expand Up @@ -1283,14 +1284,11 @@ def downstream_daily_partitioned_asset(
in asset_backfill_data.failed_and_downstream_subset
)

instance_queryer = _get_instance_queryer(instance, asset_graph, backfill_start_datetime)

canceling_backfill_data = None
for canceling_backfill_data in get_canceling_asset_backfill_iteration_data(
backfill_id,
asset_backfill_data,
instance_queryer,
asset_graph,
_get_asset_graph_view(instance, asset_graph, backfill_start_datetime),
backfill_start_datetime.timestamp(),
):
pass
Expand Down Expand Up @@ -1516,8 +1514,8 @@ def foo_grandchild(foo_child):
],
)

target_root_partitions = asset_backfill_data.get_target_root_asset_partitions(instance_queryer)
assert set(target_root_partitions) == {
target_root_subset = asset_backfill_data.get_target_root_asset_graph_subset(instance_queryer)
assert set(target_root_subset.iterate_asset_partitions()) == {
AssetKeyPartitionKey(asset_key=AssetKey(["foo"]), partition_key="2023-10-05"),
AssetKeyPartitionKey(asset_key=AssetKey(["foo"]), partition_key="2023-10-03"),
AssetKeyPartitionKey(asset_key=AssetKey(["foo"]), partition_key="2023-10-04"),
Expand Down

0 comments on commit 0d7fcf2

Please sign in to comment.