Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved source node logic for custom granularities #1427

Merged
merged 2 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ def contains_metric_time(self) -> bool:
def time_dimension_specs_with_custom_grain(self) -> Tuple[TimeDimensionSpec, ...]:  # noqa: D102
    # Return only the time dimension specs whose granularity is a custom (non-standard) granularity.
    # Use a generator expression directly — wrapping a list comprehension in tuple() builds a
    # throwaway intermediate list (flake8-comprehensions C409).
    return tuple(spec for spec in self.time_dimension_specs if spec.time_granularity.is_custom_granularity)

def replace_custom_granularity_with_base_granularity(self) -> LinkableSpecSet:
    """Return the same spec set, replacing any custom time granularity with its base granularity.

    Only time dimension specs can carry a custom granularity, so every other spec type is
    passed through unchanged. `with_base_grain()` is a no-op for specs already at a standard
    granularity, so all time dimension specs can be mapped uniformly.
    """
    return LinkableSpecSet(
        dimension_specs=self.dimension_specs,
        # Generator expression avoids building an intermediate list inside tuple().
        time_dimension_specs=tuple(
            time_dimension_spec.with_base_grain() for time_dimension_spec in self.time_dimension_specs
        ),
        entity_specs=self.entity_specs,
        group_by_metric_specs=self.group_by_metric_specs,
    )

def included_agg_time_dimension_specs_for_metric(
self, metric_reference: MetricReference, metric_lookup: MetricLookup
) -> List[TimeDimensionSpec]:
Expand Down
66 changes: 30 additions & 36 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@


@dataclass(frozen=True)
class DataflowRecipe:
class SourceNodeRecipe:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woman making hand gesture for 'very nice'

"""Get a recipe for how to build a dataflow plan node that outputs measures and linkable instances as needed."""

