Skip to content

Commit

Permalink
added tests for JoinConversionEventsNode
Browse files Browse the repository at this point in the history
  • Loading branch information
WilliamDee committed Nov 21, 2022
1 parent 7e53e74 commit e0e4546
Show file tree
Hide file tree
Showing 8 changed files with 521 additions and 0 deletions.
2 changes: 2 additions & 0 deletions metricflow/test/fixtures/model_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ def template_mapping(mf_test_session_state: MetricFlowTestSessionState) -> Dict[
"source_schema": schema,
"accounts_source_table": f"{schema}.fct_accounts",
"primary_accounts_table": f"{schema}.dim_primary_accounts",
"buys_source_table": f"{schema}.fct_buys",
"visits_source_table": f"{schema}.fct_visits",
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
---
# Data source config for the "buys_source" test fixture.
# The $buys_source_table placeholder in the query below is a template variable;
# the test fixtures map "buys_source_table" to "<schema>.fct_buys".
data_source:
name: buys_source
description: buys_source
owners:
- [email protected]

sql_query: |
-- User Defined SQL Query
SELECT * FROM $buys_source_table
measures:
- name: buys
expr: 1
agg: count
- name: buyers
expr: user_id
agg: count_distinct

# "buys" counts rows (expr 1, agg count); "buyers" counts distinct user_id values.
# "ds" is the primary time dimension at day granularity.
dimensions:
- name: ds
type: time
type_params:
is_primary: True
time_granularity: day

# "user" is a foreign identifier backed by the user_id column.
identifiers:
- name: user
type: foreign
expr: user_id
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
---
# Data source config for the "visits_source" test fixture.
# The $visits_source_table placeholder in the query below is a template variable;
# the test fixtures map "visits_source_table" to "<schema>.fct_visits".
data_source:
name: visits_source
description: visits_source
owners:
- [email protected]

sql_query: |
-- User Defined SQL Query
SELECT * FROM $visits_source_table
measures:
- name: visits
expr: 1
agg: count
- name: visitors
expr: user_id
agg: count_distinct


# "visits" counts rows (expr 1, agg count); "visitors" counts distinct user_id values.
# "ds" is the primary time dimension at day granularity; "referrer_id" is categorical.
dimensions:
- name: ds
type: time
type_params:
is_primary: True
time_granularity: day
- name: referrer_id
type: categorical

# "user" is a foreign identifier backed by the user_id column.
identifiers:
- name: user
type: foreign
expr: user_id
55 changes: 55 additions & 0 deletions metricflow/test/fixtures/table_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,61 @@ def create_simple_model_tables(mf_test_session_state: MetricFlowTestSessionState
),
)

# Events data
visits_data = [
("u0004114", "2020-01-01", "fb_ad_1"),
("u0004214", "2020-01-01", "fb_ad_2"),
("u0003141", "2020-01-01", "homepage_1"),
("u0003154", "2020-01-01", "homepage_2"),
("u1612112", "2020-01-02", "homepage_1"),
("u0042324", "2020-01-02", "homepage_1"),
("u0005432", "2020-01-02", "fb_ad_3"),
("u0003452", "2020-01-02", "fb_ad_1"),
("u0003452", "2020-01-02", "user_2"),
("u0042324", "2020-01-03", "fb_ad_1"),
("u0005432", "2020-01-03", "google_ad_1"),
("u0005472", "2020-01-03", "google_ad_2"),
("u0005414", "2020-01-04", "google_ad_1"),
("u0004114", "2020-01-06", "homepage_1"),
("u0004114", "2020-01-07", "fb_ad_2"),
("u0004117", "2020-01-10", "google_ad_1"),
("u0003141", "2020-01-12", "user_1"),
]

create_table(
sql_client=sql_client,
sql_table=SqlTable(schema_name=schema, table_name="fct_visits"),
df=make_df(
sql_client=sql_client,
columns=["user_id", DEFAULT_DS, "referrer_id"],
time_columns={DEFAULT_DS},
data=visits_data,
),
)

buy_data = [
("u0004114", "2020-01-02"),
("u0042324", "2020-01-03"),
("u0042324", "2020-01-03"),
("u0005432", "2020-01-04"),
("u1612112", "2020-01-07"),
("u0004114", "2020-01-07"),
("u0004117", "2020-01-10"),
("u0003141", "2020-01-07"),
("u0003452", "2020-01-04"),
]

create_table(
sql_client=sql_client,
sql_table=SqlTable(schema_name=schema, table_name="fct_buys"),
df=make_df(
sql_client=sql_client,
columns=["user_id", DEFAULT_DS],
time_columns={DEFAULT_DS},
data=buy_data,
),
)

