diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 62f0c7275..3b41745a1 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -8,7 +8,7 @@ jobs: env: V3_SUBGRAPH: https://api.studio.thegraph.com/proxy/31386/balancer-v3-sepolia/version/latest V3_POOLS_SUBGRAPH: https://api.studio.thegraph.com/proxy/31386/balancer-pools-v3-sepolia/version/latest - BALANCER_SUBGRAPH: https://api.thegraph.com/subgraphs/name/beethovenxfi/beethovenx + BALANCER_SUBGRAPH: https://api.thegraph.com/subgraphs/name/balancer-labs/balancer-v2 MASTERCHEF_SUBGRAPH: https://api.thegraph.com/subgraphs/name/beethovenxfi/masterchefv2 BLOCKS_SUBGRAPH: https://api.thegraph.com/subgraphs/name/danielmkm/optimism-blocks BEETS_BAR_SUBGRAPH: https://api.thegraph.com/subgraphs/name/beethovenxfi/beets-bar diff --git a/codegen.yml b/codegen.yml index 2e04ffd26..ccc7dcafc 100644 --- a/codegen.yml +++ b/codegen.yml @@ -20,9 +20,9 @@ generates: schema: ${V3_SUBGRAPH} plugins: - schema-ast - modules/subgraphs/balancer-v3-pools/generated/types.ts: + modules/sources/subgraphs/balancer-v3-pools/generated/types.ts: schema: ${V3_POOLS_SUBGRAPH} - documents: 'modules/subgraphs/balancer-v3-pools/*.graphql' + documents: 'modules/sources/subgraphs/balancer-v3-pools/*.graphql' plugins: - typescript - typescript-operations @@ -33,7 +33,7 @@ generates: BigInt: string Bytes: string BigDecimal: string - modules/subgraphs/balancer-v3-pools/generated/balancer-v3-pools-schema.graphql: + modules/sources/subgraphs/balancer-v3-pools/generated/balancer-v3-pools-schema.graphql: schema: ${V3_POOLS_SUBGRAPH} plugins: - schema-ast diff --git a/config/index.ts b/config/index.ts index 384c7b540..10454b102 100644 --- a/config/index.ts +++ b/config/index.ts @@ -1,5 +1,6 @@ import { Chain } from '@prisma/client'; -import { sepoliaConfig } from './sepolia'; +import sepoliaConfig from './sepolia'; +import mainnetConfig from './mainnet'; import { NetworkData } from '../modules/network/network-config-types'; export default { @@ -8,7 +9,7 @@ export default { [Chain.BASE]: {} as NetworkData, [Chain.FANTOM]: {} as NetworkData, [Chain.GNOSIS]: {} as NetworkData, - [Chain.MAINNET]: {} as NetworkData, + [Chain.MAINNET]: mainnetConfig, [Chain.OPTIMISM]: {} as NetworkData, [Chain.POLYGON]: {} as NetworkData, [Chain.SEPOLIA]: sepoliaConfig, diff --git a/config/mainnet.ts b/config/mainnet.ts new file mode 100644 index 000000000..99841cde5 --- /dev/null +++ b/config/mainnet.ts @@ -0,0 +1,376 @@ +import { BigNumber } from 'ethers'; +import { env } from '../app/env'; +import { DeploymentEnv, NetworkData } from '../modules/network/network-config-types'; + +const underlyingTokens = { + USDC: '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48', + USDT: '0xdac17f958d2ee523a2206206994597c13d831ec7', + DAI: '0x6b175474e89094c44da98b954eedeac495271d0f', + wETH: '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2', +}; + +export default { + chain: { + slug: 'ethereum', + id: 1, + nativeAssetAddress: '0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE', + wrappedNativeAssetAddress: '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2', + prismaId: 'MAINNET', + gqlId: 'MAINNET', + }, + subgraphs: { + startDate: '2019-04-20', + balancer: 'https://api.thegraph.com/subgraphs/name/balancer-labs/balancer-v2', + beetsBar: 'https://', + blocks: 'https://api.thegraph.com/subgraphs/name/blocklytics/ethereum-blocks', + gauge: 'https://api.thegraph.com/subgraphs/name/balancer-labs/balancer-gauges', + veBalLocks: 'https://api.thegraph.com/subgraphs/name/balancer-labs/balancer-gauges', + userBalances: 'https://', + }, + eth: { + address: '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee', + addressFormatted: '0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE', + symbol: 'ETH', + name: 'Ether', + }, + weth: { + address: '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2', + addressFormatted: '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', + }, + coingecko: { + nativeAssetId: 'ethereum', + platformId: 'ethereum', + excludedTokenAddresses: [ + '0x04c154b66cb340f3ae24111cc767e0184ed00cc6', // pxETH, has Coingecko entry but no price + '0xb45ad160634c528cc3d2926d9807104fa3157305', // sDOLA, has Coingecko entry but no price + ], + }, + rpcUrl: + env.INFURA_API_KEY && (env.DEPLOYMENT_ENV as DeploymentEnv) === 'main' + ? `https://mainnet.infura.io/v3/${env.INFURA_API_KEY}` + : 'https://rpc.eth.gateway.fm', + rpcMaxBlockRange: 700, + protocolToken: 'bal', + bal: { + address: '0xba100000625a3754423978a60c9317c58a424e3d', + }, + veBal: { + address: '0xc128a9954e6c874ea3d62ce62b468ba073093f25', + delegationProxy: '0x0000000000000000000000000000000000000000', + }, + gaugeControllerAddress: '0xc128468b7ce63ea702c1f104d55a2566b13d3abd', + gaugeControllerHelperAddress: '0x8e5698dc4897dc12243c8642e77b4f21349db97c', + gyro: { + config: '0xac89cc9d78bbad7eb3a02601b4d65daa1f908aa6', + }, + balancer: { + v2: { + vaultAddress: '0xba12222222228d8ba445958a75a0704d566bf2c8', + defaultSwapFeePercentage: '0.5', + defaultYieldFeePercentage: '0.5', + tokenAdmin: '0xf302f9f50958c5593770fdf4d4812309ff77414f', + balancerQueriesAddress: '0xe39b5e3b6d74016b2f6a9673d7d7493b6df549d5', + }, + v3: { + vaultAddress: '0xba12222222228d8ba445958a75a0704d566bf2c8', + defaultSwapFeePercentage: '0.5', + defaultYieldFeePercentage: '0.5', + tokenAdmin: '0xf302f9f50958c5593770fdf4d4812309ff77414f', + }, + }, + multicall: '0x5ba1e12693dc8f9c48aad8770482f4739beed696', + multicall3: '0xca11bde05977b3631167028862be2a173976ca11', + avgBlockSpeed: 10, + sor: { + main: { + url: 'https://uu6cfghhd5lqa7py3nojxkivd40zuugb.lambda-url.ca-central-1.on.aws/', + maxPools: 8, + forceRefresh: false, + gasPrice: BigNumber.from(10), + swapGas: BigNumber.from('1000000'), + poolIdsToExclude: [ + '0xbfa413a2ff0f20456d57b643746133f54bfe0cd20000000000000000000004c3', + '0xdc063deafce952160ec112fa382ac206305657e60000000000000000000004c4', // Linear pools that cause issues with new b-sdk + ], + }, + canary: { + url: 'https://ksa66wlkjbvteijxmflqjehsay0jmekw.lambda-url.eu-central-1.on.aws/', + maxPools: 8, + forceRefresh: false, + gasPrice: BigNumber.from(10), + swapGas: BigNumber.from('1000000'), + poolIdsToExclude: [ + '0xbfa413a2ff0f20456d57b643746133f54bfe0cd20000000000000000000004c3', + '0xdc063deafce952160ec112fa382ac206305657e60000000000000000000004c4', // Linear pools that cause issues with new b-sdk + ], + }, + }, + ybAprConfig: { + aave: { + v2: { + subgraphUrl: 'https://api.thegraph.com/subgraphs/name/aave/protocol-v2', + tokens: { + USDC: { + underlyingAssetAddress: underlyingTokens.USDC, + aTokenAddress: '0xbcca60bb61934080951369a648fb03df4f96263c', + wrappedTokens: { + waUSDC: '0xd093fa4fb80d09bb30817fdcd442d4d02ed3e5de', + }, + }, + USDT: { + underlyingAssetAddress: underlyingTokens.USDT, + aTokenAddress: '0x3ed3b47dd13ec9a98b44e6204a523e766b225811', + wrappedTokens: { + waUSDT: '0xf8fd466f12e236f4c96f7cce6c79eadb819abf58', + }, + }, + DAI: { + underlyingAssetAddress: underlyingTokens.DAI, + aTokenAddress: '0x028171bca77440897b824ca71d1c56cac55b68a3', + wrappedTokens: { + waDAI: '0x02d60b84491589974263d922d9cc7a3152618ef6', + }, + }, + }, + }, + v3: { + subgraphUrl: 'https://api.thegraph.com/subgraphs/name/aave/protocol-v3', + tokens: { + USDC: { + underlyingAssetAddress: underlyingTokens.USDC, + aTokenAddress: '0x98c23e9d8f34fefb1b7bd6a91b7ff122f4e16f5c', + wrappedTokens: { + waUSDC: '0x57d20c946a7a3812a7225b881cdcd8431d23431c', + stataEthUSDC: '0x02c2d189b45ce213a40097b62d311cf0dd16ec92', + }, + }, + USDT: { + underlyingAssetAddress: underlyingTokens.USDT, + aTokenAddress: '0x23878914efe38d27c4d67ab83ed1b93a74d4086a', + wrappedTokens: { + waUSDT: '0xa7e0e66f38b8ad8343cff67118c1f33e827d1455', + stataEthUSDT: '0x65799b9fd4206cdaa4a1db79254fcbc2fd2ffee6', + }, + }, + DAI: { + underlyingAssetAddress: underlyingTokens.DAI, + aTokenAddress: '0x018008bfb33d285247a21d44e50697654f754e63', + wrappedTokens: { + waDAI: '0x098256c06ab24f5655c5506a6488781bd711c14b', + stataEthDAI: '0xeb708639e8e518b86a916db3685f90216b1c1c67', + }, + }, + wETH: { + underlyingAssetAddress: underlyingTokens.wETH, + aTokenAddress: '0x4d5f47fa6a74757f35c14fd3a6ef8e3c9bc514e8', + wrappedTokens: { + waWETH: '0x59463bb67ddd04fe58ed291ba36c26d99a39fbc6', + stataEthWETH: '0x03928473f25bb2da6bc880b07ecbadc636822264', + }, + }, + }, + }, + }, + bloom: { + tokens: { + tbyFeb1924: { + address: '0xc4cafefbc3dfea629c589728d648cb6111db3136', + feedAddress: '0xde1f5f2d69339171d679fb84e4562febb71f36e6', + }, + }, + }, + euler: { + subgraphUrl: 'https://api.thegraph.com/subgraphs/name/euler-xyz/euler-mainnet', + tokens: { + eUSDC: { address: '0xeb91861f8a4e1c12333f42dce8fb0ecdc28da716' }, + eDAI: { address: '0xe025e3ca2be02316033184551d4d3aa22024d9dc' }, + eUSDT: { address: '0x4d19f33948b99800b6113ff3e83bec9b537c85d2' }, + eFRAX: { address: '0x5484451a88a35cd0878a1be177435ca8a0e4054e' }, + }, + }, + gearbox: { + sourceUrl: 'https://mainnet.gearbox.foundation/api/pools', + tokens: { + dDAI: { address: '0x6cfaf95457d7688022fc53e7abe052ef8dfbbdba' }, + dUSDC: { address: '0xc411db5f5eb3f7d552f9b8454b2d74097ccde6e3' }, + }, + }, + idle: { + sourceUrl: 'https://api.idle.finance/junior-rates/', + authorizationHeader: + 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJjbGllbnRJZCI6IkFwcDciLCJpYXQiOjE2NzAyMzc1Mjd9.L12KJEt8fW1Cvy3o7Nl4OJ2wtEjzlObaAYJ9aC_CY6M', + tokens: { + idleDAI: { + address: '0xec9482040e6483b7459cc0db05d51dfa3d3068e1', + wrapped4626Address: '0x0c80f31b840c6564e6c5e18f386fad96b63514ca', + }, + idleUSDC: { + address: '0xdc7777c771a6e4b3a82830781bdde4dbc78f320e', + wrapped4626Address: '0xc3da79e0de523eef7ac1e4ca9abfe3aac9973133', + }, + idleUSDT: { + address: '0xfa3afc9a194babd56e743fa3b7aa2ccbed3eaaad', + wrapped4626Address: '0x544897a3b944fdeb1f94a0ed973ea31a80ae18e1', + }, + }, + }, + maker: { + tokens: { + sDAI: { + address: '0x83f20f44975d03b1b09e64809b757c47f942beea', + potAddress: '0x197e90f9fad81970ba7976f33cbd77088e5d7cf7', + }, + }, + }, + tessera: { + tokens: { + sAPE: { + tesseraPoolAddress: '0x5954ab967bc958940b7eb73ee84797dc8a2afbb9', + tokenAddress: '0x7966c5bae631294d7cffcea5430b78c2f76db6fa', + }, + }, + }, + tranchess: { + sourceUrl: 'https://tranchess.com/eth/api/v3/funds', + tokens: { + qETH: { + address: '0x93ef1ea305d11a9b2a3ebb9bb4fcc34695292e7d', + underlyingAssetName: 'WETH', + }, + }, + }, + stakewise: '0xf1c9acdc66974dfb6decb12aa385b9cd01190e38', + etherfi: '0xcd5fe23c85820f7b72d0926fc9b05b43e359b7ee', + sveth: true, + defaultHandlers: { + vETH: { + tokenAddress: '0x4bc3263eb5bb2ef7ad9ab6fb68be80e43b43801f', + sourceUrl: 'https://apy.liebi.com/veth', + path: 'veth', + }, + stETH: { + tokenAddress: '0xae7ab96520de3a18e5e111b5eaab095312d7fe84', + sourceUrl: 'https://eth-api.lido.fi/v1/protocol/steth/apr/sma', + path: 'data.smaApr', + isIbYield: true, + }, + wstETH: { + tokenAddress: '0x7f39c581f595b53c5cb19bd0b3f8da6c935e2ca0', + sourceUrl: 'https://eth-api.lido.fi/v1/protocol/steth/apr/sma', + path: 'data.smaApr', + isIbYield: true, + }, + cbETH: { + tokenAddress: '0xbe9895146f7af43049ca1c1ae358b0541ea49704', + sourceUrl: 'https://api.exchange.coinbase.com/wrapped-assets/CBETH/', + path: 'apy', + scale: 1, + isIbYield: true, + }, + sfrxETH: { + tokenAddress: '0xac3e018457b222d93114458476f3e3416abbe38f', + sourceUrl: 'https://api.frax.finance/v2/frxeth/summary/latest', + path: 'sfrxethApr', + isIbYield: true, + }, + StaFirETH: { + tokenAddress: '0x9559aaa82d9649c7a7b220e7c461d2e74c9a3593', + sourceUrl: 'https://drop-api.stafi.io/reth/v1/poolData', + path: 'data.stakeApr', + isIbYield: true, + }, + rETH: { + tokenAddress: '0xae78736cd615f374d3085123a210448e74fc6393', + sourceUrl: 'https://rocketpool.net/api/mainnet/payload', + path: 'rethAPR', + isIbYield: true, + }, + USDR: { + tokenAddress: '0xaf0d9d65fc54de245cda37af3d18cbec860a4d4b', + sourceUrl: 'http://usdr-api.us-east-1.elasticbeanstalk.com/usdr/apy', + path: 'usdr', + isIbYield: true, + }, + swETH: { + tokenAddress: '0xf951e335afb289353dc249e82926178eac7ded78', + sourceUrl: 'https://v3.svc.swellnetwork.io/api/tokens/sweth/apr', + isIbYield: true, + }, + wjAURA: { + tokenAddress: '0x198d7387fa97a73f05b8578cdeff8f2a1f34cd1f', + sourceUrl: 'https://data.jonesdao.io/api/v1/jones/apy-wjaura', + path: 'wjauraApy', + isIbYield: true, + }, + ETHx: { + tokenAddress: '0xa35b1b31ce002fbf2058d22f30f95d405200a15b', + sourceUrl: 'https://universe.staderlabs.com/eth/apy', + path: 'value', + isIbYield: true, + }, + usdm: { + tokenAddress: '0x57f5e098cad7a3d1eed53991d4d66c45c9af7812', + sourceUrl: 'https://apy.prod.mountainprotocol.com', + path: 'value', + isIbYield: true, + scale: 1, + }, + ankrETH: { + tokenAddress: '0xe95a203b1a91a908f9b9ce46459d101078c2c3cb', + sourceUrl: 'https://api.staking.ankr.com/v1alpha/metrics', + path: 'services.{serviceName == "eth"}.apy', + isIbYield: true, + }, + ezETH: { + tokenAddress: '0xbf5495efe5db9ce00f80364c8b423567e58d2110', + sourceUrl: 'https://app.renzoprotocol.com/api/apr', + path: 'apr', + isIbYield: true, + }, + rsETH: { + tokenAddress: '0xa1290d69c65a6fe4df752f95823fae25cb99e5a7', + sourceUrl: 'https://universe.kelpdao.xyz/rseth/apy', + path: 'value', + isIbYield: true, + }, + sDOLA: { + tokenAddress: '0xb45ad160634c528cc3d2926d9807104fa3157305', + sourceUrl: 'https://www.inverse.finance/api/dola-staking', + path: 'apr', + isIbYield: true, + }, + rswETH: { + tokenAddress: '0xfae103dc9cf190ed75350761e95403b7b8afa6c0', + sourceUrl: 'https://v3-lrt.svc.swellnetwork.io/api/tokens/rsweth/apr', + isIbYield: true, + }, + }, + }, + beefy: { + linearPools: [''], + }, + datastudio: { + main: { + user: 'datafeed-service@datastudio-366113.iam.gserviceaccount.com', + sheetId: '11anHUEb9snGwvB-errb5HvO8TvoLTRJhkDdD80Gxw1Q', + databaseTabName: 'Database v2', + compositionTabName: 'Pool Composition v2', + emissionDataTabName: 'EmissionData', + }, + canary: { + user: 'datafeed-service@datastudio-366113.iam.gserviceaccount.com', + sheetId: '1HnJOuRQXGy06tNgqjYMzQNIsaCSCC01Yxe_lZhXBDpY', + databaseTabName: 'Database v2', + compositionTabName: 'Pool Composition v2', + emissionDataTabName: 'EmissionData', + }, + }, + monitoring: { + main: { + alarmTopicArn: 'arn:aws:sns:ca-central-1:118697801881:api_alarms', + }, + canary: { + alarmTopicArn: 'arn:aws:sns:eu-central-1:118697801881:api_alarms', + }, + }, +}; diff --git a/config/sepolia.ts b/config/sepolia.ts index 2e38acee1..bd172c715 100644 --- a/config/sepolia.ts +++ b/config/sepolia.ts @@ -2,7 +2,7 @@ import { BigNumber } from 'ethers'; import { env } from '../app/env'; import { NetworkData } from '../modules/network/network-config-types'; -export const sepoliaConfig: NetworkData = { +export default { chain: { slug: 'sepolia', id: 11155111, @@ -39,6 +39,8 @@ export const sepoliaConfig: NetworkData = { }, rpcUrl: env.GROVE_CITY ? `https://sepolia.rpc.grove.city/v1/${env.GROVE_CITY}` + : env.ALCHEMY_API_KEY + ? `https://eth-sepolia.g.alchemy.com/v2/${env.ALCHEMY_API_KEY}` : env.INFURA_API_KEY ? `https://sepolia.infura.io/v3/${env.INFURA_API_KEY}` : 'https://gateway.tenderly.co/public/sepolia', diff --git a/modules/actions/pool/add-pools-from-subgraph.test.ts b/modules/actions/pool/add-pools-from-subgraph.test.ts deleted file mode 100644 index 9552712e9..000000000 --- a/modules/actions/pool/add-pools-from-subgraph.test.ts +++ /dev/null @@ -1,69 +0,0 @@ -import { addMissingPoolsFromSubgraph } from './add-pools-from-subgraph'; -import { prisma } from '../../../prisma/prisma-client'; -import { PrismaPool } from '@prisma/client'; -import { - SwapFragment, - VaultPoolFragment as VaultSubgraphPoolFragment, -} from '../../sources/subgraphs/balancer-v3-vault/generated/types'; -import { TypePoolFragment as PoolSubgraphPoolFragment } from '../../subgraphs/balancer-v3-pools/generated/types'; -import { GraphQLClient } from 'graphql-request'; - -// Mock the module dependencies -jest.mock('../../sources/contracts', () => ({ - ...jest.requireActual('../../sources/contracts'), - fetchErc20Headers: jest.fn().mockResolvedValue({ '2': { name: 'name', symbol: 'symbol' } }), - fetchWeightedPoolData: jest.fn().mockResolvedValue({}), - fetchPoolTokens: jest.fn().mockResolvedValue({}), -})); - -jest.mock('../../../prisma/prisma-client', () => ({ - prisma: { - prismaPool: { - findMany: jest.fn().mockResolvedValue([{ id: '1' }] as PrismaPool[]), - create: jest.fn(), - }, - prismaToken: { - findMany: jest.fn(), - createMany: jest.fn(), - }, - prismaPoolExpandedTokens: { - createMany: jest.fn(), - }, - prismaPoolTokenDynamicData: { - createMany: jest.fn(), - }, - }, -})); - -//TODO cant mock properly -// describe('syncPools', () => { -// const vaultSubgraphClient = { -// getAllInitializedPools: jest.fn().mockResolvedValue([{ id: '1' }, { id: '2' }] as VaultSubgraphPoolFragment[]), -// getSwapsSince: jest.fn().mockResolvedValue([{ id: '1' }, { id: '2' }] as SwapFragment[]), -// }; -// const poolSubgraphClient = { -// Pools: jest.fn().mockResolvedValue({ -// pools: [ -// { id: '1', factory: { id: '1' } }, -// { id: '2', factory: { id: '1' } }, -// ] as PoolSubgraphPoolFragment[], -// }), -// }; - -// beforeEach(() => { -// jest.clearAllMocks(); -// return addMissingPoolsFromSubgraph(vaultSubgraphClient, poolSubgraphClient, 'SEPOLIA'); -// }); - -// it('should fetch pools from vault subgraph', async () => { -// expect(vaultSubgraphClient.getAllInitializedPools).toHaveBeenCalled(); -// }); - -// it('should fetch pools from pools subgraph', async () => { -// expect(poolSubgraphClient.Pools).toHaveBeenCalled(); -// }); - -// it('should store missing pools in the database', async () => { -// expect(prisma.prismaPool.create).toHaveBeenCalledWith({ data: expect.objectContaining({ id: '2' }) }); -// }); -// }); diff --git a/modules/actions/pool/add-pools-from-subgraph.ts b/modules/actions/pool/add-pools-from-subgraph.ts deleted file mode 100644 index 5c3ab9e25..000000000 --- a/modules/actions/pool/add-pools-from-subgraph.ts +++ /dev/null @@ -1,117 +0,0 @@ -import { Chain, Prisma, PrismaPoolType } from '@prisma/client'; -import { prisma } from '../../../prisma/prisma-client'; -import { - poolTransformer, - poolTokensTransformer, - poolTokensDynamicDataTransformer, - poolExpandedTokensTransformer, -} from '../../sources/transformers'; -import { V3PoolsSubgraphClient } from '../../subgraphs/balancer-v3-pools'; -import { BalancerVaultSubgraphSource } from '../../sources/subgraphs/balancer-v3-vault'; -import _ from 'lodash'; -import { tokensTransformer } from '../../sources/transformers/tokens-transformer'; - -type PoolDbEntry = { - pool: Prisma.PrismaPoolCreateInput; - poolTokenDynamicData: Prisma.PrismaPoolTokenDynamicDataCreateManyInput[]; - poolExpandedTokens: Prisma.PrismaPoolExpandedTokensCreateManyInput[]; -}; -/** - * Makes sure that all pools are synced in the database - * - * @param vaultSubgraphClient - * @param poolSubgraphClient - * @param chain - * @returns syncedPools - the pools that were synced - */ -export async function addMissingPoolsFromSubgraph( - vaultSubgraphClient: BalancerVaultSubgraphSource, - poolSubgraphClient: V3PoolsSubgraphClient, - chain = 'SEPOLIA' as Chain, -): Promise { - // Fetch pools from subgraph - const vaultSubgraphPools = await vaultSubgraphClient.getAllInitializedPools(); - const { pools: poolSubgraphPools } = await poolSubgraphClient.Pools(); - - // Find pools missing from the database - const dbPools = await prisma.prismaPool.findMany({ where: { chain, vaultVersion: 3 } }); - const dbPoolIds = new Set(dbPools.map((pool) => pool.id.toLowerCase())); - const missingPools = vaultSubgraphPools.filter((pool) => !dbPoolIds.has(pool.id)); - - // Store pool tokens and BPT in the tokens table before creating the pools - try { - const allTokens = tokensTransformer(missingPools, chain); - await prisma.prismaToken.createMany({ - data: allTokens, - skipDuplicates: true, - }); - } catch (e) { - console.error('Error creating tokens', e); - } - - // Transform pool data for the database - const dbEntries: PoolDbEntry[] = []; - - missingPools.forEach((missingPool) => { - const vaultSubgraphPool = vaultSubgraphPools.find((pool) => pool.id === missingPool.id); - const poolSubgraphPool = poolSubgraphPools.find((pool) => pool.id === missingPool.id); - if (!vaultSubgraphPool || !poolSubgraphPool) { - // That won't happen, but TS doesn't know that - return null; - } - const dbEntry: PoolDbEntry = { - pool: { - ...poolTransformer(vaultSubgraphPool, poolSubgraphPool, chain), - typeData: JSON.stringify({}), - tokens: { - createMany: { - // TODO: Will be great to create all the token data here, including dynamic data - // but for now we can only store static data, because prisma doesn't support nested createMany - // to create dynamic data tabels as well. One solution is to move "dynamicData" to the tokens table - data: poolTokensTransformer(vaultSubgraphPool, chain), - }, - }, - // placeholder data, will be updated with onchain values - dynamicData: { - create: { - id: vaultSubgraphPool.id, - swapFee: '0', - blockNumber: Number(vaultSubgraphPool.blockNumber), - swapEnabled: true, - totalLiquidity: 1, - totalShares: vaultSubgraphPool.totalShares, - totalSharesNum: parseFloat(vaultSubgraphPool.totalShares), - }, - }, - }, - poolTokenDynamicData: poolTokensDynamicDataTransformer(vaultSubgraphPool, poolSubgraphPool, chain), - poolExpandedTokens: poolExpandedTokensTransformer(vaultSubgraphPool, chain), - }; - dbEntries.push(dbEntry); - }); - - // Store missing pools in the database - const added: string[] = []; - for (const entry of dbEntries) { - try { - await prisma.prismaPool.create({ data: entry.pool }); - - await prisma.prismaPoolTokenDynamicData.createMany({ - skipDuplicates: true, - data: entry.poolTokenDynamicData, - }); - - await prisma.prismaPoolExpandedTokens.createMany({ - skipDuplicates: true, - data: entry.poolExpandedTokens, - }); - - added.push(entry.pool.id); - } catch (e) { - // TODO: handle errors - console.error(`Error creating pool ${entry.pool.id}`, e); - } - } - - return added; -} diff --git a/modules/actions/pool/get-changed-pools.ts b/modules/actions/pool/get-changed-pools.ts deleted file mode 100644 index b19423b1d..000000000 --- a/modules/actions/pool/get-changed-pools.ts +++ /dev/null @@ -1,52 +0,0 @@ -import { Chain, Prisma, PrismaLastBlockSyncedCategory, PrismaPoolType } from '@prisma/client'; -import { prisma } from '../../../prisma/prisma-client'; -import { tokenService } from '../../token/token.service'; -import { fetchPoolTokenInfo, fetchPoolTokenRates } from '../../sources/contracts'; -import { ViemClient } from '../../sources/viem-client'; -import { fetchPoolData } from '../../sources/contracts/fetch-pool-data'; -import { formatEther, formatUnits, parseUnits } from 'viem'; -import { isSameAddress } from '@balancer-labs/sdk'; -import { prismaBulkExecuteOperations } from '../../../prisma/prisma-util'; -import { getPoolBalanceChanged } from '../../sources/logs/get-pool-balance-changed'; -import { start } from 'repl'; -import { getSwaps } from '../../sources/logs'; -import _ from 'lodash'; - -/** - * Get all pool IDs of pools that have emitted a poolBalanceChanged event - * - * @param vaultAddress - * @param viemClient - * @param chain - * @returns list of changed pool IDs - */ -export async function getChangedPools( - vaultAddress: string, - viemClient: ViemClient, - blockNumber: bigint, - chain = 'SEPOLIA' as Chain, -): Promise { - let lastSync = await prisma.prismaLastBlockSynced.findUnique({ - where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: chain } }, - }); - const lastSyncBlock = lastSync?.blockNumber ? BigInt(lastSync.blockNumber) : 0n; - const latestBlock = blockNumber; - - const startBlock = lastSyncBlock + 1n; - const endBlock = latestBlock; - - // no new blocks have been minted, needed for slow networks - if (startBlock > endBlock) { - return []; - } - - const poolBalanceChangedEvents = await getPoolBalanceChanged(vaultAddress, viemClient, startBlock, endBlock); - const poolIdsFromBalanceChangedEvents = poolBalanceChangedEvents.map((event) => event.args.pool!); - - const swapEvents = await getSwaps(vaultAddress, viemClient, startBlock, endBlock); - const poolIdsFromSwapEvents = swapEvents.map((event) => event.args.pool!); - - const changedPoolIds = _.uniq(poolIdsFromBalanceChangedEvents.concat(poolIdsFromSwapEvents)); - - return changedPoolIds; -} diff --git a/modules/actions/pool/sync-join-exits-v2.ts b/modules/actions/pool/sync-join-exits-v2.ts new file mode 100644 index 000000000..1ae6b779d --- /dev/null +++ b/modules/actions/pool/sync-join-exits-v2.ts @@ -0,0 +1,114 @@ +import { Chain, PoolEventType } from '@prisma/client'; +import { prisma } from '../../../prisma/prisma-client'; +import type { BalancerSubgraphService } from '../../subgraphs/balancer-subgraph/balancer-subgraph.service'; +import { JoinExit_OrderBy, OrderDirection } from '../../subgraphs/balancer-subgraph/generated/balancer-subgraph-types'; +import { roundToHour } from '../../common/time'; + +/** + * Get the join and exit events from the subgraph and store them in the database + * + * @param vaultSubgraphClient + */ +export const syncJoinExitsV2 = async (v2SubgraphClient: BalancerSubgraphService, chain: Chain): Promise => { + const vaultVersion = 2; + + // Get latest event from the DB + const latestEvent = await prisma.poolEvent.findFirst({ + where: { + type: { + in: ['JOIN', 'EXIT'], + }, + chain: chain, + vaultVersion, + }, + orderBy: { + blockNumber: 'desc', + }, + }); + + // Get events since the latest event or 100 days (it will be around 15k events on mainnet) + const hundredDaysAgo = Math.floor(+new Date(Date.now() - 100 * 24 * 60 * 60 * 1000) / 1000); + const where = + latestEvent?.blockTimestamp && latestEvent?.blockTimestamp > hundredDaysAgo + ? { block_gte: String(latestEvent.blockNumber) } + : { timestamp_gte: hundredDaysAgo }; + + // Get events + const { joinExits } = await v2SubgraphClient.getPoolJoinExits({ + first: 1000, + where: where, + orderBy: JoinExit_OrderBy.Timestamp, + orderDirection: OrderDirection.Asc, + }); + + // Store only the events that are not already in the DB + const existingEvents = await prisma.poolEvent.findMany({ + where: { + id: { in: joinExits.map((event) => event.id) }, + type: { + in: ['JOIN', 'EXIT'], + }, + chain: chain, + vaultVersion, + }, + }); + + const events = joinExits.filter((event) => !existingEvents.some((existing) => existing.id === event.id)); + + // Prepare DB entries + const dbEntries = await Promise.all( + events.map(async (event) => { + // TODO: Calculate USD amounts with token prices at the time of the event + // 🚨 Reading from the DB in a loop – will get slow with a large events volume + // But we need prices based on the event timestamp, so batching this should be based on timestamp ranges + const prices = await prisma.prismaTokenPrice.findMany({ + where: { + tokenAddress: { in: event.pool.tokensList }, + timestamp: roundToHour(Number(event.timestamp)), // 🚨 Assuming all prices are available hourly + chain: chain, + }, + include: { + token: true, + }, + }); + + const usd = event.pool.tokensList.map((address, index) => { + const price = prices.find((price) => price.tokenAddress === address); + return { + address: address, + amount: event.amounts[index], + valueUSD: Number(event.amounts[index]) * (price?.price || 0), // TODO: check USD amount + }; + }); + + return { + id: event.id, // tx + logIndex + tx: event.tx, + type: event.type === 'Join' ? PoolEventType.JOIN : PoolEventType.EXIT, + poolId: event.pool.id, + chain: chain, + userAddress: event.sender, + blockNumber: Number(event.block), + blockTimestamp: Number(event.timestamp), + logIndex: Number(event.id.substring(66)), + valueUSD: usd.reduce((acc, token) => acc + Number(token.valueUSD), 0), + payload: { + tokens: usd, + }, + }; + }), + ).catch((e) => { + console.error('Error preparing DB entries', e); + return []; + }); + + // Create entries and skip duplicates + await prisma.poolEvent.createMany({ + data: dbEntries, + skipDuplicates: true, + }); + + // TODO: do we need a separate function to update prices? If so, we should be syncing events first, then running a price on them + + return dbEntries.map((entry) => entry.id); +}; diff --git a/modules/actions/pool/sync-join-exits.ts b/modules/actions/pool/sync-join-exits.ts new file mode 100644 index 000000000..23e61c297 --- /dev/null +++ b/modules/actions/pool/sync-join-exits.ts @@ -0,0 +1,116 @@ +import { Chain, PoolEventType } from '@prisma/client'; +import { prisma } from '../../../prisma/prisma-client'; +import { V3VaultSubgraphClient } from '../../sources/subgraphs'; +import { formatUnits } from 'viem'; +import { JoinExit_OrderBy, OrderDirection } from '../../sources/subgraphs/balancer-v3-vault/generated/types'; +import { roundToHour } from '../../common/time'; + +/** + * Get the join and exit events from the subgraph and store them in the database + * + * @param vaultSubgraphClient + */ +export const syncJoinExits = async (vaultSubgraphClient: V3VaultSubgraphClient, chain: Chain): Promise => { + const vaultVersion = 3; + + // Get latest event from the DB + const latestEvent = await prisma.poolEvent.findFirst({ + where: { + type: { + in: ['JOIN', 'EXIT'], + }, + chain: chain, + vaultVersion, + }, + orderBy: { + blockNumber: 'desc', + }, + }); + + // Get events + const { joinExits } = await vaultSubgraphClient.JoinExits({ + first: 1000, + where: { + blockNumber_gt: String(latestEvent?.blockNumber || 0), + }, + orderBy: JoinExit_OrderBy.BlockNumber, + orderDirection: OrderDirection.Asc, + }); + + // Store only the events that are not already in the DB + const existingEvents = await prisma.poolEvent.findMany({ + where: { + id: { in: joinExits.map((event) => event.id) }, + type: { + in: ['JOIN', 'EXIT'], + }, + chain: chain, + vaultVersion, + }, + }); + + const events = joinExits.filter((event) => !existingEvents.some((existing) => existing.id === event.id)); + + // Prepare DB entries + const dbEntries = await Promise.all( + events.map(async (event) => { + // TODO: Calculate USD amounts with token prices at the time of the event + // 🚨 Reading from the DB in a loop – will get slow with a large events volume + const prices = await prisma.prismaTokenPrice.findMany({ + where: { + tokenAddress: { in: event.pool.tokens.map((token) => token.address) }, + timestamp: roundToHour(Number(event.blockTimestamp)), // 🚨 Assuming all prices are available hourly + chain: chain, + }, + include: { + token: true, + }, + }); + + const usd = event.pool.tokens.map((token) => { + const price = prices.find((price) => price.tokenAddress === token.address); + return { + address: token.address, + amount: event.amounts[token.index], + valueUSD: + Number(formatUnits(BigInt(event.amounts[token.index]), price?.token?.decimals ?? 18)) * + (price?.price || 0), // TODO: check USD amount + }; + }); + + return { + id: event.id, // tx + logIndex + tx: event.transactionHash, + type: event.type === 'Join' ? PoolEventType.JOIN : PoolEventType.EXIT, + poolId: event.pool.id, + chain: chain, + vaultVersion, + userAddress: event.user.id, + blockNumber: Number(event.blockNumber), + blockTimestamp: Number(event.blockTimestamp), + logIndex: Number(event.logIndex), + valueUSD: usd.reduce((acc, token) => acc + Number(token.valueUSD), 0), + payload: { + tokens: usd, + }, + }; + }), + ).catch((e) => { + console.error('Error preparing DB entries', e); + return []; + }); + + console.log(`Syncing ${dbEntries.length} join/exit events`); + + // Create entries and skip duplicates + await prisma.poolEvent + .createMany({ + data: dbEntries, + skipDuplicates: true, + }) + .catch((e) => { + console.error('Error creating DB entries', e); + }); + + return dbEntries.map((entry) => entry.id); +}; diff --git a/modules/actions/pool/sync-pools.test.ts b/modules/actions/pool/sync-pools.test.ts new file mode 100644 index 000000000..fe1114a95 --- /dev/null +++ b/modules/actions/pool/sync-pools.test.ts @@ -0,0 +1,85 @@ +import { syncPools } from './sync-pools'; +import { prisma } from '../../../prisma/prisma-client'; +import { PrismaPool } from '@prisma/client'; +import type { ViemClient } from '../../sources/viem-client'; +import type { V3VaultSubgraphClient } from '../../sources/subgraphs/balancer-v3-vault'; +import { VaultPoolFragment as VaultSubgraphPoolFragment } from '../../sources/subgraphs/balancer-v3-vault/generated/types'; +import { TypePoolFragment as PoolSubgraphPoolFragment } from '../../subgraphs/balancer-v3-pools/generated/types'; + +// Mock the module dependencies +jest.mock('../../sources/contracts/fetch-pool-data', () => ({ + fetchPoolData: jest.fn().mockResolvedValue({ + '1': { tokens: [{ address: '1' }] }, + '2': { tokens: [{ address: '2' }] }, + }), +})); + +jest.mock('../../../prisma/prisma-client', () => ({ + prisma: { + prismaPool: { + findMany: jest.fn().mockResolvedValue([{ id: '1' }] as PrismaPool[]), + upsert: jest.fn(), + }, + prismaPoolDynamicData: { + upsert: jest.fn(), + }, + prismaPoolToken: { + deleteMany: jest.fn(), + createMany: jest.fn(), + }, + prismaPoolTokenDynamicData: { + deleteMany: jest.fn(), + createMany: jest.fn(), + }, + prismaToken: { + findMany: jest.fn(), + createMany: jest.fn(), + }, + prismaPoolExpandedTokens: { + deleteMany: jest.fn(), + createMany: jest.fn(), + }, + prismaTokenPrice: { + findMany: jest.fn().mockResolvedValue([]), + }, + }, +})); + +// TODO: re-implement after the DB is refactored, so there is less logic to mock +describe('syncPools', () => { + const vaultSubgraphClient = { + getAllInitializedPools: jest.fn().mockResolvedValue([ + { id: '1', tokens: [] }, + { id: '2', tokens: [] }, + ] as unknown as VaultSubgraphPoolFragment[]), + } as unknown as jest.Mocked; + + const poolSubgraphClient = { + Pools: jest.fn().mockResolvedValue({ + pools: [ + { id: '1', factory: { id: '1' } }, + { id: '2', factory: { id: '1' } }, + ] as PoolSubgraphPoolFragment[], + }), + }; + const viemClient = jest.fn() as unknown as jest.Mocked; + + beforeEach(() => { + jest.clearAllMocks(); + return syncPools(vaultSubgraphClient, poolSubgraphClient, viemClient, '', 'SEPOLIA', BigInt(1)); + }); + + it('should fetch pools from vault subgraph', async () => { + expect(vaultSubgraphClient.getAllInitializedPools).toHaveBeenCalled(); + }); + + it('should fetch pools from pools subgraph', async () => { + expect(poolSubgraphClient.Pools).toHaveBeenCalled(); + }); + + it('should store missing pools in the database', async () => { + expect(prisma.prismaPool.upsert).toHaveBeenCalledWith( + expect.objectContaining({ create: expect.objectContaining({ id: '1' }) }), + ); + }); +}); diff --git a/modules/actions/pool/sync-pools.ts b/modules/actions/pool/sync-pools.ts new file mode 100644 index 000000000..e4a5919e0 --- /dev/null +++ b/modules/actions/pool/sync-pools.ts @@ -0,0 +1,74 @@ +import { Chain } from '@prisma/client'; +import { prisma } from '../../../prisma/prisma-client'; +import { fetchPoolData } from '../../sources/contracts/fetch-pool-data'; +import { ViemClient } from '../../sources/viem-client'; +import { onchainPoolUpdate } from '../../sources/transformers/onchain-pool-update'; +import { poolUpsertsUsd } from '../../sources/enrichers/pool-upserts-usd'; + +/** + * Gets and syncs all the pools state with the database + * + * TODO: simplify the schema by merging the pool and poolDynamicData tables and the poolToken, poolTokenDynamicData, expandedToken tables + * + * @param subgraphPools + * @param viemClient + * @param vaultAddress + * @param chain + * @param blockNumber + */ +export const syncPools = async ( + ids: string[], + viemClient: ViemClient, + vaultAddress: string, + chain = 'SEPOLIA' as Chain, + blockNumber: bigint, // TODO: deprecate since we are using always the latest block +) => { + // Enrich with onchain data for all the pools + const onchainData = await fetchPoolData(vaultAddress, ids, viemClient, blockNumber); + + // Get the data for the tables about pools + const dbUpdates = Object.keys(onchainData).map((id) => + onchainPoolUpdate(onchainData[id], Number(blockNumber), chain, id), + ); + + // Needed to get the token decimals for the USD calculations, + // Keeping it external, because we fetch these tokens in the upsert pools function + const allTokens = await prisma.prismaToken.findMany({ + where: { + chain: chain, + }, + }); + + const poolsWithUSD = await poolUpsertsUsd(dbUpdates, chain, allTokens); + + // Update pools data to the database + for (const { poolDynamicData, poolTokenDynamicData } of poolsWithUSD) { + try { + await prisma.prismaPoolDynamicData.update({ + where: { + poolId_chain: { + poolId: poolDynamicData.poolId, + chain: poolDynamicData.chain, + }, + }, + data: poolDynamicData, + }); + + for (const tokenUpdate of poolTokenDynamicData) { + await prisma.prismaPoolTokenDynamicData.update({ + where: { + id_chain: { + id: tokenUpdate.id, + chain: tokenUpdate.chain, + }, + }, + data: tokenUpdate, + }); + } + } catch (e) { + console.error('Error upserting pool', e); + } + } + + return ids; +}; diff --git a/modules/actions/pool/sync-swaps.ts b/modules/actions/pool/sync-swaps.ts new file mode 100644 index 000000000..09846ff99 --- /dev/null +++ b/modules/actions/pool/sync-swaps.ts @@ -0,0 +1,63 @@ +import { Chain, Prisma } from '@prisma/client'; +import { prisma } from '../../../prisma/prisma-client'; +import { V3VaultSubgraphClient } from '../../sources/subgraphs'; +import _ from 'lodash'; +import { swapTransformer } from '../../sources/transformers/swap-transformer'; +import { OrderDirection, Swap_OrderBy } from '../../sources/subgraphs/balancer-v3-vault/generated/types'; +import { swapsUsd } from '../../sources/enrichers/swaps-usd'; +import { daysAgo } from '../../common/time'; + +/** + * Adds all swaps since daysToSync to the database. Checks for latest synced swap to avoid duplicate work. + * + * @param vaultSubgraphClient + * @param chain + * @param daysToSync + * @returns + */ +export async function syncSwaps( + vaultSubgraphClient: V3VaultSubgraphClient, + chain = 'SEPOLIA' as Chain, + daysToSync = 30, +): Promise { + const vaultVersion = 3; + + // Get latest event from the DB + const latestEvent = await prisma.poolEvent.findFirst({ + where: { + type: 'SWAP', + chain: chain, + vaultVersion, + }, + orderBy: { + blockNumber: 'desc', + }, + }); + + // Get events since the latest event or limit to number or days we want to keep them in the DB + const since = daysAgo(daysToSync); + const where = + latestEvent?.blockTimestamp && latestEvent?.blockTimestamp > since + ? { blockNumber_gt: String(latestEvent.blockNumber) } + : { blockTimestamp_gte: String(since) }; + + // Get events + const { swaps } = await vaultSubgraphClient.Swaps({ + first: 1000, + where, + orderBy: Swap_OrderBy.BlockNumber, + orderDirection: OrderDirection.Asc, + }); + + const dbSwaps = swaps.map((swap) => swapTransformer(swap, chain)); + + // Enrich with USD values + const dbEntries = await swapsUsd(dbSwaps, chain); + + await prisma.poolEvent.createMany({ + skipDuplicates: true, + data: dbEntries, + }); + + return dbEntries.map((entry) => entry.id); +} diff --git a/modules/actions/pool/update-on-chain-data.ts b/modules/actions/pool/update-on-chain-data.ts deleted file mode 100644 index 1d16dd1bb..000000000 --- a/modules/actions/pool/update-on-chain-data.ts +++ /dev/null @@ -1,225 +0,0 @@ -import { Chain, Prisma, PrismaPoolType } from '@prisma/client'; -import { prisma } from '../../../prisma/prisma-client'; -import { tokenService } from '../../token/token.service'; -import { fetchPoolTokenInfo, fetchPoolTokenRates } from '../../sources/contracts'; -import { ViemClient } from '../../sources/viem-client'; -import { fetchPoolData } from '../../sources/contracts/fetch-pool-data'; -import { formatEther, formatUnits, parseUnits } from 'viem'; -import { isSameAddress } from '@balancer-labs/sdk'; -import { prismaBulkExecuteOperations } from '../../../prisma/prisma-util'; - -const SUPPORTED_POOL_TYPES: PrismaPoolType[] = ['WEIGHTED', 'STABLE']; - -export async function updateOnchainDataForAllPools( - vaultAddress: string, - viemClient: ViemClient, - blockNumber: bigint, - chain = 'SEPOLIA' as Chain, -): Promise { - const allPools = await prisma.prismaPool.findMany({ - where: { chain: chain, vaultVersion: 3 }, - select: { - id: true, - }, - }); - - return updateOnChainDataForPools( - vaultAddress, - '123', - allPools.map((pool) => pool.id), - viemClient, - blockNumber, - chain, - ); -} - -/** - * Makes sure that all pools are synced in the database - * - * @param vaultSubgraphClient - * @param poolSubgraphClient - * @param chain - * @returns syncedPools - the pools that were synced - */ -export async function updateOnChainDataForPools( - vaultAddress: string, - balancerQueriesAddress: string, - poolIds: string[], - viemClient: ViemClient, - blockNumber: bigint, - chain = 'SEPOLIA' as Chain, -): Promise { - if (poolIds.length === 0) { - return []; - } - const updated: string[] = []; - - const filteredPools = await prisma.prismaPool.findMany({ - where: { - id: { in: poolIds }, - chain: chain, - type: { in: SUPPORTED_POOL_TYPES }, - }, - include: { - tokens: { orderBy: { index: 'asc' }, include: { dynamicData: true, token: true } }, - dynamicData: true, - }, - }); - - const filteredPoolIds = filteredPools.map((pool) => pool.id); - const filteredPoolInputs = filteredPools.map((pool) => ({ - id: pool.id, - address: pool.address, - type: pool.type, - version: pool.version, - })); - - const tokenPricesForCurrentChain = await tokenService.getTokenPrices(chain); - const poolTokenData = await fetchPoolTokenInfo(vaultAddress, filteredPoolIds, viemClient, blockNumber); - const poolTokenRatesData = await fetchPoolTokenRates(vaultAddress, filteredPoolIds, viemClient, blockNumber); - const poolConfigData = await fetchPoolData(vaultAddress, filteredPoolInputs, viemClient, blockNumber); - - // TODO also need to add tokenPairs for SOR and calc normalized liquidity - // const tokenPairData = await fetchTokenPairData( - // filteredPools, - // balancerQueriesAddress, - // chain === 'ZKEVM' ? 190 : 1024, - // ); - - const operations = []; - for (const pool of filteredPools) { - const poolTokens = poolTokenData[pool.id]; - const poolTokenRates = poolTokenRatesData[pool.id]; - const poolConfig = poolConfigData[pool.id]; - - try { - // if (isStablePool(pool.type)) { - // if (!amp) { - // console.error(`Stable Pool Missing Amp: ${pool.id}`); - // continue; - // } - - // //only update if amp has changed - // if ((pool.typeData as StableData).amp !== amp) { - // operations.push( - // prisma.prismaPool.update({ - // where: { id_chain: { id: pool.id, chain: this.options.chain } }, - // data: { - // typeData: { - // ...(pool.typeData as StableData), - // amp, - // }, - // }, - // }), - // ); - // } - // } - - const swapFee = poolConfig.swapFee.toString(); - const totalSupply = formatEther(poolConfig.totalSupply); - const swapEnabled = !poolConfig.isPoolPaused; - const isPaused = poolConfig.isPoolPaused; - const isInRecoveryMode = poolConfig.isPoolInRecoveryMode; - - const yieldProtocolFeePercentage = '0'; // TODO - const protocolSwapFeePercentage = poolConfig.protocolSwapFeePercentage.toString(); - - if ( - pool.dynamicData && - (pool.dynamicData.swapFee !== swapFee || - pool.dynamicData.totalShares !== totalSupply || - pool.dynamicData.swapEnabled !== swapEnabled || - pool.dynamicData.protocolYieldFee !== yieldProtocolFeePercentage || - pool.dynamicData.protocolSwapFee !== protocolSwapFeePercentage || - pool.dynamicData.isInRecoveryMode !== isInRecoveryMode || - pool.dynamicData.isPaused !== isPaused) - ) { - operations.push( - prisma.prismaPoolDynamicData.update({ - where: { id_chain: { id: pool.id, chain: chain } }, - data: { - swapFee, - totalShares: totalSupply, - totalSharesNum: parseFloat(totalSupply), - swapEnabled: typeof swapEnabled !== 'undefined' ? swapEnabled : true, - isInRecoveryMode: isInRecoveryMode, - isPaused: isPaused, - protocolYieldFee: yieldProtocolFeePercentage, - protocolSwapFee: protocolSwapFeePercentage, - blockNumber: parseFloat(blockNumber.toString()), - }, - }), - ); - } - - // always update tokenPair data - // if (pool.dynamicData) { - // operations.push( - // prisma.prismaPoolDynamicData.update({ - // where: { id_chain: { id: pool.id, chain: this.options.chain } }, - // data: { - // tokenPairsData: tokenPairs, - // }, - // }), - // ); - // } - - for (let i = 0; i < poolTokens.tokens.length; i++) { - const tokenAddress = poolTokens.tokens[i]; - const poolToken = pool.tokens.find((token) => isSameAddress(token.address, tokenAddress)); - - if (!poolToken) { - throw `Pool Missing Expected Token: ${pool.id} ${tokenAddress}`; - } - - if (poolToken.index !== i) { - throw `Pooltoken index mismatch! "poolToken.index": ${poolToken.index} vs "i": ${i} on pool ${pool.id}`; - } - - const balance = formatUnits(poolTokens.balancesRaw[i], poolToken.token.decimals); - - // set token price rate for various rate types - - // top level token rates, e.g. LSTs in pools - let priceRate = formatEther(poolTokenRates[i]); - - // // bpt price rate - // if (onchainData.rate && isSameAddress(poolToken.address, pool.address)) { - // priceRate = onchainData.rate; - // } - - // TODO v3 does not contain the BPT as pool token, do we need to add it nevertheless? - - if ( - !poolToken.dynamicData || - poolToken.dynamicData.balance !== balance || - poolToken.dynamicData.priceRate !== priceRate - ) { - operations.push( - prisma.prismaPoolTokenDynamicData.update({ - where: { id_chain: { id: poolToken.id, chain: chain } }, - data: { - blockNumber: parseFloat(blockNumber.toString()), - priceRate, - balance, - balanceUSD: - poolToken.address === pool.address - ? 0 - : tokenService.getPriceForToken( - tokenPricesForCurrentChain, - poolToken.address, - chain, - ) * parseFloat(balance), - }, - }), - ); - } - } - } catch (e) { - console.log('error syncing on chain data', e); - } - } - await prismaBulkExecuteOperations(operations, false); - - return updated; -} diff --git a/modules/actions/pool/update-pools-from-subgraph.ts b/modules/actions/pool/update-pools-from-subgraph.ts deleted file mode 100644 index 0ca4ce110..000000000 --- a/modules/actions/pool/update-pools-from-subgraph.ts +++ /dev/null @@ -1,101 +0,0 @@ -import { Chain, Prisma, PrismaPoolType } from '@prisma/client'; -import { prisma } from '../../../prisma/prisma-client'; -import { - poolTransformer, - poolTokensTransformer, - poolTokensDynamicDataTransformer, - poolExpandedTokensTransformer, -} from '../../sources/transformers'; -import { V3PoolsSubgraphClient } from '../../subgraphs/balancer-v3-pools'; -import { BalancerVaultSubgraphSource } from '../../sources/subgraphs/balancer-v3-vault'; -import _ from 'lodash'; -import { tokensTransformer } from '../../sources/transformers/tokens-transformer'; - -type PoolDbEntry = { - pool: Prisma.PrismaPoolCreateInput; - poolTokenDynamicData: Prisma.PrismaPoolTokenDynamicDataCreateManyInput[]; - poolExpandedTokens: Prisma.PrismaPoolExpandedTokensCreateManyInput[]; -}; - -/** - * Makes sure that all pools are synced in the database - * - * @param vaultSubgraphClient - * @param poolSubgraphClient - * @param chain - * @returns syncedPools - the pools that were synced - */ -export async function updatePoolsFromSubgraph( - vaultSubgraphClient: BalancerVaultSubgraphSource, - poolSubgraphClient: V3PoolsSubgraphClient, - chain = 'SEPOLIA' as Chain, -) { - // Fetch pools from subgraph - const vaultSubgraphPools = await vaultSubgraphClient.getAllInitializedPools(); - const { pools: poolSubgraphPools } = await poolSubgraphClient.Pools(); - - // Find pools missing from the database - const dbPools = await prisma.prismaPool.findMany({ where: { chain, vaultVersion: 3 } }); - const dbPoolIds = new Set(dbPools.map((pool) => pool.id.toLowerCase())); - const presentPools = vaultSubgraphPools.filter((pool) => dbPoolIds.has(pool.id)); - - // Making sure all tokens are present - try { - const allTokens = tokensTransformer(presentPools, chain); - await prisma.prismaToken.createMany({ - data: allTokens, - skipDuplicates: true, - }); - } catch (e) { - console.error('Error creating tokens', e); - } - - for (const presentPool of presentPools) { - const vaultSubgraphPool = vaultSubgraphPools.find((pool) => pool.id === presentPool.id); - const poolSubgraphPool = poolSubgraphPools.find((pool) => pool.id === presentPool.id); - if (!vaultSubgraphPool || !poolSubgraphPool) { - // That won't happen, but TS doesn't know that - continue; - } - - const dbPool = poolTransformer(vaultSubgraphPool, poolSubgraphPool, chain); - - await prisma.prismaPool.update({ - where: { id_chain: { id: presentPool.id, chain: chain } }, - data: { - owner: dbPool.owner, - type: dbPool.type, - typeData: dbPool.typeData, - version: dbPool.version, - }, - }); - - const transformedPoolToken = poolTokensTransformer(vaultSubgraphPool, chain); - - for (const poolToken of transformedPoolToken) { - await prisma.prismaPoolToken.update({ - where: { id_chain: { id: poolToken.id, chain: chain } }, - data: { - nestedPoolId: poolToken.nestedPoolId, - }, - }); - } - - const transformedPoolExpandedTokens = poolExpandedTokensTransformer(vaultSubgraphPool, chain); - - for (const poolToken of transformedPoolExpandedTokens) { - await prisma.prismaPoolExpandedTokens.update({ - where: { - tokenAddress_poolId_chain: { - chain: chain, - poolId: presentPool.id, - tokenAddress: poolToken.tokenAddress, - }, - }, - data: { - nestedPoolId: poolToken.nestedPoolId, - }, - }); - } - } -} diff --git a/modules/actions/pool/upsert-pools.ts b/modules/actions/pool/upsert-pools.ts new file mode 100644 index 000000000..96082c0fb --- /dev/null +++ b/modules/actions/pool/upsert-pools.ts @@ -0,0 +1,96 @@ +import { Chain } from '@prisma/client'; +import { prisma } from '../../../prisma/prisma-client'; +import { tokensTransformer } from '../../sources/transformers/tokens-transformer'; +import { fetchPoolData } from '../../sources/contracts/fetch-pool-data'; +import { ViemClient } from '../../sources/viem-client'; +import { JoinedSubgraphPool } from '../../sources/subgraphs'; +import { subgraphPoolUpsert } from '../../sources/transformers/subgraph-pool-upsert'; +import { poolUpsertsUsd } from '../../sources/enrichers/pool-upserts-usd'; + +/** + * Gets and syncs all the pools state with the database + * + * TODO: simplify the schema by merging the pool and poolDynamicData tables and the poolToken, poolTokenDynamicData, expandedToken tables + * + * @param subgraphPools + * @param viemClient + * @param vaultAddress + * @param chain + * @param blockNumber + */ +export const upsertPools = async ( + subgraphPools: JoinedSubgraphPool[], + viemClient: ViemClient, + vaultAddress: string, + chain = 'SEPOLIA' as Chain, + blockNumber: bigint, // TODO: deprecate since we are using always the latest block +) => { + // Enrich with onchain data for all the pools + const onchainData = await fetchPoolData( + vaultAddress, + subgraphPools.map((pool) => pool.id), + viemClient, + blockNumber, + ); + + // Store pool tokens and BPT in the tokens table before creating the pools + const allTokens = tokensTransformer(subgraphPools, chain); + try { + await prisma.prismaToken.createMany({ + data: allTokens, + skipDuplicates: true, + }); + } catch (e) { + console.error('Error creating tokens', e); + } + + // Get the data for the tables about pools + const dbPools = subgraphPools.map((poolData) => + subgraphPoolUpsert(poolData, onchainData[poolData.id], chain, Number(blockNumber)), + ); + + // Enrich updates with USD values + const poolsWithUSD = await poolUpsertsUsd(dbPools, chain, allTokens); + + // Upsert pools to the database + for (const { pool, poolToken, poolDynamicData, poolTokenDynamicData, poolExpandedTokens } of poolsWithUSD) { + console.log(poolDynamicData); + try { + await prisma.$transaction([ + prisma.prismaPool.upsert({ + where: { id_chain: { id: pool.id, chain: pool.chain } }, + create: pool, + update: pool, + }), + + prisma.prismaPoolDynamicData.upsert({ + where: { poolId_chain: { poolId: pool.id, chain: pool.chain } }, + create: poolDynamicData, + update: poolDynamicData, + }), + + // First nullify the pool tokens and then insert them again + prisma.prismaPoolToken.deleteMany({ where: { poolId: pool.id } }), + prisma.prismaPoolTokenDynamicData.deleteMany({ where: { poolTokenId: { startsWith: pool.id } } }), + prisma.prismaPoolExpandedTokens.deleteMany({ where: { poolId: pool.id } }), + + prisma.prismaPoolToken.createMany({ + data: poolToken, + skipDuplicates: true, + }), + + prisma.prismaPoolTokenDynamicData.createMany({ + data: poolTokenDynamicData, + skipDuplicates: true, + }), + + prisma.prismaPoolExpandedTokens.createMany({ + data: poolExpandedTokens, + skipDuplicates: true, + }), + ]); + } catch (e) { + console.error('Error upserting pool', e); + } + } +}; diff --git a/modules/actions/swap/add-swaps-from-subgraph.ts b/modules/actions/swap/add-swaps-from-subgraph.ts deleted file mode 100644 index 897e973dd..000000000 --- a/modules/actions/swap/add-swaps-from-subgraph.ts +++ /dev/null @@ -1,65 +0,0 @@ -import { Chain, Prisma } from '@prisma/client'; -import { prisma } from '../../../prisma/prisma-client'; -import { BalancerVaultSubgraphSource } from '../../sources/subgraphs/balancer-v3-vault'; -import _ from 'lodash'; -import moment from 'moment'; -import { tokenService } from '../../token/token.service'; -import { swapsTransformer } from '../../sources/transformers/swaps-transformer'; - -type PoolDbEntry = { - pool: Prisma.PrismaPoolCreateInput; - poolTokenDynamicData: Prisma.PrismaPoolTokenDynamicDataCreateManyInput[]; - poolExpandedTokens: Prisma.PrismaPoolExpandedTokensCreateManyInput[]; -}; - -/** - * Adds all swaps since daysToSync to the database. Checks for latest synced swap to avoid duplicate work. - * - * @param vaultSubgraphClient - * @param chain - * @param daysToSync - * @returns - */ -export async function addSwapsFromSubgraph( - vaultSubgraphClient: BalancerVaultSubgraphSource, - chain = 'SEPOLIA' as Chain, - daysToSync = 30, -): Promise { - const poolIds = new Set(); - - // only sync from the latest swap in DB to avoid duplicate work - const lastSwap = await prisma.prismaPoolSwap.findFirst({ - orderBy: { timestamp: 'desc' }, - where: { chain: chain }, - }); - - //ensure we only query the last daysToSync worth of swaps - const daysToSyncTimestamp = moment().subtract(daysToSync, 'day').unix(); - const timestamp = lastSwap && lastSwap.timestamp > daysToSyncTimestamp ? lastSwap.timestamp : daysToSyncTimestamp; - - const swaps = await vaultSubgraphClient.getSwapsSince(timestamp); - - await prisma.prismaPoolSwap.createMany({ - skipDuplicates: true, - data: await swapsTransformer(swaps, chain), - }); - - // Do we need to create batch swaps as well? - // await this.createBatchSwaps(Array.from(txs)); - - // Remove everything older that daysToSync - await prisma.prismaPoolSwap.deleteMany({ - where: { - timestamp: { lt: daysToSyncTimestamp }, - chain: chain, - }, - }); - // await prisma.prismaPoolBatchSwap.deleteMany({ - // where: { - // timestamp: { lt: twoDaysAgo }, - // chain: this.chain, - // }, - // }); - - return Array.from(poolIds); -} diff --git a/modules/common/time.ts b/modules/common/time.ts index 5cf724bf8..f9fddb8ee 100644 --- a/modules/common/time.ts +++ b/modules/common/time.ts @@ -142,3 +142,23 @@ export function timestampRoundedUpToNearestHour(m: moment.Moment = moment()): nu return roundUp.unix(); } + +/** + * Time helper to round timestamp to the nearest hour + */ +export const roundToHour = (timestamp: number) => Math.floor(timestamp / 3600) * 3600; + +/** + * Time helper to round timestamp to the nearest midnight + */ +export const roundToMidnight = (timestamp: number) => Math.floor(timestamp / 86400) * 86400; + +/** + * Returns the timestamp for the days ago + * + * @param daysAgo + * @returns + */ +export const daysAgo = (daysAgo: number): number => { + return Math.floor(+new Date(Date.now() - daysAgo * secondsPerDay * 1000) / 1000); +}; diff --git a/modules/controllers/jobs-controller.test.ts b/modules/controllers/jobs-controller.test.ts index 537d322f2..bcc3f24e1 100644 --- a/modules/controllers/jobs-controller.test.ts +++ b/modules/controllers/jobs-controller.test.ts @@ -1,10 +1,18 @@ -import { addMissingPoolsFromSubgraph } from '../actions/pool/add-pools-from-subgraph'; +import { syncPools } from '../actions/pool/sync-pools'; import { JobsController } from './jobs-controller'; -// Mock the actions -jest.mock('@modules/actions/jobs_actions', () => ({ + +// Mock the action +jest.mock('../actions/pool/sync-pools', () => ({ syncPools: jest.fn(), })); +// Mock the clients +jest.mock('../sources/viem-client', () => ({ + getViemClient: jest.fn().mockReturnValue({ + getBlockNumber: jest.fn().mockResolvedValue(1), + }), +})); + describe('jobsController', () => { const jobsController = JobsController(); @@ -12,9 +20,9 @@ describe('jobsController', () => { jest.clearAllMocks(); }); - it('should call getClient with correct chain', () => { - jobsController.addMissingPoolsFromSubgraph('11155111'); + it('should call syncPools', async () => { + await jobsController.syncPools('11155111'); - expect(addMissingPoolsFromSubgraph).toHaveBeenCalled(); + expect(syncPools).toHaveBeenCalled(); }); }); diff --git a/modules/controllers/jobs-controller.ts b/modules/controllers/jobs-controller.ts index c0ba89d2c..c59a1019b 100644 --- a/modules/controllers/jobs-controller.ts +++ b/modules/controllers/jobs-controller.ts @@ -1,17 +1,27 @@ -import { update } from 'lodash'; import config from '../../config'; -import { addMissingPoolsFromSubgraph } from '../actions/pool/add-pools-from-subgraph'; -import { getChangedPools } from '../actions/pool/get-changed-pools'; -import { updateOnChainDataForPools } from '../actions/pool/update-on-chain-data'; +import { syncPools } from '../actions/pool/sync-pools'; +import { upsertPools } from '../actions/pool/upsert-pools'; +import { syncJoinExits } from '../actions/pool/sync-join-exits'; +import { syncJoinExitsV2 } from '../actions/pool/sync-join-exits-v2'; import { chainIdToChain } from '../network/chain-id-to-chain'; import { getViemClient } from '../sources/viem-client'; -import { getPoolsSubgraphClient } from '../subgraphs/balancer-v3-pools'; -import { BalancerVaultSubgraphSource } from '../sources/subgraphs/balancer-v3-vault'; -import { addSwapsFromSubgraph } from '../actions/swap/add-swaps-from-subgraph'; +import { getVaultSubgraphClient } from '../sources/subgraphs/balancer-v3-vault'; +import { syncSwaps } from '../actions/pool/sync-swaps'; import { updateVolumeAndFees } from '../actions/swap/update-volume-and-fees'; +import { BalancerSubgraphService } from '../subgraphs/balancer-subgraph/balancer-subgraph.service'; +import { getV3JoinedSubgraphClient } from '../sources/subgraphs'; +import { prisma } from '../../prisma/prisma-client'; +import { getChangedPools } from '../sources/logs/get-changed-pools'; /** - * Controller responsible for matching job requests to configured job handlers + * Controller responsible for configuring and executing ETL actions, usually in the form of jobs. + * + * @example + * ```ts + * const jobsController = JobsController(); + * await jobsController.syncPools('1'); + * await jobsController.syncJoinExits('1'); + * ``` * * @param name - the name of the job * @param chain - the chain to run the job on @@ -21,7 +31,42 @@ export function JobsController(tracer?: any) { // Setup tracing // ... return { - async addMissingPoolsFromSubgraph(chainId: string) { + async syncJoinExitsV2(chainId: string) { + const chain = chainIdToChain[chainId]; + const { + subgraphs: { balancer }, + } = config[chain]; + + // Guard against unconfigured chains + if (!balancer) { + throw new Error(`Chain not configured: ${chain}`); + } + + const subgraphClient = new BalancerSubgraphService(balancer, Number(chainId)); + const entries = await syncJoinExitsV2(subgraphClient, chain); + return entries; + }, + async syncJoinExitsV3(chainId: string) { + const chain = chainIdToChain[chainId]; + const { + subgraphs: { balancerV3 }, + } = config[chain]; + + // Guard against unconfigured chains + if (!balancerV3) { + throw new Error(`Chain not configured: ${chain}`); + } + + const vaultSubgraphClient = getVaultSubgraphClient(balancerV3); + const entries = await syncJoinExits(vaultSubgraphClient, chain); + return entries; + }, + /** + * Adds new pools found in subgraph to the database + * + * @param chainId + */ + async addPools(chainId: string) { const chain = chainIdToChain[chainId]; const { subgraphs: { balancerV3, balancerPoolsV3 }, @@ -31,27 +76,53 @@ export function JobsController(tracer?: any) { } = config[chain]; // Guard against unconfigured chains - if (!balancerV3) { + if (!balancerV3 || !balancerPoolsV3 || !vaultAddress) { throw new Error(`Chain not configured: ${chain}`); } - const vaultSubgraphClient = new BalancerVaultSubgraphSource(balancerV3); - const poolSubgraphClient = getPoolsSubgraphClient(balancerPoolsV3!); + const pools = await prisma.prismaPool.findMany(); + const ids = pools.map((pool) => pool.id); + const client = getV3JoinedSubgraphClient(balancerV3, balancerPoolsV3); + const newPools = await client.getAllInitializedPools({ id_not_in: ids }); + const viemClient = getViemClient(chain); const latestBlock = await viemClient.getBlockNumber(); - // TODO: add syncing v2 pools as well by splitting the poolService into separate - // actions with extracted configuration + await upsertPools(newPools, viemClient, vaultAddress, chain, latestBlock); + }, + /** + * Takes all the pools from subgraph, enriches with onchain data and upserts them to the database + * + * @param chainId + */ + async reloadPools(chainId: string) { + const chain = chainIdToChain[chainId]; + const { + subgraphs: { balancerV3, balancerPoolsV3 }, + balancer: { + v3: { vaultAddress }, + }, + } = config[chain]; - // find all missing pools and add them to the DB - const added = await addMissingPoolsFromSubgraph(vaultSubgraphClient, poolSubgraphClient, chain); + // Guard against unconfigured chains + if (!balancerV3 || !balancerPoolsV3 || !vaultAddress) { + throw new Error(`Chain not configured: ${chain}`); + } - // update with latest on-chain data (needed? this will run on a separate job anyway) - const updated = await updateOnChainDataForPools(vaultAddress, '123', added, viemClient, latestBlock); + const client = getV3JoinedSubgraphClient(balancerV3, balancerPoolsV3); + const allPools = await client.getAllInitializedPools(); - return updated; + const viemClient = getViemClient(chain); + const latestBlock = await viemClient.getBlockNumber(); + + await upsertPools(allPools, viemClient, vaultAddress, chain, latestBlock); }, - async updateOnChainDataChangedPools(chainId: string) { + /** + * Syncs database pools state with the onchain state + * + * @param chainId + */ + async syncPools(chainId: string) { const chain = chainIdToChain[chainId]; const { balancer: { @@ -63,27 +134,47 @@ export function JobsController(tracer?: any) { if (!vaultAddress) { throw new Error(`Chain not configured: ${chain}`); } - const viemClient = getViemClient(chain); - const blockNumber = await viemClient.getBlockNumber(); + const fromBlock = ( + await prisma.prismaPoolDynamicData.findFirst({ + orderBy: { blockNumber: 'desc' }, + }) + )?.blockNumber; - const changedPools = await getChangedPools(vaultAddress, viemClient, blockNumber, chain); - if (changedPools.length === 0) { - return []; + // Sepolia vault deployment block, uncomment to test from the beginning + // const fromBlock = 5274748n; + + // Guard against unsynced pools + if (!fromBlock) { + throw new Error(`No synced pools found for chain: ${chain}`); } - const updated = updateOnChainDataForPools( - vaultAddress, - '123', - changedPools, - viemClient, - blockNumber, - chain, - ); + const pools = await prisma.prismaPool.findMany(); + const dbIds = pools.map((pool) => pool.id.toLowerCase()); + const viemClient = getViemClient(chain); - return updated; + const { changedPools, latestBlock } = await getChangedPools(vaultAddress, viemClient, BigInt(fromBlock)); + const ids = changedPools.filter((id) => dbIds.includes(id.toLowerCase())); // only sync pools that are in the database + if (ids.length === 0 || !latestBlock) { + return []; + } + return syncPools(ids, viemClient, vaultAddress, chain, latestBlock + 1n); }, + async syncSwapsV3(chainId: string) { + const chain = chainIdToChain[chainId]; + const { + subgraphs: { balancerV3 }, + } = config[chain]; + // Guard against unconfigured chains + if (!balancerV3) { + throw new Error(`Chain not configured: ${chain}`); + } + + const vaultSubgraphClient = getVaultSubgraphClient(balancerV3); + const entries = await syncSwaps(vaultSubgraphClient, chain); + return entries; + }, // TODO also update yieldfee // TODO maybe update fee from onchain instead of swap? async syncSwapsUpdateVolumeAndFees(chainId: string) { @@ -97,9 +188,9 @@ export function JobsController(tracer?: any) { throw new Error(`Chain not configured: ${chain}`); } - const vaultSubgraphClient = new BalancerVaultSubgraphSource(balancerV3); + const vaultSubgraphClient = getVaultSubgraphClient(balancerV3); - const poolsWithNewSwaps = await addSwapsFromSubgraph(vaultSubgraphClient, chain); + const poolsWithNewSwaps = await syncSwaps(vaultSubgraphClient, chain); await updateVolumeAndFees(poolsWithNewSwaps); return poolsWithNewSwaps; }, diff --git a/modules/controllers/pools-controller.ts b/modules/controllers/pools-controller.ts index ef1cf7e1e..48773e608 100644 --- a/modules/controllers/pools-controller.ts +++ b/modules/controllers/pools-controller.ts @@ -1,12 +1,8 @@ import config from '../../config'; -import { updateOnchainDataForAllPools } from '../actions/pool/update-on-chain-data'; -import { updatePoolsFromSubgraph } from '../actions/pool/update-pools-from-subgraph'; -import { addSwapsFromSubgraph } from '../actions/swap/add-swaps-from-subgraph'; +import { syncSwaps } from '../actions/pool/sync-swaps'; import { updateVolumeAndFees } from '../actions/swap/update-volume-and-fees'; import { chainIdToChain } from '../network/chain-id-to-chain'; -import { BalancerVaultSubgraphSource } from '../sources/subgraphs/balancer-v3-vault'; -import { getViemClient } from '../sources/viem-client'; -import { getPoolsSubgraphClient } from '../subgraphs/balancer-v3-pools'; +import { getVaultSubgraphClient } from '../sources/subgraphs'; /** * Controller responsible for matching job requests to configured job handlers @@ -19,49 +15,6 @@ export function PoolsController(tracer?: any) { // Setup tracing // ... return { - async updatePoolsFromSubgraph(chainIds: string[]) { - const updatedPools: string[] = []; - for (const chainId of chainIds) { - const chain = chainIdToChain[chainId]; - const { - subgraphs: { balancerV3, balancerPoolsV3 }, - } = config[chain]; - - // Guard against unconfigured chains - if (!balancerV3) { - throw new Error(`Chain not configured: ${chain}`); - } - - const vaultSubgraphClient = new BalancerVaultSubgraphSource(balancerV3); - const poolSubgraphClient = getPoolsSubgraphClient(balancerPoolsV3!); - - // TODO: add syncing v2 pools as well by splitting the poolService into separate - // actions with extracted configuration - - // find all missing pools and add them to the DB - const added = await updatePoolsFromSubgraph(vaultSubgraphClient, poolSubgraphClient, chain); - } - - return updatedPools; - }, - async updateOnChainDataForAllPools(chainId: string) { - const chain = chainIdToChain[chainId]; - const { - balancer: { - v3: { vaultAddress }, - }, - } = config[chain]; - - // Guard against unconfigured chains - if (!vaultAddress) { - throw new Error(`Chain not configured: ${chain}`); - } - const viemClient = getViemClient(chain); - const latestBlockNumber = await viemClient.getBlockNumber(); - - const updated = updateOnchainDataForAllPools(vaultAddress, viemClient, latestBlockNumber, chain); - return updated; - }, async loadSwapsFeesVolumeForAllPools(chainId: string) { const chain = chainIdToChain[chainId]; const { @@ -73,9 +26,9 @@ export function PoolsController(tracer?: any) { throw new Error(`Chain not configured: ${chain}`); } - const vaultSubgraphClient = new BalancerVaultSubgraphSource(balancerV3); + const vaultSubgraphClient = getVaultSubgraphClient(balancerV3); - const poolsWithNewSwaps = await addSwapsFromSubgraph(vaultSubgraphClient, chain); + const poolsWithNewSwaps = await syncSwaps(vaultSubgraphClient, chain); await updateVolumeAndFees(poolsWithNewSwaps); return poolsWithNewSwaps; }, diff --git a/modules/controllers/queries-controller.ts b/modules/controllers/queries-controller.ts new file mode 100644 index 000000000..a348f3191 --- /dev/null +++ b/modules/controllers/queries-controller.ts @@ -0,0 +1,109 @@ +/** + * Responsible for handling all the queries – can be split based on models + */ +import { GqlPoolEvent, GqlPoolJoinExit, GqlPoolSwap, QueryPoolGetEventsArgs } from '../../schema'; +import { prisma } from '../../prisma/prisma-client'; +import { Prisma } from '@prisma/client'; +import { JoinExitEvent, SwapEvent } from '../../prisma/prisma-types'; + +const parseJoinExit = (event: JoinExitEvent): GqlPoolJoinExit => { + return { + __typename: 'GqlPoolJoinExit', + amounts: event.payload.tokens.map((token: any) => ({ + address: token.address, + amount: token.amount, + valueUSD: token.amountUsd, + })), + ...event, + timestamp: event.blockTimestamp, + sender: event.userAddress, + }; +}; + +const parseSwap = (event: SwapEvent): GqlPoolSwap => { + return { + __typename: 'GqlPoolSwap', + ...event, + sender: event.userAddress, + timestamp: event.blockTimestamp, + tokenIn: event.payload.tokenIn.address, + tokenAmountIn: event.payload.tokenIn.amount, + tokenInData: { + ...event.payload.tokenIn, + }, + tokenOut: event.payload.tokenOut.address, + tokenAmountOut: event.payload.tokenOut.amount, + tokenOutData: { + ...event.payload.tokenOut, + }, + }; +}; + +export function QueriesController(tracer?: any) { + return { + /** + * Getting pool events, with pagination and filtering + * + * @param param.first - number of items to return + * @param param.skip - number of items to skip + * @param param.where - filtering conditions + * @returns + */ + getEvents: async ({ + first, + skip, + where, + }: QueryPoolGetEventsArgs): Promise<(GqlPoolSwap | GqlPoolJoinExit)[]> => { + // Setting default values + first = first ?? 1000; + skip = skip ?? 0; + where = where ?? {}; + let { chainIn, poolIdIn, userAddress, typeIn } = where; + + const conditions: Prisma.PoolEventWhereInput = {}; + + if (typeIn && typeIn.length) { + conditions.type = { + in: typeIn, + }; + } + if (chainIn && chainIn.length) { + conditions.chain = { + in: chainIn, + }; + } + if (poolIdIn && poolIdIn.length) { + conditions.poolId = { + in: poolIdIn, + mode: 'insensitive', + }; + } + if (userAddress) { + conditions.userAddress = { + equals: userAddress, + mode: 'insensitive', + }; + } + + const dbEvents = await prisma.poolEvent.findMany({ + where: conditions, + take: first, + skip, + orderBy: [ + { + blockNumber: 'desc', + }, + { + logIndex: 'desc', + }, + ], + }); + + const results = dbEvents.map((event) => + event.type === 'SWAP' ? parseSwap(event as SwapEvent) : parseJoinExit(event as JoinExitEvent), + ); + + return results; + }, + }; +} diff --git a/modules/network/network-config-types.ts b/modules/network/network-config-types.ts index 4360ce55c..164b16e6e 100644 --- a/modules/network/network-config-types.ts +++ b/modules/network/network-config-types.ts @@ -1,14 +1,14 @@ -import { Chain } from '@prisma/client'; -import { BigNumber } from 'ethers'; -import { PoolAprService, PoolStakingService } from '../pool/pool-types'; -import { UserStakedBalanceService } from '../user/user-types'; -import { TokenPriceHandler } from '../token/token-types'; -import { BaseProvider } from '@ethersproject/providers'; -import { GqlChain } from '../../schema'; -import { ContentService } from '../content/content-types'; -import { YbAprConfig } from './apr-config-types'; -import { BalancerSubgraphService } from '../subgraphs/balancer-subgraph/balancer-subgraph.service'; -import { SftmxSubgraphService } from '../subgraphs/sftmx-subgraph/sftmx.service'; +import type { Chain } from '@prisma/client'; +import type { BigNumber } from 'ethers'; +import type { PoolAprService, PoolStakingService } from '../pool/pool-types'; +import type { UserStakedBalanceService } from '../user/user-types'; +import type { TokenPriceHandler } from '../token/token-types'; +import type { BaseProvider } from '@ethersproject/providers'; +import type { GqlChain } from '../../schema'; +import type { ContentService } from '../content/content-types'; +import type { YbAprConfig } from './apr-config-types'; +import type { BalancerSubgraphService } from '../subgraphs/balancer-subgraph/balancer-subgraph.service'; +import type { SftmxSubgraphService } from '../subgraphs/sftmx-subgraph/sftmx.service'; export interface NetworkConfig { data: NetworkData; diff --git a/modules/network/sepolia.ts b/modules/network/sepolia.ts index 07e7722ad..85c955cc0 100644 --- a/modules/network/sepolia.ts +++ b/modules/network/sepolia.ts @@ -12,7 +12,7 @@ import { GithubContentService } from '../content/github-content.service'; import { gaugeSubgraphService } from '../subgraphs/gauge-subgraph/gauge-subgraph.service'; import { YbTokensAprService } from '../pool/lib/apr-data-sources/yb-tokens-apr.service'; import { BalancerSubgraphService } from '../subgraphs/balancer-subgraph/balancer-subgraph.service'; -import { sepoliaConfig as sepoliaNetworkData } from '../../config/sepolia'; +import sepoliaNetworkData from '../../config/sepolia'; export { sepoliaNetworkData }; @@ -109,13 +109,26 @@ export const sepoliaNetworkConfig: NetworkConfig = { name: 'update-fee-volume-yield-all-pools', interval: every(1, 'hours'), }, + // V3 jobs { - name: 'sync-changed-pools-v3', - interval: every(15, 'minutes'), + name: 'add-pools-v3', + interval: every(5, 'minutes'), }, { - name: 'sync-new-pools-from-subgraph-v3', - interval: every(20, 'minutes'), + name: 'sync-pools-v3', + interval: every(1, 'minutes'), + }, + { + name: 'sync-join-exits-v2', + interval: every(10, 'minutes'), + }, + { + name: 'sync-join-exits-v3', + interval: every(1, 'minutes'), + }, + { + name: 'sync-swaps-v3', + interval: every(1, 'minutes'), }, { name: 'update-swaps-volume-and-fees-v3', diff --git a/modules/pool/lib/pool-swap.service.ts b/modules/pool/lib/pool-swap.service.ts index 5aa4e5b54..94cb34928 100644 --- a/modules/pool/lib/pool-swap.service.ts +++ b/modules/pool/lib/pool-swap.service.ts @@ -32,126 +32,6 @@ export class PoolSwapService { return networkContext.chain; } - public async getJoinExits(args: QueryPoolGetJoinExitsArgs): Promise { - const first = !args.first || args.first > 100 ? 10 : args.first; - - const allChainsJoinExits: GqlPoolJoinExit[] = []; - - if (args.where?.chainIn) { - for (const chain of args.where.chainIn) { - const balancerSubgraphService = AllNetworkConfigsKeyedOnChain[chain].services.balancerSubgraphService; - - const { joinExits } = await balancerSubgraphService.getPoolJoinExits({ - where: { pool_in: args.where?.poolIdIn }, - first, - skip: args.skip, - orderBy: JoinExit_OrderBy.Timestamp, - orderDirection: OrderDirection.Desc, - }); - - const mappedJoinExits: GqlPoolJoinExit[] = joinExits.map((joinExit) => ({ - ...joinExit, - __typename: 'GqlPoolJoinExit', - chain: chain, - poolId: joinExit.pool.id, - amounts: joinExit.amounts.map((amount, index) => ({ - address: joinExit.pool.tokensList[index], - amount, - })), - })); - - allChainsJoinExits.push(...mappedJoinExits); - } - } - - return allChainsJoinExits; - } - - public async getUserJoinExitsForPool( - userAddress: string, - poolId: string, - chain: Chain, - first = 10, - skip = 0, - ): Promise { - const balancerSubgraphService = AllNetworkConfigsKeyedOnChain[chain].services.balancerSubgraphService; - - const { joinExits } = await balancerSubgraphService.getPoolJoinExits({ - where: { pool: poolId, user: userAddress }, - first, - skip: skip, - orderBy: JoinExit_OrderBy.Timestamp, - orderDirection: OrderDirection.Desc, - }); - - return joinExits.map((joinExit) => ({ - ...joinExit, - __typename: 'GqlPoolJoinExit', - poolId: joinExit.pool.id, - chain: chain, - amounts: joinExit.amounts.map((amount, index) => ({ address: joinExit.pool.tokensList[index], amount })), - })); - } - - public async getSwaps(args: QueryPoolGetSwapsArgs): Promise { - const take = !args.first || args.first > 100 ? 10 : args.first; - - return prisma.prismaPoolSwap.findMany({ - take, - skip: args.skip || undefined, - where: { - poolId: { - in: args.where?.poolIdIn || undefined, - }, - tokenIn: { - in: args.where?.tokenInIn || undefined, - }, - tokenOut: { - in: args.where?.tokenOutIn || undefined, - }, - chain: { - in: args.where?.chainIn || undefined, - }, - }, - orderBy: { timestamp: 'desc' }, - }); - } - - public async getUserSwapsForPool( - userAddress: string, - poolId: string, - chain: Chain, - first = 10, - skip = 0, - ): Promise { - const balancerSubgraphService = AllNetworkConfigsKeyedOnChain[chain].services.balancerSubgraphService; - - const result = await balancerSubgraphService.getSwaps({ - first, - skip, - where: { - poolId, - userAddress, - }, - orderBy: Swap_OrderBy.Timestamp, - orderDirection: OrderDirection.Desc, - }); - - return result.swaps.map((swap) => ({ - id: swap.id, - chain: chain, - userAddress, - poolId: swap.poolId.id, - tokenIn: swap.tokenIn, - tokenAmountIn: swap.tokenAmountIn, - tokenOut: swap.tokenOut, - tokenAmountOut: swap.tokenAmountOut, - valueUSD: parseFloat(swap.valueUSD), - timestamp: swap.timestamp, - tx: swap.tx, - })); - } - public async getBatchSwaps(args: QueryPoolGetBatchSwapsArgs): Promise { const take = !args.first || args.first > 100 ? 10 : args.first; diff --git a/modules/pool/pool.gql b/modules/pool/pool.gql index 5037b7a09..c26d40b21 100644 --- a/modules/pool/pool.gql +++ b/modules/pool/pool.gql @@ -25,6 +25,10 @@ extend type Query { poolGetLinearPools(chains: [GqlChain!]): [GqlPoolLinear!]! poolGetGyroPools(chains: [GqlChain!]): [GqlPoolGyro!]! poolGetFxPools(chains: [GqlChain!]): [GqlPoolFx!]! + """ + Getting swap, join and exit events + """ + poolGetEvents(first: Int, skip: Int, where: GqlPoolEventsFilter): [GqlPoolEvent!]! } extend type Mutation { @@ -728,20 +732,6 @@ type GqlPoolTokenExpanded { isMainToken: Boolean! } -type GqlPoolSwap { - id: ID! - chain: GqlChain! - poolId: String! - userAddress: String! - tokenIn: String! - tokenOut: String! - tokenAmountIn: String! - tokenAmountOut: String! - timestamp: Int! - tx: String! - valueUSD: Float! -} - type GqlPoolBatchSwap { id: ID! chain: GqlChain! @@ -804,28 +794,78 @@ enum GqlPoolStakingGaugeStatus { input GqlPoolJoinExitFilter { poolIdIn: [String!] chainIn: [GqlChain!] + userAddress: String } -type GqlPoolJoinExit { - id: ID! +enum GqlPoolEventType { + JOIN + EXIT + SWAP +} + +input GqlPoolEventsFilter { + typeIn: [GqlPoolEventType!] + poolIdIn: [String!] + chainIn: [GqlChain!] + userAddress: String +} + +interface GqlPoolEvent { chain: GqlChain! - type: GqlPoolJoinExitType! + id: ID! + tx: String! + logIndex: Int! + blockNumber: Int! + blockTimestamp: Int! + type: GqlPoolEventType! sender: String! + userAddress: String! poolId: String! timestamp: Int! - valueUSD: String + valueUSD: Float! +} + +type GqlPoolJoinExit implements GqlPoolEvent { + chain: GqlChain! + id: ID! tx: String! - amounts: [GqlPoolJoinExitAmount!]! + logIndex: Int! + blockNumber: Int! + blockTimestamp: Int! + type: GqlPoolEventType! + sender: String! + userAddress: String! + poolId: String! + timestamp: Int! + valueUSD: Float! + amounts: [GqlPoolEventAmount!]! } -enum GqlPoolJoinExitType { - Join - Exit +type GqlPoolSwap implements GqlPoolEvent { + chain: GqlChain! + id: ID! + tx: String! + logIndex: Int! + blockNumber: Int! + blockTimestamp: Int! + type: GqlPoolEventType! + sender: String! + userAddress: String! + poolId: String! + timestamp: Int! + valueUSD: Float! + tokenIn: String! + tokenOut: String! + tokenAmountIn: String! + tokenAmountOut: String! + tokenInData: GqlPoolEventAmount! + tokenOutData: GqlPoolEventAmount! } -type GqlPoolJoinExitAmount { +type GqlPoolEventAmount { address: String! amount: String! + valueUSD: Float! } type GqlBalancePoolAprItem { diff --git a/modules/pool/pool.prisma b/modules/pool/pool.prisma index 1d3227782..d98b37132 100644 --- a/modules/pool/pool.prisma +++ b/modules/pool/pool.prisma @@ -494,3 +494,31 @@ model PrismaReliquaryTokenBalanceSnapshot { decimals Int balance String } + +enum PoolEventType { + JOIN + EXIT + SWAP +} + +model PoolEvent { + id String @id + tx String + type PoolEventType + chain Chain + poolId String + userAddress String + blockNumber Int + blockTimestamp Int + logIndex Int + vaultVersion Int @default(2) + valueUSD Float + payload Json + + @@index([type]) + @@index([chain]) + @@index([chain, poolId]) + @@index([userAddress]) + @@index([blockNumber]) + @@index([logIndex]) +} diff --git a/modules/pool/pool.resolvers.ts b/modules/pool/pool.resolvers.ts index f7fa7d834..9aa5653ed 100644 --- a/modules/pool/pool.resolvers.ts +++ b/modules/pool/pool.resolvers.ts @@ -1,9 +1,10 @@ import { poolService } from './pool.service'; -import { Resolvers } from '../../schema'; +import { GqlPoolJoinExit, GqlPoolSwap, Resolvers } from '../../schema'; import { isAdminRoute } from '../auth/auth-context'; import { prisma } from '../../prisma/prisma-client'; import { networkContext } from '../network/network-context.service'; import { headerChain } from '../context/header-chain'; +import { QueriesController } from '../controllers/queries-controller'; const balancerResolvers: Resolvers = { Query: { @@ -22,14 +23,17 @@ const balancerResolvers: Resolvers = { poolGetPoolsCount: async (parent, args, context) => { return poolService.getPoolsCount(args); }, - poolGetSwaps: async (parent, args, context) => { + // TODO: Deprecate in favor of poolGetEvents + poolGetSwaps: (parent, args, context) => { const currentChain = headerChain(); if (!args.where?.chainIn && currentChain) { args.where = { ...args.where, chainIn: [currentChain] }; } else if (!args.where?.chainIn) { throw new Error('poolGetSwaps error: Provide "where.chainIn" param'); } - return poolService.getPoolSwaps(args); + return QueriesController().getEvents({ ...args, where: { ...args.where, typeIn: ['SWAP'] } }) as Promise< + GqlPoolSwap[] + >; }, poolGetBatchSwaps: async (parent, args, context) => { const currentChain = headerChain(); @@ -40,14 +44,15 @@ const balancerResolvers: Resolvers = { } return poolService.getPoolBatchSwaps(args); }, - poolGetJoinExits: async (parent, args, context) => { - const currentChain = headerChain(); - if (!args.where?.chainIn && currentChain) { - args.where = { ...args.where, chainIn: [currentChain] }; - } else if (!args.where?.chainIn) { - throw new Error('poolGetJoinExits error: Provide "where.chainIn" param'); - } - return poolService.getPoolJoinExits(args); + // TODO: Deprecate in favor of poolGetEvents + poolGetJoinExits: (parent, args, context) => { + return QueriesController().getEvents({ + ...args, + where: { ...args.where, typeIn: ['JOIN', 'EXIT'] }, + }) as Promise; + }, + poolGetEvents: (parent, args, context) => { + return QueriesController().getEvents(args); }, poolGetFeaturedPoolGroups: async (parent, { chains }, context) => { const currentChain = headerChain(); diff --git a/modules/pool/pool.service.ts b/modules/pool/pool.service.ts index 451187fd1..7421b8220 100644 --- a/modules/pool/pool.service.ts +++ b/modules/pool/pool.service.ts @@ -99,10 +99,6 @@ export class PoolService { return prisma.prismaPoolFilter.findMany({ where: { chain: this.chain } }); } - public async getPoolSwaps(args: QueryPoolGetSwapsArgs): Promise { - return this.poolSwapService.getSwaps(args); - } - public async getPoolBatchSwaps(args: QueryPoolGetBatchSwapsArgs): Promise { const batchSwaps = await this.poolSwapService.getBatchSwaps(args); @@ -117,10 +113,6 @@ export class PoolService { })); } - public async getPoolJoinExits(args: QueryPoolGetJoinExitsArgs): Promise { - return this.poolSwapService.getJoinExits(args); - } - public async getFeaturedPoolGroups(chains: Chain[]): Promise { return this.poolGqlLoaderService.getFeaturedPoolGroups(chains); } diff --git a/modules/sources/contracts/abis/VaultV3.ts b/modules/sources/contracts/abis/VaultV3.ts index dcf38d114..41ffcc532 100644 --- a/modules/sources/contracts/abis/VaultV3.ts +++ b/modules/sources/contracts/abis/VaultV3.ts @@ -1,4 +1,4 @@ -export const vaultV3Abi = [ +export default [ { inputs: [ { diff --git a/modules/sources/contracts/fetch-pool-data.ts b/modules/sources/contracts/fetch-pool-data.ts index bafe1627e..cbe998440 100644 --- a/modules/sources/contracts/fetch-pool-data.ts +++ b/modules/sources/contracts/fetch-pool-data.ts @@ -1,95 +1,97 @@ -import { PrismaPoolType } from '@prisma/client'; +import { AbiParameterToPrimitiveType, ExtractAbiFunction } from 'abitype'; import { ViemClient } from '../types'; -import { vaultV3Abi } from './abis/VaultV3'; -import { fetchPoolTokenInfo } from './fetch-pool-tokens'; +import vaultV3Abi from './abis/VaultV3'; -interface PoolInput { - id: string; - address: string; - type: PrismaPoolType; - version: number; -} +// TODO: Find out if we need to do that, +// or can somehow get the correct type infered automatically from the viem's result set? +type PoolConfig = AbiParameterToPrimitiveType['outputs'][0]>; -interface PoolData { +export interface OnchainPoolData { totalSupply: bigint; swapFee: bigint; - protocolSwapFeePercentage: bigint; - // protocolYieldFeePercentage: bigint; // rate?: bigint; // amp?: [bigint, boolean, bigint]; isPoolPaused: boolean; isPoolInRecoveryMode: boolean; + tokens: { + address: string; + balance: bigint; + rateProvider: string; + rate: bigint; + }[]; } export async function fetchPoolData( vault: string, - pools: PoolInput[], + pools: string[], client: ViemClient, - blockNumber: bigint, -): Promise> { - const totalSupplyContracts = pools + blockNumber?: bigint, +): Promise<{ [address: string]: OnchainPoolData }> { + const contracts = pools .map((pool) => [ { address: vault as `0x${string}`, abi: vaultV3Abi, functionName: 'totalSupply', - args: [pool.address as `0x${string}`], - } as const, - ]) - .flat(); - - const configContracts = pools - .map((pool) => [ + args: [pool as `0x${string}`], + }, { address: vault as `0x${string}`, abi: vaultV3Abi, functionName: 'getPoolConfig', - args: [pool.address as `0x${string}`], - } as const, - ]) - .flat(); - - const protocolSwapFeeContracts = pools - .map((pool) => [ + args: [pool as `0x${string}`], + }, + { + address: vault as `0x${string}`, + abi: vaultV3Abi, + functionName: 'getPoolTokenInfo', + args: [pool as `0x${string}`], + }, { address: vault as `0x${string}`, abi: vaultV3Abi, - functionName: 'getProtocolSwapFeePercentage', - } as const, + functionName: 'getPoolTokenRates', + args: [pool as `0x${string}`], + }, ]) .flat(); - // TODO combine into one call - const totalSupplyResult = await client.multicall({ contracts: totalSupplyContracts, blockNumber: blockNumber }); - const configResult = await client.multicall({ contracts: configContracts, blockNumber: blockNumber }); - const protocolSwapFeeResult = await client.multicall({ - contracts: protocolSwapFeeContracts, - blockNumber: blockNumber, - }); + const results = await client.multicall({ contracts, blockNumber: blockNumber }); // Parse the results - const parsedResults: Record = {}; - pools.forEach((result, i) => { - if ( - totalSupplyResult[i].status === 'success' && - totalSupplyResult[i].result !== undefined && - configResult[i].status === 'success' && - configResult[i].result !== undefined && - protocolSwapFeeResult[i].status === 'success' && - protocolSwapFeeResult[i].result !== undefined - ) { - // parse the result here using the abi - const poolData = { - totalSupply: totalSupplyResult[i].result!, - swapFee: configResult[i].result!.staticSwapFeePercentage, - protocolSwapFeePercentage: 0n, // TODO can this be added to config? - isPoolPaused: configResult[i].result!.isPoolPaused, - isPoolInRecoveryMode: configResult[i].result!.isPoolInRecoveryMode, - } as PoolData; + const parsedResults = pools.map((pool, i) => { + const pointer = i * 4; + const config = + results[pointer + 1].status === 'success' + ? (results[pointer + 1].result as unknown as PoolConfig) + : undefined; + const poolTokens = + results[pointer + 2].status === 'success' + ? { + tokens: (results[pointer + 2].result as any)[0], + balancesRaw: (results[pointer + 2].result as any)[2], + rateProviders: (results[pointer + 2].result as any)[4], + } + : undefined; + const poolTokenRates = + results[pointer + 3].status === 'success' ? (results[pointer + 3].result as any) : undefined; - parsedResults[result.id] = poolData; - } - // Handle the error + return [ + pool.toLowerCase(), + { + totalSupply: results[pointer].status === 'success' ? (results[pointer].result as bigint) : undefined, + swapFee: config?.staticSwapFeePercentage, + isPoolPaused: config?.isPoolPaused, + isPoolInRecoveryMode: config?.isPoolInRecoveryMode, + tokens: poolTokens?.tokens.map((token: string, i: number) => ({ + address: token.toLowerCase(), + balance: poolTokens.balancesRaw[i], + rateProvider: poolTokens.rateProviders[i], + rate: poolTokenRates[i], + })), + }, + ]; }); - return parsedResults; + + return Object.fromEntries(parsedResults); } diff --git a/modules/sources/contracts/fetch-pool-tokens.ts b/modules/sources/contracts/fetch-pool-tokens.ts index ad5cf87ee..2303f8e72 100644 --- a/modules/sources/contracts/fetch-pool-tokens.ts +++ b/modules/sources/contracts/fetch-pool-tokens.ts @@ -1,5 +1,5 @@ import { ViemClient } from '../types'; -import { vaultV3Abi } from './abis/VaultV3'; +import vaultV3Abi from './abis/VaultV3'; type PoolTokenInfo = { tokens: `0x${string}`[]; @@ -17,7 +17,7 @@ export async function fetchPoolTokenInfo( vault: string, pools: string[], client: ViemClient, - blockNumber: bigint, + blockNumber?: bigint, ): Promise> { const contracts = pools .map((pool) => [ @@ -59,7 +59,7 @@ export async function fetchPoolTokenRates( vault: string, pools: string[], client: ViemClient, - blockNumber: bigint, + blockNumber?: bigint, ): Promise> { const contracts = pools .map((pool) => [ diff --git a/modules/sources/contracts/fetch-protocol-fees.ts b/modules/sources/contracts/fetch-protocol-fees.ts new file mode 100644 index 000000000..4f57adc7c --- /dev/null +++ b/modules/sources/contracts/fetch-protocol-fees.ts @@ -0,0 +1,33 @@ +import { ViemClient } from '../viem-client'; +import vaultV3Abi from './abis/VaultV3'; + +interface ProtocolFees { + protocolSwapFeePercentage?: bigint; + protocolYieldFeePercentage?: bigint; +} + +export async function fetchProtocolFees( + vault: string, + client: ViemClient, + blockNumber?: bigint, +): Promise { + const contracts = [ + { + address: vault as `0x${string}`, + abi: vaultV3Abi, + functionName: 'getProtocolSwapFeePercentage', + }, + { + address: vault as `0x${string}`, + abi: vaultV3Abi, + functionName: 'getProtocolYieldFeePercentage', + }, + ]; + + const results = await client.multicall({ contracts, blockNumber: blockNumber }); + + return { + protocolSwapFeePercentage: results[0].status === 'success' ? results[0].result : undefined, + protocolYieldFeePercentage: results[1].status === 'success' ? results[1].result : undefined, + } as ProtocolFees; +} diff --git a/modules/sources/contracts/index.ts b/modules/sources/contracts/index.ts index 1347526f8..4e78fb511 100644 --- a/modules/sources/contracts/index.ts +++ b/modules/sources/contracts/index.ts @@ -1,3 +1,4 @@ export * from './fetch-erc20-headers'; export * from './fetch-pool-tokens'; export * from './fetch-weighted-pools-data'; +export * from './fetch-pool-data'; diff --git a/modules/sources/enrichers/pool-upserts-usd.ts b/modules/sources/enrichers/pool-upserts-usd.ts new file mode 100644 index 000000000..ef05a07b3 --- /dev/null +++ b/modules/sources/enrichers/pool-upserts-usd.ts @@ -0,0 +1,61 @@ +import _ from 'lodash'; +import { Chain } from '@prisma/client'; +import { prisma } from '../../../prisma/prisma-client'; +import { SubgraphPoolUpsertData } from '../transformers/subgraph-pool-upsert'; +import { formatUnits } from 'viem'; +import { OnchainPoolUpdateData } from '../transformers/onchain-pool-update'; + +type EnrichedTokenData = T extends { poolTokenDynamicData: infer U } + ? U extends any[] + ? { + poolDynamicData: U[number] & { totalLiquidity: number }; + poolTokenDynamicData: (U[number] & { balanceUSD: number })[]; + } + : never + : never; + +/** + * Takes pool data for the DB upserts and enriches them with USD values: + * - pool tokens balances in USD + * - pool liquidity in USD + * + * @param upsertData + * @param chain + * @returns + */ +export async function poolUpsertsUsd( + upsertData: T[], + chain: Chain, + allTokens: { address: string; decimals: number }[], +): Promise<(T & EnrichedTokenData)[]> { + // Get the token prices needed for calculating token balances and total liquidity + const dbPrices = await prisma.prismaTokenCurrentPrice.findMany({ + where: { + tokenAddress: { in: allTokens.map((token) => token.address) }, + chain: chain, + }, + }); + const decimals = Object.fromEntries(allTokens.map((token) => [token.address, token.decimals])); + const prices = Object.fromEntries(dbPrices.map((price) => [price.tokenAddress, price.price])); + + return upsertData.map((pool) => { + const poolTokenDynamicData = pool.poolTokenDynamicData.map((token) => ({ + ...token, + balanceUSD: + parseFloat(formatUnits(BigInt(token.balance), decimals[token.id.split('-')[1]])) * + prices[token.id.split('-')[1]] || 0, + })); + + const poolDynamicData = { + ...pool.poolDynamicData, + // TODO: do we need to filter out BPTs? + totalLiquidity: poolTokenDynamicData.reduce((acc, token) => acc + Number(token.balanceUSD), 0), + }; + + return { + ...pool, + poolDynamicData, + poolTokenDynamicData, + }; + }) as (T & EnrichedTokenData)[]; +} diff --git a/modules/sources/enrichers/swaps-usd.ts b/modules/sources/enrichers/swaps-usd.ts new file mode 100644 index 000000000..f4b5586f1 --- /dev/null +++ b/modules/sources/enrichers/swaps-usd.ts @@ -0,0 +1,69 @@ +import _ from 'lodash'; +import { roundToHour, roundToMidnight } from '../../common/time'; +import { Chain } from '@prisma/client'; +import { prisma } from '../../../prisma/prisma-client'; +import { SwapEvent } from '../../../prisma/prisma-types'; + +/** + * Takes swaps events and enriches them with USD values + * + * @param swaps + * @param chain + * @returns + */ +export async function swapsUsd(swaps: SwapEvent[], chain: Chain): Promise { + // Enrich with USD values + // Group swaps based on timestamp, hourly and daily buckets + const groupedSwaps = _.groupBy(swaps, (swap) => { + const timestamp = swap.blockTimestamp; + // If swap is older than 30 days, round to midnight + if (timestamp < Math.floor(Date.now() / 1000) - 30 * 24 * 60 * 60) { + return roundToMidnight(timestamp); + } + // Otherwise round to the nearest hour + return roundToHour(timestamp); + }); + + const dbEntries: SwapEvent[] = []; + for (const [timestamp, swaps] of Object.entries(groupedSwaps)) { + const tokenPrices = await prisma.prismaTokenPrice.findMany({ + where: { + timestamp: { + equals: parseInt(timestamp), + }, + chain, + }, + }); + + for (const swap of swaps) { + let amountUsd = 0; + const tokenInPrice = + tokenPrices.find((price) => price.tokenAddress === swap.payload.tokenIn.address)?.price || 0; + const tokenOutPrice = + tokenPrices.find((price) => price.tokenAddress === swap.payload.tokenOut.address)?.price || 0; + + // Taking all the chances to get the token price + if (tokenInPrice > 0) { + amountUsd = tokenInPrice * parseFloat(swap.payload.tokenIn.amount); + } else { + amountUsd = tokenOutPrice * parseFloat(swap.payload.tokenOut.amount); + } + + dbEntries.push({ + ...swap, + valueUSD: amountUsd, + payload: { + tokenIn: { + ...swap.payload.tokenIn, + valueUSD: tokenInPrice * parseFloat(swap.payload.tokenIn.amount), + }, + tokenOut: { + ...swap.payload.tokenOut, + valueUSD: tokenOutPrice * parseFloat(swap.payload.tokenOut.amount), + }, + }, + }); + } + } + return dbEntries; +} diff --git a/modules/sources/logs/get-changed-pools.ts b/modules/sources/logs/get-changed-pools.ts new file mode 100644 index 000000000..59fe249ac --- /dev/null +++ b/modules/sources/logs/get-changed-pools.ts @@ -0,0 +1,108 @@ +import { ViemClient } from '../types'; + +const events = [ + { + anonymous: false, + inputs: [ + { + indexed: true, + internalType: 'address', + name: 'pool', + type: 'address', + }, + { + indexed: true, + internalType: 'address', + name: 'liquidityProvider', + type: 'address', + }, + { + indexed: false, + internalType: 'contract IERC20[]', + name: 'tokens', + type: 'address[]', + }, + { + indexed: false, + internalType: 'int256[]', + name: 'deltas', + type: 'int256[]', + }, + ], + name: 'PoolBalanceChanged', + type: 'event', + }, + { + anonymous: false, + inputs: [ + { + indexed: true, + internalType: 'address', + name: 'pool', + type: 'address', + }, + { + indexed: true, + internalType: 'contract IERC20', + name: 'tokenIn', + type: 'address', + }, + { + indexed: true, + internalType: 'contract IERC20', + name: 'tokenOut', + type: 'address', + }, + { + indexed: false, + internalType: 'uint256', + name: 'amountIn', + type: 'uint256', + }, + { + indexed: false, + internalType: 'uint256', + name: 'amountOut', + type: 'uint256', + }, + { + indexed: false, + internalType: 'uint256', + name: 'swapFeeAmount', + type: 'uint256', + }, + ], + name: 'Swap', + type: 'event', + }, +] as const; + +/** + * Extracts pool IDs from PoolBalanceChanged and Swap events changing the pool state + * + * @param vaultAddress - the address of the vault + * @param client - the viem client to use + * @param fromBlock - the block to start from + * @param toBlock - the block to end at. When passing toBlock clients usually complain about too wide block range, without a limit it throws only when max logs are reached + */ +export const getChangedPools = async ( + vaultAddress: string, + client: ViemClient, + fromBlock: bigint, + toBlock?: bigint, // +) => { + // Get Transfer logs from the vault + const logs = await client.getLogs({ + address: vaultAddress as `0x${string}`, + events, + fromBlock, + toBlock, + }); + + // Get pools and make them unique + const changedPools = logs + .map((log) => log.args.pool!) + .filter((value, index, self) => self.indexOf(value) === index); + const latestBlock = logs.reduce((max, log) => (log.blockNumber > max ? log.blockNumber : max), 0n); + return { changedPools, latestBlock }; +}; diff --git a/modules/sources/logs/get-new-pools.ts b/modules/sources/logs/get-new-pools.ts deleted file mode 100644 index 5ef6dc038..000000000 --- a/modules/sources/logs/get-new-pools.ts +++ /dev/null @@ -1,142 +0,0 @@ -import { ViemClient } from '../types'; - -const event = { - anonymous: false, - inputs: [ - { - indexed: true, - internalType: 'address', - name: 'pool', - type: 'address', - }, - { - indexed: true, - internalType: 'address', - name: 'factory', - type: 'address', - }, - { - components: [ - { - internalType: 'contract IERC20', - name: 'token', - type: 'address', - }, - { - internalType: 'enum TokenType', - name: 'tokenType', - type: 'uint8', - }, - { - internalType: 'contract IRateProvider', - name: 'rateProvider', - type: 'address', - }, - { - internalType: 'bool', - name: 'yieldFeeExempt', - type: 'bool', - }, - ], - indexed: false, - internalType: 'struct TokenConfig[]', - name: 'tokenConfig', - type: 'tuple[]', - }, - { - indexed: false, - internalType: 'uint256', - name: 'pauseWindowEndTime', - type: 'uint256', - }, - { - indexed: false, - internalType: 'address', - name: 'pauseManager', - type: 'address', - }, - { - components: [ - { - internalType: 'bool', - name: 'shouldCallBeforeInitialize', - type: 'bool', - }, - { - internalType: 'bool', - name: 'shouldCallAfterInitialize', - type: 'bool', - }, - { - internalType: 'bool', - name: 'shouldCallBeforeSwap', - type: 'bool', - }, - { - internalType: 'bool', - name: 'shouldCallAfterSwap', - type: 'bool', - }, - { - internalType: 'bool', - name: 'shouldCallBeforeAddLiquidity', - type: 'bool', - }, - { - internalType: 'bool', - name: 'shouldCallAfterAddLiquidity', - type: 'bool', - }, - { - internalType: 'bool', - name: 'shouldCallBeforeRemoveLiquidity', - type: 'bool', - }, - { - internalType: 'bool', - name: 'shouldCallAfterRemoveLiquidity', - type: 'bool', - }, - ], - indexed: false, - internalType: 'struct PoolCallbacks', - name: 'callbacks', - type: 'tuple', - }, - { - components: [ - { - internalType: 'bool', - name: 'supportsAddLiquidityCustom', - type: 'bool', - }, - { - internalType: 'bool', - name: 'supportsRemoveLiquidityCustom', - type: 'bool', - }, - ], - indexed: false, - internalType: 'struct LiquidityManagement', - name: 'liquidityManagement', - type: 'tuple', - }, - ], - name: 'PoolRegistered', - type: 'event', -} as const; - -/** - * Extract balances from the contract - */ -export const getNewPools = async (vaultAddress: string, client: ViemClient, fromBlock: bigint) => { - // Get Transfer logs from the vault - const logs = await client.getLogs({ - address: vaultAddress as `0x${string}`, - event, - fromBlock, - }); - - // Parse the logs - return logs; -}; diff --git a/modules/sources/logs/get-pool-balance-changed.ts b/modules/sources/logs/get-pool-balance-changed.ts deleted file mode 100644 index 5140d1d41..000000000 --- a/modules/sources/logs/get-pool-balance-changed.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { ViemClient } from '../types'; - -const event = { - anonymous: false, - inputs: [ - { - indexed: true, - internalType: 'address', - name: 'pool', - type: 'address', - }, - { - indexed: true, - internalType: 'address', - name: 'liquidityProvider', - type: 'address', - }, - { - indexed: false, - internalType: 'contract IERC20[]', - name: 'tokens', - type: 'address[]', - }, - { - indexed: false, - internalType: 'int256[]', - name: 'deltas', - type: 'int256[]', - }, - ], - name: 'PoolBalanceChanged', - type: 'event', -} as const; - -export const getPoolBalanceChanged = async ( - vaultAddress: string, - client: ViemClient, - fromBlock: bigint, - toBlock: bigint | undefined = undefined, -) => { - const logs = await client.getLogs({ - address: vaultAddress as `0x${string}`, - event, - fromBlock, - toBlock, - }); - - return logs; -}; diff --git a/modules/sources/logs/get-swaps.ts b/modules/sources/logs/get-swaps.ts deleted file mode 100644 index b63773e33..000000000 --- a/modules/sources/logs/get-swaps.ts +++ /dev/null @@ -1,64 +0,0 @@ -import { ViemClient } from '../types'; - -const event = { - anonymous: false, - inputs: [ - { - indexed: true, - internalType: 'address', - name: 'pool', - type: 'address', - }, - { - indexed: true, - internalType: 'contract IERC20', - name: 'tokenIn', - type: 'address', - }, - { - indexed: true, - internalType: 'contract IERC20', - name: 'tokenOut', - type: 'address', - }, - { - indexed: false, - internalType: 'uint256', - name: 'amountIn', - type: 'uint256', - }, - { - indexed: false, - internalType: 'uint256', - name: 'amountOut', - type: 'uint256', - }, - { - indexed: false, - internalType: 'uint256', - name: 'swapFeeAmount', - type: 'uint256', - }, - ], - name: 'Swap', - type: 'event', -} as const; - -/** - * Extract balances from the contract - */ -export const getSwaps = async ( - vaultAddress: string, - client: ViemClient, - fromBlock: bigint, - toBlock: bigint | undefined = undefined, -) => { - const logs = await client.getLogs({ - address: vaultAddress as `0x${string}`, - event, - fromBlock, - toBlock, - }); - - return logs; -}; diff --git a/modules/sources/logs/get-transfers.ts b/modules/sources/logs/get-transfers.ts deleted file mode 100644 index 7396b4f38..000000000 --- a/modules/sources/logs/get-transfers.ts +++ /dev/null @@ -1,48 +0,0 @@ -import { ViemClient } from '../types'; - -const event = { - anonymous: false, - inputs: [ - { - indexed: true, - internalType: 'address', - name: 'token', - type: 'address', - }, - { - indexed: true, - internalType: 'address', - name: 'from', - type: 'address', - }, - { - indexed: true, - internalType: 'address', - name: 'to', - type: 'address', - }, - { - indexed: false, - internalType: 'uint256', - name: 'value', - type: 'uint256', - }, - ], - name: 'Transfer', - type: 'event', -} as const; - -/** - * Extract balances from the contract - */ -export const getTransfers = async (vaultAddress: string, client: ViemClient, fromBlock: bigint) => { - // Get Transfer logs from the vault - const logs = await client.getLogs({ - address: vaultAddress as `0x${string}`, - event, - fromBlock, - }); - - // Parse the logs - return logs; -}; diff --git a/modules/sources/logs/index.ts b/modules/sources/logs/index.ts index 7bdec8277..88d86e112 100644 --- a/modules/sources/logs/index.ts +++ b/modules/sources/logs/index.ts @@ -1,3 +1 @@ -export * from './get-swaps'; -export * from './get-transfers'; -export * from './get-new-pools'; +export * from './get-changed-pools'; diff --git a/modules/sources/subgraphs/balancer-v3-pools/index.ts b/modules/sources/subgraphs/balancer-v3-pools/index.ts new file mode 100644 index 000000000..567911831 --- /dev/null +++ b/modules/sources/subgraphs/balancer-v3-pools/index.ts @@ -0,0 +1,44 @@ +import { GraphQLClient } from 'graphql-request'; +import { OrderDirection, Pool_OrderBy, PoolsQueryVariables, TypePoolFragment, getSdk } from './generated/types'; + +/** + * Builds a client based on subgraph URL. + * + * @param subgraphUrl - url of the subgraph + * @returns sdk - generated sdk for the subgraph + */ +export const getPoolsSubgraphClient = (subgraphUrl: string) => { + const client = new GraphQLClient(subgraphUrl); + const sdk = getSdk(client); + + return { + ...sdk, + async getAllPools(where: PoolsQueryVariables['where']): Promise { + const limit = 1000; + let hasMore = true; + let id = `0x`; + let pools: TypePoolFragment[] = []; + + while (hasMore) { + const response = await sdk.Pools({ + where: { ...where, id_gt: id }, + orderBy: Pool_OrderBy.Id, + orderDirection: OrderDirection.Asc, + first: limit, + }); + + pools = [...pools, ...response.pools]; + + if (response.pools.length < limit) { + hasMore = false; + } else { + id = response.pools[response.pools.length - 1].id; + } + } + + return pools; + }, + }; +}; + +export type V3PoolsSubgraphClient = ReturnType; diff --git a/modules/subgraphs/balancer-v3-pools/pools.graphql b/modules/sources/subgraphs/balancer-v3-pools/pools.graphql similarity index 100% rename from modules/subgraphs/balancer-v3-pools/pools.graphql rename to modules/sources/subgraphs/balancer-v3-pools/pools.graphql diff --git a/modules/sources/subgraphs/balancer-v3-vault/index.ts b/modules/sources/subgraphs/balancer-v3-vault/index.ts index bc24fd734..15a220438 100644 --- a/modules/sources/subgraphs/balancer-v3-vault/index.ts +++ b/modules/sources/subgraphs/balancer-v3-vault/index.ts @@ -1,5 +1,73 @@ import { GraphQLClient } from 'graphql-request'; -import { OrderDirection, Pool_OrderBy, SwapFragment, Swap_OrderBy, VaultPoolFragment, getSdk } from './generated/types'; +import { + OrderDirection, + Pool_OrderBy, + PoolsQueryVariables, + SwapFragment, + Swap_OrderBy, + VaultPoolFragment, + getSdk, +} from './generated/types'; + +export function getVaultSubgraphClient(url: string) { + const sdk = getSdk(new GraphQLClient(url)); + + return { + ...sdk, + async getAllInitializedPools(where: PoolsQueryVariables['where']): Promise { + const limit = 1000; + let hasMore = true; + let id = `0x`; + let pools: VaultPoolFragment[] = []; + + while (hasMore) { + const response = await sdk.Pools({ + where: { ...where, id_gt: id, isInitialized: true }, + orderBy: Pool_OrderBy.Id, + orderDirection: OrderDirection.Asc, + first: limit, + }); + + pools = [...pools, ...response.pools]; + + if (response.pools.length < limit) { + hasMore = false; + } else { + id = response.pools[response.pools.length - 1].id; + } + } + + return pools; + }, + async getSwapsSince(timestamp: number): Promise { + const limit = 1000; + let hasMore = true; + let startTimestamp = `${timestamp}`; + let swaps: SwapFragment[] = []; + + while (hasMore) { + const response = await sdk.Swaps({ + where: { blockTimestamp_gt: startTimestamp }, + orderBy: Swap_OrderBy.BlockTimestamp, + orderDirection: OrderDirection.Asc, + first: limit, + }); + + swaps = [...swaps, ...response.swaps]; + + if (response.swaps.length < limit) { + hasMore = false; + } else { + startTimestamp = response.swaps[response.swaps.length - 1].blockTimestamp; + } + } + + return swaps; + }, + }; +} + +export type V3VaultSubgraphClient = ReturnType; export class BalancerVaultSubgraphSource { private sdk: ReturnType; diff --git a/modules/sources/subgraphs/balancer-v3-vault/join-exits.graphql b/modules/sources/subgraphs/balancer-v3-vault/join-exits.graphql index 25c3c07f3..5a9039e10 100644 --- a/modules/sources/subgraphs/balancer-v3-vault/join-exits.graphql +++ b/modules/sources/subgraphs/balancer-v3-vault/join-exits.graphql @@ -6,6 +6,7 @@ fragment JoinExit on JoinExit { pool { id tokens { + index address } } @@ -13,6 +14,7 @@ fragment JoinExit on JoinExit { id } blockNumber + logIndex blockTimestamp transactionHash } diff --git a/modules/sources/subgraphs/balancer-v3-vault/pools.graphql b/modules/sources/subgraphs/balancer-v3-vault/pools.graphql index 58c04b4ac..a191bbfb9 100644 --- a/modules/sources/subgraphs/balancer-v3-vault/pools.graphql +++ b/modules/sources/subgraphs/balancer-v3-vault/pools.graphql @@ -1,6 +1,5 @@ fragment VaultPool on Pool { id - factory address name symbol diff --git a/modules/sources/subgraphs/balancer-v3-vault/swaps.graphql b/modules/sources/subgraphs/balancer-v3-vault/swaps.graphql index 77ec638f0..10ed31b72 100644 --- a/modules/sources/subgraphs/balancer-v3-vault/swaps.graphql +++ b/modules/sources/subgraphs/balancer-v3-vault/swaps.graphql @@ -12,6 +12,7 @@ fragment Swap on Swap { id } blockNumber + logIndex blockTimestamp transactionHash } diff --git a/modules/sources/subgraphs/index.ts b/modules/sources/subgraphs/index.ts new file mode 100644 index 000000000..27f1bc7af --- /dev/null +++ b/modules/sources/subgraphs/index.ts @@ -0,0 +1,3 @@ +export * from './balancer-v3-vault'; +export * from './balancer-v3-pools'; +export * from './joined-client'; diff --git a/modules/sources/subgraphs/joined-client.ts b/modules/sources/subgraphs/joined-client.ts new file mode 100644 index 000000000..dd6b6ac96 --- /dev/null +++ b/modules/sources/subgraphs/joined-client.ts @@ -0,0 +1,28 @@ +import { getVaultSubgraphClient } from './balancer-v3-vault'; +import { getPoolsSubgraphClient } from './balancer-v3-pools'; +import { PoolsQueryVariables } from './balancer-v3-vault/generated/types'; + +export type V3JoinedSubgraphClient = ReturnType; + +export type JoinedSubgraphPool = ReturnType extends Promise< + (infer T)[] +> + ? T + : never; + +export const getV3JoinedSubgraphClient = (vaultSubgraphUrl: string, poolsSubgraphUrl: string) => { + const vaultSubgraphClient = getVaultSubgraphClient(vaultSubgraphUrl); + const poolsSubgraphClient = getPoolsSubgraphClient(poolsSubgraphUrl); + + return { + getAllInitializedPools: async (where?: PoolsQueryVariables['where']) => { + const vaultPools = await vaultSubgraphClient.getAllInitializedPools(where); + const vaultPoolIds = vaultPools.map((pool) => pool.id); + const pools = await poolsSubgraphClient.getAllPools({ id_in: vaultPoolIds }); + return pools.map((pool) => ({ + ...pool, + ...vaultPools.find((vaultPool) => vaultPool.id === pool.id)!, + })); + }, + }; +}; diff --git a/modules/sources/transformers/index.ts b/modules/sources/transformers/index.ts index ca74df49e..384db0c23 100644 --- a/modules/sources/transformers/index.ts +++ b/modules/sources/transformers/index.ts @@ -1,2 +1,4 @@ export * from './pool-transformer'; export * from './pool-tokens-transformer'; +export * from './swap-transformer'; +export * from './tokens-transformer'; diff --git a/modules/sources/transformers/onchain-pool-update.ts b/modules/sources/transformers/onchain-pool-update.ts new file mode 100644 index 000000000..5391ad9e2 --- /dev/null +++ b/modules/sources/transformers/onchain-pool-update.ts @@ -0,0 +1,25 @@ +import { Chain } from '@prisma/client'; +import { OnchainPoolData } from '../contracts'; + +export type OnchainPoolUpdateData = ReturnType; + +export const onchainPoolUpdate = (onchainPoolData: OnchainPoolData, blockNumber: number, chain: Chain, id: string) => { + return { + poolDynamicData: { + poolId: id.toLowerCase(), + chain: chain, + isPaused: onchainPoolData.isPoolPaused, + isInRecoveryMode: onchainPoolData.isPoolInRecoveryMode, + totalShares: String(onchainPoolData.totalSupply), + blockNumber: blockNumber, + swapFee: String(onchainPoolData.swapFee ?? '0'), + }, + poolTokenDynamicData: onchainPoolData.tokens.map((tokenData) => ({ + id: `${id}-${tokenData.address.toLowerCase()}`, + chain: chain, + balance: String(tokenData.balance), + priceRate: String(tokenData.rate), + blockNumber: blockNumber, + })), + }; +}; diff --git a/modules/sources/transformers/pool-tokens-transformer.ts b/modules/sources/transformers/pool-tokens-transformer.ts index d6504423b..2f0d14454 100644 --- a/modules/sources/transformers/pool-tokens-transformer.ts +++ b/modules/sources/transformers/pool-tokens-transformer.ts @@ -1,46 +1,54 @@ -import { VaultPoolFragment as VaultSubgraphPoolFragment } from '../subgraphs/balancer-v3-vault/generated/types'; -import { TypePoolFragment as PoolSubgraphPoolFragment } from '../../subgraphs/balancer-v3-pools/generated/types'; -import { Chain, Prisma, PrismaPoolToken } from '@prisma/client'; +import { Chain, Prisma } from '@prisma/client'; +import { formatUnits } from 'viem'; +import { JoinedSubgraphPool } from '../subgraphs'; -export function poolTokensTransformer(vaultSubgraphPool: VaultSubgraphPoolFragment, chain: Chain): PrismaPoolToken[] { - const tokens = vaultSubgraphPool.tokens ?? []; +// Comment: removing return type, because prisma doesn't export 'PrismaPoolTokenCreateManyPoolInput' type +export function poolTokensTransformer(poolData: JoinedSubgraphPool, chain: Chain) { + const tokens = poolData.tokens ?? []; return tokens.map((token, i) => ({ - id: `${vaultSubgraphPool.id}-${token.address}`.toLowerCase(), - poolId: vaultSubgraphPool.id.toLowerCase(), + id: `${poolData.id}-${token.address}`.toLowerCase(), + poolId: poolData.id.toLowerCase(), chain: chain, address: token.address.toLowerCase(), index: token.index, nestedPoolId: token.nestedPool?.id.toLowerCase() ?? null, - priceRateProvider: vaultSubgraphPool.rateProviders![i].address.toLowerCase(), + priceRateProvider: poolData.rateProviders![i].address.toLowerCase(), exemptFromProtocolYieldFee: token.totalProtocolYieldFee === '0' ? true : false, })); } export function poolTokensDynamicDataTransformer( - vaultSubgraphPool: VaultSubgraphPoolFragment, - poolSubgraphPool: PoolSubgraphPoolFragment, + poolData: JoinedSubgraphPool, + onchainTokensData: { [address: string]: { balance: bigint; rate: bigint } }, chain: Chain, -): Prisma.PrismaPoolTokenDynamicDataCreateManyInput[] { - const tokens = vaultSubgraphPool.tokens ?? []; - return tokens.map((token, i) => ({ - id: `${vaultSubgraphPool.id}-${token.address}`.toLowerCase(), - poolTokenId: `${vaultSubgraphPool.id}-${token.address}`.toLowerCase(), - chain, - blockNumber: parseFloat(vaultSubgraphPool.blockNumber), - balance: token.balance, - balanceUSD: 0, - priceRate: '1', - weight: poolSubgraphPool.weights[token.index] ?? null, - })); +) { + const tokens = poolData.tokens ?? []; + + return tokens.map((token, i) => { + const id = `${poolData.id}-${token.address}`.toLowerCase(); + const onchainTokenData = onchainTokensData[token.address]; + const balance = onchainTokenData?.balance ?? 0n; + const rate = onchainTokenData?.rate ?? 0n; + + return { + id, + poolTokenId: id, + chain, + blockNumber: Number(poolData.blockNumber), + balance: String(balance), + priceRate: String(rate), + weight: poolData.weights[token.index] ?? null, + }; + }); } export function poolExpandedTokensTransformer( - vaultSubgraphPool: VaultSubgraphPoolFragment, + poolData: JoinedSubgraphPool, chain: Chain, ): Prisma.PrismaPoolExpandedTokensCreateManyInput[] { - const tokens = vaultSubgraphPool.tokens ?? []; + const tokens = poolData.tokens ?? []; return tokens.map((token, i) => ({ - poolId: vaultSubgraphPool.id.toLowerCase(), + poolId: poolData.id.toLowerCase(), chain: chain, tokenAddress: token.address.toLowerCase(), nestedPoolId: token.nestedPool?.id.toLowerCase(), diff --git a/modules/sources/transformers/pool-transformer.ts b/modules/sources/transformers/pool-transformer.ts index 61a7bdb4c..c59b27764 100644 --- a/modules/sources/transformers/pool-transformer.ts +++ b/modules/sources/transformers/pool-transformer.ts @@ -1,21 +1,14 @@ -import { Chain, PrismaPool, PrismaPoolType } from '@prisma/client'; -import { VaultPoolFragment as VaultSubgraphPoolFragment } from '../subgraphs/balancer-v3-vault/generated/types'; -import { - TypePoolFragment as PoolSubgraphPoolFragment, - PoolType, -} from '../../subgraphs/balancer-v3-pools/generated/types'; +import { Chain, PrismaPoolType } from '@prisma/client'; +import { PoolType } from '../subgraphs/balancer-v3-pools/generated/types'; import { StableData } from '../../pool/subgraph-mapper'; import { fx, gyro, linear, element, stable } from '../../pool/pool-data'; +import { JoinedSubgraphPool } from '../subgraphs'; -export const poolTransformer = ( - vaultSubgraphPool: VaultSubgraphPoolFragment, - poolSubgraphPool: PoolSubgraphPoolFragment, - chain: Chain, -) => { +export const poolTransformer = (poolData: JoinedSubgraphPool, chain: Chain) => { let type: PrismaPoolType; let typeData: ReturnType | {} = {}; - switch (poolSubgraphPool.factory.type) { + switch (poolData.factory.type) { case PoolType.Weighted: type = PrismaPoolType.WEIGHTED; break; @@ -30,19 +23,19 @@ export const poolTransformer = ( } return { - id: vaultSubgraphPool.id.toLowerCase(), + id: poolData.id.toLowerCase(), chain: chain, vaultVersion: 3, - address: vaultSubgraphPool.id.toLowerCase(), + address: poolData.id.toLowerCase(), decimals: 18, - symbol: vaultSubgraphPool.symbol, - name: vaultSubgraphPool.name, - owner: vaultSubgraphPool.id.toLowerCase(), //TODO - factory: poolSubgraphPool.factory.id.toLowerCase(), + symbol: poolData.symbol, + name: poolData.name, + owner: poolData.id.toLowerCase(), //TODO + factory: poolData.factory.id.toLowerCase(), type: type, typeData: typeData, - version: poolSubgraphPool.factory.version, - createTime: Number(vaultSubgraphPool.blockTimestamp), + version: poolData.factory.version, + createTime: Number(poolData.blockTimestamp), }; }; diff --git a/modules/sources/transformers/subgraph-pool-upsert.ts b/modules/sources/transformers/subgraph-pool-upsert.ts new file mode 100644 index 000000000..2541f015d --- /dev/null +++ b/modules/sources/transformers/subgraph-pool-upsert.ts @@ -0,0 +1,40 @@ +import { Chain } from '@prisma/client'; +import { OnchainPoolData } from '../contracts'; +import { JoinedSubgraphPool } from '../types'; +import { poolTransformer } from './pool-transformer'; +import { formatUnits } from 'viem'; +import { poolTokensDynamicDataTransformer, poolTokensTransformer } from './pool-tokens-transformer'; + +export type SubgraphPoolUpsertData = ReturnType; + +export const subgraphPoolUpsert = ( + subgraphPoolData: JoinedSubgraphPool, + onchainPoolData: OnchainPoolData, + chain: Chain, + blockNumber: number, +) => { + const onchainTokensData = Object.fromEntries(onchainPoolData.tokens.map((token) => [token.address, token])); + + return { + pool: poolTransformer(subgraphPoolData, chain), + poolDynamicData: { + id: subgraphPoolData.id, + poolId: subgraphPoolData.id, + chain: chain, + totalShares: String(onchainPoolData.totalSupply), + totalSharesNum: Number(formatUnits(onchainPoolData.totalSupply, 18)), + blockNumber: Number(blockNumber), + swapFee: String(onchainPoolData.swapFee ?? '0'), + swapEnabled: true, + totalLiquidity: 0, + }, + poolToken: poolTokensTransformer(subgraphPoolData, chain), + poolTokenDynamicData: poolTokensDynamicDataTransformer(subgraphPoolData, onchainTokensData, chain), + poolExpandedTokens: subgraphPoolData.tokens.map(({ address, nestedPool }) => ({ + tokenAddress: address, + poolId: subgraphPoolData.id, + chain: chain, + nestedPoolId: nestedPool?.id, + })), + }; +}; diff --git a/modules/sources/transformers/swap-transformer.ts b/modules/sources/transformers/swap-transformer.ts new file mode 100644 index 000000000..6e97c6e28 --- /dev/null +++ b/modules/sources/transformers/swap-transformer.ts @@ -0,0 +1,41 @@ +import _ from 'lodash'; +import { SwapFragment } from '../subgraphs/balancer-v3-vault/generated/types'; +import { Chain } from '@prisma/client'; +import { SwapEvent } from '../../../prisma/prisma-types'; + +/** + * Takes V3 subgraph swaps and transforms them into DB entries + * + * @param swaps + * @param chain + * @returns + */ +export function swapTransformer(swap: SwapFragment, chain: Chain): SwapEvent { + const vaultVersion = 3; + + return { + id: swap.id, // tx + logIndex + tx: swap.transactionHash, + type: 'SWAP', + poolId: swap.pool, + chain: chain, + vaultVersion, + userAddress: swap.user.id, + blockNumber: Number(swap.blockNumber), + blockTimestamp: Number(swap.blockTimestamp), + logIndex: Number(swap.logIndex), + valueUSD: 0, // Will be calculated later + payload: { + tokenIn: { + address: swap.tokenIn, + amount: swap.tokenAmountIn, + valueUSD: 0, + }, + tokenOut: { + address: swap.tokenOut, + amount: swap.tokenAmountOut, + valueUSD: 0, + }, + }, + }; +} diff --git a/modules/sources/transformers/swaps-transformer.ts b/modules/sources/transformers/swaps-transformer.ts deleted file mode 100644 index 9100b403b..000000000 --- a/modules/sources/transformers/swaps-transformer.ts +++ /dev/null @@ -1,37 +0,0 @@ -import { SwapFragment } from '../subgraphs/balancer-v3-vault/generated/types'; -import { Chain, PrismaPoolSwap } from '@prisma/client'; -import { tokenService } from '../../token/token.service'; - -export async function swapsTransformer(swaps: SwapFragment[], chain: Chain): Promise { - const tokenPrices = await tokenService.getTokenPrices(chain); - - return swaps.map((swap) => { - let valueUSD = 0; - const tokenInPrice = tokenService.getPriceForToken(tokenPrices, swap.tokenIn, chain); // TODO need to get price close to swap timestamp - const tokenOutPrice = tokenService.getPriceForToken(tokenPrices, swap.tokenOut, chain); // TODO need to get price close to swap timestamp - - if (tokenInPrice > 0) { - valueUSD = tokenInPrice * parseFloat(swap.tokenAmountIn); - } else { - valueUSD = tokenOutPrice * parseFloat(swap.tokenAmountOut); - } - - return { - id: swap.id, - chain: chain, - timestamp: parseFloat(swap.blockTimestamp), - poolId: swap.pool, - userAddress: swap.user.id, - tokenIn: swap.tokenIn, - tokenInSym: swap.tokenInSymbol, - tokenOut: swap.tokenOut, - tokenOutSym: swap.tokenOutSymbol, - tokenAmountIn: swap.tokenAmountIn, - tokenAmountOut: swap.tokenAmountOut, - tx: swap.transactionHash, - valueUSD, - batchSwapId: null, - batchSwapIdx: null, - }; - }); -} diff --git a/modules/sources/transformers/tokens-transformer.ts b/modules/sources/transformers/tokens-transformer.ts index 3cc7704b8..799673b00 100644 --- a/modules/sources/transformers/tokens-transformer.ts +++ b/modules/sources/transformers/tokens-transformer.ts @@ -1,35 +1,31 @@ import { VaultPoolFragment as VaultSubgraphPoolFragment } from '../subgraphs/balancer-v3-vault/generated/types'; import { Chain } from '@prisma/client'; -type DbToken = { - address: string; - name: string; - decimals: number; - symbol: string; - chain: Chain; -}; - -export function tokensTransformer(vaultSubgraphPools: VaultSubgraphPoolFragment[], chain: Chain): DbToken[] { - const allTokens: DbToken[] = []; - vaultSubgraphPools.forEach((pool) => { - allTokens.push({ - address: pool.address, - decimals: 18, - name: pool.name, - symbol: pool.symbol, - chain: chain, - }); - if (pool.tokens) { - for (const poolToken of pool.tokens) { - allTokens.push({ - address: poolToken.address, - decimals: poolToken.decimals, - name: poolToken.name, - symbol: poolToken.symbol, - chain: chain, - }); - } - } +/** + * Extracts pool tokens from the vault subgraph pools and adds the BPT token as well. + * Return value is used to store all the token definitions in the database. + * + * @param vaultSubgraphPools + * @param chain + * @returns All tokens from the pools including the BPT token + */ +export function tokensTransformer(vaultSubgraphPools: VaultSubgraphPoolFragment[], chain: Chain) { + return vaultSubgraphPools.flatMap((pool) => { + return [ + ...pool.tokens.map((token) => ({ + address: token.address, + decimals: token.decimals, + name: token.name, + symbol: token.symbol, + chain: chain, + })), + { + address: pool.address, + decimals: 18, + name: pool.name, + symbol: pool.symbol, + chain: chain, + }, + ]; }); - return allTokens; } diff --git a/modules/sources/types.ts b/modules/sources/types.ts index 489287794..2e39ec41a 100644 --- a/modules/sources/types.ts +++ b/modules/sources/types.ts @@ -1 +1,3 @@ export type { ViemClient } from './viem-client'; +export type { OnchainPoolData } from './contracts/fetch-pool-data'; +export type { V3JoinedSubgraphClient, JoinedSubgraphPool } from './subgraphs/joined-client'; diff --git a/modules/subgraphs/balancer-subgraph/balancer-subgraph-queries.graphql b/modules/subgraphs/balancer-subgraph/balancer-subgraph-queries.graphql index faa6bd405..73b6d3490 100644 --- a/modules/subgraphs/balancer-subgraph/balancer-subgraph-queries.graphql +++ b/modules/subgraphs/balancer-subgraph/balancer-subgraph-queries.graphql @@ -379,6 +379,7 @@ fragment BalancerJoinExit on JoinExit { amounts id sender + block timestamp tx type diff --git a/modules/subgraphs/balancer-v3-pools/index.ts b/modules/subgraphs/balancer-v3-pools/index.ts deleted file mode 100644 index 7d0e78052..000000000 --- a/modules/subgraphs/balancer-v3-pools/index.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { GraphQLClient } from 'graphql-request'; -import { getSdk } from './generated/types'; - -/** - * Builds a client based on subgraph URL. - * - * @param subgraphUrl - url of the subgraph - * @returns sdk - generated sdk for the subgraph - */ -export const getPoolsSubgraphClient = (subgraphUrl: string) => { - const client = new GraphQLClient(subgraphUrl); - const sdk = getSdk(client); - - return sdk; -}; - -export type V3PoolsSubgraphClient = ReturnType; diff --git a/modules/user/user.resolvers.ts b/modules/user/user.resolvers.ts index 7750d639c..528a6765b 100644 --- a/modules/user/user.resolvers.ts +++ b/modules/user/user.resolvers.ts @@ -1,8 +1,9 @@ -import { Resolvers } from '../../schema'; +import { GqlPoolJoinExit, GqlPoolSwap, Resolvers } from '../../schema'; import { userService } from './user.service'; import { getRequiredAccountAddress, isAdminRoute } from '../auth/auth-context'; import { tokenService } from '../token/token.service'; import { headerChain } from '../context/header-chain'; +import { QueriesController } from '../controllers/queries-controller'; const resolvers: Resolvers = { Query: { @@ -26,6 +27,7 @@ const resolvers: Resolvers = { ), })); }, + // TODO: Deprecated in favor of poolGetEvents userGetPoolJoinExits: async (parent, { first, skip, poolId, chain, address }, context) => { const currentChain = headerChain(); if (!chain && currentChain) { @@ -35,8 +37,14 @@ const resolvers: Resolvers = { } const accountAddress = address || getRequiredAccountAddress(context); - return userService.getUserPoolInvestments(accountAddress, poolId, chain, first, skip); + const swaps = await QueriesController().getEvents({ + first, + skip, + where: { typeIn: ['JOIN', 'EXIT'], poolIdIn: [poolId], chainIn: [chain], userAddress: accountAddress }, + }); + return swaps as GqlPoolJoinExit[]; }, + // TODO: Deprecated in favor of poolGetEvents userGetSwaps: async (parent, { first, skip, poolId, chain, address }, context) => { const currentChain = headerChain(); if (!chain && currentChain) { @@ -45,7 +53,12 @@ const resolvers: Resolvers = { throw new Error('userGetSwaps error: Provide "chain" param'); } const accountAddress = address || getRequiredAccountAddress(context); - return userService.getUserSwaps(accountAddress, poolId, chain, first, skip); + const swaps = await QueriesController().getEvents({ + first, + skip, + where: { typeIn: ['SWAP'], poolIdIn: [poolId], chainIn: [chain], userAddress: accountAddress }, + }); + return swaps as GqlPoolSwap[]; }, userGetStaking: async (parent, { chains, address }, context) => { const currentChain = headerChain(); diff --git a/modules/user/user.service.ts b/modules/user/user.service.ts index 6b3e01a1b..4ea7e6420 100644 --- a/modules/user/user.service.ts +++ b/modules/user/user.service.ts @@ -23,26 +23,6 @@ export class UserService { return this.userBalanceService.getUserPoolBalances(address, chains); } - public async getUserPoolInvestments( - address: string, - poolId: string, - chain: Chain, - first?: number, - skip?: number, - ): Promise { - return this.poolSwapService.getUserJoinExitsForPool(address, poolId, chain, first, skip); - } - - public async getUserSwaps( - address: string, - poolId: string, - chain: Chain, - first?: number, - skip?: number, - ): Promise { - return this.poolSwapService.getUserSwapsForPool(address, poolId, chain, first, skip); - } - public async getUserFbeetsBalance(address: string): Promise> { return this.userBalanceService.getUserFbeetsBalance(address); } diff --git a/modules/web3/multicaller-viem.ts b/modules/web3/multicaller-viem.ts new file mode 100644 index 000000000..34f525eb6 --- /dev/null +++ b/modules/web3/multicaller-viem.ts @@ -0,0 +1,22 @@ +import { ContractFunctionParameters } from 'viem'; +import { ViemClient } from '../../modules/sources/types'; + +type Call = { path: string } & ContractFunctionParameters; + +/** + * Wrapper for multicall that takes an array of calls and returns an object with the results mapped by the path + * + * @param client + * @param calls + * @returns + */ +export const multicallViem = async (client: ViemClient, calls: Call[]) => { + const results = await client.multicall({ contracts: calls }); + + const parsedResults = calls.map((call, i) => [ + call.path, + results[i].status === 'success' ? results[i].result : undefined, + ]); + + return Object.fromEntries(parsedResults); +}; diff --git a/prisma/migrations/20240305205511_add_pool_events/migration.sql b/prisma/migrations/20240305205511_add_pool_events/migration.sql new file mode 100644 index 000000000..e15a3baaa --- /dev/null +++ b/prisma/migrations/20240305205511_add_pool_events/migration.sql @@ -0,0 +1,38 @@ +-- CreateEnum +CREATE TYPE "PoolEventType" AS ENUM ('JOIN', 'EXIT', 'SWAP'); + +-- CreateTable +CREATE TABLE "PoolEvent" ( + "id" TEXT NOT NULL, + "tx" TEXT NOT NULL, + "type" "PoolEventType" NOT NULL, + "chain" "Chain" NOT NULL, + "poolId" TEXT NOT NULL, + "userAddress" TEXT NOT NULL, + "blockNumber" INTEGER NOT NULL, + "blockTimestamp" INTEGER NOT NULL, + "logIndex" INTEGER NOT NULL, + "vaultVersion" INTEGER NOT NULL DEFAULT 2, + "valueUSD" DOUBLE PRECISION NOT NULL, + "payload" JSONB NOT NULL, + + CONSTRAINT "PoolEvent_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "PoolEvent_type_idx" ON "PoolEvent"("type"); + +-- CreateIndex +CREATE INDEX "PoolEvent_chain_idx" ON "PoolEvent"("chain"); + +-- CreateIndex +CREATE INDEX "PoolEvent_chain_poolId_idx" ON "PoolEvent"("chain", "poolId"); + +-- CreateIndex +CREATE INDEX "PoolEvent_userAddress_idx" ON "PoolEvent"("userAddress"); + +-- CreateIndex +CREATE INDEX "PoolEvent_blockNumber_idx" ON "PoolEvent"("blockNumber"); + +-- CreateIndex +CREATE INDEX "PoolEvent_logIndex_idx" ON "PoolEvent"("logIndex"); diff --git a/prisma/prisma-types.ts b/prisma/prisma-types.ts index 390f90532..16ca19cc9 100644 --- a/prisma/prisma-types.ts +++ b/prisma/prisma-types.ts @@ -1,4 +1,31 @@ -import { Prisma, PrismaToken, PrismaTokenPrice, PrismaTokenTypeOption } from '@prisma/client'; +import { Prisma, PrismaToken, PrismaTokenTypeOption, PoolEvent } from '@prisma/client'; + +export type SwapEvent = PoolEvent & { + type: 'SWAP'; + payload: { + tokenIn: { + address: string; + amount: string; + valueUSD: number; + }; + tokenOut: { + address: string; + amount: string; + valueUSD: number; + }; + }; +}; + +export type JoinExitEvent = PoolEvent & { + type: 'JOIN' | 'EXIT'; + payload: { + tokens: { + address: string; + amount: string; + valueUSD: number; + }[]; + }; +}; export const poolWithTokens = Prisma.validator()({ include: { tokens: true }, diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 0d155eef8..516fd063b 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -542,6 +542,34 @@ model PrismaReliquaryTokenBalanceSnapshot { balance String } +enum PoolEventType { + JOIN + EXIT + SWAP +} + +model PoolEvent { + id String @id + tx String + type PoolEventType + chain Chain + poolId String + userAddress String + blockNumber Int + blockTimestamp Int + logIndex Int + vaultVersion Int @default(2) + valueUSD Float + payload Json + + @@index([type]) + @@index([chain]) + @@index([chain, poolId]) + @@index([userAddress]) + @@index([blockNumber]) + @@index([logIndex]) +} + model PrismaSftmxStakingData { diff --git a/tasks/index.ts b/tasks/index.ts index 49ace8c83..11b50ce31 100644 --- a/tasks/index.ts +++ b/tasks/index.ts @@ -3,11 +3,28 @@ import { JobsController } from '../modules/controllers/jobs-controller'; // TODO needed? const jobsController = JobsController(); +/** + * Used to run jobs locally from the command line + * + * @param job + * @param chain + * @returns + */ async function run(job: string = process.argv[2], chain: string = process.argv[3]) { console.log('Running job', job, chain); - if (job === 'sync-changed-pools-v3') { - return jobsController.addMissingPoolsFromSubgraph(chain); + if (job === 'add-pools-v3') { + return jobsController.addPools(chain); + } else if (job === 'reload-pools-v3') { + return jobsController.reloadPools(chain); + } else if (job === 'sync-pools-v3') { + return jobsController.syncPools(chain); + } else if (job === 'sync-join-exits-v3') { + return jobsController.syncJoinExitsV3(chain); + } else if (job === 'sync-join-exits-v2') { + return jobsController.syncJoinExitsV2(chain); + } else if (job === 'sync-swaps-v3') { + return jobsController.syncSwapsV3(chain); } return Promise.reject(new Error(`Unknown job: ${job}`)); diff --git a/worker/job-handlers.ts b/worker/job-handlers.ts index e21700aa3..8a1dd6ddb 100644 --- a/worker/job-handlers.ts +++ b/worker/job-handlers.ts @@ -108,15 +108,6 @@ export function configureWorkerRoutes(app: Express) { await runIfNotAlreadyRunning(job.name, chainId, () => poolService.syncChangedPools(), res, next); break; - case 'sync-changed-pools-v3': - await runIfNotAlreadyRunning( - job.name, - chainId, - () => jobsController.updateOnChainDataChangedPools(chainId), - res, - next, - ); - break; case 'user-sync-wallet-balances-for-all-pools': await runIfNotAlreadyRunning( job.name, @@ -183,15 +174,6 @@ export function configureWorkerRoutes(app: Express) { next, ); break; - case 'load-on-chain-data-for-pools-with-active-updates-v3': - await runIfNotAlreadyRunning( - job.name, - chainId, - () => poolService.loadOnChainDataForPoolsWithActiveUpdatesV3(), - res, - next, - ); - break; case 'sync-new-pools-from-subgraph': await runIfNotAlreadyRunning( job.name, @@ -201,11 +183,11 @@ export function configureWorkerRoutes(app: Express) { next, ); break; - case 'sync-new-pools-from-subgraph-v3': + case 'sync-join-exits-v2': await runIfNotAlreadyRunning( job.name, chainId, - () => jobsController.addMissingPoolsFromSubgraph(chainId), + () => jobsController.syncJoinExitsV2(chainId), res, next, ); @@ -225,15 +207,6 @@ export function configureWorkerRoutes(app: Express) { next, ); break; - case 'update-liquidity-24h-ago-for-all-pools-v3': - await runIfNotAlreadyRunning( - job.name, - chainId, - () => poolService.updateLiquidity24hAgoForAllPoolsV3(), - res, - next, - ); - break; case 'cache-average-block-time': await runIfNotAlreadyRunning( job.name, @@ -264,15 +237,6 @@ export function configureWorkerRoutes(app: Express) { next, ); break; - case 'sync-latest-snapshots-for-all-pools-v3': - await runIfNotAlreadyRunning( - job.name, - chainId, - () => poolService.syncLatestSnapshotsForAllPoolsV3(), - res, - next, - ); - break; case 'update-lifetime-values-for-all-pools': await runIfNotAlreadyRunning( job.name, @@ -282,15 +246,6 @@ export function configureWorkerRoutes(app: Express) { next, ); break; - case 'update-lifetime-values-for-all-pools-v3': - await runIfNotAlreadyRunning( - job.name, - chainId, - () => poolService.updateLifetimeValuesForAllPoolsV3(), - res, - next, - ); - break; case 'feed-data-to-datastudio': await runIfNotAlreadyRunning( job.name, @@ -365,6 +320,52 @@ export function configureWorkerRoutes(app: Express) { case 'sync-sftmx-withdrawal-requests': await runIfNotAlreadyRunning(job.name, chainId, () => sftmxService.syncWithdrawalRequests(), res, next); break; + // V3 Jobs + case 'add-pools-v3': + await runIfNotAlreadyRunning(job.name, chainId, () => jobsController.addPools(chainId), res, next); + break; + case 'sync-pools-v3': + await runIfNotAlreadyRunning(job.name, chainId, () => jobsController.syncPools(chainId), res, next); + break; + case 'sync-swaps-v3': + await runIfNotAlreadyRunning(job.name, chainId, () => jobsController.syncSwapsV3(chainId), res, next); + break; + case 'sync-join-exits-v3': + await runIfNotAlreadyRunning( + job.name, + chainId, + () => jobsController.syncJoinExitsV3(chainId), + res, + next, + ); + break; + case 'update-liquidity-24h-ago-for-all-pools-v3': + await runIfNotAlreadyRunning( + job.name, + chainId, + () => poolService.updateLiquidity24hAgoForAllPoolsV3(), + res, + next, + ); + break; + case 'sync-latest-snapshots-for-all-pools-v3': + await runIfNotAlreadyRunning( + job.name, + chainId, + () => poolService.syncLatestSnapshotsForAllPoolsV3(), + res, + next, + ); + break; + case 'update-lifetime-values-for-all-pools-v3': + await runIfNotAlreadyRunning( + job.name, + chainId, + () => poolService.updateLifetimeValuesForAllPoolsV3(), + res, + next, + ); + break; case 'update-swaps-volume-and-fees-v3': await runIfNotAlreadyRunning( job.name,