Skip to content

Commit

Permalink
Merge pull request #4 from mjirv/feat-add_snowflake_match_recognize
Browse files Browse the repository at this point in the history
Feat: funnel uses match_recognize on Snowflake and Trino
  • Loading branch information
mjirv authored Jul 18, 2022
2 parents f6558ac + 671d5ab commit 5ebfe27
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 3 deletions.
3 changes: 2 additions & 1 deletion integration_tests/seeds/raw_orders_simple.csv
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ id,customer_id,order_date,status
17,7,2018-01-18,completed
18,6,2018-01-20,completed
19,5,2018-01-22,returned
20,2,2018-01-23,completed
20,2,2018-01-23,completed
21,3,2018-01-24,returned
16 changes: 16 additions & 0 deletions macros/event_stream.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
start_date=none,
end_date=none)
%}
{{ return(adapter.dispatch('event_stream', 'dbt_product_analytics')(from, event_type_col, user_id_col, date_col, start_date, end_date)) }}
{% endmacro %}

{% macro default__event_stream(from, event_type_col, user_id_col, date_col, start_date, end_date) %}
select {{ event_type_col }} as event_type, {{ user_id_col }} as user_id, {{ date_col }} as event_date
from {{ from }}
where 1 = 1
Expand All @@ -15,4 +19,16 @@
{% if end_date is not none %}
and {{ date_col }} < '{{ end_date }}'
{% endif %}
{% endmacro %}

{% macro trino__event_stream(from, event_type_col, user_id_col, date_col, start_date, end_date) %}
select {{ event_type_col }} as event_type, {{ user_id_col }} as user_id, {{ date_col }} as event_date
from {{ from }}
where 1 = 1
{% if start_date is not none %}
and {{ date_col }} >= date '{{ start_date }}'
{% endif %}
{% if end_date is not none %}
and {{ date_col }} < date '{{ end_date }}'
{% endif %}
{% endmacro %}
54 changes: 52 additions & 2 deletions macros/funnel.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
{% macro funnel(steps=none, event_stream=none) %}
{{ return(adapter.dispatch('funnel','dbt_product_analytics')(steps, event_stream)) }}
{% endmacro %}

{% macro default__funnel(steps, event_stream) %}
with event_stream as ( {% if not (event_stream|string|trim).startswith('select ') %} select * from {% endif %} {{ event_stream }} )
{% for step in steps %}
, event_stream_step_{{ loop.index }} as (
Expand Down Expand Up @@ -32,14 +36,60 @@

, final as (
select event_type
, unique_users, 1.0 * unique_users / nullif(first_value(unique_users) over(), 0) as pct_conversion
, 1.0 * unique_users / nullif(lag(unique_users) over(), 0) as pct_of_previous
, unique_users, 1.0 * unique_users / nullif(first_value(unique_users) over(), 0) as pct_conversion
, 1.0 * unique_users / nullif(lag(unique_users) over(), 0) as pct_of_previous
from event_funnel
)

select * from final
{% endmacro %}

{% macro snowflake__funnel(steps, event_stream) %}
with event_stream as ( {% if not (event_stream|string|trim).startswith('select ') %} select * from {% endif %} {{ event_stream }} )

, steps as (
{% for step in steps %}
select
'{{ step.event_type }}' as event_type
, {{ loop.index }} as index
{% if not loop.last %}
union all
{% endif %}
{% endfor %}
)
, event_funnel as (
select event_type, count(distinct user_id) as unique_users
from event_stream
match_recognize(
partition by user_id
order by event_date
all rows per match
pattern({% for step in steps %} ({% for i in range(loop.length - loop.index + 1) %} step_{{ loop.index }}+{% endfor %}) {% if not loop.last %} | {% endif %} {% endfor %} )
define
{% for step in steps %}
step_{{ loop.index }} as event_type = '{{ step.event_type }}' {% if not loop.last %} , {% endif %}
{% endfor %}
)
group by event_type
)

, final as (
select event_funnel.event_type
, unique_users, cast(unique_users as double) / nullif(first_value(unique_users) over(order by steps.index), 0) as pct_conversion
, 1.0 * cast(unique_users as double) / nullif(lag(unique_users) over(order by steps.index), 0) as pct_of_previous
from event_funnel
left join steps
on event_funnel.event_type = steps.event_type
order by steps.index
)

select * from final
{% endmacro %}

{% macro trino__funnel(steps, event_stream) %}
{{ dbt_product_analytics.snowflake__funnel(steps, event_stream) }}
{% endmacro %}


{#
### EXAMPLE ###
Expand Down

0 comments on commit 5ebfe27

Please sign in to comment.