Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional rendering & check query tests for custom grain #1466

Merged
merged 5 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 15 additions & 17 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,15 @@ def _make_time_spine_data_set(
from_source=SqlTableNode.create(sql_table=time_spine_source.spine_table),
from_source_alias=time_spine_table_alias,
group_bys=select_columns if apply_group_by else (),
where=_make_time_range_comparison_expr(
table_alias=time_spine_table_alias,
column_alias=time_spine_source.base_column,
time_range_constraint=time_range_constraint,
)
if time_range_constraint
else None,
where=(
_make_time_range_comparison_expr(
table_alias=time_spine_table_alias,
column_alias=time_spine_source.base_column,
time_range_constraint=time_range_constraint,
)
if time_range_constraint
else None
),
)

# Where constraints must be applied in an outer query since they are using an alias (e.g., 'metric_time__day'),
Expand Down Expand Up @@ -380,7 +382,6 @@ def visit_source_node(self, node: ReadSqlSourceNode) -> SqlDataSet:
instance_set=node.data_set.instance_set,
)

# TODO: write tests for custom granularities that hit this node
def visit_join_over_time_range_node(self, node: JoinOverTimeRangeNode) -> SqlDataSet:
"""Generate time range join SQL."""
table_alias_to_instance_set: OrderedDict[str, InstanceSet] = OrderedDict()
Expand Down Expand Up @@ -1307,7 +1308,6 @@ def visit_semi_additive_join_node(self, node: SemiAdditiveJoinNode) -> SqlDataSe
),
)

# TODO: write tests for custom granularities that hit this node
def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet: # noqa: D102
parent_data_set = node.parent_node.accept(self)
parent_alias = self._next_unique_table_alias()
Expand All @@ -1330,16 +1330,14 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet
):
agg_time_dimension_instances.append(instance)

# Choose the instance with the smallest standard granularity available.
assert all(
[not instance.spec.time_granularity.is_custom_granularity for instance in agg_time_dimension_instances]
), "Custom granularities are not yet supported for all queries."
# Choose the instance with the smallest base granularity available.
agg_time_dimension_instances.sort(key=lambda instance: instance.spec.time_granularity.base_granularity.to_int())
assert len(agg_time_dimension_instances) > 0, (
"Couldn't find requested agg_time_dimension in parent data set. The dataflow plan may have been "
"configured incorrectly."
)
agg_time_dimension_instance_for_join = agg_time_dimension_instances[0]
agg_time_dim_for_join_with_base_grain = agg_time_dimension_instance_for_join.spec.with_base_grain()

# Build time spine data set using the requested agg_time_dimension name.
time_spine_alias = self._next_unique_table_alias()
Expand All @@ -1354,7 +1352,7 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet
node=node,
time_spine_alias=time_spine_alias,
agg_time_dimension_column_name=self.column_association_resolver.resolve_spec(
agg_time_dimension_instance_for_join.spec
agg_time_dim_for_join_with_base_grain
).column_name,
parent_sql_select_node=parent_data_set.checked_sql_select_node,
parent_alias=parent_alias,
Expand Down Expand Up @@ -1391,15 +1389,15 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet
self._column_association_resolver, OrderedDict({parent_alias: parent_instance_set})
)

