diff --git a/week_4/project/dbt_config.py b/week_4/project/dbt_config.py
new file mode 100644
index 00000000..77efd7f2
--- /dev/null
+++ b/week_4/project/dbt_config.py
@@ -0,0 +1 @@
+DBT_PROJECT_PATH = "/opt/dagster/dagster_home/project/dbt_test_project"
diff --git a/week_4/project/dbt_test_project/dbt_project.yml b/week_4/project/dbt_test_project/dbt_project.yml
new file mode 100644
index 00000000..925ed22f
--- /dev/null
+++ b/week_4/project/dbt_test_project/dbt_project.yml
@@ -0,0 +1,22 @@
+name: 'test_dbt'
+version: '1.0.0'
+config-version: 2
+
+profile: 'test_dbt'
+
+source-paths: ["models"]
+analysis-paths: ["analysis"]
+test-paths: ["tests"]
+data-paths: ["data"]
+macro-paths: ["macros"]
+snapshot-paths: ["snapshots"]
+
+target-path: "target"
+clean-targets:
+  - "target"
+  - "dbt_modules"
+
+models:
+  test_dbt:
+    example:
+      +materialized: view
\ No newline at end of file
diff --git a/week_4/project/dbt_test_project/models/my_first_dbt_model.sql b/week_4/project/dbt_test_project/models/my_first_dbt_model.sql
new file mode 100644
index 00000000..52304b48
--- /dev/null
+++ b/week_4/project/dbt_test_project/models/my_first_dbt_model.sql
@@ -0,0 +1,5 @@
+{{ config(materialized='table') }}
+
+
+SELECT *
+FROM {{ source('postgresql', 'dbt_table') }}
\ No newline at end of file
diff --git a/week_4/project/dbt_test_project/models/my_second_dbt_model.sql b/week_4/project/dbt_test_project/models/my_second_dbt_model.sql
new file mode 100644
index 00000000..0fad5c2b
--- /dev/null
+++ b/week_4/project/dbt_test_project/models/my_second_dbt_model.sql
@@ -0,0 +1,5 @@
+{{ config(materialized='table') }}
+
+
+SELECT column_2 AS my_column
+FROM {{ ref('my_first_dbt_model') }}
diff --git a/week_4/project/dbt_test_project/models/schema.yml b/week_4/project/dbt_test_project/models/schema.yml
new file mode 100644
index 00000000..221e45b1
--- /dev/null
+++ b/week_4/project/dbt_test_project/models/schema.yml
@@ -0,0 +1,22 @@
+version: 2
+
+models:
+  - name: my_first_dbt_model
+    description: "A starter dbt model"
+    columns:
+      - name: column_1
+        tests:
+          - not_null
+      - name: column_2
+        tests:
+          - not_null
+      - name: column_3
+        tests:
+          - not_null
+
+  - name: my_second_dbt_model
+    description: "A starter dbt model"
+    columns:
+      - name: my_column
+        tests:
+          - not_null
\ No newline at end of file
diff --git a/week_4/project/dbt_test_project/models/sources.yml b/week_4/project/dbt_test_project/models/sources.yml
new file mode 100644
index 00000000..8f712a32
--- /dev/null
+++ b/week_4/project/dbt_test_project/models/sources.yml
@@ -0,0 +1,7 @@
+version: 2
+
+sources:
+  - name: postgresql
+    schema: analytics
+    tables:
+      - name: dbt_table
diff --git a/week_4/project/dbt_test_project/profiles.yml b/week_4/project/dbt_test_project/profiles.yml
new file mode 100644
index 00000000..fae321a9
--- /dev/null
+++ b/week_4/project/dbt_test_project/profiles.yml
@@ -0,0 +1,12 @@
+test_dbt:
+  target: test
+  outputs:
+    test:
+      type: postgres
+      host: postgresql
+      user: postgres_user
+      password: postgres_password
+      port: 5432
+      dbname: postgres_db
+      schema: analytics
+      threads: 4
\ No newline at end of file
diff --git a/week_4/project/repo.py b/week_4/project/repo.py
--- a/week_4/project/repo.py
+++ b/week_4/project/repo.py
@@ -1,11 +1,40 @@
-from dagster import repository
+from dagster import repository, with_resources
+from dagster_dbt import dbt_cli_resource
+from project.dbt_config import DBT_PROJECT_PATH
+from project.resources import postgres_resource
 from project.week_4 import (
     get_s3_data_docker,
     process_data_docker,
     put_redis_data_docker,
 )
