Skip to content

Commit

Permalink
Merge pull request #5 from mjirv/feat-add_flows
Browse files Browse the repository at this point in the history
Feat: add flows
  • Loading branch information
mjirv authored Jul 20, 2022
2 parents 5514ba7 + 71e5879 commit 47693a0
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 4 deletions.
44 changes: 40 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Add the following to your `packages.yml`:
## Usage
**dbt Product Analytics** provides three macros: `event_stream()`, `funnel()`, and `retention()`.
**dbt Product Analytics** provides four macros: `event_stream()`, `funnel()`, `retention()`, and `flows()`.

Use them in models and analyses like any other dbt macro.

Expand Down Expand Up @@ -104,6 +104,42 @@ Advanced:

Three other parameters are available: `periods`, `period_type`, and `dimensions`.

- `period`: The period windows you want look at (defaults to `[1, 7, 14, 30, 60, 120])`
- `period_type`: The date type you want to use (defaults to `day`)
- `dimensions`: A list of columns from your event stream that you want to group by (defaults to `[]`)
- **`period`**: The period windows you want look at (defaults to `[1, 7, 14, 30, 60, 120])`
- **`period_type`**: The date type you want to use (defaults to `day`)
- **`dimensions`**: A list of columns from your event stream that you want to group by (defaults to `[]`)

### flows() ([source](https://github.com/mjirv/dbt_product_analytics/blob/main/macros/flows.sql))

_Runs a flow analysis, i.e. shows you common paths users take before or after a given event_

#### Usage

Example:

```sql
{{
dbt_product_analytics.flows(
event_stream=events,
primary_event='placed'
)
}}
```

Output:

```sql
michael=# select * from dbt_product_analytics.flows_orders ;
event_0 | event_1 | event_2 | event_3 | event_4 | event_5 | n_events
---------+-----------+----------+---------+---------+---------+----------
placed | | | | | | 13
placed | completed | returned | | | | 1
placed | completed | | | | | 1
```

Advanced:

Three other parameters are available: `n_events_from`, `before_or_after`, and `top_n`.

- **`n_events_from`**: The number of events to include in the flows (defaults to `5`)
- **`before_or_after`**: Whether to look at the events following your `primary_action` or the ones leading up to it (defaults to `'after'`)
- **`top_n`**: How many flows to include (defaults to displaying the top `20`)
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
select
'placed' as event_0
, 'completed' as event_1
, cast(null as varchar) as event_2
, cast(null as varchar) as event_3
, cast(null as varchar) as event_4
, cast(null as varchar) as event_5
, 3 as n_events

union all

select
'placed' as event_0
, 'completed' as event_1
, 'returned' as event_2
, 'returned' as event_3
, cast(null as varchar) as event_4
, cast(null as varchar) as event_5
, 1 as n_events

union all

select
'placed' as event_0
, 'completed' as event_1
, 'returned' as event_2
, cast(null as varchar) as event_3
, cast(null as varchar) as event_4
, cast(null as varchar) as event_5
, 1 as n_events

union all

select
'placed' as event_0
, 'returned' as event_1
, cast(null as varchar) as event_2
, cast(null as varchar) as event_3
, cast(null as varchar) as event_4
, cast(null as varchar) as event_5
, 1 as n_events
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
select
cast(null as varchar) as event_5
, cast(null as varchar) as event_4
, cast(null as varchar) as event_3
, cast(null as varchar) as event_2
, cast(null as varchar) as event_1
, 'placed' as event_0
, 5 as n_events

union all

select
cast(null as varchar) as event_5
, cast(null as varchar) as event_4
, cast(null as varchar) as event_3
, 'completed' as event_2
, 'returned' as event_1
, 'placed' as event_0
, 1 as n_events
16 changes: 16 additions & 0 deletions integration_tests/models/product_analytics/flows_orders.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{% set events =
dbt_product_analytics.event_stream(
from=ref('orders'),
event_type_col="status",
user_id_col="customer_id",
date_col="order_date",
start_date="2018-01-01",
end_date="2019-01-01")
%}

{{
dbt_product_analytics.flows(
event_stream=events,
primary_event='placed'
)
}}
17 changes: 17 additions & 0 deletions integration_tests/models/product_analytics/flows_orders_before.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{% set events =
dbt_product_analytics.event_stream(
from=ref('orders'),
event_type_col="status",
user_id_col="customer_id",
date_col="order_date",
start_date="2018-01-01",
end_date="2019-01-01")
%}

{{
dbt_product_analytics.flows(
event_stream=events,
primary_event='placed',
before_or_after='before'
)
}}
12 changes: 12 additions & 0 deletions integration_tests/models/product_analytics/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,15 @@ models:
input_mapping:
ref('order_events'): ref('order_events')
expected_output: ref('dmt_expected__retention')
- name: flows_orders
tests:
- dbt_datamocktool.unit_test:
input_mapping:
ref('orders'): ref('raw_orders_simple')
expected_output: ref('dmt_expected__flows_simple')
- name: flows_orders_before
tests:
- dbt_datamocktool.unit_test:
input_mapping:
ref('orders'): ref('raw_orders_simple')
expected_output: ref('dmt_expected__flows_simple_before')
41 changes: 41 additions & 0 deletions macros/flows.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{% macro flows(event_stream=None, primary_event=None, n_events_from=5, before_or_after='after', top_n=20) %}

{% if event_stream is none %}
{{ exceptions.raise_compiler_error('parameter \'event_stream\' must be provided')}}
{% endif %}

{% if primary_event is none %}
{{ exceptions.raise_compiler_error('parameter \'primary_event\' must be provided')}}
{% endif %}

with event_stream as {{ dbt_product_analytics._select_event_stream(event_stream) }}

, flow_events as (
select
{% if before_or_after == 'after' %} event_type as event_0 {% endif %}
{% for i in range(n_events_from) %}
{% if before_or_after == 'before' %} {% set index = n_events_from - i %} {% else %} {% set index = i + 1 %} {% endif %}
{% if before_or_after == 'before' %}{% if not loop.first %},{% endif %}lag{% else %}, lead{% endif %}(event_type, {{ index }}) over(partition by user_id order by event_date) as event_{{ index }}
{% endfor %}
{% if before_or_after == 'before' %}, event_type as event_0 {% endif %}
from event_stream
)

, flow_counts as (
select
*
, count(*) as n_events
from flow_events
where event_0 = '{{ primary_event }}'
group by 1 {% for i in range(n_events_from) %}, {{ i + 2 }} {% endfor %}
)

, final as (
select *
from flow_counts
order by n_events desc
limit {{ top_n }}
)

select * from final
{% endmacro %}

0 comments on commit 47693a0

Please sign in to comment.