Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle RPC range errors in getLogs #606

Merged
merged 9 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,9 @@ module.exports = {
setupFiles: ['dotenv/config'],
watchPlugins: ['jest-watch-typeahead/filename', 'jest-watch-typeahead/testname'],
setupFilesAfterEnv: ['<rootDir>/modules/tests-helper/globalTestSetup.ts'],
globals: {
'ts-jest': {
isolatedModules: true,
},
},
};
79 changes: 51 additions & 28 deletions modules/pool/lib/pool-sync.service.ts
Original file line number Diff line number Diff line change
@@ -1,47 +1,70 @@
import * as _ from 'lodash';
import { prisma } from '../../../prisma/prisma-client';
import { PrismaLastBlockSyncedCategory } from '@prisma/client';
import { Chain, PrismaLastBlockSyncedCategory } from '@prisma/client';
import { poolService } from '../pool.service';
import { getContractAt } from '../../web3/contract';
import VaultAbi from '../abi/Vault.json';
import { networkContext } from '../../network/network-context.service';
import { getEvents } from '../../web3/events';

export class PoolSyncService {
get chain(): Chain {
return networkContext.chain;
}

get chainId(): string {
return networkContext.chainId;
}

get provider() {
return networkContext.provider;
}

get vaultAddress() {
return networkContext.data.balancer.vault;
}

get rpcUrl() {
return networkContext.data.rpcUrl;
}

get rpcMaxBlockRange() {
return networkContext.data.rpcMaxBlockRange;
}

public async syncChangedPools() {
let lastSync = await prisma.prismaLastBlockSynced.findUnique({
where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: networkContext.chain } },
where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: this.chain } },
});
const lastSyncBlock = lastSync?.blockNumber ?? 0;
const latestBlock = await networkContext.provider.getBlockNumber();
const latestBlock = await this.provider.getBlockNumber();

const startBlock = lastSyncBlock + 1;
const endBlock =
latestBlock - startBlock > networkContext.data.rpcMaxBlockRange
? startBlock + networkContext.data.rpcMaxBlockRange
: latestBlock;
const endBlock = latestBlock;

// no new blocks have been minted, needed for slow networks
if (startBlock > endBlock) {
return;
}

const contract = getContractAt(networkContext.data.balancer.vault, VaultAbi);
// Update status for all the pools
const allPools = await prisma.prismaPool.findMany({
where: { chain: this.chain },
});
await poolService.updateOnChainStatusForPools(allPools.map((pool) => pool.id));

const events = await contract.queryFilter(
{ address: networkContext.data.balancer.vault },
// Get state changing events from the vault contract
const filteredEvents = await getEvents(
startBlock,
endBlock,
[this.vaultAddress],
['PoolBalanceChanged', 'PoolBalanceManaged', 'Swap'],
this.rpcUrl,
this.rpcMaxBlockRange,
VaultAbi,
);

const allPools = await prisma.prismaPool.findMany({
where: { chain: networkContext.chain },
});
console.log(`sync-changed-pools-${this.chainId} found ${filteredEvents.length} events`);

await poolService.updateOnChainStatusForPools(allPools.map((pool) => pool.id));

const filteredEvents = events.filter((event) =>
['PoolBalanceChanged', 'PoolBalanceManaged', 'Swap'].includes(event.event!),
);
const poolIds: string[] = _.uniq(filteredEvents.map((event) => event.args!.poolId));
if (poolIds.length !== 0) {
console.log(`Syncing ${poolIds.length} pools between blocks ${startBlock} and ${endBlock}`);
Expand All @@ -52,23 +75,23 @@ export class PoolSyncService {
}

await prisma.prismaLastBlockSynced.upsert({
where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: networkContext.chain } },
where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: this.chain } },
update: {
blockNumber: endBlock,
},
create: {
category: PrismaLastBlockSyncedCategory.POOLS,
blockNumber: endBlock,
chain: networkContext.chain,
chain: this.chain,
},
});
}

