Skip to content

Commit

Permalink
Enable querying time offset metrics with agg_time_dimension (#1007)
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb authored Feb 3, 2024
1 parent 07ecffe commit d69b382
Show file tree
Hide file tree
Showing 64 changed files with 12,983 additions and 114 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240126-132734.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Enable offset metrics to be queried with agg_time_dimension.
time: 2024-01-26T13:27:34.200253-08:00
custom:
Author: courtneyholcomb
Issue: "1006"
88 changes: 47 additions & 41 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,12 +420,14 @@ def _build_base_metric_output_node(
query_contains_metric_time=queried_linkable_specs.contains_metric_time,
child_metric_offset_window=metric_spec.offset_window,
child_metric_offset_to_grain=metric_spec.offset_to_grain,
cumulative_description=CumulativeMeasureDescription(
cumulative_window=metric.type_params.window,
cumulative_grain_to_date=metric.type_params.grain_to_date,
)
if metric.type is MetricType.CUMULATIVE
else None,
cumulative_description=(
CumulativeMeasureDescription(
cumulative_window=metric.type_params.window,
cumulative_grain_to_date=metric.type_params.grain_to_date,
)
if metric.type is MetricType.CUMULATIVE
else None
),
descendent_filter_specs=metric_spec.filter_specs,
)

Expand Down Expand Up @@ -490,9 +492,9 @@ def _build_derived_metric_output_node(
offset_window=metric_input_spec.offset_window,
offset_to_grain=metric_input_spec.offset_to_grain,
),
queried_linkable_specs=queried_linkable_specs
if not metric_spec.has_time_offset
else required_linkable_specs,
queried_linkable_specs=(
queried_linkable_specs if not metric_spec.has_time_offset else required_linkable_specs
),
filter_spec_factory=filter_spec_factory,
time_range_constraint=time_range_constraint if not metric_spec.has_time_offset else None,
)
Expand All @@ -505,12 +507,16 @@ def _build_derived_metric_output_node(

# For nested ratio / derived metrics with time offset, apply offset & where constraint after metric computation.
if metric_spec.has_time_offset:
queried_agg_time_dimension_specs = queried_linkable_specs.included_agg_time_dimension_specs_for_metric(
metric_reference=metric_spec.reference, metric_lookup=self._metric_lookup
)
assert (
queried_linkable_specs.contains_metric_time
), "Joining to time spine requires querying with metric_time."
queried_agg_time_dimension_specs
), "Joining to time spine requires querying with metric_time or the appropriate agg_time_dimension."
output_node = JoinToTimeSpineNode(
parent_node=output_node,
requested_metric_time_dimension_specs=list(queried_linkable_specs.metric_time_specs),
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
use_custom_agg_time_dimension=not queried_linkable_specs.contains_metric_time,
time_range_constraint=time_range_constraint,
offset_window=metric_spec.offset_window,
offset_to_grain=metric_spec.offset_to_grain,
Expand Down Expand Up @@ -787,9 +793,7 @@ def _build_measure_spec_properties(self, measure_specs: Sequence[MeasureSpec]) -
f"semantic models: {semantic_models}. This suggests the measure_specs were not correctly filtered."
)

