Add support render for versioned dbt models. #516

Merged
merged 16 commits into from
Oct 6, 2023
2 changes: 2 additions & 0 deletions .pre-commit-config.yaml
@@ -31,6 +31,8 @@ repos:
name: Run codespell to check for common misspellings in files
language: python
types: [text]
args:
- --exclude-file=tests/sample/manifest_model_version.json
- repo: https://github.com/pre-commit/pygrep-hooks
rev: v1.10.0
hooks:
23 changes: 15 additions & 8 deletions cosmos/dbt/graph.py
@@ -1,24 +1,25 @@
from __future__ import annotations

import itertools
import json
import os
import shutil
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from subprocess import Popen, PIPE
from subprocess import PIPE, Popen
from typing import Any

from cosmos.config import ProfileConfig
from cosmos.constants import (
DbtResourceType,
ExecutionMode,
LoadMode,
DBT_LOG_DIR_NAME,
DBT_LOG_FILENAME,
DBT_LOG_PATH_ENVVAR,
DBT_TARGET_PATH_ENVVAR,
DBT_LOG_DIR_NAME,
DBT_TARGET_DIR_NAME,
DBT_TARGET_PATH_ENVVAR,
DbtResourceType,
ExecutionMode,
LoadMode,
)
from cosmos.dbt.executable import get_system_dbt
from cosmos.dbt.parser.project import DbtProject as LegacyDbtProject
@@ -137,6 +138,9 @@ def load_via_dbt_ls(self) -> None:
This is the most accurate way of loading `dbt` projects and filtering them out, since it uses the `dbt` command
line for both parsing and filtering the nodes.

Note that if the dbt project contains versioned models, dbt>=1.6.0 is required, because with dbt<1.6.0 the
dbt CLI does not support selecting a specific versioned model such as stg_customers_v1, customers_v1, ...

Updates in-place:
* self.nodes
* self.filtered_nodes
@@ -252,7 +256,7 @@ def load_via_dbt_ls(self) -> None:
logger.debug("Skipped dbt ls line: %s", line)
else:
node = DbtNode(
name=node_dict["name"],
name=node_dict.get("alias", node_dict["name"]),
unique_id=node_dict["unique_id"],
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
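A minimal sketch of the alias fallback introduced in this hunk, assuming each `dbt ls --output json` line is a JSON object. The sample lines and the helper name `resolve_node_name` are hypothetical, and the field values are illustrative rather than taken from a real run:

```python
import json

# Hypothetical `dbt ls --output json` lines; the values are illustrative only.
SAMPLE_LINES = [
    '{"name": "customers", "alias": "customers_v1", '
    '"unique_id": "model.jaffle_shop.customers.v1", "resource_type": "model"}',
    '{"name": "raw_customers", '
    '"unique_id": "seed.jaffle_shop.raw_customers", "resource_type": "seed"}',
]


def resolve_node_name(node_dict):
    """Prefer the alias and fall back to the plain name when no alias key exists."""
    return node_dict.get("alias", node_dict["name"])


resolved = [resolve_node_name(json.loads(line)) for line in SAMPLE_LINES]
print(resolved)  # ['customers_v1', 'raw_customers']
```

Because `node_dict["name"]` is evaluated eagerly as the default argument, this pattern still raises `KeyError` when `name` is missing, even if `alias` is present.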
@@ -326,6 +330,9 @@ def load_from_dbt_manifest(self) -> None:
However, since the Manifest does not represent filters, it relies on the Custom Cosmos implementation
to filter out the nodes relevant to the user (based on self.exclude and self.select).

Note that if the dbt project contains versioned models, dbt>=1.6.0 is required, because with dbt<1.6.0 the
dbt CLI does not support selecting a specific versioned model such as stg_customers_v1, customers_v1, ...

Updates in-place:
* self.nodes
* self.filtered_nodes
@@ -337,7 +344,7 @@ def load_from_dbt_manifest(self) -> None:

for unique_id, node_dict in manifest.get("nodes", {}).items():
node = DbtNode(
name=node_dict["name"],
name=node_dict.get("alias", node_dict["name"]),
unique_id=unique_id,
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict["depends_on"].get("nodes", []),
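The docstring notes in this file state that versioned models need dbt>=1.6.0. A caller could guard against older versions with a check along the following lines. This is only a sketch: `parse_dbt_version` and `supports_versioned_model_selection` are hypothetical helpers, not part of Cosmos or dbt, and the version string is assumed to contain a `major.minor.patch` triple:

```python
from __future__ import annotations

import re

# Threshold taken from the docstring note; the helper names are hypothetical.
MIN_DBT_FOR_VERSIONED_MODELS = (1, 6, 0)


def parse_dbt_version(text: str) -> tuple[int, int, int]:
    """Extract the first major.minor.patch triple found in the given text."""
    match = re.search(r"(\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        raise ValueError(f"No semantic version found in {text!r}")
    major, minor, patch = (int(part) for part in match.groups())
    return major, minor, patch


def supports_versioned_model_selection(version_text: str) -> bool:
    """Return True when the version meets the documented minimum."""
    return parse_dbt_version(version_text) >= MIN_DBT_FOR_VERSIONED_MODELS
```

Comparing integer tuples rather than strings avoids ordering `1.10.0` before `1.6.0`.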
2 changes: 1 addition & 1 deletion dev/dags/cosmos_seed_dag.py
@@ -46,7 +46,7 @@
for seed in ["raw_customers", "raw_payments", "raw_orders"]:
DbtRunOperationOperator(
task_id=f"drop_{seed}_if_exists",
macro_name="drop_table",
macro_name="drop_table_by_name",
args={"table_name": seed},
project_dir=DBT_ROOT_PATH / "jaffle_shop",
profile_config=profile_config,
2 changes: 1 addition & 1 deletion dev/dags/dbt/jaffle_shop/macros/drop_table.sql
@@ -1,4 +1,4 @@
{%- macro drop_table(table_name) -%}
{%- macro drop_table_by_name(table_name) -%}
{%- set drop_query -%}
DROP TABLE IF EXISTS {{ target.schema }}.{{ table_name }} CASCADE
{%- endset -%}
6 changes: 0 additions & 6 deletions dev/dags/dbt/jaffle_shop_python/macros/drop_table.sql

This file was deleted.

11 changes: 11 additions & 0 deletions dev/dags/dbt/model_version/README.md
@@ -0,0 +1,11 @@
## `jaffle_shop`

`jaffle_shop` is a fictional ecommerce store. This dbt project transforms raw data from an app database into a customers and orders model ready for analytics.

See [dbt's documentation](https://github.com/dbt-labs/jaffle_shop) for more info.

### Modifications

This project has been modified from the original to highlight some of the features of Cosmos. Namely:

- tags have been added to the models
26 changes: 26 additions & 0 deletions dev/dags/dbt/model_version/dbt_project.yml
@@ -0,0 +1,26 @@
name: 'jaffle_shop'

config-version: 2
version: '0.1'

profile: 'jaffle_shop'

model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]
analysis-paths: ["analysis"]
macro-paths: ["macros"]

target-path: "target"
clean-targets:
- "target"
- "dbt_modules"
- "logs"

require-dbt-version: [">=1.0.0", "<2.0.0"]

models:
jaffle_shop:
materialized: table
staging:
materialized: view
70 changes: 70 additions & 0 deletions dev/dags/dbt/model_version/models/customers.sql
@@ -0,0 +1,70 @@
with customers as (

select * from {{ ref('stg_customers') }}

),

orders as (

select * from {{ ref('stg_orders') }}

),

payments as (

select * from {{ ref('stg_payments') }}

),

customer_orders as (

select
customer_id,

min(order_date) as first_order,
max(order_date) as most_recent_order,
count(order_id) as number_of_orders
from orders

group by customer_id

),

customer_payments as (

select
orders.customer_id,
sum(amount) as total_amount

from payments

left join orders on
payments.order_id = orders.order_id

group by orders.customer_id

),

final as (

select
customers.customer_id,
customers.first_name,
customers.last_name,
customers.full_name,
customer_orders.first_order,
customer_orders.most_recent_order,
customer_orders.number_of_orders,
customer_payments.total_amount as customer_lifetime_value

from customers

left join customer_orders
on customers.customer_id = customer_orders.customer_id

left join customer_payments
on customers.customer_id = customer_payments.customer_id

)

select * from final
69 changes: 69 additions & 0 deletions dev/dags/dbt/model_version/models/customers_v1.sql
@@ -0,0 +1,69 @@
with customers as (

select * from {{ ref('stg_customers', v=1) }}

),

orders as (

select * from {{ ref('stg_orders') }}

),

payments as (

select * from {{ ref('stg_payments') }}

),

customer_orders as (

select
customer_id,

min(order_date) as first_order,
max(order_date) as most_recent_order,
count(order_id) as number_of_orders
from orders

group by customer_id

),

customer_payments as (

select
orders.customer_id,
sum(amount) as total_amount

from payments

left join orders on
payments.order_id = orders.order_id

group by orders.customer_id

),

final as (

select
customers.customer_id,
customers.first_name,
customers.last_name,
customer_orders.first_order,
customer_orders.most_recent_order,
customer_orders.number_of_orders,
customer_payments.total_amount as customer_lifetime_value

from customers

left join customer_orders
on customers.customer_id = customer_orders.customer_id

left join customer_payments
on customers.customer_id = customer_payments.customer_id

)

select * from final
14 changes: 14 additions & 0 deletions dev/dags/dbt/model_version/models/docs.md
@@ -0,0 +1,14 @@
{% docs orders_status %}

Orders can be one of the following statuses:

| status | description |
|----------------|------------------------------------------------------------------------------------------------------------------------|
| placed | The order has been placed but has not yet left the warehouse |
| shipped | The order has been shipped to the customer and is currently in transit |
| completed | The order has been received by the customer |
| return_pending | The customer has indicated that they would like to return the order, but it has not yet been received at the warehouse |
| returned | The order has been returned by the customer and received at the warehouse |


{% enddocs %}
56 changes: 56 additions & 0 deletions dev/dags/dbt/model_version/models/orders.sql
@@ -0,0 +1,56 @@
{% set payment_methods = ['credit_card', 'coupon', 'bank_transfer', 'gift_card'] %}

with orders as (

select * from {{ ref('stg_orders') }}

),

payments as (

select * from {{ ref('stg_payments') }}

),

order_payments as (

select
order_id,

{% for payment_method in payment_methods -%}
sum(case when payment_method = '{{ payment_method }}' then amount else 0 end) as {{ payment_method }}_amount,
{% endfor -%}

sum(amount) as total_amount

from payments

group by order_id

),

final as (

select
orders.order_id,
orders.customer_id,
orders.order_date,
orders.status,

{% for payment_method in payment_methods -%}

order_payments.{{ payment_method }}_amount,

{% endfor -%}

order_payments.total_amount as amount

from orders


left join order_payments
on orders.order_id = order_payments.order_id

)

select * from final