
Implement Python API for launching asset backfills #26731

Open · wants to merge 10 commits into base: master
86 changes: 85 additions & 1 deletion python_modules/dagster/dagster/_core/instance/__init__.py
@@ -38,7 +38,9 @@
from dagster._core.definitions.data_version import extract_data_provenance_from_entry
from dagster._core.definitions.events import AssetKey, AssetObservation
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.utils import check_valid_title
from dagster._core.errors import (
DagsterError,
DagsterHomeNotSetError,
DagsterInvalidInvocationError,
DagsterInvariantViolationError,
@@ -79,8 +81,9 @@
TAGS_TO_MAYBE_OMIT_ON_RETRY,
WILL_RETRY_TAG,
)
from dagster._core.utils import make_new_backfill_id
from dagster._serdes import ConfigurableClass
from dagster._time import get_current_datetime, get_current_timestamp
from dagster._time import datetime_from_timestamp, get_current_datetime, get_current_timestamp
from dagster._utils import PrintFn, is_uuid, traced
from dagster._utils.error import serializable_error_info_from_exc_info
from dagster._utils.merger import merge_dicts
@@ -3155,6 +3158,87 @@ def add_backfill(self, partition_backfill: "PartitionBackfill") -> None:
def update_backfill(self, partition_backfill: "PartitionBackfill") -> None:
self._run_storage.update_backfill(partition_backfill)

@public
@experimental
def launch_backfill(
self,
*,
asset_graph: "BaseAssetGraph",
Contributor Author:
I decided to directly ask for the asset graph as an argument, since there are already tutorials that ask the user to pass asset graph. I didn't really see any reason to ask for a broader Definitions object.

Contributor:

Can you point me to the tutorial where you see this? I'm having trouble finding one other than the airlift tutorial, which is a slightly different scenario since it is getting the asset graph from defs in a __main__ not in the body of a job

Contributor Author:

It was an Airlift tutorial. 😅

asset_selection: Sequence[AssetKey],
partition_names: Optional[Sequence[str]] = None,
all_partitions: bool = False,
tags: Mapping[str, str] = {},
title: Optional[str] = None,
description: Optional[str] = None,
) -> str:
"""Launch a set of partition backfill runs.

Either ``partition_names`` must be provided or ``all_partitions`` must be
``True``, but not both.

Args:
asset_graph (BaseAssetGraph): The asset graph for the backfill.
asset_selection (Sequence[AssetKey]): List of asset keys to backfill.
partition_names (Optional[Sequence[str]]): List of partition names to backfill.
all_partitions (bool): Whether to backfill all partitions.
tags (Mapping[str, str]): The tags for the backfill.
title (Optional[str]): The title of the backfill.
description (Optional[str]): The description of the backfill.

Returns:
str: The ID of the backfill.
"""
from dagster._core.definitions.partition import CachingDynamicPartitionsLoader
from dagster._core.execution.backfill import PartitionBackfill

backfill_id = make_new_backfill_id()
backfill_timestamp = get_current_timestamp()
backfill_datetime = datetime_from_timestamp(backfill_timestamp)
dynamic_partitions_store = CachingDynamicPartitionsLoader(self)

check_valid_title(title)

if asset_selection is not None:
backfill = PartitionBackfill.from_asset_partitions(
backfill_id=backfill_id,
asset_graph=asset_graph,
asset_selection=asset_selection,
partition_names=partition_names,
backfill_timestamp=backfill_timestamp,
tags=tags,
dynamic_partitions_store=dynamic_partitions_store,
all_partitions=all_partitions,
title=title,
description=description,
)
Member commented on lines +3201 to +3213:

from my perspective it would be really nice if as much of this as possible can be done on the 'server' (which isn't really a thing in our instance API unfortunately...). Right now this is a lot of asset graph calculation happening on the 'client', at least in cloud

It's really too bad that the asset graph is not serialized in the database in OSS currently / needs to be passed in by the caller. That is not the case in cloud and simplifies the API in situations like this quite a bit i think

Contributor:

Is there a particular goal in pushing this work onto the "server" side in cloud? My main concern here is that if we try to pass in some sort of "unresolved" asset graph subset selection (i.e. some combination of parameters such as asset_selection, partition_names, all_partitions), then this is actually a lot less flexible as time goes on. We'd need to commit to those parameters being the "official" way of determining an asset graph subset, when in reality I think we'll want to evolve those python APIs over time.

Concretely, I'm imagining in the mid-term future we'll want to start exposing EntitySubset-type APIs more publicly, which would mean that we'd want to add a parameter to this method that accepts a list of EntitySubsets determining what to backfill.

In that future, it's much easier to rely on PartitionBackfill as the serialized object that we produce here, as that is maximally flexible -- we can construct that object in whatever way we want on the client side, and be confident that the daemon can read it.

else:
raise DagsterError(
"Backfill requested without specifying partition set selector or asset selection"
)
Comment on lines +3215 to +3217:

The error message references a partition set selector which is not a valid parameter for this API. Consider updating to simply state "Backfill requested without specifying asset selection" to accurately reflect the API's requirements.

Spotted by Graphite Reviewer

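The error path being flagged above is the "neither selector provided" case. A minimal, hypothetical sketch of the argument contract the docstring describes — the parameter names `partition_names` and `all_partitions` come from the PR, but this standalone helper is illustrative only, not Dagster code:

```python
from typing import Optional, Sequence


def validate_partition_arguments(
    partition_names: Optional[Sequence[str]],
    all_partitions: bool,
) -> None:
    """Enforce that exactly one of the two partition selectors is used."""
    if partition_names is None and not all_partitions:
        # Message names only the parameters this API actually accepts,
        # per the review suggestion above.
        raise ValueError(
            "Backfill requested without specifying partition_names or all_partitions"
        )
    if partition_names is not None and all_partitions:
        raise ValueError("Cannot provide both partition_names and all_partitions")


# Valid: an explicit list of partition keys.
validate_partition_arguments(["2024-01-01"], False)
# Valid: every partition.
validate_partition_arguments(None, True)
```

Keeping the validation in one helper also keeps the error text in sync with the signature if the selectors evolve (e.g., toward the `EntitySubset`-style parameters discussed above).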


# Check that the partition keys can be found in the asset graph.
# TODO(deepyaman): Abstract logic shared with `dagster-graphql`.
Contributor Author:

I think this one can definitely be deduplicated; probably can be done in a subsequent PR?
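The check the author wants to deduplicate reduces to a set difference between the requested keys and the keys the partitions definition knows about. A plain-Python sketch of that logic (names are illustrative, not the shared helper the comment proposes):

```python
def find_invalid_partition_keys(
    requested_keys: set[str],
    known_keys: set[str],
) -> list[str]:
    """Return the requested keys that the partitions definition does not recognize."""
    # Sorted so the error message is deterministic, matching the diff below.
    return sorted(requested_keys - known_keys)


known = {"2024-01-01", "2024-01-02", "2024-01-03"}
invalid = find_invalid_partition_keys({"2024-01-02", "2024-02-99"}, known)
if invalid:
    # Mirrors the DagsterError raised in the diff.
    message = f"Partition keys `{invalid}` could not be found."
```

Extracting this into one helper shared by `dagster-graphql` and `DagsterInstance` would give both call sites identical validation and error wording.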

asset_backfill_data = backfill.asset_backfill_data
if asset_backfill_data:
partitions_subsets_by_asset_key = (
asset_backfill_data.target_subset.partitions_subsets_by_asset_key
)
for asset_key, partitions_subset in partitions_subsets_by_asset_key.items():
partitions_def = asset_graph.get(asset_key).partitions_def
if not partitions_def:
continue

invalid_keys = set(partitions_subset.get_partition_keys()) - set(
partitions_def.get_partition_keys(backfill_datetime, dynamic_partitions_store)
)
if invalid_keys:
raise DagsterError(
f"Partition keys `{sorted(invalid_keys)}` could not be found."
)

self.add_backfill(backfill)
return backfill_id

@property
def should_start_background_run_thread(self) -> bool:
"""Gate on an experimental feature to start a thread that monitors for if the run should be canceled."""
65 changes: 65 additions & 0 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
@@ -1494,6 +1494,71 @@ def test_pure_asset_backfill_with_multiple_assets_selected(
assert all(run.status == DagsterRunStatus.SUCCESS for run in runs)


def test_pure_asset_backfill_with_multiple_assets_selected_using_python_api(
instance: DagsterInstance,
workspace_context: WorkspaceProcessContext,
remote_repo: RemoteRepository,
):
asset_selection = [
AssetKey("asset_a"),
AssetKey("asset_b"),
AssetKey("asset_c"),
AssetKey("asset_d"),
]

partition_keys = partitions_a.get_partition_keys()

backfill_id = instance.launch_backfill(
asset_graph=workspace_context.create_request_context().asset_graph,
asset_selection=asset_selection,
partition_names=partition_keys,
tags={"custom_tag_key": "custom_tag_value"},
all_partitions=False,
title=None,
description=None,
)
assert instance.get_runs_count() == 0
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

assert all(
not error
for error in list(
execute_backfill_iteration(
workspace_context, get_default_daemon_logger("BackfillDaemon")
)
)
)
assert instance.get_runs_count() == 1
wait_for_all_runs_to_start(instance, timeout=30)
wait_for_all_runs_to_finish(instance, timeout=30)
run = instance.get_runs()[0]
assert run.tags[BACKFILL_ID_TAG] == backfill_id
assert run.tags["custom_tag_key"] == "custom_tag_value"
assert run.asset_selection == {AssetKey(["asset_a"])}

assert all(
not error
for error in list(
execute_backfill_iteration(
workspace_context, get_default_daemon_logger("BackfillDaemon")
)
)
)
assert instance.get_runs_count() == 4
wait_for_all_runs_to_start(instance, timeout=30)
wait_for_all_runs_to_finish(instance, timeout=30)

runs = instance.get_runs()

assert any(run.asset_selection == {AssetKey(["asset_b"])} for run in runs)
assert any(run.asset_selection == {AssetKey(["asset_c"])} for run in runs)
assert any(run.asset_selection == {AssetKey(["asset_d"])} for run in runs)

assert all(run.status == DagsterRunStatus.SUCCESS for run in runs)
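A subtle Python pitfall relevant to aggregate assertions like the ones in this test: wrapping the condition in a one-element list, as in `all([cond] for run in runs)`, makes the generator yield non-empty lists rather than booleans, and non-empty lists are always truthy, so the assertion can never fail. A quick demonstration:

```python
runs = ["failure", "failure"]

# Vacuous: each generated item is the one-element list [False], which is truthy,
# so this passes even though no run succeeded.
assert all([run == "success"] for run in runs)

# Intended form: assert on the condition itself, not a list wrapping it.
assert not all(run == "success" for run in runs)
```

The same applies to `any(...)`: `any([cond] for ...)` is true whenever the iterable is non-empty, regardless of `cond`.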


def test_pure_asset_backfill(
instance: DagsterInstance,
workspace_context: WorkspaceProcessContext,