Skip to content

Commit

Permalink
Implement Python API for launching asset backfills
Browse files Browse the repository at this point in the history
  • Loading branch information
deepyaman committed Dec 27, 2024
1 parent 8c6746a commit f7133d7
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def create_and_launch_partition_backfill(
asset_selection or backfill_params.get("selector") or backfill_params.get("partitionNames")
) and partitions_by_assets:
raise DagsterInvariantViolationError(
"partitions_by_assets cannot be used together with asset_selection, selector, or"
"partitionsByAssets cannot be used together with assetSelection, selector, or"
" partitionNames"
)

Expand Down Expand Up @@ -231,10 +231,10 @@ def create_and_launch_partition_backfill(
)

backfill = PartitionBackfill.from_asset_partitions(
asset_graph=asset_graph,
backfill_id=backfill_id,
tags=tags,
asset_graph=asset_graph,
backfill_timestamp=backfill_timestamp,
tags=tags,
asset_selection=asset_selection,
partition_names=backfill_params.get("partitionNames"),
dynamic_partitions_store=dynamic_partitions_store,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,7 @@ def test_asset_backfill_error_raised_upon_invalid_params_provided():
== "PythonError"
)
assert (
"partitions_by_assets cannot be used together with asset_selection, selector, or partitionNames"
"partitionsByAssets cannot be used together with assetSelection, selector, or partitionNames"
in launch_backfill_result.data["launchPartitionBackfill"]["message"]
)

