Skip to content

Commit

Permalink
WIP - separate time spine source nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Nov 21, 2024
1 parent 9754025 commit 56497e9
Showing 1 changed file with 32 additions and 9 deletions.
41 changes: 32 additions & 9 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,6 @@ def _build_aggregated_conversion_node(
.merge(base_required_linkable_specs.as_instance_spec_set)
.dedupe(),
custom_granularity_specs=base_required_linkable_specs.time_dimension_specs_with_custom_grain,
time_range_constraint=None,
where_filter_specs=base_measure_spec.filter_spec_set.all_filter_specs,
)

Expand Down Expand Up @@ -647,8 +646,13 @@ def _build_derived_metric_output_node(
queried_agg_time_dimension_specs = queried_linkable_specs.included_agg_time_dimension_specs_for_metric(
metric_reference=metric_spec.reference, metric_lookup=self._metric_lookup
)
time_spine_source = self._choose_time_spine_source(queried_agg_time_dimension_specs)
time_spine_node = self._get_time_spine_node(time_spine_source)
# Might need to filter to distinct values if the base grain isn't selected.
# Probably shouldn't use _build_pre_aggregation_plan, but TBD.
output_node = JoinToTimeSpineNode.create(
parent_node=output_node,
time_spine_node=time_spine_node,
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
offset_window=metric_spec.offset_window,
offset_to_grain=metric_spec.offset_to_grain,
Expand Down Expand Up @@ -1622,16 +1626,31 @@ def _build_aggregated_measure_from_measure_source_node(
)

# If querying an offset metric, join to time spine before aggregation.
# This also uses the original time range constraint due to the application of the time window intervals
# in join rendering.
if before_aggregation_time_spine_join_description is not None:
assert before_aggregation_time_spine_join_description.join_type is SqlJoinType.INNER, (
f"Expected {SqlJoinType.INNER} for joining to time spine before aggregation. Remove this if there's a "
f"new use case."
)
# This also uses the original time range constraint due to the application of the time window intervals
# in join rendering
time_spine_source = self._choose_time_spine_source(queried_agg_time_dimension_specs)
time_spine_node = self._get_time_spine_node(time_spine_source)
# Note: filters will be applied after offset
filtered_time_spine_node = self._build_pre_aggregation_plan(
source_node=time_spine_node,
filter_to_specs=base_agg_time_dimension_specs, # is this right?
custom_granularity_specs=(), # need these? don't think so but TBD - remove later if not
distinct=True, # only if the base grain isn't queried?
)

unaggregated_measure_node = JoinToTimeSpineNode.create(
parent_node=unaggregated_measure_node,
time_spine_node=filtered_time_spine_node,
requested_agg_time_dimension_specs=base_agg_time_dimension_specs,
<<<<<<< HEAD
=======
time_range_constraint=predicate_pushdown_state.time_range_constraint, # why is this here? is it applied after the offset? might need an initial bug fix for this, test.
>>>>>>> 6bb0a260c (WIP - separate time spine source nodes)
offset_window=before_aggregation_time_spine_join_description.offset_window,
offset_to_grain=before_aggregation_time_spine_join_description.offset_to_grain,
join_type=before_aggregation_time_spine_join_description.join_type,
Expand Down Expand Up @@ -1659,7 +1678,7 @@ def _build_aggregated_measure_from_measure_source_node(
InstanceSpecSet.create_from_specs(queried_linkable_specs.as_tuple)
),
measure_properties=measure_properties,
queried_linkable_specs=queried_linkable_specs,
queried_linkable_specs_for_semi_additive_join=queried_linkable_specs,
)

aggregate_measures_node = AggregateMeasuresNode.create(
Expand Down Expand Up @@ -1734,13 +1753,17 @@ def _build_aggregated_measure_from_measure_source_node(
def _build_pre_aggregation_plan(
self,
source_node: DataflowPlanNode,
<<<<<<< HEAD
filter_to_specs: Optional[InstanceSpecSet] = None,
=======
filter_to_specs: InstanceSpecSet,
>>>>>>> 6bb0a260c (WIP - separate time spine source nodes)
join_targets: List[JoinDescription] = [],
custom_granularity_specs: Sequence[TimeDimensionSpec] = (),
where_filter_specs: Sequence[WhereFilterSpec] = (),
time_range_constraint: Optional[TimeRangeConstraint] = None,
measure_properties: Optional[MeasureSpecProperties] = None,
queried_linkable_specs: Optional[LinkableSpecSet] = None,
queried_linkable_specs_for_semi_additive_join: Optional[LinkableSpecSet] = None,
distinct: bool = False,
) -> DataflowPlanNode:
"""Adds standard pre-aggegation steps after building source node and before aggregation."""
Expand All @@ -1762,14 +1785,14 @@ def _build_pre_aggregation_plan(
)

if measure_properties and measure_properties.non_additive_dimension_spec:
if queried_linkable_specs is None:
if queried_linkable_specs_for_semi_additive_join is None:
raise ValueError(
"`queried_linkable_specs` must be provided in when building pre-aggregation plan if "
"`non_additive_dimension_spec` is present."
"`queried_linkable_specs_for_semi_additive_join` must be provided in when building pre-aggregation plan "
"if `non_additive_dimension_spec` is present."
)
output_node = self._build_semi_additive_join_node(
measure_properties=measure_properties,
queried_linkable_specs=queried_linkable_specs,
queried_linkable_specs=queried_linkable_specs_for_semi_additive_join,
parent_node=output_node,
)
if filter_to_specs:
Expand Down

0 comments on commit 56497e9

Please sign in to comment.