diff --git a/metricflow/test/fixtures/model_fixtures.py b/metricflow/test/fixtures/model_fixtures.py index 2804cb727d..4cc48fc9fc 100644 --- a/metricflow/test/fixtures/model_fixtures.py +++ b/metricflow/test/fixtures/model_fixtures.py @@ -143,6 +143,8 @@ def template_mapping(mf_test_session_state: MetricFlowTestSessionState) -> Dict[ "source_schema": schema, "accounts_source_table": f"{schema}.fct_accounts", "primary_accounts_table": f"{schema}.dim_primary_accounts", + "buys_source_table": f"{schema}.fct_buys", + "visits_source_table": f"{schema}.fct_visits", } diff --git a/metricflow/test/fixtures/model_yamls/simple_model/data_sources/buys_source.yaml b/metricflow/test/fixtures/model_yamls/simple_model/data_sources/buys_source.yaml new file mode 100644 index 0000000000..5609232d9c --- /dev/null +++ b/metricflow/test/fixtures/model_yamls/simple_model/data_sources/buys_source.yaml @@ -0,0 +1,30 @@ +--- +data_source: + name: buys_source + description: buys_source + owners: + - support@transformdata.io + + sql_query: | + -- User Defined SQL Query + SELECT * FROM $buys_source_table + + measures: + - name: buys + expr: 1 + agg: count + - name: buyers + expr: user_id + agg: count_distinct + + dimensions: + - name: ds + type: time + type_params: + is_primary: True + time_granularity: day + + identifiers: + - name: user + type: foreign + expr: user_id diff --git a/metricflow/test/fixtures/model_yamls/simple_model/data_sources/visits_source.yaml b/metricflow/test/fixtures/model_yamls/simple_model/data_sources/visits_source.yaml new file mode 100644 index 0000000000..9a517b1785 --- /dev/null +++ b/metricflow/test/fixtures/model_yamls/simple_model/data_sources/visits_source.yaml @@ -0,0 +1,33 @@ +--- +data_source: + name: visits_source + description: visits_source + owners: + - support@transformdata.io + + sql_query: | + -- User Defined SQL Query + SELECT * FROM $visits_source_table + + measures: + - name: visits + expr: 1 + agg: count + - name: visitors + expr: user_id + agg: count_distinct + + + dimensions: + - name: ds + type: time + type_params: + is_primary: True + time_granularity: day + - name: referrer_id + type: categorical + + identifiers: + - name: user + type: foreign + expr: user_id \ No newline at end of file diff --git a/metricflow/test/fixtures/table_fixtures.py b/metricflow/test/fixtures/table_fixtures.py index 0d09bf3ed2..5226dfc172 100644 --- a/metricflow/test/fixtures/table_fixtures.py +++ b/metricflow/test/fixtures/table_fixtures.py @@ -318,6 +318,61 @@ def create_simple_model_tables(mf_test_session_state: MetricFlowTestSessionState ), ) + # Events data + visits_data = [ + ("u0004114", "2020-01-01", "fb_ad_1"), + ("u0004214", "2020-01-01", "fb_ad_2"), + ("u0003141", "2020-01-01", "homepage_1"), + ("u0003154", "2020-01-01", "homepage_2"), + ("u1612112", "2020-01-02", "homepage_1"), + ("u0042324", "2020-01-02", "homepage_1"), + ("u0005432", "2020-01-02", "fb_ad_3"), + ("u0003452", "2020-01-02", "fb_ad_1"), + ("u0003452", "2020-01-02", "user_2"), + ("u0042324", "2020-01-03", "fb_ad_1"), + ("u0005432", "2020-01-03", "google_ad_1"), + ("u0005472", "2020-01-03", "google_ad_2"), + ("u0005414", "2020-01-04", "google_ad_1"), + ("u0004114", "2020-01-06", "homepage_1"), + ("u0004114", "2020-01-07", "fb_ad_2"), + ("u0004117", "2020-01-10", "google_ad_1"), + ("u0003141", "2020-01-12", "user_1"), + ] + + create_table( + sql_client=sql_client, + sql_table=SqlTable(schema_name=schema, table_name="fct_visits"), + df=make_df( + sql_client=sql_client, + columns=["user_id", DEFAULT_DS, "referrer_id"], + time_columns={DEFAULT_DS}, + data=visits_data, + ), + ) + + buy_data = [ + ("u0004114", "2020-01-02"), + ("u0042324", "2020-01-03"), + ("u0042324", "2020-01-03"), + ("u0005432", "2020-01-04"), + ("u1612112", "2020-01-07"), + ("u0004114", "2020-01-07"), + ("u0004117", "2020-01-10"), + ("u0003141", "2020-01-07"), + ("u0003452", "2020-01-04"), + ] + + create_table( + sql_client=sql_client, + sql_table=SqlTable(schema_name=schema, table_name="fct_buys"), + df=make_df( + sql_client=sql_client, + columns=["user_id", DEFAULT_DS], + time_columns={DEFAULT_DS}, + data=buy_data, + ), + ) + return True diff --git a/metricflow/test/plan_conversion/test_dataflow_to_sql_plan.py b/metricflow/test/plan_conversion/test_dataflow_to_sql_plan.py index b7c687d515..8e2c4a1b0c 100644 --- a/metricflow/test/plan_conversion/test_dataflow_to_sql_plan.py +++ b/metricflow/test/plan_conversion/test_dataflow_to_sql_plan.py @@ -7,10 +7,12 @@ from metricflow.constraints.time_constraint import TimeRangeConstraint from metricflow.dataflow.builder.dataflow_plan_builder import DataflowPlanBuilder from metricflow.dataflow.dataflow_plan import ( + AppendRowNumberColumnNode, DataflowPlan, WriteToResultDataframeNode, FilterElementsNode, AggregateMeasuresNode, + JoinConversionEventsNode, JoinDescription, JoinToBaseOutputNode, ComputeMetricsNode, @@ -23,6 +25,7 @@ ) from metricflow.dataflow.dataflow_plan_to_text import dataflow_plan_as_text from metricflow.model.semantic_model import SemanticModel +from metricflow.model.objects.metric import CumulativeMetricWindow from metricflow.plan_conversion.column_resolver import DefaultColumnAssociationResolver from metricflow.plan_conversion.dataflow_to_sql import DataflowToSqlQueryPlanConverter from metricflow.plan_conversion.time_spine import TimeSpineSource @@ -1496,3 +1499,39 @@ def test_metric_with_measures_from_multiple_sources_no_dimensions( # noqa: D sql_client=sql_client, node=dataflow_plan.sink_output_nodes[0].parent_node, ) + + +def test_join_conversion_events_node( # noqa: D + request: FixtureRequest, + mf_test_session_state: MetricFlowTestSessionState, + dataflow_plan_builder: DataflowPlanBuilder[DataSourceDataSet], + dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter[DataSourceDataSet], + consistent_id_object_repository: ConsistentIdObjectRepository, + sql_client: SqlClient, +) -> None: + buys_source = AppendRowNumberColumnNode( + parent_node=consistent_id_object_repository.simple_model_read_nodes["buys_source"] + ) + visits_source = consistent_id_object_repository.simple_model_read_nodes["visits_source"] + + base_time_dimension = TimeDimensionSpec.from_name("ds") + conversion_time_dimension = TimeDimensionSpec.from_name("ds") + window = CumulativeMetricWindow(count=7, granularity=TimeGranularity.DAY) + entity = IdentifierSpec.from_name("user") + conversion_primary_keys = (IdentifierSpec.from_name("auto_gen_primary_key"),) + join_node = JoinConversionEventsNode[DataSourceDataSet]( + base_node=visits_source, + base_time_dimension_spec=base_time_dimension, + conversion_node=buys_source, + conversion_time_dimension_spec=conversion_time_dimension, + conversion_primary_key_specs=conversion_primary_keys, + entity_spec=entity, + window=window, + ) + convert_and_check( + request=request, + mf_test_session_state=mf_test_session_state, + dataflow_to_sql_converter=dataflow_to_sql_converter, + sql_client=sql_client, + node=join_node, + ) diff --git a/metricflow/test/snapshots/test_dataflow_to_sql_plan.py/SqlQueryPlan/DuckDbSqlClient/test_join_conversion_events_node__plan0.sql b/metricflow/test/snapshots/test_dataflow_to_sql_plan.py/SqlQueryPlan/DuckDbSqlClient/test_join_conversion_events_node__plan0.sql new file mode 100644 index 0000000000..dd6d0b6e88 --- /dev/null +++ b/metricflow/test/snapshots/test_dataflow_to_sql_plan.py/SqlQueryPlan/DuckDbSqlClient/test_join_conversion_events_node__plan0.sql @@ -0,0 +1,76 @@ +-- Find conversions for IdentifierSpec(element_name='user', identifier_links=()) within the range of 7 day +SELECT DISTINCT + subq_5.ds AS ds + , subq_5.ds__week AS ds__week + , subq_5.ds__month AS ds__month + , subq_5.ds__quarter AS ds__quarter + , subq_5.ds__year AS ds__year + , subq_5.user AS user + , subq_5.auto_gen_primary_key AS auto_gen_primary_key + , subq_5.buys AS buys + , subq_5.buyers AS buyers + , first_value(subq_3.visits) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS visits + , first_value(subq_3.visitors) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS visitors + , first_value(subq_3.referrer_id) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS referrer_id + , first_value(subq_3.ds) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS ds + , first_value(subq_3.ds__week) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS ds__week + , first_value(subq_3.ds__month) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS ds__month + , first_value(subq_3.ds__quarter) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS ds__quarter + , first_value(subq_3.ds__year) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS ds__year + , first_value(subq_3.user) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS user +FROM ( + -- Read Elements From Data Source 'visits_source' + SELECT + 1 AS visits + , visits_source_src_10011.user_id AS visitors + , visits_source_src_10011.ds + , DATE_TRUNC('week', visits_source_src_10011.ds) AS ds__week + , DATE_TRUNC('month', visits_source_src_10011.ds) AS ds__month + , DATE_TRUNC('quarter', visits_source_src_10011.ds) AS ds__quarter + , DATE_TRUNC('year', visits_source_src_10011.ds) AS ds__year + , visits_source_src_10011.referrer_id + , visits_source_src_10011.user_id AS user + FROM ( + -- User Defined SQL Query + SELECT * FROM ***************************.fct_visits + ) visits_source_src_10011 +) subq_3 +INNER JOIN ( + -- Append row number column + SELECT + subq_4.ds + , subq_4.ds__week + , subq_4.ds__month + , subq_4.ds__quarter + , subq_4.ds__year + , subq_4.user + , subq_4.buys + , subq_4.buyers + , row_number() OVER (ORDER BY subq_4.buys, subq_4.buyers, subq_4.ds, subq_4.ds__week, subq_4.ds__month, subq_4.ds__quarter, subq_4.ds__year, subq_4.user) AS auto_gen_primary_key + FROM ( + -- Read Elements From Data Source 'buys_source' + SELECT + 1 AS buys + , buys_source_src_10002.user_id AS buyers + , buys_source_src_10002.ds + , DATE_TRUNC('week', buys_source_src_10002.ds) AS ds__week + , DATE_TRUNC('month', buys_source_src_10002.ds) AS ds__month + , DATE_TRUNC('quarter', buys_source_src_10002.ds) AS ds__quarter + , DATE_TRUNC('year', buys_source_src_10002.ds) AS ds__year + , buys_source_src_10002.user_id AS user + FROM ( + -- User Defined SQL Query + SELECT * FROM ***************************.fct_buys + ) buys_source_src_10002 + ) subq_4 +) subq_5 +ON + ( + subq_3.user = subq_5.user + ) AND ( + ( + subq_3.ds <= subq_5.ds + ) AND ( + subq_3.ds > subq_5.ds - INTERVAL 7 day + ) + ) diff --git a/metricflow/test/snapshots/test_dataflow_to_sql_plan.py/SqlQueryPlan/DuckDbSqlClient/test_join_conversion_events_node__plan0_optimized.sql b/metricflow/test/snapshots/test_dataflow_to_sql_plan.py/SqlQueryPlan/DuckDbSqlClient/test_join_conversion_events_node__plan0_optimized.sql new file mode 100644 index 0000000000..d0cfd1781d --- /dev/null +++ b/metricflow/test/snapshots/test_dataflow_to_sql_plan.py/SqlQueryPlan/DuckDbSqlClient/test_join_conversion_events_node__plan0_optimized.sql @@ -0,0 +1,65 @@ +-- Find conversions for IdentifierSpec(element_name='user', identifier_links=()) within the range of 7 day +SELECT + subq_8.ds AS ds + , subq_8.ds__week AS ds__week + , subq_8.ds__month AS ds__month + , subq_8.ds__quarter AS ds__quarter + , subq_8.ds__year AS ds__year + , subq_8.user AS user + , subq_8.auto_gen_primary_key AS auto_gen_primary_key + , subq_8.buys AS buys + , subq_8.buyers AS buyers + , first_value(subq_6.visits) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS visits + , first_value(subq_6.visitors) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS visitors + , first_value(subq_6.referrer_id) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS referrer_id + , first_value(subq_6.ds) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS ds + , first_value(subq_6.ds__week) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS ds__week + , first_value(subq_6.ds__month) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS ds__month + , first_value(subq_6.ds__quarter) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS ds__quarter + , first_value(subq_6.ds__year) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS ds__year + , first_value(subq_6.user) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS user +FROM ( + -- Read Elements From Data Source 'visits_source' + SELECT + 1 AS visits + , user_id AS visitors + , ds + , DATE_TRUNC('week', ds) AS ds__week + , DATE_TRUNC('month', ds) AS ds__month + , DATE_TRUNC('quarter', ds) AS ds__quarter + , DATE_TRUNC('year', ds) AS ds__year + , referrer_id + , user_id AS user + FROM ( + -- User Defined SQL Query + SELECT * FROM ***************************.fct_visits + ) visits_source_src_10011 +) subq_6 +INNER JOIN ( + -- Read Elements From Data Source 'buys_source' + -- Append row number column + SELECT + ds + , DATE_TRUNC('week', ds) AS ds__week + , DATE_TRUNC('month', ds) AS ds__month + , DATE_TRUNC('quarter', ds) AS ds__quarter + , DATE_TRUNC('year', ds) AS ds__year + , user_id AS user + , 1 AS buys + , user_id AS buyers + , row_number() OVER (ORDER BY 1, user_id, ds, DATE_TRUNC('week', ds), DATE_TRUNC('month', ds), DATE_TRUNC('quarter', ds), DATE_TRUNC('year', ds)) AS auto_gen_primary_key + FROM ( + -- User Defined SQL Query + SELECT * FROM ***************************.fct_buys + ) buys_source_src_10002 +) subq_8 +ON + ( + subq_6.user = subq_8.user + ) AND ( + ( + subq_6.ds <= subq_8.ds + ) AND ( + subq_6.ds > subq_8.ds - INTERVAL 7 day + ) + ) diff --git a/metricflow/test/snapshots/test_dataflow_to_sql_plan.py/SqlQueryPlan/test_join_conversion_events_node__plan0.xml b/metricflow/test/snapshots/test_dataflow_to_sql_plan.py/SqlQueryPlan/test_join_conversion_events_node__plan0.xml new file mode 100644 index 0000000000..d5e6729085 --- /dev/null +++ b/metricflow/test/snapshots/test_dataflow_to_sql_plan.py/SqlQueryPlan/test_join_conversion_events_node__plan0.xml @@ -0,0 +1,221 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +