Rename default grain -> min grain
Clean up. This was causing me some confusion: we recently introduced the concept of a default grain for metrics, which is defined in the YAML spec, but this code was not referring to that concept.
courtneyholcomb committed Dec 9, 2024
1 parent dcf17cf commit 145292a
Showing 2 changed files with 40 additions and 31 deletions.
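For context on the rename: a metric's default grain is the grain declared for it in the YAML spec, while the value this code computes via get_min_queryable_time_granularity is the finest grain the metric can actually be queried at. A toy sketch of that distinction, using hypothetical names rather than MetricFlow's real classes:

# Toy illustration -- hypothetical names, not MetricFlow's real classes or API.
from dataclasses import dataclass
from enum import Enum
from typing import Tuple


class Grain(Enum):
    DAY = 1
    MONTH = 2
    YEAR = 3


@dataclass(frozen=True)
class ToyMetric:
    name: str
    default_grain: Grain  # declared in the metric's YAML; used when no grain is requested
    measure_grains: Tuple[Grain, ...]  # grains of the underlying measures' agg time dimensions

    @property
    def min_queryable_grain(self) -> Grain:
        # A metric cannot be queried at a finer grain than its coarsest measure grain.
        return max(self.measure_grains, key=lambda g: g.value)


revenue = ToyMetric("revenue", default_grain=Grain.MONTH, measure_grains=(Grain.DAY,))
assert revenue.default_grain is Grain.MONTH  # "default grain": the YAML-spec concept
assert revenue.min_queryable_grain is Grain.DAY  # "min grain": what this code refers to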
@@ -102,13 +102,28 @@ def as_tuple(self) -> Tuple[LinkableInstanceSpec, ...]: # noqa: D102
)
)

+def add_specs(
+self,
+dimension_specs: Tuple[DimensionSpec, ...] = (),
+time_dimension_specs: Tuple[TimeDimensionSpec, ...] = (),
+entity_specs: Tuple[EntitySpec, ...] = (),
+group_by_metric_specs: Tuple[GroupByMetricSpec, ...] = (),
+) -> LinkableSpecSet:
+"""Return a new set with the new specs in addition to the existing ones."""
+return LinkableSpecSet(
+dimension_specs=self.dimension_specs + dimension_specs,
+time_dimension_specs=self.time_dimension_specs + time_dimension_specs,
+entity_specs=self.entity_specs + entity_specs,
+group_by_metric_specs=self.group_by_metric_specs + group_by_metric_specs,
+)
+
@override
def merge(self, other: LinkableSpecSet) -> LinkableSpecSet:
-return LinkableSpecSet(
-dimension_specs=self.dimension_specs + other.dimension_specs,
-time_dimension_specs=self.time_dimension_specs + other.time_dimension_specs,
-entity_specs=self.entity_specs + other.entity_specs,
-group_by_metric_specs=self.group_by_metric_specs + other.group_by_metric_specs,
+return self.add_specs(
+dimension_specs=other.dimension_specs,
+time_dimension_specs=other.time_dimension_specs,
+entity_specs=other.entity_specs,
+group_by_metric_specs=other.group_by_metric_specs,
)

@classmethod
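The add_specs helper added above lets callers append specs to an existing set without building a throwaway LinkableSpecSet just to merge it, and merge itself now delegates to it. Below is a minimal, self-contained sketch of the pattern using simplified stand-in types (not the real spec classes); in the second file this is what allows queried_linkable_specs.add_specs(time_dimension_specs=(min_metric_time_spec,)) to replace the old merge of a one-off LinkableSpecSet.

from __future__ import annotations

from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class SpecSet:
    """Simplified stand-in for LinkableSpecSet, reduced to two of its fields."""

    dimension_specs: Tuple[str, ...] = ()
    time_dimension_specs: Tuple[str, ...] = ()

    def add_specs(
        self,
        dimension_specs: Tuple[str, ...] = (),
        time_dimension_specs: Tuple[str, ...] = (),
    ) -> SpecSet:
        """Return a new set with the new specs in addition to the existing ones."""
        return SpecSet(
            dimension_specs=self.dimension_specs + dimension_specs,
            time_dimension_specs=self.time_dimension_specs + time_dimension_specs,
        )

    def merge(self, other: SpecSet) -> SpecSet:
        """Merging is just add_specs applied to the other set's fields."""
        return self.add_specs(
            dimension_specs=other.dimension_specs,
            time_dimension_specs=other.time_dimension_specs,
        )


queried = SpecSet(dimension_specs=("customer__region",))
with_metric_time = queried.add_specs(time_dimension_specs=("metric_time__day",))
assert with_metric_time.dimension_specs == ("customer__region",)
assert with_metric_time.time_dimension_specs == ("metric_time__day",)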
46 changes: 20 additions & 26 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
@@ -183,7 +183,6 @@ def _build_query_output_node(
where_filter_specs=(),
pushdown_enabled_types=frozenset({PredicateInputType.TIME_RANGE_CONSTRAINT}),
)
-
return self._build_metrics_output_node(
metric_specs=tuple(
MetricSpec(
@@ -237,6 +236,13 @@ def _optimize_plan(self, plan: DataflowPlan, optimizations: FrozenSet[DataflowPl

return plan

+def _get_minimum_metric_time_spec_for_metric(self, metric_reference: MetricReference) -> TimeDimensionSpec:
+"""Gets the minimum metric time spec for the given metric reference."""
+min_granularity = ExpandedTimeGranularity.from_time_granularity(
+self._metric_lookup.get_min_queryable_time_granularity(metric_reference)
+)
+return DataSet.metric_time_dimension_spec(min_granularity)
+
def _build_aggregated_conversion_node(
self,
metric_spec: MetricSpec,
@@ -308,14 +314,11 @@ def _build_aggregated_conversion_node(
# Get the time dimension used to calculate the conversion window
# Currently, both the base/conversion measure uses metric_time as it's the default agg time dimension.
# However, eventually, there can be user-specified time dimensions used for this calculation.
-default_granularity = ExpandedTimeGranularity.from_time_granularity(
-self._metric_lookup.get_min_queryable_time_granularity(metric_spec.reference)
-)
-metric_time_dimension_spec = DataSet.metric_time_dimension_spec(default_granularity)
+min_metric_time_spec = self._get_minimum_metric_time_spec_for_metric(metric_spec.reference)

# Filter the source nodes with only the required specs needed for the calculation
constant_property_specs = []
-required_local_specs = [base_measure_spec.measure_spec, entity_spec, metric_time_dimension_spec] + list(
+required_local_specs = [base_measure_spec.measure_spec, entity_spec, min_metric_time_spec] + list(
base_measure_recipe.required_local_linkable_specs.as_tuple
)
for constant_property in constant_properties or []:
@@ -346,10 +349,10 @@ def _build_aggregated_conversion_node(
# adjusted in the opposite direction.
join_conversion_node = JoinConversionEventsNode.create(
base_node=unaggregated_base_measure_node,
-base_time_dimension_spec=metric_time_dimension_spec,
+base_time_dimension_spec=min_metric_time_spec,
conversion_node=unaggregated_conversion_measure_node,
conversion_measure_spec=conversion_measure_spec.measure_spec,
-conversion_time_dimension_spec=metric_time_dimension_spec,
+conversion_time_dimension_spec=min_metric_time_spec,
unique_identifier_keys=(MetadataSpec(MetricFlowReservedKeywords.MF_INTERNAL_UUID.value),),
entity_spec=entity_spec,
window=window,
@@ -445,21 +448,19 @@ def _build_cumulative_metric_output_node(
predicate_pushdown_state: PredicatePushdownState,
for_group_by_source_node: bool = False,
) -> DataflowPlanNode:
-# TODO: [custom granularity] Figure out how to support custom granularities as defaults
-default_granularity = ExpandedTimeGranularity.from_time_granularity(
-self._metric_lookup.get_min_queryable_time_granularity(metric_spec.reference)
-)
+min_metric_time_spec = self._get_minimum_metric_time_spec_for_metric(metric_spec.reference)
+min_granularity = min_metric_time_spec.time_granularity

queried_agg_time_dimensions = queried_linkable_specs.included_agg_time_dimension_specs_for_metric(
metric_reference=metric_spec.reference, metric_lookup=self._metric_lookup
)
-query_includes_agg_time_dimension_with_default_granularity = False
+query_includes_agg_time_dimension_with_min_granularity = False
for time_dimension_spec in queried_agg_time_dimensions:
-if time_dimension_spec.time_granularity == default_granularity:
-query_includes_agg_time_dimension_with_default_granularity = True
+if time_dimension_spec.time_granularity == min_granularity:
+query_includes_agg_time_dimension_with_min_granularity = True
break

-if query_includes_agg_time_dimension_with_default_granularity or not queried_agg_time_dimensions:
+if query_includes_agg_time_dimension_with_min_granularity or len(queried_agg_time_dimensions) == 0:
return self._build_base_metric_output_node(
metric_spec=metric_spec,
queried_linkable_specs=queried_linkable_specs,
@@ -468,14 +469,11 @@
for_group_by_source_node=for_group_by_source_node,
)

-# If a cumulative metric is queried without default granularity, it will need to be aggregated twice -
+# If a cumulative metric is queried without its minimum granularity, it will need to be aggregated twice:
# once as a normal metric, and again using a window function to narrow down to one row per granularity period.
# In this case, add metric time at the default granularity to the linkable specs. It will be used in the order by
# clause of the window function and later excluded from the output selections.
-default_metric_time = DataSet.metric_time_dimension_spec(default_granularity)
-include_linkable_specs = queried_linkable_specs.merge(
-LinkableSpecSet(time_dimension_specs=(default_metric_time,))
-)
+include_linkable_specs = queried_linkable_specs.add_specs(time_dimension_specs=(min_metric_time_spec,))
compute_metrics_node = self._build_base_metric_output_node(
metric_spec=metric_spec,
queried_linkable_specs=include_linkable_specs,
@@ -486,7 +484,7 @@
return WindowReaggregationNode.create(
parent_node=compute_metrics_node,
metric_spec=metric_spec,
-order_by_spec=default_metric_time,
+order_by_spec=min_metric_time_spec,
partition_by_specs=queried_linkable_specs.as_tuple,
)

@@ -1629,10 +1627,6 @@ def _build_aggregated_measure_from_measure_source_node(

# If querying an offset metric, join to time spine before aggregation.
if before_aggregation_time_spine_join_description is not None:
-assert queried_agg_time_dimension_specs, (
-"Joining to time spine requires querying with metric time or the appropriate agg_time_dimension."
-"This should have been caught by validations."
-)
assert before_aggregation_time_spine_join_description.join_type is SqlJoinType.INNER, (
f"Expected {SqlJoinType.INNER} for joining to time spine before aggregation. Remove this if there's a "
f"new use case."
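To make the cumulative-metric branch above concrete: when the query does not include the metric's minimum granularity, the metric is first computed at that minimum grain and then collapsed to one row per queried granularity period. A toy sketch of the idea in plain Python with made-up data; the real plan expresses the second step with WindowReaggregationNode and a SQL window function ordered by the min-grain metric time, and may pick the representative row differently.

from itertools import accumulate, groupby

# Hypothetical daily values; assume the metric's minimum grain is day.
daily_revenue = {"2024-01-30": 10, "2024-01-31": 5, "2024-02-01": 7, "2024-02-02": 3}

# First aggregation: a cumulative (running-total) metric at the minimum grain.
days = sorted(daily_revenue)
running_total = dict(zip(days, accumulate(daily_revenue[d] for d in days)))

# Re-aggregation: one row per queried (monthly) period. The min-grain metric time
# orders the rows within each period and is then dropped from the output.
monthly = {
    month: [running_total[d] for d in group][-1]
    for month, group in groupby(days, key=lambda d: d[:7])
}
print(monthly)  # {'2024-01': 15, '2024-02': 25}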
