Skip to content

Commit

Permalink
Dataflow plan tests
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Jun 14, 2024
1 parent ae72a15 commit 9891bbf
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 0 deletions.
54 changes: 54 additions & 0 deletions tests_metricflow/dataflow/builder/test_dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1351,3 +1351,57 @@ def test_all_available_metric_filters(
),
).query_spec
dataflow_plan_builder.build_plan(query_spec)


def test_cumulative_metric_with_non_default_grain(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
query_parser: MetricFlowQueryParser,
create_source_tables: bool,
) -> None:
"""Test querying a cumulative metric using a granularity that is not the metric's default."""
query_spec = query_parser.parse_and_validate_query(
metric_names=("trailing_2_months_revenue",), group_by_names=("metric_time__year",)
).query_spec
dataflow_plan = dataflow_plan_builder.build_plan(query_spec)

assert_plan_snapshot_text_equal(
request=request,
mf_test_configuration=mf_test_configuration,
plan=dataflow_plan,
plan_snapshot_text=dataflow_plan.structure_text(),
)

display_graph_if_requested(
request=request,
mf_test_configuration=mf_test_configuration,
dag_graph=dataflow_plan,
)


def test_derived_cumulative_metric_with_non_default_grain(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
query_parser: MetricFlowQueryParser,
create_source_tables: bool,
) -> None:
"""Test querying a derived metric with a cumulative input metric using non-default granularity."""
query_spec = query_parser.parse_and_validate_query(
metric_names=("trailing_2_months_revenue_sub_10",), group_by_names=("metric_time__month",)
).query_spec
dataflow_plan = dataflow_plan_builder.build_plan(query_spec)

assert_plan_snapshot_text_equal(
request=request,
mf_test_configuration=mf_test_configuration,
plan=dataflow_plan,
plan_snapshot_text=dataflow_plan.structure_text(),
)

