Skip to content

Commit

Permalink
Don't enumerate partition keys when constructing run requests for a s…
Browse files Browse the repository at this point in the history
…ingle-run backfill

Summary:
This ensures that if you take a giant partition set and use a single run backfill, we don't individually enumerate every single key before assembling them back into a partition key range.

(we would have to get even fancier to be able to handle the non-single backfil multi-run case without ever enumerating partition key - would require some kindo of chunking method at the partitions level that lets you break up partition ranges into chunks).

BK

> Insert changelog entry or delete this section.
  • Loading branch information
gibsondan committed Dec 27, 2024
1 parent a1a5805 commit 2b0fc23
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
AbstractSet,
Any,
Dict,
FrozenSet,
Iterable,
List,
Mapping,
Expand Down Expand Up @@ -36,8 +35,8 @@
from dagster._core.definitions.declarative_automation.serialized_objects import (
AutomationConditionEvaluation,
)
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.partition import PartitionsDefinition, PartitionsSubset
from dagster._core.definitions.run_request import RunRequest
from dagster._core.instance import DynamicPartitionsStore
from dagster._core.storage.tags import (
Expand Down Expand Up @@ -344,44 +343,53 @@ def _build_run_requests_from_partitions_def_mapping(


def build_run_requests_with_backfill_policies(
asset_partitions: Iterable[AssetKeyPartitionKey],
asset_subset_to_request: AssetGraphSubset,
asset_graph: BaseAssetGraph,
dynamic_partitions_store: DynamicPartitionsStore,
) -> Sequence[RunRequest]:
"""Build run requests for a selection of asset partitions based on the associated BackfillPolicies."""
run_requests = []

asset_partition_keys: Mapping[AssetKey, Set[str]] = {
asset_key_partition.asset_key: set() for asset_key_partition in asset_partitions
}
for asset_partition in asset_partitions:
if asset_partition.partition_key:
asset_partition_keys[asset_partition.asset_key].add(asset_partition.partition_key)

assets_to_reconcile_by_partitions_def_partition_keys_backfill_policy: Mapping[
Tuple[Optional[PartitionsDefinition], Optional[FrozenSet[str]], Optional[BackfillPolicy]],
Tuple[
Optional[PartitionsDefinition],
Optional[Tuple[PartitionKeyRange]],
Optional[BackfillPolicy],
],
Set[AssetKey],
] = defaultdict(set)

# here we are grouping assets by their partitions def, selected partition keys, and backfill policy.
for asset_key, partition_keys in asset_partition_keys.items():
# here we are grouping assets by their partitions def, selected partition ranges, and backfill policy.
for serializable_entity_subset in asset_subset_to_request.iterate_asset_subsets(asset_graph):
asset_key = serializable_entity_subset.key
partitions_def = asset_graph.get(asset_key).partitions_def
partition_key_ranges = (
tuple( # allows the value to be hashed
serializable_entity_subset.value.get_partition_key_ranges(
partitions_def, dynamic_partitions_store=dynamic_partitions_store
)
)
if isinstance(serializable_entity_subset.value, PartitionsSubset)
else None
)

assets_to_reconcile_by_partitions_def_partition_keys_backfill_policy[
asset_graph.get(asset_key).partitions_def,
frozenset(partition_keys) if partition_keys else None,
partitions_def,
partition_key_ranges,
asset_graph.get(asset_key).backfill_policy,
].add(asset_key)

for (
partitions_def,
partition_keys,
partition_key_ranges,
backfill_policy,
), asset_keys in assets_to_reconcile_by_partitions_def_partition_keys_backfill_policy.items():
asset_check_keys = asset_graph.get_check_keys_for_assets(asset_keys)
if partitions_def is None and partition_keys is not None:
check.failed("Partition key provided for unpartitioned asset")
elif partitions_def is not None and partition_keys is None:
check.failed("Partition key missing for partitioned asset")
elif partitions_def is None and partition_keys is None:
if partitions_def is None and partition_key_ranges is not None:
check.failed("Partition key range provided for unpartitioned asset")
elif partitions_def is not None and partition_key_ranges is None:
check.failed("Partition key range missing for partitioned asset")
elif partitions_def is None and partition_key_ranges is None:
# non partitioned assets will be backfilled in a single run
run_requests.append(
RunRequest(
Expand All @@ -392,6 +400,14 @@ def build_run_requests_with_backfill_policies(
)
elif backfill_policy is None:
# just use the normal single-partition behavior
partition_keys = set()
for partition_key_range in check.not_none(partition_key_ranges):
partition_keys = partition_keys.union(
check.not_none(partitions_def).get_partition_keys_in_range(
partition_key_range,
dynamic_partitions_store=dynamic_partitions_store,
)
)
entity_keys = cast(Set[EntityKey], asset_keys)
mapping: _PartitionsDefKeyMapping = {
(partitions_def, pk): entity_keys for pk in (partition_keys or [None])
Expand All @@ -405,7 +421,7 @@ def build_run_requests_with_backfill_policies(
list(asset_keys),
list(asset_check_keys),
check.not_none(backfill_policy),
check.not_none(partition_keys),
check.not_none(partition_key_ranges),
check.not_none(partitions_def),
tags={},
dynamic_partitions_store=dynamic_partitions_store,
Expand All @@ -418,16 +434,12 @@ def _build_run_requests_with_backfill_policy(
asset_keys: Sequence[AssetKey],
asset_check_keys: Sequence[AssetCheckKey],
backfill_policy: BackfillPolicy,
partition_keys: FrozenSet[str],
partition_key_ranges: Sequence[PartitionKeyRange],
partitions_def: PartitionsDefinition,
tags: Dict[str, Any],
dynamic_partitions_store: DynamicPartitionsStore,
) -> Sequence[RunRequest]:
run_requests = []
partition_subset = partitions_def.subset_with_partition_keys(partition_keys)
partition_key_ranges = partition_subset.get_partition_key_ranges(
partitions_def, dynamic_partitions_store=dynamic_partitions_store
)
for partition_key_range in partition_key_ranges:
# We might resolve more than one partition key range for the given partition keys.
# We can only apply chunking on individual partition key ranges.
Expand Down Expand Up @@ -470,6 +482,9 @@ def _build_run_requests_for_partition_key_range(
"""Builds multiple run requests for the given partition key range. Each run request will have at most
max_partitions_per_run partitions.
"""
# TODO add a method to PartitionsDefinition that lets us return a list of ranges
# with at most max_partitions_per_run partitions in each range, so that we don't
# have to enumerate every partition key here
partition_keys = partitions_def.get_partition_keys_in_range(
partition_key_range, dynamic_partitions_store=dynamic_partitions_store
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1532,8 +1532,6 @@ def execute_asset_backfill_iteration_inner(
else "No asset partitions to request."
)

asset_partitions_to_request = set(asset_subset_to_request.iterate_asset_partitions())

if len(not_requested_and_reasons) > 0:

def _format_graph_subset(asset_graph_subset: AssetGraphSubset):
Expand All @@ -1555,7 +1553,7 @@ def _format_graph_subset(asset_graph_subset: AssetGraphSubset):
)

run_requests = build_run_requests_with_backfill_policies(
asset_partitions=asset_partitions_to_request,
asset_subset_to_request=asset_subset_to_request,
asset_graph=asset_graph,
dynamic_partitions_store=instance_queryer,
)
Expand Down

0 comments on commit 2b0fc23

Please sign in to comment.