Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix: querying multiple granularities with offset metrics #1054

Merged
merged 10 commits into from
Feb 29, 2024
73 changes: 48 additions & 25 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1298,8 +1298,18 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet
parent_alias=parent_alias,
)

# Select all instances from the parent data set, EXCEPT the requested agg_time_dimension.
# The agg_time_dimension will be selected from the time spine data set.
# Select all instances from the parent data set, EXCEPT agg_time_dimensions.
# The agg_time_dimensions will be selected from the time spine data set.
time_dimensions_to_select_from_parent: Tuple[TimeDimensionInstance, ...] = ()
time_dimensions_to_select_from_time_spine: Tuple[TimeDimensionInstance, ...] = ()
for time_dimension_instance in parent_data_set.instance_set.time_dimension_instances:
if (
time_dimension_instance.spec.element_name == agg_time_element_name
and time_dimension_instance.spec.entity_links == agg_time_entity_links
):
time_dimensions_to_select_from_time_spine += (time_dimension_instance,)
else:
time_dimensions_to_select_from_parent += (time_dimension_instance,)
parent_instance_set = InstanceSet(
measure_instances=parent_data_set.instance_set.measure_instances,
dimension_instances=parent_data_set.instance_set.dimension_instances,
Expand Down Expand Up @@ -1331,35 +1341,48 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet
SqlColumnReference(table_alias=time_spine_alias, column_name=time_spine_dim_instance.spec.qualified_name)
)

# Add requested granularities (if different from time_spine) and date_parts to time spine column.
time_spine_select_columns = []
time_spine_dim_instances = []
where: Optional[SqlExpressionNode] = None
for requested_time_dimension_spec in node.requested_agg_time_dimension_specs:
# Apply granularity to time spine column select expression.
if requested_time_dimension_spec.time_granularity == time_spine_dim_instance.spec.time_granularity:
select_expr: SqlExpressionNode = time_spine_column_select_expr
else:
select_expr = SqlDateTruncExpression(
time_granularity=requested_time_dimension_spec.time_granularity, arg=time_spine_column_select_expr
where_filter: Optional[SqlExpressionNode] = None

# If offset_to_grain is used, will need to filter down to rows that match selected granularities.
# Does not apply if one of the granularities selected matches the time spine column granularity.
need_where_filter = (
node.offset_to_grain and time_spine_dim_instance.spec not in node.requested_agg_time_dimension_specs
)

# Add requested granularities (if different from time_spine) and date_parts to time spine column.
for time_dimension_instance in time_dimensions_to_select_from_time_spine:
time_dimension_spec = time_dimension_instance.spec

# Apply grain to time spine column select expression, unless grain already matches time spine column.
select_expr: SqlExpressionNode = (
time_spine_column_select_expr
if time_dimension_spec.time_granularity == time_spine_dim_instance.spec.time_granularity
else SqlDateTruncExpression(
time_granularity=time_dimension_spec.time_granularity, arg=time_spine_column_select_expr
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implies the time_spine_dim_instance.spec.time_granularity will always be equal to or finer (<= I think) than the time_dimension_spec.time_granularity.

I'm pretty sure that's true at this point, but is it enforced anywhere? I'm not sure what'll happen if we allow different granularities in the time spine, or if we allow the time spine to be DAY while the base granularity moves to MILLISECOND or whatever.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! I feel like the time spine SHOULD be set with the smallest possible grain in order to fill all granularity periods we support.
In practice, though, it might not be - especially when we transition to supporting smaller grains. Users might need to update their time spines.
I'll add an assertion for now and a comment to explain!

)
if node.offset_to_grain:
# Filter down to one row per granularity period
new_filter = SqlComparisonExpression(
left_expr=select_expr, comparison=SqlComparison.EQUALS, right_expr=time_spine_column_select_expr
)
if not where:
where = new_filter
else:
where = SqlLogicalExpression(operator=SqlLogicalOperator.OR, args=(where, new_filter))
)
# Filter down to one row per granularity period requested in the group by. Any other granularities
# included here will be filtered out in later nodes so should not be included in where filter.
if need_where_filter and time_dimension_spec in node.requested_agg_time_dimension_specs:
new_where_filter = SqlComparisonExpression(
left_expr=select_expr, comparison=SqlComparison.EQUALS, right_expr=time_spine_column_select_expr
)
where_filter = (
SqlLogicalExpression(operator=SqlLogicalOperator.OR, args=(where_filter, new_where_filter))
if where_filter
else new_where_filter
)

# Apply date_part to time spine column select expression.
if requested_time_dimension_spec.date_part:
select_expr = SqlExtractExpression(date_part=requested_time_dimension_spec.date_part, arg=select_expr)
if time_dimension_spec.date_part:
select_expr = SqlExtractExpression(date_part=time_dimension_spec.date_part, arg=select_expr)
time_dim_spec = TimeDimensionSpec(
element_name=time_spine_dim_instance.spec.element_name,
entity_links=time_spine_dim_instance.spec.entity_links,
time_granularity=requested_time_dimension_spec.time_granularity,
date_part=requested_time_dimension_spec.date_part,
time_granularity=time_dimension_spec.time_granularity,
date_part=time_dimension_spec.date_part,
aggregation_state=time_spine_dim_instance.spec.aggregation_state,
)
time_spine_dim_instance = TimeDimensionInstance(
Expand All @@ -1383,7 +1406,7 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet
joins_descs=(join_description,),
group_bys=(),
order_bys=(),
where=where,
where=where_filter,
),
)

Expand Down
Loading