From 2711efb3e581d95535bc9ae01760fe232ac853dc Mon Sep 17 00:00:00 2001
From: Courtney Holcomb
Date: Wed, 18 Sep 2024 12:35:34 -0700
Subject: [PATCH] Bug fix: select appropriate time spine node for metric_time queries without metrics

---
 .../time/time_spine_source.py                 | 34 +++++++++++++++++-
 .../dataflow/builder/dataflow_plan_builder.py | 11 ++++++
 metricflow/dataflow/builder/source_node.py    | 19 +++++-----
 metricflow/plan_conversion/dataflow_to_sql.py | 36 +++----------------
 4 files changed, 59 insertions(+), 41 deletions(-)

diff --git a/metricflow-semantics/metricflow_semantics/time/time_spine_source.py b/metricflow-semantics/metricflow_semantics/time/time_spine_source.py
index 1985369f77..5cbf0044b8 100644
--- a/metricflow-semantics/metricflow_semantics/time/time_spine_source.py
+++ b/metricflow-semantics/metricflow_semantics/time/time_spine_source.py
@@ -8,7 +8,7 @@
 from dbt_semantic_interfaces.protocols import SemanticManifest
 from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity
 
-from metricflow_semantics.specs.time_dimension_spec import DEFAULT_TIME_GRANULARITY
+from metricflow_semantics.specs.time_dimension_spec import DEFAULT_TIME_GRANULARITY, TimeDimensionSpec
 from metricflow_semantics.sql.sql_table import SqlTable
 
 logger = logging.getLogger(__name__)
@@ -91,3 +91,35 @@ def build_custom_time_spine_sources(time_spine_sources: Sequence[TimeSpineSource
             for time_spine_source in time_spine_sources
             for custom_granularity in time_spine_source.custom_granularities
         }
+
+    @staticmethod
+    def choose_time_spine_source(
+        required_time_spine_specs: Sequence[TimeDimensionSpec],
+        time_spine_sources: Dict[TimeGranularity, TimeSpineSource],
+    ) -> TimeSpineSource:
+        """Determine which time spine source to use to satisfy the given specs.
+
+        Will choose the time spine with the largest granularity that can be used to get the smallest granularity required to
+        satisfy the time spine specs. Example:
+        - Time spines available: SECOND, MINUTE, DAY
+        - Time granularities needed for request: HOUR, DAY
+        --> Selected time spine: MINUTE
+
+        Note time spines are identified by their base granularity.
+        """
+        assert required_time_spine_specs, (
+            "Choosing time spine source requires time spine specs, but the `required_time_spine_specs` param is empty. "
+            "This indicates internal misconfiguration."
+        )
+        smallest_agg_time_grain = min(spec.time_granularity.base_granularity for spec in required_time_spine_specs)
+        time_spine_grains = time_spine_sources.keys()
+        compatible_time_spine_grains = [
+            grain for grain in time_spine_grains if grain.to_int() <= smallest_agg_time_grain.to_int()
+        ]
+        if not compatible_time_spine_grains:
+            raise RuntimeError(
+                f"This query requires a time spine with granularity {smallest_agg_time_grain.name} or smaller, which is not configured. "
+                f"The smallest available time spine granularity is {min(time_spine_grains).name}, which is too large. "
+                "See documentation for how to configure a new time spine: https://docs.getdbt.com/docs/build/metricflow-time-spine"
+            )
+        return time_spine_sources[max(compatible_time_spine_grains)]
diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py
index edbe4e0fce..9bea1392fb 100644
--- a/metricflow/dataflow/builder/dataflow_plan_builder.py
+++ b/metricflow/dataflow/builder/dataflow_plan_builder.py
@@ -55,6 +55,7 @@
 from metricflow_semantics.sql.sql_table import SqlTable
 from metricflow_semantics.time.dateutil_adjuster import DateutilTimePeriodAdjuster
 from metricflow_semantics.time.granularity import ExpandedTimeGranularity
+from metricflow_semantics.time.time_spine_source import TimeSpineSource
 
 from metricflow.dataflow.builder.node_data_set import DataflowPlanNodeOutputDataSetResolver
 from metricflow.dataflow.builder.node_evaluator import (
@@ -973,6 +974,16 @@ def _find_dataflow_recipe(
                     source_nodes=self._source_node_set.source_nodes_for_group_by_item_queries,
                 )
             )
+            # If metric_time is requested without metrics, choose appropriate time spine node to select those values from.
+            if linkable_spec_set.metric_time_specs:
+                time_spine_node = self._source_node_set.time_spine_nodes[
+                    TimeSpineSource.choose_time_spine_source(
+                        required_time_spine_specs=linkable_spec_set.metric_time_specs,
+                        time_spine_sources=self._source_node_builder.time_spine_sources,
+                    ).base_granularity
+                ]
+                candidate_nodes_for_right_side_of_join += [time_spine_node]
+                candidate_nodes_for_left_side_of_join += [time_spine_node]
             default_join_type = SqlJoinType.FULL_OUTER
 
         logger.info(
diff --git a/metricflow/dataflow/builder/source_node.py b/metricflow/dataflow/builder/source_node.py
index 9bd7af8a71..575668eabb 100644
--- a/metricflow/dataflow/builder/source_node.py
+++ b/metricflow/dataflow/builder/source_node.py
@@ -33,8 +33,7 @@ class SourceNodeSet:
     # components.
     source_nodes_for_metric_queries: Tuple[DataflowPlanNode, ...]
 
-    # Semantic models are 1:1 mapped to a ReadSqlSourceNode. The tuple also contains the same `time_spine_node` as
-    # below. See usage in `DataflowPlanBuilder`.
+    # Semantic models are 1:1 mapped to a ReadSqlSourceNode.
     source_nodes_for_group_by_item_queries: Tuple[DataflowPlanNode, ...]
 
     # Provides the time spines.
@@ -42,7 +41,11 @@ class SourceNodeSet:
 
     @property
     def all_nodes(self) -> Sequence[DataflowPlanNode]:  # noqa: D102
-        return self.source_nodes_for_metric_queries + self.source_nodes_for_group_by_item_queries
+        return (
+            self.source_nodes_for_metric_queries
+            + self.source_nodes_for_group_by_item_queries
+            + self.time_spine_nodes_tuple
+        )
 
     @property
     def time_spine_nodes_tuple(self) -> Tuple[MetricTimeDimensionTransformNode, ...]:  # noqa: D102
@@ -59,10 +62,11 @@ def __init__(  # noqa: D107
     ) -> None:
         self._semantic_manifest_lookup = semantic_manifest_lookup
         data_set_converter = SemanticModelToDataSetConverter(column_association_resolver)
-        self._time_spine_source_nodes = {}
-        for granularity, time_spine_source in TimeSpineSource.build_standard_time_spine_sources(
+        self.time_spine_sources = TimeSpineSource.build_standard_time_spine_sources(
             semantic_manifest_lookup.semantic_manifest
-        ).items():
+        )
+        self._time_spine_source_nodes = {}
+        for granularity, time_spine_source in self.time_spine_sources.items():
             data_set = data_set_converter.build_time_spine_source_data_set(time_spine_source)
             self._time_spine_source_nodes[granularity] = MetricTimeDimensionTransformNode.create(
                 parent_node=ReadSqlSourceNode.create(data_set),
@@ -100,8 +104,7 @@ def create_from_data_sets(self, data_sets: Sequence[SemanticModelDataSet]) -> So
 
         return SourceNodeSet(
             time_spine_nodes=self._time_spine_source_nodes,
-            source_nodes_for_group_by_item_queries=tuple(group_by_item_source_nodes)
-            + tuple(self._time_spine_source_nodes.values()),
+            source_nodes_for_group_by_item_queries=tuple(group_by_item_source_nodes),
             source_nodes_for_metric_queries=tuple(source_nodes_for_metric_queries),
         )
diff --git a/metricflow/plan_conversion/dataflow_to_sql.py b/metricflow/plan_conversion/dataflow_to_sql.py
index 92e3d33bbe..53cbab3470 100644
--- a/metricflow/plan_conversion/dataflow_to_sql.py
+++ b/metricflow/plan_conversion/dataflow_to_sql.py
@@ -237,37 +237,6 @@ def _next_unique_table_alias(self) -> str:
         """Return the next unique table alias to use in generating queries."""
         return SequentialIdGenerator.create_next_id(StaticIdPrefix.SUB_QUERY).str_value
 
-    def _choose_time_spine_source(
-        self, agg_time_dimension_instances: Tuple[TimeDimensionInstance, ...]
-    ) -> TimeSpineSource:
-        """Determine which time spine source to use when building time spine dataset.
-
-        Will choose the time spine with the largest granularity that can be used to get the smallest granularity requested
-        in the agg time dimension instances. Example:
-        - Time spines available: SECOND, MINUTE, DAY
-        - Agg time dimension granularity needed for request: HOUR, DAY
-        --> Selected time spine: MINUTE
-
-        Note time spines are identified by the base granularity from the time dimension instance, not the raw granularity
-        name.
-        """
-        assert (
-            agg_time_dimension_instances
-        ), "Building time spine dataset requires agg_time_dimension_instances, but none were found."
-        smallest_agg_time_grain = min(
-            dim.spec.time_granularity.base_granularity for dim in agg_time_dimension_instances
-        )
-        compatible_time_spine_grains = [
-            grain for grain in self._time_spine_sources.keys() if grain.to_int() <= smallest_agg_time_grain.to_int()
-        ]
-        if not compatible_time_spine_grains:
-            raise RuntimeError(
-                f"This query requires a time spine with granularity {smallest_agg_time_grain.name} or smaller, which is not configured. "
-                f"The smallest available time spine granularity is {min(self._time_spine_sources.keys()).name}, which is too large. "
-                "See documentation for how to configure a new time spine: https://docs.getdbt.com/docs/build/metricflow-time-spine"
-            )
-        return self._time_spine_sources[max(compatible_time_spine_grains)]
-
     def _make_time_spine_data_set(
         self,
         agg_time_dimension_instances: Tuple[TimeDimensionInstance, ...],
@@ -280,7 +249,10 @@ def _make_time_spine_data_set(
         time_spine_instance_set = InstanceSet(time_dimension_instances=agg_time_dimension_instances)
         time_spine_table_alias = self._next_unique_table_alias()
 
-        time_spine_source = self._choose_time_spine_source(agg_time_dimension_instances)
+        time_spine_source = TimeSpineSource.choose_time_spine_source(
+            required_time_spine_specs=[instance.spec for instance in agg_time_dimension_instances],
+            time_spine_sources=self._time_spine_sources,
+        )
         column_expr = SqlColumnReferenceExpression.from_table_and_column_names(
             table_alias=time_spine_table_alias, column_name=time_spine_source.base_column
         )
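
The core of this change is the selection rule now housed in TimeSpineSource.choose_time_spine_source: pick the coarsest configured time spine that is still fine-grained enough for every requested granularity. A minimal standalone sketch of that rule follows; it is illustrative only, so the Granularity enum, the choose_time_spine function, and the table-name strings are hypothetical stand-ins for MetricFlow's TimeGranularity, the new static method, and the configured time spine sources, and plain IntEnum comparison stands in for TimeGranularity.to_int().

from enum import IntEnum
from typing import Dict, Sequence


class Granularity(IntEnum):
    """Illustrative stand-in for dbt's TimeGranularity; a smaller value means a finer grain."""

    SECOND = 1
    MINUTE = 2
    HOUR = 3
    DAY = 4


def choose_time_spine(required_grains: Sequence[Granularity], time_spines: Dict[Granularity, str]) -> str:
    """Pick the coarsest time spine that is still fine enough for every required grain."""
    assert required_grains, "At least one required granularity is needed to choose a time spine."
    smallest_required = min(required_grains)
    compatible = [grain for grain in time_spines if grain <= smallest_required]
    if not compatible:
        raise RuntimeError(
            f"No configured time spine is fine-grained enough for {smallest_required.name}; "
            f"the finest available grain is {min(time_spines).name}."
        )
    # Among the compatible spines, the coarsest grain means the fewest rows to join against.
    return time_spines[max(compatible)]


# The example from the docstring: spines at SECOND, MINUTE, DAY; the query needs HOUR and DAY.
spines = {
    Granularity.SECOND: "time_spine_second",
    Granularity.MINUTE: "time_spine_minute",
    Granularity.DAY: "time_spine_day",
}
print(choose_time_spine([Granularity.HOUR, Granularity.DAY], spines))  # -> time_spine_minute

Running the example prints time_spine_minute, matching the docstring's example. Taking max(compatible) mirrors the patch's preference for the coarsest usable spine, presumably to keep the joined time spine as small as possible.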