From 3a5a00b8faefd343c2611f4070701aadaf14cfd5 Mon Sep 17 00:00:00 2001 From: gmbronco <83549293+gmbronco@users.noreply.github.com> Date: Mon, 18 Dec 2023 17:17:41 +0100 Subject: [PATCH 1/9] improv: handle RPC errors in getLogs fetching --- modules/pool/lib/pool-sync.service.ts | 79 ++++++++---- .../lib/user-sync-gauge-balance.service.ts | 75 +++-------- .../lib/user-sync-wallet-balance.service.ts | 77 +++-------- modules/web3/events.ts | 120 ++++++++++++++++++ 4 files changed, 205 insertions(+), 146 deletions(-) create mode 100644 modules/web3/events.ts diff --git a/modules/pool/lib/pool-sync.service.ts b/modules/pool/lib/pool-sync.service.ts index d4c4dfdc6..e1163fa7e 100644 --- a/modules/pool/lib/pool-sync.service.ts +++ b/modules/pool/lib/pool-sync.service.ts @@ -1,47 +1,70 @@ import * as _ from 'lodash'; import { prisma } from '../../../prisma/prisma-client'; -import { PrismaLastBlockSyncedCategory } from '@prisma/client'; +import { Chain, PrismaLastBlockSyncedCategory } from '@prisma/client'; import { poolService } from '../pool.service'; -import { getContractAt } from '../../web3/contract'; import VaultAbi from '../abi/Vault.json'; import { networkContext } from '../../network/network-context.service'; +import { getEvents } from '../../web3/events'; export class PoolSyncService { + get chain(): Chain { + return networkContext.chain; + } + + get chainId(): string { + return networkContext.chainId; + } + + get provider() { + return networkContext.provider; + } + + get vaultAddress() { + return networkContext.data.balancer.vault; + } + + get rpcUrl() { + return networkContext.data.rpcUrl; + } + + get rpcMaxBlockRange() { + return networkContext.data.rpcMaxBlockRange; + } + public async syncChangedPools() { let lastSync = await prisma.prismaLastBlockSynced.findUnique({ - where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: networkContext.chain } }, + where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: this.chain } }, }); const lastSyncBlock = lastSync?.blockNumber ?? 0; - const latestBlock = await networkContext.provider.getBlockNumber(); + const latestBlock = await this.provider.getBlockNumber(); const startBlock = lastSyncBlock + 1; - const endBlock = - latestBlock - startBlock > networkContext.data.rpcMaxBlockRange - ? startBlock + networkContext.data.rpcMaxBlockRange - : latestBlock; + const endBlock = latestBlock; // no new blocks have been minted, needed for slow networks if (startBlock > endBlock) { return; } - const contract = getContractAt(networkContext.data.balancer.vault, VaultAbi); + // Update status for all the pools + const allPools = await prisma.prismaPool.findMany({ + where: { chain: this.chain }, + }); + await poolService.updateOnChainStatusForPools(allPools.map((pool) => pool.id)); - const events = await contract.queryFilter( - { address: networkContext.data.balancer.vault }, + // Get state changing events from the vault contract + const filteredEvents = await getEvents( startBlock, endBlock, + [this.vaultAddress], + ['PoolBalanceChanged', 'PoolBalanceManaged', 'Swap'], + this.rpcUrl, + this.rpcMaxBlockRange, + VaultAbi, ); - const allPools = await prisma.prismaPool.findMany({ - where: { chain: networkContext.chain }, - }); + console.log(`sync-changed-pools-${this.chainId} found ${filteredEvents.length} events`); - await poolService.updateOnChainStatusForPools(allPools.map((pool) => pool.id)); - - const filteredEvents = events.filter((event) => - ['PoolBalanceChanged', 'PoolBalanceManaged', 'Swap'].includes(event.event!), - ); const poolIds: string[] = _.uniq(filteredEvents.map((event) => event.args!.poolId)); if (poolIds.length !== 0) { console.log(`Syncing ${poolIds.length} pools between blocks ${startBlock} and ${endBlock}`); @@ -52,23 +75,23 @@ export class PoolSyncService { } await prisma.prismaLastBlockSynced.upsert({ - where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: networkContext.chain } }, + where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: this.chain } }, update: { blockNumber: endBlock, }, create: { category: PrismaLastBlockSyncedCategory.POOLS, blockNumber: endBlock, - chain: networkContext.chain, + chain: this.chain, }, }); } public async initOnChainDataForAllPools() { - const latestBlock = await networkContext.provider.getBlockNumber(); + const latestBlock = await this.provider.getBlockNumber(); const allPools = await prisma.prismaPool.findMany({ - where: { chain: networkContext.chain }, + where: { chain: this.chain }, }); const poolIds = allPools.map((pool) => pool.id); @@ -81,14 +104,14 @@ export class PoolSyncService { await poolService.updateVolumeAndFeeValuesForPools(poolIds); await prisma.prismaLastBlockSynced.upsert({ - where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: networkContext.chain } }, + where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: this.chain } }, update: { blockNumber: latestBlock, }, create: { category: PrismaLastBlockSyncedCategory.POOLS, blockNumber: latestBlock, - chain: networkContext.chain, + chain: this.chain, }, }); } @@ -107,10 +130,10 @@ export class PoolSyncService { await prisma.prismaPoolCategory.createMany({ data: poolsWithGauges.map((pool) => ({ - id: `${networkContext.chain}-${pool.id}-INCENTIVIZED`, + id: `${this.chain}-${pool.id}-INCENTIVIZED`, poolId: pool.id, category: 'INCENTIVIZED' as const, - chain: networkContext.chain, + chain: this.chain, })), skipDuplicates: true, }); @@ -118,7 +141,7 @@ export class PoolSyncService { await prisma.prismaPoolCategory.deleteMany({ where: { category: 'INCENTIVIZED', - chain: networkContext.chain, + chain: this.chain, poolId: { notIn: poolsWithGauges.map((pool) => pool.id), }, diff --git a/modules/user/lib/user-sync-gauge-balance.service.ts b/modules/user/lib/user-sync-gauge-balance.service.ts index 04b79bd9e..a79f7ed0f 100644 --- a/modules/user/lib/user-sync-gauge-balance.service.ts +++ b/modules/user/lib/user-sync-gauge-balance.service.ts @@ -5,13 +5,13 @@ import _ from 'lodash'; import { prismaBulkExecuteOperations } from '../../../prisma/prisma-util'; import RewardsOnlyGaugeAbi from './abi/RewardsOnlyGauge.json'; import { Multicaller } from '../../web3/multicaller'; -import { ethers } from 'ethers'; import { formatFixed } from '@ethersproject/bignumber'; import { PrismaPoolStakingType } from '@prisma/client'; import { networkContext } from '../../network/network-context.service'; import ERC20Abi from '../../web3/abi/ERC20.json'; import { gaugeSubgraphService } from '../../subgraphs/gauge-subgraph/gauge-subgraph.service'; import { AddressZero } from '@ethersproject/constants'; +import { getEvents } from '../../web3/events'; export class UserSyncGaugeBalanceService implements UserStakedBalanceService { get chain() { @@ -123,7 +123,9 @@ export class UserSyncGaugeBalanceService implements UserStakedBalanceService { select: { gaugeAddress: true }, where: { chain: this.chain }, }) - ).map((gauge) => gauge.gaugeAddress); + ) + .map((gauge) => gauge.gaugeAddress) + .filter((address, i, arr) => arr.indexOf(address) === i); // Make unique // we sync at most 10k blocks at a time const startBlock = status.blockNumber + 1; @@ -138,69 +140,30 @@ export class UserSyncGaugeBalanceService implements UserStakedBalanceService { we need to figure out which users have a changed balance on any gauge contract and update their balance, therefore we check all transfer events since the last synced block */ - const erc20Interface = new ethers.utils.Interface(ERC20Abi); // Split the range into smaller chunks to avoid RPC limits, setting up to 50 times max block range const toBlock = Math.min(startBlock + 50 * this.rpcMaxBlockRange, latestBlock); - const range = toBlock - startBlock; console.log(`user-sync-staked-balances-${this.chainId} block range from ${startBlock} to ${toBlock}`); - console.log(`user-sync-staked-balances-${this.chainId} getLogs for ${_.uniq(gaugeAddresses).length} gauges.`); - const events = await Promise.all( - // Getting logs in batches of max blocks allowed by RPC - Array.from({ length: Math.ceil(range / this.rpcMaxBlockRange) }, (_, i) => i).map(async (i) => { - const from = startBlock + i * this.rpcMaxBlockRange; - const to = Math.min(startBlock + (i + 1) * this.rpcMaxBlockRange, toBlock); - - // Usually RPCs are handling any number of addresses, but it here batching just to be on the safe side - const logRequests: Promise[] = _.chunk(gaugeAddresses, 500).map((addresses) => { - // Fetch logs with a raw json request until we support Viem or Ethers6 - const payload = { - jsonrpc: '2.0', - id: 1, - method: 'eth_getLogs', - params: [ - { - address: addresses, - topics: [ethers.utils.id('Transfer(address,address,uint256)')], - fromBlock: '0x' + BigInt(from).toString(16), - toBlock: '0x' + BigInt(to).toString(16), - }, - ], - }; - - return fetch(this.rpcUrl, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify(payload), - }) - .then((response) => response.json() as Promise<{ result: ethers.providers.Log[] }>) - .then(({ result }) => result) - .catch((error) => { - console.error('Error fetching logs:', error); - return []; - }); - }); - - const events = await Promise.all(logRequests).then((res) => res.flat()); - - return events; - }), - ).then((res) => res.flat().filter((event) => event)); + console.log(`user-sync-staked-balances-${this.chainId} getLogs for ${gaugeAddresses.length} gauges.`); + + const events = await getEvents( + startBlock, + toBlock, + gaugeAddresses, + ['Transfer'], + this.rpcUrl, + this.rpcMaxBlockRange, + ERC20Abi, + ); console.log(`user-sync-staked-balances-${this.chainId} getLogs for ${gaugeAddresses.length} gauges done`); const balancesToFetch = _.uniqBy( events - .map((event) => { - const parsed = erc20Interface.parseLog(event); - - return [ - { erc20Address: event.address, userAddress: parsed.args?.from as string }, - { erc20Address: event.address, userAddress: parsed.args?.to as string }, - ]; - }) + .map((event) => [ + { erc20Address: event.address, userAddress: event.args?.from as string }, + { erc20Address: event.address, userAddress: event.args?.to as string }, + ]) .flat(), (entry) => entry.erc20Address + entry.userAddress, ); diff --git a/modules/user/lib/user-sync-wallet-balance.service.ts b/modules/user/lib/user-sync-wallet-balance.service.ts index 7389260aa..94adb817c 100644 --- a/modules/user/lib/user-sync-wallet-balance.service.ts +++ b/modules/user/lib/user-sync-wallet-balance.service.ts @@ -1,7 +1,6 @@ import { isSameAddress } from '@balancer-labs/sdk'; import { formatFixed } from '@ethersproject/bignumber'; import { AddressZero } from '@ethersproject/constants'; -import { ethers } from 'ethers'; import _ from 'lodash'; import { prisma } from '../../../prisma/prisma-client'; import { prismaBulkExecuteOperations } from '../../../prisma/prisma-util'; @@ -12,6 +11,7 @@ import { Multicaller, MulticallUserBalance } from '../../web3/multicaller'; import ERC20Abi from '../../web3/abi/ERC20.json'; import { networkContext } from '../../network/network-context.service'; import { AllNetworkConfigs } from '../../network/network-config'; +import { getEvents } from '../../web3/events'; export class UserSyncWalletBalanceService { beetsBarService?: BeetsBarSubgraphService; @@ -130,7 +130,6 @@ export class UserSyncWalletBalanceService { } public async syncChangedBalancesForAllPools() { - const erc20Interface = new ethers.utils.Interface(ERC20Abi); const latestBlock = await this.provider.getBlockNumber(); const syncStatus = await prisma.prismaUserBalanceSyncStatus.findUnique({ where: { type_chain: { type: 'WALLET', chain: this.chain } }, @@ -159,60 +158,18 @@ export class UserSyncWalletBalanceService { // Split the range into smaller chunks to avoid RPC limits, setting up to 50 times max block range const toBlock = Math.min(fromBlock + 50 * this.rpcMaxBlockRange, latestBlock); - const range = toBlock - fromBlock; console.log(`UserWalletBalanceService: syncing balances from ${fromBlock} to ${toBlock}`); console.log(`user-sync-wallet-balances-for-all-pools-${this.chainId} getLogs of ${poolAddresses.length} pools`); - const events = await Promise.all( - // Getting logs in batches of max blocks allowed by RPC - Array.from({ length: Math.ceil(range / this.rpcMaxBlockRange) }, (_, i) => i).map(async (i) => { - const from = fromBlock + i * this.rpcMaxBlockRange; - const to = Math.min(fromBlock + (i + 1) * this.rpcMaxBlockRange, toBlock); - - // Usually RPCs are handling any number of addresses, but it here batching just to be on the safe side - const logRequests: Promise[] = _.chunk(poolAddresses, 500).map((addresses) => { - // Fetch logs with a raw json request until we support Viem or Ethers6 - const payload = { - jsonrpc: '2.0', - id: 1, - method: 'eth_getLogs', - params: [ - { - address: addresses, - topics: [ethers.utils.id('Transfer(address,address,uint256)')], - fromBlock: '0x' + BigInt(from).toString(16), - toBlock: '0x' + BigInt(to).toString(16), - }, - ], - }; - - return fetch(AllNetworkConfigs[this.chainId].data.rpcUrl, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify(payload), - }) - .then((response) => response.json() as Promise<{ result: ethers.providers.Log[] }>) - .then(({ result }) => result) - .catch((error) => { - console.error('Error fetching logs:', error); - return []; - }); - - // Fetching logs with Viem - // viemClient.getLogs({ - // address: addresses, - // event: parseAbiItem('event Transfer(address indexed, address indexed, uint256)'), - // fromBlock: BigInt(from), - // toBlock: BigInt(to), - // }) - }); - - const events = await Promise.all(logRequests).then((res) => res.flat()); - - return events; - }), - ).then((res) => res.flat().filter((event) => event)); + + const events = await getEvents( + fromBlock, + toBlock, + poolAddresses, + ['Transfer'], + AllNetworkConfigs[this.chainId].data.rpcUrl, + this.rpcMaxBlockRange, + ERC20Abi, + ); console.log( `user-sync-wallet-balances-for-all-pools-${this.chainId} getLogs of ${poolAddresses.length} pools done`, @@ -230,14 +187,10 @@ export class UserSyncWalletBalanceService { //we also need to track fbeets balance relevantERC20Addresses.includes(event.address.toLowerCase()), ) - .map((event) => { - const parsed = erc20Interface.parseLog(event); - - return [ - { erc20Address: event.address, userAddress: parsed.args?.from as string }, - { erc20Address: event.address, userAddress: parsed.args?.to as string }, - ]; - }) + .map((event) => [ + { erc20Address: event.address, userAddress: event.args?.from as string }, + { erc20Address: event.address, userAddress: event.args?.to as string }, + ]) .flat(), (entry) => entry.erc20Address + entry.userAddress, ); diff --git a/modules/web3/events.ts b/modules/web3/events.ts new file mode 100644 index 000000000..8bf635987 --- /dev/null +++ b/modules/web3/events.ts @@ -0,0 +1,120 @@ +import { Event } from '@ethersproject/contracts'; +import { Interface } from '@ethersproject/abi'; +import { chunk } from 'lodash'; + +export const getEvents = async ( + fromBlock: number, + toBlock: number, + addresses: string[], + topics: string[], + rpcUrl: string, + rpcMaxBlockRange: number, + abi?: any, +): Promise => { + let iEvents: Interface; + if (abi) { + iEvents = new Interface(abi); + // check if topics are event names + const alreadyEncoded = topics.every((topic) => topic.startsWith('0x')); + if (!alreadyEncoded) topics = topics.map((topic) => iEvents.getEventTopic(topic)); + } + const range = toBlock - fromBlock; + + return Promise.all( + // Getting logs in batches of max blocks allowed by RPC + Array.from({ length: Math.ceil(range / rpcMaxBlockRange) }, (_, i) => i).map(async (i) => { + const from = fromBlock + (i > 0 ? 1 : 0) + i * rpcMaxBlockRange; + const to = Math.min(fromBlock + (i + 1) * rpcMaxBlockRange, toBlock); + + // Usually RPCs are handling any number of addresses, but it here batching just to be on the safe side + const logRequests: Promise[] = chunk(addresses, 500).map(async (addresses) => { + // Fetch logs with a raw json request until we support Viem or Ethers6 + const payload = { + jsonrpc: '2.0', + id: 1, + method: 'eth_getLogs', + params: [ + { + address: addresses, + topics: topics.length === 1 ? topics : [topics], + fromBlock: '0x' + BigInt(from).toString(16), + toBlock: '0x' + BigInt(to).toString(16), + }, + ], + }; + + return fetch(rpcUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify(payload), + }) + .then( + (response) => + response.json() as Promise< + { result: Event[] } | { error: { code: string; message: string } } + >, + ) + .then((response) => { + if ('error' in response) { + return Promise.reject(response.error.message); + } + + return response.result; + }) + .catch((e: any) => { + // Ankr RPC returns error if block range is too wide + if (e.includes('block range is too wide')) { + return getEvents(from, to, addresses, topics, rpcUrl, rpcMaxBlockRange / 2); + } + + // Infura returns 'more than 10000 results' error if block range is too wide + if (e.includes('query returned more than 10000 results')) { + const range = e + .match(/\[([0-9a-fA-F, x]+)\]/) + .pop() + .split(', ') + .map((hex: string) => parseInt(hex, 16)); + + return getEvents(from, to, addresses, topics, rpcUrl, range[1] - range[0]); + } + + console.error('Error fetching logs:', e); + return Promise.reject(e); + }); + + // Fetching logs with Viem + // viemClient.getLogs({ + // address: addresses, + // event: parseAbiItem('event Transfer(address indexed, address indexed, uint256)'), + // fromBlock: BigInt(from), + // toBlock: BigInt(to), + // }) + }); + + const events = await Promise.all(logRequests).then((res) => res.flat()); + + return events; + }), + ) + .then((res) => res.flat().filter((log) => log)) + .then((logs) => + logs.map((log) => { + // Decode event args + const event = iEvents ? iEvents.parseLog(log) : undefined; + const args: any = {}; + if (event) { + const argNames = iEvents.events[event.signature].inputs.map((input) => input.name); + for (let i = 0; i < argNames.length; i++) { + args[argNames[i]] = event.args[i]; + } + } + + return { + ...log, + args, + }; + }), + ); +}; From 22974790ea0b94212667d74f5478a57cd089d067 Mon Sep 17 00:00:00 2001 From: gmbronco <83549293+gmbronco@users.noreply.github.com> Date: Tue, 19 Dec 2023 14:44:08 +0100 Subject: [PATCH 2/9] removing obsolete unique filter --- modules/user/lib/user-sync-gauge-balance.service.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/modules/user/lib/user-sync-gauge-balance.service.ts b/modules/user/lib/user-sync-gauge-balance.service.ts index a79f7ed0f..82a529140 100644 --- a/modules/user/lib/user-sync-gauge-balance.service.ts +++ b/modules/user/lib/user-sync-gauge-balance.service.ts @@ -123,9 +123,7 @@ export class UserSyncGaugeBalanceService implements UserStakedBalanceService { select: { gaugeAddress: true }, where: { chain: this.chain }, }) - ) - .map((gauge) => gauge.gaugeAddress) - .filter((address, i, arr) => arr.indexOf(address) === i); // Make unique + ).map((gauge) => gauge.gaugeAddress); // we sync at most 10k blocks at a time const startBlock = status.blockNumber + 1; From ea3bd1a0431ac19ce60161f28be791cce26a644e Mon Sep 17 00:00:00 2001 From: gmbronco <83549293+gmbronco@users.noreply.github.com> Date: Wed, 20 Dec 2023 10:29:14 +0100 Subject: [PATCH 3/9] test: getEvents initial --- jest.config.js | 5 ++++ modules/web3/events.test.ts | 57 +++++++++++++++++++++++++++++++++++++ modules/web3/events.ts | 2 +- 3 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 modules/web3/events.test.ts diff --git a/jest.config.js b/jest.config.js index 2dfa7e889..ab69250fa 100644 --- a/jest.config.js +++ b/jest.config.js @@ -5,4 +5,9 @@ module.exports = { setupFiles: ['dotenv/config'], watchPlugins: ['jest-watch-typeahead/filename', 'jest-watch-typeahead/testname'], setupFilesAfterEnv: ['/modules/tests-helper/globalTestSetup.ts'], + globals: { + 'ts-jest': { + isolatedModules: true, + }, + }, }; diff --git a/modules/web3/events.test.ts b/modules/web3/events.test.ts new file mode 100644 index 000000000..0ebfd0c58 --- /dev/null +++ b/modules/web3/events.test.ts @@ -0,0 +1,57 @@ +import { getEvents } from './events'; + +global.fetch = jest.fn(); + +describe('getEvents', () => { + beforeEach(() => { + // Clear all instances and calls to constructor and all methods: + (global.fetch as jest.MockedFunction).mockClear(); + }); + + it('fetches successfully', async () => { + (global.fetch as jest.MockedFunction).mockImplementation(() => + Promise.resolve(new Response(JSON.stringify({ result: [] }))), + ); + + const result = await getEvents(0, 100, ['0x123'], ['0x456'], 'http://localhost', 50); + expect(result).toEqual([]); + expect(global.fetch).toHaveBeenCalledTimes(2); + }); + + it('handles fetch error', async () => { + (global.fetch as jest.MockedFunction).mockImplementationOnce(() => Promise.reject('Fetch error')); + + try { + await getEvents(0, 100, ['0x123'], ['0x456'], 'http://localhost', 50); + } catch (error) { + expect(error).toEqual('Fetch error'); + } + expect(global.fetch).toHaveBeenCalledTimes(2); + }); + + it('fetches same blocks twice when addresses are more than 500', async () => { + const addresses = new Array(501).fill('0x1'); + const errorMessage = 'block range is too wide'; + const log = { address: 'address' }; + + (global.fetch as jest.MockedFunction) + .mockImplementationOnce(() => + Promise.resolve(new Response(JSON.stringify({ error: { message: errorMessage } }))), + ) + // Return log for first block only + .mockImplementation((url, options) => { + const { body } = options || {}; + const payload = JSON.parse(body as string); + const result: { address: string }[] = []; + if (payload.params[0].fromBlock === '0x0') { + result.push(log); + } + return Promise.resolve(new Response(JSON.stringify({ result }))); + }); + + const result = await getEvents(0, 4, addresses, ['topic'], 'http://localhost', 2); + + expect(result.length).toEqual(2); + expect(global.fetch).toHaveBeenCalledTimes(6); + }); +}); diff --git a/modules/web3/events.ts b/modules/web3/events.ts index 8bf635987..78d2d1c90 100644 --- a/modules/web3/events.ts +++ b/modules/web3/events.ts @@ -12,7 +12,7 @@ export const getEvents = async ( abi?: any, ): Promise => { let iEvents: Interface; - if (abi) { + if (abi && abi.length > 0) { iEvents = new Interface(abi); // check if topics are event names const alreadyEncoded = topics.every((topic) => topic.startsWith('0x')); From aaa799cc2904b1689c2bbef61bea6959c262d85b Mon Sep 17 00:00:00 2001 From: gmbronco <83549293+gmbronco@users.noreply.github.com> Date: Wed, 20 Dec 2023 10:56:40 +0100 Subject: [PATCH 4/9] getEvents refactored --- modules/web3/events.ts | 195 ++++++++++++++++++++++------------------- 1 file changed, 103 insertions(+), 92 deletions(-) diff --git a/modules/web3/events.ts b/modules/web3/events.ts index 78d2d1c90..69a0b35f1 100644 --- a/modules/web3/events.ts +++ b/modules/web3/events.ts @@ -18,103 +18,114 @@ export const getEvents = async ( const alreadyEncoded = topics.every((topic) => topic.startsWith('0x')); if (!alreadyEncoded) topics = topics.map((topic) => iEvents.getEventTopic(topic)); } + const range = toBlock - fromBlock; + const numBatches = Math.ceil(range / rpcMaxBlockRange); + + const promises: Promise[] = []; + + for (let i = 0; i < numBatches; i++) { + const from = fromBlock + (i > 0 ? 1 : 0) + i * rpcMaxBlockRange; + const to = Math.min(fromBlock + (i + 1) * rpcMaxBlockRange, toBlock); + + const addressChunks = chunk(addresses, 500); + + for (const addressChunk of addressChunks) { + const promise = fetchLogs(from, to, addressChunk, topics, rpcUrl).catch((e: any) => { + // Ankr RPC returns error if block range is too wide + if (e.includes('block range is too wide')) { + return getEvents(from, to, addressChunk, topics, rpcUrl, rpcMaxBlockRange / 2); + } - return Promise.all( - // Getting logs in batches of max blocks allowed by RPC - Array.from({ length: Math.ceil(range / rpcMaxBlockRange) }, (_, i) => i).map(async (i) => { - const from = fromBlock + (i > 0 ? 1 : 0) + i * rpcMaxBlockRange; - const to = Math.min(fromBlock + (i + 1) * rpcMaxBlockRange, toBlock); - - // Usually RPCs are handling any number of addresses, but it here batching just to be on the safe side - const logRequests: Promise[] = chunk(addresses, 500).map(async (addresses) => { - // Fetch logs with a raw json request until we support Viem or Ethers6 - const payload = { - jsonrpc: '2.0', - id: 1, - method: 'eth_getLogs', - params: [ - { - address: addresses, - topics: topics.length === 1 ? topics : [topics], - fromBlock: '0x' + BigInt(from).toString(16), - toBlock: '0x' + BigInt(to).toString(16), - }, - ], - }; - - return fetch(rpcUrl, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify(payload), - }) - .then( - (response) => - response.json() as Promise< - { result: Event[] } | { error: { code: string; message: string } } - >, - ) - .then((response) => { - if ('error' in response) { - return Promise.reject(response.error.message); - } - - return response.result; - }) - .catch((e: any) => { - // Ankr RPC returns error if block range is too wide - if (e.includes('block range is too wide')) { - return getEvents(from, to, addresses, topics, rpcUrl, rpcMaxBlockRange / 2); - } - - // Infura returns 'more than 10000 results' error if block range is too wide - if (e.includes('query returned more than 10000 results')) { - const range = e - .match(/\[([0-9a-fA-F, x]+)\]/) - .pop() - .split(', ') - .map((hex: string) => parseInt(hex, 16)); - - return getEvents(from, to, addresses, topics, rpcUrl, range[1] - range[0]); - } - - console.error('Error fetching logs:', e); - return Promise.reject(e); - }); - - // Fetching logs with Viem - // viemClient.getLogs({ - // address: addresses, - // event: parseAbiItem('event Transfer(address indexed, address indexed, uint256)'), - // fromBlock: BigInt(from), - // toBlock: BigInt(to), - // }) + // Infura returns 'more than 10000 results' error if block range is too wide + if (e.includes('query returned more than 10000 results')) { + const range = e + .match(/\[([0-9a-fA-F, x]+)\]/) + .pop() + .split(', ') + .map((hex: string) => parseInt(hex, 16)); + + return getEvents(from, to, addressChunk, topics, rpcUrl, range[1] - range[0]); + } + + console.error('Error fetching logs:', e); + return Promise.reject(e); }); - const events = await Promise.all(logRequests).then((res) => res.flat()); + promises.push(promise); + } + } - return events; - }), - ) + return Promise.all(promises) .then((res) => res.flat().filter((log) => log)) - .then((logs) => - logs.map((log) => { - // Decode event args - const event = iEvents ? iEvents.parseLog(log) : undefined; - const args: any = {}; - if (event) { - const argNames = iEvents.events[event.signature].inputs.map((input) => input.name); - for (let i = 0; i < argNames.length; i++) { - args[argNames[i]] = event.args[i]; - } - } + .then((logs) => decodeLogs(logs, iEvents)); +}; + +const fetchLogs = async ( + from: number, + to: number, + addresses: string[], + topics: string[], + rpcUrl: string, +): Promise => { + // Fetch logs with a raw json request until we support Viem or Ethers6 + const payload = { + jsonrpc: '2.0', + id: 1, + method: 'eth_getLogs', + params: [ + { + address: addresses, + topics: topics.length === 1 ? topics : [topics], + fromBlock: '0x' + BigInt(from).toString(16), + toBlock: '0x' + BigInt(to).toString(16), + }, + ], + }; + + return fetch(rpcUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify(payload), + }) + .then( + (response) => + response.json() as Promise<{ result: Event[] } | { error: { code: string; message: string } }>, + ) + .then((response) => { + if ('error' in response) { + return Promise.reject(response.error.message); + } + + return response.result; + }); + + // Fetching logs with Viem + // viemClient.getLogs({ + // address: addresses, + // event: parseAbiItem('event Transfer(address indexed, address indexed, uint256)'), + // fromBlock: BigInt(from), + // toBlock: BigInt(to), + // }) +}; + +const decodeLogs = (logs: Event[], iEvents: Interface): Event[] => { + return logs.map((log) => { + // Decode event args + const event = iEvents ? iEvents.parseLog(log) : undefined; + const args: any = {}; + if (event) { + const argNames = iEvents.events[event.signature].inputs.map((input) => input.name); + for (let i = 0; i < argNames.length; i++) { + args[argNames[i]] = event.args[i]; + } + } - return { - ...log, - args, - }; - }), - ); + return { + ...log, + args, + }; + }); }; From 053e5236f475f8a6840ffd4661b912117dbcae0b Mon Sep 17 00:00:00 2001 From: gmbronco <83549293+gmbronco@users.noreply.github.com> Date: Wed, 20 Dec 2023 12:40:06 +0100 Subject: [PATCH 5/9] handle Alchemy ratelimits --- modules/web3/events.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/modules/web3/events.ts b/modules/web3/events.ts index 69a0b35f1..47e43598f 100644 --- a/modules/web3/events.ts +++ b/modules/web3/events.ts @@ -48,6 +48,15 @@ export const getEvents = async ( return getEvents(from, to, addressChunk, topics, rpcUrl, range[1] - range[0]); } + // Alchemy rate limit + if (e.includes('Your app has exceeded its compute units per second capacity')) { + return new Promise((resolve) => { + setTimeout(() => { + resolve(getEvents(from, to, addressChunk, topics, rpcUrl, rpcMaxBlockRange)); + }, 1000); + }); + } + console.error('Error fetching logs:', e); return Promise.reject(e); }); From a8c838bf3cbd2021450983b645fd9ca096f73443 Mon Sep 17 00:00:00 2001 From: gmbronco <83549293+gmbronco@users.noreply.github.com> Date: Wed, 20 Dec 2023 13:11:52 +0100 Subject: [PATCH 6/9] handle tenderly rate limits --- modules/web3/events.ts | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/modules/web3/events.ts b/modules/web3/events.ts index 47e43598f..7a62a38d5 100644 --- a/modules/web3/events.ts +++ b/modules/web3/events.ts @@ -33,12 +33,13 @@ export const getEvents = async ( for (const addressChunk of addressChunks) { const promise = fetchLogs(from, to, addressChunk, topics, rpcUrl).catch((e: any) => { // Ankr RPC returns error if block range is too wide - if (e.includes('block range is too wide')) { + if (e.includes && e.includes('block range is too wide')) { return getEvents(from, to, addressChunk, topics, rpcUrl, rpcMaxBlockRange / 2); } // Infura returns 'more than 10000 results' error if block range is too wide - if (e.includes('query returned more than 10000 results')) { + // in a format like this: [0x1a2b3c4d5e6f7, 0x1a2b3c4d5e6f7] + if (e.includes && e.includes('query returned more than 10000 results')) { const range = e .match(/\[([0-9a-fA-F, x]+)\]/) .pop() @@ -48,8 +49,12 @@ export const getEvents = async ( return getEvents(from, to, addressChunk, topics, rpcUrl, range[1] - range[0]); } - // Alchemy rate limit - if (e.includes('Your app has exceeded its compute units per second capacity')) { + // Alchemy / tenderly rate limit + if ( + e.includes && + (e.includes('Your app has exceeded its compute units per second capacity') || + e.includes('rate limit exceeded')) + ) { return new Promise((resolve) => { setTimeout(() => { resolve(getEvents(from, to, addressChunk, topics, rpcUrl, rpcMaxBlockRange)); From 6825b463b4471496f2e80f506897278539fdf4f5 Mon Sep 17 00:00:00 2001 From: gmbronco <83549293+gmbronco@users.noreply.github.com> Date: Wed, 20 Dec 2023 13:31:40 +0100 Subject: [PATCH 7/9] handle addresses limit --- modules/web3/events.ts | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/modules/web3/events.ts b/modules/web3/events.ts index 7a62a38d5..6b45970b8 100644 --- a/modules/web3/events.ts +++ b/modules/web3/events.ts @@ -10,6 +10,7 @@ export const getEvents = async ( rpcUrl: string, rpcMaxBlockRange: number, abi?: any, + maxAddresses = 500, ): Promise => { let iEvents: Interface; if (abi && abi.length > 0) { @@ -28,7 +29,7 @@ export const getEvents = async ( const from = fromBlock + (i > 0 ? 1 : 0) + i * rpcMaxBlockRange; const to = Math.min(fromBlock + (i + 1) * rpcMaxBlockRange, toBlock); - const addressChunks = chunk(addresses, 500); + const addressChunks = chunk(addresses, maxAddresses); for (const addressChunk of addressChunks) { const promise = fetchLogs(from, to, addressChunk, topics, rpcUrl).catch((e: any) => { @@ -62,6 +63,26 @@ export const getEvents = async ( }); } + // Allnodes addresses size limit + if (e.includes && e.includes('specify less number of addresses')) { + return new Promise((resolve) => { + setTimeout(() => { + resolve( + getEvents( + from, + to, + addressChunk, + topics, + rpcUrl, + rpcMaxBlockRange, + undefined, + maxAddresses / 2, + ), + ); + }, 1000); + }); + } + console.error('Error fetching logs:', e); return Promise.reject(e); }); From 52057c1a1a9838a91192a17026a8335873f4239c Mon Sep 17 00:00:00 2001 From: gmbronco <83549293+gmbronco@users.noreply.github.com> Date: Wed, 20 Dec 2023 15:31:41 +0100 Subject: [PATCH 8/9] smaller log ranges for wallet balances syncing --- modules/user/lib/user-sync-gauge-balance.service.ts | 4 ++-- modules/user/lib/user-sync-wallet-balance.service.ts | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/user/lib/user-sync-gauge-balance.service.ts b/modules/user/lib/user-sync-gauge-balance.service.ts index 82a529140..fcf8eb2a5 100644 --- a/modules/user/lib/user-sync-gauge-balance.service.ts +++ b/modules/user/lib/user-sync-gauge-balance.service.ts @@ -139,8 +139,8 @@ export class UserSyncGaugeBalanceService implements UserStakedBalanceService { therefore we check all transfer events since the last synced block */ - // Split the range into smaller chunks to avoid RPC limits, setting up to 50 times max block range - const toBlock = Math.min(startBlock + 50 * this.rpcMaxBlockRange, latestBlock); + // Split the range into smaller chunks to avoid RPC limits, setting up to 5 times max block range + const toBlock = Math.min(startBlock + 5 * this.rpcMaxBlockRange, latestBlock); console.log(`user-sync-staked-balances-${this.chainId} block range from ${startBlock} to ${toBlock}`); console.log(`user-sync-staked-balances-${this.chainId} getLogs for ${gaugeAddresses.length} gauges.`); diff --git a/modules/user/lib/user-sync-wallet-balance.service.ts b/modules/user/lib/user-sync-wallet-balance.service.ts index 94adb817c..d971d87e4 100644 --- a/modules/user/lib/user-sync-wallet-balance.service.ts +++ b/modules/user/lib/user-sync-wallet-balance.service.ts @@ -156,8 +156,8 @@ export class UserSyncWalletBalanceService { return; } - // Split the range into smaller chunks to avoid RPC limits, setting up to 50 times max block range - const toBlock = Math.min(fromBlock + 50 * this.rpcMaxBlockRange, latestBlock); + // Split the range into smaller chunks to avoid RPC limits, setting up to 5 times max block range + const toBlock = Math.min(fromBlock + 5 * this.rpcMaxBlockRange, latestBlock); console.log(`UserWalletBalanceService: syncing balances from ${fromBlock} to ${toBlock}`); console.log(`user-sync-wallet-balances-for-all-pools-${this.chainId} getLogs of ${poolAddresses.length} pools`); From f39ea715b3e13bbe96083884655bc3adc39f2217 Mon Sep 17 00:00:00 2001 From: gmbronco <83549293+gmbronco@users.noreply.github.com> Date: Thu, 21 Dec 2023 16:50:43 +0100 Subject: [PATCH 9/9] documenting infura's error --- modules/web3/events.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/web3/events.ts b/modules/web3/events.ts index 6b45970b8..06e3f2bd3 100644 --- a/modules/web3/events.ts +++ b/modules/web3/events.ts @@ -39,7 +39,8 @@ export const getEvents = async ( } // Infura returns 'more than 10000 results' error if block range is too wide - // in a format like this: [0x1a2b3c4d5e6f7, 0x1a2b3c4d5e6f7] + // error format: + // "query returned more than 10000 results. Try with this block range [0x30CE171, 0x30CE1C9]." if (e.includes && e.includes('query returned more than 10000 results')) { const range = e .match(/\[([0-9a-fA-F, x]+)\]/)