Skip to content

Commit

Permalink
Add all_linkable_specs_required_for_source_nodes property to SourceNo…
Browse files Browse the repository at this point in the history
…deRecipe

Storing this on SourceNodeRecipe allows us to keep logic for converting to base grain in one place instead of needing to propogate that in several places and add a comment explaining each one. This commit also adds a little bit of type cleanup to the same class.
  • Loading branch information
courtneyholcomb committed Oct 23, 2024
1 parent 3981f9e commit 30c09b6
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
14 changes: 7 additions & 7 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def _build_aggregated_conversion_node(
# Filter the source nodes with only the required specs needed for the calculation
constant_property_specs = []
required_local_specs = [base_measure_spec.measure_spec, entity_spec, base_time_dimension_spec] + list(
base_measure_recipe.required_local_linkable_specs
base_measure_recipe.required_local_linkable_specs.as_tuple
)
for constant_property in constant_properties or []:
base_property_spec = self._semantic_model_lookup.get_element_spec_for_name(constant_property.base_property)
Expand Down Expand Up @@ -361,11 +361,12 @@ def _build_aggregated_conversion_node(
constant_properties=constant_property_specs,
)

# Aggregate the conversion events with the JoinConversionEventsNode as the source node
# Aggregate the conversion events with the JoinConversionEventsNode as the source node.
recipe_with_join_conversion_source_node = SourceNodeRecipe(
source_node=join_conversion_node,
required_local_linkable_specs=base_measure_recipe.required_local_linkable_specs,
join_linkable_instances_recipes=base_measure_recipe.join_linkable_instances_recipes,
all_linkable_specs_required_for_source_nodes=base_measure_recipe.all_linkable_specs_required_for_source_nodes,
)
# TODO: Refine conversion metric configuration to fit into the standard dataflow plan building model
# In this case we override the measure recipe, which currently results in us bypassing predicate pushdown
Expand Down Expand Up @@ -1236,13 +1237,14 @@ def _find_source_node_recipe_non_cached(
)
return SourceNodeRecipe(
source_node=node_with_lowest_cost_plan,
required_local_linkable_specs=(
required_local_linkable_specs=LinkableSpecSet.create_from_specs(
evaluation.local_linkable_specs
+ required_local_entity_specs
+ required_local_dimension_specs
+ required_local_time_dimension_specs
),
join_linkable_instances_recipes=node_to_evaluation[node_with_lowest_cost_plan].join_recipes,
all_linkable_specs_required_for_source_nodes=linkable_specs_to_satisfy,
)

logger.error(LazyFormat(lambda: "No recipe could be constructed."))
Expand Down Expand Up @@ -1643,7 +1645,7 @@ def _build_aggregated_measure_from_measure_source_node(
filtered_measure_source_node = FilterElementsNode.create(
parent_node=join_to_time_spine_node or time_range_node or measure_recipe.source_node,
include_specs=InstanceSpecSet(measure_specs=(measure_spec,)).merge(
group_specs_by_type(measure_recipe.required_local_linkable_specs),
measure_recipe.required_local_linkable_specs.as_instance_spec_set,
),
)

Expand All @@ -1656,9 +1658,7 @@ def _build_aggregated_measure_from_measure_source_node(
)

specs_to_keep_after_join = InstanceSpecSet(measure_specs=(measure_spec,)).merge(
InstanceSpecSet.create_from_specs(
required_linkable_specs.replace_custom_granularity_with_base_granularity().as_tuple
),
InstanceSpecSet.create_from_specs(measure_recipe.all_linkable_specs_required_for_source_nodes.as_tuple),
)

after_join_filtered_node = FilterElementsNode.create(
Expand Down
5 changes: 3 additions & 2 deletions metricflow/dataflow/builder/source_node_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dataclasses import dataclass
from typing import List, Tuple

from metricflow_semantics.specs.instance_spec import LinkableInstanceSpec
from metricflow_semantics.specs.linkable_spec_set import LinkableSpecSet

from metricflow.dataflow.builder.node_evaluator import JoinLinkableInstancesRecipe
from metricflow.dataflow.dataflow_plan import DataflowPlanNode
Expand All @@ -15,8 +15,9 @@ class SourceNodeRecipe:
"""Get a recipe for how to build a dataflow plan node that outputs measures and linkable instances as needed."""

source_node: DataflowPlanNode
required_local_linkable_specs: Tuple[LinkableInstanceSpec, ...]
required_local_linkable_specs: LinkableSpecSet
join_linkable_instances_recipes: Tuple[JoinLinkableInstancesRecipe, ...]
all_linkable_specs_required_for_source_nodes: LinkableSpecSet

@property
def join_targets(self) -> List[JoinDescription]:
Expand Down

0 comments on commit 30c09b6

Please sign in to comment.