Expand Down
87 changes: 82 additions & 5 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from dagster._core.definitions.events import AssetKey, AssetObservation
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.errors import (
DagsterError,
DagsterHomeNotSetError,
DagsterInvalidInvocationError,
DagsterInvariantViolationError,
Expand Down Expand Up @@ -81,8 +82,9 @@
TAGS_TO_MAYBE_OMIT_ON_RETRY,
WILL_RETRY_TAG,
)
from dagster._core.utils import make_new_backfill_id
from dagster._serdes import ConfigurableClass
from dagster._time import get_current_datetime, get_current_timestamp
from dagster._time import datetime_from_timestamp, get_current_datetime, get_current_timestamp
from dagster._utils import PrintFn, is_uuid, traced
from dagster._utils.error import serializable_error_info_from_exc_info
from dagster._utils.merger import merge_dicts
Expand Down Expand Up @@ -2260,7 +2262,7 @@ def get_event_tags_for_asset(
@public
@traced
def wipe_assets(self, asset_keys: Sequence[AssetKey]) -> None:
"""Wipes asset event history from the event log for the given asset keys.
"""Wipe asset event history from the event log for the given asset keys.
Args:
asset_keys (Sequence[AssetKey]): Asset keys to wipe.
Expand All @@ -2277,7 +2279,7 @@ def wipe_asset_partitions(
"""Wipes asset event history from the event log for the given asset key and partition keys.
Args:
asset_key (Sequence[AssetKey]): Asset key to wipe.
asset_key (AssetKey): Asset key to wipe.
partition_keys (Sequence[str]): Partition keys to wipe.
"""
self._event_storage.wipe_asset_partitions(asset_key, partition_keys)
Expand Down Expand Up @@ -2649,7 +2651,7 @@ def storage_directory(self) -> str:
def schedules_directory(self) -> str:
    """Return the ``schedules_dir`` exposed by this instance's local artifact storage."""
    artifact_storage = self._local_artifact_storage
    return artifact_storage.schedules_dir

# Runs coordinator
# Run coordinator

def submit_run(self, run_id: str, workspace: "BaseWorkspaceRequestContext") -> DagsterRun:
"""Submit a pipeline run to the coordinator.
Expand Down Expand Up @@ -3161,6 +3163,81 @@ def add_backfill(self, partition_backfill: "PartitionBackfill") -> None:
def update_backfill(self, partition_backfill: "PartitionBackfill") -> None:
    """Forward ``partition_backfill`` to the run storage's ``update_backfill``."""
    run_storage = self._run_storage
    run_storage.update_backfill(partition_backfill)

@public
def launch_backfill(
    self,
    asset_graph: "BaseAssetGraph",
    asset_selection: Optional[Sequence[AssetKey]] = None,
    partitions_by_assets: Optional[
        Sequence[Tuple[AssetKey, Optional[Sequence[Tuple[str, str]]]]]
    ] = None,
    partition_names: Optional[Sequence[str]] = None,
    tags: Optional[Mapping[str, str]] = None,
    all_partitions: bool = False,
    title: Optional[str] = None,
    description: Optional[str] = None,
) -> None:
    """Create an asset partition backfill and register it with this instance.

    Exactly one of ``asset_selection`` or ``partitions_by_assets`` must be provided. The
    resulting ``PartitionBackfill`` is handed to ``add_backfill``; this method does not
    itself return the backfill or its ID.

    Args:
        asset_graph (BaseAssetGraph): The asset graph for the backfill.
        asset_selection (Optional[Sequence[AssetKey]]): List of asset keys to backfill.
        partitions_by_assets (Optional[Sequence[Tuple[AssetKey, Optional[Sequence[Tuple[str, str]]]]]]):
            List of partitions for each asset key to backfill. Cannot be combined with
            ``asset_selection`` or ``partition_names``.
            NOTE(review): the inner ``Tuple[str, str]`` entries are presumably
            (start, end) partition key ranges -- confirm against
            ``PartitionBackfill.from_partitions_by_assets``.
        partition_names (Optional[Sequence[str]]): List of partition names to backfill. Only
            valid if ``asset_selection`` is provided.
        tags (Optional[Mapping[str, str]]): Tags forwarded to the backfill. ``None`` (the
            default) is treated as an empty mapping.
        all_partitions (bool): Forwarded to ``PartitionBackfill.from_asset_partitions``; only
            used when ``asset_selection`` is provided.
        title (Optional[str]): Title forwarded to the backfill.
        description (Optional[str]): Description forwarded to the backfill.

    Raises:
        DagsterInvariantViolationError: If ``partitions_by_assets`` is combined with
            ``asset_selection`` or ``partition_names``.
        DagsterError: If neither ``asset_selection`` nor ``partitions_by_assets`` is provided
            (including when only ``partition_names`` is given).
    """
    # TODO(deepyaman): Abstract logic shared with `dagster-graphql`.
    from dagster._core.definitions.partition import CachingDynamicPartitionsLoader
    from dagster._core.execution.backfill import PartitionBackfill

    # Validate the argument combination up front, before allocating an ID or building the
    # partitions loader, so that invalid calls fail without doing any other work.
    if (
        asset_selection is not None or partition_names is not None
    ) and partitions_by_assets is not None:
        raise DagsterInvariantViolationError(
            "partitions_by_assets cannot be used together with asset_selection or"
            " partition_names"
        )

    if asset_selection is None and partitions_by_assets is None:
        raise DagsterError(
            "Backfill requested without specifying partition set selector or asset selection"
        )

    # Avoid a shared mutable default: build a fresh empty mapping per call.
    backfill_tags: Mapping[str, str] = {} if tags is None else tags

    backfill_id = make_new_backfill_id()
    backfill_timestamp = get_current_timestamp()
    dynamic_partitions_store = CachingDynamicPartitionsLoader(self)

    if asset_selection is not None:
        backfill = PartitionBackfill.from_asset_partitions(
            backfill_id=backfill_id,
            asset_graph=asset_graph,
            asset_selection=asset_selection,
            partition_names=partition_names,
            backfill_timestamp=backfill_timestamp,
            tags=backfill_tags,
            dynamic_partitions_store=dynamic_partitions_store,
            all_partitions=all_partitions,
            title=title,
            description=description,
        )
    else:
        # The guards above guarantee `partitions_by_assets` is set on this branch.
        backfill = PartitionBackfill.from_partitions_by_assets(
            backfill_id=backfill_id,
            asset_graph=asset_graph,
            partitions_by_assets=partitions_by_assets,
            backfill_timestamp=backfill_timestamp,
            tags=backfill_tags,
            dynamic_partitions_store=dynamic_partitions_store,
            title=title,
            description=description,
        )

    # TODO(deepyaman): Assert valid asset partition backfill. This will likely need the
    # backfill datetime, i.e. `datetime_from_timestamp(backfill_timestamp)`.

    self.add_backfill(backfill)

@property
def should_start_background_run_thread(self) -> bool:
"""Gate on an experimental feature to start a thread that monitors for if the run should be canceled."""
Expand Down Expand Up @@ -3236,7 +3313,7 @@ def get_latest_data_version_record(
def get_latest_materialization_code_versions(
self, asset_keys: Iterable[AssetKey]
) -> Mapping[AssetKey, Optional[str]]:
"""Returns the code version used for the latest materialization of each of the provided
"""Return the code version used for the latest materialization of each of the provided
assets.
Args:
Expand Down

0 comments on commit f7133d7

Please sign in to comment.