source_node: DataflowPlanNode
Expand Down Expand Up @@ -277,13 +277,13 @@ def _build_aggregated_conversion_node(
queried_linkable_specs=queried_linkable_specs,
filter_specs=base_measure_spec.filter_specs,
)
base_measure_recipe = self._find_dataflow_recipe(
base_measure_recipe = self._find_source_node_recipe(
measure_spec_properties=self._build_measure_spec_properties([base_measure_spec.measure_spec]),
predicate_pushdown_state=time_range_only_pushdown_state,
linkable_spec_set=base_required_linkable_specs,
)
logger.debug(LazyFormat(lambda: f"Recipe for base measure aggregation:\n{mf_pformat(base_measure_recipe)}"))
conversion_measure_recipe = self._find_dataflow_recipe(
conversion_measure_recipe = self._find_source_node_recipe(
measure_spec_properties=self._build_measure_spec_properties([conversion_measure_spec.measure_spec]),
predicate_pushdown_state=disabled_pushdown_state,
linkable_spec_set=LinkableSpecSet(),
Expand Down Expand Up @@ -368,7 +368,7 @@ def _build_aggregated_conversion_node(
)

# Aggregate the conversion events with the JoinConversionEventsNode as the source node
recipe_with_join_conversion_source_node = DataflowRecipe(
recipe_with_join_conversion_source_node = SourceNodeRecipe(
source_node=join_conversion_node,
required_local_linkable_specs=base_measure_recipe.required_local_linkable_specs,
join_linkable_instances_recipes=base_measure_recipe.join_linkable_instances_recipes,
Expand Down Expand Up @@ -800,7 +800,7 @@ def _build_plan_for_distinct_values(
predicate_pushdown_state = PredicatePushdownState(
time_range_constraint=query_spec.time_range_constraint, where_filter_specs=query_level_filter_specs
)
dataflow_recipe = self._find_dataflow_recipe(
dataflow_recipe = self._find_source_node_recipe(
linkable_spec_set=required_linkable_specs, predicate_pushdown_state=predicate_pushdown_state
)
if not dataflow_recipe:
Expand Down Expand Up @@ -907,18 +907,8 @@ def _select_source_nodes_with_linkable_specs(
# Use a dictionary to dedupe for consistent ordering.
selected_nodes: Dict[DataflowPlanNode, None] = {}

# Find the source node that will satisfy the base granularity. Custom granularities will be joined in later.
linkable_specs_set_with_base_granularities: Set[LinkableInstanceSpec] = set()
# TODO: Add support for no-metrics queries for custom grains without a join (i.e., select directly from time spine).
for linkable_spec in linkable_specs.as_tuple:
if isinstance(linkable_spec, TimeDimensionSpec) and linkable_spec.time_granularity.is_custom_granularity:
linkable_spec_with_base_grain = linkable_spec.with_grain(
ExpandedTimeGranularity.from_time_granularity(linkable_spec.time_granularity.base_granularity)
)
linkable_specs_set_with_base_granularities.add(linkable_spec_with_base_grain)
else:
linkable_specs_set_with_base_granularities.add(linkable_spec)

linkable_specs_set_with_base_granularities = set(linkable_specs.as_tuple)
for source_node in source_nodes:
output_spec_set = self._node_data_set_resolver.get_output_data_set(source_node).instance_set.spec_set
all_linkable_specs_in_node = set(output_spec_set.linkable_specs)
Expand Down Expand Up @@ -984,16 +974,21 @@ def _build_measure_spec_properties(self, measure_specs: Sequence[MeasureSpec]) -
non_additive_dimension_spec=non_additive_dimension_spec,
)

def _find_dataflow_recipe(
def _find_source_node_recipe(
self,
linkable_spec_set: LinkableSpecSet,
predicate_pushdown_state: PredicatePushdownState,
measure_spec_properties: Optional[MeasureSpecProperties] = None,
) -> Optional[DataflowRecipe]:
linkable_specs = linkable_spec_set.as_tuple
) -> Optional[SourceNodeRecipe]:
"""Find the most suitable source nodes to satisfy the requested specs, as well as how to join them."""
candidate_nodes_for_left_side_of_join: List[DataflowPlanNode] = []
candidate_nodes_for_right_side_of_join: List[DataflowPlanNode] = []

# Replace any custom granularities with their base granularities. The custom granularity will be joined in
# later, since custom granularities cannot be satisfied by source nodes. But we will need the dimension at
# base granularity from the source node in order to join to the appropriate time spine later.
Comment on lines +987 to +989
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

computer kid thumbs up - "quality work!"

linkable_specs_to_satisfy = linkable_spec_set.replace_custom_granularity_with_base_granularity()
linkable_specs_to_satisfy_tuple = linkable_specs_to_satisfy.as_tuple
if measure_spec_properties:
candidate_nodes_for_right_side_of_join += self._source_node_set.source_nodes_for_metric_queries
candidate_nodes_for_left_side_of_join += self._select_source_nodes_with_measures(
Expand All @@ -1005,15 +1000,15 @@ def _find_dataflow_recipe(
candidate_nodes_for_right_side_of_join += list(self._source_node_set.source_nodes_for_group_by_item_queries)
candidate_nodes_for_left_side_of_join += list(
self._select_source_nodes_with_linkable_specs(
linkable_specs=linkable_spec_set,
linkable_specs=linkable_specs_to_satisfy,
source_nodes=self._source_node_set.source_nodes_for_group_by_item_queries,
)
)
# If metric_time is requested without metrics, choose appropriate time spine node to select those values from.
if linkable_spec_set.metric_time_specs:
if linkable_specs_to_satisfy.metric_time_specs:
time_spine_node = self._source_node_set.time_spine_nodes[
TimeSpineSource.choose_time_spine_source(
required_time_spine_specs=linkable_spec_set.metric_time_specs,
required_time_spine_specs=linkable_specs_to_satisfy.metric_time_specs,
time_spine_sources=self._source_node_builder.time_spine_sources,
).base_granularity
]
Expand Down Expand Up @@ -1052,7 +1047,7 @@ def _find_dataflow_recipe(
)

candidate_nodes_for_right_side_of_join = node_processor.remove_unnecessary_nodes(
desired_linkable_specs=linkable_specs,
desired_linkable_specs=linkable_specs_to_satisfy_tuple,
nodes=candidate_nodes_for_right_side_of_join,
metric_time_dimension_reference=self._metric_time_dimension_reference,
time_spine_nodes=self._source_node_set.time_spine_nodes_tuple,
Expand All @@ -1064,10 +1059,10 @@ def _find_dataflow_recipe(
)
)
# TODO: test multi-hop with custom grains
if DataflowPlanBuilder._contains_multihop_linkables(linkable_specs):
if DataflowPlanBuilder._contains_multihop_linkables(linkable_specs_to_satisfy_tuple):
candidate_nodes_for_right_side_of_join = list(
node_processor.add_multi_hop_joins(
desired_linkable_specs=linkable_specs,
desired_linkable_specs=linkable_specs_to_satisfy_tuple,
nodes=candidate_nodes_for_right_side_of_join,
join_type=default_join_type,
)
Expand All @@ -1084,14 +1079,16 @@ def _find_dataflow_recipe(
# We do this at query time instead of during usual source node generation because the number of potential
# MetricGroupBy source nodes could be extremely large (and potentially slow).
logger.debug(
LazyFormat(lambda: f"Building source nodes for group by metrics: {linkable_spec_set.group_by_metric_specs}")
LazyFormat(
lambda: f"Building source nodes for group by metrics: {linkable_specs_to_satisfy.group_by_metric_specs}"
)
)
candidate_nodes_for_right_side_of_join += [
self._build_query_output_node(
query_spec=self._source_node_builder.build_source_node_inputs_for_group_by_metric(group_by_metric_spec),
for_group_by_source_node=True,
)
for group_by_metric_spec in linkable_spec_set.group_by_metric_specs
for group_by_metric_spec in linkable_specs_to_satisfy.group_by_metric_specs
]

logger.debug(LazyFormat(lambda: f"Processing nodes took: {time.time()-start_time:.2f}s"))
Expand Down Expand Up @@ -1134,7 +1131,7 @@ def _find_dataflow_recipe(
start_time = time.time()
evaluation = node_evaluator.evaluate_node(
left_node=node,
required_linkable_specs=list(linkable_specs),
required_linkable_specs=list(linkable_specs_to_satisfy_tuple),
default_join_type=default_join_type,
)
logger.debug(LazyFormat(lambda: f"Evaluation of {node} took {time.time() - start_time:.2f}s"))
Expand Down Expand Up @@ -1203,7 +1200,7 @@ def _find_dataflow_recipe(
for x in evaluation.join_recipes
for y in x.join_on_partition_time_dimensions
)
return DataflowRecipe(
return SourceNodeRecipe(
source_node=node_with_lowest_cost_plan,
required_local_linkable_specs=(
evaluation.local_linkable_specs
Expand Down Expand Up @@ -1392,7 +1389,7 @@ def build_aggregated_measure(
metric_input_measure_spec: MetricInputMeasureSpec,
queried_linkable_specs: LinkableSpecSet,
predicate_pushdown_state: PredicatePushdownState,
measure_recipe: Optional[DataflowRecipe] = None,
measure_recipe: Optional[SourceNodeRecipe] = None,
) -> DataflowPlanNode:
"""Returns a node where the measures are aggregated by the linkable specs and constrained appropriately.

Expand Down Expand Up @@ -1457,7 +1454,7 @@ def _build_aggregated_measure_from_measure_source_node(
metric_input_measure_spec: MetricInputMeasureSpec,
queried_linkable_specs: LinkableSpecSet,
predicate_pushdown_state: PredicatePushdownState,
measure_recipe: Optional[DataflowRecipe] = None,
measure_recipe: Optional[SourceNodeRecipe] = None,
) -> DataflowPlanNode:
measure_spec = metric_input_measure_spec.measure_spec
cumulative = metric_input_measure_spec.cumulative_description is not None
Expand Down Expand Up @@ -1531,7 +1528,7 @@ def _build_aggregated_measure_from_measure_source_node(
)

find_recipe_start_time = time.time()
measure_recipe = self._find_dataflow_recipe(
measure_recipe = self._find_source_node_recipe(
measure_spec_properties=measure_properties,
predicate_pushdown_state=measure_pushdown_state,
linkable_spec_set=required_linkable_specs,
Expand Down Expand Up @@ -1613,10 +1610,7 @@ def _build_aggregated_measure_from_measure_source_node(

specs_to_keep_after_join = InstanceSpecSet(measure_specs=(measure_spec,)).merge(
InstanceSpecSet.create_from_specs(
[
spec.with_base_grain() if isinstance(spec, TimeDimensionSpec) else spec
for spec in required_linkable_specs.as_tuple
]
required_linkable_specs.replace_custom_granularity_with_base_granularity().as_tuple
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, this class is fairly complex, so I'm going to give up and just ask - I thought that by moving one of the replace_custom_granularity_with_base_granularity calls to _find_source_node_recipe, we obviated the need to do this again anywhere else. Was that impression wrong?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, that was the plan! I hadn't realized this use case was outside of that function.
I took a stab at seeing if we could add this spec set as a property to the SourceNodeRecipe to avoid calculating it again here, which would clean things up a bit. It works everywhere except for conversion metrics, which have a separate SourceNodeRecipe that they build. I tried to mess around with that code to make it work but I'm not familiar enough with conversion metrics to make any significant changes without consulting someone so I scrapped it.
Basically the blocker is that we haven't consolidated this code enough to make this simple. I would love to clean up the whole DataflowPlanBuilder to remove redundancies, and we would certainly be able to clean this piece up then, but I think that's too big a task to block this feature.

),
)

Expand Down
7 changes: 1 addition & 6 deletions metricflow/dataflow/builder/node_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from metricflow_semantics.specs.entity_spec import LinklessEntitySpec
from metricflow_semantics.specs.instance_spec import LinkableInstanceSpec
from metricflow_semantics.specs.spec_set import group_specs_by_type
from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
from metricflow_semantics.sql.sql_join_type import SqlJoinType

from metricflow.dataflow.builder.node_data_set import DataflowPlanNodeOutputDataSetResolver
Expand Down Expand Up @@ -408,10 +407,6 @@ def evaluate_node(
logger.debug(LazyFormat(lambda: f"Candidate spec set is:\n{mf_pformat(candidate_spec_set)}"))

data_set_linkable_specs = candidate_spec_set.linkable_specs
# Look for which nodes can satisfy the linkable specs at their base grains. Custom grains will be joined later.
required_linkable_specs_with_base_grains = [
spec.with_base_grain() if isinstance(spec, TimeDimensionSpec) else spec for spec in required_linkable_specs
]

# These are linkable specs in the start node data set. Those are considered "local".
local_linkable_specs: List[LinkableInstanceSpec] = []
Expand All @@ -421,7 +416,7 @@ def evaluate_node(

# Group required_linkable_specs into local / un-joinable / or possibly joinable.
unjoinable_linkable_specs = []
for required_linkable_spec in required_linkable_specs_with_base_grains:
for required_linkable_spec in required_linkable_specs:
is_metric_time = required_linkable_spec.element_name == DataSet.metric_time_dimension_name()
is_local = required_linkable_spec in data_set_linkable_specs
is_unjoinable = (
Expand Down
Loading