From b7501ad8097d922f5f43035c9b8049f2e8e4dcef Mon Sep 17 00:00:00 2001 From: Teghan Nightengale Date: Thu, 16 Mar 2023 10:52:50 -0400 Subject: [PATCH] 0.2.0 (#18) ## Features - major overhaul of `dataset.sql` so that each appended activity is joined and aggregated in an independent CTE, which are then combined in a final CTE - clarified project variables usage - large reorganization of the folders and yml --- README.md | 83 +++++++++--------- dbt_project.yml | 22 ++++- integration_tests/dbt_project.yml | 4 +- .../first_after/dataset__first_after_3.sql | 15 +++- .../models/first_after/first_after.yml | 18 ++++ ...before.sql => dataset__first_before_1.sql} | 0 .../models/first_before/first_before.yml | 8 ++ ...rst_ever.sql => dataset__first_ever_1.sql} | 0 .../models/first_ever/first_ever.yml | 8 ++ .../dataset__first_in_between_3.sql | 23 ++++- .../models/last_after/last_after.yml | 8 ++ ..._before.sql => dataset__last_before_1.sql} | 0 .../models/last_before/last_before.yml | 8 ++ ...last_ever.sql => dataset__last_ever_1.sql} | 0 .../models/last_ever/last_ever.yml | 8 ++ integration_tests/models/models.yml | 43 ---------- ..._before.csv => output__first_before_1.csv} | 0 ...irst_ever.csv => output__first_ever_1.csv} | 0 .../output/output__first_in_between_3.csv | 6 +- .../output/output__last_after_1.csv | 6 +- ...t_before.csv => output__last_before_1.csv} | 0 ..._last_ever.csv => output__last_ever_1.csv} | 0 macros/activity.sql | 53 ++++++------ macros/dataset.sql | 84 ++++++++++--------- .../relationships/append_only/first_after.sql | 2 +- .../append_only/first_before.sql | 2 +- .../append_only/first_in_between.sql | 2 +- .../relationships/append_only/last_after.sql | 2 +- .../relationships/append_only/last_before.sql | 2 +- .../append_only/last_in_between.sql | 2 +- macros/relationships/first_ever.sql | 2 +- macros/relationships/last_ever.sql | 2 +- macros/relationships/nth_ever.sql | 2 +- .../alias_appended_activity.sql} | 10 +-- macros/utils/aliasing/alias_column.sql | 28 +++++++ macros/utils/aliasing/alias_cte.sql | 29 +++++++ .../alias_stream.sql} | 10 ++- macros/utils/columns.sql | 5 +- .../utils/{ => helpers}/json_unpack_key.sql | 0 macros/utils/{ => helpers}/ltrim.sql | 0 macros/utils/render_aggregation.sql | 47 ----------- .../render_additional_join_condition.sql | 4 +- macros/utils/rendering/render_aggregation.sql | 56 +++++++++++++ 43 files changed, 369 insertions(+), 235 deletions(-) create mode 100644 integration_tests/models/first_after/first_after.yml rename integration_tests/models/first_before/{dataset__first_before.sql => dataset__first_before_1.sql} (100%) create mode 100644 integration_tests/models/first_before/first_before.yml rename integration_tests/models/first_ever/{dataset__first_ever.sql => dataset__first_ever_1.sql} (100%) create mode 100644 integration_tests/models/first_ever/first_ever.yml create mode 100644 integration_tests/models/last_after/last_after.yml rename integration_tests/models/last_before/{dataset__last_before.sql => dataset__last_before_1.sql} (100%) create mode 100644 integration_tests/models/last_before/last_before.yml rename integration_tests/models/last_ever/{dataset__last_ever.sql => dataset__last_ever_1.sql} (100%) create mode 100644 integration_tests/models/last_ever/last_ever.yml delete mode 100644 integration_tests/models/models.yml rename integration_tests/seeds/first_before/output/{output__first_before.csv => output__first_before_1.csv} (100%) rename integration_tests/seeds/first_ever/output/{output__first_ever.csv => output__first_ever_1.csv} (100%) rename integration_tests/seeds/last_before/output/{output__last_before.csv => output__last_before_1.csv} (100%) rename integration_tests/seeds/last_ever/output/{output__last_ever.csv => output__last_ever_1.csv} (100%) rename macros/utils/{generate_appended_column_alias.sql => aliasing/alias_appended_activity.sql} (54%) create mode 100644 macros/utils/aliasing/alias_column.sql create mode 100644 macros/utils/aliasing/alias_cte.sql rename macros/utils/{generate_stream_alias.sql => aliasing/alias_stream.sql} (68%) rename macros/utils/{ => helpers}/json_unpack_key.sql (100%) rename macros/utils/{ => helpers}/ltrim.sql (100%) delete mode 100644 macros/utils/render_aggregation.sql rename macros/utils/{ => rendering}/render_additional_join_condition.sql (86%) create mode 100644 macros/utils/rendering/render_aggregation.sql diff --git a/README.md b/README.md index 7a4e37c..3fb50a8 100644 --- a/README.md +++ b/README.md @@ -12,11 +12,10 @@ modelling framework, based on the - [Install](#install) - [Usage](#usage) - [Create a Dataset](#create-a-dataset) - - [Configure Columns](#configure-columns) - - [Required Columns](#required-columns) - - [Mapping Column Names](#mapping-column-names) - - [Included Dataset Columns](#included-dataset-columns) - - [Configure Appended Activity Column Names](#configure-appended-activity-column-names) + - [Required Columns](#required-columns) +- [Vars](#vars) + - [Column Mappings (optional)](#column-mappings-optional) + - [Included Columns (optional)](#included-columns-optional) - [Macros](#macros) - [Dataset (source)](#dataset-source) - [Activity (source)](#activity-source) @@ -59,8 +58,8 @@ Include in `packages.yml`: ```yaml packages: - - git: "https://github.com/tnightengale/dbt-activity-schema" - revision: 0.1.0 + - package: tnightengale/dbt_activity_schema + version: 0.2.0 ``` For latest release, see https://github.com/tnightengale/dbt-activity-schema/releases. @@ -68,8 +67,10 @@ https://github.com/tnightengale/dbt-activity-schema/releases. ## Usage ### Create a Dataset -Use the [dataset macro](#dataset-source) with the appropriate arguments to -derive a Dataset by self-joining the Activity Stream model in your project. The +Use the [dataset macro](#dataset-source) to self-join an Activity Stream using +[relationships](#relationships). + +The [dataset macro](#dataset-source) will compile based on the provided [activity macros](#activity-source) and the [relationship macros](#relationships). It can then be nested in a CTE in a dbt-Core model. Eg: @@ -80,7 +81,7 @@ with dataset_cte as ( {{ dbt_activity_schema.dataset( - activity_stream_ref = ref("example__activity_stream"), + activity_stream = ref("example__activity_stream"), primary_activity = dbt_activity_schema.activity( dbt_activity_schema.all_ever(), "bought something"), @@ -98,17 +99,12 @@ select * from dataset_cte ``` > Note: This package does not contain macros to create the Activity Stream -> model. It derives Dataset models on top of an existing Activity Stream model. +> model. It generates the SQL to self-join an existing Activity Stream model. -### Configure Columns +### Required Columns This package conforms to the [Activity Schema V2 Specification](https://github.com/ActivitySchema/ActivitySchema/blob/main/2.0.md#entity-table) -and, by default, it expects the columns in that spec to exist in the Activity -Stream model. - -#### Required Columns -In order for critical joins in the [dataset macro](#dataset-source) to work as -expected, the following columns must exist: +and requires the following columns to function: - **`activity`**: A string or ID that identifies the action or fact attributable to the `customer`. - **`customer`**: The UUID of the entity or customer. Must be used across @@ -120,18 +116,24 @@ expected, the following columns must exist: - **`activity_occurrence`**: The running count of the activity per customer. Create using a rank window function, partitioned by activity and customer. -#### Mapping Column Names -If the required columns exist conceptually under different names, they can be -aliased using the nested `activity_schema_v2_column_mappings` project var. Eg: +## Vars +This package can be configured with the following project variables. All project +vars can be scoped globally or to the `dbt_activity_schema` package. + +### Column Mappings (optional) +The `column_mappings` project variable can be used to alias columns in Activity +Stream. If the [required columns](#required-columns) exist conceptually under +different names, they can be mapped to their names in the [V2 +Specification](https://github.com/ActivitySchema/ActivitySchema/blob/main/2.0.md#entity-table). +Eg: ```yml # dbt_project.yml - ... vars: dbt_activity_schema: - activity_schema_v2_column_mappings: + column_mappings: # Activity Stream with required column names that # differ from the V2 spec, mapped from their spec name. customer: entity_uuid @@ -140,19 +142,18 @@ vars: ... ``` -#### Included Dataset Columns -The set of columns that are included in the compiled SQL of the [dataset -macro](#dataset-source) can be configured using the nested -`default_dataset_columns` project var. Eg: +### Included Columns (optional) +The `included_columns` project variable can be set to indicate the default +columns to be included in each [activity](#activity-source) passed to +[dataset](#dataset-source). Eg: ```yml # dbt_project.yml - ... vars: dbt_activity_schema: # List columns from the Activity Schema to include in the Dataset - default_dataset_columns: + included_columns: - activity_id - entity_uuid - activity_occurred_at @@ -161,16 +162,12 @@ vars: ... ``` -These defaults can be overridden using the `override_columns` argument in the -[activity macro](#activity-source). +If it is not set, all the columns from the [V2 +Specification](https://github.com/ActivitySchema/ActivitySchema/blob/main/2.0.md#entity-table) +will be included, based on the [columns macro](./macros/utils/columns.sql). -#### Configure Appended Activity Column Names -The naming convention of the columns, in the activities passed to the -`appended_activities` argument can be configured by overriding the -[generate_appended_column_alias](./macros/utils/generate_appended_column_alias.sql) -macro. See the dbt docs on [overriding package -macros](https://docs.getdbt.com/reference/dbt-jinja-functions/dispatch#overriding-package-macros) -for more details. +These defaults can be overridden on a per-activity basis by passing a list of column names to the `included_columns` argument in the +[activity macro](#activity-source). ## Macros @@ -178,10 +175,10 @@ for more details. Generate the SQL for self-joining the Activity Stream. **args:** -- **`activity_stream_ref (required)`** : - [ref](https://docs.getdbt.com/reference/dbt-jinja-functions/ref) +- **`activity_stream (required)`** : + [ref](https://docs.getdbt.com/reference/dbt-jinja-functions/ref) | str - The dbt `ref()` that points to the activity stream model. + The dbt `ref()` or a CTE name that contains the [required columns](#required-columns). - **`primary_activity (required)`** : [activity](#activity-source) @@ -209,9 +206,9 @@ dataset. The string identifier of the activity in the Activity Stream. Should match the value in the `activity` column. -- **`override_columns (optional)`** : List [ str ] +- **`included_columns (optional)`** : List [ str ] - List of columns to include for the activity. Setting this Overrides the + List of columns to include for the activity. Setting this overrides the defaults configured by the `default_dataset_columns` project var. - **`additional_join_condition (optional)`** : str diff --git a/dbt_project.yml b/dbt_project.yml index 0a4a789..832f907 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -1,9 +1,9 @@ # Project name. name: 'dbt_activity_schema' -version: '0.1.1' +version: '0.2.0' config-version: 2 -require-dbt-version: [">=1.3.0"] +require-dbt-version: [">=1.3.0", "<2.0.0"] # The "profile" dbt uses for this project. profile: 'dbt_activity_schema' @@ -18,5 +18,19 @@ snapshot-paths: ["snapshots"] target-path: "target" clean-targets: - - "target" - - "dbt_modules" + - "target" + - "dbt_modules" + +vars: + included_columns: + - activity_id + - ts + - customer + - anonymous_customer_id + - activity + - activity_occurrence + - activity_repeated_at + - feature_json + - revenue_impact + - link + column_mappings: {} diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml index 6522a2d..97f5f7c 100644 --- a/integration_tests/dbt_project.yml +++ b/integration_tests/dbt_project.yml @@ -21,12 +21,12 @@ models: vars: dbt_activity_schema: - default_dataset_columns: + included_columns: - activity_id - entity_uuid - ts - revenue_impact - activity_schema_v2_column_mappings: + column_mappings: customer: entity_uuid anonymous_customer_id: anonymous_entity_uuid diff --git a/integration_tests/models/first_after/dataset__first_after_3.sql b/integration_tests/models/first_after/dataset__first_after_3.sql index b50d77a..3830263 100644 --- a/integration_tests/models/first_after/dataset__first_after_3.sql +++ b/integration_tests/models/first_after/dataset__first_after_3.sql @@ -3,13 +3,24 @@ ref("input__first_after"), dbt_activity_schema.activity( dbt_activity_schema.all_ever(), - "signed up" + "signed up", + [ + "activity_id", + "entity_uuid", + "ts", + "revenue_impact", + "feature_json" + ] ), [ dbt_activity_schema.activity( dbt_activity_schema.first_after(), "visit page", - ["feature_json", "activity_occurrence", "ts"], + [ + "feature_json", + "activity_occurrence", + "ts" + ], additional_join_condition=" json_extract({primary}.feature_json, 'type') = json_extract({appended}.feature_json, 'type') diff --git a/integration_tests/models/first_after/first_after.yml b/integration_tests/models/first_after/first_after.yml new file mode 100644 index 0000000..748e012 --- /dev/null +++ b/integration_tests/models/first_after/first_after.yml @@ -0,0 +1,18 @@ +version: 2 + +models: + + - name: dataset__first_after_1 + tests: + - dbt_utils.equality: + compare_model: ref("output__first_after_1") + + - name: dataset__first_after_2 + tests: + - dbt_utils.equality: + compare_model: ref("output__first_after_2") + + - name: dataset__first_after_3 + tests: + - dbt_utils.equality: + compare_model: ref("output__first_after_3") diff --git a/integration_tests/models/first_before/dataset__first_before.sql b/integration_tests/models/first_before/dataset__first_before_1.sql similarity index 100% rename from integration_tests/models/first_before/dataset__first_before.sql rename to integration_tests/models/first_before/dataset__first_before_1.sql diff --git a/integration_tests/models/first_before/first_before.yml b/integration_tests/models/first_before/first_before.yml new file mode 100644 index 0000000..1bb0060 --- /dev/null +++ b/integration_tests/models/first_before/first_before.yml @@ -0,0 +1,8 @@ +version: 2 + +models: + + - name: dataset__first_before_1 + tests: + - dbt_utils.equality: + compare_model: ref("output__first_before_1") diff --git a/integration_tests/models/first_ever/dataset__first_ever.sql b/integration_tests/models/first_ever/dataset__first_ever_1.sql similarity index 100% rename from integration_tests/models/first_ever/dataset__first_ever.sql rename to integration_tests/models/first_ever/dataset__first_ever_1.sql diff --git a/integration_tests/models/first_ever/first_ever.yml b/integration_tests/models/first_ever/first_ever.yml new file mode 100644 index 0000000..9bd9b8e --- /dev/null +++ b/integration_tests/models/first_ever/first_ever.yml @@ -0,0 +1,8 @@ +version: 2 + +models: + + - name: dataset__first_ever_1 + tests: + - dbt_utils.equality: + compare_model: ref("output__first_ever_1") diff --git a/integration_tests/models/first_in_between/dataset__first_in_between_3.sql b/integration_tests/models/first_in_between/dataset__first_in_between_3.sql index b2f070f..a2700bf 100644 --- a/integration_tests/models/first_in_between/dataset__first_in_between_3.sql +++ b/integration_tests/models/first_in_between/dataset__first_in_between_3.sql @@ -3,17 +3,36 @@ ref("input__first_in_between"), dbt_activity_schema.activity( dbt_activity_schema.all_ever(), - "signed up" + "signed up", + [ + "activity_id", + "entity_uuid", + "ts", + "revenue_impact", + "feature_json" + ] ), [ dbt_activity_schema.activity( dbt_activity_schema.first_in_between(), "visit page", - ["feature_json", "activity_occurrence", "ts"], + [ + "feature_json", + "activity_occurrence", + "ts" + ], additional_join_condition=" json_extract({primary}.feature_json, 'type') = json_extract({appended}.feature_json, 'type') " + ), + dbt_activity_schema.activity( + dbt_activity_schema.first_in_between(), + "bought something", + [ + "activity_id", + "ts" + ] ) ] ) diff --git a/integration_tests/models/last_after/last_after.yml b/integration_tests/models/last_after/last_after.yml new file mode 100644 index 0000000..d25aaf0 --- /dev/null +++ b/integration_tests/models/last_after/last_after.yml @@ -0,0 +1,8 @@ +version: 2 + +models: + + - name: dataset__last_after_1 + tests: + - dbt_utils.equality: + compare_model: ref("output__last_after_1") diff --git a/integration_tests/models/last_before/dataset__last_before.sql b/integration_tests/models/last_before/dataset__last_before_1.sql similarity index 100% rename from integration_tests/models/last_before/dataset__last_before.sql rename to integration_tests/models/last_before/dataset__last_before_1.sql diff --git a/integration_tests/models/last_before/last_before.yml b/integration_tests/models/last_before/last_before.yml new file mode 100644 index 0000000..cbb5531 --- /dev/null +++ b/integration_tests/models/last_before/last_before.yml @@ -0,0 +1,8 @@ +version: 2 + +models: + + - name: dataset__last_before_1 + tests: + - dbt_utils.equality: + compare_model: ref("output__last_before_1") diff --git a/integration_tests/models/last_ever/dataset__last_ever.sql b/integration_tests/models/last_ever/dataset__last_ever_1.sql similarity index 100% rename from integration_tests/models/last_ever/dataset__last_ever.sql rename to integration_tests/models/last_ever/dataset__last_ever_1.sql diff --git a/integration_tests/models/last_ever/last_ever.yml b/integration_tests/models/last_ever/last_ever.yml new file mode 100644 index 0000000..589bc36 --- /dev/null +++ b/integration_tests/models/last_ever/last_ever.yml @@ -0,0 +1,8 @@ +version: 2 + +models: + + - name: dataset__last_ever_1 + tests: + - dbt_utils.equality: + compare_model: ref("output__last_ever_1") diff --git a/integration_tests/models/models.yml b/integration_tests/models/models.yml deleted file mode 100644 index 66f55e2..0000000 --- a/integration_tests/models/models.yml +++ /dev/null @@ -1,43 +0,0 @@ -version: 2 - -models: - - - name: dataset__first_after_1 - tests: - - dbt_utils.equality: - compare_model: ref("output__first_after_1") - - - name: dataset__first_after_2 - tests: - - dbt_utils.equality: - compare_model: ref("output__first_after_2") - - - name: dataset__first_after_3 - tests: - - dbt_utils.equality: - compare_model: ref("output__first_after_3") - - - name: dataset__first_before - tests: - - dbt_utils.equality: - compare_model: ref("output__first_before") - - - name: dataset__first_ever - tests: - - dbt_utils.equality: - compare_model: ref("output__first_ever") - - # - name: dataset__last_after_1 - # tests: - # - dbt_utils.equality: - # compare_model: ref("output__last_after_1") - - - name: dataset__last_before - tests: - - dbt_utils.equality: - compare_model: ref("output__last_before") - - - name: dataset__last_ever - tests: - - dbt_utils.equality: - compare_model: ref("output__last_ever") diff --git a/integration_tests/seeds/first_before/output/output__first_before.csv b/integration_tests/seeds/first_before/output/output__first_before_1.csv similarity index 100% rename from integration_tests/seeds/first_before/output/output__first_before.csv rename to integration_tests/seeds/first_before/output/output__first_before_1.csv diff --git a/integration_tests/seeds/first_ever/output/output__first_ever.csv b/integration_tests/seeds/first_ever/output/output__first_ever_1.csv similarity index 100% rename from integration_tests/seeds/first_ever/output/output__first_ever.csv rename to integration_tests/seeds/first_ever/output/output__first_ever_1.csv diff --git a/integration_tests/seeds/first_in_between/output/output__first_in_between_3.csv b/integration_tests/seeds/first_in_between/output/output__first_in_between_3.csv index e6121e1..db0def4 100644 --- a/integration_tests/seeds/first_in_between/output/output__first_in_between_3.csv +++ b/integration_tests/seeds/first_in_between/output/output__first_in_between_3.csv @@ -1,3 +1,3 @@ -activity_id,entity_uuid,ts,activity,anonymous_entity_uuid,feature_json,revenue_impact,link,activity_occurrence,activity_repeated_at,first_in_between_visit_page_feature_json,first_in_between_visit_page_activity_occurrence,first_in_between_visit_page_ts -3,1,2022-01-02 22:10:11,signed up,,"{""type"": 1}",0,,1,,"{""type"": 1}",4,2022-01-06 22:10:11 -9,7,2022-01-08 22:10:11,signed up,,"{""type"": 1}",0,,1,,"{""type"": 1}",4,2022-01-12 22:10:11 +activity_id,entity_uuid,ts,revenue_impact,feature_json,first_in_between_visit_page_feature_json,first_in_between_visit_page_activity_occurrence,first_in_between_visit_page_ts,first_in_between_bought_something_activity_id,first_in_between_bought_something_ts +3,1,2022-01-02 22:10:11,0,"{""type"": 1}","{""type"": 1}",4,2022-01-06 22:10:11,6,2022-01-05 22:10:11 +9,7,2022-01-08 22:10:11,0,"{""type"": 1}","{""type"": 1}",4,2022-01-12 22:10:11,12,2022-01-11 22:10:11 diff --git a/integration_tests/seeds/last_after/output/output__last_after_1.csv b/integration_tests/seeds/last_after/output/output__last_after_1.csv index 059f544..3c66400 100644 --- a/integration_tests/seeds/last_after/output/output__last_after_1.csv +++ b/integration_tests/seeds/last_after/output/output__last_after_1.csv @@ -1,3 +1,3 @@ -activity_id,entity_uuid,ts,activity,anonymous_entity_uuid,feature_json,revenue_impact,link,activity_occurrence,activity_repeated_at,last_after_visit_page_feature_json,last_after_visit_page_ts -3,1,2022-01-02 22:10:11,signed up,,"[{""signed up"": 1}]",0,,1,,"[{""visited page"": 1}]",2022-01-06 22:10:11 -9,7,2022-01-08 22:10:11,signed up,,"[{""signed up"": 1}]",0,,1,,"[{""visited page"": 1}]",2022-01-12 22:10:11 +activity_id,entity_uuid,ts,revenue_impact,last_after_visit_page_activity_id,last_after_visit_page_entity_uuid,last_after_visit_page_ts,last_after_visit_page_revenue_impact +3,1,2022-01-02 22:10:11,0,7,1,2022-01-06 22:10:11,0 +9,7,2022-01-08 22:10:11,0,13,7,2022-01-12 22:10:11,0 diff --git a/integration_tests/seeds/last_before/output/output__last_before.csv b/integration_tests/seeds/last_before/output/output__last_before_1.csv similarity index 100% rename from integration_tests/seeds/last_before/output/output__last_before.csv rename to integration_tests/seeds/last_before/output/output__last_before_1.csv diff --git a/integration_tests/seeds/last_ever/output/output__last_ever.csv b/integration_tests/seeds/last_ever/output/output__last_ever_1.csv similarity index 100% rename from integration_tests/seeds/last_ever/output/output__last_ever.csv rename to integration_tests/seeds/last_ever/output/output__last_ever_1.csv diff --git a/macros/activity.sql b/macros/activity.sql index 9914c3e..915307d 100644 --- a/macros/activity.sql +++ b/macros/activity.sql @@ -1,28 +1,27 @@ {% macro activity( relationship, activity_name, - override_columns=[], - additional_join_condition=[] + included_columns=var("included_columns", var("dbt_activity_schema", {}).get("included_columns", dbt_activity_schema.columns().values())), + additional_join_condition="true" ) %} -{{ return(adapter.dispatch("appended_activity", "dbt_activity_schema")( +{{ return(adapter.dispatch("activity", "dbt_activity_schema")( relationship, activity_name, - override_columns, + included_columns, additional_join_condition )) }} {% endmacro %} - -{% macro default__appended_activity( +{% macro default__activity( relationship, activity_name, - override_columns, + included_columns, additional_join_condition ) %} -{# An activity to append to the `primary_activity` in the dataset. +{# An activity to include in the dataset. params: @@ -34,9 +33,10 @@ params: The string identifier of the activity in the Activity Stream to join to the primary activity. - override_columns: List[str] - List of columns to join to the primary activity, defaults to the project - var `appended_activity_columns`. + included_columns: List[str] + List of columns to join to the primary activity, defaults to the + `included_columns` vars if it is set, otherwise defaults to the columns + defined in columns.sql. additional_join_condition: str A valid sql boolean to condition the join of the appended activity. Can @@ -62,25 +62,28 @@ params: list argument. #} -{% if override_columns %} - {% set columns = override_columns %} -{% else %} - {% set columns = var("dbt_activity_schema", {}).get( - "default_dataset_columns", dbt_activity_schema.columns().values() | list - ) %} -{% endif %} +{% set columns = dbt_activity_schema.columns() %} -{# The columns.feature_json requires columns.ts to be present. #} -{% if (dbt_activity_schema.columns().feature_json in columns) and - (dbt_activity_schema.columns().ts not in columns) %} -{% do columns.append(dbt_activity_schema.columns().ts) %} -{% endif %} +{# Required for the joins, but not necessarily included in the final result. #} +{% set required_columns = [ + columns.activity_id, + columns.activity, + columns.ts, + columns.customer, + columns.activity_occurrence, + columns.activity_repeated_at +] %} -{% set additional_join_condition = additional_join_condition if additional_join_condition else "true" %} +{% for col in included_columns %} + {% if col in required_columns %} + {% do required_columns.remove(col) %} + {% endif %} +{% endfor %} {% do return(namespace( name = activity_name, - columns = columns, + included_columns = included_columns, + required_columns = required_columns, relationship = relationship, additional_join_condition = additional_join_condition )) %} diff --git a/macros/dataset.sql b/macros/dataset.sql index 79dc5b9..4387fc8 100644 --- a/macros/dataset.sql +++ b/macros/dataset.sql @@ -18,10 +18,8 @@ params: - activity_stream_ref: ref() - The dbt ref() that points to the activty stream table. Use the project - variables in ./dataclasses/columns.sql to set the columns of the activity - stream. + activity_stream: ref() | str + The dbt `ref()` or a CTE name that contains the required columns. primary_activity: activity (class) The primary activity of the derived dataset. @@ -31,35 +29,42 @@ params: #} {% set columns = dbt_activity_schema.columns() %} -{% set stream = dbt_activity_schema.generate_stream_alias %} -{% set alias = dbt_activity_schema.generate_appended_column_alias %} +{% set stream = dbt_activity_schema.alias_stream %} +{% set alias_cte = dbt_activity_schema.alias_cte %} +{% set alias_column = dbt_activity_schema.alias_column %} +{% set alias_appended_activity = dbt_activity_schema.alias_appended_activity %} {% set render_join = dbt_activity_schema.render_additional_join_condition %} {% set render_agg = dbt_activity_schema.render_aggregation %} - with -join_appended_activities as ( +filter_activity_stream_using_primary_activity as ( select - - -- Primary Activity Columns - {% for col in primary_activity.columns %} - {{ stream() }}.{{- col }}, + {% for col in primary_activity.included_columns + primary_activity.required_columns %} + {{ alias_column(col) }}{%- if not loop.last -%},{%- endif %} {% endfor %} - -- Appended Activties Columns - {% for activity in appended_activities %}{% set i = loop.index %}{% set last_outer_loop = loop.last %} - {% for col in activity.columns %} + from {{ activity_stream_ref }} as {{ stream() }} + + where {{ alias_column(columns.activity) }} = {{ dbt.string_literal(primary_activity.name) }} + and {{ primary_activity.relationship.where_clause }} +), - {{ stream(i) }}.{{ col }} as {{ alias(activity, col) }}{% if not (last_outer_loop and loop.last) %},{% endif %} +{% for activity in appended_activities %}{% set i = loop.index %} - {% endfor %} +{{ alias_cte(activity, i) }} as ( + select + + -- Primary Activity Columns + {% for col in primary_activity.included_columns %} + {{ stream() }}.{{- col }}, {% endfor %} - from {{ activity_stream_ref }} as {{ stream() }} + {% for col in activity.included_columns %} + {{ render_agg(col, activity, i) }}{% if not loop.last %},{% endif %} + {% endfor %} - -- Join Appended Activities Loop - {% for activity in appended_activities %}{% set i = loop.index %} + from filter_activity_stream_using_primary_activity as {{ stream() }} left join {{ activity_stream_ref }} as {{ stream(i) }} on ( @@ -83,36 +88,37 @@ join_appended_activities as ( and ( {{ render_join(activity.additional_join_condition, i) }} ) ) - {% endfor %} - - -- Where Clause for the Primary Activity, Determined by the `occurance` - where {{ stream() }}.{{ columns.activity }} = {{ dbt.string_literal(primary_activity.name) }} - and {{ primary_activity.relationship.where_clause }} + group by + {% for col in primary_activity.included_columns %} + {{ alias_column(col) }}{%- if not loop.last -%},{%- endif %} + {% endfor %} ), -aggregate_appended_activities as ( +{% endfor %} + +rejoin_aggregated_activities as ( select - {% for col in primary_activity.columns %} - {{- col }}, + + {% for col in primary_activity.included_columns %} + {{ alias_column(col) }}, {% endfor %} {% for activity in appended_activities %}{% set i = loop.index %}{% set last_outer_loop = loop.last %} - {% for col in activity.columns %} + {% for col in activity.included_columns %} + {{ alias_cte(activity, i) }}.{{ alias_appended_activity(activity, col) }}{% if not (last_outer_loop and loop.last) %},{% endif %} + {% endfor %} + {% endfor %} - {{ render_agg(col, activity) }} + from filter_activity_stream_using_primary_activity as {{ stream() }} - {% if not (last_outer_loop and loop.last) %},{% endif %} + {% for activity in appended_activities %}{% set i = loop.index %} - {% endfor %} - {% endfor %} + left join {{ alias_cte(activity, i) }} + on {{ alias_cte(activity, i) }}.{{ columns.activity_id }} = {{ stream() }}.{{ columns.activity_id }} - from join_appended_activities - group by - {% for col in primary_activity.columns %} - {{- col }}{% if not loop.last %},{% endif %} - {% endfor %} + {% endfor %} ) -select * from aggregate_appended_activities +select * from rejoin_aggregated_activities {% endmacro %} diff --git a/macros/relationships/append_only/first_after.sql b/macros/relationships/append_only/first_after.sql index 3c0cfd9..81eaeab 100644 --- a/macros/relationships/append_only/first_after.sql +++ b/macros/relationships/append_only/first_after.sql @@ -1,6 +1,6 @@ {% macro first_after_join_clause(i) %} -{% set stream = dbt_activity_schema.generate_stream_alias %} +{% set stream = dbt_activity_schema.alias_stream %} {% set columns = dbt_activity_schema.columns() %} ( diff --git a/macros/relationships/append_only/first_before.sql b/macros/relationships/append_only/first_before.sql index c880972..4474f0a 100644 --- a/macros/relationships/append_only/first_before.sql +++ b/macros/relationships/append_only/first_before.sql @@ -1,6 +1,6 @@ {% macro first_before_join_clause(i) %} -{% set stream = dbt_activity_schema.generate_stream_alias %} +{% set stream = dbt_activity_schema.alias_stream %} {% set columns = dbt_activity_schema.columns() %} ( diff --git a/macros/relationships/append_only/first_in_between.sql b/macros/relationships/append_only/first_in_between.sql index d79a519..f33c953 100644 --- a/macros/relationships/append_only/first_in_between.sql +++ b/macros/relationships/append_only/first_in_between.sql @@ -1,6 +1,6 @@ {% macro first_in_between_join_clause(i) %} -{% set stream = dbt_activity_schema.generate_stream_alias %} +{% set stream = dbt_activity_schema.alias_stream %} {% set columns = dbt_activity_schema.columns() %} ( diff --git a/macros/relationships/append_only/last_after.sql b/macros/relationships/append_only/last_after.sql index 14d1ca6..0b4a802 100644 --- a/macros/relationships/append_only/last_after.sql +++ b/macros/relationships/append_only/last_after.sql @@ -1,6 +1,6 @@ {% macro last_after_join_clause(i) %} -{% set stream = dbt_activity_schema.generate_stream_alias %} +{% set stream = dbt_activity_schema.alias_stream %} {% set columns = dbt_activity_schema.columns() %} ( diff --git a/macros/relationships/append_only/last_before.sql b/macros/relationships/append_only/last_before.sql index 1905303..0049828 100644 --- a/macros/relationships/append_only/last_before.sql +++ b/macros/relationships/append_only/last_before.sql @@ -1,6 +1,6 @@ {% macro last_before_join_clause(i) %} -{% set stream = dbt_activity_schema.generate_stream_alias %} +{% set stream = dbt_activity_schema.alias_stream %} {% set columns = dbt_activity_schema.columns() %} ( diff --git a/macros/relationships/append_only/last_in_between.sql b/macros/relationships/append_only/last_in_between.sql index 174fe3c..8e91bf7 100644 --- a/macros/relationships/append_only/last_in_between.sql +++ b/macros/relationships/append_only/last_in_between.sql @@ -1,6 +1,6 @@ {% macro last_in_between_join_clause(i) %} -{% set stream = dbt_activity_schema.generate_stream_alias %} +{% set stream = dbt_activity_schema.alias_stream %} {% set columns = dbt_activity_schema.columns() %} ( diff --git a/macros/relationships/first_ever.sql b/macros/relationships/first_ever.sql index aa043a3..e0550f8 100644 --- a/macros/relationships/first_ever.sql +++ b/macros/relationships/first_ever.sql @@ -1,6 +1,6 @@ {% macro first_ever_join_clause(i=none) %} ( - {{ dbt_activity_schema.generate_stream_alias(i) }}.{{ dbt_activity_schema.columns().activity_occurrence }} = 1 + {{ dbt_activity_schema.alias_stream(i) }}.{{ dbt_activity_schema.columns().activity_occurrence }} = 1 ) {% endmacro %} diff --git a/macros/relationships/last_ever.sql b/macros/relationships/last_ever.sql index 655432f..ca0c6f6 100644 --- a/macros/relationships/last_ever.sql +++ b/macros/relationships/last_ever.sql @@ -1,6 +1,6 @@ {% macro last_ever_join_clause(i=none) %} ( - {{ dbt_activity_schema.generate_stream_alias(i) }}.{{ dbt_activity_schema.columns().activity_repeated_at }} is null + {{ dbt_activity_schema.alias_stream(i) }}.{{ dbt_activity_schema.columns().activity_repeated_at }} is null ) {% endmacro %} diff --git a/macros/relationships/nth_ever.sql b/macros/relationships/nth_ever.sql index 0f7c3d4..7f4da43 100644 --- a/macros/relationships/nth_ever.sql +++ b/macros/relationships/nth_ever.sql @@ -1,6 +1,6 @@ {% macro nth_ever_join_clause(nth_occurance, i=none) %} ( - {{ dbt_activity_schema.generate_stream_alias(i) }}.{{ dbt_activity_schema.columns().activity_occurrence }} = {{ nth_occurance }} + {{ dbt_activity_schema.alias_stream(i) }}.{{ dbt_activity_schema.columns().activity_occurrence }} = {{ nth_occurance }} ) {% endmacro %} diff --git a/macros/utils/generate_appended_column_alias.sql b/macros/utils/aliasing/alias_appended_activity.sql similarity index 54% rename from macros/utils/generate_appended_column_alias.sql rename to macros/utils/aliasing/alias_appended_activity.sql index 41a2f81..9a5d2a3 100644 --- a/macros/utils/generate_appended_column_alias.sql +++ b/macros/utils/aliasing/alias_appended_activity.sql @@ -1,20 +1,20 @@ -{% macro generate_appended_column_alias(activity, column_name) %} - {{ return(adapter.dispatch("generate_appended_column_alias", "dbt_activity_schema")(activity, column_name))}} +{% macro alias_appended_activity(activity, column_name) %} + {{ return(adapter.dispatch("alias_appended_activity", "dbt_activity_schema")(activity, column_name))}} {% endmacro %} -{% macro default__generate_appended_column_alias(activity, column_name) %} +{% macro default__alias_appended_activity(activity, column_name) %} {# Generate the name of appended columns in `dataset.sql`. params: - activity: appended_activity (activites) + activity: activity (class) The appended activity object, containing the string attributes to be concatenated in the column alias prefix. column_name: str - The name of the column that will be concatenated in the column alias suffix. + The name of the column that will be aliased. #} {% set concatenated_activity_alias %} diff --git a/macros/utils/aliasing/alias_column.sql b/macros/utils/aliasing/alias_column.sql new file mode 100644 index 0000000..75db540 --- /dev/null +++ b/macros/utils/aliasing/alias_column.sql @@ -0,0 +1,28 @@ +{% macro alias_column(column_name, i=none) %} + {{ return(adapter.dispatch("alias_column", "dbt_activity_schema")(column_name, i))}} +{% endmacro %} + + +{%- macro default__alias_column(column_name, i) -%} + +{# Generate the alias for the stream and it's appended activities. + +params: + + column_name: str + The name of the column that will be aliased. + + i: int + The cardinality of the appended activity, and thus the self join of the + Activity Schema. Used to rejoin the Activity Schema multiple times, for + multiple appended activities, with each being given a unique alias. + +#} + +{% set alias %} +{{ dbt_activity_schema.alias_stream(i) }}.{{ column_name }} +{% endset %} + +{% do return(alias) %} + +{%- endmacro -%} diff --git a/macros/utils/aliasing/alias_cte.sql b/macros/utils/aliasing/alias_cte.sql new file mode 100644 index 0000000..5c141ad --- /dev/null +++ b/macros/utils/aliasing/alias_cte.sql @@ -0,0 +1,29 @@ +{%- macro alias_cte(activity, i) -%} + {{ return(adapter.dispatch("alias_cte", "dbt_activity_schema")(activity, i))}} +{% endmacro %} + + +{%- macro default__alias_cte(activity, i) -%} + +{# Generate the alias for the stream and it's appended activities. + +params: + + activity: activity (class) + The activity used to create the alias with a meaningful name for the + compiled dataset. + + i: int + The cardinality of the appended activity, and thus the self join of the + Activity Schema. Used to rejoin the Activity Schema multiple times, for + multiple appended activities, with each being given a unique alias. + +#} + +{% set alias %} +append_and_aggregate__{{ i }}__{{ activity.relationship.name }} +{% endset %} + +{% do return(alias) %} + +{%- endmacro -%} diff --git a/macros/utils/generate_stream_alias.sql b/macros/utils/aliasing/alias_stream.sql similarity index 68% rename from macros/utils/generate_stream_alias.sql rename to macros/utils/aliasing/alias_stream.sql index 1dad2a7..c88ff7f 100644 --- a/macros/utils/generate_stream_alias.sql +++ b/macros/utils/aliasing/alias_stream.sql @@ -1,9 +1,9 @@ -{% macro generate_stream_alias(i=none) %} - {{ return(adapter.dispatch("generate_stream_alias", "dbt_activity_schema")(i))}} +{% macro alias_stream(i=none) %} + {{ return(adapter.dispatch("alias_stream", "dbt_activity_schema")(i))}} {% endmacro %} -{%- macro default__generate_stream_alias(i) -%} +{%- macro default__alias_stream(i) -%} {# Generate the alias for the stream and it's appended activities. @@ -16,10 +16,14 @@ params: #} +{% set alias %} {%- if i -%} stream_{{- i }} {%- else -%} stream {%- endif -%} +{% endset %} + +{% do return(alias) %} {%- endmacro -%} diff --git a/macros/utils/columns.sql b/macros/utils/columns.sql index 6a2f2b8..beac1d9 100644 --- a/macros/utils/columns.sql +++ b/macros/utils/columns.sql @@ -20,9 +20,8 @@ ) %} -{# Update names using the `activity_schema_v2_column_mappings` project but keep keys according to -the Activity Schema V2 specification. #} -{% do column_names.update(var("dbt_activity_schema", {}).get("activity_schema_v2_column_mappings", {})) %} +{# Update names using the `column_mappings` project var. #} +{% do column_names.update(var("column_mappings", var("dbt_activity_schema", {}).get("column_mappings", {}))) %} {% do return(column_names) %} diff --git a/macros/utils/json_unpack_key.sql b/macros/utils/helpers/json_unpack_key.sql similarity index 100% rename from macros/utils/json_unpack_key.sql rename to macros/utils/helpers/json_unpack_key.sql diff --git a/macros/utils/ltrim.sql b/macros/utils/helpers/ltrim.sql similarity index 100% rename from macros/utils/ltrim.sql rename to macros/utils/helpers/ltrim.sql diff --git a/macros/utils/render_aggregation.sql b/macros/utils/render_aggregation.sql deleted file mode 100644 index 14ea9e1..0000000 --- a/macros/utils/render_aggregation.sql +++ /dev/null @@ -1,47 +0,0 @@ -{%- macro render_aggregation(col, activity) -%} - {{ return(adapter.dispatch("render_aggregation", "dbt_activity_schema")(col, activity)) }} -{%- endmacro -%} - - -{%- macro default__render_aggregation(col, activity) -%} - -{# Render the aggregation, handling special cases for non-cardinal columns. - -params: - - col: str - The column to aggregate. - - activity: activity (class) - The activity class which contains a name and aggregation function. - -#} - -{% set columns = dbt_activity_schema.columns() %} -{% set aliased_col = dbt_activity_schema.generate_appended_column_alias(activity, col) %} -{% set aliased_activity_ts_col = dbt_activity_schema.generate_appended_column_alias(activity, columns.ts) %} - -{# Handle non-cardinal feature_json aggregation by concating ts column. #} -{% if col in [columns.feature_json] %} - - {% set ts_concat_feature_json %} - {% call activity.relationship.aggregation_func() %} - {{ dbt.concat([aliased_activity_ts_col, aliased_col]) }} - {% endcall %} - {% endset %} - - {% set ts_aggregated %} - {% call activity.relationship.aggregation_func() %} - {{ aliased_activity_ts_col }} - {% endcall %} - {% endset %} - - {{ dbt_activity_schema.ltrim(ts_concat_feature_json, ts_aggregated) }} as {{ aliased_col }} - -{% else %} - {% call activity.relationship.aggregation_func() %} - {{ aliased_col }} - {% endcall %} as {{ aliased_col }} -{% endif %} - -{% endmacro %} diff --git a/macros/utils/render_additional_join_condition.sql b/macros/utils/rendering/render_additional_join_condition.sql similarity index 86% rename from macros/utils/render_additional_join_condition.sql rename to macros/utils/rendering/render_additional_join_condition.sql index 4a4a744..10f4f58 100644 --- a/macros/utils/render_additional_join_condition.sql +++ b/macros/utils/rendering/render_additional_join_condition.sql @@ -23,8 +23,8 @@ params: {%- do return( clause.format( - primary=dbt_activity_schema.generate_stream_alias(), - appended=dbt_activity_schema.generate_stream_alias(i) + primary=dbt_activity_schema.alias_stream(), + appended=dbt_activity_schema.alias_stream(i) ) ) -%} diff --git a/macros/utils/rendering/render_aggregation.sql b/macros/utils/rendering/render_aggregation.sql new file mode 100644 index 0000000..98fc2d0 --- /dev/null +++ b/macros/utils/rendering/render_aggregation.sql @@ -0,0 +1,56 @@ +{%- macro render_aggregation(column_name, activity, i) -%} + {{ return(adapter.dispatch("render_aggregation", "dbt_activity_schema")(column_name, activity, i)) }} +{%- endmacro -%} + + +{%- macro default__render_aggregation(column_name, activity, i) -%} + +{# Render the aggregation, handling special cases for non-cardinal columns. + +params: + + column_name: str + The column to aggregate. + + activity: activity (class) + The activity class which contains a name and aggregation function. + + i: int + The cardinality of the appended activity. Used to fully qualify the + appended activity columns with an alias used in the join of dataset.sql. +#} + +{% set columns = dbt_activity_schema.columns() %} +{% set qualified_col = dbt_activity_schema.alias_column(column_name, i) %} +{% set alias = dbt_activity_schema.alias_appended_activity(activity, column_name) %} + +{# Handle non-cardinal aggregations by prepending ts column, aggregating, then trimming. #} +{% set aggregation %} +{% if column_name in [columns.feature_json] %} + + {% set qualified_ts_col = dbt_activity_schema.alias_column(columns.ts, i) %} + {% set ts_concat_feature_json %} + {% call activity.relationship.aggregation_func() %} + {{ dbt.concat([qualified_ts_col, qualified_col]) }} + {% endcall %} + {% endset %} + + {% set ts_aggregated %} + {% call activity.relationship.aggregation_func() %} + {{ qualified_ts_col }} + {% endcall %} + {% endset %} + + {{ dbt_activity_schema.ltrim(ts_concat_feature_json, ts_aggregated) }} as {{ alias }} + +{# Aggregate cardinal columns normally. #} +{% else %} + {% call activity.relationship.aggregation_func() %} + {{ qualified_col }} + {% endcall %} as {{ alias }} +{% endif %} +{% endset %} + +{% do return(aggregation) %} + +{% endmacro %}