Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/derrickbovard #12

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions orchestrate/dags/derrickbovard_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import datetime

from airflow.decorators import dag, task_group
from airflow.providers.airbyte.operators.airbyte import \
AirbyteTriggerSyncOperator
from operators.datacoves.dbt import DatacovesDbtOperator


@dag(
default_args={
"start_date": datetime.datetime(2023, 1, 1, 0, 0),
"owner": "Derrick Bovard",
"email": "[email protected]",
"email_on_failure": True,
"retries": 3,
},
description="Personal Loan Average",
schedule="0 0 1 */12 *",
tags=["version_1"],
catchup=False,
)
def derrickbovard_dag():
@task_group(group_id="extract_and_load_airbyte", tooltip="Airbyte Extract and Load")
def extract_and_load_airbyte():
country_populations_datacoves_train = AirbyteTriggerSyncOperator(
task_id="country_populations_datacoves_train",
connection_id="676575f7-22d7-41f4-ab78-52099d8cbccb",
airbyte_conn_id="airbyte_connection",
)
derrickbovard_source_datacoves_train = AirbyteTriggerSyncOperator(
task_id="derrickbovard_source_datacoves_train",
connection_id="082728a2-8bbb-4824-a345-ca345cc8313a",
airbyte_conn_id="airbyte_connection",
)

tg_extract_and_load_airbyte = extract_and_load_airbyte()
transform = DatacovesDbtOperator(
task_id="transform", bash_command="dbt build -s 'tag:daily_run_airbyte+ -t prd'"
)
transform.set_upstream([tg_extract_and_load_airbyte])


dag = derrickbovard_dag()
27 changes: 27 additions & 0 deletions orchestrate/dags_yml_definitions/derrickbovard_dag
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
description: "Personal Loan Average"
schedule: "0 0 1 */12 *"
tags:
- version_1
default_args:
start_date: 2023-01-01
# Replace name and [email protected] (not real email)
owner: Derrick Bovard
email: [email protected]
email_on_failure: true
retries: 3
catchup: false

nodes:
extract_and_load_airbyte:
generator: AirbyteDbtGenerator
type: task_group

tooltip: "Airbyte Extract and Load"
dbt_list_args: "--select tag:daily_run_airbyte"

transform:
operator: operators.datacoves.dbt.DatacovesDbtOperator
type: task

bash_command: "dbt build -s 'tag:daily_run_airbyte+ -t prd'"
dependencies: ["extract_and_load_airbyte"]
10 changes: 10 additions & 0 deletions transform/models/L1_staging/loans/_loans_derrickbovard.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: 2

sources:
- name: DERRICKBOVARD
database: RAW
tags:
- daily_run_airbyte
tables:
- name: PERSONAL_LOANS
description: 'A personal loans table'
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
with raw_source as (

select *
from {{ source('DERRICKBOVARD', 'PERSONAL_LOANS') }}

),

