diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index 1090c2c999..b3b1ef4f3e 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -996,7 +996,7 @@ def _find_dataflow_recipe( desired_linkable_specs=linkable_specs, nodes=candidate_nodes_for_right_side_of_join, metric_time_dimension_reference=self._metric_time_dimension_reference, - time_spine_node=self._source_node_set.time_spine_node, + time_spine_nodes=self._source_node_set.time_spine_nodes_tuple, ) logger.info( f"After removing unnecessary nodes, there are {len(candidate_nodes_for_right_side_of_join)} candidate " @@ -1034,7 +1034,7 @@ def _find_dataflow_recipe( semantic_model_lookup=self._semantic_model_lookup, nodes_available_for_joins=self._sort_by_suitability(candidate_nodes_for_right_side_of_join), node_data_set_resolver=self._node_data_set_resolver, - time_spine_node=self._source_node_set.time_spine_node, + time_spine_nodes=self._source_node_set.time_spine_nodes_tuple, ) # Dict from the node that contains the source node to the evaluation results. diff --git a/metricflow/dataflow/builder/node_evaluator.py b/metricflow/dataflow/builder/node_evaluator.py index 698651e397..d22533be75 100644 --- a/metricflow/dataflow/builder/node_evaluator.py +++ b/metricflow/dataflow/builder/node_evaluator.py @@ -168,7 +168,7 @@ def __init__( semantic_model_lookup: SemanticModelLookup, nodes_available_for_joins: Sequence[DataflowPlanNode], node_data_set_resolver: DataflowPlanNodeOutputDataSetResolver, - time_spine_node: MetricTimeDimensionTransformNode, + time_spine_nodes: Sequence[MetricTimeDimensionTransformNode], ) -> None: """Initializer. @@ -185,7 +185,7 @@ def __init__( self._node_data_set_resolver = node_data_set_resolver self._partition_resolver = PartitionJoinResolver(self._semantic_model_lookup) self._join_evaluator = SemanticModelJoinEvaluator(self._semantic_model_lookup) - self._time_spine_node = time_spine_node + self._time_spine_nodes = time_spine_nodes def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs( self, @@ -201,7 +201,7 @@ def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs( left_node_spec_set = left_node_instance_set.spec_set for right_node in self._nodes_available_for_joins: # If right node is time spine source node, use cross join. - if right_node == self._time_spine_node: + if right_node in self._time_spine_nodes: needed_metric_time_specs = group_specs_by_type(needed_linkable_specs).metric_time_specs candidates_for_join.append( JoinLinkableInstancesRecipe( diff --git a/metricflow/dataflow/builder/source_node.py b/metricflow/dataflow/builder/source_node.py index 3e105a83fa..a5c9592005 100644 --- a/metricflow/dataflow/builder/source_node.py +++ b/metricflow/dataflow/builder/source_node.py @@ -1,9 +1,10 @@ from __future__ import annotations from dataclasses import dataclass -from typing import List, Sequence, Tuple +from typing import Dict, List, Sequence, Tuple from dbt_semantic_interfaces.references import TimeDimensionReference +from dbt_semantic_interfaces.type_enums import TimeGranularity from metricflow_semantics.model.semantic_manifest_lookup import SemanticManifestLookup from metricflow_semantics.query.query_parser import MetricFlowQueryParser from metricflow_semantics.specs.column_assoc import ColumnAssociationResolver @@ -36,14 +37,16 @@ class SourceNodeSet: # below. See usage in `DataflowPlanBuilder`. source_nodes_for_group_by_item_queries: Tuple[DataflowPlanNode, ...] - # Provides the time spine. - time_spine_node: MetricTimeDimensionTransformNode + # Provides the time spines. + time_spine_nodes: Dict[TimeGranularity, MetricTimeDimensionTransformNode] @property def all_nodes(self) -> Sequence[DataflowPlanNode]: # noqa: D102 - return ( - self.source_nodes_for_metric_queries + self.source_nodes_for_group_by_item_queries + (self.time_spine_node,) - ) + return self.source_nodes_for_metric_queries + self.source_nodes_for_group_by_item_queries + + @property + def time_spine_nodes_tuple(self) -> Tuple[MetricTimeDimensionTransformNode, ...]: # noqa: D102 + return tuple(self.time_spine_nodes.values()) class SourceNodeBuilder: @@ -56,13 +59,19 @@ def __init__( # noqa: D107 ) -> None: self._semantic_manifest_lookup = semantic_manifest_lookup data_set_converter = SemanticModelToDataSetConverter(column_association_resolver) - time_spine_source = TimeSpineSource.create_from_manifest(semantic_manifest_lookup.semantic_manifest) - time_spine_data_set = data_set_converter.build_time_spine_source_data_set(time_spine_source) - time_dim_reference = TimeDimensionReference(element_name=time_spine_source.time_column_name) - self._time_spine_source_node = MetricTimeDimensionTransformNode.create( - parent_node=ReadSqlSourceNode.create(data_set=time_spine_data_set), - aggregation_time_dimension_reference=time_dim_reference, - ) + self._time_spine_source_nodes = { + granularity: MetricTimeDimensionTransformNode.create( + parent_node=ReadSqlSourceNode.create( + data_set=data_set_converter.build_time_spine_source_data_set(time_spine_source) + ), + aggregation_time_dimension_reference=TimeDimensionReference( + element_name=time_spine_source.time_column_name + ), + ) + for granularity, time_spine_source in TimeSpineSource.create_from_manifest( + semantic_manifest_lookup.semantic_manifest + ).items() + } self._query_parser = MetricFlowQueryParser(semantic_manifest_lookup) def create_from_data_sets(self, data_sets: Sequence[SemanticModelDataSet]) -> SourceNodeSet: @@ -93,8 +102,9 @@ def create_from_data_sets(self, data_sets: Sequence[SemanticModelDataSet]) -> So source_nodes_for_metric_queries.append(metric_time_transform_node) return SourceNodeSet( - time_spine_node=self._time_spine_source_node, - source_nodes_for_group_by_item_queries=tuple(group_by_item_source_nodes) + (self._time_spine_source_node,), + time_spine_nodes=self._time_spine_source_nodes, + source_nodes_for_group_by_item_queries=tuple(group_by_item_source_nodes) + + tuple(self._time_spine_source_nodes.values()), source_nodes_for_metric_queries=tuple(source_nodes_for_metric_queries), ) diff --git a/metricflow/engine/metricflow_engine.py b/metricflow/engine/metricflow_engine.py index 24e070f0e7..f260cf0993 100644 --- a/metricflow/engine/metricflow_engine.py +++ b/metricflow/engine/metricflow_engine.py @@ -368,7 +368,7 @@ def __init__( DunderColumnAssociationResolver(semantic_manifest_lookup) ) self._time_source = time_source - self._time_spine_source = TimeSpineSource.create_from_manifest(semantic_manifest_lookup.semantic_manifest) + self._time_spine_sources = TimeSpineSource.create_from_manifest(semantic_manifest_lookup.semantic_manifest) self._source_data_sets: List[SemanticModelDataSet] = [] converter = SemanticModelToDataSetConverter(column_association_resolver=self._column_association_resolver) for semantic_model in sorted( diff --git a/metricflow/plan_conversion/dataflow_to_sql.py b/metricflow/plan_conversion/dataflow_to_sql.py index 56fe9c8720..663dcff13f 100644 --- a/metricflow/plan_conversion/dataflow_to_sql.py +++ b/metricflow/plan_conversion/dataflow_to_sql.py @@ -183,7 +183,7 @@ def __init__( self._semantic_manifest_lookup = semantic_manifest_lookup self._metric_lookup = semantic_manifest_lookup.metric_lookup self._semantic_model_lookup = semantic_manifest_lookup.semantic_model_lookup - self._time_spine_source = TimeSpineSource.create_from_manifest(semantic_manifest_lookup.semantic_manifest) + self._time_spine_sources = TimeSpineSource.create_from_manifest(semantic_manifest_lookup.semantic_manifest) @property def column_association_resolver(self) -> ColumnAssociationResolver: # noqa: D102 @@ -222,24 +222,47 @@ def _next_unique_table_alias(self) -> str: """Return the next unique table alias to use in generating queries.""" return SequentialIdGenerator.create_next_id(StaticIdPrefix.SUB_QUERY).str_value + def _choose_time_spine_source( + self, agg_time_dimension_instances: Tuple[TimeDimensionInstance, ...] + ) -> TimeSpineSource: + """Determine which time spine source to use when building time spine dataset. + + Will choose the time spine with the largest granularity that can be used to get the smallest granularity requested + in the agg time dimension instances. Example: + - Time spines available: SECOND, MINUTE, DAY + - Agg time dimension granularity needed for request: HOUR, DAY + --> Selected time spine: MINUTE + """ + assert ( + agg_time_dimension_instances + ), "Building time spine dataset requires agg_time_dimension_instances, but none were found." + smallest_agg_time_grain = min(dim.spec.time_granularity for dim in agg_time_dimension_instances) + compatible_time_spine_grains = [ + grain for grain in self._time_spine_sources.keys() if grain.to_int() <= smallest_agg_time_grain.to_int() + ] + if not compatible_time_spine_grains: + raise RuntimeError( + # TODO: update docs link when new docs are available + f"No compatible time spine found. This query requires a time spine with granularity {smallest_agg_time_grain} or smaller. See docs to configure a new time spine: https://docs.getdbt.com/docs/build/metricflow-time-spine" + ) + return self._time_spine_sources[max(compatible_time_spine_grains)] + def _make_time_spine_data_set( self, agg_time_dimension_instances: Tuple[TimeDimensionInstance, ...], - time_spine_source: TimeSpineSource, time_range_constraint: Optional[TimeRangeConstraint] = None, ) -> SqlDataSet: - """Make a time spine data set, which contains all date/time values like '2020-01-01', '2020-01-02'... + """Returns a dataset with a datetime column for each agg_time_dimension granularity requested. - Returns a dataset with a column selected for each agg_time_dimension requested. Column alias will use 'metric_time' or the agg_time_dimension name depending on which the user requested. """ time_spine_instance_set = InstanceSet(time_dimension_instances=agg_time_dimension_instances) time_spine_table_alias = self._next_unique_table_alias() + time_spine_source = self._choose_time_spine_source(agg_time_dimension_instances) column_expr = SqlColumnReferenceExpression.from_table_and_column_names( table_alias=time_spine_table_alias, column_name=time_spine_source.time_column_name ) - select_columns: Tuple[SqlSelectColumn, ...] = () apply_group_by = False for agg_time_dimension_instance in agg_time_dimension_instances: @@ -314,7 +337,6 @@ def visit_join_over_time_range_node(self, node: JoinOverTimeRangeNode) -> SqlDat # Assemble time_spine dataset with requested agg time dimension instances selected. time_spine_data_set = self._make_time_spine_data_set( agg_time_dimension_instances=requested_agg_time_dimension_instances, - time_spine_source=self._time_spine_source, time_range_constraint=node.time_range_constraint, ) table_alias_to_instance_set[time_spine_data_set_alias] = time_spine_data_set.instance_set @@ -1053,9 +1075,9 @@ def visit_metric_time_dimension_transform_node(self, node: MetricTimeDimensionTr spec=metric_time_dimension_spec, ) ) - output_column_to_input_column[ - metric_time_dimension_column_association.column_name - ] = matching_time_dimension_instance.associated_column.column_name + output_column_to_input_column[metric_time_dimension_column_association.column_name] = ( + matching_time_dimension_instance.associated_column.column_name + ) output_instance_set = InstanceSet( measure_instances=tuple(output_measure_instances), @@ -1234,7 +1256,6 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet time_spine_alias = self._next_unique_table_alias() time_spine_dataset = self._make_time_spine_data_set( agg_time_dimension_instances=(agg_time_dimension_instance_for_join,), - time_spine_source=self._time_spine_source, time_range_constraint=node.time_range_constraint, ) @@ -1287,11 +1308,11 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet and len(time_spine_dataset.checked_sql_select_node.select_columns) == 1 ), "Time spine dataset not configured properly. Expected exactly one column." original_time_spine_dim_instance = time_spine_dataset.instance_set.time_dimension_instances[0] - time_spine_column_select_expr: Union[ - SqlColumnReferenceExpression, SqlDateTruncExpression - ] = SqlColumnReferenceExpression.create( - SqlColumnReference( - table_alias=time_spine_alias, column_name=original_time_spine_dim_instance.spec.qualified_name + time_spine_column_select_expr: Union[SqlColumnReferenceExpression, SqlDateTruncExpression] = ( + SqlColumnReferenceExpression.create( + SqlColumnReference( + table_alias=time_spine_alias, column_name=original_time_spine_dim_instance.spec.qualified_name + ) ) ) diff --git a/metricflow/plan_conversion/node_processor.py b/metricflow/plan_conversion/node_processor.py index 0334ae81fb..8a139113db 100644 --- a/metricflow/plan_conversion/node_processor.py +++ b/metricflow/plan_conversion/node_processor.py @@ -619,7 +619,7 @@ def remove_unnecessary_nodes( desired_linkable_specs: Sequence[LinkableInstanceSpec], nodes: Sequence[DataflowPlanNode], metric_time_dimension_reference: TimeDimensionReference, - time_spine_node: MetricTimeDimensionTransformNode, + time_spine_nodes: Sequence[MetricTimeDimensionTransformNode], ) -> List[DataflowPlanNode]: """Filters out many of the nodes that can't possibly be useful for joins to obtain the desired linkable specs. @@ -661,7 +661,7 @@ def remove_unnecessary_nodes( continue # Used for group-by-item-values queries. - if node == time_spine_node: + if node in time_spine_nodes: logger.debug(f"Including {node} since it matches `time_spine_node`") relevant_nodes.append(node) continue diff --git a/metricflow/plan_conversion/time_spine.py b/metricflow/plan_conversion/time_spine.py index 858c0bdb0e..54c32fa5b7 100644 --- a/metricflow/plan_conversion/time_spine.py +++ b/metricflow/plan_conversion/time_spine.py @@ -2,11 +2,10 @@ import logging from dataclasses import dataclass -from typing import Optional +from typing import Dict, Optional from dbt_semantic_interfaces.protocols import SemanticManifest from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity -from metricflow_semantics.mf_logging.pretty_print import mf_pformat from metricflow_semantics.specs.time_dimension_spec import DEFAULT_TIME_GRANULARITY from metricflow.sql.sql_table import SqlTable @@ -34,26 +33,42 @@ def spine_table(self) -> SqlTable: return SqlTable(schema_name=self.schema_name, table_name=self.table_name, db_name=self.db_name) @staticmethod - def create_from_manifest(semantic_manifest: SemanticManifest) -> TimeSpineSource: + def create_from_manifest(semantic_manifest: SemanticManifest) -> Dict[TimeGranularity, TimeSpineSource]: """Creates a time spine source based on what's in the manifest.""" - time_spine_table_configurations = semantic_manifest.project_configuration.time_spine_table_configurations - - if not ( - len(time_spine_table_configurations) == 1 - and time_spine_table_configurations[0].grain == DEFAULT_TIME_GRANULARITY - ): - raise NotImplementedError( - f"Only a single time spine table configuration with {DEFAULT_TIME_GRANULARITY} is currently " - f"supported. Got:\n" - f"{mf_pformat(time_spine_table_configurations)}" + time_spine_sources = { + time_spine.primary_column.time_granularity: TimeSpineSource( + schema_name=time_spine.node_relation.schema_name, + table_name=time_spine.node_relation.relation_name, # is relation name the table name? double check + db_name=time_spine.node_relation.database, + time_column_name=time_spine.primary_column.name, + time_column_granularity=time_spine.primary_column.time_granularity, ) + for time_spine in semantic_manifest.project_configuration.time_spines + } - time_spine_table_configuration = time_spine_table_configurations[0] - time_spine_table = SqlTable.from_string(time_spine_table_configuration.location) - return TimeSpineSource( - schema_name=time_spine_table.schema_name, - table_name=time_spine_table.table_name, - db_name=time_spine_table.db_name, - time_column_name=time_spine_table_configuration.column_name, - time_column_granularity=time_spine_table_configuration.grain, - ) + # For backward compatibility: if legacy time spine config exists in the manifest, add that time spine here for + # backward compatibility. Ignore it if there is a new time spine config with the same granularity. + legacy_time_spines = semantic_manifest.project_configuration.time_spine_table_configurations + for legacy_time_spine in legacy_time_spines: + if not time_spine_sources.get(legacy_time_spine.grain): + time_spine_table = SqlTable.from_string(legacy_time_spine.location) + time_spine_sources[legacy_time_spine.grain] = TimeSpineSource( + schema_name=time_spine_table.schema_name, + table_name=time_spine_table.table_name, + db_name=time_spine_table.db_name, + time_column_name=legacy_time_spine.column_name, + time_column_granularity=legacy_time_spine.grain, + ) + + # Sanity check: this should have been validated during manifest parsing. + if not time_spine_sources: + raise RuntimeError( + "At least one time spine must be configured to use the semantic layer, but none were found." + ) + + return time_spine_sources + + +# DSI validations to add: +# - Check that there is only one time spine for each granularity option +# - Check that there is a time spine defined at minimum DAY diff --git a/metricflow/validation/data_warehouse_model_validator.py b/metricflow/validation/data_warehouse_model_validator.py index bb90074195..8ed5360353 100644 --- a/metricflow/validation/data_warehouse_model_validator.py +++ b/metricflow/validation/data_warehouse_model_validator.py @@ -44,7 +44,6 @@ from metricflow.dataset.dataset_classes import DataSet from metricflow.engine.metricflow_engine import MetricFlowEngine, MetricFlowExplainResult, MetricFlowQueryRequest from metricflow.plan_conversion.dataflow_to_sql import DataflowToSqlQueryPlanConverter -from metricflow.plan_conversion.time_spine import TimeSpineSource from metricflow.protocols.sql_client import SqlClient @@ -59,7 +58,6 @@ class QueryRenderingTools: semantic_manifest_lookup: SemanticManifestLookup source_node_builder: SourceNodeBuilder converter: SemanticModelToDataSetConverter - time_spine_source: TimeSpineSource plan_converter: DataflowToSqlQueryPlanConverter def __init__(self, manifest: SemanticManifest) -> None: # noqa: D107 @@ -68,7 +66,6 @@ def __init__(self, manifest: SemanticManifest) -> None: # noqa: D107 column_association_resolver=DunderColumnAssociationResolver(self.semantic_manifest_lookup), semantic_manifest_lookup=self.semantic_manifest_lookup, ) - self.time_spine_source = TimeSpineSource.create_from_manifest(manifest) self.converter = SemanticModelToDataSetConverter( column_association_resolver=DunderColumnAssociationResolver( semantic_manifest_lookup=self.semantic_manifest_lookup diff --git a/tests_metricflow/dataflow/builder/test_node_evaluator.py b/tests_metricflow/dataflow/builder/test_node_evaluator.py index 92fa8cc172..e6716c1ff1 100644 --- a/tests_metricflow/dataflow/builder/test_node_evaluator.py +++ b/tests_metricflow/dataflow/builder/test_node_evaluator.py @@ -48,7 +48,7 @@ def node_evaluator( ].semantic_manifest_lookup.semantic_model_lookup, nodes_available_for_joins=tuple(mf_engine_fixture.read_node_mapping.values()), node_data_set_resolver=node_data_set_resolver, - time_spine_node=mf_engine_fixture.source_node_set.time_spine_node, + time_spine_nodes=mf_engine_fixture.source_node_set.time_spine_nodes_tuple, ) @@ -72,7 +72,7 @@ def make_multihop_node_evaluator( desired_linkable_specs=desired_linkable_specs, nodes=source_node_set.source_nodes_for_metric_queries, metric_time_dimension_reference=DataSet.metric_time_dimension_reference(), - time_spine_node=source_node_set.time_spine_node, + time_spine_nodes=source_node_set.time_spine_nodes_tuple, ) nodes_available_for_joins = list( @@ -87,7 +87,7 @@ def make_multihop_node_evaluator( semantic_model_lookup=semantic_manifest_lookup_with_multihop_links.semantic_model_lookup, nodes_available_for_joins=nodes_available_for_joins, node_data_set_resolver=node_data_set_resolver, - time_spine_node=source_node_set.time_spine_node, + time_spine_nodes=source_node_set.time_spine_nodes_tuple, ) @@ -518,7 +518,7 @@ def test_node_evaluator_with_scd_target( # Use all nodes in the simple model as candidates for joins. nodes_available_for_joins=tuple(mf_engine_fixture.read_node_mapping.values()), node_data_set_resolver=node_data_set_resolver, - time_spine_node=mf_engine_fixture.source_node_set.time_spine_node, + time_spine_nodes=mf_engine_fixture.source_node_set.time_spine_nodes_tuple, ) evaluation = node_evaluator.evaluate_node( diff --git a/tests_metricflow/fixtures/table_fixtures.py b/tests_metricflow/fixtures/table_fixtures.py index 27e1e4bcad..8571eacf1d 100644 --- a/tests_metricflow/fixtures/table_fixtures.py +++ b/tests_metricflow/fixtures/table_fixtures.py @@ -5,6 +5,7 @@ from pathlib import Path import pytest +from dbt_semantic_interfaces.type_enums import TimeGranularity from metricflow_semantics.model.semantic_manifest_lookup import SemanticManifestLookup from metricflow_semantics.test_helpers.config_helpers import MetricFlowTestConfiguration @@ -41,7 +42,9 @@ def check_time_spine_source( The time spine table is defined in a table snapshot YAML file and is restored to the source schema based on that definition. The definition in the YAML should align with the definition in the time_spine_source fixture. """ - time_spine_source = TimeSpineSource.create_from_manifest(simple_semantic_manifest_lookup.semantic_manifest) + time_spine_source = TimeSpineSource.create_from_manifest(simple_semantic_manifest_lookup.semantic_manifest)[ + TimeGranularity.DAY + ] assert ( time_spine_source.schema_name == mf_test_configuration.mf_source_schema ), "The time spine source table should be in the source schema" diff --git a/tests_metricflow/plan_conversion/test_time_spine.py b/tests_metricflow/plan_conversion/test_time_spine.py index fffbc10b68..931e728c92 100644 --- a/tests_metricflow/plan_conversion/test_time_spine.py +++ b/tests_metricflow/plan_conversion/test_time_spine.py @@ -2,6 +2,7 @@ import textwrap +from dbt_semantic_interfaces.type_enums import TimeGranularity from metricflow_semantics.filters.time_constraint import TimeRangeConstraint from metricflow_semantics.model.semantic_manifest_lookup import SemanticManifestLookup @@ -14,7 +15,9 @@ def test_date_spine_date_range( # noqa: D103 simple_semantic_manifest_lookup: SemanticManifestLookup, create_source_tables: None, ) -> None: - time_spine_source = TimeSpineSource.create_from_manifest(simple_semantic_manifest_lookup.semantic_manifest) + time_spine_source = TimeSpineSource.create_from_manifest(simple_semantic_manifest_lookup.semantic_manifest)[ + TimeGranularity.DAY + ] range_df = sql_client.query( textwrap.dedent( f"""\