display_graph_if_requested(
request=request,
mf_test_configuration=mf_test_configuration,
dag_graph=dataflow_plan,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<DataflowPlan>
<WriteToResultDataTableNode>
<!-- description = 'Write to DataTable' -->
<!-- node_id = NodeId(id_str='wrd_0') -->
<WindowReaggregationNode>
<!-- description = 'Re-aggregate Metrics via Window Functions' -->
<!-- node_id = NodeId(id_str='wr_0') -->
<!-- metric_spec = MetricSpec(element_name='trailing_2_months_revenue') -->
<!-- order_by_spec = TimeDimensionSpec(element_name='metric_time', time_granularity=DAY) -->
<!-- partition_by_specs = (TimeDimensionSpec(element_name='metric_time', time_granularity=YEAR),) -->
<ComputeMetricsNode>
<!-- description = 'Compute Metrics via Expressions' -->
<!-- node_id = NodeId(id_str='cm_0') -->
<!-- metric_spec = MetricSpec(element_name='trailing_2_months_revenue') -->
<AggregateMeasuresNode>
<!-- description = 'Aggregate Measures' -->
<!-- node_id = NodeId(id_str='am_0') -->
<FilterElementsNode>
<!-- description = -->
<!-- "Pass Only Elements: ['txn_revenue', 'metric_time__year', 'metric_time__day']" -->
<!-- node_id = NodeId(id_str='pfe_0') -->
<!-- include_spec = MeasureSpec(element_name='txn_revenue') -->
<!-- include_spec = TimeDimensionSpec(element_name='metric_time', time_granularity=YEAR) -->
<!-- include_spec = TimeDimensionSpec(element_name='metric_time', time_granularity=DAY) -->
<!-- distinct = False -->
<JoinOverTimeRangeNode>
<!-- description = 'Join Self Over Time Range' -->
<!-- node_id = NodeId(id_str='jotr_0') -->
<!-- queried_agg_time_dimension_specs = -->
<!-- ( -->
<!-- TimeDimensionSpec(element_name='metric_time', time_granularity=YEAR), -->
<!-- TimeDimensionSpec(element_name='metric_time', time_granularity=DAY), -->
<!-- ) -->
<!-- window = PydanticMetricTimeWindow(count=2, granularity=MONTH) -->
<MetricTimeDimensionTransformNode>
<!-- description = "Metric Time Dimension 'ds'" -->
<!-- node_id = NodeId(id_str='sma_28007') -->
<!-- aggregation_time_dimension = 'ds' -->
<ReadSqlSourceNode>
<!-- description = "Read From SemanticModelDataSet('revenue')" -->
<!-- node_id = NodeId(id_str='rss_28020') -->
<!-- data_set = SemanticModelDataSet('revenue') -->
</ReadSqlSourceNode>
</MetricTimeDimensionTransformNode>
</JoinOverTimeRangeNode>
</FilterElementsNode>
</AggregateMeasuresNode>
</ComputeMetricsNode>
</WindowReaggregationNode>
</WriteToResultDataTableNode>
</DataflowPlan>
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<DataflowPlan>
<WriteToResultDataTableNode>
<!-- description = 'Write to DataTable' -->
<!-- node_id = NodeId(id_str='wrd_0') -->
<ComputeMetricsNode>
<!-- description = 'Compute Metrics via Expressions' -->
<!-- node_id = NodeId(id_str='cm_1') -->
<!-- metric_spec = MetricSpec(element_name='trailing_2_months_revenue_sub_10') -->
<WindowReaggregationNode>
<!-- description = 'Re-aggregate Metrics via Window Functions' -->
<!-- node_id = NodeId(id_str='wr_0') -->
<!-- metric_spec = MetricSpec(element_name='trailing_2_months_revenue', alias='t2mr') -->
<!-- order_by_spec = TimeDimensionSpec(element_name='metric_time', time_granularity=DAY) -->
<!-- partition_by_specs = (TimeDimensionSpec(element_name='metric_time', time_granularity=MONTH),) -->
<ComputeMetricsNode>
<!-- description = 'Compute Metrics via Expressions' -->
<!-- node_id = NodeId(id_str='cm_0') -->
<!-- metric_spec = MetricSpec(element_name='trailing_2_months_revenue', alias='t2mr') -->
<AggregateMeasuresNode>
<!-- description = 'Aggregate Measures' -->
<!-- node_id = NodeId(id_str='am_0') -->
<FilterElementsNode>
<!-- description = -->
<!-- "Pass Only Elements: ['txn_revenue', 'metric_time__month', 'metric_time__day']" -->
<!-- node_id = NodeId(id_str='pfe_0') -->
<!-- include_spec = MeasureSpec(element_name='txn_revenue') -->
<!-- include_spec = TimeDimensionSpec(element_name='metric_time', time_granularity=MONTH) -->
<!-- include_spec = TimeDimensionSpec(element_name='metric_time', time_granularity=DAY) -->
<!-- distinct = False -->
<JoinOverTimeRangeNode>
<!-- description = 'Join Self Over Time Range' -->
<!-- node_id = NodeId(id_str='jotr_0') -->
<!-- queried_agg_time_dimension_specs = -->
<!-- ( -->
<!-- TimeDimensionSpec(element_name='metric_time', time_granularity=MONTH), -->
<!-- TimeDimensionSpec(element_name='metric_time', time_granularity=DAY), -->
<!-- ) -->
<!-- window = PydanticMetricTimeWindow(count=2, granularity=MONTH) -->
<MetricTimeDimensionTransformNode>
<!-- description = "Metric Time Dimension 'ds'" -->
<!-- node_id = NodeId(id_str='sma_28007') -->
<!-- aggregation_time_dimension = 'ds' -->
<ReadSqlSourceNode>
<!-- description = "Read From SemanticModelDataSet('revenue')" -->
<!-- node_id = NodeId(id_str='rss_28020') -->
<!-- data_set = SemanticModelDataSet('revenue') -->
</ReadSqlSourceNode>
</MetricTimeDimensionTransformNode>
</JoinOverTimeRangeNode>
</FilterElementsNode>
</AggregateMeasuresNode>
</ComputeMetricsNode>
</WindowReaggregationNode>
</ComputeMetricsNode>
</WriteToResultDataTableNode>
</DataflowPlan>

0 comments on commit 9891bbf

Please sign in to comment.