Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow measures that join to time spine to use agg_time_dim #1005

Merged
merged 21 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240126-132734.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Enable offset metrics to be queried with agg_time_dimension.
time: 2024-01-26T13:27:34.200253-08:00
custom:
Author: courtneyholcomb
Issue: "1006"
68 changes: 47 additions & 21 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def _build_base_metric_output_node(
metric_input_measure_spec = self._build_input_measure_spec_for_base_metric(
filter_spec_factory=filter_spec_factory,
metric_reference=metric_reference,
query_contains_metric_time=queried_linkable_specs.contains_metric_time,
queried_linkable_specs=queried_linkable_specs,
child_metric_offset_window=metric_spec.offset_window,
child_metric_offset_to_grain=metric_spec.offset_to_grain,
cumulative_description=CumulativeMeasureDescription(
Expand Down Expand Up @@ -505,12 +505,20 @@ def _build_derived_metric_output_node(

# For nested ratio / derived metrics with time offset, apply offset & where constraint after metric computation.
if metric_spec.has_time_offset:
queried_agg_time_dimension_specs = list(queried_linkable_specs.metric_time_specs)
if not queried_agg_time_dimension_specs:
valid_agg_time_dimensions = self._metric_lookup.get_valid_agg_time_dimensions_for_metric(
metric_spec.reference
)
queried_agg_time_dimension_specs = list(
set(queried_linkable_specs.time_dimension_specs).intersection(set(valid_agg_time_dimensions))
)
assert (
queried_linkable_specs.contains_metric_time
), "Joining to time spine requires querying with metric_time."
queried_agg_time_dimension_specs
), "Joining to time spine requires querying with metric_time or the appropriate agg_time_dimension."
output_node = JoinToTimeSpineNode(
parent_node=output_node,
requested_metric_time_dimension_specs=list(queried_linkable_specs.metric_time_specs),
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
time_range_constraint=time_range_constraint,
offset_window=metric_spec.offset_window,
offset_to_grain=metric_spec.offset_to_grain,
Expand Down Expand Up @@ -787,9 +795,7 @@ def _build_measure_spec_properties(self, measure_specs: Sequence[MeasureSpec]) -
f"semantic models: {semantic_models}. This suggests the measure_specs were not correctly filtered."
)

agg_time_dimension = agg_time_dimension = self._semantic_model_lookup.get_agg_time_dimension_for_measure(
measure_specs[0].reference
)
agg_time_dimension = self._semantic_model_lookup.get_agg_time_dimension_for_measure(measure_specs[0].reference)
non_additive_dimension_spec = measure_specs[0].non_additive_dimension_spec
for measure_spec in measure_specs:
if non_additive_dimension_spec != measure_spec.non_additive_dimension_spec:
Expand Down Expand Up @@ -1055,7 +1061,7 @@ def _build_input_measure_spec_for_base_metric(
child_metric_offset_window: Optional[MetricTimeWindow],
child_metric_offset_to_grain: Optional[TimeGranularity],
descendent_filter_specs: Sequence[WhereFilterSpec],
query_contains_metric_time: bool,
queried_linkable_specs: LinkableSpecSet,
cumulative_description: Optional[CumulativeMeasureDescription],
) -> MetricInputMeasureSpec:
"""Return the input measure spec required to compute the base metric.
Expand Down Expand Up @@ -1097,15 +1103,25 @@ def _build_input_measure_spec_for_base_metric(
offset_to_grain=child_metric_offset_to_grain,
)

# Even if the measure is configured to join to time spine, if there's no metric_time in the query,
# there's no need to join to the time spine since all metric_time will be aggregated.
# Even if the measure is configured to join to time spine, if there's no agg_time_dimension in the query,
# there's no need to join to the time spine since all time will be aggregated.
after_aggregation_time_spine_join_description = None
if input_measure.join_to_timespine and query_contains_metric_time:
after_aggregation_time_spine_join_description = JoinToTimeSpineDescription(
join_type=SqlJoinType.LEFT_OUTER,
offset_window=None,
offset_to_grain=None,
)
if input_measure.join_to_timespine:
query_contains_agg_time_dimension = queried_linkable_specs.contains_metric_time
if not query_contains_agg_time_dimension:
valid_agg_time_dimensions = self._semantic_model_lookup.get_agg_time_dimension_specs_for_measure(
measure_spec.reference
)
query_contains_agg_time_dimension = bool(
set(queried_linkable_specs.time_dimension_specs).intersection(set(valid_agg_time_dimensions))
)

if query_contains_agg_time_dimension:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah if we have some kind of helper I think it'll clean up a lot of these conditionals as well, because it'll hide the wonky logic inside of the helper and this can become something like:

if query_contains_agg_time_dimension(queried_linkable_specs, measure_spec):

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated!

after_aggregation_time_spine_join_description = JoinToTimeSpineDescription(
join_type=SqlJoinType.LEFT_OUTER,
offset_window=None,
offset_to_grain=None,
)

filter_specs: List[WhereFilterSpec] = []
filter_specs.extend(
Expand Down Expand Up @@ -1290,6 +1306,15 @@ def _build_aggregated_measure_from_measure_source_node(
f"Recipe not found for measure spec: {measure_spec} and linkable specs: {required_linkable_specs}"
)

queried_agg_time_dimension_specs = list(queried_linkable_specs.metric_time_specs)
if not queried_agg_time_dimension_specs:
valid_agg_time_dimensions = self._semantic_model_lookup.get_agg_time_dimension_specs_for_measure(
measure_spec.reference
)
queried_agg_time_dimension_specs = list(
set(queried_linkable_specs.time_dimension_specs).intersection(set(valid_agg_time_dimensions))
)

# If a cumulative metric is queried with metric_time, join over time range.
# Otherwise, the measure will be aggregated over all time.
time_range_node: Optional[JoinOverTimeRangeNode] = None
Expand All @@ -1306,16 +1331,17 @@ def _build_aggregated_measure_from_measure_source_node(
# If querying an offset metric, join to time spine before aggregation.
join_to_time_spine_node: Optional[JoinToTimeSpineNode] = None
if before_aggregation_time_spine_join_description is not None:
assert (
queried_linkable_specs.contains_metric_time
), "Joining to time spine requires querying with metric time."
assert queried_agg_time_dimension_specs, (
"Joining to time spine requires querying with metric time or the appropriate agg_time_dimension. "
"This should have been caught by validations."
)
assert before_aggregation_time_spine_join_description.join_type is SqlJoinType.INNER, (
f"Expected {SqlJoinType.INNER} for joining to time spine before aggregation. Remove this if there's a "
f"new use case."
)
join_to_time_spine_node = JoinToTimeSpineNode(
parent_node=time_range_node or measure_recipe.source_node,
requested_metric_time_dimension_specs=list(queried_linkable_specs.metric_time_specs),
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
time_range_constraint=time_range_constraint,
offset_window=before_aggregation_time_spine_join_description.offset_window,
offset_to_grain=before_aggregation_time_spine_join_description.offset_to_grain,
Expand Down Expand Up @@ -1418,7 +1444,7 @@ def _build_aggregated_measure_from_measure_source_node(
)
return JoinToTimeSpineNode(
parent_node=aggregate_measures_node,
requested_metric_time_dimension_specs=list(queried_linkable_specs.metric_time_specs),
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
join_type=after_aggregation_time_spine_join_description.join_type,
time_range_constraint=time_range_constraint,
offset_window=after_aggregation_time_spine_join_description.offset_window,
Expand Down
16 changes: 8 additions & 8 deletions metricflow/dataflow/dataflow_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ class JoinToTimeSpineNode(BaseOutput, ABC):
def __init__(
self,
parent_node: BaseOutput,
requested_metric_time_dimension_specs: List[TimeDimensionSpec],
requested_agg_time_dimension_specs: List[TimeDimensionSpec],
join_type: SqlJoinType,
time_range_constraint: Optional[TimeRangeConstraint] = None,
offset_window: Optional[MetricTimeWindow] = None,
Expand All @@ -678,7 +678,7 @@ def __init__(

Args:
parent_node: Node that returns desired dataset to join to time spine.
requested_metric_time_dimension_specs: Time dimensions requested in query. Used to determine granularities.
requested_agg_time_dimension_specs: Time dimensions requested in query.
time_range_constraint: Time range to constrain the time spine to.
offset_window: Time window to offset the parent dataset by when joining to time spine.
offset_to_grain: Granularity period to offset the parent dataset to when joining to time spine.
Expand All @@ -689,7 +689,7 @@ def __init__(
offset_window and offset_to_grain
), "Can't set both offset_window and offset_to_grain when joining to time spine. Choose one or the other."
self._parent_node = parent_node
self._requested_metric_time_dimension_specs = requested_metric_time_dimension_specs
self._requested_agg_time_dimension_specs = requested_agg_time_dimension_specs
self._offset_window = offset_window
self._offset_to_grain = offset_to_grain
self._time_range_constraint = time_range_constraint
Expand All @@ -702,9 +702,9 @@ def id_prefix(cls) -> str: # noqa: D
return DATAFLOW_NODE_JOIN_TO_TIME_SPINE_ID_PREFIX

@property
def requested_metric_time_dimension_specs(self) -> List[TimeDimensionSpec]:
def requested_agg_time_dimension_specs(self) -> List[TimeDimensionSpec]:
"""Time dimension specs to use when creating time spine table."""
return self._requested_metric_time_dimension_specs
return self._requested_agg_time_dimension_specs

@property
def time_range_constraint(self) -> Optional[TimeRangeConstraint]:
Expand Down Expand Up @@ -736,7 +736,7 @@ def description(self) -> str: # noqa: D
@property
def displayed_properties(self) -> List[DisplayedProperty]: # noqa: D
return super().displayed_properties + [
DisplayedProperty("requested_metric_time_dimension_specs", self._requested_metric_time_dimension_specs),
DisplayedProperty("requested_agg_time_dimension_specs", self._requested_agg_time_dimension_specs),
DisplayedProperty("time_range_constraint", self._time_range_constraint),
DisplayedProperty("offset_window", self._offset_window),
DisplayedProperty("offset_to_grain", self._offset_to_grain),
Expand All @@ -753,15 +753,15 @@ def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa:
and other_node.time_range_constraint == self.time_range_constraint
and other_node.offset_window == self.offset_window
and other_node.offset_to_grain == self.offset_to_grain
and other_node.requested_metric_time_dimension_specs == self.requested_metric_time_dimension_specs
and other_node.requested_agg_time_dimension_specs == self.requested_agg_time_dimension_specs
and other_node.join_type == self.join_type
)

def with_new_parents(self, new_parent_nodes: Sequence[BaseOutput]) -> JoinToTimeSpineNode: # noqa: D
assert len(new_parent_nodes) == 1
return JoinToTimeSpineNode(
parent_node=new_parent_nodes[0],
requested_metric_time_dimension_specs=self.requested_metric_time_dimension_specs,
requested_agg_time_dimension_specs=self.requested_agg_time_dimension_specs,
time_range_constraint=self.time_range_constraint,
offset_window=self.offset_window,
offset_to_grain=self.offset_to_grain,
Expand Down
Loading