return True


Expand Down
39 changes: 39 additions & 0 deletions metricflow/test/plan_conversion/test_dataflow_to_sql_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
from metricflow.constraints.time_constraint import TimeRangeConstraint
from metricflow.dataflow.builder.dataflow_plan_builder import DataflowPlanBuilder
from metricflow.dataflow.dataflow_plan import (
AppendRowNumberColumnNode,
DataflowPlan,
WriteToResultDataframeNode,
FilterElementsNode,
AggregateMeasuresNode,
JoinConversionEventsNode,
JoinDescription,
JoinToBaseOutputNode,
ComputeMetricsNode,
Expand All @@ -23,6 +25,7 @@
)
from metricflow.dataflow.dataflow_plan_to_text import dataflow_plan_as_text
from metricflow.model.semantic_model import SemanticModel
from metricflow.model.objects.metric import CumulativeMetricWindow
from metricflow.plan_conversion.column_resolver import DefaultColumnAssociationResolver
from metricflow.plan_conversion.dataflow_to_sql import DataflowToSqlQueryPlanConverter
from metricflow.plan_conversion.time_spine import TimeSpineSource
Expand Down Expand Up @@ -1496,3 +1499,39 @@ def test_metric_with_measures_from_multiple_sources_no_dimensions( # noqa: D
sql_client=sql_client,
node=dataflow_plan.sink_output_nodes[0].parent_node,
)


