Add custom granularities to TimeSpineSource and rename other fields
courtneyholcomb committed Aug 21, 2024
1 parent 28489f3 commit 8637c35
Showing 7 changed files with 27 additions and 22 deletions.
25 changes: 15 additions & 10 deletions metricflow-semantics/metricflow_semantics/time/time_spine_source.py
@@ -2,7 +2,7 @@

import logging
from dataclasses import dataclass
-from typing import Dict, Optional
+from typing import Dict, Optional, Sequence

from dbt_semantic_interfaces.protocols import SemanticManifest
from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity
@@ -17,15 +17,19 @@

@dataclass(frozen=True)
class TimeSpineSource:
"""Defines a source table containing all timestamps to use for computing cumulative metrics."""
"""A calendar table. Should contain at least one column with dates/times that map to a standard granularity.
Dates should be contiguous. May also contain custom granularity columns.
"""

schema_name: str
table_name: str = "mf_time_spine"
-# Name of the column in the table that contains the dates.
-time_column_name: str = "ds"
-# The time granularity of the dates in the spine table.
-time_column_granularity: TimeGranularity = DEFAULT_TIME_GRANULARITY
+# Name of the column in the table that contains date/time values that map to a standard granularity.
+base_column: str = "ds"
+# The time granularity of the base column.
+base_granularity: TimeGranularity = DEFAULT_TIME_GRANULARITY
db_name: Optional[str] = None
+custom_granularity_columns: Sequence[str] = ()

@property
def spine_table(self) -> SqlTable:
@@ -40,8 +44,9 @@ def create_from_manifest(semantic_manifest: SemanticManifest) -> Dict[TimeGranul
schema_name=time_spine.node_relation.schema_name,
table_name=time_spine.node_relation.alias,
db_name=time_spine.node_relation.database,
-time_column_name=time_spine.primary_column.name,
-time_column_granularity=time_spine.primary_column.time_granularity,
+base_column=time_spine.primary_column.name,
+base_granularity=time_spine.primary_column.time_granularity,
+custom_granularity_columns=[column.name for column in time_spine.custom_granularity_columns],
)
for time_spine in semantic_manifest.project_configuration.time_spines
}
@@ -56,8 +61,8 @@ def create_from_manifest(semantic_manifest: SemanticManifest) -> Dict[TimeGranul
schema_name=time_spine_table.schema_name,
table_name=time_spine_table.table_name,
db_name=time_spine_table.db_name,
-time_column_name=legacy_time_spine.column_name,
-time_column_granularity=legacy_time_spine.grain,
+base_column=legacy_time_spine.column_name,
+base_granularity=legacy_time_spine.grain,
)

# Sanity check: this should have been validated during manifest parsing.
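For orientation, here is a minimal sketch of constructing the renamed dataclass with the new custom_granularity_columns field, assuming the metricflow-semantics and dbt-semantic-interfaces packages are importable. The schema name and custom-granularity column names below are hypothetical examples, not values from this commit.

from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity

from metricflow_semantics.time.time_spine_source import TimeSpineSource

# A daily calendar table; "analytics", "fiscal_quarter", and "retail_week" are made-up names.
time_spine_source = TimeSpineSource(
    schema_name="analytics",
    table_name="mf_time_spine",  # default table name from the dataclass
    base_column="ds",  # column holding the standard-granularity dates
    base_granularity=TimeGranularity.DAY,  # granularity of the base column
    custom_granularity_columns=["fiscal_quarter", "retail_week"],
)

# The fully qualified table reference is exposed via the spine_table property,
# as used by the date-range test further down in this diff.
print(time_spine_source.spine_table.sql)

Under the pre-commit field names this would have been time_column_name and time_column_granularity, with no way to declare custom granularity columns.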
2 changes: 1 addition & 1 deletion metricflow/dataflow/builder/source_node.py
@@ -66,7 +66,7 @@ def __init__( # noqa: D107
data_set = data_set_converter.build_time_spine_source_data_set(time_spine_source)
self._time_spine_source_nodes[granularity] = MetricTimeDimensionTransformNode.create(
parent_node=ReadSqlSourceNode.create(data_set),
-aggregation_time_dimension_reference=TimeDimensionReference(time_spine_source.time_column_name),
+aggregation_time_dimension_reference=TimeDimensionReference(time_spine_source.base_column),
)

self._query_parser = MetricFlowQueryParser(semantic_manifest_lookup)
4 changes: 2 additions & 2 deletions metricflow/dataset/convert_semantic_model.py
@@ -516,8 +516,8 @@ def create_sql_source_data_set(self, semantic_model: SemanticModel) -> SemanticM
def build_time_spine_source_data_set(self, time_spine_source: TimeSpineSource) -> SqlDataSet:
"""Build data set for time spine."""
from_source_alias = SequentialIdGenerator.create_next_id(StaticIdPrefix.TIME_SPINE_SOURCE).str_value
-defined_time_granularity = time_spine_source.time_column_granularity
-time_column_name = time_spine_source.time_column_name
+defined_time_granularity = time_spine_source.base_granularity
+time_column_name = time_spine_source.base_column

time_dimension_instances: List[TimeDimensionInstance] = []
select_columns: List[SqlSelectColumn] = []
6 changes: 3 additions & 3 deletions metricflow/plan_conversion/dataflow_to_sql.py
@@ -275,15 +275,15 @@ def _make_time_spine_data_set(

time_spine_source = self._choose_time_spine_source(agg_time_dimension_instances)
column_expr = SqlColumnReferenceExpression.from_table_and_column_names(
-table_alias=time_spine_table_alias, column_name=time_spine_source.time_column_name
+table_alias=time_spine_table_alias, column_name=time_spine_source.base_column
)
select_columns: Tuple[SqlSelectColumn, ...] = ()
apply_group_by = False
for agg_time_dimension_instance in agg_time_dimension_instances:
column_alias = self.column_association_resolver.resolve_spec(agg_time_dimension_instance.spec).column_name
# If the requested granularity is the same as the granularity of the spine, do a direct select.
# TODO: also handle date part.
-if agg_time_dimension_instance.spec.time_granularity == time_spine_source.time_column_granularity:
+if agg_time_dimension_instance.spec.time_granularity == time_spine_source.base_granularity:
select_columns += (SqlSelectColumn(expr=column_expr, column_alias=column_alias),)
# If any columns have a different granularity, apply a DATE_TRUNC() and aggregate via group_by.
else:
@@ -308,7 +308,7 @@
where=(
_make_time_range_comparison_expr(
table_alias=time_spine_table_alias,
-column_alias=time_spine_source.time_column_name,
+column_alias=time_spine_source.base_column,
time_range_constraint=time_range_constraint,
)
if time_range_constraint
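The branch above keeps the existing behavior: when the requested grain equals base_granularity the spine column is selected directly, otherwise it is truncated with DATE_TRUNC() and de-duplicated via GROUP BY. The following is only a rough, self-contained illustration of that SQL shape, not code from this commit; the table name, alias, and output column alias are invented.

def sketch_time_spine_select(
    base_column: str, base_granularity: str, requested_granularity: str, table_alias: str = "ts_src"
) -> str:
    """Rough illustration of the SQL shape produced for one requested granularity."""
    column = f"{table_alias}.{base_column}"
    if requested_granularity == base_granularity:
        # Same grain as the spine's base column: direct select, no grouping needed.
        return f"SELECT {column} AS metric_time__{requested_granularity} FROM mf_time_spine {table_alias}"
    # Coarser grain: truncate and collapse duplicate rows.
    truncated = f"DATE_TRUNC('{requested_granularity}', {column})"
    return (
        f"SELECT {truncated} AS metric_time__{requested_granularity} "
        f"FROM mf_time_spine {table_alias} GROUP BY {truncated}"
    )


print(sketch_time_spine_select("ds", "day", "day"))    # direct select
print(sketch_time_spine_select("ds", "day", "month"))  # DATE_TRUNC + GROUP BY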
6 changes: 3 additions & 3 deletions tests_metricflow/fixtures/dataflow_fixtures.py
@@ -96,7 +96,7 @@ def time_spine_sources( # noqa: D103
) -> Mapping[TimeGranularity, TimeSpineSource]:
legacy_time_spine_grain = TimeGranularity.DAY
time_spine_base_table_name = "mf_time_spine"
print("expected schema name:", mf_test_configuration.mf_source_schema)

# Legacy time spine
time_spine_sources = {
legacy_time_spine_grain: TimeSpineSource(
@@ -113,8 +113,8 @@
time_spine_sources[granularity] = TimeSpineSource(
schema_name=mf_test_configuration.mf_source_schema,
table_name=f"{time_spine_base_table_name}_{granularity.value}",
time_column_name="ts",
time_column_granularity=granularity,
base_column="ts",
base_granularity=granularity,
)

return time_spine_sources
2 changes: 1 addition & 1 deletion tests_metricflow/fixtures/table_fixtures.py
@@ -64,7 +64,7 @@ def check_time_spine_source(

assert len(time_spine_snapshot.column_definitions) == 1
time_column = time_spine_snapshot.column_definitions[0]
-assert time_column.name == time_spine_source.time_column_name
+assert time_column.name == time_spine_source.base_column


@pytest.fixture(scope="session")
4 changes: 2 additions & 2 deletions tests_metricflow/plan_conversion/test_time_spine.py
@@ -22,8 +22,8 @@ def test_date_spine_date_range( # noqa: D103
textwrap.dedent(
f"""\
SELECT
-MIN({time_spine_source.time_column_name})
-, MAX({time_spine_source.time_column_name})
+MIN({time_spine_source.base_column})
+, MAX({time_spine_source.base_column})
FROM {time_spine_source.spine_table.sql}
""",
)