Skip to content

Commit

Permalink
Make asset backfills subset-aware
Browse files Browse the repository at this point in the history
refactor

> Insert changelog entry or delete this section.

moar refactor

> Insert changelog entry or delete this section.

subsets instead of values

> Insert changelog entry or delete this section.

more progress on subsets instead of values

> Insert changelog entry or delete this section.

more progress

> Insert changelog entry or delete this section.

yet more progress

> Insert changelog entry or delete this section.

more progress

> Insert changelog entry or delete this section.

more

> Insert changelog entry or delete this section.

renames

> Insert changelog entry or delete this section.

final names?

> Insert changelog entry or delete this section.

owen feedback

> Insert changelog entry or delete this section.
  • Loading branch information
gibsondan committed Dec 31, 2024
1 parent d54f10c commit 8f13a8e
Show file tree
Hide file tree
Showing 5 changed files with 567 additions and 412 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
asset,
define_asset_job,
)
from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView, TemporalContext
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.remote_asset_graph import RemoteWorkspaceAssetGraph
from dagster._core.execution.asset_backfill import (
AssetBackfillIterationResult,
execute_asset_backfill_iteration,
Expand All @@ -34,7 +34,6 @@
from dagster._core.utils import make_new_backfill_id
from dagster._seven import get_system_temp_directory
from dagster._utils import safe_tempfile_path
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer
from dagster_graphql.client.query import (
LAUNCH_PARTITION_BACKFILL_MUTATION,
LAUNCH_PIPELINE_EXECUTION_MUTATION,
Expand Down Expand Up @@ -241,27 +240,25 @@ def _get_run_stats(partition_statuses):
}


def _execute_asset_backfill_iteration_no_side_effects(
graphql_context, backfill_id: str, asset_graph: RemoteWorkspaceAssetGraph
) -> None:
def _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id: str) -> None:
"""Executes an asset backfill iteration and updates the serialized asset backfill data.
However, does not execute side effects i.e. launching runs.
"""
backfill = graphql_context.instance.get_backfill(backfill_id)
asset_backfill_data = backfill.asset_backfill_data
result = None
instance_queryer = CachingInstanceQueryer(
graphql_context.instance,
asset_graph,
graphql_context,
asset_backfill_data.backfill_start_datetime,
asset_graph_view = AssetGraphView(
temporal_context=TemporalContext(
effective_dt=asset_backfill_data.backfill_start_datetime, last_event_id=None
),
instance=graphql_context.instance,
asset_graph=graphql_context.asset_graph,
)
with environ({"ASSET_BACKFILL_CURSOR_DELAY_TIME": "0"}):
for result in execute_asset_backfill_iteration_inner(
backfill_id=backfill_id,
asset_backfill_data=asset_backfill_data,
instance_queryer=instance_queryer,
asset_graph=asset_graph,
asset_graph_view=asset_graph_view,
backfill_start_timestamp=asset_backfill_data.backfill_start_timestamp,
logger=logging.getLogger("fake_logger"),
):
Expand All @@ -275,10 +272,12 @@ def _execute_asset_backfill_iteration_no_side_effects(

updated_backfill = backfill.with_asset_backfill_data(
result.backfill_data.with_run_requests_submitted(
result.run_requests, asset_graph=asset_graph, instance_queryer=instance_queryer
result.run_requests,
asset_graph=graphql_context.asset_graph,
instance_queryer=asset_graph_view.get_inner_queryer_for_back_compat(),
),
dynamic_partitions_store=graphql_context.instance,
asset_graph=asset_graph,
asset_graph=graphql_context.asset_graph,
)
graphql_context.instance.update_backfill(updated_backfill)

Expand Down Expand Up @@ -312,12 +311,11 @@ def _execute_job_backfill_iteration_with_side_effects(graphql_context, backfill_
def _mock_asset_backfill_runs(
graphql_context,
asset_key: AssetKey,
asset_graph: RemoteWorkspaceAssetGraph,
backfill_id: str,
status: DagsterRunStatus,
partition_key: Optional[str],
):
partitions_def = asset_graph.get(asset_key).partitions_def
partitions_def = graphql_context.asset_graph.get(asset_key).partitions_def

@asset(
partitions_def=partitions_def,
Expand Down Expand Up @@ -751,10 +749,7 @@ def test_cancel_asset_backfill(self, graphql_context):

# Update the asset backfill data to contain the requested partition, but do not execute side effects,
# since launching the run would cause the test process to hang forever.
code_location = graphql_context.get_code_location(main_repo_location_name())
repository = code_location.get_repository("test_repo")
asset_graph = repository.asset_graph
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)

# Launch the run that runs forever
selector = infer_job_selector(graphql_context, "hanging_partition_asset_job")
Expand Down Expand Up @@ -830,10 +825,8 @@ def test_cancel_then_retry_asset_backfill(self, graphql_context):

# Update the asset backfill data to contain the requested partition, but do not execute side effects,
# since launching the run would cause the test process to hang forever.
code_location = graphql_context.get_code_location(main_repo_location_name())
repository = code_location.get_repository("test_repo")
asset_graph = repository.asset_graph
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)

_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)

# Launch the run that runs forever
selector = infer_job_selector(graphql_context, "hanging_partition_asset_job")
Expand Down Expand Up @@ -1144,11 +1137,7 @@ def test_asset_backfill_partition_stats(self, graphql_context):
assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
backfill_id = result.data["launchPartitionBackfill"]["backfillId"]

code_location = graphql_context.get_code_location(main_repo_location_name())
repository = code_location.get_repository("test_repo")
asset_graph = repository.asset_graph

_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)

