Implement Python API for launching asset backfills #26731
base: master
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,7 +38,9 @@ | |
from dagster._core.definitions.data_version import extract_data_provenance_from_entry | ||
from dagster._core.definitions.events import AssetKey, AssetObservation | ||
from dagster._core.definitions.partition_key_range import PartitionKeyRange | ||
from dagster._core.definitions.utils import check_valid_title | ||
from dagster._core.errors import ( | ||
DagsterError, | ||
DagsterHomeNotSetError, | ||
DagsterInvalidInvocationError, | ||
DagsterInvariantViolationError, | ||
|
@@ -79,8 +81,9 @@ | |
TAGS_TO_MAYBE_OMIT_ON_RETRY, | ||
WILL_RETRY_TAG, | ||
) | ||
from dagster._core.utils import make_new_backfill_id | ||
from dagster._serdes import ConfigurableClass | ||
from dagster._time import get_current_datetime, get_current_timestamp | ||
from dagster._time import datetime_from_timestamp, get_current_datetime, get_current_timestamp | ||
from dagster._utils import PrintFn, is_uuid, traced | ||
from dagster._utils.error import serializable_error_info_from_exc_info | ||
from dagster._utils.merger import merge_dicts | ||
|
@@ -3155,6 +3158,87 @@ def add_backfill(self, partition_backfill: "PartitionBackfill") -> None: | |
def update_backfill(self, partition_backfill: "PartitionBackfill") -> None: | ||
self._run_storage.update_backfill(partition_backfill) | ||
|
||
@public | ||
@experimental | ||
def launch_backfill( | ||
self, | ||
*, | ||
asset_graph: "BaseAssetGraph", | ||
asset_selection: Sequence[AssetKey], | ||
partition_names: Optional[Sequence[str]] = None, | ||
all_partitions: bool = False, | ||
tags: Mapping[str, str] = {}, | ||
title: Optional[str] = None, | ||
description: Optional[str] = None, | ||
) -> str: | ||
"""Launch a set of partition backfill runs. | ||
|
||
Either ``partition_names`` must not be ``None`` or ``all_partitions`` must be ``True`` but | ||
not both. | ||
|
||
Args: | ||
asset_graph (BaseAssetGraph): The asset graph for the backfill. | ||
asset_selection (Sequence[AssetKey]): List of asset keys to backfill. | ||
partition_names (Optional[Sequence[str]]): List of partition names to backfill. | ||
all_partitions (bool): Whether to backfill all partitions. | ||
tags (Mapping[str, str]): The tags for the backfill. | ||
title (Optional[str]): The title of the backfill. | ||
description (Optional[str]): The description of the backfill. | ||
|
||
Returns: | ||
str: The ID of the backfill. | ||
""" | ||
from dagster._core.definitions.partition import CachingDynamicPartitionsLoader | ||
from dagster._core.execution.backfill import PartitionBackfill | ||
|
||
backfill_id = make_new_backfill_id() | ||
backfill_timestamp = get_current_timestamp() | ||
backfill_datetime = datetime_from_timestamp(backfill_timestamp) | ||
dynamic_partitions_store = CachingDynamicPartitionsLoader(self) | ||
|
||
check_valid_title(title) | ||
|
||
if asset_selection is not None: | ||
backfill = PartitionBackfill.from_asset_partitions( | ||
backfill_id=backfill_id, | ||
asset_graph=asset_graph, | ||
asset_selection=asset_selection, | ||
partition_names=partition_names, | ||
backfill_timestamp=backfill_timestamp, | ||
tags=tags, | ||
dynamic_partitions_store=dynamic_partitions_store, | ||
all_partitions=all_partitions, | ||
title=title, | ||
description=description, | ||
) | ||
Comment on lines +3201 to +3213

from my perspective it would be really nice if as much of this as possible can be done on the 'server' (which isn't really a thing in our instance API unfortunately...). Right now this is a lot of asset graph calculation happening on the 'client', at least in cloud.

It's really too bad that the asset graph is not serialized in the database in OSS currently / needs to be passed in by the caller. That is not the case in cloud and simplifies the API in situations like this quite a bit, I think.

Is there a particular goal in pushing this work onto the "server" side in cloud?

My main concern here is that if we try to pass in some sort of "unresolved" asset graph subset selection (i.e. some combination of parameters such as asset_selection, partition_names, all_partitions), then this is actually a lot less flexible as time goes on. We'd need to commit to those parameters being the "official" way of determining an asset graph subset, when in reality I think we'll want to evolve those Python APIs over time.

Concretely, I'm imagining in the mid-term future we'll want to start exposing

In that future, it's much easier to rely on PartitionBackfill as the serialized object that we produce here, as that is maximally flexible -- we can construct that object in whatever way we want on the client side, and be confident that the daemon can read it. |
||
else: | ||
raise DagsterError( | ||
"Backfill requested without specifying partition set selector or asset selection" | ||
) | ||
Comment on lines +3215 to +3217

The error message references a

Spotted by Graphite Reviewer |
||
|
||
# Check that the partition keys can be found in the asset graph. | ||
# TODO(deepyaman): Abstract logic shared with `dagster-graphql`. | ||
I think this one can definitely be deduplicated; probably can be done in a subsequent PR? |
||
asset_backfill_data = backfill.asset_backfill_data | ||
if asset_backfill_data: | ||
partitions_subsets_by_asset_key = ( | ||
asset_backfill_data.target_subset.partitions_subsets_by_asset_key | ||
) | ||
for asset_key, partitions_subset in partitions_subsets_by_asset_key.items(): | ||
partitions_def = asset_graph.get(asset_key).partitions_def | ||
if not partitions_def: | ||
continue | ||
|
||
invalid_keys = set(partitions_subset.get_partition_keys()) - set( | ||
partitions_def.get_partition_keys(backfill_datetime, dynamic_partitions_store) | ||
) | ||
if invalid_keys: | ||
raise DagsterError( | ||
f"Partition keys `{sorted(invalid_keys)}` could not be found." | ||
) | ||
|
||
self.add_backfill(backfill) | ||
return backfill_id | ||
|
||
@property | ||
def should_start_background_run_thread(self) -> bool: | ||
"""Gate on an experimental feature to start a thread that monitors for if the run should be canceled.""" | ||
|
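For readers following the review, the two checks that `launch_backfill` depends on can be sketched without Dagster: the "exactly one of `partition_names` or `all_partitions`" rule from the docstring, and the set-difference test for partition keys missing from the asset graph. This is only an illustrative sketch. The helper names `check_partition_args` and `find_invalid_keys` are hypothetical, they are not part of this PR or of Dagster, and the diff does not show where the argument rule is actually enforced.

```python
from typing import Iterable, Optional, Sequence, Set


def check_partition_args(
    partition_names: Optional[Sequence[str]], all_partitions: bool
) -> None:
    """Hypothetical helper: require exactly one of the two ways to pick partitions."""
    # Both given or neither given is an error; the two flags must disagree.
    if (partition_names is not None) == all_partitions:
        raise ValueError(
            "Pass exactly one of `partition_names` or `all_partitions=True`."
        )


def find_invalid_keys(requested: Iterable[str], known: Iterable[str]) -> Set[str]:
    """Hypothetical helper: requested partition keys that are not in the known set."""
    # Same set difference the diff computes per asset key.
    return set(requested) - set(known)


if __name__ == "__main__":
    check_partition_args(["2024-01-01"], all_partitions=False)  # accepted
    missing = find_invalid_keys(
        ["2024-01-01", "2024-01-03"], ["2024-01-01", "2024-01-02"]
    )
    if missing:
        print(f"Partition keys `{sorted(missing)}` could not be found.")
```

In the PR itself the known keys come from `partitions_def.get_partition_keys(...)` for each asset key, and the failure is raised as a `DagsterError` rather than printed.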
I decided to directly ask for the asset graph as an argument, since there are already tutorials that ask the user to pass asset graph. I didn't really see any reason to ask for a broader `Definitions` object.

Can you point me to the tutorial where you see this? I'm having trouble finding one other than the airlift tutorial, which is a slightly different scenario since it is getting the asset graph from `defs` in a `__main__`, not in the body of a job.

It was an Airlift tutorial. 😅