agg_time_dimension = agg_time_dimension = self._semantic_model_lookup.get_agg_time_dimension_for_measure(
measure_specs[0].reference
)
agg_time_dimension = self._semantic_model_lookup.get_agg_time_dimension_for_measure(measure_specs[0].reference)
non_additive_dimension_spec = measure_specs[0].non_additive_dimension_spec
for measure_spec in measure_specs:
if non_additive_dimension_spec != measure_spec.non_additive_dimension_spec:
Expand Down Expand Up @@ -1152,12 +1156,14 @@ def _build_input_metric_specs_for_derived_metric(
element_name=input_metric.name,
filter_specs=tuple(filter_specs),
alias=input_metric.alias,
offset_window=PydanticMetricTimeWindow(
count=input_metric.offset_window.count,
granularity=input_metric.offset_window.granularity,
)
if input_metric.offset_window
else None,
offset_window=(
PydanticMetricTimeWindow(
count=input_metric.offset_window.count,
granularity=input_metric.offset_window.granularity,
)
if input_metric.offset_window
else None
),
offset_to_grain=input_metric.offset_to_grain,
)
input_metric_specs.append(spec)
Expand Down Expand Up @@ -1272,9 +1278,11 @@ def _build_aggregated_measure_from_measure_source_node(
find_recipe_start_time = time.time()
measure_recipe = self._find_dataflow_recipe(
measure_spec_properties=measure_properties,
time_range_constraint=(cumulative_metric_adjusted_time_constraint or time_range_constraint)
if not before_aggregation_time_spine_join_description
else None,
time_range_constraint=(
(cumulative_metric_adjusted_time_constraint or time_range_constraint)
if not before_aggregation_time_spine_join_description
else None
),
linkable_spec_set=required_linkable_specs,
)
logger.info(
Expand All @@ -1290,16 +1298,11 @@ def _build_aggregated_measure_from_measure_source_node(
f"Recipe not found for measure spec: {measure_spec} and linkable specs: {required_linkable_specs}"
)

queried_agg_time_dimension_specs = list(queried_linkable_specs.metric_time_specs)
if not queried_agg_time_dimension_specs:
valid_agg_time_dimensions = self._semantic_model_lookup.get_agg_time_dimension_specs_for_measure(
measure_spec.reference
)
queried_agg_time_dimension_specs = list(
set(queried_linkable_specs.time_dimension_specs).intersection(set(valid_agg_time_dimensions))
)
queried_agg_time_dimension_specs = queried_linkable_specs.included_agg_time_dimension_specs_for_measure(
measure_reference=measure_spec.reference, semantic_model_lookup=self._semantic_model_lookup
)

# If a cumulative metric is queried with agg_time_dimension, join over time range.
# If a cumulative metric is queried with metric_time, join over time range.
# Otherwise, the measure will be aggregated over all time.
time_range_node: Optional[JoinOverTimeRangeNode] = None
if cumulative and queried_agg_time_dimension_specs:
Expand All @@ -1312,24 +1315,26 @@ def _build_aggregated_measure_from_measure_source_node(
time_dimension_spec_for_join=agg_time_dimension_spec_for_join,
window=cumulative_window,
grain_to_date=cumulative_grain_to_date,
time_range_constraint=time_range_constraint
if not before_aggregation_time_spine_join_description
else None,
time_range_constraint=(
time_range_constraint if not before_aggregation_time_spine_join_description else None
),
)

# If querying an offset metric, join to time spine before aggregation.
join_to_time_spine_node: Optional[JoinToTimeSpineNode] = None
if before_aggregation_time_spine_join_description is not None:
assert (
queried_linkable_specs.contains_metric_time
), "Joining to time spine requires querying with metric time."
assert queried_agg_time_dimension_specs, (
"Joining to time spine requires querying with metric time or the appropriate agg_time_dimension."
"This should have been caught by validations."
)
assert before_aggregation_time_spine_join_description.join_type is SqlJoinType.INNER, (
f"Expected {SqlJoinType.INNER} for joining to time spine before aggregation. Remove this if there's a "
f"new use case."
)
join_to_time_spine_node = JoinToTimeSpineNode(
parent_node=time_range_node or measure_recipe.source_node,
requested_metric_time_dimension_specs=list(queried_linkable_specs.metric_time_specs),
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
use_custom_agg_time_dimension=not queried_linkable_specs.contains_metric_time,
time_range_constraint=time_range_constraint,
offset_window=before_aggregation_time_spine_join_description.offset_window,
offset_to_grain=before_aggregation_time_spine_join_description.offset_to_grain,
Expand Down Expand Up @@ -1433,7 +1438,8 @@ def _build_aggregated_measure_from_measure_source_node(
)
return JoinToTimeSpineNode(
parent_node=aggregate_measures_node,
requested_metric_time_dimension_specs=list(queried_linkable_specs.metric_time_specs),
requested_agg_time_dimension_specs=list(queried_linkable_specs.metric_time_specs),
use_custom_agg_time_dimension=not queried_linkable_specs.contains_metric_time,
join_type=after_aggregation_time_spine_join_description.join_type,
time_range_constraint=time_range_constraint,
offset_window=after_aggregation_time_spine_join_description.offset_window,
Expand Down
31 changes: 23 additions & 8 deletions metricflow/dataflow/dataflow_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,8 @@ class JoinToTimeSpineNode(BaseOutput, ABC):
def __init__(
self,
parent_node: BaseOutput,
requested_metric_time_dimension_specs: List[TimeDimensionSpec],
requested_agg_time_dimension_specs: List[TimeDimensionSpec],
use_custom_agg_time_dimension: bool,
join_type: SqlJoinType,
time_range_constraint: Optional[TimeRangeConstraint] = None,
offset_window: Optional[MetricTimeWindow] = None,
Expand All @@ -682,7 +683,8 @@ def __init__(
Args:
parent_node: Node that returns desired dataset to join to time spine.
requested_metric_time_dimension_specs: Time dimensions requested in query. Used to determine granularities.
requested_agg_time_dimension_specs: Time dimensions requested in query.
use_custom_agg_time_dimension: Indicates if agg_time_dimension should be used in join. If false, uses metric_time.
time_range_constraint: Time range to constrain the time spine to.
offset_window: Time window to offset the parent dataset by when joining to time spine.
offset_to_grain: Granularity period to offset the parent dataset to when joining to time spine.
Expand All @@ -692,8 +694,13 @@ def __init__(
assert not (
offset_window and offset_to_grain
), "Can't set both offset_window and offset_to_grain when joining to time spine. Choose one or the other."
assert (
len(requested_agg_time_dimension_specs) > 0
), "Must have at least one value in requested_agg_time_dimension_specs for JoinToTimeSpineNode."

self._parent_node = parent_node
self._requested_metric_time_dimension_specs = requested_metric_time_dimension_specs
self._requested_agg_time_dimension_specs = requested_agg_time_dimension_specs
self._use_custom_agg_time_dimension = use_custom_agg_time_dimension
self._offset_window = offset_window
self._offset_to_grain = offset_to_grain
self._time_range_constraint = time_range_constraint
Expand All @@ -706,9 +713,14 @@ def id_prefix(cls) -> str: # noqa: D
return DATAFLOW_NODE_JOIN_TO_TIME_SPINE_ID_PREFIX

@property
def requested_metric_time_dimension_specs(self) -> List[TimeDimensionSpec]:
def requested_agg_time_dimension_specs(self) -> List[TimeDimensionSpec]:
"""Time dimension specs to use when creating time spine table."""
return self._requested_metric_time_dimension_specs
return self._requested_agg_time_dimension_specs

@property
def use_custom_agg_time_dimension(self) -> bool:
"""Whether or not metric_time was included in the query."""
return self._use_custom_agg_time_dimension

@property
def time_range_constraint(self) -> Optional[TimeRangeConstraint]:
Expand Down Expand Up @@ -740,7 +752,8 @@ def description(self) -> str: # noqa: D
@property
def displayed_properties(self) -> List[DisplayedProperty]: # noqa: D
return super().displayed_properties + [
DisplayedProperty("requested_metric_time_dimension_specs", self._requested_metric_time_dimension_specs),
DisplayedProperty("requested_agg_time_dimension_specs", self._requested_agg_time_dimension_specs),
DisplayedProperty("use_custom_agg_time_dimension", self._use_custom_agg_time_dimension),
DisplayedProperty("time_range_constraint", self._time_range_constraint),
DisplayedProperty("offset_window", self._offset_window),
DisplayedProperty("offset_to_grain", self._offset_to_grain),
Expand All @@ -757,15 +770,17 @@ def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa:
and other_node.time_range_constraint == self.time_range_constraint
and other_node.offset_window == self.offset_window
and other_node.offset_to_grain == self.offset_to_grain
and other_node.requested_metric_time_dimension_specs == self.requested_metric_time_dimension_specs
and other_node.requested_agg_time_dimension_specs == self.requested_agg_time_dimension_specs
and other_node.use_custom_agg_time_dimension == self.use_custom_agg_time_dimension
and other_node.join_type == self.join_type
)

def with_new_parents(self, new_parent_nodes: Sequence[BaseOutput]) -> JoinToTimeSpineNode: # noqa: D
assert len(new_parent_nodes) == 1
return JoinToTimeSpineNode(
parent_node=new_parent_nodes[0],
requested_metric_time_dimension_specs=self.requested_metric_time_dimension_specs,
requested_agg_time_dimension_specs=self.requested_agg_time_dimension_specs,
use_custom_agg_time_dimension=self.use_custom_agg_time_dimension,
time_range_constraint=self.time_range_constraint,
offset_window=self.offset_window,
offset_to_grain=self.offset_to_grain,
Expand Down
Loading

0 comments on commit d69b382

Please sign in to comment.