From 6a9cdeaba90b5b3e04cf16f5e3b7bec3c6e1843c Mon Sep 17 00:00:00 2001 From: Courtney Holcomb Date: Fri, 18 Oct 2024 14:36:48 -0700 Subject: [PATCH] Clean up dataflow to SQL for JoinToTimeSpineNode Should not actually change behavior since we don't pass custom grains into this node --- metricflow/plan_conversion/dataflow_to_sql.py | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/metricflow/plan_conversion/dataflow_to_sql.py b/metricflow/plan_conversion/dataflow_to_sql.py index b658ddce0d..a17e4ca2fe 100644 --- a/metricflow/plan_conversion/dataflow_to_sql.py +++ b/metricflow/plan_conversion/dataflow_to_sql.py @@ -317,13 +317,15 @@ def _make_time_spine_data_set( from_source=SqlTableNode.create(sql_table=time_spine_source.spine_table), from_source_alias=time_spine_table_alias, group_bys=select_columns if apply_group_by else (), - where=_make_time_range_comparison_expr( - table_alias=time_spine_table_alias, - column_alias=time_spine_source.base_column, - time_range_constraint=time_range_constraint, - ) - if time_range_constraint - else None, + where=( + _make_time_range_comparison_expr( + table_alias=time_spine_table_alias, + column_alias=time_spine_source.base_column, + time_range_constraint=time_range_constraint, + ) + if time_range_constraint + else None + ), ) # Where constraints must be applied in an outer query since they are using an alias (e.g., 'metric_time__day'), @@ -380,7 +382,6 @@ def visit_source_node(self, node: ReadSqlSourceNode) -> SqlDataSet: instance_set=node.data_set.instance_set, ) - # TODO: write tests for custom granularities that hit this node def visit_join_over_time_range_node(self, node: JoinOverTimeRangeNode) -> SqlDataSet: """Generate time range join SQL.""" table_alias_to_instance_set: OrderedDict[str, InstanceSet] = OrderedDict() @@ -1307,7 +1308,6 @@ def visit_semi_additive_join_node(self, node: SemiAdditiveJoinNode) -> SqlDataSe ), ) - # TODO: write tests for custom granularities that hit this node def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet: # noqa: D102 parent_data_set = node.parent_node.accept(self) parent_alias = self._next_unique_table_alias() @@ -1330,16 +1330,14 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet ): agg_time_dimension_instances.append(instance) - # Choose the instance with the smallest standard granularity available. - assert all( - [not instance.spec.time_granularity.is_custom_granularity for instance in agg_time_dimension_instances] - ), "Custom granularities are not yet supported for all queries." + # Choose the instance with the smallest base granularity available. agg_time_dimension_instances.sort(key=lambda instance: instance.spec.time_granularity.base_granularity.to_int()) assert len(agg_time_dimension_instances) > 0, ( "Couldn't find requested agg_time_dimension in parent data set. The dataflow plan may have been " "configured incorrectly." ) agg_time_dimension_instance_for_join = agg_time_dimension_instances[0] + agg_time_dim_for_join_with_base_grain = agg_time_dimension_instance_for_join.spec.with_base_grain() # Build time spine data set using the requested agg_time_dimension name. time_spine_alias = self._next_unique_table_alias() @@ -1354,7 +1352,7 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet node=node, time_spine_alias=time_spine_alias, agg_time_dimension_column_name=self.column_association_resolver.resolve_spec( - agg_time_dimension_instance_for_join.spec + agg_time_dim_for_join_with_base_grain ).column_name, parent_sql_select_node=parent_data_set.checked_sql_select_node, parent_alias=parent_alias, @@ -1391,15 +1389,15 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet self._column_association_resolver, OrderedDict({parent_alias: parent_instance_set}) ) - # Select instance from time spine data set that matches the join instance in the parent dataset. + # Select matching instance from time spine data set (using base grain - custom grain will be joined in a later node). original_time_spine_dim_instance: Optional[TimeDimensionInstance] = None for time_dimension_instance in time_spine_dataset.instance_set.time_dimension_instances: - if time_dimension_instance.spec == agg_time_dimension_instance_for_join.spec: + if time_dimension_instance.spec == agg_time_dim_for_join_with_base_grain: original_time_spine_dim_instance = time_dimension_instance break assert original_time_spine_dim_instance, ( "Couldn't find requested agg_time_dimension_instance_for_join in time spine data set, which " - f"indicates it may have been configured incorrectly. Expected: {agg_time_dimension_instance_for_join.spec};" + f"indicates it may have been configured incorrectly. Expected: {agg_time_dim_for_join_with_base_grain};" f" Got: {[instance.spec for instance in time_spine_dataset.instance_set.time_dimension_instances]}" ) time_spine_column_select_expr: Union[