
Commit 52f9322: WIP
courtneyholcomb committed Nov 22, 2024
1 parent 46a1c37 commit 52f9322
Showing 6 changed files with 129 additions and 87 deletions.
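
At a high level, this commit moves time spine selection and filtering out of JoinToTimeSpineNode and into the dataflow plan builder. A simplified before/after sketch of a call site, reconstructed from the hunks below (variable names are stand-ins, not verbatim repository code):

# Before: the join node resolved the time spine itself and accepted filters directly.
output_node = JoinToTimeSpineNode.create(
    parent_node=aggregate_measures_node,
    requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
    join_type=join_description.join_type,
    time_range_constraint=predicate_pushdown_state.time_range_constraint,  # field removed in this commit
    time_spine_filters=agg_time_only_filters,  # field removed in this commit
)

# After: the builder constructs and pre-filters a time spine node, then passes it in.
time_spine_node = self._build_time_spine_node(queried_agg_time_dimension_specs)
output_node = JoinToTimeSpineNode.create(
    parent_node=aggregate_measures_node,
    time_spine_node=time_spine_node,  # new field
    requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
    join_type=join_description.join_type,
)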
51 changes: 41 additions & 10 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
@@ -87,8 +87,10 @@
from metricflow.dataflow.nodes.join_to_base import JoinDescription, JoinOnEntitiesNode
from metricflow.dataflow.nodes.join_to_custom_granularity import JoinToCustomGranularityNode
from metricflow.dataflow.nodes.join_to_time_spine import JoinToTimeSpineNode
from metricflow.dataflow.nodes.metric_time_transform import MetricTimeDimensionTransformNode
from metricflow.dataflow.nodes.min_max import MinMaxNode
from metricflow.dataflow.nodes.order_by_limit import OrderByLimitNode
from metricflow.dataflow.nodes.read_sql_source import ReadSqlSourceNode
from metricflow.dataflow.nodes.semi_additive_join import SemiAdditiveJoinNode
from metricflow.dataflow.nodes.where_filter import WhereConstraintNode
from metricflow.dataflow.nodes.window_reaggregation_node import WindowReaggregationNode
@@ -646,8 +648,11 @@ def _build_derived_metric_output_node(
queried_agg_time_dimension_specs = queried_linkable_specs.included_agg_time_dimension_specs_for_metric(
metric_reference=metric_spec.reference, metric_lookup=self._metric_lookup
)
time_spine_node = self._build_time_spine_node(queried_agg_time_dimension_specs)
# TODO: No where constraint needed here, but might need to apply distinct values if the base grain isn't selected.
output_node = JoinToTimeSpineNode.create(
parent_node=output_node,
time_spine_node=time_spine_node,
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
offset_window=metric_spec.offset_window,
offset_to_grain=metric_spec.offset_to_grain,
@@ -1037,8 +1042,7 @@ def _find_source_node_recipe_non_cached(
)
# If metric_time is requested without metrics, choose appropriate time spine node to select those values from.
if linkable_specs_to_satisfy.metric_time_specs:
time_spine_source = self._choose_time_spine_source(linkable_specs_to_satisfy.metric_time_specs)
time_spine_node = self._source_node_set.time_spine_metric_time_nodes[time_spine_source.base_granularity]
time_spine_node = self._choose_time_spine_metric_time_node(linkable_specs_to_satisfy.metric_time_specs)
candidate_nodes_for_right_side_of_join += [time_spine_node]
candidate_nodes_for_left_side_of_join += [time_spine_node]
default_join_type = SqlJoinType.FULL_OUTER
@@ -1619,10 +1623,11 @@ def _build_aggregated_measure_from_measure_source_node(
f"Expected {SqlJoinType.INNER} for joining to time spine before aggregation. Remove this if there's a "
f"new use case."
)
# This also uses the original time range constraint due to the application of the time window intervals
# in join rendering
time_spine_node = self._build_time_spine_node(base_agg_time_dimension_specs)
# TODO: No where constraint needed here, but might need to apply distinct values if the base grain isn't selected.
unaggregated_measure_node = JoinToTimeSpineNode.create(
parent_node=unaggregated_measure_node,
time_spine_node=time_spine_node,
requested_agg_time_dimension_specs=base_agg_time_dimension_specs,
offset_window=before_aggregation_time_spine_join_description.offset_window,
offset_to_grain=before_aggregation_time_spine_join_description.offset_to_grain,
@@ -1684,19 +1689,25 @@ def _build_aggregated_measure_from_measure_source_node(
else:
non_agg_time_filters.append(filter_spec)

# TODO: split this node into TimeSpineSourceNode and JoinToTimeSpineNode - then can use standard nodes here
# like JoinToCustomGranularityNode, WhereConstraintNode, etc.
queried_agg_time_dimension_specs = queried_linkable_specs.included_agg_time_dimension_specs_for_measure(
measure_reference=measure_spec.reference, semantic_model_lookup=self._semantic_model_lookup
)
time_spine_node = self._build_time_spine_node(queried_agg_time_dimension_specs)
filtered_time_spine_node = self._build_pre_aggregation_plan(
source_node=time_spine_node,
# TODO: Also need the join on spec, right? Figure that out.
# filter_to_specs=InstanceSpecSet.create_from_specs(queried_agg_time_dimension_specs),
custom_granularity_specs=tuple(
spec for spec in queried_agg_time_dimension_specs if spec.time_granularity.is_custom_granularity
),
time_range_constraint=predicate_pushdown_state.time_range_constraint,
where_filter_specs=agg_time_only_filters,
)
output_node: DataflowPlanNode = JoinToTimeSpineNode.create(
parent_node=aggregate_measures_node,
time_spine_node=filtered_time_spine_node,
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
join_type=after_aggregation_time_spine_join_description.join_type,
time_range_constraint=predicate_pushdown_state.time_range_constraint,
offset_window=after_aggregation_time_spine_join_description.offset_window,
offset_to_grain=after_aggregation_time_spine_join_description.offset_to_grain,
time_spine_filters=agg_time_only_filters,
)

# Since new rows might have been added due to time spine join, re-apply constraints here. Only re-apply filters
@@ -1812,3 +1823,23 @@ def _choose_time_spine_source(self, required_time_spine_specs: Sequence[TimeDime
required_time_spine_specs=required_time_spine_specs,
time_spine_sources=self._source_node_builder.time_spine_sources,
)

def _choose_time_spine_metric_time_node(
self, required_time_spine_specs: Sequence[TimeDimensionSpec]
) -> MetricTimeDimensionTransformNode:
"""Return the MetricTimeDimensionTransform time spine node needed to satisfy the specs."""
time_spine_source = self._choose_time_spine_source(required_time_spine_specs)
return self._source_node_set.time_spine_metric_time_nodes[time_spine_source.base_granularity]

def _choose_time_spine_read_node(self, required_time_spine_specs: Sequence[TimeDimensionSpec]) -> ReadSqlSourceNode:
"""Return the MetricTimeDimensionTransform time spine node needed to satisfy the specs."""
time_spine_source = self._choose_time_spine_source(required_time_spine_specs)
return self._source_node_set.time_spine_read_nodes[time_spine_source.base_granularity.value]

def _build_time_spine_node(self, required_time_spine_specs: Sequence[TimeDimensionSpec]) -> DataflowPlanNode:
"""Return the time spine node needed to satisfy the specs."""
original_time_spine_node = self._choose_time_spine_read_node(required_time_spine_specs)
# TODO: build this node. Transform columns to the requested ones
return TransformTimeDimensionsNode(
parent_node=original_time_spine_node, required_time_spine_specs=required_time_spine_specs
)
1 change: 1 addition & 0 deletions metricflow/dataflow/builder/source_node.py
@@ -36,6 +36,7 @@ class SourceNodeSet:
# Semantic models are 1:1 mapped to a ReadSqlSourceNode.
source_nodes_for_group_by_item_queries: Tuple[DataflowPlanNode, ...]

# TODO: Maybe this doesn't need to have string keys; check later.
# Provides time spines that can be used to satisfy time spine joins, organized by granularity name.
time_spine_read_nodes: Mapping[str, ReadSqlSourceNode]

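
For reference, a small sketch of how these mappings are keyed, following the helper methods added in dataflow_plan_builder.py above (time_spine_source and source_node_set are stand-ins for values available in the builder):

# time_spine_metric_time_nodes is keyed by TimeGranularity, while time_spine_read_nodes
# is keyed by the granularity's name (a string) - the subject of the TODO above.
metric_time_node = source_node_set.time_spine_metric_time_nodes[time_spine_source.base_granularity]
read_node = source_node_set.time_spine_read_nodes[time_spine_source.base_granularity.value]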
24 changes: 4 additions & 20 deletions metricflow/dataflow/nodes/join_to_time_spine.py
@@ -8,9 +8,7 @@
from dbt_semantic_interfaces.type_enums import TimeGranularity
from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix
from metricflow_semantics.dag.mf_dag import DisplayedProperty
from metricflow_semantics.filters.time_constraint import TimeRangeConstraint
from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
from metricflow_semantics.specs.where_filter.where_filter_spec import WhereFilterSpec
from metricflow_semantics.sql.sql_join_type import SqlJoinType
from metricflow_semantics.visitor import VisitorOutputT

@@ -30,12 +28,11 @@ class JoinToTimeSpineNode(DataflowPlanNode, ABC):
offset_to_grain: Granularity period to offset the parent dataset to when joining to time spine.
"""

time_spine_node: DataflowPlanNode
requested_agg_time_dimension_specs: Sequence[TimeDimensionSpec]
join_type: SqlJoinType
time_range_constraint: Optional[TimeRangeConstraint]
offset_window: Optional[MetricTimeWindow]
offset_to_grain: Optional[TimeGranularity]
time_spine_filters: Optional[Sequence[WhereFilterSpec]] = None

def __post_init__(self) -> None: # noqa: D105
super().__post_init__()
@@ -51,21 +48,19 @@ def __post_init__(self) -> None: # noqa: D105
@staticmethod
def create( # noqa: D102
parent_node: DataflowPlanNode,
time_spine_node: DataflowPlanNode,
requested_agg_time_dimension_specs: Sequence[TimeDimensionSpec],
join_type: SqlJoinType,
time_range_constraint: Optional[TimeRangeConstraint] = None,
offset_window: Optional[MetricTimeWindow] = None,
offset_to_grain: Optional[TimeGranularity] = None,
time_spine_filters: Optional[Sequence[WhereFilterSpec]] = None,
) -> JoinToTimeSpineNode:
return JoinToTimeSpineNode(
parent_nodes=(parent_node,),
time_spine_node=time_spine_node,
requested_agg_time_dimension_specs=tuple(requested_agg_time_dimension_specs),
join_type=join_type,
time_range_constraint=time_range_constraint,
offset_window=offset_window,
offset_to_grain=offset_to_grain,
time_spine_filters=time_spine_filters,
)

@classmethod
@@ -89,14 +84,6 @@ def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102
props += (DisplayedProperty("offset_window", self.offset_window),)
if self.offset_to_grain:
props += (DisplayedProperty("offset_to_grain", self.offset_to_grain),)
if self.time_range_constraint:
props += (DisplayedProperty("time_range_constraint", self.time_range_constraint),)
if self.time_spine_filters:
props += (
DisplayedProperty(
"time_spine_filters", [time_spine_filter.where_sql for time_spine_filter in self.time_spine_filters]
),
)
return props

@property
@@ -106,22 +93,19 @@ def parent_node(self) -> DataflowPlanNode: # noqa: D102
def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa: D102
return (
isinstance(other_node, self.__class__)
and other_node.time_range_constraint == self.time_range_constraint
and other_node.offset_window == self.offset_window
and other_node.offset_to_grain == self.offset_to_grain
and other_node.requested_agg_time_dimension_specs == self.requested_agg_time_dimension_specs
and other_node.join_type == self.join_type
and other_node.time_spine_filters == self.time_spine_filters
)

def with_new_parents(self, new_parent_nodes: Sequence[DataflowPlanNode]) -> JoinToTimeSpineNode: # noqa: D102
assert len(new_parent_nodes) == 1
return JoinToTimeSpineNode.create(
parent_node=new_parent_nodes[0],
time_spine_node=self.time_spine_node,
requested_agg_time_dimension_specs=self.requested_agg_time_dimension_specs,
time_range_constraint=self.time_range_constraint,
offset_window=self.offset_window,
offset_to_grain=self.offset_to_grain,
join_type=self.join_type,
time_spine_filters=self.time_spine_filters,
)
16 changes: 16 additions & 0 deletions metricflow/dataset/sql_dataset.py
@@ -147,6 +147,22 @@ def instance_for_time_dimension(self, time_dimension_spec: TimeDimensionSpec) ->
"""Given the name of the time dimension, return the instance associated with it in the data set."""
return self.instances_for_time_dimensions((time_dimension_spec,))[0]

def instance_from_time_dimension_grain_and_date_part(
self, time_dimension_spec: TimeDimensionSpec
) -> TimeDimensionInstance:
"""Find instance in dataset that matches the grain and date part of the given time dimension spec."""
for time_dimension_instance in self.instance_set.time_dimension_instances:
if (
time_dimension_instance.spec.time_granularity == time_dimension_spec.time_granularity
and time_dimension_instance.spec.date_part == time_dimension_spec.date_part
):
return time_dimension_instance

raise RuntimeError(
f"Did not find a time dimension instance with matching grain and date part for spec: {time_dimension_spec}\n"
f"Instances available: {self.instance_set.time_dimension_instances}"
)

def column_association_for_time_dimension(self, time_dimension_spec: TimeDimensionSpec) -> ColumnAssociation:
"""Given the name of the time dimension, return the set of columns associated with it in the data set."""
return self.instance_for_time_dimension(time_dimension_spec).associated_column
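
A minimal usage sketch of the new lookup helper, assuming a hypothetical time spine data set and metric_time spec (the variable names here are illustrative only):

# Find the time spine column whose grain and date part match the requested spec,
# regardless of the column's element name or entity links.
matching_instance = time_spine_data_set.instance_from_time_dimension_grain_and_date_part(
    time_dimension_spec=metric_time_spec
)
column = matching_instance.associated_column  # same pattern as column_association_for_time_dimension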