diff --git a/metricflow-semantics/metricflow_semantics/time/time_spine_source.py b/metricflow-semantics/metricflow_semantics/time/time_spine_source.py index f55967d3eb..592c84158e 100644 --- a/metricflow-semantics/metricflow_semantics/time/time_spine_source.py +++ b/metricflow-semantics/metricflow_semantics/time/time_spine_source.py @@ -2,6 +2,7 @@ import logging from dataclasses import dataclass +from functools import lru_cache from typing import Dict, Optional, Sequence from dbt_semantic_interfaces.implementations.time_spine import PydanticTimeSpineCustomGranularityColumn @@ -79,6 +80,7 @@ def build_standard_time_spine_sources( return time_spine_sources @staticmethod + @lru_cache def build_custom_time_spine_sources(time_spine_sources: Sequence[TimeSpineSource]) -> Dict[str, TimeSpineSource]: """Creates a set of time spine sources with custom granularities based on what's in the manifest.""" return { @@ -99,33 +101,50 @@ def build_custom_granularities(time_spine_sources: Sequence[TimeSpineSource]) -> } @staticmethod - def choose_time_spine_source( + def choose_time_spine_sources( required_time_spine_specs: Sequence[TimeDimensionSpec], time_spine_sources: Dict[TimeGranularity, TimeSpineSource], - ) -> TimeSpineSource: - """Determine which time spine source to use to satisfy the given specs. + ) -> Sequence[TimeSpineSource]: + """Determine which time spine sources to use to satisfy the given specs. - Will choose the time spine with the largest granularity that can be used to get the smallest granularity required to - satisfy the time spine specs. Example: + Custom grains can only use the time spine where they are defined. For standard grains, this will choose the time + spine with the largest granularity that is compatible with all required standard grains. This ensures max efficiency + for the query by minimizing the number of time spine joins and the amount of aggregation required. 
Example: - Time spines available: SECOND, MINUTE, DAY - Time granularities needed for request: HOUR, DAY --> Selected time spine: MINUTE - - Note time spines are identified by their base granularity. """ assert required_time_spine_specs, ( "Choosing time spine source requires time spine specs, but the `required_time_spine_specs` param is empty. " "This indicates internal misconfiguration." ) - smallest_agg_time_grain = min(spec.time_granularity.base_granularity for spec in required_time_spine_specs) - time_spine_grains = time_spine_sources.keys() - compatible_time_spine_grains = [ - grain for grain in time_spine_grains if grain.to_int() <= smallest_agg_time_grain.to_int() - ] - if not compatible_time_spine_grains: + + # Each custom grain can only be satisfied by one time spine. + custom_time_spines = TimeSpineSource.build_custom_time_spine_sources(tuple(time_spine_sources.values())) + required_time_spines = { + custom_time_spines[spec.time_granularity.name] + for spec in required_time_spine_specs + if spec.time_granularity.is_custom_granularity + } + + # Standard grains can be satisfied by any time spine with a base grain that's <= the standard grain. + smallest_required_standard_grain = min( + spec.time_granularity.base_granularity for spec in required_time_spine_specs + ) + compatible_time_spines_for_standard_grains = { + grain: time_spine_source + for grain, time_spine_source in time_spine_sources.items() + if grain.to_int() <= smallest_required_standard_grain.to_int() + } + if not compatible_time_spines_for_standard_grains: raise RuntimeError( - f"This query requires a time spine with granularity {smallest_agg_time_grain.name} or smaller, which is not configured. " - f"The smallest available time spine granularity is {min(time_spine_grains).name}, which is too large." + f"This query requires a time spine with granularity {smallest_required_standard_grain.name} or smaller, which is not configured. 
" + f"The smallest available time spine granularity is {min(time_spine_sources.keys()).name}, which is too large." "See documentation for how to configure a new time spine: https://docs.getdbt.com/docs/build/metricflow-time-spine" ) - return time_spine_sources[max(compatible_time_spine_grains)] + + # If the standard grains can't be satisfied by the same time spines as the custom grains, add the largest compatible one. + if not required_time_spines.intersection(set(compatible_time_spines_for_standard_grains.values())): + required_time_spines.add(time_spine_sources[max(compatible_time_spines_for_standard_grains.keys())]) + + return tuple(required_time_spines) diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index 507a566525..daf70e2e59 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -1043,12 +1043,14 @@ def _find_source_node_recipe_non_cached( ) # If metric_time is requested without metrics, choose appropriate time spine node to select those values from. if linkable_specs_to_satisfy.metric_time_specs: - time_spine_node = self._source_node_set.time_spine_nodes[ - TimeSpineSource.choose_time_spine_source( - required_time_spine_specs=linkable_specs_to_satisfy.metric_time_specs, - time_spine_sources=self._source_node_builder.time_spine_sources, - ).base_granularity - ] + time_spine_sources = TimeSpineSource.choose_time_spine_sources( + required_time_spine_specs=linkable_specs_to_satisfy.metric_time_specs, + time_spine_sources=self._source_node_builder.time_spine_sources, + ) + assert ( + len(time_spine_sources) == 1 + ), "Only one time spine source should have been selected for base grains. This indicates internal misconfiguration." 
+ time_spine_node = self._source_node_set.time_spine_nodes[time_spine_sources[0].base_granularity] candidate_nodes_for_right_side_of_join += [time_spine_node] candidate_nodes_for_left_side_of_join += [time_spine_node] default_join_type = SqlJoinType.FULL_OUTER @@ -1769,6 +1771,11 @@ def _build_aggregated_measure_from_measure_source_node( ) if set(included_agg_time_specs) == set(filter_spec.linkable_spec_set.as_tuple): agg_time_only_filters.append(filter_spec) + if filter_spec.linkable_spec_set.time_dimension_specs_with_custom_grain: + raise ValueError( + "Using custom granularity in filters for `join_to_timespine` metrics is not yet fully supported. " + "This feature is coming soon!" + ) else: non_agg_time_filters.append(filter_spec) diff --git a/metricflow/dataset/convert_semantic_model.py b/metricflow/dataset/convert_semantic_model.py index 4141606704..8410b2bfd7 100644 --- a/metricflow/dataset/convert_semantic_model.py +++ b/metricflow/dataset/convert_semantic_model.py @@ -559,8 +559,7 @@ def build_time_spine_source_data_set(self, time_spine_source: TimeSpineSource) - time_dimension_instances.append(custom_time_dimension_instance) custom_select_column = SqlSelectColumn( expr=SemanticModelToDataSetConverter._make_element_sql_expr( - table_alias=from_source_alias, - element_name=custom_granularity.column_name or custom_granularity.name, + table_alias=from_source_alias, element_name=custom_granularity.parsed_column_name ), column_alias=custom_time_dimension_instance.associated_column.column_name, ) diff --git a/metricflow/plan_conversion/dataflow_to_sql.py b/metricflow/plan_conversion/dataflow_to_sql.py index c2016fa1b5..108ba21c62 100644 --- a/metricflow/plan_conversion/dataflow_to_sql.py +++ b/metricflow/plan_conversion/dataflow_to_sql.py @@ -265,9 +265,16 @@ def _make_time_spine_data_set( agg_time_dimension_specs.union(specs_required_for_where_constraints), key=lambda spec: (spec.element_name, spec.time_granularity.base_granularity.to_int()), ) - 
time_spine_source = TimeSpineSource.choose_time_spine_source( + time_spine_sources = TimeSpineSource.choose_time_spine_sources( required_time_spine_specs=list(required_time_spine_specs), time_spine_sources=self._time_spine_sources ) + # TODO: handle multiple time spine joins + assert len(time_spine_sources) == 1, ( + "Join to time spine with custom granularity currently only supports one custom granularity per query. " + "Full feature coming soon." + ) + time_spine_source = time_spine_sources[0] + column_expr = SqlColumnReferenceExpression.from_table_and_column_names( table_alias=time_spine_table_alias, column_name=time_spine_source.base_column ) @@ -277,14 +284,25 @@ def _make_time_spine_data_set( column_alias = self.column_association_resolver.resolve_spec(agg_time_dimension_spec).column_name # If the requested granularity is the same as the granularity of the spine, do a direct select. agg_time_grain = agg_time_dimension_spec.time_granularity - assert ( - not agg_time_grain.is_custom_granularity - ), "Custom time granularities are not yet supported for all queries." - if agg_time_grain.base_granularity == time_spine_source.base_granularity: + if ( + agg_time_grain.base_granularity == time_spine_source.base_granularity + and not agg_time_grain.is_custom_granularity + ): select_columns += (SqlSelectColumn(expr=column_expr, column_alias=column_alias),) apply_group_by = False - # If any columns have a different granularity, apply a DATE_TRUNC(). + elif agg_time_grain.is_custom_granularity: + # If any dimensions require a custom granularity, select the appropriate column. 
+ for custom_granularity in time_spine_source.custom_granularities: + select_columns += ( + SqlSelectColumn( + expr=SqlColumnReferenceExpression.from_table_and_column_names( + table_alias=time_spine_table_alias, column_name=custom_granularity.parsed_column_name + ), + column_alias=column_alias, + ), + ) else: + # If any dimensions require a different standard granularity, apply a DATE_TRUNC() to the base column. select_columns += ( SqlSelectColumn( expr=SqlDateTruncExpression.create( @@ -1337,10 +1355,12 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet "configured incorrectly." ) agg_time_dimension_instance_for_join = agg_time_dimension_instances[0] - agg_time_dim_for_join_with_base_grain = agg_time_dimension_instance_for_join.spec.with_base_grain() # Build time spine data set using the requested agg_time_dimension name. time_spine_alias = self._next_unique_table_alias() + # TODO: make sure we are joining on the custom grain column!! Seems like we aren't. + # TODO: first, error if all grains can't be satisfied by the same time spine. Later, support multiple time spine joins in one request. + # This should only be needed if there are multiple custom grains from different time spines, or a combo of custom grain and standard grain that require different time spines.
time_spine_dataset = self._make_time_spine_data_set( agg_time_dimension_instances=(agg_time_dimension_instance_for_join,), time_range_constraint=node.time_range_constraint, @@ -1352,7 +1372,7 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet node=node, time_spine_alias=time_spine_alias, agg_time_dimension_column_name=self.column_association_resolver.resolve_spec( - agg_time_dim_for_join_with_base_grain + agg_time_dimension_instance_for_join.spec ).column_name, parent_sql_select_node=parent_data_set.checked_sql_select_node, parent_alias=parent_alias, @@ -1392,12 +1412,12 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet # Select matching instance from time spine data set (using base grain - custom grain will be joined in a later node). original_time_spine_dim_instance: Optional[TimeDimensionInstance] = None for time_dimension_instance in time_spine_dataset.instance_set.time_dimension_instances: - if time_dimension_instance.spec == agg_time_dim_for_join_with_base_grain: + if time_dimension_instance.spec == agg_time_dimension_instance_for_join.spec: original_time_spine_dim_instance = time_dimension_instance break assert original_time_spine_dim_instance, ( "Couldn't find requested agg_time_dimension_instance_for_join in time spine data set, which " - f"indicates it may have been configured incorrectly. Expected: {agg_time_dim_for_join_with_base_grain};" + f"indicates it may have been configured incorrectly. 
Expected: {agg_time_dimension_instance_for_join.spec};" f" Got: {[instance.spec for instance in time_spine_dataset.instance_set.time_dimension_instances]}" ) time_spine_column_select_expr: Union[ @@ -1498,7 +1518,7 @@ def _get_custom_granularity_column_name(self, custom_granularity_name: str) -> s time_spine_source = self._get_time_spine_for_custom_granularity(custom_granularity_name) for custom_granularity in time_spine_source.custom_granularities: if custom_granularity.name == custom_granularity_name: - return custom_granularity.column_name if custom_granularity.column_name else custom_granularity.name + return custom_granularity.parsed_column_name raise RuntimeError( f"Custom granularity {custom_granularity} not found. This indicates internal misconfiguration."