# Select instance from time spine data set that matches the join instance in the parent dataset.
# Select matching instance from time spine data set (using base grain - custom grain will be joined in a later node).
original_time_spine_dim_instance: Optional[TimeDimensionInstance] = None
for time_dimension_instance in time_spine_dataset.instance_set.time_dimension_instances:
if time_dimension_instance.spec == agg_time_dimension_instance_for_join.spec:
if time_dimension_instance.spec == agg_time_dim_for_join_with_base_grain:
original_time_spine_dim_instance = time_dimension_instance
break
assert original_time_spine_dim_instance, (
"Couldn't find requested agg_time_dimension_instance_for_join in time spine data set, which "
f"indicates it may have been configured incorrectly. Expected: {agg_time_dimension_instance_for_join.spec};"
f"indicates it may have been configured incorrectly. Expected: {agg_time_dim_for_join_with_base_grain};"
f" Got: {[instance.spec for instance in time_spine_dataset.instance_set.time_dimension_instances]}"
)
time_spine_column_select_expr: Union[
Expand Down
36 changes: 36 additions & 0 deletions tests_metricflow/integration/test_cases/itest_granularity.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -516,3 +516,39 @@ integration_test:
ON {{ render_date_trunc("l.created_at", TimeGranularity.DAY) }} = ts.ds
WHERE {{ render_time_constraint("ts.martian_day", start_time="2019-12-20") }}
GROUP BY ts.martian_day
---
integration_test:
name: test_simple_metric_with_multi_hop_custom_granularity
description: Test querying a simple metric with a custom grain on a multi-hop dimension
model: SIMPLE_MODEL
metrics: ["bookings"]
group_bys: ["listing__user__ds__martian_day"]
check_query: |
SELECT
ts.martian_day AS listing__user__ds__martian_day
, SUM(1) AS bookings
FROM {{ source_schema }}.fct_bookings b
LEFT OUTER JOIN {{ source_schema }}.dim_listings_latest l
LEFT OUTER JOIN {{ source_schema }}.dim_users u ON l.user_id = u.user_id
ON b.listing_id = l.listing_id
AND {{ render_date_trunc("b.ds_partitioned", TimeGranularity.DAY) }} = {{ render_date_trunc("u.ds_partitioned", TimeGranularity.DAY) }}
LEFT OUTER JOIN {{ source_schema }}.mf_time_spine ts ON {{ render_date_trunc("u.ds", TimeGranularity.DAY) }} = ts.ds
GROUP BY ts.martian_day
---
integration_test:
name: test_offset_metric_with_custom_granularity_filter_not_in_group_by
  description: Test querying an offset metric with a custom grain filter that is not in the group by
model: SIMPLE_MODEL
metrics: ["bookings_5_day_lag"]
group_bys: ["metric_time__day"]
where_filter: |
{{ render_time_constraint(render_time_dimension_template('metric_time', 'martian_day'), start_time="2020-01-01") }}
check_query: |
SELECT
ts.ds AS metric_time__day
, SUM(1) AS bookings_5_day_lag
FROM {{ source_schema }}.mf_time_spine ts
INNER JOIN {{ source_schema }}.fct_bookings b ON {{ render_date_sub("ts", "ds", 5, TimeGranularity.DAY) }} = {{ render_date_trunc("b.ds", TimeGranularity.DAY) }}
LEFT OUTER JOIN {{ source_schema }}.mf_time_spine ts1 ON ts.ds = ts1.ds
WHERE {{ render_time_constraint("ts1.martian_day", "2020-01-01") }}
GROUP BY ts.ds
75 changes: 75 additions & 0 deletions tests_metricflow/query_rendering/test_custom_granularity.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,78 @@ def test_no_metrics_with_custom_granularity_in_filter_and_group_by(
dataflow_plan_builder=dataflow_plan_builder,
query_spec=query_spec,
)


@pytest.mark.sql_engine_snapshot
def test_simple_metric_with_multi_hop_custom_granularity(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
query_parser: MetricFlowQueryParser,
) -> None:
"""Test simple metric with a multi hop dimension and custom grain."""
query_spec = query_parser.parse_and_validate_query(
metric_names=("bookings",),
group_by_names=("listing__user__ds__martian_day",),
).query_spec

render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=query_spec,
)


@pytest.mark.sql_engine_snapshot
def test_offset_metric_with_custom_granularity( # noqa: D103
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
) -> None:
query_spec = MetricFlowQuerySpec(
metric_specs=(MetricSpec("bookings_5_day_lag"),),
time_dimension_specs=(normal_time_dim_with_custom_grain1,),
)

render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=query_spec,
)


@pytest.mark.sql_engine_snapshot
def test_offset_metric_with_custom_granularity_filter_not_in_group_by( # noqa: D103
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
query_parser: MetricFlowQueryParser,
) -> None:
query_spec = query_parser.parse_and_validate_query(
metric_names=("bookings_5_day_lag",),
group_by_names=("metric_time__day",),
where_constraints=[
PydanticWhereFilter(where_sql_template=("{{ TimeDimension('metric_time', 'martian_day') }} = '2020-01-01'"))
],
).query_spec

render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=query_spec,
)
Loading
Loading