Skip to content

Commit

Permalink
Support joining to time spine on a custom grain column
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Nov 4, 2024
1 parent 272f738 commit aaad04c
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 35 deletions.
51 changes: 35 additions & 16 deletions metricflow-semantics/metricflow_semantics/time/time_spine_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
from dataclasses import dataclass
from functools import lru_cache
from typing import Dict, Optional, Sequence

from dbt_semantic_interfaces.implementations.time_spine import PydanticTimeSpineCustomGranularityColumn
Expand Down Expand Up @@ -79,6 +80,7 @@ def build_standard_time_spine_sources(
return time_spine_sources

@staticmethod
@lru_cache
def build_custom_time_spine_sources(time_spine_sources: Sequence[TimeSpineSource]) -> Dict[str, TimeSpineSource]:
"""Creates a set of time spine sources with custom granularities based on what's in the manifest."""
return {
Expand All @@ -99,33 +101,50 @@ def build_custom_granularities(time_spine_sources: Sequence[TimeSpineSource]) ->
}

@staticmethod
def choose_time_spine_source(
def choose_time_spine_sources(
required_time_spine_specs: Sequence[TimeDimensionSpec],
time_spine_sources: Dict[TimeGranularity, TimeSpineSource],
) -> TimeSpineSource:
"""Determine which time spine source to use to satisfy the given specs.
) -> Sequence[TimeSpineSource]:
"""Determine which time spine sources to use to satisfy the given specs.
Will choose the time spine with the largest granularity that can be used to get the smallest granularity required to
satisfy the time spine specs. Example:
Custom grains can only use the time spine where they are defined. For standard grains, this will choose the time
spine with the largest granularity that is compatible with all required standard grains. This ensures max efficiency
for the query by minimizing the number of time spine joins and the amount of aggregation required. Example:
- Time spines available: SECOND, MINUTE, DAY
- Time granularities needed for request: HOUR, DAY
--> Selected time spine: MINUTE
Note time spines are identified by their base granularity.
"""
assert required_time_spine_specs, (
"Choosing time spine source requires time spine specs, but the `required_time_spine_specs` param is empty. "
"This indicates internal misconfiguration."
)
smallest_agg_time_grain = min(spec.time_granularity.base_granularity for spec in required_time_spine_specs)
time_spine_grains = time_spine_sources.keys()
compatible_time_spine_grains = [
grain for grain in time_spine_grains if grain.to_int() <= smallest_agg_time_grain.to_int()
]
if not compatible_time_spine_grains:

# Each custom grain can only be satisfied by one time spine.
custom_time_spines = TimeSpineSource.build_custom_time_spine_sources(tuple(time_spine_sources.values()))
required_time_spines = {
custom_time_spines[spec.time_granularity.name]
for spec in required_time_spine_specs
if spec.time_granularity.is_custom_granularity
}

# Standard grains can be satisfied by any time spine with a base grain that's <= the standard grain.
smallest_required_standard_grain = min(
spec.time_granularity.base_granularity for spec in required_time_spine_specs
)
compatible_time_spines_for_standard_grains = {
grain: time_spine_source
for grain, time_spine_source in time_spine_sources.items()
if grain.to_int() <= smallest_required_standard_grain.to_int()
}
if not compatible_time_spines_for_standard_grains:
raise RuntimeError(
f"This query requires a time spine with granularity {smallest_agg_time_grain.name} or smaller, which is not configured. "
f"The smallest available time spine granularity is {min(time_spine_grains).name}, which is too large."
f"This query requires a time spine with granularity {smallest_required_standard_grain.name} or smaller, which is not configured. "
f"The smallest available time spine granularity is {min(time_spine_sources.keys()).name}, which is too large."
"See documentation for how to configure a new time spine: https://docs.getdbt.com/docs/build/metricflow-time-spine"
)
return time_spine_sources[max(compatible_time_spine_grains)]

# If the standard grains can't be satisfied by the same time spines as the custom grains, add the largest compatible one.
if not required_time_spines.intersection(set(compatible_time_spines_for_standard_grains.values())):
required_time_spines.add(time_spine_sources[max(compatible_time_spines_for_standard_grains.keys())])

return tuple(required_time_spines)
19 changes: 13 additions & 6 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1043,12 +1043,14 @@ def _find_source_node_recipe_non_cached(
)
# If metric_time is requested without metrics, choose appropriate time spine node to select those values from.
if linkable_specs_to_satisfy.metric_time_specs:
time_spine_node = self._source_node_set.time_spine_nodes[
TimeSpineSource.choose_time_spine_source(
required_time_spine_specs=linkable_specs_to_satisfy.metric_time_specs,
time_spine_sources=self._source_node_builder.time_spine_sources,
).base_granularity
]
time_spine_sources = TimeSpineSource.choose_time_spine_sources(
required_time_spine_specs=linkable_specs_to_satisfy.metric_time_specs,
time_spine_sources=self._source_node_builder.time_spine_sources,
)
assert (
len(time_spine_sources) == 1
), "Only one time spine source should have been selected for base grains. This indicates internal misconfiguration."
time_spine_node = self._source_node_set.time_spine_nodes[time_spine_sources[0].base_granularity]
candidate_nodes_for_right_side_of_join += [time_spine_node]
candidate_nodes_for_left_side_of_join += [time_spine_node]
default_join_type = SqlJoinType.FULL_OUTER
Expand Down Expand Up @@ -1769,6 +1771,11 @@ def _build_aggregated_measure_from_measure_source_node(
)
if set(included_agg_time_specs) == set(filter_spec.linkable_spec_set.as_tuple):
agg_time_only_filters.append(filter_spec)
if filter_spec.linkable_spec_set.time_dimension_specs_with_custom_grain:
raise ValueError(
"Using custom granularity in filters for `join_to_timespine` metrics is not yet fully supported. "
"This feature is coming soon!"
)
else:
non_agg_time_filters.append(filter_spec)

Expand Down
3 changes: 1 addition & 2 deletions metricflow/dataset/convert_semantic_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,8 +559,7 @@ def build_time_spine_source_data_set(self, time_spine_source: TimeSpineSource) -
time_dimension_instances.append(custom_time_dimension_instance)
custom_select_column = SqlSelectColumn(
expr=SemanticModelToDataSetConverter._make_element_sql_expr(
table_alias=from_source_alias,
element_name=custom_granularity.column_name or custom_granularity.name,
table_alias=from_source_alias, element_name=custom_granularity.parsed_column_name
),
column_alias=custom_time_dimension_instance.associated_column.column_name,
)
Expand Down
42 changes: 31 additions & 11 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,16 @@ def _make_time_spine_data_set(
agg_time_dimension_specs.union(specs_required_for_where_constraints),
key=lambda spec: (spec.element_name, spec.time_granularity.base_granularity.to_int()),
)
time_spine_source = TimeSpineSource.choose_time_spine_source(
time_spine_sources = TimeSpineSource.choose_time_spine_sources(
required_time_spine_specs=list(required_time_spine_specs), time_spine_sources=self._time_spine_sources
)
# TODO: handle multiple time spine joins
assert len(time_spine_sources) == 1, (
"Join to time spine with custom granularity currently only supports one custom granularity per query. "
"Full feature coming soon."
)
time_spine_source = time_spine_sources[0]

column_expr = SqlColumnReferenceExpression.from_table_and_column_names(
table_alias=time_spine_table_alias, column_name=time_spine_source.base_column
)
Expand All @@ -277,14 +284,25 @@ def _make_time_spine_data_set(
column_alias = self.column_association_resolver.resolve_spec(agg_time_dimension_spec).column_name
# If the requested granularity is the same as the granularity of the spine, do a direct select.
agg_time_grain = agg_time_dimension_spec.time_granularity
assert (
not agg_time_grain.is_custom_granularity
), "Custom time granularities are not yet supported for all queries."
if agg_time_grain.base_granularity == time_spine_source.base_granularity:
if (
agg_time_grain.base_granularity == time_spine_source.base_granularity
and not agg_time_grain.is_custom_granularity
):
select_columns += (SqlSelectColumn(expr=column_expr, column_alias=column_alias),)
apply_group_by = False
# If any columns have a different granularity, apply a DATE_TRUNC().
elif agg_time_grain.is_custom_granularity:
# If any dimensions require a custom granularity, select the appropriate column.
for custom_granularity in time_spine_source.custom_granularities:
select_columns += (
SqlSelectColumn(
expr=SqlColumnReferenceExpression.from_table_and_column_names(
table_alias=time_spine_table_alias, column_name=custom_granularity.parsed_column_name
),
column_alias=column_alias,
),
)
else:
# If any dimensions require a different standard granularity, apply a DATE_TRUNC() to the base column.
select_columns += (
SqlSelectColumn(
expr=SqlDateTruncExpression.create(
Expand Down Expand Up @@ -1337,10 +1355,12 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet
"configured incorrectly."
)
agg_time_dimension_instance_for_join = agg_time_dimension_instances[0]
agg_time_dim_for_join_with_base_grain = agg_time_dimension_instance_for_join.spec.with_base_grain()

# Build time spine data set using the requested agg_time_dimension name.
time_spine_alias = self._next_unique_table_alias()
# TODO: make sure we are joining on the custom grain column!! Seems like we aren't.
# TODO: first, error if all grains can't be satisfied by the same time spine. Later, support multiple time spine joins in one request.
# This should only be needed if there are multiple custom grains from different time spines, or a combination of custom and standard grains that require different time spines.
time_spine_dataset = self._make_time_spine_data_set(
agg_time_dimension_instances=(agg_time_dimension_instance_for_join,),
time_range_constraint=node.time_range_constraint,
Expand All @@ -1352,7 +1372,7 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet
node=node,
time_spine_alias=time_spine_alias,
agg_time_dimension_column_name=self.column_association_resolver.resolve_spec(
agg_time_dim_for_join_with_base_grain
agg_time_dimension_instance_for_join.spec
).column_name,
parent_sql_select_node=parent_data_set.checked_sql_select_node,
parent_alias=parent_alias,
Expand Down Expand Up @@ -1392,12 +1412,12 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet
# Select matching instance from time spine data set (using base grain - custom grain will be joined in a later node).
original_time_spine_dim_instance: Optional[TimeDimensionInstance] = None
for time_dimension_instance in time_spine_dataset.instance_set.time_dimension_instances:
if time_dimension_instance.spec == agg_time_dim_for_join_with_base_grain:
if time_dimension_instance.spec == agg_time_dimension_instance_for_join.spec:
original_time_spine_dim_instance = time_dimension_instance
break
assert original_time_spine_dim_instance, (
"Couldn't find requested agg_time_dimension_instance_for_join in time spine data set, which "
f"indicates it may have been configured incorrectly. Expected: {agg_time_dim_for_join_with_base_grain};"
f"indicates it may have been configured incorrectly. Expected: {agg_time_dimension_instance_for_join.spec};"
f" Got: {[instance.spec for instance in time_spine_dataset.instance_set.time_dimension_instances]}"
)
time_spine_column_select_expr: Union[
Expand Down Expand Up @@ -1498,7 +1518,7 @@ def _get_custom_granularity_column_name(self, custom_granularity_name: str) -> s
time_spine_source = self._get_time_spine_for_custom_granularity(custom_granularity_name)
for custom_granularity in time_spine_source.custom_granularities:
if custom_granularity.name == custom_granularity_name:
return custom_granularity.column_name if custom_granularity.column_name else custom_granularity.name
return custom_granularity.parsed_column_name

raise RuntimeError(
f"Custom granularity {custom_granularity} not found. This indicates internal misconfiguration."
Expand Down

0 comments on commit aaad04c

Please sign in to comment.