Skip to content

Commit

Permalink
Make UpstreamParittionsResult subset-awawre
Browse files Browse the repository at this point in the history
Summary:
This allows us to make the method on AssetGraphView compute_parent_subset_and_required_but_nonexistent_subset instead of still working in

I'm a little trepeditious about this because I could imagine cases where the invalid upstream partition is so invalid that it can't even be expressed as part of a PartitionsSubset? I guess in that case we could just raise an exception when you try to do the mapping like we already do in many cases (e.g. the timezones not matching).

Test Plan: BK

NOCHANGELOG

go all in

> Insert changelog entry or delete this section.

continue going all in

> Insert changelog entry or delete this section.
  • Loading branch information
gibsondan committed Dec 11, 2024
1 parent 4f04a39 commit fa638ec
Show file tree
Hide file tree
Showing 18 changed files with 248 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
Literal,
NamedTuple,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
Expand Down Expand Up @@ -230,19 +229,19 @@ def get_asset_subset_from_asset_partitions(
)
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

def compute_parent_subset_and_required_but_nonexistent_partition_keys(
def compute_parent_subset_and_required_but_nonexistent_subset(
self, parent_key, subset: EntitySubset[T_EntityKey]
) -> Tuple[EntitySubset[AssetKey], Sequence[str]]:
) -> Tuple[EntitySubset[AssetKey], EntitySubset[AssetKey]]:
check.invariant(
parent_key in self.asset_graph.get(subset.key).parent_entity_keys,
)
to_key = parent_key
to_partitions_def = self.asset_graph.get(to_key).partitions_def

if subset.is_empty:
return self.get_empty_subset(key=parent_key), []
return self.get_empty_subset(key=parent_key), self.get_empty_subset(key=parent_key)
elif to_partitions_def is None:
return self.get_full_subset(key=to_key), []
return self.get_full_subset(key=to_key), self.get_empty_subset(key=parent_key)

upstream_partition_result = self._compute_upstream_partitions_result(to_key, subset)

Expand All @@ -252,7 +251,15 @@ def compute_parent_subset_and_required_but_nonexistent_partition_keys(
value=_ValidatedEntitySubsetValue(upstream_partition_result.partitions_subset),
)

return parent_subset, upstream_partition_result.required_but_nonexistent_partition_keys
required_but_nonexistent_subset = EntitySubset(
self,
key=to_key,
value=_ValidatedEntitySubsetValue(
upstream_partition_result.required_but_nonexistent_subset
),
)

return parent_subset, required_but_nonexistent_subset

