diff --git a/queries/orderbook/.sqlfluff b/queries/orderbook/.sqlfluff index 3efa57a9..44fc4586 100644 --- a/queries/orderbook/.sqlfluff +++ b/queries/orderbook/.sqlfluff @@ -3,3 +3,5 @@ start_block=100 end_block=100000000 EPSILON_LOWER=10000000000000000 EPSILON_UPPER=12000000000000000 +results=solver_rewards_script_table +env=prod diff --git a/queries/orderbook/barn_batch_rewards.sql b/queries/orderbook/barn_batch_rewards.sql index d6e654fd..ea71b41d 100644 --- a/queries/orderbook/barn_batch_rewards.sql +++ b/queries/orderbook/barn_batch_rewards.sql @@ -204,6 +204,29 @@ reward_per_auction as ( from reward_data ), +dune_sync_batch_data_table as ( --noqa: ST03 + select --noqa: ST06 + 'barn' as environment, + auction_id, + settlement_block as block_number, + block_deadline, + case + when tx_hash is null then null + else concat('0x', encode(tx_hash, 'hex')) + end as tx_hash, + concat('0x', encode(solver, 'hex')) as solver, + execution_cost, + surplus, + protocol_fee, + network_fee, + uncapped_payment as uncapped_payment_eth, + capped_payment, + winning_score, + reference_score + from reward_per_auction + order by block_deadline +), + primary_rewards as ( select solver, @@ -242,8 +265,12 @@ aggregate_results as ( partner_list, partner_fee as partner_fee_eth from primary_rewards as pr left outer join aggregate_partner_fees_per_solver as aif on pr.solver = aif.solver +), + +solver_rewards_script_table as ( + select * + from aggregate_results + order by solver ) -select * -from aggregate_results -order by solver +select * from {{results}} diff --git a/queries/orderbook/order_data.sql b/queries/orderbook/order_data.sql new file mode 100644 index 00000000..5d2ba299 --- /dev/null +++ b/queries/orderbook/order_data.sql @@ -0,0 +1,215 @@ +with trade_hashes as ( + select + settlement.solver, + t.block_number, + order_uid, + fee_amount, + settlement.tx_hash, + auction_id + from + trades as t + left outer join lateral ( + select + s.tx_hash, + s.solver, + s.auction_id, + s.block_number, + s.log_index + from settlements as s + where s.block_number = t.block_number and s.log_index > t.log_index + order by s.log_index asc + limit 1 + ) as settlement on true + inner join settlement_observations as so + on settlement.block_number = so.block_number and settlement.log_index = so.log_index + where settlement.block_number >= {{start_block}} and settlement.block_number <= {{end_block}} +), + +-- order data +order_data as ( + select + uid, + sell_token, + buy_token, + sell_amount, + buy_amount, + kind, + app_data + from orders + union all + select + uid, + sell_token, + buy_token, + sell_amount, + buy_amount, + kind, + app_data + from jit_orders +), + +protocol_fee_kind as ( + select distinct on (fp.auction_id, fp.order_uid) + fp.auction_id, + fp.order_uid, + fp.kind + from fee_policies as fp inner join trade_hashes as th + on fp.auction_id = th.auction_id and fp.order_uid = th.order_uid + order by fp.auction_id, fp.order_uid, fp.application_order +), + +-- unprocessed trade data +trade_data_unprocessed as ( + select + ss.winner as solver, + s.auction_id, + s.tx_hash, + t.order_uid, + od.sell_token, + od.buy_token, + t.sell_amount, -- the total amount the user sends + t.buy_amount, -- the total amount the user receives + oe.surplus_fee as observed_fee, -- the total discrepancy between what the user sends and what they would have send if they traded at clearing price + od.kind, + case + when od.kind = 'sell' then od.buy_token + when od.kind = 'buy' then od.sell_token + end as surplus_token, + 
cast(convert_from(ad.full_app_data, 'UTF8') as jsonb) -> 'metadata' -> 'partnerFee' ->> 'recipient' as partner_fee_recipient, + coalesce(oe.protocol_fee_amounts[1], 0) as first_protocol_fee_amount, + coalesce(oe.protocol_fee_amounts[2], 0) as second_protocol_fee_amount + from + settlements as s inner join settlement_scores as ss -- contains block_deadline + on s.auction_id = ss.auction_id + inner join trades as t -- contains traded amounts + on s.block_number = t.block_number -- given the join that follows with the order execution table, this works even when multiple txs appear in the same block + inner join order_data as od -- contains tokens and limit amounts + on t.order_uid = od.uid + inner join order_execution as oe -- contains surplus fee + on t.order_uid = oe.order_uid and s.auction_id = oe.auction_id + left outer join app_data as ad -- contains full app data + on od.app_data = ad.contract_app_data + where s.block_number >= {{start_block}} and s.block_number <= {{end_block}} +), + +-- processed trade data: +trade_data_processed as ( + select --noqa: ST06 + tdu.auction_id, + tdu.solver, + tdu.tx_hash, + tdu.order_uid, + tdu.sell_amount, + tdu.buy_amount, + tdu.sell_token, + tdu.observed_fee, + tdu.surplus_token, + tdu.second_protocol_fee_amount, + tdu.first_protocol_fee_amount + tdu.second_protocol_fee_amount as protocol_fee, + tdu.partner_fee_recipient, + case + when tdu.partner_fee_recipient is not null then tdu.second_protocol_fee_amount + else 0 + end as partner_fee, + tdu.surplus_token as protocol_fee_token, + pfk.kind as protocol_fee_kind + from trade_data_unprocessed as tdu left outer join protocol_fee_kind as pfk + on tdu.order_uid = pfk.order_uid and tdu.auction_id = pfk.auction_id +), + +price_data as ( + select + tdp.auction_id, + tdp.order_uid, + ap_surplus.price / pow(10, 18) as surplus_token_native_price, + ap_protocol.price / pow(10, 18) as protocol_fee_token_native_price, + ap_sell.price / pow(10, 18) as network_fee_token_native_price + from trade_data_processed as tdp + left outer join auction_prices as ap_sell -- contains price: sell token + on tdp.auction_id = ap_sell.auction_id and tdp.sell_token = ap_sell.token + left outer join auction_prices as ap_surplus -- contains price: surplus token + on tdp.auction_id = ap_surplus.auction_id and tdp.surplus_token = ap_surplus.token + left outer join auction_prices as ap_protocol -- contains price: protocol fee token + on tdp.auction_id = ap_protocol.auction_id and tdp.surplus_token = ap_protocol.token +), + +trade_data_processed_with_prices as ( + select --noqa: ST06 + tdp.auction_id, + tdp.solver, + tdp.tx_hash, + tdp.order_uid, + tdp.surplus_token, + tdp.protocol_fee, + tdp.protocol_fee_token, + tdp.partner_fee, + tdp.partner_fee_recipient, + case + when tdp.sell_token != tdp.surplus_token then tdp.observed_fee - (tdp.sell_amount - tdp.observed_fee) / tdp.buy_amount * coalesce(tdp.protocol_fee, 0) + else tdp.observed_fee - coalesce(tdp.protocol_fee, 0) + end as network_fee, + tdp.sell_token as network_fee_token, + surplus_token_native_price, + protocol_fee_token_native_price, + network_fee_token_native_price, + protocol_fee_kind + from + trade_data_processed as tdp inner join price_data as pd + on tdp.auction_id = pd.auction_id and tdp.order_uid = pd.order_uid +), + +winning_quotes as ( + select --noqa: ST06 + concat('0x', encode(oq.solver, 'hex')) as quote_solver, + oq.order_uid + from trades as t inner join orders as o on order_uid = uid + inner join order_quotes as oq on t.order_uid = oq.order_uid + where + ( + 
o.class = 'market' + or ( + o.kind = 'sell' + and ( + oq.sell_amount - oq.gas_amount * oq.gas_price / oq.sell_token_price + ) * oq.buy_amount >= o.buy_amount * oq.sell_amount + ) + or ( + o.kind = 'buy' + and o.sell_amount >= oq.sell_amount + oq.gas_amount * oq.gas_price / oq.sell_token_price + ) + ) + and o.partially_fillable = 'f' -- the code above might fail for partially fillable orders + and t.block_number >= {{start_block}} + and t.block_number <= {{end_block}} + and oq.solver != '\x0000000000000000000000000000000000000000' +) -- Most efficient column order for sorting would be having tx_hash or order_uid first + +select + '{{env}}' as environment, + trade_hashes.auction_id, + trade_hashes.block_number, + concat('0x', encode(trade_hashes.order_uid, 'hex')) as order_uid, + concat('0x', encode(trade_hashes.solver, 'hex')) as solver, + quote_solver, + concat('0x', encode(trade_hashes.tx_hash, 'hex')) as tx_hash, + cast(coalesce(surplus_fee, 0) as text) as surplus_fee, + coalesce(reward, 0.0) as amount, + cast(coalesce(cast(protocol_fee as numeric(78, 0)), 0) as text) as protocol_fee, + case + when protocol_fee_token is not null then concat('0x', encode(protocol_fee_token, 'hex')) + end as protocol_fee_token, + coalesce(protocol_fee_token_native_price, 0.0) as protocol_fee_native_price, + cast(cast(oq.sell_amount as numeric(78, 0)) as text) as quote_sell_amount, + cast(cast(oq.buy_amount as numeric(78, 0)) as text) as quote_buy_amount, + oq.gas_amount * oq.gas_price as quote_gas_cost, + oq.sell_token_price as quote_sell_token_price, + cast(cast(coalesce(tdpwp.partner_fee, 0) as numeric(78, 0)) as text) as partner_fee, + tdpwp.partner_fee_recipient, + tdpwp.protocol_fee_kind +from trade_hashes left outer join order_execution as o + on trade_hashes.order_uid = o.order_uid and trade_hashes.auction_id = o.auction_id +left outer join winning_quotes as wq on trade_hashes.order_uid = wq.order_uid +left outer join trade_data_processed_with_prices as tdpwp + on trade_hashes.order_uid = tdpwp.order_uid and trade_hashes.auction_id = tdpwp.auction_id +left outer join order_quotes as oq on trade_hashes.order_uid = oq.order_uid +order by trade_hashes.block_number asc diff --git a/queries/orderbook/prod_batch_rewards.sql b/queries/orderbook/prod_batch_rewards.sql index d6e654fd..b3d70c51 100644 --- a/queries/orderbook/prod_batch_rewards.sql +++ b/queries/orderbook/prod_batch_rewards.sql @@ -204,6 +204,29 @@ reward_per_auction as ( from reward_data ), +dune_sync_batch_data_table as ( --noqa: ST03 + select --noqa: ST06 + 'prod' as environment, + auction_id, + settlement_block as block_number, + block_deadline, + case + when tx_hash is null then null + else concat('0x', encode(tx_hash, 'hex')) + end as tx_hash, + concat('0x', encode(solver, 'hex')) as solver, + execution_cost, + surplus, + protocol_fee, + network_fee, + uncapped_payment as uncapped_payment_eth, + capped_payment, + winning_score, + reference_score + from reward_per_auction + order by block_deadline +), + primary_rewards as ( select solver, @@ -242,8 +265,12 @@ aggregate_results as ( partner_list, partner_fee as partner_fee_eth from primary_rewards as pr left outer join aggregate_partner_fees_per_solver as aif on pr.solver = aif.solver +), + +solver_rewards_script_table as ( + select * + from aggregate_results + order by solver ) -select * -from aggregate_results -order by solver +select * from {{results}} diff --git a/src/config.py b/src/config.py index f4d96bef..f0faeb71 100644 --- a/src/config.py +++ b/src/config.py @@ -335,6 +335,33 @@ 
def from_env() -> IOConfig:
         )
 
 
+@dataclass(frozen=True)
+class DataProcessingConfig:
+    """Configuration of data processing component."""
+
+    # pylint: disable=too-many-instance-attributes
+    bucket_size: int
+
+    @staticmethod
+    def from_network(network: Network) -> DataProcessingConfig:
+        """Initialize data processing config for a given network."""
+        match network:
+            case Network.MAINNET:
+                bucket_size = 10000
+            case Network.GNOSIS:
+                bucket_size = 30000
+            case Network.ARBITRUM_ONE:
+                bucket_size = 1000000
+            case _:
+                raise ValueError(
+                    f"No data processing config set up for network {network}."
+                )
+
+        return DataProcessingConfig(
+            bucket_size=bucket_size,
+        )
+
+
 @dataclass(frozen=True)
 class AccountingConfig:
     """Full configuration for solver accounting."""
@@ -349,6 +376,7 @@ class AccountingConfig:
     protocol_fee_config: ProtocolFeeConfig
     buffer_accounting_config: BufferAccountingConfig
     io_config: IOConfig
+    data_processing_config: DataProcessingConfig
 
     @staticmethod
     def from_network(network: Network) -> AccountingConfig:
@@ -363,6 +391,7 @@ def from_network(network: Network) -> AccountingConfig:
             protocol_fee_config=ProtocolFeeConfig.from_network(network),
             buffer_accounting_config=BufferAccountingConfig.from_network(network),
             io_config=IOConfig.from_env(),
+            data_processing_config=DataProcessingConfig.from_network(network),
         )
 
 
diff --git a/src/data_sync/__init__.py b/src/data_sync/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/data_sync/common.py b/src/data_sync/common.py
new file mode 100644
index 00000000..35b067ae
--- /dev/null
+++ b/src/data_sync/common.py
@@ -0,0 +1,110 @@
+"""Shared methods between both sync scripts."""
+
+from datetime import datetime, timezone
+from typing import List, Tuple
+from web3 import Web3
+from src.logger import set_log
+
+log = set_log(__name__)
+
+
+def find_block_with_timestamp(node: Web3, time_stamp: float) -> int:
+    """
+    This implements binary search and returns the smallest block number
+    whose timestamp is at least as large as the time_stamp argument passed to the function.
+    """
+    end_block_number = int(node.eth.get_block("finalized")["number"])
+    start_block_number = 1
+    close_in_seconds = 30
+
+    while True:
+        mid_block_number = (start_block_number + end_block_number) // 2
+        block = node.eth.get_block(mid_block_number)
+        block_time = block["timestamp"]
+        difference_in_seconds = int(time_stamp - block_time)
+
+        if abs(difference_in_seconds) < close_in_seconds:
+            break
+
+        if difference_in_seconds < 0:
+            end_block_number = mid_block_number - 1
+        else:
+            start_block_number = mid_block_number + 1
+
+    # we now brute-force to ensure we have found the right block
+    for b in range(mid_block_number - 200, mid_block_number + 200):
+        block = node.eth.get_block(b)
+        block_time_stamp = block["timestamp"]
+        if block_time_stamp >= time_stamp:
+            return int(block["number"])
+    # fallback in case the correct block number hasn't been found;
+    # in that case, we will include some more blocks than necessary
+    return mid_block_number + 200
+
+
+def compute_block_and_month_range(  # pylint: disable=too-many-locals
+    node: Web3, recompute_previous_month: bool
+) -> Tuple[List[Tuple[int, int]], List[str]]:
+    """
+    This determines the block range and the relevant months
+    for which we will compute and upload data on Dune.
+    """
+    # The function returns a list of block ranges, followed by a list of
+    # months. Block ranges are stored as (start_block, end_block) pairs,
+    # and are meant to be interpreted as closed intervals.
+    # Moreover, we assume that the job runs at least once every 24h.
+    # Because of that, if it is the first day of the month, we also
+    # compute the previous month's table, just to be on the safe side.
+
+    latest_finalized_block = node.eth.get_block("finalized")
+
+    current_month_end_block = int(latest_finalized_block["number"])
+    current_month_end_timestamp = latest_finalized_block["timestamp"]
+
+    current_month_end_datetime = datetime.fromtimestamp(
+        current_month_end_timestamp, tz=timezone.utc
+    )
+    current_month_start_datetime = datetime(
+        current_month_end_datetime.year, current_month_end_datetime.month, 1, 00, 00
+    )
+    current_month_start_timestamp = current_month_start_datetime.replace(
+        tzinfo=timezone.utc
+    ).timestamp()
+
+    current_month_start_block = find_block_with_timestamp(
+        node, current_month_start_timestamp
+    )
+
+    current_month = (
+        f"{current_month_end_datetime.year}_{current_month_end_datetime.month}"
+    )
+    months_list = [current_month]
+    block_range = [(current_month_start_block, current_month_end_block)]
+    if current_month_end_datetime.day == 1 or recompute_previous_month:
+        if current_month_end_datetime.month == 1:
+            previous_month = f"{current_month_end_datetime.year - 1}_12"
+            previous_month_start_datetime = datetime(
+                current_month_end_datetime.year - 1, 12, 1, 00, 00
+            )
+        else:
+            previous_month = (
+                f"{current_month_end_datetime.year}_"
+                f"{current_month_end_datetime.month - 1}"
+            )
+            previous_month_start_datetime = datetime(
+                current_month_end_datetime.year,
+                current_month_end_datetime.month - 1,
+                1,
+                00,
+                00,
+            )
+        months_list.append(previous_month)
+        previous_month_start_timestamp = previous_month_start_datetime.replace(
+            tzinfo=timezone.utc
+        ).timestamp()
+        previous_month_start_block = find_block_with_timestamp(
+            node, previous_month_start_timestamp
+        )
+        previous_month_end_block = current_month_start_block - 1
+        block_range.append((previous_month_start_block, previous_month_end_block))
+
+    return block_range, months_list
diff --git a/src/data_sync/config.py b/src/data_sync/config.py
new file mode 100644
index 00000000..2da627ca
--- /dev/null
+++ b/src/data_sync/config.py
@@ -0,0 +1,37 @@
+"""Configuration details for sync jobs"""
+
+from dataclasses import dataclass
+from pathlib import Path
+
+
+@dataclass
+class SyncConfig:
+    """
+    This data class contains all the credentials and volume paths
+    required to sync with both a persistent volume and Dune's S3 Buckets.
+ """ + + volume_path: Path + # File System + sync_file: str = "sync_block.csv" + sync_column: str = "last_synced_block" + + +@dataclass +class BatchDataSyncConfig: + """Configuration for batch data sync.""" + + # The name of the table to upload to + table: str = "batch_data_test" + # Description of the table (for creation) + description: str = "Table containing raw batch data" + + +@dataclass +class OrderDataSyncConfig: + """Configuration for order data sync.""" + + # The name of the table to upload to + table: str = "order_data_test" + # Description of the table (for creation) + description: str = "Table containing raw order data" diff --git a/src/data_sync/sync_data.py b/src/data_sync/sync_data.py new file mode 100644 index 00000000..ed8747cd --- /dev/null +++ b/src/data_sync/sync_data.py @@ -0,0 +1,118 @@ +"""Main Entry point for running any sync job""" + +import argparse +import asyncio +import os +from dataclasses import dataclass +from dotenv import load_dotenv +from web3 import Web3 +from src.fetch.orderbook import OrderbookFetcher, OrderbookEnv +from src.config import AccountingConfig, Network, web3 +from src.logger import set_log +from src.models.tables import SyncTable +from src.data_sync.common import compute_block_and_month_range +from src.models.block_range import BlockRange + + +log = set_log(__name__) + + +@dataclass +class ScriptArgs: + """Runtime arguments' parser/initializer""" + + sync_table: SyncTable + + def __init__(self) -> None: + parser = argparse.ArgumentParser("Dune Community Sources Sync") + parser.add_argument( + "--sync-table", + type=SyncTable, + required=True, + choices=list(SyncTable), + ) + arguments, _ = parser.parse_known_args() + self.sync_table: SyncTable = arguments.sync_table + + +async def sync_data_to_db( # pylint: disable=too-many-arguments + type_of_data: str, + node: Web3, + orderbook: OrderbookFetcher, + network: str, + config: AccountingConfig, + recompute_previous_month: bool, +) -> None: + """ + Order/Batch data Sync Logic. The recompute_previous_month flag, when enabled, + forces a recomputation of the previous month. If it is set to False, previous month + is still recomputed when the current date is the first day of the current month. + """ + + block_range_list, months_list = compute_block_and_month_range( + node, recompute_previous_month + ) + # we note that the block range computed above is meant to be interpreted as + # a closed interval + for i, (start_block, end_block) in enumerate(block_range_list): + network_name = "ethereum" if network == "mainnet" else network + table_name = type_of_data + "_data_" + network_name + "_" + months_list[i] + block_range = BlockRange(block_from=start_block, block_to=end_block) + log.info( + f"About to process block range ({start_block}, {end_block}) for month {months_list[i]}" + ) + if type_of_data == "batch": + data = orderbook.get_batch_data(block_range, config) + else: + data = orderbook.get_order_data(block_range, config) + log.info("SQL query successfully executed. 
About to update analytics table.") + + data.to_sql( + table_name, + OrderbookFetcher.pg_engine(OrderbookEnv.ANALYTICS), + if_exists="replace", + ) + log.info( + f"{type_of_data} data sync run completed successfully for month {months_list[i]}" + ) + + +def sync_data() -> None: + """ + Main function + """ + load_dotenv() + args = ScriptArgs() + orderbook = OrderbookFetcher() + network = os.environ.get("NETWORK", "mainnet") + config = AccountingConfig.from_network(Network(os.environ["NETWORK"])) + log.info(f"Network is set to: {network}") + + if args.sync_table == SyncTable.BATCH_DATA: + asyncio.run( + sync_data_to_db( + "batch", + web3, + orderbook, + network, + config, + recompute_previous_month=False, + ) + ) + elif args.sync_table == SyncTable.ORDER_DATA: + asyncio.run( + sync_data_to_db( + "order", + web3, + orderbook, + network, + config, + recompute_previous_month=False, + ) + ) + else: + log.error(f"unsupported sync_table '{args.sync_table}'") + + +if __name__ == "__main__": + sync_data() diff --git a/src/fetch/orderbook.py b/src/fetch/orderbook.py new file mode 100644 index 00000000..11862765 --- /dev/null +++ b/src/fetch/orderbook.py @@ -0,0 +1,217 @@ +"""Basic client for connecting to postgres database with login credentials""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from enum import Enum +from typing import Optional + +import pandas as pd +from dotenv import load_dotenv +from pandas import DataFrame +from sqlalchemy import create_engine +from sqlalchemy.engine import Engine +from src.config import AccountingConfig +from src.logger import set_log +from src.models.block_range import BlockRange +from src.utils.query_file import open_query + +log = set_log(__name__) + +MAX_PROCESSING_DELAY = 10 + + +class OrderbookEnv(Enum): + """ + Enum for distinguishing between CoW Protocol's staging and production environment + """ + + BARN = "BARN" + PROD = "PROD" + ANALYTICS = "ANALYTICS" + + def __str__(self) -> str: + return str(self.value) + + +@dataclass +class OrderbookFetcher: + """ + A pair of Dataframes primarily intended to store query results + from production and staging orderbook databases + """ + + @staticmethod + def pg_engine(db_env: OrderbookEnv) -> Engine: + """Returns a connection to postgres database""" + load_dotenv() + if db_env == OrderbookEnv.ANALYTICS: + db_url = os.environ["ANALYTICS_DB_URL"] + else: + db_url = os.environ[f"{db_env}_DB_URL"] + + db_string = f"postgresql+psycopg2://{db_url}" + return create_engine(db_string) + + @classmethod + def _read_query_for_env( + cls, query: str, env: OrderbookEnv, data_types: Optional[dict[str, str]] = None + ) -> DataFrame: + return pd.read_sql_query(query, con=cls.pg_engine(env), dtype=data_types) + + @classmethod + def _query_both_dbs( + cls, + query_prod: str, + query_barn: str, + data_types: Optional[dict[str, str]] = None, + ) -> tuple[DataFrame, DataFrame]: + barn = cls._read_query_for_env(query_barn, OrderbookEnv.BARN, data_types) + prod = cls._read_query_for_env(query_prod, OrderbookEnv.PROD, data_types) + return barn, prod + + @classmethod + def run_batch_data_query( + cls, block_range: BlockRange, config: AccountingConfig + ) -> DataFrame: + """ + Fetches and validates Batch Data DataFrame as concatenation from Prod and Staging DB + """ + load_dotenv() + batch_data_query_prod = ( + open_query("orderbook/prod_batch_rewards.sql") + .replace("{{start_block}}", str(block_range.block_from)) + .replace("{{end_block}}", str(block_range.block_to)) + .replace( + "{{EPSILON_LOWER}}", 
str(config.reward_config.batch_reward_cap_lower) + ) + .replace( + "{{EPSILON_UPPER}}", str(config.reward_config.batch_reward_cap_upper) + ) + .replace("{{results}}", "dune_sync_batch_data_table") + ) + batch_data_query_barn = ( + open_query("orderbook/barn_batch_rewards.sql") + .replace("{{start_block}}", str(block_range.block_from)) + .replace("{{end_block}}", str(block_range.block_to)) + .replace( + "{{EPSILON_LOWER}}", str(config.reward_config.batch_reward_cap_lower) + ) + .replace( + "{{EPSILON_UPPER}}", str(config.reward_config.batch_reward_cap_upper) + ) + .replace("{{results}}", "dune_sync_batch_data_table") + ) + data_types = { + # According to this: https://stackoverflow.com/a/11548224 + # capitalized int64 means `Optional` and it appears to work. + "block_number": "Int64", + "block_deadline": "int64", + } + barn, prod = cls._query_both_dbs( + batch_data_query_prod, batch_data_query_barn, data_types + ) + + # Warn if solver appear in both environments. + if not set(prod.solver).isdisjoint(set(barn.solver)): + log.warning( + f"solver overlap in {block_range}: solvers " + f"{set(prod.solver).intersection(set(barn.solver))} part of both prod and barn" + ) + + if not prod.empty and not barn.empty: + return pd.concat([prod, barn]) + if not prod.empty: + return prod.copy() + if not barn.empty: + return barn.copy() + return pd.DataFrame() + + @classmethod + def get_batch_data( + cls, block_range: BlockRange, config: AccountingConfig + ) -> DataFrame: + """ + Decomposes the block range into buckets of X blocks each, + where X depends on the chain, so as to ensure the + batch data query runs fast enough. + At the end, it concatenates everything into one data frame + """ + start = block_range.block_from + end = block_range.block_to + bucket_size = config.data_processing_config.bucket_size + res = [] + while start <= end: + size = min(end - start + 1, bucket_size) + log.info(f"About to process block range ({start}, {start + size - 1})") + res.append( + cls.run_batch_data_query( + BlockRange(block_from=start, block_to=start + size - 1), + config=config, + ) + ) + start = start + size + return pd.concat(res) + + @classmethod + def run_order_data_query(cls, block_range: BlockRange) -> DataFrame: + """ + Fetches and validates Order Data DataFrame as concatenation from Prod and Staging DB + """ + cow_reward_query_prod = ( + open_query("orderbook/order_data.sql") + .replace("{{start_block}}", str(block_range.block_from)) + .replace("{{end_block}}", str(block_range.block_to)) + .replace("{{env}}", "prod") + ) + cow_reward_query_barn = ( + open_query("orderbook/order_data.sql") + .replace("{{start_block}}", str(block_range.block_from)) + .replace("{{end_block}}", str(block_range.block_to)) + .replace("{{env}}", "barn") + ) + data_types = {"block_number": "int64", "amount": "float64"} + barn, prod = cls._query_both_dbs( + cow_reward_query_prod, cow_reward_query_barn, data_types + ) + + # Warn if solver appear in both environments. 
+        if not set(prod.solver).isdisjoint(set(barn.solver)):
+            log.warning(
+                f"solver overlap in {block_range}: solvers "
+                f"{set(prod.solver).intersection(set(barn.solver))} part of both prod and barn"
+            )
+
+        if not prod.empty and not barn.empty:
+            return pd.concat([prod, barn])
+        if not prod.empty:
+            return prod.copy()
+        if not barn.empty:
+            return barn.copy()
+        return pd.DataFrame()
+
+    @classmethod
+    def get_order_data(
+        cls, block_range: BlockRange, config: AccountingConfig
+    ) -> DataFrame:
+        """
+        Decomposes the block range into buckets of X blocks each,
+        where X depends on the chain, so as to ensure the
+        order data query runs fast enough.
+        At the end, it concatenates everything into one data frame
+        """
+        load_dotenv()
+        start = block_range.block_from
+        end = block_range.block_to
+        bucket_size = config.data_processing_config.bucket_size
+        res = []
+        while start <= end:
+            size = min(end - start + 1, bucket_size)
+            log.info(f"About to process block range ({start}, {start + size - 1})")
+            res.append(
+                cls.run_order_data_query(
+                    BlockRange(block_from=start, block_to=start + size - 1)
+                )
+            )
+            start = start + size
+        return pd.concat(res)
diff --git a/src/models/block_range.py b/src/models/block_range.py
new file mode 100644
index 00000000..a1f326dd
--- /dev/null
+++ b/src/models/block_range.py
@@ -0,0 +1,23 @@
+"""
+BlockRange Model is just a data class for left and right bounds
+"""
+
+from dataclasses import dataclass
+
+
+@dataclass
+class BlockRange:
+    """
+    Basic dataclass for an Ethereum block range.
+    TODO (easy) - this data class could probably live in dune-client.
+    https://github.com/cowprotocol/dune-bridge/issues/40
+    """
+
+    block_from: int
+    block_to: int
+
+    def __str__(self) -> str:
+        return f"BlockRange(from={self.block_from}, to={self.block_to})"
+
+    def __repr__(self) -> str:
+        return str(self)
diff --git a/src/models/tables.py b/src/models/tables.py
new file mode 100644
index 00000000..78f3c67a
--- /dev/null
+++ b/src/models/tables.py
@@ -0,0 +1,18 @@
+"""Data structure containing the supported sync tables"""
+
+from enum import Enum
+
+
+class SyncTable(Enum):
+    """Enum for Deployment Supported Table Sync"""
+
+    BATCH_DATA = "batch_data"
+    ORDER_DATA = "order_data"
+
+    def __str__(self) -> str:
+        return str(self.value)
+
+    @staticmethod
+    def supported_tables() -> list[str]:
+        """Returns a list of supported tables (i.e. valid object constructors)."""
+        return [str(t) for t in list(SyncTable)]
diff --git a/src/pg_client.py b/src/pg_client.py
index ce6255e4..d6dfb951 100644
--- a/src/pg_client.py
+++ b/src/pg_client.py
@@ -46,6 +46,7 @@ def get_solver_rewards(
         .replace("{{end_block}}", end_block)
         .replace("{{EPSILON_LOWER}}", str(reward_cap_lower))
         .replace("{{EPSILON_UPPER}}", str(reward_cap_upper))
+        .replace("{{results}}", "solver_rewards_script_table")
     )
     batch_reward_query_barn = (
         open_query("orderbook/barn_batch_rewards.sql")
@@ -53,6 +54,7 @@ def get_solver_rewards(
         .replace("{{end_block}}", end_block)
         .replace("{{EPSILON_LOWER}}", str(reward_cap_lower))
         .replace("{{EPSILON_UPPER}}", str(reward_cap_upper))
+        .replace("{{results}}", "solver_rewards_script_table")
     )
 
     results = []
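
Both get_batch_data and get_order_data above split a closed block interval [block_from, block_to] into consecutive buckets of at most bucket_size blocks before querying. A minimal, standalone sketch of that decomposition; the helper name split_block_range and the example numbers are illustrative and not part of this diff:

from typing import List, Tuple


def split_block_range(
    block_from: int, block_to: int, bucket_size: int
) -> List[Tuple[int, int]]:
    """Split the closed interval [block_from, block_to] into closed buckets."""
    buckets = []
    start = block_from
    while start <= block_to:
        size = min(block_to - start + 1, bucket_size)
        buckets.append((start, start + size - 1))
        start += size
    return buckets


# Example (illustrative numbers): with the 10000-block mainnet bucket from
# DataProcessingConfig, a 25000-block range yields three buckets that cover
# every block exactly once, with no overlap at the bucket boundaries.
assert split_block_range(100, 25_099, 10_000) == [
    (100, 10_099),
    (10_100, 20_099),
    (20_100, 25_099),
]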
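
The {{results}} placeholder added to the batch-rewards queries lets each caller pick which final CTE the shared SQL file materializes. A hedged sketch of that substitution; the template string below is a stand-in for the real file contents, not the actual query:

# Stand-in for prod_batch_rewards.sql / barn_batch_rewards.sql, which now end
# in `select * from {{results}}` after defining both CTEs.
template = (
    "with dune_sync_batch_data_table as (...), "
    "solver_rewards_script_table as (...) "
    "select * from {{results}}"
)

# src/pg_client.py keeps the original solver-rewards output:
rewards_query = template.replace("{{results}}", "solver_rewards_script_table")

# src/fetch/orderbook.py selects the per-auction batch data for the Dune sync:
dune_sync_query = template.replace("{{results}}", "dune_sync_batch_data_table")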