for partition, status in [
("a", DagsterRunStatus.SUCCESS),
Expand All @@ -1157,11 +1146,9 @@ def test_asset_backfill_partition_stats(self, graphql_context):
("e", DagsterRunStatus.SUCCESS),
("f", DagsterRunStatus.FAILURE),
]:
_mock_asset_backfill_runs(
graphql_context, asset_key, asset_graph, backfill_id, status, partition
)
_mock_asset_backfill_runs(graphql_context, asset_key, backfill_id, status, partition)

_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)

result = execute_dagster_graphql(
graphql_context,
Expand All @@ -1187,10 +1174,6 @@ def test_asset_backfill_partition_stats(self, graphql_context):
assert asset_partition_status_counts[0]["numPartitionsFailed"] == 2

def test_asset_backfill_status_with_upstream_failure(self, graphql_context):
code_location = graphql_context.get_code_location(main_repo_location_name())
repository = code_location.get_repository("test_repo")
asset_graph = repository.asset_graph

asset_keys = [
AssetKey("unpartitioned_upstream_of_partitioned"),
AssetKey("upstream_daily_partitioned_asset"),
Expand All @@ -1213,25 +1196,23 @@ def test_asset_backfill_status_with_upstream_failure(self, graphql_context):
assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
backfill_id = result.data["launchPartitionBackfill"]["backfillId"]

_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
_mock_asset_backfill_runs(
graphql_context,
AssetKey("unpartitioned_upstream_of_partitioned"),
asset_graph,
backfill_id,
DagsterRunStatus.SUCCESS,
None,
)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
_mock_asset_backfill_runs(
graphql_context,
AssetKey("upstream_daily_partitioned_asset"),
asset_graph,
backfill_id,
DagsterRunStatus.FAILURE,
"2023-01-09",
)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)

result = execute_dagster_graphql(
graphql_context,
Expand Down Expand Up @@ -1658,10 +1639,6 @@ def test_launch_backfill_with_all_partitions_flag(self, graphql_context):
assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 10

def test_reexecute_asset_backfill_from_failure(self, graphql_context):
code_location = graphql_context.get_code_location(main_repo_location_name())
repository = code_location.get_repository("test_repo")
asset_graph = repository.asset_graph

asset_keys = [
AssetKey("unpartitioned_upstream_of_partitioned"),
AssetKey("upstream_daily_partitioned_asset"),
Expand All @@ -1684,25 +1661,23 @@ def test_reexecute_asset_backfill_from_failure(self, graphql_context):
assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
backfill_id = result.data["launchPartitionBackfill"]["backfillId"]

_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
_mock_asset_backfill_runs(
graphql_context,
AssetKey("unpartitioned_upstream_of_partitioned"),
asset_graph,
backfill_id,
DagsterRunStatus.SUCCESS,
None,
)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
_mock_asset_backfill_runs(
graphql_context,
AssetKey("upstream_daily_partitioned_asset"),
asset_graph,
backfill_id,
DagsterRunStatus.FAILURE,
"2023-01-09",
)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)

# mark backfill as completed so we can retry it
backfill = graphql_context.instance.get_backfill(backfill_id)
Expand Down Expand Up @@ -1755,10 +1730,6 @@ def test_reexecute_asset_backfill_from_failure(self, graphql_context):
assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id

def test_reexecute_successful_asset_backfill(self, graphql_context):
code_location = graphql_context.get_code_location(main_repo_location_name())
repository = code_location.get_repository("test_repo")
asset_graph = repository.asset_graph

asset_keys = [
AssetKey("unpartitioned_upstream_of_partitioned"),
AssetKey("upstream_daily_partitioned_asset"),
Expand All @@ -1780,25 +1751,23 @@ def test_reexecute_successful_asset_backfill(self, graphql_context):
assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
backfill_id = result.data["launchPartitionBackfill"]["backfillId"]

_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
_mock_asset_backfill_runs(
graphql_context,
AssetKey("unpartitioned_upstream_of_partitioned"),
asset_graph,
backfill_id,
DagsterRunStatus.SUCCESS,
None,
)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
_mock_asset_backfill_runs(
graphql_context,
AssetKey("upstream_daily_partitioned_asset"),
asset_graph,
backfill_id,
DagsterRunStatus.SUCCESS,
"2023-01-09",
)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)

# mark backfill as complete so we can retry it
backfill = graphql_context.instance.get_backfill(backfill_id)
Expand Down Expand Up @@ -1842,10 +1811,6 @@ def test_reexecute_successful_asset_backfill(self, graphql_context):
assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id

def test_reexecute_asset_backfill_still_in_progress(self, graphql_context):
code_location = graphql_context.get_code_location(main_repo_location_name())
repository = code_location.get_repository("test_repo")
asset_graph = repository.asset_graph

asset_keys = [
AssetKey("unpartitioned_upstream_of_partitioned"),
AssetKey("upstream_daily_partitioned_asset"),
Expand All @@ -1868,25 +1833,23 @@ def test_reexecute_asset_backfill_still_in_progress(self, graphql_context):
assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
backfill_id = result.data["launchPartitionBackfill"]["backfillId"]

_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
_mock_asset_backfill_runs(
graphql_context,
AssetKey("unpartitioned_upstream_of_partitioned"),
asset_graph,
backfill_id,
DagsterRunStatus.SUCCESS,
None,
)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
_mock_asset_backfill_runs(
graphql_context,
AssetKey("upstream_daily_partitioned_asset"),
asset_graph,
backfill_id,
DagsterRunStatus.FAILURE,
"2023-01-09",
)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)

# try to retry the backfill while it is still in progress
result = execute_dagster_graphql(
Expand Down Expand Up @@ -1966,10 +1929,6 @@ def test_reexecute_asset_backfill_still_in_progress(self, graphql_context):
assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id

def test_reexecute_asset_backfill_twice(self, graphql_context):
code_location = graphql_context.get_code_location(main_repo_location_name())
repository = code_location.get_repository("test_repo")
asset_graph = repository.asset_graph

asset_keys = [
AssetKey("unpartitioned_upstream_of_partitioned"),
AssetKey("upstream_daily_partitioned_asset"),
Expand All @@ -1992,25 +1951,23 @@ def test_reexecute_asset_backfill_twice(self, graphql_context):
assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
backfill_id = result.data["launchPartitionBackfill"]["backfillId"]

_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
_mock_asset_backfill_runs(
graphql_context,
AssetKey("unpartitioned_upstream_of_partitioned"),
asset_graph,
backfill_id,
DagsterRunStatus.SUCCESS,
None,
)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)
_mock_asset_backfill_runs(
graphql_context,
AssetKey("upstream_daily_partitioned_asset"),
asset_graph,
backfill_id,
DagsterRunStatus.FAILURE,
"2023-01-09",
)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id)

# mark backfill as completed so we can retry it
backfill = graphql_context.instance.get_backfill(backfill_id)
Expand Down Expand Up @@ -2040,19 +1997,18 @@ def test_reexecute_asset_backfill_twice(self, graphql_context):
assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id

_execute_asset_backfill_iteration_no_side_effects(
graphql_context, retried_backfill.backfill_id, asset_graph
graphql_context, retried_backfill.backfill_id
)
# mark some partitions failed so we can retry again
_mock_asset_backfill_runs(
graphql_context,
AssetKey("upstream_daily_partitioned_asset"),
asset_graph,
retried_backfill.backfill_id,
DagsterRunStatus.FAILURE,
"2023-01-09",
)
_execute_asset_backfill_iteration_no_side_effects(
graphql_context, retried_backfill.backfill_id, asset_graph
graphql_context, retried_backfill.backfill_id
)

# refetch the backfill to get the updated statuses of all assets
Expand Down
Loading

0 comments on commit 8f13a8e

Please sign in to comment.