def compute_parent_subset(
self, parent_key: AssetKey, subset: EntitySubset[T_EntityKey]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ def get_parents_partitions(
required_but_nonexistent_parent_partitions.update(
{
AssetKeyPartitionKey(parent_asset_key, invalid_partition)
for invalid_partition in mapped_partitions_result.required_but_nonexistent_partition_keys
for invalid_partition in mapped_partitions_result.required_but_nonexistent_subset.get_partition_keys()
}
)
else:
Expand Down
35 changes: 27 additions & 8 deletions python_modules/dagster/dagster/_core/definitions/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def get_partition_keys_in_range(
]

def empty_subset(self) -> "PartitionsSubset":
return self.partitions_subset_class.empty_subset(self)
return self.partitions_subset_class.create_empty_subset(self)

def subset_with_partition_keys(self, partition_keys: Iterable[str]) -> "PartitionsSubset":
return self.empty_subset().with_partition_keys(partition_keys)
Expand Down Expand Up @@ -992,6 +992,7 @@ def __or__(self, other: "PartitionsSubset") -> "PartitionsSubset":
if self is other or other.is_empty:
return self
# Anything | AllPartitionsSubset = AllPartitionsSubset
# (this assumes the two subsets are using the same partitions definition)
if isinstance(other, AllPartitionsSubset):
return other
return self.with_partition_keys(other.get_partition_keys())
Expand All @@ -1002,6 +1003,7 @@ def __sub__(self, other: "PartitionsSubset") -> "PartitionsSubset":
if other.is_empty:
return self
# Anything - AllPartitionsSubset = Empty
# (this assumes the two subsets are using the same partitions definition)
if isinstance(other, AllPartitionsSubset):
return self.empty_subset()
return self.empty_subset().with_partition_keys(
Expand All @@ -1014,6 +1016,7 @@ def __and__(self, other: "PartitionsSubset") -> "PartitionsSubset":
if other.is_empty:
return other
# Anything & AllPartitionsSubset = Anything
# (this assumes the two subsets are using the same partitions definition)
if isinstance(other, AllPartitionsSubset):
return self
return self.empty_subset().with_partition_keys(
Expand Down Expand Up @@ -1045,9 +1048,11 @@ def __len__(self) -> int: ...
@abstractmethod
def __contains__(self, value) -> bool: ...

def empty_subset(self) -> "PartitionsSubset[T_str]": ...

@classmethod
@abstractmethod
def empty_subset(
def create_empty_subset(
cls, partitions_def: Optional[PartitionsDefinition] = None
) -> "PartitionsSubset[T_str]": ...

Expand Down Expand Up @@ -1212,11 +1217,16 @@ def __repr__(self) -> str:
return f"DefaultPartitionsSubset(subset={self.subset})"

@classmethod
def empty_subset(
def create_empty_subset(
cls, partitions_def: Optional[PartitionsDefinition] = None
) -> "DefaultPartitionsSubset":
return cls()

def empty_subset(
self,
) -> "DefaultPartitionsSubset":
return DefaultPartitionsSubset()


class AllPartitionsSubset(
NamedTuple(
Expand Down Expand Up @@ -1295,11 +1305,16 @@ def __and__(self, other: "PartitionsSubset") -> "PartitionsSubset":
return other

def __sub__(self, other: "PartitionsSubset") -> "PartitionsSubset":
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsSubset
from dagster._core.definitions.time_window_partitions import (
TimeWindowPartitionsDefinition,
TimeWindowPartitionsSubset,
)

if self == other:
return self.partitions_def.empty_subset()
elif isinstance(other, TimeWindowPartitionsSubset):
elif isinstance(other, TimeWindowPartitionsSubset) and isinstance(
self.partitions_def, TimeWindowPartitionsDefinition
):
return TimeWindowPartitionsSubset.from_all_partitions_subset(self) - other
return self.partitions_def.empty_subset().with_partition_keys(
set(self.get_partition_keys()).difference(set(other.get_partition_keys()))
Expand Down Expand Up @@ -1340,11 +1355,15 @@ def from_serialized(
) -> "PartitionsSubset[T_str]":
raise NotImplementedError()

def empty_subset(
self, partitions_def: Optional[PartitionsDefinition] = None
) -> PartitionsSubset:
def empty_subset(self) -> PartitionsSubset:
return self.partitions_def.empty_subset()

@classmethod
def create_empty_subset(
cls, partitions_def: Optional[PartitionsDefinition] = None
) -> PartitionsSubset:
return check.not_none(partitions_def).empty_subset()

def to_serializable_subset(self) -> PartitionsSubset:
return self.partitions_def.subset_with_all_partitions(
current_time=self.current_time, dynamic_partitions_store=self.dynamic_partitions_store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from abc import ABC, abstractmethod, abstractproperty
from collections import defaultdict
from datetime import datetime
from functools import lru_cache
from functools import cached_property, lru_cache
from typing import (
Collection,
Dict,
Expand All @@ -27,29 +27,36 @@
)
from dagster._core.definitions.partition import (
AllPartitionsSubset,
DefaultPartitionsSubset,
PartitionsDefinition,
PartitionsSubset,
StaticPartitionsDefinition,
)
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition
from dagster._core.instance import DynamicPartitionsStore
from dagster._record import record
from dagster._serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
from dagster._utils.warnings import disable_dagster_warnings


class UpstreamPartitionsResult(NamedTuple):
@record
class UpstreamPartitionsResult:
"""Represents the result of mapping a PartitionsSubset to the corresponding
partitions in another PartitionsDefinition.
partitions_subset (PartitionsSubset): The resulting partitions subset that was
mapped to. Only contains partitions for existent partitions, filtering out nonexistent partitions.
required_but_nonexistent_partition_keys (Sequence[str]): A list containing invalid partition keys in to_partitions_def
required_but_nonexistent_subset (PartitionsSubset): A set of invalid partition keys in to_partitions_def
that partitions in from_partitions_subset were mapped to.
"""

partitions_subset: PartitionsSubset
required_but_nonexistent_partition_keys: Sequence[str]
required_but_nonexistent_subset: PartitionsSubset

@cached_property
def required_but_nonexistent_partition_keys(self) -> Sequence[str]:
return list(self.required_but_nonexistent_subset.get_partition_keys())


class PartitionMapping(ABC):
Expand Down Expand Up @@ -94,11 +101,11 @@ def get_upstream_mapped_partitions_result_for_partitions(
partitions subset was mapped to in the upstream partitions definition.
Valid upstream partitions will be included in UpstreamPartitionsResult.partitions_subset.
Invalid upstream partitions will be included in UpstreamPartitionsResult.required_but_nonexistent_partition_keys.
Invalid upstream partitions will be included in UpstreamPartitionsResult.required_but_nonexistent_subset.
For example, if an upstream asset is time-partitioned and starts in June 2023, and the
downstream asset is time-partitioned and starts in May 2023, this function would return a
UpstreamPartitionsResult(PartitionsSubset("2023-06-01"), required_but_nonexistent_partition_keys=["2023-05-01"])
UpstreamPartitionsResult(PartitionsSubset("2023-06-01"), required_but_nonexistent_subset=PartitionsSubset("2023-05-01"))
when downstream_partitions_subset contains 2023-05-01 and 2023-06-01.
"""

Expand Down Expand Up @@ -126,8 +133,13 @@ def get_upstream_mapped_partitions_result_for_partitions(
check.failed("downstream asset is not partitioned")

if downstream_partitions_def == upstream_partitions_def:
return UpstreamPartitionsResult(downstream_partitions_subset, [])
return UpstreamPartitionsResult(
partitions_subset=downstream_partitions_subset,
required_but_nonexistent_subset=upstream_partitions_def.empty_subset(),
)

# must list out the keys before combining them since they might be from
# different asset keys
upstream_partition_keys = set(
upstream_partitions_def.get_partition_keys(
dynamic_partitions_store=dynamic_partitions_store
Expand All @@ -136,10 +148,12 @@ def get_upstream_mapped_partitions_result_for_partitions(
downstream_partition_keys = set(downstream_partitions_subset.get_partition_keys())

return UpstreamPartitionsResult(
upstream_partitions_def.subset_with_partition_keys(
partitions_subset=upstream_partitions_def.subset_with_partition_keys(
list(upstream_partition_keys & downstream_partition_keys)
),
list(downstream_partition_keys - upstream_partition_keys),
required_but_nonexistent_subset=DefaultPartitionsSubset(
downstream_partition_keys - upstream_partition_keys,
),
)

def get_downstream_partitions_for_partitions(
Expand Down Expand Up @@ -202,7 +216,8 @@ def get_upstream_mapped_partitions_result_for_partitions(
current_time=current_time, dynamic_partitions_store=dynamic_partitions_store
)
return UpstreamPartitionsResult(
partitions_subset=partitions_subset, required_but_nonexistent_partition_keys=[]
partitions_subset=partitions_subset,
required_but_nonexistent_subset=upstream_partitions_def.empty_subset(),
)

def get_downstream_partitions_for_partitions(
Expand Down Expand Up @@ -252,7 +267,10 @@ def get_upstream_mapped_partitions_result_for_partitions(
if last is not None:
upstream_subset = upstream_subset.with_partition_keys([last])

return UpstreamPartitionsResult(upstream_subset, [])
return UpstreamPartitionsResult(
partitions_subset=upstream_subset,
required_but_nonexistent_subset=upstream_partitions_def.empty_subset(),
)

def get_downstream_partitions_for_partitions(
self,
Expand Down Expand Up @@ -313,7 +331,10 @@ def get_upstream_mapped_partitions_result_for_partitions(
dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
) -> UpstreamPartitionsResult:
return UpstreamPartitionsResult(
upstream_partitions_def.subset_with_partition_keys(self.partition_keys), []
partitions_subset=upstream_partitions_def.subset_with_partition_keys(
self.partition_keys
),
required_but_nonexistent_subset=upstream_partitions_def.empty_subset(),
)

def get_downstream_partitions_for_partitions(
Expand Down Expand Up @@ -483,8 +504,13 @@ def _get_dependency_partitions_subset(
dep_b_keys_by_a_dim_and_key[a_dim_name][key] = list(
mapped_partitions_result.partitions_subset.get_partition_keys()
)

# enumerating partition keys since the two subsets might be from different
# asset keys
required_but_nonexistent_upstream_partitions.update(
set(mapped_partitions_result.required_but_nonexistent_partition_keys)
set(
mapped_partitions_result.required_but_nonexistent_subset.get_partition_keys()
)
)

b_partition_keys = set()
Expand Down Expand Up @@ -534,8 +560,8 @@ def _get_dependency_partitions_subset(
return mapped_subset
else:
return UpstreamPartitionsResult(
mapped_subset,
required_but_nonexistent_partition_keys=list(
partitions_subset=mapped_subset,
required_but_nonexistent_subset=DefaultPartitionsSubset(
required_but_nonexistent_upstream_partitions
),
)
Expand Down Expand Up @@ -960,13 +986,19 @@ def get_upstream_mapped_partitions_result_for_partitions(

upstream_subset = upstream_partitions_def.empty_subset()
if downstream_partitions_subset is None:
return UpstreamPartitionsResult(upstream_subset, [])
return UpstreamPartitionsResult(
partitions_subset=upstream_subset,
required_but_nonexistent_subset=upstream_partitions_def.empty_subset(),
)

upstream_keys = set()
for key in downstream_partitions_subset.get_partition_keys():
upstream_keys.update(self._inverse_mapping[key])

return UpstreamPartitionsResult(upstream_subset.with_partition_keys(upstream_keys), [])
return UpstreamPartitionsResult(
partitions_subset=upstream_subset.with_partition_keys(upstream_keys),
required_but_nonexistent_subset=upstream_partitions_def.empty_subset(),
)

@property
def description(self) -> str:
Expand Down
Loading

0 comments on commit fa638ec

Please sign in to comment.