Skip to content

Commit

Permalink
woo
Browse files Browse the repository at this point in the history
> Insert changelog entry or delete this section.
  • Loading branch information
gibsondan committed Nov 19, 2024
1 parent 56ea925 commit 7d1ff8b
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
Literal,
NamedTuple,
Optional,
Tuple,
Type,
TypeVar,
)
Expand All @@ -24,6 +25,7 @@
PartitionDimensionDefinition,
)
from dagster._core.definitions.partition import AllPartitionsSubset
from dagster._core.definitions.partition_mapping import UpstreamPartitionsResult
from dagster._core.definitions.time_window_partitions import (
TimeWindow,
TimeWindowPartitionsDefinition,
Expand Down Expand Up @@ -227,6 +229,46 @@ def get_asset_subset_from_asset_partitions(
)
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

def compute_parent_subset_and_required_but_not_existent_parent_subset(
    self, parent_key: AssetKey, subset: EntitySubset[T_EntityKey]
) -> Tuple[EntitySubset[AssetKey], EntitySubset[AssetKey]]:
    """Map ``subset`` upstream onto ``parent_key`` and report missing parent partitions.

    Args:
        parent_key: Key of the parent entity to map onto. It must be one of the
            ``parent_entity_keys`` of ``subset.key`` in the asset graph; otherwise the
            ``check.invariant`` below fails.
        subset: The downstream subset whose upstream dependencies are computed.

    Returns:
        A ``(parent_subset, required_but_nonexistent_parent_subset)`` pair, both keyed
        by ``parent_key``:

        * If ``subset`` is empty, both elements are the empty subset.
        * If the parent has no partitions definition, the result is the parent's full
          subset paired with an empty subset.
        * Otherwise the first element wraps the ``partitions_subset`` of the upstream
          mapping result, and the second is built from that same result's
          ``required_but_nonexistent_partition_keys``.
    """
    check.invariant(
        parent_key in self.asset_graph.get(subset.key).parent_entity_keys,
    )
    parent_partitions_def = self.asset_graph.get(parent_key).partitions_def

    if subset.is_empty:
        empty_subset = self.get_empty_subset(key=parent_key)
        return empty_subset, empty_subset
    elif parent_partitions_def is None:
        return self.get_full_subset(key=parent_key), self.get_empty_subset(key=parent_key)

    # Run the partition mapping exactly once: the mapped subset and the
    # required-but-nonexistent keys are both read from the same result object.
    upstream_partitions_result = self._compute_upstream_partitions_result(parent_key, subset)

    parent_subset = EntitySubset(
        self,
        key=parent_key,
        value=_ValidatedEntitySubsetValue(upstream_partitions_result.partitions_subset),
    )

    required_but_nonexistent_parent_partitions = (
        upstream_partitions_result.required_but_nonexistent_partition_keys
    )

    if not required_but_nonexistent_parent_partitions:
        required_but_nonexistent_parent_subset = self.get_empty_subset(key=parent_key)
    else:
        required_but_nonexistent_parent_subset = self.get_asset_subset_from_asset_partitions(
            parent_key,
            {
                AssetKeyPartitionKey(parent_key, partition)
                for partition in required_but_nonexistent_parent_partitions
            },
        )

    return parent_subset, required_but_nonexistent_parent_subset

def compute_parent_subset(
self, parent_key: AssetKey, subset: EntitySubset[T_EntityKey]
) -> EntitySubset[AssetKey]:
Expand All @@ -243,6 +285,25 @@ def compute_child_subset(
)
return self.compute_mapped_subset(child_key, subset, direction="down")

def _compute_upstream_partitions_result(
    self, to_key: T_EntityKey, from_subset: EntitySubset
) -> UpstreamPartitionsResult:
    """Run the partition mapping from ``from_subset``'s entity up to ``to_key``.

    The mapping is looked up on the asset graph for the (child, parent) pair
    ``(from_subset.key, to_key)`` and its full result object is returned.
    ``check.not_none`` is applied to the partitions definition of ``to_key``
    before it is passed as the upstream definition.
    """
    child_key = from_subset.key
    graph = self.asset_graph

    mapping = graph.get_partition_mapping(child_key, to_key)
    child_partitions_def = graph.get(child_key).partitions_def
    parent_partitions_def = graph.get(to_key).partitions_def

    # When the child has no partitions definition, None is passed as the
    # downstream subset instead of the subset's internal value.
    if child_partitions_def is not None:
        child_partitions_subset = from_subset.get_internal_subset_value()
    else:
        child_partitions_subset = None

    return mapping.get_upstream_mapped_partitions_result_for_partitions(
        downstream_partitions_subset=child_partitions_subset,
        downstream_partitions_def=child_partitions_def,
        upstream_partitions_def=check.not_none(parent_partitions_def),
        dynamic_partitions_store=self._queryer,
        current_time=self.effective_dt,
    )

def compute_mapped_subset(
self, to_key: T_EntityKey, from_subset: EntitySubset, direction: Literal["up", "down"]
) -> EntitySubset[T_EntityKey]:
Expand Down Expand Up @@ -281,17 +342,9 @@ def compute_mapped_subset(
parent_key = to_key
partition_mapping = self.asset_graph.get_partition_mapping(child_key, parent_key)

to_partitions_subset = (
partition_mapping.get_upstream_mapped_partitions_result_for_partitions(
downstream_partitions_subset=from_subset.get_internal_subset_value()
if from_partitions_def is not None
else None,
downstream_partitions_def=from_partitions_def,
upstream_partitions_def=to_partitions_def,
dynamic_partitions_store=self._queryer,
current_time=self.effective_dt,
).partitions_subset
)
to_partitions_subset = self._compute_upstream_partitions_result(
to_key, from_subset
).partitions_subset

return EntitySubset(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class AllPartitionMapping(PartitionMapping, NamedTuple("_AllPartitionMapping", [
"""Maps every partition in the downstream asset to every partition in the upstream asset.
Commonly used in the case when the downstream asset is not partitioned, in which the entire
downstream asset depends on all partitions of the usptream asset.
downstream asset depends on all partitions of the upstream asset.
"""

def get_upstream_mapped_partitions_result_for_partitions(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dagster import AssetDep, Definitions, asset
from dagster import AssetDep, Definitions, IdentityPartitionMapping, asset
from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.events import AssetKeyPartitionKey
Expand Down Expand Up @@ -27,6 +27,46 @@ def an_asset() -> None: ...
assert asset_graph_view_t0.asset_graph.get_all_asset_keys() == {an_asset.key}


def test_upstream_non_existent_partitions():
    """Identity-mapped parent with a different static partition set.

    The downstream asset has partitions {"z", "x"} while the upstream asset has
    {"x", "y"}: "x" maps onto the parent, and "z" is expected to be reported as
    required but nonexistent. An empty downstream subset yields two empty results.
    """
    upstream_partitions = StaticPartitionsDefinition(["x", "y"])
    downstream_partitions = StaticPartitionsDefinition(["z", "x"])

    @asset(partitions_def=upstream_partitions)
    def up_asset() -> None: ...

    @asset(
        deps=[AssetDep(up_asset, partition_mapping=IdentityPartitionMapping())],
        partitions_def=downstream_partitions,
    )
    def down_asset(): ...

    definitions = Definitions([up_asset, down_asset])

    with DagsterInstance.ephemeral() as instance:
        view = AssetGraphView.for_test(definitions, instance)
        map_to_parent = view.compute_parent_subset_and_required_but_not_existent_parent_subset

        full_down = view.get_full_subset(key=down_asset.key)
        assert full_down.expensively_compute_partition_keys() == {"x", "z"}

        mapped, missing = map_to_parent(parent_key=up_asset.key, subset=full_down)
        assert mapped.expensively_compute_partition_keys() == {"x"}
        assert missing.expensively_compute_partition_keys() == {"z"}

        # Mapping onto an empty subset is empty
        empty_down = view.get_empty_subset(key=down_asset.key)
        mapped, missing = map_to_parent(parent_key=up_asset.key, subset=empty_down)
        assert mapped.is_empty
        assert missing.is_empty


def test_subset_traversal_static_partitions() -> None:
number_keys = {"1", "2", "3"}
letter_keys = {"a", "b", "c"}
Expand Down Expand Up @@ -133,6 +173,43 @@ def up_numbers() -> None: ...
)


def test_downstream_of_unpartitioned_partition_mapping() -> None:
    """Traverse between an unpartitioned parent and a statically partitioned child.

    Checks that full/empty subsets map to full/empty subsets in both directions,
    and that the combined parent/required-but-nonexistent computation returns an
    empty "nonexistent" subset for an unpartitioned parent.
    """

    @asset
    def unpartitioned() -> None: ...

    @asset(
        partitions_def=StaticPartitionsDefinition(["a", "b", "c"]),
        deps=[AssetDep(unpartitioned, partition_mapping=LastPartitionMapping())],
    )
    def downstream() -> None: ...

    defs = Definitions([downstream, unpartitioned])

    # Use the ephemeral instance as a context manager so it is released when the
    # test finishes, matching test_upstream_non_existent_partitions.
    with DagsterInstance.ephemeral() as instance:
        asset_graph_view = AssetGraphView.for_test(defs, instance)

        unpartitioned_full = asset_graph_view.get_full_subset(key=unpartitioned.key)
        unpartitioned_empty = asset_graph_view.get_empty_subset(key=unpartitioned.key)

        downstream_full = asset_graph_view.get_full_subset(key=downstream.key)
        downstream_empty = asset_graph_view.get_empty_subset(key=downstream.key)

        assert unpartitioned_full.compute_child_subset(child_key=downstream.key) == downstream_full
        assert (
            unpartitioned_empty.compute_child_subset(child_key=downstream.key) == downstream_empty
        )

        assert (
            downstream_full.compute_parent_subset(parent_key=unpartitioned.key)
            == unpartitioned_full
        )
        assert (
            downstream_empty.compute_parent_subset(parent_key=unpartitioned.key)
            == unpartitioned_empty
        )

        assert asset_graph_view.compute_parent_subset_and_required_but_not_existent_parent_subset(
            parent_key=unpartitioned.key, subset=downstream_full
        ) == (unpartitioned_full, unpartitioned_empty)

        assert asset_graph_view.compute_parent_subset_and_required_but_not_existent_parent_subset(
            parent_key=unpartitioned.key, subset=downstream_empty
        ) == (unpartitioned_empty, unpartitioned_empty)


def test_upstream_of_unpartitioned_partition_mapping() -> None:
@asset(partitions_def=StaticPartitionsDefinition(["a", "b", "c"]))
def upstream() -> None: ...
Expand Down

0 comments on commit 7d1ff8b

Please sign in to comment.