Skip to content

Commit

Permalink
Support for join_to_timespine metrics with custom grain in the group by (#1505)
Browse files Browse the repository at this point in the history

Support for using custom grains in the group by when querying
`join_to_timespine` metrics.
This includes only limited support for now. Later we will support:
- Including custom grain in the where filter for these metric queries
- Queries that require multiple time spines to be joined in one
`JoinToTimeSpineNode`
  • Loading branch information
courtneyholcomb authored Nov 5, 2024
1 parent 289b278 commit ae8ea72
Show file tree
Hide file tree
Showing 29 changed files with 2,232 additions and 44 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241104-193531.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Basic support for join_to_timespine metrics with custom grain in the group by.
time: 2024-11-04T19:35:31.185372-08:00
custom:
Author: courtneyholcomb
Issue: "1505"
51 changes: 35 additions & 16 deletions metricflow-semantics/metricflow_semantics/time/time_spine_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
from dataclasses import dataclass
from functools import lru_cache
from typing import Dict, Optional, Sequence

from dbt_semantic_interfaces.implementations.time_spine import PydanticTimeSpineCustomGranularityColumn
Expand Down Expand Up @@ -79,6 +80,7 @@ def build_standard_time_spine_sources(
return time_spine_sources

@staticmethod
@lru_cache
def build_custom_time_spine_sources(time_spine_sources: Sequence[TimeSpineSource]) -> Dict[str, TimeSpineSource]:
"""Creates a set of time spine sources with custom granularities based on what's in the manifest."""
return {
Expand All @@ -99,33 +101,50 @@ def build_custom_granularities(time_spine_sources: Sequence[TimeSpineSource]) ->
}

@staticmethod
def choose_time_spine_sources(
    required_time_spine_specs: Sequence[TimeDimensionSpec],
    time_spine_sources: Dict[TimeGranularity, TimeSpineSource],
) -> Sequence[TimeSpineSource]:
    """Determine which time spine sources to use to satisfy the given specs.

    Custom grains can only use the time spine where they are defined. For standard grains, this will choose the
    time spine with the largest granularity that is compatible with all required standard grains. This ensures max
    efficiency for the query by minimizing the number of time spine joins and the amount of aggregation required.
    Example:
    - Time spines available: SECOND, MINUTE, DAY
    - Time granularities needed for request: HOUR, DAY
    --> Selected time spine: MINUTE

    Note time spines are identified by their base granularity.

    Raises:
        RuntimeError: if no configured time spine has a base granularity small enough to satisfy the
            smallest required standard grain.
    """
    assert required_time_spine_specs, (
        "Choosing time spine source requires time spine specs, but the `required_time_spine_specs` param is empty. "
        "This indicates internal misconfiguration."
    )

    # Each custom grain can only be satisfied by the one time spine where it is defined.
    # Tuple conversion makes the arg hashable for the lru_cache on build_custom_time_spine_sources.
    custom_time_spines = TimeSpineSource.build_custom_time_spine_sources(tuple(time_spine_sources.values()))
    required_time_spines = {
        custom_time_spines[spec.time_granularity.name]
        for spec in required_time_spine_specs
        if spec.time_granularity.is_custom_granularity
    }

    # Standard grains can be satisfied by any time spine with a base grain that's <= the standard grain.
    smallest_required_standard_grain = min(
        spec.time_granularity.base_granularity for spec in required_time_spine_specs
    )
    compatible_time_spines_for_standard_grains = {
        grain: time_spine_source
        for grain, time_spine_source in time_spine_sources.items()
        if grain.to_int() <= smallest_required_standard_grain.to_int()
    }
    if not compatible_time_spines_for_standard_grains:
        raise RuntimeError(
            f"This query requires a time spine with granularity {smallest_required_standard_grain.name} or smaller, which is not configured. "
            f"The smallest available time spine granularity is {min(time_spine_sources).name}, which is too large. "
            "See documentation for how to configure a new time spine: https://docs.getdbt.com/docs/build/metricflow-time-spine"
        )

    # If the standard grains can't be satisfied by the same time spines as the custom grains, add the largest compatible one.
    if not required_time_spines.intersection(set(compatible_time_spines_for_standard_grains.values())):
        required_time_spines.add(time_spine_sources[max(compatible_time_spines_for_standard_grains)])

    return tuple(required_time_spines)
20 changes: 14 additions & 6 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1047,12 +1047,15 @@ def _find_source_node_recipe_non_cached(
)
# If metric_time is requested without metrics, choose appropriate time spine node to select those values from.
if linkable_specs_to_satisfy.metric_time_specs:
time_spine_node = self._source_node_set.time_spine_nodes[
TimeSpineSource.choose_time_spine_source(
required_time_spine_specs=linkable_specs_to_satisfy.metric_time_specs,
time_spine_sources=self._source_node_builder.time_spine_sources,
).base_granularity
]
time_spine_sources = TimeSpineSource.choose_time_spine_sources(
required_time_spine_specs=linkable_specs_to_satisfy.metric_time_specs,
time_spine_sources=self._source_node_builder.time_spine_sources,
)
assert len(time_spine_sources) == 1, (
"Exactly one time spine source should have been selected for base grains."
"This indicates internal misconfiguration."
)
time_spine_node = self._source_node_set.time_spine_nodes[time_spine_sources[0].base_granularity]
candidate_nodes_for_right_side_of_join += [time_spine_node]
candidate_nodes_for_left_side_of_join += [time_spine_node]
default_join_type = SqlJoinType.FULL_OUTER
Expand Down Expand Up @@ -1773,6 +1776,11 @@ def _build_aggregated_measure_from_measure_source_node(
)
if set(included_agg_time_specs) == set(filter_spec.linkable_spec_set.as_tuple):
agg_time_only_filters.append(filter_spec)
if filter_spec.linkable_spec_set.time_dimension_specs_with_custom_grain:
raise ValueError(
"Using custom granularity in filters for `join_to_timespine` metrics is not yet fully supported. "
"This feature is coming soon!"
)
else:
non_agg_time_filters.append(filter_spec)

Expand Down
3 changes: 1 addition & 2 deletions metricflow/dataset/convert_semantic_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,8 +559,7 @@ def build_time_spine_source_data_set(self, time_spine_source: TimeSpineSource) -
time_dimension_instances.append(custom_time_dimension_instance)
custom_select_column = SqlSelectColumn(
expr=SemanticModelToDataSetConverter._make_element_sql_expr(
table_alias=from_source_alias,
element_name=custom_granularity.column_name or custom_granularity.name,
table_alias=from_source_alias, element_name=custom_granularity.parsed_column_name
),
column_alias=custom_time_dimension_instance.associated_column.column_name,
)
Expand Down
48 changes: 28 additions & 20 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,17 @@ def _make_time_spine_data_set(
agg_time_dimension_specs.union(specs_required_for_where_constraints),
key=lambda spec: (spec.element_name, spec.time_granularity.base_granularity.to_int()),
)
time_spine_source = TimeSpineSource.choose_time_spine_source(
time_spine_sources = TimeSpineSource.choose_time_spine_sources(
required_time_spine_specs=list(required_time_spine_specs), time_spine_sources=self._time_spine_sources
)
column_expr = SqlColumnReferenceExpression.from_table_and_column_names(
# TODO: handle multiple time spine joins
assert len(time_spine_sources) == 1, (
"Join to time spine with custom granularity currently only supports one custom granularity per query. "
"Full feature coming soon."
)
time_spine_source = time_spine_sources[0]

base_column_expr = SqlColumnReferenceExpression.from_table_and_column_names(
table_alias=time_spine_table_alias, column_name=time_spine_source.base_column
)
select_columns: Tuple[SqlSelectColumn, ...] = ()
Expand All @@ -275,22 +282,24 @@ def _make_time_spine_data_set(
column_alias = self.column_association_resolver.resolve_spec(agg_time_dimension_spec).column_name
# If the requested granularity is the same as the granularity of the spine, do a direct select.
agg_time_grain = agg_time_dimension_spec.time_granularity
assert (
not agg_time_grain.is_custom_granularity
), "Custom time granularities are not yet supported for all queries."
if agg_time_grain.base_granularity == time_spine_source.base_granularity:
select_columns += (SqlSelectColumn(expr=column_expr, column_alias=column_alias),)
if (
agg_time_grain.base_granularity == time_spine_source.base_granularity
and not agg_time_grain.is_custom_granularity
):
expr: SqlExpressionNode = base_column_expr
apply_group_by = False
# If any columns have a different granularity, apply a DATE_TRUNC().
elif agg_time_grain.is_custom_granularity:
# If any dimensions require a custom granularity, select the appropriate column.
for custom_granularity in time_spine_source.custom_granularities:
expr = SqlColumnReferenceExpression.from_table_and_column_names(
table_alias=time_spine_table_alias, column_name=custom_granularity.parsed_column_name
)
else:
select_columns += (
SqlSelectColumn(
expr=SqlDateTruncExpression.create(
time_granularity=agg_time_grain.base_granularity, arg=column_expr
),
column_alias=column_alias,
),
# If any dimensions require a different standard granularity, apply a DATE_TRUNC() to the base column.
expr = SqlDateTruncExpression.create(
time_granularity=agg_time_grain.base_granularity, arg=base_column_expr
)
select_columns += (SqlSelectColumn(expr=expr, column_alias=column_alias),)
# TODO: also handle date part.

output_instance_set = InstanceSet(
Expand Down Expand Up @@ -1334,7 +1343,6 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet
"configured incorrectly."
)
agg_time_dimension_instance_for_join = agg_time_dimension_instances[0]
agg_time_dim_for_join_with_base_grain = agg_time_dimension_instance_for_join.spec.with_base_grain()

# Build time spine data set using the requested agg_time_dimension name.
time_spine_alias = self._next_unique_table_alias()
Expand All @@ -1349,7 +1357,7 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet
node=node,
time_spine_alias=time_spine_alias,
agg_time_dimension_column_name=self.column_association_resolver.resolve_spec(
agg_time_dim_for_join_with_base_grain
agg_time_dimension_instance_for_join.spec
).column_name,
parent_sql_select_node=parent_data_set.checked_sql_select_node,
parent_alias=parent_alias,
Expand Down Expand Up @@ -1389,12 +1397,12 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet
# Select matching instance from time spine data set (using base grain - custom grain will be joined in a later node).
original_time_spine_dim_instance: Optional[TimeDimensionInstance] = None
for time_dimension_instance in time_spine_dataset.instance_set.time_dimension_instances:
if time_dimension_instance.spec == agg_time_dim_for_join_with_base_grain:
if time_dimension_instance.spec == agg_time_dimension_instance_for_join.spec:
original_time_spine_dim_instance = time_dimension_instance
break
assert original_time_spine_dim_instance, (
"Couldn't find requested agg_time_dimension_instance_for_join in time spine data set, which "
f"indicates it may have been configured incorrectly. Expected: {agg_time_dim_for_join_with_base_grain};"
f"indicates it may have been configured incorrectly. Expected: {agg_time_dimension_instance_for_join.spec};"
f" Got: {[instance.spec for instance in time_spine_dataset.instance_set.time_dimension_instances]}"
)
time_spine_column_select_expr: Union[
Expand Down Expand Up @@ -1495,7 +1503,7 @@ def _get_custom_granularity_column_name(self, custom_granularity_name: str) -> s
time_spine_source = self._get_time_spine_for_custom_granularity(custom_granularity_name)
for custom_granularity in time_spine_source.custom_granularities:
if custom_granularity.name == custom_granularity_name:
return custom_granularity.column_name if custom_granularity.column_name else custom_granularity.name
return custom_granularity.parsed_column_name

raise RuntimeError(
f"Custom granularity {custom_granularity} not found. This indicates internal misconfiguration."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,28 @@ def test_join_to_time_spine_with_filter_not_in_group_by_using_agg_time_and_metri
snapshot_str=query_result.result_df.text_format(),
sql_engine=sql_client.sql_engine_type,
)


@pytest.mark.sql_engine_snapshot
def test_join_to_time_spine_with_custom_grain_in_group_by(
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    sql_client: SqlClient,
    it_helpers: IntegrationTestHelpers,
) -> None:
    """Query a join_to_timespine metric grouped (and ordered) by a custom grain; snapshot the output."""
    mf_request = MetricFlowQueryRequest.create_with_random_request_id(
        metric_names=["bookings_join_to_time_spine"],
        group_by_names=["booking__ds__martian_day"],
        order_by_names=["booking__ds__martian_day"],
    )
    query_result = it_helpers.mf_engine.query(mf_request)
    result_df = query_result.result_df
    assert result_df is not None, "Unexpected empty result."

    assert_str_snapshot_equal(
        request=request,
        mf_test_configuration=mf_test_configuration,
        snapshot_id="query_output",
        snapshot_str=result_df.text_format(),
        sql_engine=sql_client.sql_engine_type,
    )
27 changes: 27 additions & 0 deletions tests_metricflow/integration/test_cases/itest_granularity.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -934,3 +934,30 @@ integration_test:
ON subq_11.user = subq_14.user AND subq_11.ds__day <= subq_14.ds__day
) subq_15
) subq_18
---
integration_test:
name: test_join_to_time_spine_metric_grouped_by_custom_grain
description: Test a join to time spine metric with custom grain in group by
model: SIMPLE_MODEL
metrics: ["bookings_join_to_time_spine"]
group_bys: ["metric_time__martian_day"]
check_query: |
SELECT
subq_6.metric_time__martian_day
, subq_5.bookings AS bookings_join_to_time_spine
FROM (
SELECT
martian_day AS metric_time__martian_day
FROM {{ source_schema }}.mf_time_spine subq_7
GROUP BY martian_day
) subq_6
LEFT OUTER JOIN (
SELECT
subq_2.martian_day AS metric_time__martian_day
, SUM(1) AS bookings
FROM {{ source_schema }}.fct_bookings b
LEFT OUTER JOIN {{ source_schema }}.mf_time_spine subq_2
ON {{ render_date_trunc("b.ds", TimeGranularity.DAY) }} = subq_2.ds
GROUP BY subq_2.martian_day
) subq_5
ON subq_6.metric_time__martian_day = subq_5.metric_time__martian_day
24 changes: 24 additions & 0 deletions tests_metricflow/query_rendering/test_custom_granularity.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,3 +532,27 @@ def test_conversion_metric_with_custom_granularity_filter_not_in_group_by( # no
dataflow_plan_builder=dataflow_plan_builder,
query_spec=query_spec,
)


@pytest.mark.sql_engine_snapshot
def test_join_to_time_spine_metric_grouped_by_custom_grain(
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    dataflow_plan_builder: DataflowPlanBuilder,
    dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
    sql_client: SqlClient,
    query_parser: MetricFlowQueryParser,
) -> None:
    """Render SQL for a join_to_timespine metric grouped by a custom grain and check it against snapshots."""
    parse_result = query_parser.parse_and_validate_query(
        metric_names=("bookings_join_to_time_spine",),
        group_by_names=("metric_time__martian_day",),
    )

    render_and_check(
        request=request,
        mf_test_configuration=mf_test_configuration,
        dataflow_to_sql_converter=dataflow_to_sql_converter,
        sql_client=sql_client,
        dataflow_plan_builder=dataflow_plan_builder,
        query_spec=parse_result.query_spec,
    )
Loading

0 comments on commit ae8ea72

Please sign in to comment.