Hi @smphhh! Thanks for putting this together. At a high level, this is quite similar to the behavior of `AutomationCondition.on_cron()`, where you give a single cron schedule for an asset, and it will materialize after all of its parents have been updated since the latest tick of the cron schedule.

The main difference here is that each individual asset does not "care" about the cron schedules defined on its dependencies. This was an explicit choice, as there are plenty of cases where it's not possible to define an explicit cron schedule for an upstream asset (it might be scheduled in a way that doesn't really map to an explicit cron schedule). However, that's not an inherent limitation of the system, it's just a simplifying assumption for the built-in condition.

There are two ways to add more flexibility to the default.

The scheme you described is even more general, and applies a different cron schedule per upstream dependency. Using AutomationConditions, one way of doing this would be something along the lines of:

```python
from functools import reduce
from typing import Mapping

from dagster import AssetKey, AutomationCondition


def get_condition(
    base_cron_schedule: str, upstream_cron_schedules: Mapping[AssetKey, str]
) -> AutomationCondition:
    # Build one sub-condition per upstream ("updated since the latest tick of
    # its own cron schedule") and AND them all together.
    # Note: reduce() raises TypeError if upstream_cron_schedules is empty.
    all_upstreams_updated = reduce(
        lambda a, b: a & b,
        [
            AutomationCondition.asset_matches(
                key,
                AutomationCondition.newly_updated().since(
                    AutomationCondition.cron_tick_passed(cron_schedule)
                ),
            )
            for key, cron_schedule in upstream_cron_schedules.items()
        ],
    )
    # Combine with the cron tick condition for the asset's own schedule.
    return AutomationCondition.cron_tick_passed(base_cron_schedule) & all_upstreams_updated
```

This constructs a condition that individually detects that each upstream has been updated since the latest tick of its specific cron schedule (rather than using a uniform cron schedule for all upstreams).
Hi,
A while ago we decided to try implementing our own general-purpose scheduling sensor for declarative automation because we had trouble implementing some pretty common scheduling patterns using Dagster's built-in scheduling mechanisms. Those patterns include cases where dependencies have schedules that are misaligned or have different frequencies.
I thought I'd share the logic behind the sensor because I think it solves some of those patterns pretty elegantly. Admittedly I'm not that familiar with Dagster's latest declarative automation feature, so feel free to point out if it includes mechanisms that could be used to achieve behavior similar to our sensor's, which I'll describe below. While the sensor now works quite well, the observability could be better, so any Dagster-native scheduling mechanism would of course be preferable.
I'll start by describing the behavior of the sensor for non-partitioned assets, and later talk about how it naturally extends to partitioned assets.
Each asset can define a schedule, which is a sequence of successive schedule timestamps. The schedule can be defined as a cron expression, for example.
An asset without any dependencies can be materialized as soon as the current time passes the next schedule timestamp in the schedule. The materialization is logically tied to that schedule timestamp.
An asset with dependencies can be materialized when the current time passes the next schedule timestamp, but only after any materializations of its immediate upstream dependencies with equal or earlier schedule timestamps have finished. Note that due to recursion the same condition will hold for non-immediate upstream dependencies as well.
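The two rules above can be sketched roughly as follows. This is only an illustration, not the actual sensor code: the `Asset` class, the `can_materialize` function, and the use of a finite list of schedule timestamps are all assumptions made for the example.

```python
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class Asset:
    name: str
    schedule: list[datetime]  # schedule timestamps (a finite list, for simplicity)
    upstreams: list["Asset"] = field(default_factory=list)
    completed: set[datetime] = field(default_factory=set)  # finished schedule timestamps


def can_materialize(asset: Asset, ts: datetime, now: datetime) -> bool:
    """Return True if the materialization of `asset` tied to `ts` may start at `now`."""
    if now < ts:
        return False  # the schedule timestamp has not passed yet
    for upstream in asset.upstreams:
        # Every upstream materialization with an equal or earlier schedule
        # timestamp must have finished.
        if any(t <= ts and t not in upstream.completed for t in upstream.schedule):
            return False
    return True


raw = Asset("raw", schedule=[datetime(2024, 1, 1, 1), datetime(2024, 1, 2, 1)])
report = Asset("report", schedule=[datetime(2024, 1, 1, 2)], upstreams=[raw])

now = datetime(2024, 1, 1, 2, 5)
print(can_materialize(report, datetime(2024, 1, 1, 2), now))  # False: raw at 01:00 not finished
raw.completed.add(datetime(2024, 1, 1, 1))
print(can_materialize(report, datetime(2024, 1, 1, 2), now))  # True
```

Only immediate upstreams are checked. The recursion mentioned above comes for free, because an upstream cannot complete its own materialization before its upstreams have allowed it to start.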
An interesting property of the above rules is that, in terms of data, the outcome of the materializations will be the same as if the materialization jobs of all assets were put into a single queue and executed one by one in order of their schedule timestamps, and in order of asset dependencies where materializations of multiple assets have the same schedule timestamp. Well, strictly speaking that'd only be true if scheduling and materialization happened immediately, but we do have a configuration option that lets us enforce the effective ordering to various degrees.
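To make that equivalent single-queue ordering concrete, here is a small sketch that sorts jobs by schedule timestamp and breaks ties by dependency order. The `serial_order` function and the dictionary-based inputs are hypothetical and exist only for this example; the standard library's `graphlib.TopologicalSorter` supplies the dependency order.

```python
from datetime import datetime
from graphlib import TopologicalSorter
from typing import Mapping, Sequence


def serial_order(
    schedules: Mapping[str, Sequence[datetime]],
    upstreams: Mapping[str, set[str]],
) -> list[tuple[datetime, str]]:
    """Order all (schedule timestamp, asset) jobs as one serial queue."""
    # Map each asset to its immediate upstreams (empty set if it has none).
    graph = {name: set(upstreams.get(name, ())) for name in schedules}
    # Rank assets so that every upstream gets a lower rank than its downstreams.
    rank = {name: i for i, name in enumerate(TopologicalSorter(graph).static_order())}
    jobs = [(ts, rank[name], name) for name, stamps in schedules.items() for ts in stamps]
    # Sort by schedule timestamp first, then by dependency rank.
    return [(ts, name) for ts, _rank, name in sorted(jobs)]


schedules = {
    "a": [datetime(2024, 1, 1, 1)],
    "b": [datetime(2024, 1, 1, 2)],
    "c": [datetime(2024, 1, 1, 2)],
}
for ts, name in serial_order(schedules, {"c": {"a", "b"}}):
    print(ts.strftime("%H:%M"), name)
# 01:00 a
# 02:00 b
# 02:00 c
```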
Another nice property is that external dependencies integrate naturally into this. The only thing that needs to be determined for the external asset is the next schedule timestamp in its schedule. That timestamp basically tells the sensor that any materializations of downstream assets with schedule timestamps earlier than the next schedule timestamp of the external asset can go forward (as long as there are no other blocking dependencies of course), but materializations with later or equal schedule timestamps need to wait. In fact, the above property is used in the implementation of the sensor to resolve internal dependencies as well, which makes the implementation relatively straightforward.
We also allow specifying schedule timestamp margins for external asset dependencies, so that the sensor can be told to wait for upstream materializations with schedule timestamps up to a bit after (or before) the schedule timestamp being considered for materialization. This is useful if the external dependencies are materialized on a schedule that follows a different timezone, for example.
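A rough sketch of how the external dependency check and the margin could fit together. The function name `external_dependency_allows` and the sign convention for `margin` (positive means "also wait for upstream timestamps slightly after mine") are assumptions made for illustration, not the sensor's actual interface.

```python
from datetime import datetime, timedelta


def external_dependency_allows(
    ts: datetime,
    external_next_ts: datetime,
    margin: timedelta = timedelta(0),
) -> bool:
    """Return True if a downstream materialization tied to `ts` may go forward.

    `external_next_ts` is the external asset's next schedule timestamp, meaning
    everything strictly before it is considered done. The downstream waits for
    upstream schedule timestamps up to and including `ts + margin`.
    """
    return external_next_ts > ts + margin


two_am = datetime(2024, 1, 1, 2)

# No margin: an equal upstream timestamp blocks, a later one does not.
print(external_dependency_allows(two_am, datetime(2024, 1, 1, 2)))  # False
print(external_dependency_allows(two_am, datetime(2024, 1, 1, 3)))  # True

# Positive margin: also wait for the upstream run scheduled at 02:30.
print(external_dependency_allows(two_am, datetime(2024, 1, 1, 2, 30), timedelta(hours=1)))  # False

# Negative margin: only wait for upstream runs scheduled up to 01:30.
print(external_dependency_allows(two_am, datetime(2024, 1, 1, 2), timedelta(minutes=-30)))  # True
```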
Materializations at schedule timestamps can also be skipped (optionally) if a materialization at a later schedule timestamp is already possible. For the purposes of scheduling, a skipped materialization can be considered to have happened if a materialization at a later schedule timestamp has completed.
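One possible reading of the skipping rule, as a sketch. The helper names `pick_with_skipping` and `counts_as_done` are hypothetical, and `counts_as_done` is deliberately simplified: it treats any earlier schedule timestamp as done once a later one has completed, without tracking whether it was explicitly skipped.

```python
from datetime import datetime
from typing import Callable, Optional, Sequence


def pick_with_skipping(
    pending: Sequence[datetime],
    is_possible: Callable[[datetime], bool],
) -> tuple[Optional[datetime], list[datetime]]:
    """Pick the latest schedule timestamp that can run now; skip the earlier ones."""
    possible = [ts for ts in pending if is_possible(ts)]
    if not possible:
        return None, []
    to_run = max(possible)
    skipped = [ts for ts in pending if ts < to_run]
    return to_run, skipped


def counts_as_done(ts: datetime, completed: set[datetime]) -> bool:
    """A timestamp counts as done if it, or any later timestamp, has completed."""
    return ts in completed or any(done > ts for done in completed)


pending = [datetime(2024, 1, 1, h) for h in (1, 2, 3)]
now = datetime(2024, 1, 1, 2, 10)
to_run, skipped = pick_with_skipping(pending, lambda ts: ts <= now)
print(to_run)        # 2024-01-01 02:00:00
print(len(skipped))  # 1 (the 01:00 run is skipped)
```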
SLAs are also supported quite naturally. Basically with an SLA of four hours (for example) a materialization with a particular schedule timestamp has four hours to complete counted from the schedule timestamp.
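As a small illustration of the SLA arithmetic (the function `sla_breached` and its parameters are made up for this example):

```python
from datetime import datetime, timedelta
from typing import Optional


def sla_breached(
    ts: datetime,
    sla: timedelta,
    now: datetime,
    completed_at: Optional[datetime] = None,
) -> bool:
    """Check an SLA that is counted from the schedule timestamp `ts`."""
    deadline = ts + sla
    if completed_at is not None:
        return completed_at > deadline  # finished, but possibly too late
    return now > deadline  # still not finished: breached once the deadline passes


ts = datetime(2024, 1, 1, 2)
sla = timedelta(hours=4)  # the deadline is 06:00

print(sla_breached(ts, sla, now=datetime(2024, 1, 1, 5)))  # False: still within the SLA
print(sla_breached(ts, sla, now=datetime(2024, 1, 1, 7)))  # True: not finished by 06:00
print(sla_breached(ts, sla, now=datetime(2024, 1, 1, 7),
                   completed_at=datetime(2024, 1, 1, 5, 30)))  # False: finished in time
```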
In principle, partitioned assets can be treated equivalently as a collection of independent mini-assets, each with its own schedule, with partition-to-partition dependencies determined by partition mappings in addition to the asset-level dependencies. In practice, though, our implementation makes some assumptions for efficiency reasons and handles dependencies on partitioned assets in a sort of aggregate way. We also derive the partition-level schedules from a single asset-level schedule. For time-partitioned assets, the schedule timestamps are assigned to the partitions according to the partition windows, which implicitly defines a (finite) schedule for each partition. For static-partitioned assets, the same asset-level schedule is used as is for all partitions.
Coming back to the patterns I mentioned in the beginning, I'll describe a couple of examples of how the sensor behaves.
Let's have asset A scheduled to be materialized at 1 AM each night, and assets B and C scheduled to be materialized at 2 AM each night. C depends on both A and B. Materialization of C scheduled for 2 AM will start only once the materialization of asset A scheduled for 1 AM and the materialization of asset B scheduled for 2 AM have both completed.
In another example let's have asset A scheduled to be materialized once per day at 1 AM and asset B scheduled to be materialized every hour. The materialization of asset B at midnight can start immediately as long as the materialization of asset A at 1 AM the previous day (23 hours ago) has completed. The materialization of asset B at 1 AM will wait for the materialization of asset A at 1 AM. The materialization of asset B at 2 AM will wait for the materialization of asset A at 1 AM (if necessary), but also for its own previous materialization, because we don't allow multiple concurrent materializations for the same asset/partition.
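The daily/hourly example can be traced with a sketch that uses the "next schedule timestamp" formulation described earlier: for each upstream, track the earliest schedule timestamp whose materialization has not yet completed, and block anything scheduled at or after it. The function `can_start` and its parameters are hypothetical names for this illustration.

```python
from datetime import datetime
from typing import Sequence


def can_start(
    ts: datetime,
    now: datetime,
    upstream_next_pending: Sequence[datetime],
    already_running: bool,
) -> bool:
    """Decide whether the materialization tied to schedule timestamp `ts` may start."""
    if now < ts or already_running:
        # Either too early, or a previous run of the same asset is still active.
        return False
    # Each upstream must have completed everything scheduled at or before `ts`.
    return all(next_ts > ts for next_ts in upstream_next_pending)


jan1_1am = datetime(2024, 1, 1, 1)
jan2_midnight = datetime(2024, 1, 2, 0)
jan2_1am = datetime(2024, 1, 2, 1)
jan2_2am = datetime(2024, 1, 2, 2)
jan3_1am = datetime(2024, 1, 3, 1)

# A's run for Jan 1 01:00 completed, so A's next pending timestamp is Jan 2 01:00.
print(can_start(jan2_midnight, jan2_midnight, [jan2_1am], False))  # True

# B at 01:00 waits for A at 01:00.
print(can_start(jan2_1am, jan2_1am, [jan2_1am], False))  # False
# Once A at 01:00 completes, A's next pending timestamp moves to Jan 3 01:00.
print(can_start(jan2_1am, jan2_1am, [jan3_1am], False))  # True

# B at 02:00: A is done, but B's own 01:00 run is still in progress.
print(can_start(jan2_2am, jan2_2am, [jan3_1am], True))  # False
```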
I think that's pretty much it, any thoughts?