diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index 56d2aa7398b30..b852362bb5791 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -9,6 +9,8 @@ Literal, NamedTuple, Optional, + Sequence, + Tuple, Type, TypeVar, ) @@ -24,6 +26,7 @@ PartitionDimensionDefinition, ) from dagster._core.definitions.partition import AllPartitionsSubset +from dagster._core.definitions.partition_mapping import UpstreamPartitionsResult from dagster._core.definitions.time_window_partitions import ( TimeWindow, TimeWindowPartitionsDefinition, @@ -227,6 +230,30 @@ def get_asset_subset_from_asset_partitions( ) return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) + def compute_parent_subset_and_required_but_nonexistent_partition_keys( + self, parent_key, subset: EntitySubset[T_EntityKey] + ) -> Tuple[EntitySubset[AssetKey], Sequence[str]]: + check.invariant( + parent_key in self.asset_graph.get(subset.key).parent_entity_keys, + ) + to_key = parent_key + to_partitions_def = self.asset_graph.get(to_key).partitions_def + + if subset.is_empty: + return self.get_empty_subset(key=parent_key), [] + elif to_partitions_def is None: + return self.get_full_subset(key=to_key), [] + + upstream_partition_result = self._compute_upstream_partitions_result(to_key, subset) + + parent_subset = EntitySubset( + self, + key=to_key, + value=_ValidatedEntitySubsetValue(upstream_partition_result.partitions_subset), + ) + + return parent_subset, upstream_partition_result.required_but_nonexistent_partition_keys + def compute_parent_subset( self, parent_key: AssetKey, subset: EntitySubset[T_EntityKey] ) -> EntitySubset[AssetKey]: @@ -243,6 +270,25 @@ def compute_child_subset( ) return self.compute_mapped_subset(child_key, subset, direction="down") + def _compute_upstream_partitions_result( + self, to_key: T_EntityKey, from_subset: EntitySubset + ) -> UpstreamPartitionsResult: + from_key = from_subset.key + parent_key = to_key + partition_mapping = self.asset_graph.get_partition_mapping(from_key, parent_key) + from_partitions_def = self.asset_graph.get(from_key).partitions_def + to_partitions_def = self.asset_graph.get(to_key).partitions_def + + return partition_mapping.get_upstream_mapped_partitions_result_for_partitions( + downstream_partitions_subset=from_subset.get_internal_subset_value() + if from_partitions_def is not None + else None, + downstream_partitions_def=from_partitions_def, + upstream_partitions_def=check.not_none(to_partitions_def), + dynamic_partitions_store=self._queryer, + current_time=self.effective_dt, + ) + def compute_mapped_subset( self, to_key: T_EntityKey, from_subset: EntitySubset, direction: Literal["up", "down"] ) -> EntitySubset[T_EntityKey]: @@ -281,17 +327,9 @@ def compute_mapped_subset( parent_key = to_key partition_mapping = self.asset_graph.get_partition_mapping(child_key, parent_key) - to_partitions_subset = ( - partition_mapping.get_upstream_mapped_partitions_result_for_partitions( - downstream_partitions_subset=from_subset.get_internal_subset_value() - if from_partitions_def is not None - else None, - downstream_partitions_def=from_partitions_def, - upstream_partitions_def=to_partitions_def, - dynamic_partitions_store=self._queryer, - current_time=self.effective_dt, - ).partitions_subset - ) + to_partitions_subset = self._compute_upstream_partitions_result( + to_key, from_subset + ).partitions_subset return EntitySubset( self, diff --git a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py index 996ffa059e2c5..861c541c6ec00 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py @@ -180,7 +180,7 @@ class AllPartitionMapping(PartitionMapping, NamedTuple("_AllPartitionMapping", [ """Maps every partition in the downstream asset to every partition in the upstream asset. Commonly used in the case when the downstream asset is not partitioned, in which the entire - downstream asset depends on all partitions of the usptream asset. + downstream asset depends on all partitions of the upstream asset. """ def get_upstream_mapped_partitions_result_for_partitions( diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_basic_asset_graph_view.py b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_basic_asset_graph_view.py index 43696c2141fd6..c63b72f8e6d2e 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_basic_asset_graph_view.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_basic_asset_graph_view.py @@ -1,4 +1,4 @@ -from dagster import AssetDep, Definitions, asset +from dagster import AssetDep, Definitions, IdentityPartitionMapping, asset from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView from dagster._core.definitions.asset_check_spec import AssetCheckSpec from dagster._core.definitions.events import AssetKeyPartitionKey @@ -27,6 +27,46 @@ def an_asset() -> None: ... assert asset_graph_view_t0.asset_graph.get_all_asset_keys() == {an_asset.key} +def test_upstream_non_existent_partitions(): + xy = StaticPartitionsDefinition(["x", "y"]) + zx = StaticPartitionsDefinition(["z", "x"]) + + @asset(partitions_def=xy) + def up_asset() -> None: ... + + @asset( + deps=[AssetDep(up_asset, partition_mapping=IdentityPartitionMapping())], + partitions_def=zx, + ) + def down_asset(): ... + + defs = Definitions([up_asset, down_asset]) + + with DagsterInstance.ephemeral() as instance: + asset_graph_view = AssetGraphView.for_test(defs, instance) + down_subset = asset_graph_view.get_full_subset(key=down_asset.key) + assert down_subset.expensively_compute_partition_keys() == {"x", "z"} + + parent_subset, required_but_nonexistent_partition_keys = ( + asset_graph_view.compute_parent_subset_and_required_but_nonexistent_partition_keys( + parent_key=up_asset.key, subset=down_subset + ) + ) + + assert parent_subset.expensively_compute_partition_keys() == {"x"} + assert required_but_nonexistent_partition_keys == ["z"] + + # Mapping onto an empty subset is empty + empty_down_subset = asset_graph_view.get_empty_subset(key=down_asset.key) + parent_subset, required_but_nonexistent_partition_keys = ( + asset_graph_view.compute_parent_subset_and_required_but_nonexistent_partition_keys( + parent_key=up_asset.key, subset=empty_down_subset + ) + ) + assert parent_subset.is_empty + assert required_but_nonexistent_partition_keys == [] + + def test_subset_traversal_static_partitions() -> None: number_keys = {"1", "2", "3"} letter_keys = {"a", "b", "c"} @@ -133,6 +173,43 @@ def up_numbers() -> None: ... ) +def test_downstream_of_unpartitioned_partition_mapping() -> None: + @asset + def unpartitioned() -> None: ... + + @asset( + partitions_def=StaticPartitionsDefinition(["a", "b", "c"]), + deps=[AssetDep(unpartitioned, partition_mapping=LastPartitionMapping())], + ) + def downstream() -> None: ... + + defs = Definitions([downstream, unpartitioned]) + instance = DagsterInstance.ephemeral() + asset_graph_view = AssetGraphView.for_test(defs, instance) + + unpartitioned_full = asset_graph_view.get_full_subset(key=unpartitioned.key) + unpartitioned_empty = asset_graph_view.get_empty_subset(key=unpartitioned.key) + + downstream_full = asset_graph_view.get_full_subset(key=downstream.key) + downstream_empty = asset_graph_view.get_empty_subset(key=downstream.key) + + assert unpartitioned_full.compute_child_subset(child_key=downstream.key) == downstream_full + assert unpartitioned_empty.compute_child_subset(child_key=downstream.key) == downstream_empty + + assert downstream_full.compute_parent_subset(parent_key=unpartitioned.key) == unpartitioned_full + assert ( + downstream_empty.compute_parent_subset(parent_key=unpartitioned.key) == unpartitioned_empty + ) + + assert asset_graph_view.compute_parent_subset_and_required_but_nonexistent_partition_keys( + parent_key=unpartitioned.key, subset=downstream_full + ) == (unpartitioned_full, []) + + assert asset_graph_view.compute_parent_subset_and_required_but_nonexistent_partition_keys( + parent_key=unpartitioned.key, subset=downstream_empty + ) == (unpartitioned_empty, []) + + def test_upstream_of_unpartitioned_partition_mapping() -> None: @asset(partitions_def=StaticPartitionsDefinition(["a", "b", "c"])) def upstream() -> None: ...