+from project.week_4_challenge import create_dbt_table, insert_dbt_data
 
 
 @repository
 def repo():
     return [get_s3_data_docker, process_data_docker, put_redis_data_docker]
+
+
+@repository
+def assets_dbt():
+    """Expose the Postgres seed assets with their ``database`` resource bound.
+
+    ``@repository`` calls this function when the decorator is applied and
+    rejects a ``None`` return value, so it must return a list of definitions.
+    The connection settings mirror ``dbt_test_project/profiles.yml``.
+    """
+    return [
+        *with_resources(
+            [create_dbt_table, insert_dbt_data],
+            resource_defs={
+                "database": postgres_resource.configured(
+                    {
+                        "host": "postgresql",
+                        "user": "postgres_user",
+                        "password": "postgres_password",
+                        "database": "postgres_db",
+                    }
+                ),
+            },
+        )
+    ]
diff --git a/week_4/project/resources.py b/week_4/project/resources.py
--- a/week_4/project/resources.py
+++ b/week_4/project/resources.py
@@ -4,9 +4,33 @@
 
 import boto3
 import redis
+import sqlalchemy
 from dagster import Field, Int, String, resource
 
 
+class Postgres:
+    """Thin client that runs SQL statements against a Postgres database."""
+
+    def __init__(self, host: str, user: str, password: str, database: str):
+        self.host = host
+        self.user = user
+        self.password = password
+        self.database = database
+        self._engine = sqlalchemy.create_engine(self.uri)
+
+    @property
+    def uri(self) -> str:
+        """SQLAlchemy connection URL built from the stored credentials."""
+        return f"postgresql://{self.user}:{self.password}@{self.host}/{self.database}"
+
+    def execute_query(self, query: str) -> None:
+        """Run ``query`` in its own transaction, committing on success."""
+        # Engine.execute() is deprecated in SQLAlchemy 1.4 and removed in 2.0;
+        # an explicit connection plus text() works on both.
+        with self._engine.begin() as connection:
+            connection.execute(sqlalchemy.text(query))
+
+
 class S3:
     def __init__(self, bucket: str, access_key: str, secret_key: str, endpoint_url: str = None):
         self.bucket = bucket
@@ -39,6 +63,25 @@ def put_data(self, name: str, value: str):
         self.client.set(name, value)
 
 
+@resource(
+    config_schema={
+        "host": Field(String),
+        "user": Field(String),
+        "password": Field(String),
+        "database": Field(String),
+    },
+    description="A resource that can run Postgres",
+)
+def postgres_resource(context) -> Postgres:
+    """This resource defines a Postgres client"""
+    return Postgres(
+        host=context.resource_config["host"],
+        user=context.resource_config["user"],
+        password=context.resource_config["password"],
+        database=context.resource_config["database"],
+    )
+
+
 @resource
 def mock_s3_resource(context):
     stocks = [
diff --git a/week_4/project/week_4_challenge.py b/week_4/project/week_4_challenge.py
new file mode 100644
index 00000000..402d3863
--- /dev/null
+++ b/week_4/project/week_4_challenge.py
@@ -0,0 +1,36 @@
+from random import randint
+
+from dagster import AssetIn, asset
+from dagster_dbt import load_assets_from_dbt_project
+from project.dbt_config import DBT_PROJECT_PATH
+
+
+@asset(
+    required_resource_keys={"database"},
+    op_tags={"kind": "postgres"},
+)
+def create_dbt_table(context):
+    sql = "CREATE SCHEMA IF NOT EXISTS analytics;"
+    context.resources.database.execute_query(sql)
+    sql = "CREATE TABLE IF NOT EXISTS analytics.dbt_table (column_1 VARCHAR(100), column_2 VARCHAR(100), column_3 VARCHAR(100));"
+    context.resources.database.execute_query(sql)
+
+
+@asset(
+    required_resource_keys={"database"},
+    op_tags={"kind": "postgres"},
+)
+def insert_dbt_data(context, create_dbt_table):
+    sql = "INSERT INTO analytics.dbt_table (column_1, column_2, column_3) VALUES ('A', 'B', 'C');"
+
+    number_of_rows = randint(1, 10)
+    for _ in range(number_of_rows):
+        context.resources.database.execute_query(sql)
+        context.log.info("Inserted a row")
+
+    context.log.info("Batch inserted")
+
+
+@asset
+def final(context):
+    context.log.info("Week 4 Challenge completed")
\ No newline at end of file