final as (

select
"_AIRBYTE_RAW_ID"::varchar as airbyte_raw_id,
"_AIRBYTE_EXTRACTED_AT"::timestamp_tz as airbyte_extracted_at,
"_AIRBYTE_META"::variant as airbyte_meta,
"TOTAL_ACC"::float as total_acc,
"ANNUAL_INC"::float as annual_inc,
"EMP_LENGTH"::varchar as emp_length,
"DESC"::varchar as desc,
"TOTAL_PYMNT"::float as total_pymnt,
"LAST_PYMNT_D"::varchar as last_pymnt_d,
"ADDR_STATE"::varchar as addr_state,
"NEXT_PYMNT_D"::varchar as next_pymnt_d,
"EMP_TITLE"::varchar as emp_title,
"COLLECTION_RECOVERY_FEE"::float as collection_recovery_fee,
"MTHS_SINCE_LAST_MAJOR_DEROG"::float as mths_since_last_major_derog,
"INQ_LAST_6MTHS"::float as inq_last_6mths,
"SUB_GRADE"::varchar as sub_grade,
"FUNDED_AMNT_INV"::float as funded_amnt_inv,
"DELINQ_2YRS"::float as delinq_2yrs,
"LOAN_ID"::varchar as loan_id,
"FUNDED_AMNT"::float as funded_amnt,
"VERIFICATION_STATUS"::varchar as verification_status,
"DTI"::float as dti,
"TOTAL_REC_PRNCP"::float as total_rec_prncp,
"GRADE"::varchar as grade,
"HOME_OWNERSHIP"::varchar as home_ownership,
"ISSUE_D"::varchar as issue_d,
"MTHS_SINCE_LAST_DELINQ"::float as mths_since_last_delinq,
"OUT_PRNCP"::float as out_prncp,
"PUB_REC"::float as pub_rec,
"INT_RATE"::float as int_rate,
"ZIP_CODE"::varchar as zip_code,
"OPEN_ACC"::float as open_acc,
"TERM"::varchar as term,
"PYMNT_PLAN"::varchar as pymnt_plan,
"URL"::varchar as url,
"REVOL_BAL"::float as revol_bal,
"RECOVERIES"::float as recoveries,
"LAST_PYMNT_AMNT"::float as last_pymnt_amnt,
"LOAN_AMNT"::float as loan_amnt,
"PURPOSE"::varchar as purpose,
"INITIAL_LIST_STATUS"::varchar as initial_list_status,
"TOTAL_REC_INT"::float as total_rec_int,
"TOTAL_PYMNT_INV"::float as total_pymnt_inv,
"MTHS_SINCE_LAST_RECORD"::float as mths_since_last_record,
"LAST_CREDIT_PULL_D"::varchar as last_credit_pull_d,
"TOTAL_REC_LATE_FEE"::float as total_rec_late_fee,
"MEMBER_ID"::float as member_id,
"POLICY_CODE"::float as policy_code,
"TITLE"::varchar as title,
"LOAN_STATUS"::varchar as loan_status,
"INSTALLMENT"::float as installment,
"EARLIEST_CR_LINE"::varchar as earliest_cr_line,
"REVOL_UTIL"::varchar as revol_util,
"OUT_PRNCP_INV"::float as out_prncp_inv,
"COLLECTIONS_12_MTHS_EX_MED"::float as collections_12_mths_ex_med

from raw_source

)

select * from final
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
version: 2

models:
- name: stg_derrickbovard_personal_loans
description: 'Personal Loans description'
columns:
- name: airbyte_raw_id
- name: airbyte_extracted_at
- name: airbyte_meta
- name: total_acc
- name: annual_inc
- name: emp_length
- name: desc
- name: total_pymnt
- name: last_pymnt_d
- name: addr_state
- name: next_pymnt_d
- name: emp_title
- name: collection_recovery_fee
- name: mths_since_last_major_derog
- name: inq_last_6mths
- name: sub_grade
- name: funded_amnt_inv
- name: delinq_2yrs
- name: loan_id
- name: funded_amnt
- name: verification_status
- name: dti
- name: total_rec_prncp
- name: grade
- name: home_ownership
- name: issue_d
- name: mths_since_last_delinq
- name: out_prncp
- name: pub_rec
- name: int_rate
- name: zip_code
- name: open_acc
- name: term
- name: pymnt_plan
- name: url
- name: revol_bal
- name: recoveries
- name: last_pymnt_amnt
- name: loan_amnt
- name: purpose
- name: initial_list_status
- name: total_rec_int
- name: total_pymnt_inv
- name: mths_since_last_record
- name: last_credit_pull_d
- name: total_rec_late_fee
- name: member_id
- name: policy_code
- name: title
- name: loan_status
- name: installment
- name: earliest_cr_line
- name: revol_util
- name: out_prncp_inv
- name: collections_12_mths_ex_med
20 changes: 20 additions & 0 deletions transform/models/L2_core/derrickbovard_avg_by_grade.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
with raw_source as (

select * from {{ ref('stg_derrickbovard_personal_loans') }}

),

final as (

select
grade,
avg(loan_amnt) as avg_loan_amount,
count(*) as total_loans
from raw_source
where loan_status = 'Fully Paid'
group by grade
order by grade

)

select * from final
9 changes: 9 additions & 0 deletions transform/models/L2_core/derrickbovard_avg_by_grade.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
version: 2

models:
- name: derrickbovard_avg_by_grade
description: 'Derricks model calculating loan average by grade.'
columns:
- name: grade
- name: avg_loan_amount
- name: total_loans
Loading