Skip to content

Commit

Permalink
moar refactor
Browse files Browse the repository at this point in the history
> Insert changelog entry or delete this section.
  • Loading branch information
gibsondan committed Nov 19, 2024
1 parent b84c5f7 commit 10f5893
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 18 deletions.
32 changes: 18 additions & 14 deletions python_modules/dagster/dagster/_core/asset_graph_view/bfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.base_asset_graph import BaseAssetGraph
from dagster._core.definitions.time_window_partitions import get_time_partitions_def
from dagster._record import record


@record
class AssetGraphViewBfsFilterConditionResult:
class AssetGraphViewBfsFilterConditionResult(NamedTuple):
passed_subset_value: EntitySubsetValue
excluded_subset_values_and_reasons: Sequence[Tuple[EntitySubsetValue, str]]

Expand All @@ -29,23 +27,28 @@ def bfs_filter_asset_graph_view(
],
initial_asset_subset: "AssetGraphSubset",
) -> Tuple[AssetGraphSubset, Sequence[Tuple[AssetGraphSubset, str]]]:
"""Returns asset partitions within the graph that satisfy supplied criteria.
"""Returns the subset of the graph that satisfy supplied criteria.
- Are >= initial_asset_partitions
- Are >= initial_asset_subset
- Match the condition_fn
- Any of their ancestors >= initial_asset_partitions match the condition_fn
Also returns a list of tuples, where each tuple is a candidated_unit (list of
AssetKeyPartitionKeys that must be materialized together - ie multi_asset) that do not
satisfy the criteria and the reason they were filtered out.
Also returns a list of tuples, where each tuple is an asset subset that did not
satisfy the condition and the reason they were filtered out.
The condition_fn should return a tuple of a boolean indicating whether the asset partition meets
the condition and a string explaining why it does not meet the condition, if applicable.
The condition_fn takes in:
- a set of asset keys. If there is more than one, the asset keys are part of the same
execution set (i.e. non-subsettable multi-asset)
- The EntitySubsetValue representing the partitions being evaluated for those key(s) for
the condition.
- An AssetGraphSubset for the portion of the graph that has so far been visited and passed
the condition.
Visits parents before children.
The condition_fn should return a object with an EntitySubsetValue indicating the partitions
that passed the condition for the suppleid asset keys, and a list of (EntitySubsetValue, str)
tuples with more information about why certain subsets were excluded.
When asset partitions are part of the same execution set (non-subsettable multi-asset),
they're provided all at once to the condition_fn.
Visits parents before children.
"""
initial_subsets = list(
initial_asset_subset.iterate_asset_subsets(asset_graph=asset_graph_view.asset_graph)
Expand All @@ -65,8 +68,8 @@ def bfs_filter_asset_graph_view(

while len(queue) > 0:
candidate_keys, candidate_subset_value = queue.dequeue()

condition_result = condition_fn(candidate_keys, candidate_subset_value, result)

subset_that_meets_condition = condition_result.passed_subset_value
fail_reasons = condition_result.excluded_subset_values_and_reasons

Expand All @@ -92,6 +95,7 @@ def bfs_filter_asset_graph_view(
asset_graph_view.get_subset_from_serializable_subset(candidate_subset_value)
)

# Add any child subsets that have not yet been visited to the queue
for child_key in asset_graph.get(candidate_key).child_keys:
child_subset = asset_graph_view.compute_child_subset(
child_key, matching_entity_subset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,7 @@ def _get_failed_and_downstream_asset_partitions(
),
initial_asset_subset=failed_asset_graph_subset,
)[0]

return failed_and_downstream_subset


Expand Down Expand Up @@ -1669,18 +1670,18 @@ def _should_backfill_atomic_asset_partitions_unit_with_subsets(
f"Could not load subset for candidate entity subset {candidate_entity_subset} - asset may have been removed or the underlying partitions definition may have changed"
)

parent_subset, required_but_nonexistent_parent_subset = (
asset_graph_view.compute_parent_subset_and_required_but_not_existent_parent_subset(
parent_subset, required_but_nonexistent_partition_keys = (
asset_graph_view.compute_parent_subset_and_required_but_nonexistent_partition_keys(
parent_key,
candidate_entity_subset,
)
)

if required_but_nonexistent_parent_subset:
if required_but_nonexistent_partition_keys:
raise DagsterInvariantViolationError(
f"Asset partition subset {candidate_entity_subset}"
" depends on invalid partition keys"
f" {required_but_nonexistent_parent_subset.get_internal_subset_value()}"
f" {required_but_nonexistent_partition_keys}"
)
# Children with parents that are targeted but not materialized are eligible
# to be filtered out if the parent
Expand Down

0 comments on commit 10f5893

Please sign in to comment.