Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom_granularity_columns to TimeSpineSource #1373

Merged
merged 2 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion extra-hatch-configuration/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Jinja2>=3.1.3
dbt-semantic-interfaces==0.6.10
dbt-semantic-interfaces==0.7.1.dev0
more-itertools>=8.10.0, <10.2.0
pydantic>=1.10.0, <3.0
tabulate>=0.8.9
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# dbt Cloud depends on metricflow-semantics (dependency set in dbt-mantle), so DSI must always point to a production version here.
dbt-semantic-interfaces>=0.6.10, <2.0.0
dbt-semantic-interfaces==0.7.1.dev0
graphviz>=0.18.2, <0.21
python-dateutil>=2.9.0, <2.10.0
rapidfuzz>=3.0, <4.0
25 changes: 15 additions & 10 deletions metricflow-semantics/metricflow_semantics/time/time_spine_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging
from dataclasses import dataclass
from typing import Dict, Optional
from typing import Dict, Optional, Sequence

from dbt_semantic_interfaces.protocols import SemanticManifest
from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity
Expand All @@ -17,15 +17,19 @@

@dataclass(frozen=True)
class TimeSpineSource:
"""Defines a source table containing all timestamps to use for computing cumulative metrics."""
"""A calendar table. Should contain at least one column with dates/times that map to a standard granularity.

Dates should be contiguous. May also contain custom granularity columns.
"""

schema_name: str
table_name: str = "mf_time_spine"
# Name of the column in the table that contains the dates.
time_column_name: str = "ds"
# The time granularity of the dates in the spine table.
time_column_granularity: TimeGranularity = DEFAULT_TIME_GRANULARITY
# Name of the column in the table that contains date/time values that map to a standard granularity.
base_column: str = "ds"
# The time granularity of the base column.
base_granularity: TimeGranularity = DEFAULT_TIME_GRANULARITY
db_name: Optional[str] = None
custom_granularity_columns: Sequence[str] = ()

@property
def spine_table(self) -> SqlTable:
Expand All @@ -40,8 +44,9 @@ def create_from_manifest(semantic_manifest: SemanticManifest) -> Dict[TimeGranul
schema_name=time_spine.node_relation.schema_name,
table_name=time_spine.node_relation.alias,
db_name=time_spine.node_relation.database,
time_column_name=time_spine.primary_column.name,
time_column_granularity=time_spine.primary_column.time_granularity,
base_column=time_spine.primary_column.name,
base_granularity=time_spine.primary_column.time_granularity,
custom_granularity_columns=[column.name for column in time_spine.custom_granularity_columns],
)
for time_spine in semantic_manifest.project_configuration.time_spines
}
Expand All @@ -56,8 +61,8 @@ def create_from_manifest(semantic_manifest: SemanticManifest) -> Dict[TimeGranul
schema_name=time_spine_table.schema_name,
table_name=time_spine_table.table_name,
db_name=time_spine_table.db_name,
time_column_name=legacy_time_spine.column_name,
time_column_granularity=legacy_time_spine.grain,
base_column=legacy_time_spine.column_name,
base_granularity=legacy_time_spine.grain,
)

