Skip to content

Commit

Permalink
Store both MetricTimeDimensionTransformNodes and ReadSqlSourceNodes for time spines
Browse files Browse the repository at this point in the history

The metric time nodes are used for resolving metric_time without metrics. The read SQL nodes will be used for time spine joins.
  • Loading branch information
courtneyholcomb committed Nov 21, 2024
1 parent 44c387e commit c71061a
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 22 deletions.
6 changes: 3 additions & 3 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,7 @@ def _find_source_node_recipe_non_cached(
# If metric_time is requested without metrics, choose appropriate time spine node to select those values from.
if linkable_specs_to_satisfy.metric_time_specs:
time_spine_source = self._choose_time_spine_source(linkable_specs_to_satisfy.metric_time_specs)
time_spine_node = self._source_node_set.time_spine_nodes[time_spine_source.base_granularity]
time_spine_node = self._source_node_set.time_spine_metric_time_nodes[time_spine_source.base_granularity]
candidate_nodes_for_right_side_of_join += [time_spine_node]
candidate_nodes_for_left_side_of_join += [time_spine_node]
default_join_type = SqlJoinType.FULL_OUTER
Expand Down Expand Up @@ -1077,7 +1077,7 @@ def _find_source_node_recipe_non_cached(
desired_linkable_specs=linkable_specs_to_satisfy_tuple,
nodes=candidate_nodes_for_right_side_of_join,
metric_time_dimension_reference=self._metric_time_dimension_reference,
time_spine_nodes=self._source_node_set.time_spine_nodes_tuple,
time_spine_metric_time_nodes=self._source_node_set.time_spine_metric_time_nodes_tuple,
)
logger.debug(
LazyFormat(
Expand Down Expand Up @@ -1124,7 +1124,7 @@ def _find_source_node_recipe_non_cached(
semantic_model_lookup=self._semantic_model_lookup,
nodes_available_for_joins=self._sort_by_suitability(candidate_nodes_for_right_side_of_join),
node_data_set_resolver=self._node_data_set_resolver,
time_spine_nodes=self._source_node_set.time_spine_nodes_tuple,
time_spine_metric_time_nodes=self._source_node_set.time_spine_metric_time_nodes_tuple,
)

# Dict from the node that contains the source node to the evaluation results.
Expand Down
6 changes: 3 additions & 3 deletions metricflow/dataflow/builder/node_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def __init__(
semantic_model_lookup: SemanticModelLookup,
nodes_available_for_joins: Sequence[DataflowPlanNode],
node_data_set_resolver: DataflowPlanNodeOutputDataSetResolver,
time_spine_nodes: Sequence[MetricTimeDimensionTransformNode],
time_spine_metric_time_nodes: Sequence[MetricTimeDimensionTransformNode],
) -> None:
"""Initializer.
Expand All @@ -186,7 +186,7 @@ def __init__(
self._node_data_set_resolver = node_data_set_resolver
self._partition_resolver = PartitionJoinResolver(self._semantic_model_lookup)
self._join_evaluator = SemanticModelJoinEvaluator(self._semantic_model_lookup)
self._time_spine_nodes = time_spine_nodes
self._time_spine_metric_time_nodes = time_spine_metric_time_nodes

def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs(
self,
Expand All @@ -205,7 +205,7 @@ def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs(
linkable_specs_in_right_node = data_set_in_right_node.instance_set.spec_set.linkable_specs

# If right node is time spine source node, use cross join.
if right_node in self._time_spine_nodes:
if right_node in self._time_spine_metric_time_nodes:
satisfiable_metric_time_specs = [
spec for spec in linkable_specs_in_right_node if spec in needed_linkable_specs
]
Expand Down
30 changes: 20 additions & 10 deletions metricflow/dataflow/builder/source_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,23 @@ class SourceNodeSet:
# Semantic models are 1:1 mapped to a ReadSqlSourceNode.
source_nodes_for_group_by_item_queries: Tuple[DataflowPlanNode, ...]

# Provides the time spines.
time_spine_nodes: Mapping[TimeGranularity, MetricTimeDimensionTransformNode]
# Provides time spines that can be used to satisfy time spine joins, organized by granularity name.
time_spine_read_nodes: Mapping[str, ReadSqlSourceNode]

# Provides time spines that can be used to satisfy metric_time without metrics, organized by base granularity.
time_spine_metric_time_nodes: Mapping[TimeGranularity, MetricTimeDimensionTransformNode]

@property
def all_nodes(self) -> Sequence[DataflowPlanNode]: # noqa: D102
return (
self.source_nodes_for_metric_queries
+ self.source_nodes_for_group_by_item_queries
+ self.time_spine_nodes_tuple
+ self.time_spine_metric_time_nodes_tuple
)

@property
def time_spine_nodes_tuple(self) -> Tuple[MetricTimeDimensionTransformNode, ...]: # noqa: D102
return tuple(self.time_spine_nodes.values())
def time_spine_metric_time_nodes_tuple(self) -> Tuple[MetricTimeDimensionTransformNode, ...]: # noqa: D102
return tuple(self.time_spine_metric_time_nodes.values())


class SourceNodeBuilder:
Expand All @@ -65,11 +68,17 @@ def __init__( # noqa: D107
self.time_spine_sources = TimeSpineSource.build_standard_time_spine_sources(
semantic_manifest_lookup.semantic_manifest
)
self._time_spine_source_nodes = {}
for granularity, time_spine_source in self.time_spine_sources.items():

self._time_spine_read_nodes = {}
self._time_spine_metric_time_nodes = {}
for base_granularity, time_spine_source in self.time_spine_sources.items():
data_set = data_set_converter.build_time_spine_source_data_set(time_spine_source)
self._time_spine_source_nodes[granularity] = MetricTimeDimensionTransformNode.create(
parent_node=ReadSqlSourceNode.create(data_set),
read_node = ReadSqlSourceNode.create(data_set)
self._time_spine_read_nodes[base_granularity.value] = read_node
for custom_granularity in time_spine_source.custom_granularities:
self._time_spine_read_nodes[custom_granularity.name] = read_node
self._time_spine_metric_time_nodes[base_granularity] = MetricTimeDimensionTransformNode.create(
parent_node=read_node,
aggregation_time_dimension_reference=TimeDimensionReference(time_spine_source.base_column),
)

Expand Down Expand Up @@ -103,7 +112,8 @@ def create_from_data_sets(self, data_sets: Sequence[SemanticModelDataSet]) -> So
source_nodes_for_metric_queries.append(metric_time_transform_node)

return SourceNodeSet(
time_spine_nodes=self._time_spine_source_nodes,
time_spine_metric_time_nodes=self._time_spine_metric_time_nodes,
time_spine_read_nodes=self._time_spine_read_nodes,
source_nodes_for_group_by_item_queries=tuple(group_by_item_source_nodes),
source_nodes_for_metric_queries=tuple(source_nodes_for_metric_queries),
)
Expand Down
4 changes: 2 additions & 2 deletions metricflow/plan_conversion/node_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ def remove_unnecessary_nodes(
desired_linkable_specs: Sequence[LinkableInstanceSpec],
nodes: Sequence[DataflowPlanNode],
metric_time_dimension_reference: TimeDimensionReference,
time_spine_nodes: Sequence[MetricTimeDimensionTransformNode],
time_spine_metric_time_nodes: Sequence[MetricTimeDimensionTransformNode],
) -> List[DataflowPlanNode]:
"""Filters out many of the nodes that can't possibly be useful for joins to obtain the desired linkable specs.
Expand Down Expand Up @@ -668,7 +668,7 @@ def remove_unnecessary_nodes(
continue

# Used for group-by-item-values queries.
if node in time_spine_nodes:
if node in time_spine_metric_time_nodes:
logger.debug(LazyFormat(lambda: f"Including {node} since it matches `time_spine_node`"))
relevant_nodes.append(node)
continue
Expand Down
8 changes: 4 additions & 4 deletions tests_metricflow/dataflow/builder/test_node_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def node_evaluator(
].semantic_manifest_lookup.semantic_model_lookup,
nodes_available_for_joins=tuple(mf_engine_fixture.read_node_mapping.values()),
node_data_set_resolver=node_data_set_resolver,
time_spine_nodes=mf_engine_fixture.source_node_set.time_spine_nodes_tuple,
time_spine_metric_time_nodes=mf_engine_fixture.source_node_set.time_spine_metric_time_nodes_tuple,
)


Expand All @@ -73,7 +73,7 @@ def make_multihop_node_evaluator(
desired_linkable_specs=desired_linkable_specs,
nodes=source_node_set.source_nodes_for_metric_queries,
metric_time_dimension_reference=DataSet.metric_time_dimension_reference(),
time_spine_nodes=source_node_set.time_spine_nodes_tuple,
time_spine_metric_time_nodes=source_node_set.time_spine_metric_time_nodes_tuple,
)

nodes_available_for_joins = list(
Expand All @@ -88,7 +88,7 @@ def make_multihop_node_evaluator(
semantic_model_lookup=semantic_manifest_lookup_with_multihop_links.semantic_model_lookup,
nodes_available_for_joins=nodes_available_for_joins,
node_data_set_resolver=node_data_set_resolver,
time_spine_nodes=source_node_set.time_spine_nodes_tuple,
time_spine_metric_time_nodes=source_node_set.time_spine_metric_time_nodes_tuple,
)


Expand Down Expand Up @@ -521,7 +521,7 @@ def test_node_evaluator_with_scd_target(
# Use all nodes in the simple model as candidates for joins.
nodes_available_for_joins=tuple(mf_engine_fixture.read_node_mapping.values()),
node_data_set_resolver=node_data_set_resolver,
time_spine_nodes=mf_engine_fixture.source_node_set.time_spine_nodes_tuple,
time_spine_metric_time_nodes=mf_engine_fixture.source_node_set.time_spine_metric_time_nodes_tuple,
)

evaluation = node_evaluator.evaluate_node(
Expand Down

0 comments on commit c71061a

Please sign in to comment.