Skip to content

Commit

Permalink
Better handling for satisfying specs with base grain in dataflow plan
Browse files Browse the repository at this point in the history
Consolidates that logic into _find_source_node_recipe() as much as possible, removes isinstance() checks, and clarifies comments.
  • Loading branch information
courtneyholcomb committed Sep 26, 2024
1 parent 1ce044b commit 67ed80c
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ def contains_metric_time(self) -> bool:
def time_dimension_specs_with_custom_grain(self) -> Tuple[TimeDimensionSpec, ...]: # noqa: D102
return tuple([spec for spec in self.time_dimension_specs if spec.time_granularity.is_custom_granularity])

def replace_custom_granularity_with_base_granularity(self) -> LinkableSpecSet:
"""Return the same spec set, replacing any custom time granularity with its base granularity."""
return LinkableSpecSet(
dimension_specs=self.dimension_specs,
time_dimension_specs=tuple(
[time_dimension_spec.with_base_grain() for time_dimension_spec in self.time_dimension_specs]
),
entity_specs=self.entity_specs,
group_by_metric_specs=self.group_by_metric_specs,
)

def included_agg_time_dimension_specs_for_metric(
self, metric_reference: MetricReference, metric_lookup: MetricLookup
) -> List[TimeDimensionSpec]:
Expand Down
43 changes: 18 additions & 25 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -907,18 +907,8 @@ def _select_source_nodes_with_linkable_specs(
# Use a dictionary to dedupe for consistent ordering.
selected_nodes: Dict[DataflowPlanNode, None] = {}

# Find the source node that will satisfy the base granularity. Custom granularities will be joined in later.
linkable_specs_set_with_base_granularities: Set[LinkableInstanceSpec] = set()
# TODO: Add support for no-metrics queries for custom grains without a join (i.e., select directly from time spine).
for linkable_spec in linkable_specs.as_tuple:
if isinstance(linkable_spec, TimeDimensionSpec) and linkable_spec.time_granularity.is_custom_granularity:
linkable_spec_with_base_grain = linkable_spec.with_grain(
ExpandedTimeGranularity.from_time_granularity(linkable_spec.time_granularity.base_granularity)
)
linkable_specs_set_with_base_granularities.add(linkable_spec_with_base_grain)
else:
linkable_specs_set_with_base_granularities.add(linkable_spec)

linkable_specs_set_with_base_granularities = set(linkable_specs.as_tuple)
for source_node in source_nodes:
output_spec_set = self._node_data_set_resolver.get_output_data_set(source_node).instance_set.spec_set
all_linkable_specs_in_node = set(output_spec_set.linkable_specs)
Expand Down Expand Up @@ -991,10 +981,14 @@ def _find_source_node_recipe(
measure_spec_properties: Optional[MeasureSpecProperties] = None,
) -> Optional[SourceNodeRecipe]:
"""Find the most suitable source nodes to satisfy the requested specs, as well as how to join them."""
linkable_specs = linkable_spec_set.as_tuple
candidate_nodes_for_left_side_of_join: List[DataflowPlanNode] = []
candidate_nodes_for_right_side_of_join: List[DataflowPlanNode] = []

# Replace any custom granularities with their base granularities. The custom granularity will be joined in
# later, since custom granularities cannot be satisfied by source nodes. But we will need the dimension at
# base granularity from the source node in order to join to the appropriate time spine later.
linkable_specs_to_satisfy = linkable_spec_set.replace_custom_granularity_with_base_granularity()
linkable_specs_to_satisfy_tuple = linkable_specs_to_satisfy.as_tuple
if measure_spec_properties:
candidate_nodes_for_right_side_of_join += self._source_node_set.source_nodes_for_metric_queries
candidate_nodes_for_left_side_of_join += self._select_source_nodes_with_measures(
Expand All @@ -1006,15 +1000,15 @@ def _find_source_node_recipe(
candidate_nodes_for_right_side_of_join += list(self._source_node_set.source_nodes_for_group_by_item_queries)
candidate_nodes_for_left_side_of_join += list(
self._select_source_nodes_with_linkable_specs(
linkable_specs=linkable_spec_set,
linkable_specs=linkable_specs_to_satisfy,
source_nodes=self._source_node_set.source_nodes_for_group_by_item_queries,
)
)
# If metric_time is requested without metrics, choose appropriate time spine node to select those values from.
if linkable_spec_set.metric_time_specs:
if linkable_specs_to_satisfy.metric_time_specs:
time_spine_node = self._source_node_set.time_spine_nodes[
TimeSpineSource.choose_time_spine_source(
required_time_spine_specs=linkable_spec_set.metric_time_specs,
required_time_spine_specs=linkable_specs_to_satisfy.metric_time_specs,
time_spine_sources=self._source_node_builder.time_spine_sources,
).base_granularity
]
Expand Down Expand Up @@ -1053,7 +1047,7 @@ def _find_source_node_recipe(
)

candidate_nodes_for_right_side_of_join = node_processor.remove_unnecessary_nodes(
desired_linkable_specs=linkable_specs,
desired_linkable_specs=linkable_specs_to_satisfy_tuple,
nodes=candidate_nodes_for_right_side_of_join,
metric_time_dimension_reference=self._metric_time_dimension_reference,
time_spine_nodes=self._source_node_set.time_spine_nodes_tuple,
Expand All @@ -1065,10 +1059,10 @@ def _find_source_node_recipe(
)
)
# TODO: test multi-hop with custom grains
if DataflowPlanBuilder._contains_multihop_linkables(linkable_specs):
if DataflowPlanBuilder._contains_multihop_linkables(linkable_specs_to_satisfy_tuple):
candidate_nodes_for_right_side_of_join = list(
node_processor.add_multi_hop_joins(
desired_linkable_specs=linkable_specs,
desired_linkable_specs=linkable_specs_to_satisfy_tuple,
nodes=candidate_nodes_for_right_side_of_join,
join_type=default_join_type,
)
Expand All @@ -1085,14 +1079,16 @@ def _find_source_node_recipe(
# We do this at query time instead of during usual source node generation because the number of potential
# MetricGroupBy source nodes could be extremely large (and potentially slow).
logger.debug(
LazyFormat(lambda: f"Building source nodes for group by metrics: {linkable_spec_set.group_by_metric_specs}")
LazyFormat(
lambda: f"Building source nodes for group by metrics: {linkable_specs_to_satisfy.group_by_metric_specs}"
)
)
candidate_nodes_for_right_side_of_join += [
self._build_query_output_node(
query_spec=self._source_node_builder.build_source_node_inputs_for_group_by_metric(group_by_metric_spec),
for_group_by_source_node=True,
)
for group_by_metric_spec in linkable_spec_set.group_by_metric_specs
for group_by_metric_spec in linkable_specs_to_satisfy.group_by_metric_specs
]

logger.debug(LazyFormat(lambda: f"Processing nodes took: {time.time()-start_time:.2f}s"))
Expand Down Expand Up @@ -1135,7 +1131,7 @@ def _find_source_node_recipe(
start_time = time.time()
evaluation = node_evaluator.evaluate_node(
left_node=node,
required_linkable_specs=list(linkable_specs),
required_linkable_specs=list(linkable_specs_to_satisfy_tuple),
default_join_type=default_join_type,
)
logger.debug(LazyFormat(lambda: f"Evaluation of {node} took {time.time() - start_time:.2f}s"))
Expand Down Expand Up @@ -1614,10 +1610,7 @@ def _build_aggregated_measure_from_measure_source_node(

specs_to_keep_after_join = InstanceSpecSet(measure_specs=(measure_spec,)).merge(
InstanceSpecSet.create_from_specs(
[
spec.with_base_grain() if isinstance(spec, TimeDimensionSpec) else spec
for spec in required_linkable_specs.as_tuple
]
required_linkable_specs.replace_custom_granularity_with_base_granularity().as_tuple
),
)

Expand Down
7 changes: 1 addition & 6 deletions metricflow/dataflow/builder/node_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from metricflow_semantics.specs.entity_spec import LinklessEntitySpec
from metricflow_semantics.specs.instance_spec import LinkableInstanceSpec
from metricflow_semantics.specs.spec_set import group_specs_by_type
from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
from metricflow_semantics.sql.sql_join_type import SqlJoinType

from metricflow.dataflow.builder.node_data_set import DataflowPlanNodeOutputDataSetResolver
Expand Down Expand Up @@ -408,10 +407,6 @@ def evaluate_node(
logger.debug(LazyFormat(lambda: f"Candidate spec set is:\n{mf_pformat(candidate_spec_set)}"))

data_set_linkable_specs = candidate_spec_set.linkable_specs
# Look for which nodes can satisfy the linkable specs at their base grains. Custom grains will be joined later.
required_linkable_specs_with_base_grains = [
spec.with_base_grain() if isinstance(spec, TimeDimensionSpec) else spec for spec in required_linkable_specs
]

# These are linkable specs in the start node data set. Those are considered "local".
local_linkable_specs: List[LinkableInstanceSpec] = []
Expand All @@ -421,7 +416,7 @@ def evaluate_node(

# Group required_linkable_specs into local / un-joinable / or possibly joinable.
unjoinable_linkable_specs = []
for required_linkable_spec in required_linkable_specs_with_base_grains:
for required_linkable_spec in required_linkable_specs:
is_metric_time = required_linkable_spec.element_name == DataSet.metric_time_dimension_name()
is_local = required_linkable_spec in data_set_linkable_specs
is_unjoinable = (
Expand Down

0 comments on commit 67ed80c

Please sign in to comment.