diff --git a/jest.config.js b/jest.config.js index 2dfa7e889..ab69250fa 100644 --- a/jest.config.js +++ b/jest.config.js @@ -5,4 +5,9 @@ module.exports = { setupFiles: ['dotenv/config'], watchPlugins: ['jest-watch-typeahead/filename', 'jest-watch-typeahead/testname'], setupFilesAfterEnv: ['/modules/tests-helper/globalTestSetup.ts'], + globals: { + 'ts-jest': { + isolatedModules: true, + }, + }, }; diff --git a/modules/pool/lib/pool-sync.service.ts b/modules/pool/lib/pool-sync.service.ts index d4c4dfdc6..e1163fa7e 100644 --- a/modules/pool/lib/pool-sync.service.ts +++ b/modules/pool/lib/pool-sync.service.ts @@ -1,47 +1,70 @@ import * as _ from 'lodash'; import { prisma } from '../../../prisma/prisma-client'; -import { PrismaLastBlockSyncedCategory } from '@prisma/client'; +import { Chain, PrismaLastBlockSyncedCategory } from '@prisma/client'; import { poolService } from '../pool.service'; -import { getContractAt } from '../../web3/contract'; import VaultAbi from '../abi/Vault.json'; import { networkContext } from '../../network/network-context.service'; +import { getEvents } from '../../web3/events'; export class PoolSyncService { + get chain(): Chain { + return networkContext.chain; + } + + get chainId(): string { + return networkContext.chainId; + } + + get provider() { + return networkContext.provider; + } + + get vaultAddress() { + return networkContext.data.balancer.vault; + } + + get rpcUrl() { + return networkContext.data.rpcUrl; + } + + get rpcMaxBlockRange() { + return networkContext.data.rpcMaxBlockRange; + } + public async syncChangedPools() { let lastSync = await prisma.prismaLastBlockSynced.findUnique({ - where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: networkContext.chain } }, + where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: this.chain } }, }); const lastSyncBlock = lastSync?.blockNumber ?? 0; - const latestBlock = await networkContext.provider.getBlockNumber(); + const latestBlock = await this.provider.getBlockNumber(); const startBlock = lastSyncBlock + 1; - const endBlock = - latestBlock - startBlock > networkContext.data.rpcMaxBlockRange - ? startBlock + networkContext.data.rpcMaxBlockRange - : latestBlock; + const endBlock = latestBlock; // no new blocks have been minted, needed for slow networks if (startBlock > endBlock) { return; } - const contract = getContractAt(networkContext.data.balancer.vault, VaultAbi); + // Update status for all the pools + const allPools = await prisma.prismaPool.findMany({ + where: { chain: this.chain }, + }); + await poolService.updateOnChainStatusForPools(allPools.map((pool) => pool.id)); - const events = await contract.queryFilter( - { address: networkContext.data.balancer.vault }, + // Get state changing events from the vault contract + const filteredEvents = await getEvents( startBlock, endBlock, + [this.vaultAddress], + ['PoolBalanceChanged', 'PoolBalanceManaged', 'Swap'], + this.rpcUrl, + this.rpcMaxBlockRange, + VaultAbi, ); - const allPools = await prisma.prismaPool.findMany({ - where: { chain: networkContext.chain }, - }); + console.log(`sync-changed-pools-${this.chainId} found ${filteredEvents.length} events`); - await poolService.updateOnChainStatusForPools(allPools.map((pool) => pool.id)); - - const filteredEvents = events.filter((event) => - ['PoolBalanceChanged', 'PoolBalanceManaged', 'Swap'].includes(event.event!), - ); const poolIds: string[] = _.uniq(filteredEvents.map((event) => event.args!.poolId)); if (poolIds.length !== 0) { console.log(`Syncing ${poolIds.length} pools between blocks ${startBlock} and ${endBlock}`); @@ -52,23 +75,23 @@ export class PoolSyncService { } await prisma.prismaLastBlockSynced.upsert({ - where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: networkContext.chain } }, + where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: this.chain } }, update: { blockNumber: endBlock, }, create: { category: PrismaLastBlockSyncedCategory.POOLS, blockNumber: endBlock, - chain: networkContext.chain, + chain: this.chain, }, }); } public async initOnChainDataForAllPools() { - const latestBlock = await networkContext.provider.getBlockNumber(); + const latestBlock = await this.provider.getBlockNumber(); const allPools = await prisma.prismaPool.findMany({ - where: { chain: networkContext.chain }, + where: { chain: this.chain }, }); const poolIds = allPools.map((pool) => pool.id); @@ -81,14 +104,14 @@ export class PoolSyncService { await poolService.updateVolumeAndFeeValuesForPools(poolIds); await prisma.prismaLastBlockSynced.upsert({ - where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: networkContext.chain } }, + where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: this.chain } }, update: { blockNumber: latestBlock, }, create: { category: PrismaLastBlockSyncedCategory.POOLS, blockNumber: latestBlock, - chain: networkContext.chain, + chain: this.chain, }, }); } @@ -107,10 +130,10 @@ export class PoolSyncService { await prisma.prismaPoolCategory.createMany({ data: poolsWithGauges.map((pool) => ({ - id: `${networkContext.chain}-${pool.id}-INCENTIVIZED`, + id: `${this.chain}-${pool.id}-INCENTIVIZED`, poolId: pool.id, category: 'INCENTIVIZED' as const, - chain: networkContext.chain, + chain: this.chain, })), skipDuplicates: true, }); @@ -118,7 +141,7 @@ export class PoolSyncService { await prisma.prismaPoolCategory.deleteMany({ where: { category: 'INCENTIVIZED', - chain: networkContext.chain, + chain: this.chain, poolId: { notIn: poolsWithGauges.map((pool) => pool.id), }, diff --git a/modules/user/lib/user-sync-gauge-balance.service.ts b/modules/user/lib/user-sync-gauge-balance.service.ts index 04b79bd9e..fcf8eb2a5 100644 --- a/modules/user/lib/user-sync-gauge-balance.service.ts +++ b/modules/user/lib/user-sync-gauge-balance.service.ts @@ -5,13 +5,13 @@ import _ from 'lodash'; import { prismaBulkExecuteOperations } from '../../../prisma/prisma-util'; import RewardsOnlyGaugeAbi from './abi/RewardsOnlyGauge.json'; import { Multicaller } from '../../web3/multicaller'; -import { ethers } from 'ethers'; import { formatFixed } from '@ethersproject/bignumber'; import { PrismaPoolStakingType } from '@prisma/client'; import { networkContext } from '../../network/network-context.service'; import ERC20Abi from '../../web3/abi/ERC20.json'; import { gaugeSubgraphService } from '../../subgraphs/gauge-subgraph/gauge-subgraph.service'; import { AddressZero } from '@ethersproject/constants'; +import { getEvents } from '../../web3/events'; export class UserSyncGaugeBalanceService implements UserStakedBalanceService { get chain() { @@ -138,69 +138,30 @@ export class UserSyncGaugeBalanceService implements UserStakedBalanceService { we need to figure out which users have a changed balance on any gauge contract and update their balance, therefore we check all transfer events since the last synced block */ - const erc20Interface = new ethers.utils.Interface(ERC20Abi); - // Split the range into smaller chunks to avoid RPC limits, setting up to 50 times max block range - const toBlock = Math.min(startBlock + 50 * this.rpcMaxBlockRange, latestBlock); - const range = toBlock - startBlock; + // Split the range into smaller chunks to avoid RPC limits, setting up to 5 times max block range + const toBlock = Math.min(startBlock + 5 * this.rpcMaxBlockRange, latestBlock); console.log(`user-sync-staked-balances-${this.chainId} block range from ${startBlock} to ${toBlock}`); - console.log(`user-sync-staked-balances-${this.chainId} getLogs for ${_.uniq(gaugeAddresses).length} gauges.`); - const events = await Promise.all( - // Getting logs in batches of max blocks allowed by RPC - Array.from({ length: Math.ceil(range / this.rpcMaxBlockRange) }, (_, i) => i).map(async (i) => { - const from = startBlock + i * this.rpcMaxBlockRange; - const to = Math.min(startBlock + (i + 1) * this.rpcMaxBlockRange, toBlock); - - // Usually RPCs are handling any number of addresses, but it here batching just to be on the safe side - const logRequests: Promise[] = _.chunk(gaugeAddresses, 500).map((addresses) => { - // Fetch logs with a raw json request until we support Viem or Ethers6 - const payload = { - jsonrpc: '2.0', - id: 1, - method: 'eth_getLogs', - params: [ - { - address: addresses, - topics: [ethers.utils.id('Transfer(address,address,uint256)')], - fromBlock: '0x' + BigInt(from).toString(16), - toBlock: '0x' + BigInt(to).toString(16), - }, - ], - }; - - return fetch(this.rpcUrl, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify(payload), - }) - .then((response) => response.json() as Promise<{ result: ethers.providers.Log[] }>) - .then(({ result }) => result) - .catch((error) => { - console.error('Error fetching logs:', error); - return []; - }); - }); - - const events = await Promise.all(logRequests).then((res) => res.flat()); - - return events; - }), - ).then((res) => res.flat().filter((event) => event)); + console.log(`user-sync-staked-balances-${this.chainId} getLogs for ${gaugeAddresses.length} gauges.`); + + const events = await getEvents( + startBlock, + toBlock, + gaugeAddresses, + ['Transfer'], + this.rpcUrl, + this.rpcMaxBlockRange, + ERC20Abi, + ); console.log(`user-sync-staked-balances-${this.chainId} getLogs for ${gaugeAddresses.length} gauges done`); const balancesToFetch = _.uniqBy( events - .map((event) => { - const parsed = erc20Interface.parseLog(event); - - return [ - { erc20Address: event.address, userAddress: parsed.args?.from as string }, - { erc20Address: event.address, userAddress: parsed.args?.to as string }, - ]; - }) + .map((event) => [ + { erc20Address: event.address, userAddress: event.args?.from as string }, + { erc20Address: event.address, userAddress: event.args?.to as string }, + ]) .flat(), (entry) => entry.erc20Address + entry.userAddress, ); diff --git a/modules/user/lib/user-sync-wallet-balance.service.ts b/modules/user/lib/user-sync-wallet-balance.service.ts index 7389260aa..d971d87e4 100644 --- a/modules/user/lib/user-sync-wallet-balance.service.ts +++ b/modules/user/lib/user-sync-wallet-balance.service.ts @@ -1,7 +1,6 @@ import { isSameAddress } from '@balancer-labs/sdk'; import { formatFixed } from '@ethersproject/bignumber'; import { AddressZero } from '@ethersproject/constants'; -import { ethers } from 'ethers'; import _ from 'lodash'; import { prisma } from '../../../prisma/prisma-client'; import { prismaBulkExecuteOperations } from '../../../prisma/prisma-util'; @@ -12,6 +11,7 @@ import { Multicaller, MulticallUserBalance } from '../../web3/multicaller'; import ERC20Abi from '../../web3/abi/ERC20.json'; import { networkContext } from '../../network/network-context.service'; import { AllNetworkConfigs } from '../../network/network-config'; +import { getEvents } from '../../web3/events'; export class UserSyncWalletBalanceService { beetsBarService?: BeetsBarSubgraphService; @@ -130,7 +130,6 @@ export class UserSyncWalletBalanceService { } public async syncChangedBalancesForAllPools() { - const erc20Interface = new ethers.utils.Interface(ERC20Abi); const latestBlock = await this.provider.getBlockNumber(); const syncStatus = await prisma.prismaUserBalanceSyncStatus.findUnique({ where: { type_chain: { type: 'WALLET', chain: this.chain } }, @@ -157,62 +156,20 @@ export class UserSyncWalletBalanceService { return; } - // Split the range into smaller chunks to avoid RPC limits, setting up to 50 times max block range - const toBlock = Math.min(fromBlock + 50 * this.rpcMaxBlockRange, latestBlock); - const range = toBlock - fromBlock; + // Split the range into smaller chunks to avoid RPC limits, setting up to 5 times max block range + const toBlock = Math.min(fromBlock + 5 * this.rpcMaxBlockRange, latestBlock); console.log(`UserWalletBalanceService: syncing balances from ${fromBlock} to ${toBlock}`); console.log(`user-sync-wallet-balances-for-all-pools-${this.chainId} getLogs of ${poolAddresses.length} pools`); - const events = await Promise.all( - // Getting logs in batches of max blocks allowed by RPC - Array.from({ length: Math.ceil(range / this.rpcMaxBlockRange) }, (_, i) => i).map(async (i) => { - const from = fromBlock + i * this.rpcMaxBlockRange; - const to = Math.min(fromBlock + (i + 1) * this.rpcMaxBlockRange, toBlock); - - // Usually RPCs are handling any number of addresses, but it here batching just to be on the safe side - const logRequests: Promise[] = _.chunk(poolAddresses, 500).map((addresses) => { - // Fetch logs with a raw json request until we support Viem or Ethers6 - const payload = { - jsonrpc: '2.0', - id: 1, - method: 'eth_getLogs', - params: [ - { - address: addresses, - topics: [ethers.utils.id('Transfer(address,address,uint256)')], - fromBlock: '0x' + BigInt(from).toString(16), - toBlock: '0x' + BigInt(to).toString(16), - }, - ], - }; - - return fetch(AllNetworkConfigs[this.chainId].data.rpcUrl, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify(payload), - }) - .then((response) => response.json() as Promise<{ result: ethers.providers.Log[] }>) - .then(({ result }) => result) - .catch((error) => { - console.error('Error fetching logs:', error); - return []; - }); - - // Fetching logs with Viem - // viemClient.getLogs({ - // address: addresses, - // event: parseAbiItem('event Transfer(address indexed, address indexed, uint256)'), - // fromBlock: BigInt(from), - // toBlock: BigInt(to), - // }) - }); - - const events = await Promise.all(logRequests).then((res) => res.flat()); - - return events; - }), - ).then((res) => res.flat().filter((event) => event)); + + const events = await getEvents( + fromBlock, + toBlock, + poolAddresses, + ['Transfer'], + AllNetworkConfigs[this.chainId].data.rpcUrl, + this.rpcMaxBlockRange, + ERC20Abi, + ); console.log( `user-sync-wallet-balances-for-all-pools-${this.chainId} getLogs of ${poolAddresses.length} pools done`, @@ -230,14 +187,10 @@ export class UserSyncWalletBalanceService { //we also need to track fbeets balance relevantERC20Addresses.includes(event.address.toLowerCase()), ) - .map((event) => { - const parsed = erc20Interface.parseLog(event); - - return [ - { erc20Address: event.address, userAddress: parsed.args?.from as string }, - { erc20Address: event.address, userAddress: parsed.args?.to as string }, - ]; - }) + .map((event) => [ + { erc20Address: event.address, userAddress: event.args?.from as string }, + { erc20Address: event.address, userAddress: event.args?.to as string }, + ]) .flat(), (entry) => entry.erc20Address + entry.userAddress, ); diff --git a/modules/web3/events.test.ts b/modules/web3/events.test.ts new file mode 100644 index 000000000..0ebfd0c58 --- /dev/null +++ b/modules/web3/events.test.ts @@ -0,0 +1,57 @@ +import { getEvents } from './events'; + +global.fetch = jest.fn(); + +describe('getEvents', () => { + beforeEach(() => { + // Clear all instances and calls to constructor and all methods: + (global.fetch as jest.MockedFunction).mockClear(); + }); + + it('fetches successfully', async () => { + (global.fetch as jest.MockedFunction).mockImplementation(() => + Promise.resolve(new Response(JSON.stringify({ result: [] }))), + ); + + const result = await getEvents(0, 100, ['0x123'], ['0x456'], 'http://localhost', 50); + expect(result).toEqual([]); + expect(global.fetch).toHaveBeenCalledTimes(2); + }); + + it('handles fetch error', async () => { + (global.fetch as jest.MockedFunction).mockImplementationOnce(() => Promise.reject('Fetch error')); + + try { + await getEvents(0, 100, ['0x123'], ['0x456'], 'http://localhost', 50); + } catch (error) { + expect(error).toEqual('Fetch error'); + } + expect(global.fetch).toHaveBeenCalledTimes(2); + }); + + it('fetches same blocks twice when addresses are more than 500', async () => { + const addresses = new Array(501).fill('0x1'); + const errorMessage = 'block range is too wide'; + const log = { address: 'address' }; + + (global.fetch as jest.MockedFunction) + .mockImplementationOnce(() => + Promise.resolve(new Response(JSON.stringify({ error: { message: errorMessage } }))), + ) + // Return log for first block only + .mockImplementation((url, options) => { + const { body } = options || {}; + const payload = JSON.parse(body as string); + const result: { address: string }[] = []; + if (payload.params[0].fromBlock === '0x0') { + result.push(log); + } + return Promise.resolve(new Response(JSON.stringify({ result }))); + }); + + const result = await getEvents(0, 4, addresses, ['topic'], 'http://localhost', 2); + + expect(result.length).toEqual(2); + expect(global.fetch).toHaveBeenCalledTimes(6); + }); +}); diff --git a/modules/web3/events.ts b/modules/web3/events.ts new file mode 100644 index 000000000..06e3f2bd3 --- /dev/null +++ b/modules/web3/events.ts @@ -0,0 +1,167 @@ +import { Event } from '@ethersproject/contracts'; +import { Interface } from '@ethersproject/abi'; +import { chunk } from 'lodash'; + +export const getEvents = async ( + fromBlock: number, + toBlock: number, + addresses: string[], + topics: string[], + rpcUrl: string, + rpcMaxBlockRange: number, + abi?: any, + maxAddresses = 500, +): Promise => { + let iEvents: Interface; + if (abi && abi.length > 0) { + iEvents = new Interface(abi); + // check if topics are event names + const alreadyEncoded = topics.every((topic) => topic.startsWith('0x')); + if (!alreadyEncoded) topics = topics.map((topic) => iEvents.getEventTopic(topic)); + } + + const range = toBlock - fromBlock; + const numBatches = Math.ceil(range / rpcMaxBlockRange); + + const promises: Promise[] = []; + + for (let i = 0; i < numBatches; i++) { + const from = fromBlock + (i > 0 ? 1 : 0) + i * rpcMaxBlockRange; + const to = Math.min(fromBlock + (i + 1) * rpcMaxBlockRange, toBlock); + + const addressChunks = chunk(addresses, maxAddresses); + + for (const addressChunk of addressChunks) { + const promise = fetchLogs(from, to, addressChunk, topics, rpcUrl).catch((e: any) => { + // Ankr RPC returns error if block range is too wide + if (e.includes && e.includes('block range is too wide')) { + return getEvents(from, to, addressChunk, topics, rpcUrl, rpcMaxBlockRange / 2); + } + + // Infura returns 'more than 10000 results' error if block range is too wide + // error format: + // "query returned more than 10000 results. Try with this block range [0x30CE171, 0x30CE1C9]." + if (e.includes && e.includes('query returned more than 10000 results')) { + const range = e + .match(/\[([0-9a-fA-F, x]+)\]/) + .pop() + .split(', ') + .map((hex: string) => parseInt(hex, 16)); + + return getEvents(from, to, addressChunk, topics, rpcUrl, range[1] - range[0]); + } + + // Alchemy / tenderly rate limit + if ( + e.includes && + (e.includes('Your app has exceeded its compute units per second capacity') || + e.includes('rate limit exceeded')) + ) { + return new Promise((resolve) => { + setTimeout(() => { + resolve(getEvents(from, to, addressChunk, topics, rpcUrl, rpcMaxBlockRange)); + }, 1000); + }); + } + + // Allnodes addresses size limit + if (e.includes && e.includes('specify less number of addresses')) { + return new Promise((resolve) => { + setTimeout(() => { + resolve( + getEvents( + from, + to, + addressChunk, + topics, + rpcUrl, + rpcMaxBlockRange, + undefined, + maxAddresses / 2, + ), + ); + }, 1000); + }); + } + + console.error('Error fetching logs:', e); + return Promise.reject(e); + }); + + promises.push(promise); + } + } + + return Promise.all(promises) + .then((res) => res.flat().filter((log) => log)) + .then((logs) => decodeLogs(logs, iEvents)); +}; + +const fetchLogs = async ( + from: number, + to: number, + addresses: string[], + topics: string[], + rpcUrl: string, +): Promise => { + // Fetch logs with a raw json request until we support Viem or Ethers6 + const payload = { + jsonrpc: '2.0', + id: 1, + method: 'eth_getLogs', + params: [ + { + address: addresses, + topics: topics.length === 1 ? topics : [topics], + fromBlock: '0x' + BigInt(from).toString(16), + toBlock: '0x' + BigInt(to).toString(16), + }, + ], + }; + + return fetch(rpcUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify(payload), + }) + .then( + (response) => + response.json() as Promise<{ result: Event[] } | { error: { code: string; message: string } }>, + ) + .then((response) => { + if ('error' in response) { + return Promise.reject(response.error.message); + } + + return response.result; + }); + + // Fetching logs with Viem + // viemClient.getLogs({ + // address: addresses, + // event: parseAbiItem('event Transfer(address indexed, address indexed, uint256)'), + // fromBlock: BigInt(from), + // toBlock: BigInt(to), + // }) +}; + +const decodeLogs = (logs: Event[], iEvents: Interface): Event[] => { + return logs.map((log) => { + // Decode event args + const event = iEvents ? iEvents.parseLog(log) : undefined; + const args: any = {}; + if (event) { + const argNames = iEvents.events[event.signature].inputs.map((input) => input.name); + for (let i = 0; i < argNames.length; i++) { + args[argNames[i]] = event.args[i]; + } + } + + return { + ...log, + args, + }; + }); +};