-
Notifications
You must be signed in to change notification settings - Fork 5
/
dataset.sql
129 lines (99 loc) · 4.46 KB
/
dataset.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
{% macro dataset(activity_stream, primary_activity, appended_activities=[]) %}
    {#- Public entry point: look up the adapter-specific `dataset` implementation
        in the `dbt_activity_schema` namespace and return whatever it produces
        for the given arguments. -#}
    {% set dataset_impl = adapter.dispatch("dataset", "dbt_activity_schema") %}
    {{ return(dataset_impl(activity_stream, primary_activity, appended_activities)) }}
{% endmacro %}
{% macro default__dataset(
    activity_stream,
    primary_activity,
    appended_activities
) %}

{# Create a derived dataset using self-joins from an Activity Stream model.

The rendered query has three stages:
    1. `filter_activity_stream_using_primary_activity` keeps only the rows of
       the primary activity that satisfy its relationship's where clause.
    2. One CTE per appended activity left joins the stream back onto the
       primary rows and aggregates the appended activity's included columns,
       grouped by every primary column.
    3. `rejoin_aggregated_activities` left joins each of those CTEs back to the
       primary rows on the activity id column and selects the final columns.

params:

    activity_stream: ref() | str
        The dbt `ref()` or a CTE name that contains the required columns.

    primary_activity: activity (class)
        The primary activity of the derived dataset.

    appended_activities: List[ activity (class) ]
        The list of appended activities to self-join to the primary activity.
        May be empty, and any appended activity may have no included columns;
        the select lists are rendered without a trailing comma in both cases.

#}

{% set columns = dbt_activity_schema.columns() %}
{% set primary = dbt_activity_schema.primary %}
{% set appended = dbt_activity_schema.appended %}
{% set alias_cte = dbt_activity_schema.alias_cte %}
{% set alias_appended_activity = dbt_activity_schema.alias_appended_activity %}

{# Every column carried through from the primary activity; reused by the
filter CTE, by each appended CTE's select list and by its group by. #}
{% set primary_columns = primary_activity.included_columns + primary_activity.required_columns %}

{# Number of appended columns still to be rendered in the final select. It
decides whether a comma may follow a column, so that the select list never
ends with a dangling comma. A namespace is used because a plain `set` inside a
for loop does not survive the loop's scope. #}
{% set pending = namespace(appended_columns=0) %}
{% for activity in appended_activities %}
    {% set pending.appended_columns = pending.appended_columns + (activity.included_columns | length) %}
{% endfor %}

with

filter_activity_stream_using_primary_activity as (
    select
        {% for col in primary_columns %}
        {{ dbt_activity_schema.parse_column(primary(), col) }} as {{ col }}{%- if not loop.last -%},{%- endif %}
        {% endfor %}

    from {{ activity_stream }} as {{ primary() }}

    where {{ primary() }}.{{ columns.activity }} = {{ dbt.string_literal(primary_activity.name) }}
        and {{ primary_activity.relationship.where_clause }}
),

{% for activity in appended_activities %}{% set i = loop.index %}

{{ alias_cte(activity, i) }} as (
    select

        -- Primary Activity Columns
        {% for col in primary_columns %}
        {{ primary() }}.{{- col }}{% if not loop.last or activity.included_columns %},{% endif %}
        {% endfor %}

        {% for col in activity.included_columns %}
            {%- set parsed_col = dbt_activity_schema.parse_column(appended(), col) -%}
            {% call activity.relationship.aggregation_func() %}
            {{ parsed_col }}
            {% endcall %} as {{ alias_appended_activity(activity, col) }}
            {% if not loop.last %},{% endif %}
        {% endfor %}

    from filter_activity_stream_using_primary_activity as {{ primary() }}

    left join {{ activity_stream }} as {{ appended() }}
        on (
            -- Join on Customer UUID Column
            {{ appended() }}.{{ columns.customer }} = {{ primary() }}.{{ columns.customer }}

            -- Join the Correct Activity
            and {{ appended() }}.{{- columns.activity }} = {{ dbt.string_literal(activity.name) }}

            -- Relationship Specific Join Conditions
            and (
                {# The nth_ever join_clause is called with the relationship's
                instantiated nth_occurance argument; every other relationship's
                join_clause is called without arguments. #}
                {% if activity.relationship.name == "nth_ever" %}
                {{ activity.relationship.join_clause(activity.relationship.nth_occurance) }}
                {% else %}
                {{ activity.relationship.join_clause() }}
                {% endif %}
            )
            -- Additional Join Condition
            and ( {{ activity.additional_join_condition }} )
        )

    group by
        {% for col in primary_columns %}
        {{ primary() }}.{{ col }}{%- if not loop.last -%},{%- endif %}
        {% endfor %}
),

{% endfor %}

rejoin_aggregated_activities as (
    select

        {% for col in primary_activity.included_columns %}
        {{ primary() }}.{{ col }}{% if not loop.last or pending.appended_columns > 0 %},{% endif %}
        {% endfor %}

        {% for activity in appended_activities %}{% set i = loop.index %}
            {% for col in activity.included_columns %}
                {% set pending.appended_columns = pending.appended_columns - 1 %}
        {{ alias_cte(activity, i) }}.{{ alias_appended_activity(activity, col) }}{% if pending.appended_columns > 0 %},{% endif %}
            {% endfor %}
        {% endfor %}

    from filter_activity_stream_using_primary_activity as {{ primary() }}

    {% for activity in appended_activities %}{% set i = loop.index %}

    left join {{ alias_cte(activity, i) }}
        on {{ alias_cte(activity, i) }}.{{ columns.activity_id }} = {{ primary() }}.{{ columns.activity_id }}

    {% endfor %}
)

select * from rejoin_aggregated_activities

{% endmacro %}