[rfc] Implement Python API for launching asset backfills #26731
) and partitions_by_assets:
    raise DagsterInvariantViolationError(
-        "partitions_by_assets cannot be used together with asset_selection, selector, or"
-        " partitionNames"
+        "partitionsByAssets cannot be used together with assetSelection, selector, or"
At the very least, I feel like the capitalization should be consistent; the camel-casing is aligned with the GraphQL API names.
-        "partitions_by_assets cannot be used together with asset_selection, selector, or"
-        " partitionNames"
+        "partitionsByAssets cannot be used together with assetSelection, selector, or"
+        " partitionNames or if allPartitions is True"
I think it makes sense to validate `allPartitions`, too, if checking `partitionNames`; neither can be used with `partitionsByAssets`.
I would be happy to split this and the above change into a separate (stacked?) PR, since they're not directly related to creating the Python API; just thought I'd do some housekeeping as I was learning about it for the Python API implementation.
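For readers skimming the thread, the rule being tightened here can be sketched roughly as follows. This is only an illustration of the mutual-exclusion check, not the code in the PR: `validate_backfill_params` and `InvalidBackfillParamsError` are hypothetical names (the latter stands in for `DagsterInvariantViolationError`), and the parameters are assumed to arrive as a plain mapping keyed by the GraphQL field names.

```python
from typing import Any, Mapping


class InvalidBackfillParamsError(ValueError):
    """Hypothetical stand-in for DagsterInvariantViolationError."""


# GraphQL-style option names that may not accompany partitionsByAssets.
CONFLICTING_OPTIONS = ("assetSelection", "selector", "partitionNames", "allPartitions")


def validate_backfill_params(params: Mapping[str, Any]) -> None:
    """Reject partitionsByAssets when any conflicting option is also set."""
    if not params.get("partitionsByAssets"):
        return  # Nothing to check unless partitionsByAssets is in use.
    conflicting = [name for name in CONFLICTING_OPTIONS if params.get(name)]
    if conflicting:
        raise InvalidBackfillParamsError(
            "partitionsByAssets cannot be used together with " + ", ".join(conflicting)
        )
```

With this shape, `allPartitions=True` is rejected on the same footing as `partitionNames`, which is the point of the comment above.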
asset_graph=asset_graph,
backfill_timestamp=backfill_timestamp,
tags=tags,
Nit: Minimal argument order alignment across calls.
@@ -2260,7 +2262,7 @@ def get_event_tags_for_asset(
@public
@traced
def wipe_assets(self, asset_keys: Sequence[AssetKey]) -> None:
-    """Wipes asset event history from the event log for the given asset keys.
+    """Wipe asset event history from the event log for the given asset keys.
Nit: All of the other public methods in this file use imperative mood.
@public
def launch_backfill(
    self,
    asset_graph: "BaseAssetGraph",
I decided to directly ask for the asset graph as an argument, since there are already tutorials that ask the user to pass the asset graph. I didn't really see any reason to ask for a broader `Definitions` object.
Returns:
    str: The ID of the backfill.
"""
# TODO(deepyaman): Abstract logic shared with `dagster-graphql`.
Actually, as I got to implementing more of this, maybe they're sufficiently different that this isn't worth it.
partitions_by_assets: Optional[
    Sequence[Tuple[AssetKey, Optional[Sequence[Tuple[str, str]]]]]
] = None,
This avoids exposing any additional objects, but I suppose an explanation/examples will be necessary for anybody not already familiar with Dagster internals.
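As a rough illustration of what a value of this type might look like, here is a self-contained sketch. Two things are assumptions rather than facts from this thread: that each inner `Tuple[str, str]` is a `(start, end)` partition-key range, and what `None` means for an asset (shown only as "no explicit ranges"). `AssetKey` below is a hypothetical stand-in so the snippet runs without Dagster installed, and `describe` is a made-up helper.

```python
from typing import List, NamedTuple, Optional, Sequence, Tuple


class AssetKey(NamedTuple):
    """Hypothetical stand-in for dagster.AssetKey (not the real class)."""

    path: Tuple[str, ...]


# ASSUMPTION: each pair is a (start_key, end_key) partition range.
PartitionRange = Tuple[str, str]
PartitionsByAssets = Sequence[Tuple[AssetKey, Optional[Sequence[PartitionRange]]]]

partitions_by_assets: PartitionsByAssets = [
    (AssetKey(("daily_events",)), [("2024-01-01", "2024-01-07")]),
    (AssetKey(("weekly_rollup",)), [("2024-01-01", "2024-01-01")]),
    # ASSUMPTION: None means no explicit ranges were given for this asset.
    (AssetKey(("country_lookup",)), None),
]


def describe(selection: PartitionsByAssets) -> List[str]:
    """Render one human-readable line per asset in the selection."""
    lines = []
    for asset_key, ranges in selection:
        name = "/".join(asset_key.path)
        if ranges is None:
            lines.append(f"{name}: no explicit ranges")
        else:
            spans = ", ".join(f"{start}..{end}" for start, end in ranges)
            lines.append(f"{name}: {spans}")
    return lines
```

The nesting is what makes the bare type hard to read: a sequence of assets, each paired with an optional sequence of string pairs.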
)

tags = {t["key"]: t["value"] for t in backfill_params.get("tags", [])}

tags = {**tags, **graphene_info.context.get_viewer_tags()}

title = check_valid_title(backfill_params.get("title"))
It seems reasonable to use the existing utility. Also, I think there's no reason it should only be happening for job backfills.
This is another one I'd be happy to move into a separate PR, if so desired.
doing it for only job backfills is 100% just a goof on my part. we should be doing it for all types of backfills - thanks for catching!
switching to the check version of the fn is also fine w me. I think originally we were considering re-raising as a GrapheneError so that it had a more immediate failure for the user. But we've kind of abandoned adding titles/descriptions to backfills for now anyway, so keeping it as the generic python error is fine
@jamiedemaria Great! I raised a new PR just addressing this (tagged you, if you can take a quick look), since it's agreed upon, while I go about changing this broader PR.
)

# Check that the partition keys can be found in the asset graph.
# TODO(deepyaman): Abstract logic shared with `dagster-graphql`.
I think this one can definitely be deduplicated; probably can be done in a subsequent PR?
After sleeping on this, I feel like putting this logic on the DagsterInstance itself isn't quite right.
At the core, there are two things we want to be able to do here:
1. Express some intent to backfill some arbitrary set of partitions
2. Send a message to the instance to act on that intent
We already have an existing method for (2) (instance.add_backfill), and so the question is what is the best way of exposing (1) to the user.
We already have the `PartitionBackfill` object, and so putting this layer of indirection for constructing the partition backfill feels off, especially because none of the code in the added method really interacts with the core instance database outside of a single call to `self.add_backfill()`.
Right now, this single method supports a wide array of different parameter combinations, with only some subset of them being valid, which is pretty confusing for a user, even if we add a ton of examples of valid behavior.
I think the better way of factoring this behavior would be to just expose `PartitionBackfill` more directly, and make the existing static constructors public (with potentially a bit of light refactoring of the input types, e.g. `backfill_id` could be an optional parameter -- if not provided, we'd create one for the user).
To start, we could just expose `.from_asset_partitions()` as public, as I think that's the most common case, and its existing implementation is less subject to change than `from_partitions_by_assets` (which has extra custom types).
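The factoring proposed in this comment could look roughly like the sketch below. It is a toy model of the idea only: `BackfillRequest` and `InMemoryInstance` are hypothetical stand-ins for `PartitionBackfill` and `DagsterInstance`, the constructor arguments are illustrative rather than the real signature, and generating the ID from `uuid` is an assumption about how a default `backfill_id` might be created.

```python
import uuid
from dataclasses import dataclass
from typing import Dict, Mapping, Optional, Sequence, Tuple


@dataclass(frozen=True)
class BackfillRequest:
    """Hypothetical stand-in for PartitionBackfill: expresses intent only."""

    backfill_id: str
    asset_keys: Tuple[str, ...]
    partition_names: Tuple[str, ...]
    tags: Mapping[str, str]

    @classmethod
    def from_asset_partitions(
        cls,
        asset_keys: Sequence[str],
        partition_names: Sequence[str],
        tags: Optional[Mapping[str, str]] = None,
        backfill_id: Optional[str] = None,
    ) -> "BackfillRequest":
        return cls(
            # ASSUMPTION: create an ID for the user when none is provided.
            backfill_id=backfill_id or uuid.uuid4().hex[:8],
            asset_keys=tuple(asset_keys),
            partition_names=tuple(partition_names),
            tags=dict(tags or {}),
        )


class InMemoryInstance:
    """Hypothetical stand-in for the instance: only acts on a built request."""

    def __init__(self) -> None:
        self.backfills: Dict[str, BackfillRequest] = {}

    def add_backfill(self, backfill: BackfillRequest) -> None:
        self.backfills[backfill.backfill_id] = backfill


# Step (1): express the intent. Step (2): hand it to the instance.
instance = InMemoryInstance()
request = BackfillRequest.from_asset_partitions(["daily_events"], ["2024-01-01"])
instance.add_backfill(request)
```

Splitting things this way keeps parameter validation inside each constructor, so the instance method does not need to accept every parameter combination.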
++1 I think exposing the constructors on |
Have there been discussions in the build team about the implications of allowing users to launch backfills (and other jobs) from runs? I know in the past we've shied away from allowing/promoting this behavior and instead pointed users to sensors. Would love to get up to date on what you are all thinking in terms of best practices here
## Summary & Motivation
Fix a mistaken inconsistency; see #26731 (comment).
## How I Tested These Changes
BK
## Changelog
Added validation of `title` for asset backfills (not just for job backfills).
Summary & Motivation
Background
@smackesey has interacted with several users on the following scenario:
Problem Statement
We do not have a good solution to the above scenario.
So if X is a backfill, basically your only option is to use a job that then uses the private GQL API.
Objective
The purpose of this enhancement is to improve user experience, so that users no longer have to resort to workarounds to manually trigger a conditional launch of X.
Concretely, this PR exposes a public Python API for creating and launching asset backfills.
How I Tested These Changes
Added unit tests mirroring a couple of existing backfill tests, but using the newly-added Python API.
TODO
I wanted to get feedback/validation that I'm on the right track before proceeding much further, but I understand that the following things probably still need doing:
Changelog
Exposed a Python API for creating and launching asset backfills.