Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query by agg_time_dimension instead of metric_time #980

Closed
[author] wants to merge 27 commits into [base branch] from [head branch]
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 67 additions & 28 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def _build_base_metric_output_node(
metric_input_measure_spec = self._build_input_measure_spec_for_base_metric(
filter_spec_factory=filter_spec_factory,
metric_reference=metric_reference,
query_contains_metric_time=queried_linkable_specs.contains_metric_time,
queried_linkable_specs=queried_linkable_specs,
child_metric_offset_window=metric_spec.offset_window,
child_metric_offset_to_grain=metric_spec.offset_to_grain,
cumulative_description=CumulativeMeasureDescription(
Expand Down Expand Up @@ -501,12 +501,20 @@ def _build_derived_metric_output_node(

# For nested ratio / derived metrics with time offset, apply offset & where constraint after metric computation.
if metric_spec.has_time_offset:
queried_agg_time_dimension_specs = list(queried_linkable_specs.metric_time_specs)
if not queried_agg_time_dimension_specs:
valid_agg_time_dimensions = self._metric_lookup.get_valid_agg_time_dimensions_for_metric(
metric_spec.reference
)
queried_agg_time_dimension_specs = list(
set(queried_linkable_specs.time_dimension_specs).intersection(set(valid_agg_time_dimensions))
)
assert (
queried_linkable_specs.contains_metric_time
), "Joining to time spine requires querying with metric_time."
queried_agg_time_dimension_specs
), "Joining to time spine requires querying with metric_time or the appropriate agg_time_dimension."
output_node = JoinToTimeSpineNode(
parent_node=output_node,
requested_metric_time_dimension_specs=list(queried_linkable_specs.metric_time_specs),
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
time_range_constraint=time_range_constraint,
offset_window=metric_spec.offset_window,
offset_to_grain=metric_spec.offset_to_grain,
Expand Down Expand Up @@ -705,9 +713,13 @@ def _get_semantic_model_names_for_measures(self, measures: Sequence[MeasureSpec]
"""
semantic_model_names: Set[str] = set()
for measure in measures:
semantic_model_names = semantic_model_names.union(
{d.name for d in self._semantic_model_lookup.get_semantic_models_for_measure(measure.reference)}
semantic_model_names.update(
{
semantic_model.name
for semantic_model in self._semantic_model_lookup.get_semantic_models_for_measure(measure.reference)
}
)

return semantic_model_names

def _sort_by_suitability(self, nodes: Sequence[BaseOutput]) -> Sequence[BaseOutput]:
Expand Down Expand Up @@ -783,9 +795,7 @@ def _build_measure_spec_properties(self, measure_specs: Sequence[MeasureSpec]) -
f"semantic models: {semantic_models}. This suggests the measure_specs were not correctly filtered."
)

agg_time_dimension = agg_time_dimension = self._semantic_model_lookup.get_agg_time_dimension_for_measure(
measure_specs[0].reference
)
agg_time_dimension = self._semantic_model_lookup.get_agg_time_dimension_for_measure(measure_specs[0].reference)
non_additive_dimension_spec = measure_specs[0].non_additive_dimension_spec
for measure_spec in measure_specs:
if non_additive_dimension_spec != measure_spec.non_additive_dimension_spec:
Expand Down Expand Up @@ -1059,7 +1069,7 @@ def _build_input_measure_spec_for_base_metric(
child_metric_offset_window: Optional[MetricTimeWindow],
child_metric_offset_to_grain: Optional[TimeGranularity],
descendent_filter_specs: Sequence[WhereFilterSpec],
query_contains_metric_time: bool,
queried_linkable_specs: LinkableSpecSet,
cumulative_description: Optional[CumulativeMeasureDescription],
) -> MetricInputMeasureSpec:
"""Return the input measure spec required to compute the base metric.
Expand Down Expand Up @@ -1101,15 +1111,25 @@ def _build_input_measure_spec_for_base_metric(
offset_to_grain=child_metric_offset_to_grain,
)

# Even if the measure is configured to join to time spine, if there's no metric_time in the query,
# there's no need to join to the time spine since all metric_time will be aggregated.
# Even if the measure is configured to join to time spine, if there's no agg_time_dimension in the query,
# there's no need to join to the time spine since all time will be aggregated.
after_aggregation_time_spine_join_description = None
if input_measure.join_to_timespine and query_contains_metric_time:
after_aggregation_time_spine_join_description = JoinToTimeSpineDescription(
join_type=SqlJoinType.LEFT_OUTER,
offset_window=None,
offset_to_grain=None,
)
if input_measure.join_to_timespine:
query_contains_agg_time_dimension = queried_linkable_specs.contains_metric_time
if not query_contains_agg_time_dimension:
valid_agg_time_dimensions = self._semantic_model_lookup.get_agg_time_dimension_specs_for_measure(
measure_spec.reference
)
query_contains_agg_time_dimension = bool(
set(queried_linkable_specs.time_dimension_specs).intersection(set(valid_agg_time_dimensions))
)

if query_contains_agg_time_dimension:
after_aggregation_time_spine_join_description = JoinToTimeSpineDescription(
join_type=SqlJoinType.LEFT_OUTER,
offset_window=None,
offset_to_grain=None,
)

filter_specs: List[WhereFilterSpec] = []
filter_specs.extend(
Expand Down Expand Up @@ -1297,12 +1317,26 @@ def _build_aggregated_measure_from_measure_source_node(
f"Recipe not found for measure spec: {measure_spec} and linkable specs: {required_linkable_specs}"
)

# If a cumulative metric is queried with metric_time, join over time range.
queried_agg_time_dimension_specs = list(queried_linkable_specs.metric_time_specs)
if not queried_agg_time_dimension_specs:
valid_agg_time_dimensions = self._semantic_model_lookup.get_agg_time_dimension_specs_for_measure(
measure_spec.reference
)
queried_agg_time_dimension_specs = list(
set(queried_linkable_specs.time_dimension_specs).intersection(set(valid_agg_time_dimensions))
)

# If a cumulative metric is queried with agg_time_dimension, join over time range.
# Otherwise, the measure will be aggregated over all time.
time_range_node: Optional[JoinOverTimeRangeNode] = None
if cumulative and queried_linkable_specs.contains_metric_time:
if cumulative and queried_agg_time_dimension_specs:
# Use the time dimension spec with the smallest granularity.
agg_time_dimension_spec_for_join = sorted(
queried_agg_time_dimension_specs, key=lambda spec: spec.time_granularity.to_int()
)[0]
time_range_node = JoinOverTimeRangeNode(
parent_node=measure_recipe.source_node,
time_dimension_spec_for_join=agg_time_dimension_spec_for_join,
window=cumulative_window,
grain_to_date=cumulative_grain_to_date,
time_range_constraint=time_range_constraint
Expand All @@ -1313,16 +1347,18 @@ def _build_aggregated_measure_from_measure_source_node(
# If querying an offset metric, join to time spine before aggregation.
join_to_time_spine_node: Optional[JoinToTimeSpineNode] = None
if before_aggregation_time_spine_join_description is not None:
assert (
queried_linkable_specs.contains_metric_time
), "Joining to time spine requires querying with metric time."
assert queried_agg_time_dimension_specs, (
"Joining to time spine requires querying with metric time or the appropriate agg_time_dimension."
"This should have been caught by validations."
)

assert before_aggregation_time_spine_join_description.join_type is SqlJoinType.INNER, (
f"Expected {SqlJoinType.INNER} for joining to time spine before aggregation. Remove this if there's a "
f"new use case."
)
join_to_time_spine_node = JoinToTimeSpineNode(
parent_node=time_range_node or measure_recipe.source_node,
requested_metric_time_dimension_specs=list(queried_linkable_specs.metric_time_specs),
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
time_range_constraint=time_range_constraint,
offset_window=before_aggregation_time_spine_join_description.offset_window,
offset_to_grain=before_aggregation_time_spine_join_description.offset_to_grain,
Expand Down Expand Up @@ -1356,14 +1392,17 @@ def _build_aggregated_measure_from_measure_source_node(
else:
unaggregated_measure_node = filtered_measure_source_node

# If time constraint was previously adjusted for cumulative window or grain, apply original time constraint
# here. Can skip if metric is being aggregated over all time.
cumulative_metric_constrained_node: Optional[ConstrainTimeRangeNode] = None
if (
cumulative_metric_adjusted_time_constraint is not None
and time_range_constraint is not None
and queried_linkable_specs.contains_metric_time
and queried_agg_time_dimension_specs
):
cumulative_metric_constrained_node = ConstrainTimeRangeNode(
unaggregated_measure_node, time_range_constraint
parent_node=unaggregated_measure_node,
time_range_constraint=time_range_constraint,
)

pre_aggregate_node: BaseOutput = cumulative_metric_constrained_node or unaggregated_measure_node
Expand All @@ -1382,7 +1421,7 @@ def _build_aggregated_measure_from_measure_source_node(
queried_time_dimension_spec: Optional[
TimeDimensionSpec
] = self._find_non_additive_dimension_in_linkable_specs(
agg_time_dimension=agg_time_dimension,
agg_time_dimension=TimeDimensionReference(agg_time_dimension.element_name),
linkable_specs=queried_linkable_specs.as_tuple,
non_additive_dimension_spec=non_additive_dimension_spec,
)
Expand Down Expand Up @@ -1425,7 +1464,7 @@ def _build_aggregated_measure_from_measure_source_node(
)
return JoinToTimeSpineNode(
parent_node=aggregate_measures_node,
requested_metric_time_dimension_specs=list(queried_linkable_specs.metric_time_specs),
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
join_type=after_aggregation_time_spine_join_description.join_type,
time_range_constraint=time_range_constraint,
offset_window=after_aggregation_time_spine_join_description.offset_window,
Expand Down
20 changes: 12 additions & 8 deletions metricflow/dataflow/dataflow_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ class JoinOverTimeRangeNode(BaseOutput):
def __init__(
self,
parent_node: BaseOutput,
time_dimension_spec_for_join: TimeDimensionSpec,
window: Optional[MetricTimeWindow],
grain_to_date: Optional[TimeGranularity],
node_id: Optional[NodeId] = None,
Expand All @@ -390,6 +391,7 @@ def __init__(
(eg month to day)
node_id: Override the node ID with this value
time_range_constraint: time range to aggregate over
time_dimension_spec_for_join: time dimension spec to use when joining to time spine
"""
if window and grain_to_date:
raise RuntimeError(
Expand All @@ -400,6 +402,7 @@ def __init__(
self._grain_to_date = grain_to_date
self._window = window
self.time_range_constraint = time_range_constraint
self.time_dimension_spec_for_join = time_dimension_spec_for_join

# Doing a list comprehension throws a type error, so doing it this way.
parent_nodes: List[DataflowPlanNode] = [self._parent_node]
Expand Down Expand Up @@ -447,6 +450,7 @@ def with_new_parents(self, new_parent_nodes: Sequence[BaseOutput]) -> JoinOverTi
window=self.window,
grain_to_date=self.grain_to_date,
time_range_constraint=self.time_range_constraint,
time_dimension_spec_for_join=self.time_dimension_spec_for_join,
)


Expand Down Expand Up @@ -668,7 +672,7 @@ class JoinToTimeSpineNode(BaseOutput, ABC):
def __init__(
self,
parent_node: BaseOutput,
requested_metric_time_dimension_specs: List[TimeDimensionSpec],
requested_agg_time_dimension_specs: List[TimeDimensionSpec],
join_type: SqlJoinType,
time_range_constraint: Optional[TimeRangeConstraint] = None,
offset_window: Optional[MetricTimeWindow] = None,
Expand All @@ -678,7 +682,7 @@ def __init__(

Args:
parent_node: Node that returns desired dataset to join to time spine.
requested_metric_time_dimension_specs: Time dimensions requested in query. Used to determine granularities.
requested_agg_time_dimension_specs: Time dimensions requested in query. Used to determine granularities.
time_range_constraint: Time range to constrain the time spine to.
offset_window: Time window to offset the parent dataset by when joining to time spine.
offset_to_grain: Granularity period to offset the parent dataset to when joining to time spine.
Expand All @@ -689,7 +693,7 @@ def __init__(
offset_window and offset_to_grain
), "Can't set both offset_window and offset_to_grain when joining to time spine. Choose one or the other."
self._parent_node = parent_node
self._requested_metric_time_dimension_specs = requested_metric_time_dimension_specs
self._requested_agg_time_dimension_specs = requested_agg_time_dimension_specs
self._offset_window = offset_window
self._offset_to_grain = offset_to_grain
self._time_range_constraint = time_range_constraint
Expand All @@ -702,9 +706,9 @@ def id_prefix(cls) -> str: # noqa: D
return DATAFLOW_NODE_JOIN_TO_TIME_SPINE_ID_PREFIX

@property
def requested_metric_time_dimension_specs(self) -> List[TimeDimensionSpec]:
def requested_agg_time_dimension_specs(self) -> List[TimeDimensionSpec]:
"""Time dimension specs to use when creating time spine table."""
return self._requested_metric_time_dimension_specs
return self._requested_agg_time_dimension_specs

@property
def time_range_constraint(self) -> Optional[TimeRangeConstraint]:
Expand Down Expand Up @@ -736,7 +740,7 @@ def description(self) -> str: # noqa: D
@property
def displayed_properties(self) -> List[DisplayedProperty]: # noqa: D
return super().displayed_properties + [
DisplayedProperty("requested_metric_time_dimension_specs", self._requested_metric_time_dimension_specs),
DisplayedProperty("requested_agg_time_dimension_specs", self._requested_agg_time_dimension_specs),
DisplayedProperty("time_range_constraint", self._time_range_constraint),
DisplayedProperty("offset_window", self._offset_window),
DisplayedProperty("offset_to_grain", self._offset_to_grain),
Expand All @@ -753,15 +757,15 @@ def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa:
and other_node.time_range_constraint == self.time_range_constraint
and other_node.offset_window == self.offset_window
and other_node.offset_to_grain == self.offset_to_grain
and other_node.requested_metric_time_dimension_specs == self.requested_metric_time_dimension_specs
and other_node.requested_agg_time_dimension_specs == self.requested_agg_time_dimension_specs
and other_node.join_type == self.join_type
)

def with_new_parents(self, new_parent_nodes: Sequence[BaseOutput]) -> JoinToTimeSpineNode: # noqa: D
assert len(new_parent_nodes) == 1
return JoinToTimeSpineNode(
parent_node=new_parent_nodes[0],
requested_metric_time_dimension_specs=self.requested_metric_time_dimension_specs,
requested_agg_time_dimension_specs=self.requested_agg_time_dimension_specs,
time_range_constraint=self.time_range_constraint,
offset_window=self.offset_window,
offset_to_grain=self.offset_to_grain,
Expand Down
Loading
Loading