Skip to content

Commit

Permalink
Accept set of keys instead of partition key ranges
Browse files Browse the repository at this point in the history
  • Loading branch information
deepyaman committed Dec 31, 2024
1 parent 31e0d77 commit ab28602
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,17 +179,17 @@ def assert_valid_asset_partition_backfill(
if not asset_backfill_data:
return

partition_subset_by_asset_key = (
partitions_subsets_by_asset_key = (
asset_backfill_data.target_subset.partitions_subsets_by_asset_key
)

for asset_key, partition_subset in partition_subset_by_asset_key.items():
for asset_key, partitions_subset in partitions_subsets_by_asset_key.items():
partitions_def = asset_graph.get(asset_key).partitions_def

if not partitions_def:
continue

invalid_keys = set(partition_subset.get_partition_keys()) - set(
invalid_keys = set(partitions_subset.get_partition_keys()) - set(
partitions_def.get_partition_keys(backfill_datetime, dynamic_partitions_store)
)

Expand Down
33 changes: 20 additions & 13 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3169,9 +3169,7 @@ def launch_backfill(
self,
asset_graph: "BaseAssetGraph",
asset_selection: Optional[Sequence[AssetKey]] = None,
partitions_by_assets: Optional[
Sequence[Tuple[AssetKey, Optional[Sequence[Tuple[str, str]]]]]
] = None,
partitions_by_assets: Optional[Mapping[AssetKey, Optional[AbstractSet[str]]]] = None,
partition_names: Optional[Sequence[str]] = None,
tags: Mapping[str, str] = {},
all_partitions: bool = False,
Expand All @@ -3185,7 +3183,7 @@ def launch_backfill(
Args:
asset_graph (BaseAssetGraph): The asset graph for the backfill.
asset_selection (Optional[Sequence[AssetKey]]): List of asset keys to backfill.
partitions_by_assets (Optional[Sequence[Tuple[AssetKey, Optional[Sequence[Tuple[str, str]]]]]]):
partitions_by_assets (Optional[Mapping[AssetKey, Optional[AbstractSet[str]]]]):
List of partitions for each asset key to backfill.
partition_names (Optional[Sequence[str]]): List of partition names to backfill. Only
valid if `asset_selection` is provided.
Expand All @@ -3194,7 +3192,10 @@ def launch_backfill(
str: The ID of the backfill.
"""
# TODO(deepyaman): Abstract logic shared with `dagster-graphql`.
from dagster._core.definitions.partition import CachingDynamicPartitionsLoader
from dagster._core.definitions.partition import (
CachingDynamicPartitionsLoader,
DefaultPartitionsSubset,
)
from dagster._core.definitions.selector import (
PartitionRangeSelector,
PartitionsByAssetSelector,
Expand Down Expand Up @@ -3232,11 +3233,17 @@ def launch_backfill(
)
elif partitions_by_assets is not None:
partitions_by_asset_selectors = []
for asset_key, partitions in partitions_by_assets:
partitions_selector = (
PartitionsSelector(ranges=[PartitionRangeSelector(*r) for r in partitions])
if partitions is not None
else None
for asset_key, subset in partitions_by_assets.items():
partitions_def = asset_graph.get(asset_key).partitions_def
partitions_subset = DefaultPartitionsSubset(subset)
partition_key_ranges = partitions_subset.get_partition_key_ranges(
partitions_def, dynamic_partitions_store=dynamic_partitions_store
)
partitions_selector = PartitionsSelector(
ranges=[
PartitionRangeSelector(partition_key_range.start, partition_key_range.end)
for partition_key_range in partition_key_ranges
]
)
partitions_by_asset_selectors.append(
PartitionsByAssetSelector(asset_key=asset_key, partitions=partitions_selector)
Expand All @@ -3261,15 +3268,15 @@ def launch_backfill(
# TODO(deepyaman): Abstract logic shared with `dagster-graphql`.
asset_backfill_data = backfill.asset_backfill_data
if asset_backfill_data:
partition_subset_by_asset_key = (
partitions_subsets_by_asset_key = (
asset_backfill_data.target_subset.partitions_subsets_by_asset_key
)
for asset_key, partition_subset in partition_subset_by_asset_key.items():
for asset_key, partitions_subset in partitions_subsets_by_asset_key.items():
partitions_def = asset_graph.get(asset_key).partitions_def
if not partitions_def:
continue

invalid_keys = set(partition_subset.get_partition_keys()) - set(
invalid_keys = set(partitions_subset.get_partition_keys()) - set(
partitions_def.get_partition_keys(backfill_datetime, dynamic_partitions_store)
)
if invalid_keys:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2497,9 +2497,7 @@ def test_asset_backfill_with_single_run_backfill_policy_using_python_api(

backfill_id = instance.launch_backfill(
asset_graph=asset_graph,
partitions_by_assets=[
(asset_with_single_run_backfill_policy.key, [(partitions[0], partitions[-1])])
],
partitions_by_assets={asset_with_single_run_backfill_policy.key: set(partitions)},
tags={},
title=None,
description=None,
Expand Down

0 comments on commit ab28602

Please sign in to comment.