From 536cfa4e3128f7cc47a12fe0a67b1a900a7cf4dc Mon Sep 17 00:00:00 2001 From: SebMelendez01 <78228475+SebMelendez01@users.noreply.github.com> Date: Tue, 25 Jun 2024 16:22:41 -0400 Subject: [PATCH] Stablecoins: Ethereum Breakdown Table (#221) --- .../agg_chain_stablecoin_breakdown.sql | 0 .../agg_chain_stablecoin_metrics.sql | 0 .../agg_chain_stablecoin_transfers.sql | 0 ...agg_daily_stablecoin_metrics_breakdown.sql | 148 +++++++++++++ ...illed_stablecoin_balances_by_addresses.sql | 208 ++++++++++++++++++ ..._agg_daily_stablecoin_breakdown_silver.yml | 17 ++ .../agg_daily_stablecoin_breakdown.sql | 21 ++ .../agg_daily_stablecoin_breakdown_silver.sql | 14 ++ ...eum_daily_stablecoin_metrics_breakdown.sql | 9 + .../agg_ethereum_stablecoin_balances.sql | 9 + 10 files changed, 426 insertions(+) rename macros/{ => stablecoins}/agg_chain_stablecoin_breakdown.sql (100%) rename macros/{ => stablecoins}/agg_chain_stablecoin_metrics.sql (100%) rename macros/{ => stablecoins}/agg_chain_stablecoin_transfers.sql (100%) create mode 100644 macros/stablecoins/agg_daily_stablecoin_metrics_breakdown.sql create mode 100644 macros/stablecoins/agg_foward_filled_stablecoin_balances_by_addresses.sql create mode 100644 models/metrics/stablecoins/breakdowns/_test_agg_daily_stablecoin_breakdown_silver.yml create mode 100644 models/metrics/stablecoins/breakdowns/agg_daily_stablecoin_breakdown.sql create mode 100644 models/metrics/stablecoins/breakdowns/agg_daily_stablecoin_breakdown_silver.sql create mode 100644 models/staging/ethereum/agg_ethereum_daily_stablecoin_metrics_breakdown.sql create mode 100644 models/staging/ethereum/agg_ethereum_stablecoin_balances.sql diff --git a/macros/agg_chain_stablecoin_breakdown.sql b/macros/stablecoins/agg_chain_stablecoin_breakdown.sql similarity index 100% rename from macros/agg_chain_stablecoin_breakdown.sql rename to macros/stablecoins/agg_chain_stablecoin_breakdown.sql diff --git a/macros/agg_chain_stablecoin_metrics.sql b/macros/stablecoins/agg_chain_stablecoin_metrics.sql similarity index 100% rename from macros/agg_chain_stablecoin_metrics.sql rename to macros/stablecoins/agg_chain_stablecoin_metrics.sql diff --git a/macros/agg_chain_stablecoin_transfers.sql b/macros/stablecoins/agg_chain_stablecoin_transfers.sql similarity index 100% rename from macros/agg_chain_stablecoin_transfers.sql rename to macros/stablecoins/agg_chain_stablecoin_transfers.sql diff --git a/macros/stablecoins/agg_daily_stablecoin_metrics_breakdown.sql b/macros/stablecoins/agg_daily_stablecoin_metrics_breakdown.sql new file mode 100644 index 00000000..d8ef9933 --- /dev/null +++ b/macros/stablecoins/agg_daily_stablecoin_metrics_breakdown.sql @@ -0,0 +1,148 @@ +{% macro agg_daily_stablecoin_metrics_breakdown(chain) %} +with + transfer_transactions as ( + select + block_timestamp, + from_address, + contract_address, + symbol, + transfer_volume, + to_address + -- @anthony + -- Can move into stablecoin transfers table if needed + -- Logic is slightly different for solana tron and near + -- Right now I am leaving it here so that we dont have to change the logic in the stablecoin transfers table + + --Average transfer volume is currently done in the API stablecoins.py with `_fetch_avg_transaction_size` + , case + {% if chain not in ('solana', 'tron', 'near') %} + when + to_address not in (select contract_address from {{ ref("dim_" ~ chain ~ "_contract_addresses") }}) + and from_address not in (select contract_address from {{ ref("dim_" ~ chain ~ "_contract_addresses")}}) + then 1 + else 0 + {% else %} + when + to_address in (select address from {{ ref("dim_" ~ chain ~ "_eoa_addresses") }}) + and from_address in (select address from {{ ref("dim_" ~ chain ~ "_eoa_addresses") }}) + then 1 + else 0 + {% endif %} + end as is_p2p + from {{ ref("fact_" ~ chain ~ "_stablecoin_transfers")}} + {% if is_incremental() %} + where block_timestamp >= ( + select dateadd('day', -3, max(date)) + from {{ this }} + ) + {% endif %} + ), + filtered_contracts as ( + select * from pc_dbt_db.prod.dim_contracts_gold where chain = '{{ chain }}' + ), + transfer_transactions_agg as ( + select + block_timestamp::date as date + , transfer_transactions.from_address::string as from_address + , transfer_transactions.contract_address as contract_address + , transfer_transactions.symbol as symbol + , sum(transfer_volume) as stablecoin_transfer_volume + , sum( + case + when transfer_transactions.from_address is not null + then 1 + else 0 + end + ) as stablecoin_daily_txns + , count(distinct(to_address)) as stablecoin_dau + , sum(transfer_volume * is_p2p) as p2p_stablecoin_transfer_volume + , sum( + case + when transfer_transactions.from_address is not null and is_p2p = 1 + then 1 + else 0 + end + ) as p2p_stablecoin_daily_txns + , count( + distinct case + when is_p2p = 1 then to_address + else null + end + ) as p2p_stablecoin_dau + from transfer_transactions + group by 1, 2, 3, 4 + ), + results as ( + select + coalesce(balances.date, transfer_transactions_agg.date) as date + --stablecoin idenifiers + , coalesce(balances.contract_address, transfer_transactions_agg.contract_address) as contract_address + , coalesce(balances.symbol, transfer_transactions_agg.symbol) as symbol + --sender idenifiers + , coalesce(balances.address, transfer_transactions_agg.from_address) as from_address + , filtered_contracts.name as contract_name + , coalesce(filtered_contracts.name, transfer_transactions_agg.from_address) as contract + , filtered_contracts.friendly_name as application + , dim_apps_gold.icon as icon + , filtered_contracts.app as app + , filtered_contracts.category as category + --metrics + , coalesce(stablecoin_transfer_volume, 0) as stablecoin_transfer_volume + , coalesce(stablecoin_daily_txns, 0) as stablecoin_daily_txns + , coalesce(stablecoin_dau, 0) stablecoin_dau + , coalesce(stablecoin_supply, 0) as stablecoin_supply + --p2p metrics + , coalesce(p2p_stablecoin_transfer_volume, 0) as p2p_stablecoin_transfer_volume + , coalesce(p2p_stablecoin_daily_txns, 0) as p2p_stablecoin_daily_txns + , coalesce(p2p_stablecoin_dau, 0) as p2p_stablecoin_dau + , '{{ chain }}' as chain + from {{ ref("agg_" ~ chain ~ "_stablecoin_balances")}} balances + -- _stablecoin_balances needs to go first because of the defined dumby address + -- 0x00000000000000000000000000000DEADARTEMIS + left join transfer_transactions_agg + on lower(transfer_transactions_agg.from_address) = lower(balances.address) + and transfer_transactions_agg.date = balances.date + and lower(transfer_transactions_agg.contract_address) = lower(balances.contract_address) + left join filtered_contracts + on lower(transfer_transactions_agg.from_address) = lower(filtered_contracts.address) + left join pc_dbt_db.prod.dim_apps_gold dim_apps_gold + on filtered_contracts.app = dim_apps_gold.namespace + ), + results_dollar_denom as ( + select + results.date + , results.contract_address + , results.symbol + + , from_address + , contract_name + , contract + , application + , icon + , app + , category + , stablecoin_transfer_volume * coalesce( + d.token_current_price, 1 + ) as stablecoin_transfer_volume + , stablecoin_daily_txns + , stablecoin_dau + , p2p_stablecoin_transfer_volume * coalesce( + d.token_current_price, 1 + ) as p2p_stablecoin_transfer_volume + , p2p_stablecoin_daily_txns + , p2p_stablecoin_dau + , stablecoin_supply * coalesce( + d.token_current_price, 1 + ) as stablecoin_supply + , chain + from results + left join {{ ref( "fact_" ~ chain ~ "_stablecoin_contracts") }} c + on lower(results.contract_address) = lower(c.contract_address) + left join {{ ref( "fact_coingecko_token_realtime_data") }} d + on lower(c.coingecko_id) = lower(d.token_id) + + ) +select * +from results_dollar_denom +where date < to_date(sysdate()) +{% endmacro %} \ No newline at end of file diff --git a/macros/stablecoins/agg_foward_filled_stablecoin_balances_by_addresses.sql b/macros/stablecoins/agg_foward_filled_stablecoin_balances_by_addresses.sql new file mode 100644 index 00000000..549d0298 --- /dev/null +++ b/macros/stablecoins/agg_foward_filled_stablecoin_balances_by_addresses.sql @@ -0,0 +1,208 @@ +{% macro agg_foward_filled_stablecoin_balances_by_addresses(chain) %} + +-- Use this for a backfill, +-- It is important to backfill 6 months at a time otherwise the query will +-- take > 4 hours on an XL to run +-- Make sure to set to '' after backfill is complete + + {% set backfill_date = '' %} + +-- TODO: Set backfill_date dynamically using jinja templating +-- if system.date > max(date) then max date + 6 months +-- else system.date +with + stablecoin_senders as (select from_address from {{ ref("fact_" ~ chain ~ "_stablecoin_transfers")}}) + , stablecoin_balances as ( + select + block_timestamp + , lower(t1.contract_address) as contract_address + , symbol + , lower(address) as address + , balance_token / pow(10, num_decimals) as stablecoin_supply + from {{ ref("fact_" ~ chain ~ "_address_balances_by_token")}} t1 + inner join {{ ref("fact_" ~ chain ~ "_stablecoin_contracts")}} t2 + on lower(t1.contract_address) = lower(t2.contract_address) + where lower(address) in (select lower(from_address) from stablecoin_senders) and block_timestamp < to_date(sysdate()) + -- Use this for a backfill, + -- It is important to backfill 6 months at a time otherwise the query will + -- take > 4 hours on an XL to run + {% if backfill_date != '' %} + and block_timestamp < '{{ backfill_date }}' + {% endif %} + and block_timestamp < to_date(sysdate()) + {% if is_incremental() %} + and block_timestamp >= (select dateadd('day', -1, max(date)) from {{ this }}) + {% endif %} + ) + {% if is_incremental() %} + , stale_stablecoin_balances as ( + select + date as block_timestamp + , t.contract_address + , t.symbol + , t.address + , t.stablecoin_supply + from {{this}} t + left join ( + select distinct address, contract_address + from stablecoin_balances + ) sb on t.address = sb.address and t.contract_address = sb.contract_address + where date >= (select dateadd('day', -1, max(date)) from {{ this }}) + and sb.address is null and sb.contract_address is null + ) + {% endif %} + , heal_balance_table as ( + select + block_timestamp + , contract_address + , symbol + , address + , stablecoin_supply + from stablecoin_balances + {% if is_incremental() %} + union + select + block_timestamp + , contract_address + , symbol + , address + , stablecoin_supply + from stale_stablecoin_balances + {% endif %} + ) + , date_range as ( + select + min(block_timestamp)::date as date + , contract_address + , symbol + , address + from heal_balance_table + group by contract_address, address, symbol + + union all + + select + dateadd(day, 1, date) as date + , contract_address + , symbol + , address + from date_range + where date < dateadd(day, -1, to_date(sysdate())) + {% if backfill_date != '' %} + and date < dateadd(day, -1, '{{ backfill_date }}') + {% endif %} + ) + , balances as ( + select + block_timestamp::date as date + , contract_address + , symbol + , address + , stablecoin_supply + from ( + select + block_timestamp + , contract_address + , symbol + , address + , stablecoin_supply + , row_number() over (partition by block_timestamp::date, contract_address, address, symbol order by block_timestamp desc) AS rn + from heal_balance_table + ) + where rn = 1 + ) + , historical_supply_by_address_balances as ( + select + date + , address + , contract_address + , symbol + , coalesce( + stablecoin_supply, + LAST_VALUE(balances.stablecoin_supply ignore nulls) over ( + partition by contract_address, address, symbol + order by date + rows between unbounded preceding and current row + ) + ) as stablecoin_supply + from date_range + left join balances using (date, contract_address, symbol, address) + -- append only new rows to the table + {% if is_incremental() %} + where date > (select max(date) from {{ this }}) + {% endif %} + ) + , daily_flows as ( + select + date + , sum(inflow) inflow + , lower(contract_address) as contract_address + , symbol + from {{ref("fact_" ~ chain ~ "_stablecoin_transfers")}} + {% if backfill_date != '' %} + where date < '{{ backfill_date }}' + {% endif %} + + group by date, contract_address, symbol + union all + select + dateadd( + day, -1, (select min(trunc(date, 'day')) from {{ref("fact_" ~ chain ~ "_stablecoin_transfers")}}) + ) as date + , sum(initial_supply) as inflow + , lower(contract_address) as contract_address + , symbol + from {{ref("fact_" ~ chain ~ "_stablecoin_contracts")}} + {% if backfill_date != '' %} + where date < '{{ backfill_date }}' + {% endif %} + group by contract_address, symbol + ) + , historical_supply_by_inflow_outflow as ( + select + date + , symbol + , stablecoin_supply + , contract_address + from ( + select + date + , symbol + , sum(inflow) over ( + partition by contract_address, symbol order by date asc + ) as stablecoin_supply + , contract_address + from daily_flows + ) + ) + , total_historical_supply_by_address_balances as ( + select + date + , contract_address + , symbol + , sum(stablecoin_supply) as stablecoin_supply + from historical_supply_by_address_balances + group by date, contract_address, symbol + ) + + select + date + , address + , contract_address + , symbol + , stablecoin_supply + from historical_supply_by_address_balances + union + -- this is a hacky way to fix the issue with the balances table and get the total supply for that day + select + date + , '0x00000000000000000000000000000DEADARTEMIS' as address + , contract_address + , symbol + , historical_supply_by_inflow_outflow.stablecoin_supply - total_historical_supply_by_address_balances.stablecoin_supply as stablecoin_supply + from total_historical_supply_by_address_balances + left join historical_supply_by_inflow_outflow using (date, contract_address, symbol) + +{% endmacro %} + + diff --git a/models/metrics/stablecoins/breakdowns/_test_agg_daily_stablecoin_breakdown_silver.yml b/models/metrics/stablecoins/breakdowns/_test_agg_daily_stablecoin_breakdown_silver.yml new file mode 100644 index 00000000..6d7f0822 --- /dev/null +++ b/models/metrics/stablecoins/breakdowns/_test_agg_daily_stablecoin_breakdown_silver.yml @@ -0,0 +1,17 @@ +models: + - name: agg_daily_stablecoin_breakdown_silver + tests: + - "dbt_expectations.expect_grouped_row_values_to_have_recent_data": + group_by: [CHAIN, SYMBOL] + timestamp_column: "DATE" + datepart: "day" + interval: 2 + columns: + - name: "DATE" + tests: + - not_null + - dbt_expectations.expect_column_to_exist + - name: "CHAIN" + tests: + - not_null + - dbt_expectations.expect_column_to_exist diff --git a/models/metrics/stablecoins/breakdowns/agg_daily_stablecoin_breakdown.sql b/models/metrics/stablecoins/breakdowns/agg_daily_stablecoin_breakdown.sql new file mode 100644 index 00000000..25620ef4 --- /dev/null +++ b/models/metrics/stablecoins/breakdowns/agg_daily_stablecoin_breakdown.sql @@ -0,0 +1,21 @@ +{{ config(materialized="table") }} +select + date + , contract_address + , symbol + , from_address + , contract_name + , contract + , application + , icon + , app + , category + , stablecoin_transfer_volume + , stablecoin_daily_txns + , stablecoin_dau + , p2p_stablecoin_transfer_volume + , p2p_stablecoin_daily_txns + , p2p_stablecoin_dau + , stablecoin_supply + , chain +from {{ ref("agg_daily_stablecoin_breakdown_silver") }} \ No newline at end of file diff --git a/models/metrics/stablecoins/breakdowns/agg_daily_stablecoin_breakdown_silver.sql b/models/metrics/stablecoins/breakdowns/agg_daily_stablecoin_breakdown_silver.sql new file mode 100644 index 00000000..7cfba225 --- /dev/null +++ b/models/metrics/stablecoins/breakdowns/agg_daily_stablecoin_breakdown_silver.sql @@ -0,0 +1,14 @@ +{{ config(materialized="table") }} +with + daily_data as ( + {{ + dbt_utils.union_relations( + relations=[ + ref("agg_ethereum_daily_stablecoin_metrics_breakdown"), + ] + ) + }} + ) +select *, date::string || '-' || chain || '-' || symbol || from_address || contract_address as unique_id +from daily_data +where date < to_date(sysdate()) \ No newline at end of file diff --git a/models/staging/ethereum/agg_ethereum_daily_stablecoin_metrics_breakdown.sql b/models/staging/ethereum/agg_ethereum_daily_stablecoin_metrics_breakdown.sql new file mode 100644 index 00000000..c9d3bf5c --- /dev/null +++ b/models/staging/ethereum/agg_ethereum_daily_stablecoin_metrics_breakdown.sql @@ -0,0 +1,9 @@ +{{ + config( + materialized="table", + unique_key=["date", "symbol", "from_address"], + snowflake_warehouse="BALANCES_LG", + ) +}} + +{{ agg_daily_stablecoin_metrics_breakdown("ethereum") }} \ No newline at end of file diff --git a/models/staging/ethereum/agg_ethereum_stablecoin_balances.sql b/models/staging/ethereum/agg_ethereum_stablecoin_balances.sql new file mode 100644 index 00000000..713d4706 --- /dev/null +++ b/models/staging/ethereum/agg_ethereum_stablecoin_balances.sql @@ -0,0 +1,9 @@ +{{ + config( + materialized="incremental", + unique_key=["address", "contract_address", "date"], + snowflake_warehouse="BALANCES_LG", + ) +}} + +{{ agg_foward_filled_stablecoin_balances_by_addresses("ethereum") }} \ No newline at end of file