public async initOnChainDataForAllPools() {
const latestBlock = await networkContext.provider.getBlockNumber();
const latestBlock = await this.provider.getBlockNumber();

const allPools = await prisma.prismaPool.findMany({
where: { chain: networkContext.chain },
where: { chain: this.chain },
});

const poolIds = allPools.map((pool) => pool.id);
Expand All @@ -81,14 +104,14 @@ export class PoolSyncService {
await poolService.updateVolumeAndFeeValuesForPools(poolIds);

await prisma.prismaLastBlockSynced.upsert({
where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: networkContext.chain } },
where: { category_chain: { category: PrismaLastBlockSyncedCategory.POOLS, chain: this.chain } },
update: {
blockNumber: latestBlock,
},
create: {
category: PrismaLastBlockSyncedCategory.POOLS,
blockNumber: latestBlock,
chain: networkContext.chain,
chain: this.chain,
},
});
}
Expand All @@ -107,18 +130,18 @@ export class PoolSyncService {

await prisma.prismaPoolCategory.createMany({
data: poolsWithGauges.map((pool) => ({
id: `${networkContext.chain}-${pool.id}-INCENTIVIZED`,
id: `${this.chain}-${pool.id}-INCENTIVIZED`,
poolId: pool.id,
category: 'INCENTIVIZED' as const,
chain: networkContext.chain,
chain: this.chain,
})),
skipDuplicates: true,
});

await prisma.prismaPoolCategory.deleteMany({
where: {
category: 'INCENTIVIZED',
chain: networkContext.chain,
chain: this.chain,
poolId: {
notIn: poolsWithGauges.map((pool) => pool.id),
},
Expand Down
71 changes: 16 additions & 55 deletions modules/user/lib/user-sync-gauge-balance.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import _ from 'lodash';
import { prismaBulkExecuteOperations } from '../../../prisma/prisma-util';
import RewardsOnlyGaugeAbi from './abi/RewardsOnlyGauge.json';
import { Multicaller } from '../../web3/multicaller';
import { ethers } from 'ethers';
import { formatFixed } from '@ethersproject/bignumber';
import { PrismaPoolStakingType } from '@prisma/client';
import { networkContext } from '../../network/network-context.service';
import ERC20Abi from '../../web3/abi/ERC20.json';
import { gaugeSubgraphService } from '../../subgraphs/gauge-subgraph/gauge-subgraph.service';
import { AddressZero } from '@ethersproject/constants';
import { getEvents } from '../../web3/events';

export class UserSyncGaugeBalanceService implements UserStakedBalanceService {
get chain() {
Expand Down Expand Up @@ -138,69 +138,30 @@ export class UserSyncGaugeBalanceService implements UserStakedBalanceService {
we need to figure out which users have a changed balance on any gauge contract and update their balance,
therefore we check all transfer events since the last synced block
*/
const erc20Interface = new ethers.utils.Interface(ERC20Abi);

// Split the range into smaller chunks to avoid RPC limits, setting up to 50 times max block range
const toBlock = Math.min(startBlock + 50 * this.rpcMaxBlockRange, latestBlock);
const range = toBlock - startBlock;
console.log(`user-sync-staked-balances-${this.chainId} block range from ${startBlock} to ${toBlock}`);
console.log(`user-sync-staked-balances-${this.chainId} getLogs for ${_.uniq(gaugeAddresses).length} gauges.`);
const events = await Promise.all(
// Getting logs in batches of max blocks allowed by RPC
Array.from({ length: Math.ceil(range / this.rpcMaxBlockRange) }, (_, i) => i).map(async (i) => {
const from = startBlock + i * this.rpcMaxBlockRange;
const to = Math.min(startBlock + (i + 1) * this.rpcMaxBlockRange, toBlock);

// Usually RPCs are handling any number of addresses, but it here batching just to be on the safe side
const logRequests: Promise<ethers.providers.Log[]>[] = _.chunk(gaugeAddresses, 500).map((addresses) => {
// Fetch logs with a raw json request until we support Viem or Ethers6
const payload = {
jsonrpc: '2.0',
id: 1,
method: 'eth_getLogs',
params: [
{
address: addresses,
topics: [ethers.utils.id('Transfer(address,address,uint256)')],
fromBlock: '0x' + BigInt(from).toString(16),
toBlock: '0x' + BigInt(to).toString(16),
},
],
};

return fetch(this.rpcUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(payload),
})
.then((response) => response.json() as Promise<{ result: ethers.providers.Log[] }>)
.then(({ result }) => result)
.catch((error) => {
console.error('Error fetching logs:', error);
return [];
});
});

const events = await Promise.all(logRequests).then((res) => res.flat());

return events;
}),
).then((res) => res.flat().filter((event) => event));
console.log(`user-sync-staked-balances-${this.chainId} getLogs for ${gaugeAddresses.length} gauges.`);

const events = await getEvents(
startBlock,
toBlock,
gaugeAddresses,
['Transfer'],
this.rpcUrl,
this.rpcMaxBlockRange,
ERC20Abi,
);

console.log(`user-sync-staked-balances-${this.chainId} getLogs for ${gaugeAddresses.length} gauges done`);

const balancesToFetch = _.uniqBy(
events
.map((event) => {
const parsed = erc20Interface.parseLog(event);

return [
{ erc20Address: event.address, userAddress: parsed.args?.from as string },
{ erc20Address: event.address, userAddress: parsed.args?.to as string },
];
})
.map((event) => [
{ erc20Address: event.address, userAddress: event.args?.from as string },
{ erc20Address: event.address, userAddress: event.args?.to as string },
])
.flat(),
(entry) => entry.erc20Address + entry.userAddress,
);
Expand Down
77 changes: 15 additions & 62 deletions modules/user/lib/user-sync-wallet-balance.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { isSameAddress } from '@balancer-labs/sdk';
import { formatFixed } from '@ethersproject/bignumber';
import { AddressZero } from '@ethersproject/constants';
import { ethers } from 'ethers';
import _ from 'lodash';
import { prisma } from '../../../prisma/prisma-client';
import { prismaBulkExecuteOperations } from '../../../prisma/prisma-util';
Expand All @@ -12,6 +11,7 @@ import { Multicaller, MulticallUserBalance } from '../../web3/multicaller';
import ERC20Abi from '../../web3/abi/ERC20.json';
import { networkContext } from '../../network/network-context.service';
import { AllNetworkConfigs } from '../../network/network-config';
import { getEvents } from '../../web3/events';

export class UserSyncWalletBalanceService {
beetsBarService?: BeetsBarSubgraphService;
Expand Down Expand Up @@ -130,7 +130,6 @@ export class UserSyncWalletBalanceService {
}

public async syncChangedBalancesForAllPools() {
const erc20Interface = new ethers.utils.Interface(ERC20Abi);
const latestBlock = await this.provider.getBlockNumber();
const syncStatus = await prisma.prismaUserBalanceSyncStatus.findUnique({
where: { type_chain: { type: 'WALLET', chain: this.chain } },
Expand Down Expand Up @@ -159,60 +158,18 @@ export class UserSyncWalletBalanceService {

// Split the range into smaller chunks to avoid RPC limits, setting up to 50 times max block range
const toBlock = Math.min(fromBlock + 50 * this.rpcMaxBlockRange, latestBlock);
const range = toBlock - fromBlock;
console.log(`UserWalletBalanceService: syncing balances from ${fromBlock} to ${toBlock}`);
console.log(`user-sync-wallet-balances-for-all-pools-${this.chainId} getLogs of ${poolAddresses.length} pools`);
const events = await Promise.all(
// Getting logs in batches of max blocks allowed by RPC
Array.from({ length: Math.ceil(range / this.rpcMaxBlockRange) }, (_, i) => i).map(async (i) => {
const from = fromBlock + i * this.rpcMaxBlockRange;
const to = Math.min(fromBlock + (i + 1) * this.rpcMaxBlockRange, toBlock);

// Usually RPCs are handling any number of addresses, but it here batching just to be on the safe side
const logRequests: Promise<ethers.providers.Log[]>[] = _.chunk(poolAddresses, 500).map((addresses) => {
// Fetch logs with a raw json request until we support Viem or Ethers6
const payload = {
jsonrpc: '2.0',
id: 1,
method: 'eth_getLogs',
params: [
{
address: addresses,
topics: [ethers.utils.id('Transfer(address,address,uint256)')],
fromBlock: '0x' + BigInt(from).toString(16),
toBlock: '0x' + BigInt(to).toString(16),
},
],
};

return fetch(AllNetworkConfigs[this.chainId].data.rpcUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(payload),
})
.then((response) => response.json() as Promise<{ result: ethers.providers.Log[] }>)
.then(({ result }) => result)
.catch((error) => {
console.error('Error fetching logs:', error);
return [];
});

// Fetching logs with Viem
// viemClient.getLogs({
// address: addresses,
// event: parseAbiItem('event Transfer(address indexed, address indexed, uint256)'),
// fromBlock: BigInt(from),
// toBlock: BigInt(to),
// })
});

const events = await Promise.all(logRequests).then((res) => res.flat());

return events;
}),
).then((res) => res.flat().filter((event) => event));

const events = await getEvents(
fromBlock,
toBlock,
poolAddresses,
['Transfer'],
AllNetworkConfigs[this.chainId].data.rpcUrl,
this.rpcMaxBlockRange,
ERC20Abi,
);

console.log(
`user-sync-wallet-balances-for-all-pools-${this.chainId} getLogs of ${poolAddresses.length} pools done`,
Expand All @@ -230,14 +187,10 @@ export class UserSyncWalletBalanceService {
//we also need to track fbeets balance
relevantERC20Addresses.includes(event.address.toLowerCase()),
)
.map((event) => {
const parsed = erc20Interface.parseLog(event);

return [
{ erc20Address: event.address, userAddress: parsed.args?.from as string },
{ erc20Address: event.address, userAddress: parsed.args?.to as string },
];
})
.map((event) => [
{ erc20Address: event.address, userAddress: event.args?.from as string },
{ erc20Address: event.address, userAddress: event.args?.to as string },
])
.flat(),
(entry) => entry.erc20Address + entry.userAddress,
);
Expand Down
Loading