From 5a0201e2db8b72387193c5c60e0d222a733af6cb Mon Sep 17 00:00:00 2001 From: Michael Irvine Date: Wed, 13 Jul 2022 22:03:37 -0400 Subject: [PATCH 1/4] adds dispatch for multi-adapter support --- macros/funnel.sql | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/macros/funnel.sql b/macros/funnel.sql index 295619b..a40d518 100644 --- a/macros/funnel.sql +++ b/macros/funnel.sql @@ -1,4 +1,8 @@ {% macro funnel(steps=none, event_stream=none) %} + {{ return(adapter.dispatch('funnel','dbt_product_analytics')(steps, event_stream)) }} +{% endmacro %} + +{% macro default__funnel(steps, event_stream) %} with event_stream as ( {% if not (event_stream|string|trim).startswith('select ') %} select * from {% endif %} {{ event_stream }} ) {% for step in steps %} , event_stream_step_{{ loop.index }} as ( From c544a64a750484eeef32fecbc784361bc95fa51a Mon Sep 17 00:00:00 2001 From: Michael Irvine Date: Wed, 13 Jul 2022 22:39:23 -0400 Subject: [PATCH 2/4] adds untested match_recognize implementation for snowflake --- macros/funnel.sql | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/macros/funnel.sql b/macros/funnel.sql index a40d518..c932de4 100644 --- a/macros/funnel.sql +++ b/macros/funnel.sql @@ -36,8 +36,37 @@ , final as ( select event_type - , unique_users, 1.0 * unique_users / nullif(first_value(unique_users) over(), 0) as pct_conversion - , 1.0 * unique_users / nullif(lag(unique_users) over(), 0) as pct_of_previous + , unique_users, 1.0 * unique_users / nullif(first_value(unique_users) over(), 0) as pct_conversion + , 1.0 * unique_users / nullif(lag(unique_users) over(), 0) as pct_of_previous + from event_funnel + ) + + select * from final +{% endmacro %} + +{% macro snowflake__funnel(steps, event_stream) %} + with event_stream as ( {% if not (event_stream|string|trim).startswith('select ') %} select * from {% endif %} {{ event_stream }} ) + + , event_funnel as ( + select event_type, count(*) unique_users + from event_stream + match_recognize( + partition by user_id + order by event_date + one row per match + pattern({% for step in steps %} step_{{ loop.index }} {% endfor %} ) + define + {% for step in steps %} + step_{{ loop.index }} as event_type = '{{ step.event_type }}' {% if not loop.last %} , {% endif %} + {% endfor %} + ) + group by event_type + ) + + , final as ( + select event_type + , unique_users, 1.0 * unique_users / nullif(first_value(unique_users) over(), 0) as pct_conversion + , 1.0 * unique_users / nullif(lag(unique_users) over(), 0) as pct_of_previous from event_funnel ) From 6116b18d62dd8207090d83ad2f342f7ab9c84410 Mon Sep 17 00:00:00 2001 From: Michael Irvine Date: Sat, 16 Jul 2022 21:21:59 -0400 Subject: [PATCH 3/4] adds trino and fixes some errors --- macros/event_stream.sql | 16 ++++++++++++++++ macros/funnel.sql | 7 ++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/macros/event_stream.sql b/macros/event_stream.sql index cbd3444..8c71774 100644 --- a/macros/event_stream.sql +++ b/macros/event_stream.sql @@ -6,6 +6,10 @@ start_date=none, end_date=none) %} + {{ return(adapter.dispatch('event_stream', 'dbt_product_analytics')(from, event_type_col, user_id_col, date_col, start_date, end_date)) }} +{% endmacro %} + +{% macro default__event_stream(from, event_type_col, user_id_col, date_col, start_date, end_date) %} select {{ event_type_col }} as event_type, {{ user_id_col }} as user_id, {{ date_col }} as event_date from {{ from }} where 1 = 1 @@ -15,4 +19,16 @@ {% if end_date is not none %} and {{ date_col }} < '{{ end_date }}' {% endif %} +{% endmacro %} + +{% macro trino__event_stream(from, event_type_col, user_id_col, date_col, start_date, end_date) %} + select {{ event_type_col }} as event_type, {{ user_id_col }} as user_id, {{ date_col }} as event_date + from {{ from }} + where 1 = 1 + {% if start_date is not none %} + and {{ date_col }} >= date '{{ start_date }}' + {% endif %} + {% if end_date is not none %} + and {{ date_col }} < date '{{ end_date }}' + {% endif %} {% endmacro %} \ No newline at end of file diff --git a/macros/funnel.sql b/macros/funnel.sql index c932de4..b40b1c3 100644 --- a/macros/funnel.sql +++ b/macros/funnel.sql @@ -53,6 +53,7 @@ match_recognize( partition by user_id order by event_date + measures event_type as event_type one row per match pattern({% for step in steps %} step_{{ loop.index }} {% endfor %} ) define @@ -66,13 +67,17 @@ , final as ( select event_type , unique_users, 1.0 * unique_users / nullif(first_value(unique_users) over(), 0) as pct_conversion - , 1.0 * unique_users / nullif(lag(unique_users) over(), 0) as pct_of_previous + , 1.0 * unique_users / nullif(lag(unique_users) over(order by unique_users), 0) as pct_of_previous from event_funnel ) select * from final {% endmacro %} +{% macro trino__funnel(steps, event_stream) %} + {{ dbt_product_analytics.snowflake__funnel(steps, event_stream) }} +{% endmacro %} + {# ### EXAMPLE ### From 671d5abee8c50842fd45e6a840e78c1eff817ca7 Mon Sep 17 00:00:00 2001 From: Michael Irvine Date: Sun, 17 Jul 2022 20:40:03 -0400 Subject: [PATCH 4/4] trino/snowflake works --- integration_tests/seeds/raw_orders_simple.csv | 3 ++- macros/funnel.sql | 26 ++++++++++++++----- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/integration_tests/seeds/raw_orders_simple.csv b/integration_tests/seeds/raw_orders_simple.csv index ab25ba7..30d2b1f 100644 --- a/integration_tests/seeds/raw_orders_simple.csv +++ b/integration_tests/seeds/raw_orders_simple.csv @@ -18,4 +18,5 @@ id,customer_id,order_date,status 17,7,2018-01-18,completed 18,6,2018-01-20,completed 19,5,2018-01-22,returned -20,2,2018-01-23,completed \ No newline at end of file +20,2,2018-01-23,completed +21,3,2018-01-24,returned \ No newline at end of file diff --git a/macros/funnel.sql b/macros/funnel.sql index b40b1c3..85ee30d 100644 --- a/macros/funnel.sql +++ b/macros/funnel.sql @@ -47,15 +47,24 @@ {% macro snowflake__funnel(steps, event_stream) %} with event_stream as ( {% if not (event_stream|string|trim).startswith('select ') %} select * from {% endif %} {{ event_stream }} ) + , steps as ( + {% for step in steps %} + select + '{{ step.event_type }}' as event_type + , {{ loop.index }} as index + {% if not loop.last %} + union all + {% endif %} + {% endfor %} + ) , event_funnel as ( - select event_type, count(*) unique_users + select event_type, count(distinct user_id) as unique_users from event_stream match_recognize( partition by user_id order by event_date - measures event_type as event_type - one row per match - pattern({% for step in steps %} step_{{ loop.index }} {% endfor %} ) + all rows per match + pattern({% for step in steps %} ({% for i in range(loop.length - loop.index + 1) %} step_{{ loop.index }}+{% endfor %}) {% if not loop.last %} | {% endif %} {% endfor %} ) define {% for step in steps %} step_{{ loop.index }} as event_type = '{{ step.event_type }}' {% if not loop.last %} , {% endif %} @@ -65,10 +74,13 @@ ) , final as ( - select event_type - , unique_users, 1.0 * unique_users / nullif(first_value(unique_users) over(), 0) as pct_conversion - , 1.0 * unique_users / nullif(lag(unique_users) over(order by unique_users), 0) as pct_of_previous + select event_funnel.event_type + , unique_users, cast(unique_users as double) / nullif(first_value(unique_users) over(order by steps.index), 0) as pct_conversion + , 1.0 * cast(unique_users as double) / nullif(lag(unique_users) over(order by steps.index), 0) as pct_of_previous from event_funnel + left join steps + on event_funnel.event_type = steps.event_type + order by steps.index ) select * from final