def test_join_conversion_events_node(  # noqa: D
    request: FixtureRequest,
    mf_test_session_state: MetricFlowTestSessionState,
    dataflow_plan_builder: DataflowPlanBuilder[DataSourceDataSet],
    dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter[DataSourceDataSet],
    consistent_id_object_repository: ConsistentIdObjectRepository,
    sql_client: SqlClient,
) -> None:
    """Build a JoinConversionEventsNode over the visits/buys read nodes and check its SQL rendering.

    The "visits_source" read node is the base side and the "buys_source" read node, wrapped in an
    AppendRowNumberColumnNode, is the conversion side. Both sides use the "ds" time dimension, are
    matched on the "user" identifier, and use a window of 7 days. The resulting node is handed to
    convert_and_check.
    """
    # Conversion side: the buys read node wrapped in AppendRowNumberColumnNode. The key spec passed
    # below names "auto_gen_primary_key" (presumably the column this wrapper adds -- confirm).
    conversion_events = AppendRowNumberColumnNode(
        parent_node=consistent_id_object_repository.simple_model_read_nodes["buys_source"]
    )
    base_events = consistent_id_object_repository.simple_model_read_nodes["visits_source"]

    node_under_test = JoinConversionEventsNode[DataSourceDataSet](
        base_node=base_events,
        base_time_dimension_spec=TimeDimensionSpec.from_name("ds"),
        conversion_node=conversion_events,
        conversion_time_dimension_spec=TimeDimensionSpec.from_name("ds"),
        conversion_primary_key_specs=(IdentifierSpec.from_name("auto_gen_primary_key"),),
        entity_spec=IdentifierSpec.from_name("user"),
        window=CumulativeMetricWindow(count=7, granularity=TimeGranularity.DAY),
    )

    convert_and_check(
        request=request,
        mf_test_session_state=mf_test_session_state,
        dataflow_to_sql_converter=dataflow_to_sql_converter,
        sql_client=sql_client,
        node=node_under_test,
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
-- Find conversions for IdentifierSpec(element_name='user', identifier_links=()) within the range of 7 day
SELECT DISTINCT
subq_5.ds AS ds
, subq_5.ds__week AS ds__week
, subq_5.ds__month AS ds__month
, subq_5.ds__quarter AS ds__quarter
, subq_5.ds__year AS ds__year
, subq_5.user AS user
, subq_5.auto_gen_primary_key AS auto_gen_primary_key
, subq_5.buys AS buys
, subq_5.buyers AS buyers
, first_value(subq_3.visits) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS visits
, first_value(subq_3.visitors) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS visitors
, first_value(subq_3.referrer_id) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS referrer_id
, first_value(subq_3.ds) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS ds
, first_value(subq_3.ds__week) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS ds__week
, first_value(subq_3.ds__month) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS ds__month
, first_value(subq_3.ds__quarter) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS ds__quarter
, first_value(subq_3.ds__year) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS ds__year
, first_value(subq_3.user) OVER (PARTITION BY subq_5.user, subq_5.ds ORDER BY subq_3.ds DESC) AS user
FROM (
-- Read Elements From Data Source 'visits_source'
SELECT
1 AS visits
, visits_source_src_10011.user_id AS visitors
, visits_source_src_10011.ds
, DATE_TRUNC('week', visits_source_src_10011.ds) AS ds__week
, DATE_TRUNC('month', visits_source_src_10011.ds) AS ds__month
, DATE_TRUNC('quarter', visits_source_src_10011.ds) AS ds__quarter
, DATE_TRUNC('year', visits_source_src_10011.ds) AS ds__year
, visits_source_src_10011.referrer_id
, visits_source_src_10011.user_id AS user
FROM (
-- User Defined SQL Query
SELECT * FROM ***************************.fct_visits
) visits_source_src_10011
) subq_3
INNER JOIN (
-- Append row number column
SELECT
subq_4.ds
, subq_4.ds__week
, subq_4.ds__month
, subq_4.ds__quarter
, subq_4.ds__year
, subq_4.user
, subq_4.buys
, subq_4.buyers
, row_number() OVER (ORDER BY subq_4.buys, subq_4.buyers, subq_4.ds, subq_4.ds__week, subq_4.ds__month, subq_4.ds__quarter, subq_4.ds__year, subq_4.user) AS auto_gen_primary_key
FROM (
-- Read Elements From Data Source 'buys_source'
SELECT
1 AS buys
, buys_source_src_10002.user_id AS buyers
, buys_source_src_10002.ds
, DATE_TRUNC('week', buys_source_src_10002.ds) AS ds__week
, DATE_TRUNC('month', buys_source_src_10002.ds) AS ds__month
, DATE_TRUNC('quarter', buys_source_src_10002.ds) AS ds__quarter
, DATE_TRUNC('year', buys_source_src_10002.ds) AS ds__year
, buys_source_src_10002.user_id AS user
FROM (
-- User Defined SQL Query
SELECT * FROM ***************************.fct_buys
) buys_source_src_10002
) subq_4
) subq_5
ON
(
subq_3.user = subq_5.user
) AND (
(
subq_3.ds <= subq_5.ds
) AND (
subq_3.ds > subq_5.ds - INTERVAL 7 day
)
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
-- Find conversions for IdentifierSpec(element_name='user', identifier_links=()) within the range of 7 day
SELECT
subq_8.ds AS ds
, subq_8.ds__week AS ds__week
, subq_8.ds__month AS ds__month
, subq_8.ds__quarter AS ds__quarter
, subq_8.ds__year AS ds__year
, subq_8.user AS user
, subq_8.auto_gen_primary_key AS auto_gen_primary_key
, subq_8.buys AS buys
, subq_8.buyers AS buyers
, first_value(subq_6.visits) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS visits
, first_value(subq_6.visitors) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS visitors
, first_value(subq_6.referrer_id) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS referrer_id
, first_value(subq_6.ds) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS ds
, first_value(subq_6.ds__week) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS ds__week
, first_value(subq_6.ds__month) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS ds__month
, first_value(subq_6.ds__quarter) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS ds__quarter
, first_value(subq_6.ds__year) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS ds__year
, first_value(subq_6.user) OVER (PARTITION BY subq_8.user, subq_8.ds ORDER BY subq_6.ds DESC) AS user
FROM (
-- Read Elements From Data Source 'visits_source'
SELECT
1 AS visits
, user_id AS visitors
, ds
, DATE_TRUNC('week', ds) AS ds__week
, DATE_TRUNC('month', ds) AS ds__month
, DATE_TRUNC('quarter', ds) AS ds__quarter
, DATE_TRUNC('year', ds) AS ds__year
, referrer_id
, user_id AS user
FROM (
-- User Defined SQL Query
SELECT * FROM ***************************.fct_visits
) visits_source_src_10011
) subq_6
INNER JOIN (
-- Read Elements From Data Source 'buys_source'
-- Append row number column
SELECT
ds
, DATE_TRUNC('week', ds) AS ds__week
, DATE_TRUNC('month', ds) AS ds__month
, DATE_TRUNC('quarter', ds) AS ds__quarter
, DATE_TRUNC('year', ds) AS ds__year
, user_id AS user
, 1 AS buys
, user_id AS buyers
, row_number() OVER (ORDER BY 1, user_id, ds, DATE_TRUNC('week', ds), DATE_TRUNC('month', ds), DATE_TRUNC('quarter', ds), DATE_TRUNC('year', ds)) AS auto_gen_primary_key
FROM (
-- User Defined SQL Query
SELECT * FROM ***************************.fct_buys
) buys_source_src_10002
) subq_8
ON
(
subq_6.user = subq_8.user
) AND (
(
subq_6.ds <= subq_8.ds
) AND (
subq_6.ds > subq_8.ds - INTERVAL 7 day
)
)
Loading

0 comments on commit e0e4546

Please sign in to comment.