Commit: WIP

courtneyholcomb committed Nov 20, 2024
1 parent 22654b7 commit 00fb89e

Showing 10 changed files with 145 additions and 146 deletions.
13 changes: 13 additions & 0 deletions metricflow-semantics/metricflow_semantics/instances.py
@@ -145,6 +145,19 @@ def with_entity_prefix(
             spec=transformed_spec,
         )
 
+    @staticmethod
+    def from_properties(
+        spec: TimeDimensionSpec,
+        defined_from: Tuple[SemanticModelElementReference, ...],
+        column_association_resolver: ColumnAssociationResolver,
+    ) -> TimeDimensionInstance:
+        """Create a TimeDimensionInstance from specified properties."""
+        return TimeDimensionInstance(
+            associated_columns=(column_association_resolver.resolve_spec(spec),),
+            spec=spec,
+            defined_from=defined_from,
+        )
+
 
 @dataclass(frozen=True)
 class EntityInstance(LinkableInstance[EntitySpec], SemanticModelElementInstance):  # noqa: D101
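As a rough usage sketch of the new constructor (not code from this commit): `spec`, `source_model_reference`, and `resolver` below are hypothetical stand-ins for a TimeDimensionSpec, a SemanticModelElementReference, and a ColumnAssociationResolver.

# Sketch only; the three inputs are assumed to exist in the caller's scope.
instance = TimeDimensionInstance.from_properties(
    spec=spec,
    defined_from=(source_model_reference,),
    column_association_resolver=resolver,
)
# Equivalent to the inline construction used at existing call sites:
# TimeDimensionInstance(
#     associated_columns=(resolver.resolve_spec(spec),),
#     spec=spec,
#     defined_from=(source_model_reference,),
# )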
20 changes: 15 additions & 5 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
@@ -643,15 +643,17 @@ def _build_derived_metric_output_node(
 
         # For ratio / derived metrics with time offset, apply offset & where constraint after metric computation.
         if metric_spec.has_time_offset:
-            queried_agg_time_dimension_specs = queried_linkable_specs.included_agg_time_dimension_specs_for_metric(
+            required_agg_time_dimension_specs = queried_linkable_specs.included_agg_time_dimension_specs_for_metric(
                 metric_reference=metric_spec.reference, metric_lookup=self._metric_lookup
             )
             assert (
-                queried_agg_time_dimension_specs
+                required_agg_time_dimension_specs
             ), "Joining to time spine requires querying with metric_time or the appropriate agg_time_dimension."
             output_node = JoinToTimeSpineNode.create(
                 parent_node=output_node,
-                requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
+                requested_agg_time_dimension_specs=[
+                    spec.with_base_grain() for spec in required_agg_time_dimension_specs
+                ],
                 use_custom_agg_time_dimension=not queried_linkable_specs.contains_metric_time,
                 time_range_constraint=predicate_pushdown_state.time_range_constraint,
                 offset_window=metric_spec.offset_window,
@@ -1590,10 +1592,14 @@ def _build_aggregated_measure_from_measure_source_node(
         queried_agg_time_dimension_specs = queried_linkable_specs.included_agg_time_dimension_specs_for_measure(
             measure_reference=measure_spec.reference, semantic_model_lookup=self._semantic_model_lookup
         )
+        required_agg_time_dimension_specs = required_linkable_specs.included_agg_time_dimension_specs_for_measure(
+            measure_reference=measure_spec.reference, semantic_model_lookup=self._semantic_model_lookup
+        )
 
         # If a cumulative metric is queried with metric_time / agg_time_dimension, join over time range.
         # Otherwise, the measure will be aggregated over all time.
         unaggregated_measure_node: DataflowPlanNode = measure_recipe.source_node
+        # TODO: can we use required here, too?
         if cumulative and queried_agg_time_dimension_specs:
             unaggregated_measure_node = JoinOverTimeRangeNode.create(
                 parent_node=unaggregated_measure_node,
@@ -1619,7 +1625,11 @@ def _build_aggregated_measure_from_measure_source_node(
             # in join rendering
             unaggregated_measure_node = JoinToTimeSpineNode.create(
                 parent_node=unaggregated_measure_node,
-                requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
+                requested_agg_time_dimension_specs=[
+                    spec
+                    for spec in required_agg_time_dimension_specs
+                    if not spec.time_granularity.is_custom_granularity
+                ],
                 use_custom_agg_time_dimension=not queried_linkable_specs.contains_metric_time,
                 time_range_constraint=predicate_pushdown_state.time_range_constraint,
                 offset_window=before_aggregation_time_spine_join_description.offset_window,
@@ -1691,7 +1701,7 @@ def _build_aggregated_measure_from_measure_source_node(
             # like JoinToCustomGranularityNode, WhereConstraintNode, etc.
             output_node: DataflowPlanNode = JoinToTimeSpineNode.create(
                 parent_node=aggregate_measures_node,
-                requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
+                requested_agg_time_dimension_specs=required_agg_time_dimension_specs,
                 use_custom_agg_time_dimension=not queried_linkable_specs.contains_metric_time,
                 join_type=after_aggregation_time_spine_join_description.join_type,
                 time_range_constraint=predicate_pushdown_state.time_range_constraint,
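The two new list comprehensions in this file share one pattern: specs with a custom granularity are not passed to the time spine join directly, so they are either mapped back to their base grain (offset case) or filtered out before aggregation and handled later by nodes like JoinToCustomGranularityNode. A standalone sketch of that filtering, assuming only that `required_agg_time_dimension_specs` is a sequence of TimeDimensionSpec; this is an illustration, not code from the commit.

# Offset / derived-metric case: join at each spec's base grain.
offset_join_specs = [spec.with_base_grain() for spec in required_agg_time_dimension_specs]

# Pre-aggregation case: drop custom-granularity specs entirely before the time spine join.
pre_aggregation_join_specs = [
    spec
    for spec in required_agg_time_dimension_specs
    if not spec.time_granularity.is_custom_granularity
]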
7 changes: 4 additions & 3 deletions metricflow/dataflow/nodes/join_over_time.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 from dataclasses import dataclass
-from typing import Optional, Sequence
+from typing import Optional, Sequence, Tuple
 
 from dbt_semantic_interfaces.protocols import MetricTimeWindow
 from dbt_semantic_interfaces.type_enums import TimeGranularity
@@ -15,6 +15,7 @@
 from metricflow.dataflow.dataflow_plan_visitor import DataflowPlanNodeVisitor
 
 
+# TODO: Should this class be combined with JoinToTimeSpineNode?
 @dataclass(frozen=True, eq=False)
 class JoinOverTimeRangeNode(DataflowPlanNode):
     """A node that allows for cumulative metric computation by doing a self join across a cumulative date range.
@@ -26,7 +27,7 @@ class JoinOverTimeRangeNode(DataflowPlanNode):
         time_range_constraint: Time range to aggregate over.
     """
 
-    queried_agg_time_dimension_specs: Sequence[TimeDimensionSpec]
+    queried_agg_time_dimension_specs: Tuple[TimeDimensionSpec, ...]
     window: Optional[MetricTimeWindow]
     grain_to_date: Optional[TimeGranularity]
     time_range_constraint: Optional[TimeRangeConstraint]
@@ -38,7 +39,7 @@ def __post_init__(self) -> None:  # noqa: D105
     @staticmethod
     def create(  # noqa: D102
         parent_node: DataflowPlanNode,
-        queried_agg_time_dimension_specs: Sequence[TimeDimensionSpec],
+        queried_agg_time_dimension_specs: Tuple[TimeDimensionSpec, ...],
         window: Optional[MetricTimeWindow] = None,
         grain_to_date: Optional[TimeGranularity] = None,
         time_range_constraint: Optional[TimeRangeConstraint] = None,
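The Sequence to Tuple change is consistent with the frozen dataclass: a caller can still mutate a list passed in behind a Sequence annotation, while a tuple cannot change after the node is built. A toy example of that difference, unrelated to the MetricFlow types; the commit itself does not state a motivation, so this is an assumption.

from dataclasses import dataclass
from typing import Sequence, Tuple


@dataclass(frozen=True)
class WithSequence:
    values: Sequence[int]


@dataclass(frozen=True)
class WithTuple:
    values: Tuple[int, ...]


s = WithSequence(values=[1, 2])
s.values.append(3)  # runs, because the underlying object is a mutable list (mypy would flag it)

t = WithTuple(values=(1, 2))
# t.values.append(3)  # AttributeError: tuples are immutable, so the "frozen" node really is frozen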
2 changes: 2 additions & 0 deletions metricflow/dataflow/nodes/join_to_time_spine.py
@@ -31,7 +31,9 @@ class JoinToTimeSpineNode(DataflowPlanNode, ABC):
         offset_to_grain: Granularity period to offset the parent dataset to when joining to time spine.
     """
 
+    # TODO: rename property to required_agg_time_dimension_specs
     requested_agg_time_dimension_specs: Sequence[TimeDimensionSpec]
+    # TODO: remove this property
     use_custom_agg_time_dimension: bool
     join_type: SqlJoinType
     time_range_constraint: Optional[TimeRangeConstraint]
40 changes: 32 additions & 8 deletions metricflow/dataset/sql_dataset.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
-from typing import List, Optional, Sequence
+from dataclasses import dataclass
+from typing import List, Optional, Sequence, Tuple
 
 from dbt_semantic_interfaces.references import SemanticModelReference
 from metricflow_semantics.assert_one_arg import assert_exactly_one_arg_set
@@ -124,26 +125,27 @@ def column_association_for_dimension(
 
     def instances_for_time_dimensions(
         self, time_dimension_specs: Sequence[TimeDimensionSpec]
-    ) -> List[TimeDimensionInstance]:
+    ) -> Tuple[TimeDimensionInstance, ...]:
         """Return the instances associated with these specs in the data set."""
+        time_dimension_specs_set = set(time_dimension_specs)
         matching_instances = 0
-        instances_to_return: List[TimeDimensionInstance] = []
+        instances_to_return: Tuple[TimeDimensionInstance, ...] = ()
         for time_dimension_instance in self.instance_set.time_dimension_instances:
-            if time_dimension_instance.spec in time_dimension_specs:
-                instances_to_return.append(time_dimension_instance)
+            if time_dimension_instance.spec in time_dimension_specs_set:
+                instances_to_return += (time_dimension_instance,)
                 matching_instances += 1
 
-        if matching_instances != len(time_dimension_specs):
+        if matching_instances != len(time_dimension_specs_set):
             raise RuntimeError(
-                f"Unexpected number of time dimension instances found matching specs.\nSpecs: {time_dimension_specs}\n"
+                f"Unexpected number of time dimension instances found matching specs.\nSpecs: {time_dimension_specs_set}\n"
                 f"Instances: {instances_to_return}"
             )
 
         return instances_to_return
 
     def instance_for_time_dimension(self, time_dimension_spec: TimeDimensionSpec) -> TimeDimensionInstance:
         """Given the name of the time dimension, return the instance associated with it in the data set."""
-        return self.instances_for_time_dimensions([time_dimension_spec])[0]
+        return self.instances_for_time_dimensions((time_dimension_spec,))[0]
 
     def column_association_for_time_dimension(self, time_dimension_spec: TimeDimensionSpec) -> ColumnAssociation:
         """Given the name of the time dimension, return the set of columns associated with it in the data set."""
@@ -153,3 +155,25 @@ def column_association_for_time_dimension(
     @override
     def semantic_model_reference(self) -> Optional[SemanticModelReference]:
         return None
+
+    def annotate(self, alias: str, metric_time_spec: TimeDimensionSpec) -> AnnotatedSqlDataSet:
+        """Convert to an AnnotatedSqlDataSet with specified metadata."""
+        metric_time_column_name = self.column_association_for_time_dimension(metric_time_spec).column_name
+        return AnnotatedSqlDataSet(data_set=self, alias=alias, _metric_time_column_name=metric_time_column_name)
+
+
+@dataclass(frozen=True)
+class AnnotatedSqlDataSet:
+    """Class to bind a DataSet to transient properties associated with it at a given point in the SqlQueryPlan."""
+
+    data_set: SqlDataSet
+    alias: str
+    _metric_time_column_name: Optional[str] = None
+
+    @property
+    def metric_time_column_name(self) -> str:
+        """Direct accessor for the optional metric time name, only safe to call when we know that value is set."""
+        assert (
+            self._metric_time_column_name
+        ), "Expected a valid metric time dimension name to be associated with this dataset, but did not get one!"
+        return self._metric_time_column_name
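To make the new helper concrete, a rough usage sketch (not code from this commit): `data_set` and `metric_time_spec` are hypothetical stand-ins for an existing SqlDataSet and the metric_time TimeDimensionSpec it contains.

# Sketch only; `data_set` and `metric_time_spec` are assumed to already exist.
annotated = data_set.annotate(alias="time_spine_input", metric_time_spec=metric_time_spec)
annotated.alias                    # "time_spine_input"
annotated.metric_time_column_name  # the resolved column name for metric_time in this data set

# AnnotatedSqlDataSet can also be built directly without a metric time column; in that
# case the metric_time_column_name property fails its assertion instead of returning None.
bare = AnnotatedSqlDataSet(data_set=data_set, alias="joined_input")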