-
Notifications
You must be signed in to change notification settings - Fork 98
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support multiple time spines using new time spine configs #1348
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,10 @@ | ||
from __future__ import annotations | ||
|
||
from dataclasses import dataclass | ||
from typing import List, Sequence, Tuple | ||
from typing import Dict, List, Sequence, Tuple | ||
|
||
from dbt_semantic_interfaces.references import TimeDimensionReference | ||
from dbt_semantic_interfaces.type_enums import TimeGranularity | ||
from metricflow_semantics.model.semantic_manifest_lookup import SemanticManifestLookup | ||
from metricflow_semantics.query.query_parser import MetricFlowQueryParser | ||
from metricflow_semantics.specs.column_assoc import ColumnAssociationResolver | ||
|
@@ -36,14 +37,16 @@ class SourceNodeSet: | |
# below. See usage in `DataflowPlanBuilder`. | ||
source_nodes_for_group_by_item_queries: Tuple[DataflowPlanNode, ...] | ||
|
||
# Provides the time spine. | ||
time_spine_node: MetricTimeDimensionTransformNode | ||
# Provides the time spines. | ||
time_spine_nodes: Dict[TimeGranularity, MetricTimeDimensionTransformNode] | ||
|
||
@property | ||
def all_nodes(self) -> Sequence[DataflowPlanNode]: # noqa: D102 | ||
return ( | ||
self.source_nodes_for_metric_queries + self.source_nodes_for_group_by_item_queries + (self.time_spine_node,) | ||
) | ||
return self.source_nodes_for_metric_queries + self.source_nodes_for_group_by_item_queries | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So I read below and I see why this is this way, but I think it's a bit weird - this is mainly an artifact of us expecting callers to wire in the time spine nodes as group by item source nodes. I can't come up with anything obviously better, and it's probably outside the scope of this PR to do that refactor, but we should probably address this. I suspect it'll get confusing with things like custom calendar processing. |
||
|
||
@property | ||
def time_spine_nodes_tuple(self) -> Tuple[MetricTimeDimensionTransformNode, ...]: # noqa: D102 | ||
return tuple(self.time_spine_nodes.values()) | ||
|
||
|
||
class SourceNodeBuilder: | ||
|
@@ -56,13 +59,19 @@ def __init__( # noqa: D107 | |
) -> None: | ||
self._semantic_manifest_lookup = semantic_manifest_lookup | ||
data_set_converter = SemanticModelToDataSetConverter(column_association_resolver) | ||
time_spine_source = TimeSpineSource.create_from_manifest(semantic_manifest_lookup.semantic_manifest) | ||
time_spine_data_set = data_set_converter.build_time_spine_source_data_set(time_spine_source) | ||
time_dim_reference = TimeDimensionReference(element_name=time_spine_source.time_column_name) | ||
self._time_spine_source_node = MetricTimeDimensionTransformNode.create( | ||
parent_node=ReadSqlSourceNode.create(data_set=time_spine_data_set), | ||
aggregation_time_dimension_reference=time_dim_reference, | ||
) | ||
self._time_spine_source_nodes = { | ||
granularity: MetricTimeDimensionTransformNode.create( | ||
parent_node=ReadSqlSourceNode.create( | ||
data_set=data_set_converter.build_time_spine_source_data_set(time_spine_source) | ||
), | ||
aggregation_time_dimension_reference=TimeDimensionReference( | ||
element_name=time_spine_source.time_column_name | ||
), | ||
) | ||
for granularity, time_spine_source in TimeSpineSource.create_from_manifest( | ||
semantic_manifest_lookup.semantic_manifest | ||
).items() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Style nit - I think this would be easier to read if it's building a dict via a for loop. That's a lot of function nesting inside a dict comprehension. It'd also be a much smaller change in the diff, since we could just indent the locals inside the loop and replace the assignment with an add to dict. |
||
} | ||
self._query_parser = MetricFlowQueryParser(semantic_manifest_lookup) | ||
|
||
def create_from_data_sets(self, data_sets: Sequence[SemanticModelDataSet]) -> SourceNodeSet: | ||
|
@@ -93,8 +102,9 @@ def create_from_data_sets(self, data_sets: Sequence[SemanticModelDataSet]) -> So | |
source_nodes_for_metric_queries.append(metric_time_transform_node) | ||
|
||
return SourceNodeSet( | ||
time_spine_node=self._time_spine_source_node, | ||
source_nodes_for_group_by_item_queries=tuple(group_by_item_source_nodes) + (self._time_spine_source_node,), | ||
time_spine_nodes=self._time_spine_source_nodes, | ||
source_nodes_for_group_by_item_queries=tuple(group_by_item_source_nodes) | ||
+ tuple(self._time_spine_source_nodes.values()), | ||
source_nodes_for_metric_queries=tuple(source_nodes_for_metric_queries), | ||
) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -183,7 +183,7 @@ def __init__( | |
self._semantic_manifest_lookup = semantic_manifest_lookup | ||
self._metric_lookup = semantic_manifest_lookup.metric_lookup | ||
self._semantic_model_lookup = semantic_manifest_lookup.semantic_model_lookup | ||
self._time_spine_source = TimeSpineSource.create_from_manifest(semantic_manifest_lookup.semantic_manifest) | ||
self._time_spine_sources = TimeSpineSource.create_from_manifest(semantic_manifest_lookup.semantic_manifest) | ||
|
||
@property | ||
def column_association_resolver(self) -> ColumnAssociationResolver: # noqa: D102 | ||
|
@@ -222,24 +222,47 @@ def _next_unique_table_alias(self) -> str: | |
"""Return the next unique table alias to use in generating queries.""" | ||
return SequentialIdGenerator.create_next_id(StaticIdPrefix.SUB_QUERY).str_value | ||
|
||
def _choose_time_spine_source( | ||
self, agg_time_dimension_instances: Tuple[TimeDimensionInstance, ...] | ||
) -> TimeSpineSource: | ||
"""Determine which time spine source to use when building time spine dataset. | ||
|
||
Will choose the time spine with the largest granularity that can be used to get the smallest granularity requested | ||
in the agg time dimension instances. Example: | ||
- Time spines available: SECOND, MINUTE, DAY | ||
- Agg time dimension granularity needed for request: HOUR, DAY | ||
--> Selected time spine: MINUTE | ||
""" | ||
assert ( | ||
agg_time_dimension_instances | ||
), "Building time spine dataset requires agg_time_dimension_instances, but none were found." | ||
smallest_agg_time_grain = min(dim.spec.time_granularity for dim in agg_time_dimension_instances) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, this works but it'll start failing with custom granularities. Good thing for me to note here - we'll need to be able to go from the agg time dimension instance to the time spine index granularity and check up on that. |
||
compatible_time_spine_grains = [ | ||
grain for grain in self._time_spine_sources.keys() if grain.to_int() <= smallest_agg_time_grain.to_int() | ||
] | ||
if not compatible_time_spine_grains: | ||
raise RuntimeError( | ||
# TODO: update docs link when new docs are available | ||
f"No compatible time spine found. This query requires a time spine with granularity {smallest_agg_time_grain} or smaller. See docs to configure a new time spine: https://docs.getdbt.com/docs/build/metricflow-time-spine" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's include the smallest time spine granularity as well. Something like:
|
||
) | ||
return self._time_spine_sources[max(compatible_time_spine_grains)] | ||
|
||
def _make_time_spine_data_set( | ||
self, | ||
agg_time_dimension_instances: Tuple[TimeDimensionInstance, ...], | ||
time_spine_source: TimeSpineSource, | ||
time_range_constraint: Optional[TimeRangeConstraint] = None, | ||
) -> SqlDataSet: | ||
"""Make a time spine data set, which contains all date/time values like '2020-01-01', '2020-01-02'... | ||
"""Returns a dataset with a datetime column for each agg_time_dimension granularity requested. | ||
|
||
Returns a dataset with a column selected for each agg_time_dimension requested. | ||
Column alias will use 'metric_time' or the agg_time_dimension name depending on which the user requested. | ||
""" | ||
time_spine_instance_set = InstanceSet(time_dimension_instances=agg_time_dimension_instances) | ||
time_spine_table_alias = self._next_unique_table_alias() | ||
|
||
time_spine_source = self._choose_time_spine_source(agg_time_dimension_instances) | ||
column_expr = SqlColumnReferenceExpression.from_table_and_column_names( | ||
table_alias=time_spine_table_alias, column_name=time_spine_source.time_column_name | ||
) | ||
|
||
select_columns: Tuple[SqlSelectColumn, ...] = () | ||
apply_group_by = False | ||
for agg_time_dimension_instance in agg_time_dimension_instances: | ||
|
@@ -314,7 +337,6 @@ def visit_join_over_time_range_node(self, node: JoinOverTimeRangeNode) -> SqlDat | |
# Assemble time_spine dataset with requested agg time dimension instances selected. | ||
time_spine_data_set = self._make_time_spine_data_set( | ||
agg_time_dimension_instances=requested_agg_time_dimension_instances, | ||
time_spine_source=self._time_spine_source, | ||
time_range_constraint=node.time_range_constraint, | ||
) | ||
table_alias_to_instance_set[time_spine_data_set_alias] = time_spine_data_set.instance_set | ||
|
@@ -1234,7 +1256,6 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet | |
time_spine_alias = self._next_unique_table_alias() | ||
time_spine_dataset = self._make_time_spine_data_set( | ||
agg_time_dimension_instances=(agg_time_dimension_instance_for_join,), | ||
time_spine_source=self._time_spine_source, | ||
time_range_constraint=node.time_range_constraint, | ||
) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,11 +2,10 @@ | |
|
||
import logging | ||
from dataclasses import dataclass | ||
from typing import Optional | ||
from typing import Dict, Optional | ||
|
||
from dbt_semantic_interfaces.protocols import SemanticManifest | ||
from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity | ||
from metricflow_semantics.mf_logging.pretty_print import mf_pformat | ||
from metricflow_semantics.specs.time_dimension_spec import DEFAULT_TIME_GRANULARITY | ||
|
||
from metricflow.sql.sql_table import SqlTable | ||
|
@@ -34,26 +33,42 @@ def spine_table(self) -> SqlTable: | |
return SqlTable(schema_name=self.schema_name, table_name=self.table_name, db_name=self.db_name) | ||
|
||
@staticmethod | ||
def create_from_manifest(semantic_manifest: SemanticManifest) -> TimeSpineSource: | ||
def create_from_manifest(semantic_manifest: SemanticManifest) -> Dict[TimeGranularity, TimeSpineSource]: | ||
"""Creates a time spine source based on what's in the manifest.""" | ||
time_spine_table_configurations = semantic_manifest.project_configuration.time_spine_table_configurations | ||
|
||
if not ( | ||
len(time_spine_table_configurations) == 1 | ||
and time_spine_table_configurations[0].grain == DEFAULT_TIME_GRANULARITY | ||
): | ||
raise NotImplementedError( | ||
f"Only a single time spine table configuration with {DEFAULT_TIME_GRANULARITY} is currently " | ||
f"supported. Got:\n" | ||
f"{mf_pformat(time_spine_table_configurations)}" | ||
time_spine_sources = { | ||
time_spine.primary_column.time_granularity: TimeSpineSource( | ||
schema_name=time_spine.node_relation.schema_name, | ||
table_name=time_spine.node_relation.relation_name, # is relation name the table name? double check | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. relation_name should be the table-or-view name. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Whoops I forgot about this comment 😅 thank you! |
||
db_name=time_spine.node_relation.database, | ||
time_column_name=time_spine.primary_column.name, | ||
time_column_granularity=time_spine.primary_column.time_granularity, | ||
) | ||
for time_spine in semantic_manifest.project_configuration.time_spines | ||
} | ||
|
||
time_spine_table_configuration = time_spine_table_configurations[0] | ||
time_spine_table = SqlTable.from_string(time_spine_table_configuration.location) | ||
return TimeSpineSource( | ||
schema_name=time_spine_table.schema_name, | ||
table_name=time_spine_table.table_name, | ||
db_name=time_spine_table.db_name, | ||
time_column_name=time_spine_table_configuration.column_name, | ||
time_column_granularity=time_spine_table_configuration.grain, | ||
) | ||
# For backward compatibility: if legacy time spine config exists in the manifest, add that time spine here for | ||
# backward compatibility. Ignore it if there is a new time spine config with the same granularity. | ||
legacy_time_spines = semantic_manifest.project_configuration.time_spine_table_configurations | ||
for legacy_time_spine in legacy_time_spines: | ||
if not time_spine_sources.get(legacy_time_spine.grain): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should use
That way you don't get weird bugs with falsey hashable values. |
||
time_spine_table = SqlTable.from_string(legacy_time_spine.location) | ||
time_spine_sources[legacy_time_spine.grain] = TimeSpineSource( | ||
schema_name=time_spine_table.schema_name, | ||
table_name=time_spine_table.table_name, | ||
db_name=time_spine_table.db_name, | ||
time_column_name=legacy_time_spine.column_name, | ||
time_column_granularity=legacy_time_spine.grain, | ||
) | ||
|
||
# Sanity check: this should have been validated during manifest parsing. | ||
if not time_spine_sources: | ||
raise RuntimeError( | ||
"At least one time spine must be configured to use the semantic layer, but none were found." | ||
) | ||
|
||
return time_spine_sources | ||
|
||
|
||
# DSI validations to add: | ||
# - Check that there is only one time spine for each granularity option | ||
# - Check that there is a time spine defined at minimum DAY |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might consider typing this as a Mapping so it's not considered mutable.