Skip to content

Commit

Permalink
Foundations for custom granularity (#1388)
Browse files Browse the repository at this point in the history
A few small changes that will be used in the dataflow plan for custom
granularities. Mostly some small cleanup items (detailed in commit
messages). Also adds a custom granularity called `martian_day` to the
daily time spine. This increments one day for every thousand days. Not
my decision. That's just how the martians do it. 👽
  • Loading branch information
courtneyholcomb authored Sep 10, 2024
1 parent 0ace6d6 commit b8e3e3a
Show file tree
Hide file tree
Showing 10 changed files with 15,020 additions and 14,997 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def __init__(
# Sort semantic models by name for consistency in building derived objects.
self._semantic_models = sorted(self._semantic_manifest.semantic_models, key=lambda x: x.name)
self._join_evaluator = SemanticModelJoinEvaluator(semantic_model_lookup)
self._time_spine_sources = TimeSpineSource.create_from_manifest(self._semantic_manifest)
self._time_spine_sources = TimeSpineSource.build_standard_time_spine_sources(self._semantic_manifest)

assert max_entity_links >= 0
self._max_entity_links = max_entity_links
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,17 @@ def with_grain(self, time_granularity: ExpandedTimeGranularity) -> TimeDimension
aggregation_state=self.aggregation_state,
)

def with_grain_and_date_part(  # noqa: D102
    self, time_granularity: ExpandedTimeGranularity, date_part: Optional[DatePart]
) -> TimeDimensionSpec:
    """Build a new spec that uses the given time granularity and date part.

    The element name, entity links, and aggregation state are copied from this spec.
    """
    updated_spec = TimeDimensionSpec(
        element_name=self.element_name,
        entity_links=self.entity_links,
        time_granularity=time_granularity,
        date_part=date_part,
        aggregation_state=self.aggregation_state,
    )
    return updated_spec

def with_aggregation_state(self, aggregation_state: AggregationState) -> TimeDimensionSpec: # noqa: D102
return TimeDimensionSpec(
element_name=self.element_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,11 @@ project_configuration:
primary_column:
name: ts
time_granularity: hour
- node_relation:
alias: mf_time_spine
schema_name: $source_schema
primary_column:
name: ds
time_granularity: day
custom_granularities:
- name: martian_day
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ class TimeSpineSource:
# The time granularity of the base column.
base_granularity: TimeGranularity = DEFAULT_TIME_GRANULARITY
db_name: Optional[str] = None
custom_granularity_columns: Sequence[str] = ()
custom_granularities: Sequence[str] = ()

@property
def spine_table(self) -> SqlTable:
    """The `SqlTable` for the time spine, i.e. the table containing all dates.

    Built from this source's schema name, table name, and (optional) database name.
    """
    return SqlTable(
        schema_name=self.schema_name,
        table_name=self.table_name,
        db_name=self.db_name,
    )

@staticmethod
def create_from_manifest(semantic_manifest: SemanticManifest) -> Dict[TimeGranularity, TimeSpineSource]:
def build_standard_time_spine_sources(
semantic_manifest: SemanticManifest,
) -> Dict[TimeGranularity, TimeSpineSource]:
"""Creates a time spine source based on what's in the manifest."""
time_spine_sources = {
time_spine.primary_column.time_granularity: TimeSpineSource(
Expand All @@ -46,7 +48,7 @@ def create_from_manifest(semantic_manifest: SemanticManifest) -> Dict[TimeGranul
db_name=time_spine.node_relation.database,
base_column=time_spine.primary_column.name,
base_granularity=time_spine.primary_column.time_granularity,
custom_granularity_columns=[column.name for column in time_spine.custom_granularities],
custom_granularities=[column.name for column in time_spine.custom_granularities],
)
for time_spine in semantic_manifest.project_configuration.time_spines
}
Expand Down
2 changes: 1 addition & 1 deletion metricflow/dataflow/builder/source_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__( # noqa: D107
self._semantic_manifest_lookup = semantic_manifest_lookup
data_set_converter = SemanticModelToDataSetConverter(column_association_resolver)
self._time_spine_source_nodes = {}
for granularity, time_spine_source in TimeSpineSource.create_from_manifest(
for granularity, time_spine_source in TimeSpineSource.build_standard_time_spine_sources(
semantic_manifest_lookup.semantic_manifest
).items():
data_set = data_set_converter.build_time_spine_source_data_set(time_spine_source)
Expand Down
4 changes: 3 additions & 1 deletion metricflow/engine/metricflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,9 @@ def __init__(
DunderColumnAssociationResolver(semantic_manifest_lookup)
)
self._time_source = time_source
self._time_spine_sources = TimeSpineSource.create_from_manifest(semantic_manifest_lookup.semantic_manifest)
self._time_spine_sources = TimeSpineSource.build_standard_time_spine_sources(
semantic_manifest_lookup.semantic_manifest
)
self._source_data_sets: List[SemanticModelDataSet] = []
converter = SemanticModelToDataSetConverter(column_association_resolver=self._column_association_resolver)
for semantic_model in sorted(
Expand Down
13 changes: 5 additions & 8 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from metricflow_semantics.specs.metadata_spec import MetadataSpec
from metricflow_semantics.specs.metric_spec import MetricSpec
from metricflow_semantics.specs.spec_set import InstanceSpecSet
from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
from metricflow_semantics.sql.sql_join_type import SqlJoinType
from metricflow_semantics.time.time_constants import ISO8601_PYTHON_FORMAT, ISO8601_PYTHON_TS_FORMAT
from metricflow_semantics.time.time_spine_source import TIME_SPINE_DATA_SET_DESCRIPTION, TimeSpineSource
Expand Down Expand Up @@ -196,7 +195,9 @@ def __init__(
self._semantic_manifest_lookup = semantic_manifest_lookup
self._metric_lookup = semantic_manifest_lookup.metric_lookup
self._semantic_model_lookup = semantic_manifest_lookup.semantic_model_lookup
self._time_spine_sources = TimeSpineSource.create_from_manifest(semantic_manifest_lookup.semantic_manifest)
self._time_spine_sources = TimeSpineSource.build_standard_time_spine_sources(
semantic_manifest_lookup.semantic_manifest
)

@property
def column_association_resolver(self) -> ColumnAssociationResolver: # noqa: D102
Expand Down Expand Up @@ -1408,12 +1409,8 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet
# Apply date_part to time spine column select expression.
if time_dimension_spec.date_part:
select_expr = SqlExtractExpression.create(date_part=time_dimension_spec.date_part, arg=select_expr)
time_dim_spec = TimeDimensionSpec(
element_name=original_time_spine_dim_instance.spec.element_name,
entity_links=original_time_spine_dim_instance.spec.entity_links,
time_granularity=time_dimension_spec.time_granularity,
date_part=time_dimension_spec.date_part,
aggregation_state=original_time_spine_dim_instance.spec.aggregation_state,
time_dim_spec = original_time_spine_dim_instance.spec.with_grain_and_date_part(
time_granularity=time_dimension_spec.time_granularity, date_part=time_dimension_spec.date_part
)
time_spine_dim_instance = TimeDimensionInstance(
defined_from=original_time_spine_dim_instance.defined_from,
Expand Down
Loading

0 comments on commit b8e3e3a

Please sign in to comment.