# Sanity check: this should have been validated during manifest parsing.
Expand Down
2 changes: 1 addition & 1 deletion metricflow/dataflow/builder/source_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def __init__( # noqa: D107
data_set = data_set_converter.build_time_spine_source_data_set(time_spine_source)
self._time_spine_source_nodes[granularity] = MetricTimeDimensionTransformNode.create(
parent_node=ReadSqlSourceNode.create(data_set),
aggregation_time_dimension_reference=TimeDimensionReference(time_spine_source.time_column_name),
aggregation_time_dimension_reference=TimeDimensionReference(time_spine_source.base_column),
)

self._query_parser = MetricFlowQueryParser(semantic_manifest_lookup)
Expand Down
4 changes: 2 additions & 2 deletions metricflow/dataset/convert_semantic_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,8 @@ def create_sql_source_data_set(self, semantic_model: SemanticModel) -> SemanticM
def build_time_spine_source_data_set(self, time_spine_source: TimeSpineSource) -> SqlDataSet:
"""Build data set for time spine."""
from_source_alias = SequentialIdGenerator.create_next_id(StaticIdPrefix.TIME_SPINE_SOURCE).str_value
defined_time_granularity = time_spine_source.time_column_granularity
time_column_name = time_spine_source.time_column_name
defined_time_granularity = time_spine_source.base_granularity
time_column_name = time_spine_source.base_column

time_dimension_instances: List[TimeDimensionInstance] = []
select_columns: List[SqlSelectColumn] = []
Expand Down
6 changes: 3 additions & 3 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,15 +275,15 @@ def _make_time_spine_data_set(

time_spine_source = self._choose_time_spine_source(agg_time_dimension_instances)
column_expr = SqlColumnReferenceExpression.from_table_and_column_names(
table_alias=time_spine_table_alias, column_name=time_spine_source.time_column_name
table_alias=time_spine_table_alias, column_name=time_spine_source.base_column
)
select_columns: Tuple[SqlSelectColumn, ...] = ()
apply_group_by = False
for agg_time_dimension_instance in agg_time_dimension_instances:
column_alias = self.column_association_resolver.resolve_spec(agg_time_dimension_instance.spec).column_name
# If the requested granularity is the same as the granularity of the spine, do a direct select.
# TODO: also handle date part.
if agg_time_dimension_instance.spec.time_granularity == time_spine_source.time_column_granularity:
if agg_time_dimension_instance.spec.time_granularity == time_spine_source.base_granularity:
select_columns += (SqlSelectColumn(expr=column_expr, column_alias=column_alias),)
# If any columns have a different granularity, apply a DATE_TRUNC() and aggregate via group_by.
else:
Expand All @@ -308,7 +308,7 @@ def _make_time_spine_data_set(
where=(
_make_time_range_comparison_expr(
table_alias=time_spine_table_alias,
column_alias=time_spine_source.time_column_name,
column_alias=time_spine_source.base_column,
time_range_constraint=time_range_constraint,
)
if time_range_constraint
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ dependencies = [
"update-checker>=0.18.0, <0.19.0",
# Bug with mypy: https://github.com/pallets/click/issues/2558#issuecomment-1656546003
"click>=8.1.6",
"dbt-core @ git+https://github.com/dbt-labs/dbt-core@e53420c1d073dc81609ae7aa84cef6ee09650576#subdirectory=core",
"dbt-core @ git+https://github.com/dbt-labs/dbt-core@84eb0ff6720ec82ce4015c2657d512bf51381732#subdirectory=core",
"dbt-duckdb>=1.8.0, <1.9.0",
]

Expand Down
6 changes: 3 additions & 3 deletions tests_metricflow/fixtures/dataflow_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def time_spine_sources( # noqa: D103
) -> Mapping[TimeGranularity, TimeSpineSource]:
legacy_time_spine_grain = TimeGranularity.DAY
time_spine_base_table_name = "mf_time_spine"
print("expected schema name:", mf_test_configuration.mf_source_schema)

# Legacy time spine
time_spine_sources = {
legacy_time_spine_grain: TimeSpineSource(
Expand All @@ -113,8 +113,8 @@ def time_spine_sources( # noqa: D103
time_spine_sources[granularity] = TimeSpineSource(
schema_name=mf_test_configuration.mf_source_schema,
table_name=f"{time_spine_base_table_name}_{granularity.value}",
time_column_name="ts",
time_column_granularity=granularity,
base_column="ts",
base_granularity=granularity,
)

return time_spine_sources
2 changes: 1 addition & 1 deletion tests_metricflow/fixtures/table_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def check_time_spine_source(

assert len(time_spine_snapshot.column_definitions) == 1
time_column = time_spine_snapshot.column_definitions[0]
assert time_column.name == time_spine_source.time_column_name
assert time_column.name == time_spine_source.base_column


@pytest.fixture(scope="session")
Expand Down
4 changes: 2 additions & 2 deletions tests_metricflow/plan_conversion/test_time_spine.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ def test_date_spine_date_range( # noqa: D103
textwrap.dedent(
f"""\
SELECT
MIN({time_spine_source.time_column_name})
, MAX({time_spine_source.time_column_name})
MIN({time_spine_source.base_column})
, MAX({time_spine_source.base_column})
FROM {time_spine_source.spine_table.sql}
""",
)
Expand Down
Loading