Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

faster logs fetching for staking #588

Merged
merged 2 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 93 additions & 51 deletions modules/user/lib/user-sync-gauge-balance.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,30 @@ import { gaugeSubgraphService } from '../../subgraphs/gauge-subgraph/gauge-subgr
import { AddressZero } from '@ethersproject/constants';

export class UserSyncGaugeBalanceService implements UserStakedBalanceService {
get chain() {
return networkContext.chain;
}

get chainId() {
return networkContext.chainId;
}

get provider() {
return networkContext.provider;
}

get rpcUrl() {
return networkContext.data.rpcUrl;
}

get rpcMaxBlockRange() {
return networkContext.data.rpcMaxBlockRange;
}

get multicallAddress() {
return networkContext.data.multicall;
}

public async initStakedBalances(stakingTypes: PrismaPoolStakingType[]): Promise<void> {
if (!stakingTypes.includes('GAUGE')) {
return;
Expand All @@ -25,7 +49,7 @@ export class UserSyncGaugeBalanceService implements UserStakedBalanceService {
console.log('initStakedBalances: loading pools...');
const pools = await prisma.prismaPool.findMany({
select: { id: true },
where: { chain: networkContext.chain },
where: { chain: this.chain },
});

const filteredGaugeShares = gaugeShares.filter((share) => {
Expand All @@ -45,14 +69,14 @@ export class UserSyncGaugeBalanceService implements UserStakedBalanceService {
data: userAddresses.map((userAddress) => ({ address: userAddress })),
skipDuplicates: true,
}),
prisma.prismaUserStakedBalance.deleteMany({ where: { chain: networkContext.chain } }),
prisma.prismaUserStakedBalance.deleteMany({ where: { chain: this.chain } }),
prisma.prismaUserStakedBalance.createMany({
data: filteredGaugeShares.map((share) => {
const pool = pools.find((pool) => pool.id === share.gauge.poolId);

return {
id: `${share.gauge.id}-${share.user.id}`,
chain: networkContext.chain,
chain: this.chain,
balance: share.balance,
balanceNum: parseFloat(share.balance),
userAddress: share.user.id,
Expand All @@ -63,8 +87,8 @@ export class UserSyncGaugeBalanceService implements UserStakedBalanceService {
}),
}),
prisma.prismaUserBalanceSyncStatus.upsert({
where: { type_chain: { type: 'STAKED', chain: networkContext.chain } },
create: { type: 'STAKED', chain: networkContext.chain, blockNumber: block.number },
where: { type_chain: { type: 'STAKED', chain: this.chain } },
create: { type: 'STAKED', chain: this.chain, blockNumber: block.number },
update: { blockNumber: block.number },
}),
],
Expand All @@ -77,7 +101,7 @@ export class UserSyncGaugeBalanceService implements UserStakedBalanceService {
public async syncChangedStakedBalances(): Promise<void> {
// we always store the latest synced block
const status = await prisma.prismaUserBalanceSyncStatus.findUnique({
where: { type_chain: { type: 'STAKED', chain: networkContext.chain } },
where: { type_chain: { type: 'STAKED', chain: this.chain } },
});

if (!status) {
Expand All @@ -86,22 +110,18 @@ export class UserSyncGaugeBalanceService implements UserStakedBalanceService {

const pools = await prisma.prismaPool.findMany({
include: { staking: true },
where: { chain: networkContext.chain },
where: { chain: this.chain },
});
console.log(`user-sync-staked-balances-${this.chainId} got data from db.`);

console.log(`user-sync-staked-balances-${networkContext.chainId} got data from db.`);
const latestBlock = await this.provider.getBlockNumber();
console.log(`user-sync-staked-balances-${this.chainId} got latest block.`);

const latestBlock = await networkContext.provider.getBlockNumber();
console.log(`user-sync-staked-balances-${networkContext.chainId} got latest block.`);
const gaugeAddresses = await gaugeSubgraphService.getAllGaugeAddresses();
console.log(`user-sync-staked-balances-${networkContext.chainId} got ${gaugeAddresses.length} gauges.`);

// we sync at most 10k blocks at a time
const startBlock = status.blockNumber + 1;
const endBlock =
latestBlock - startBlock > networkContext.data.rpcMaxBlockRange
? startBlock + networkContext.data.rpcMaxBlockRange
: latestBlock;
const endBlock = latestBlock;

// no new blocks have been minted, needed for slow networks
if (startBlock > endBlock) {
Expand All @@ -112,32 +132,58 @@ export class UserSyncGaugeBalanceService implements UserStakedBalanceService {
we need to figure out which users have a changed balance on any gauge contract and update their balance,
therefore we check all transfer events since the last synced block
*/

const events: ethers.providers.Log[] = [];
const logPromises: Promise<ethers.providers.Log[]>[] = [];
const erc20Interface = new ethers.utils.Interface(ERC20Abi);

console.log(`user-sync-staked-balances-${networkContext.chainId} getLogs of ${gaugeAddresses.length} gauges`);
// Split the range into smaller chunks to avoid RPC limits, setting up to 50 times max block range
const toBlock = Math.min(startBlock + 50 * this.rpcMaxBlockRange, latestBlock);
const range = toBlock - startBlock;
console.log(`user-sync-staked-balances-${this.chainId} block range from ${startBlock} to ${toBlock}`);
console.log(`user-sync-staked-balances-${this.chainId} getLogs for ${_.uniq(gaugeAddresses).length} gauges.`);
const events = await Promise.all(
// Getting logs in batches of max blocks allowed by RPC
Array.from({ length: Math.ceil(range / this.rpcMaxBlockRange) }, (_, i) => i).map(async (i) => {
const from = startBlock + i * this.rpcMaxBlockRange;
const to = Math.min(startBlock + (i + 1) * this.rpcMaxBlockRange, toBlock);

for (const gaugeAddress of gaugeAddresses) {
logPromises.push(
networkContext.provider.getLogs({
//ERC20 Transfer topic
topics: ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'],
fromBlock: startBlock,
toBlock: endBlock,
address: gaugeAddress,
}),
);
}
const allResponses = await Promise.all(logPromises);
console.log(
`user-sync-staked-balances-${networkContext.chainId} getLogs of ${gaugeAddresses.length} gauges done`,
);
// Usually RPCs are handling any number of addresses, but it here batching just to be on the safe side
const logRequests: Promise<ethers.providers.Log[]>[] = _.chunk(gaugeAddresses, 500).map((addresses) => {
// Fetch logs with a raw json request until we support Viem or Ethers6
const payload = {
jsonrpc: '2.0',
id: 1,
method: 'eth_getLogs',
params: [
{
address: addresses,
topics: [ethers.utils.id('Transfer(address,address,uint256)')],
fromBlock: '0x' + BigInt(from).toString(16),
toBlock: '0x' + BigInt(to).toString(16),
},
],
};

for (const response of allResponses) {
events.push(...response);
}
return fetch(this.rpcUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(payload),
})
.then((response) => response.json() as Promise<{ result: ethers.providers.Log[] }>)
.then(({ result }) => result)
.catch((error) => {
console.error('Error fetching logs:', error);
return [];
});
});

const events = await Promise.all(logRequests).then((res) => res.flat());

return events;
}),
).then((res) => res.flat().filter((event) => event));

console.log(`user-sync-staked-balances-${this.chainId} getLogs for ${gaugeAddresses.length} gauges done`);

const balancesToFetch = _.uniqBy(
events
Expand All @@ -153,28 +199,24 @@ export class UserSyncGaugeBalanceService implements UserStakedBalanceService {
(entry) => entry.erc20Address + entry.userAddress,
);

console.log(
`user-sync-staked-balances-${networkContext.chainId} got ${balancesToFetch.length} balances to fetch.`,
);
console.log(`user-sync-staked-balances-${this.chainId} got ${balancesToFetch.length} balances to fetch.`);

if (balancesToFetch.length === 0) {
await prisma.prismaUserBalanceSyncStatus.update({
where: { type_chain: { type: 'STAKED', chain: networkContext.chain } },
where: { type_chain: { type: 'STAKED', chain: this.chain } },
data: { blockNumber: endBlock },
});

return;
}

const balances = await Multicaller.fetchBalances({
multicallAddress: networkContext.data.multicall,
provider: networkContext.provider,
multicallAddress: this.multicallAddress,
provider: this.provider,
balancesToFetch,
});

console.log(
`user-sync-staked-balances-${networkContext.chainId} got ${balancesToFetch.length} balances to fetch done.`,
);
console.log(`user-sync-staked-balances-${this.chainId} got ${balancesToFetch.length} balances to fetch done.`);

await prismaBulkExecuteOperations(
[
Expand All @@ -193,7 +235,7 @@ export class UserSyncGaugeBalanceService implements UserStakedBalanceService {
where: {
id_chain: {
id: `${userBalance.erc20Address}-${userBalance.userAddress}`,
chain: networkContext.chain,
chain: this.chain,
},
},
update: {
Expand All @@ -202,7 +244,7 @@ export class UserSyncGaugeBalanceService implements UserStakedBalanceService {
},
create: {
id: `${userBalance.erc20Address}-${userBalance.userAddress}`,
chain: networkContext.chain,
chain: this.chain,
balance: formatFixed(userBalance.balance, 18),
balanceNum: parseFloat(formatFixed(userBalance.balance, 18)),
userAddress: userBalance.userAddress,
Expand All @@ -216,7 +258,7 @@ export class UserSyncGaugeBalanceService implements UserStakedBalanceService {
where: {
type_chain: {
type: 'STAKED',
chain: networkContext.chain,
chain: this.chain,
},
},
data: { blockNumber: endBlock },
Expand All @@ -232,14 +274,14 @@ export class UserSyncGaugeBalanceService implements UserStakedBalanceService {
const amount = formatFixed(balance, 18);

await prisma.prismaUserStakedBalance.upsert({
where: { id_chain: { id: `${staking.address}-${userAddress}`, chain: networkContext.chain } },
where: { id_chain: { id: `${staking.address}-${userAddress}`, chain: this.chain } },
update: {
balance: amount,
balanceNum: parseFloat(amount),
},
create: {
id: `${staking.address}-${userAddress}`,
chain: networkContext.chain,
chain: this.chain,
balance: amount,
balanceNum: parseFloat(amount),
userAddress: userAddress,
Expand Down
2 changes: 1 addition & 1 deletion modules/user/lib/user-sync-wallet-balance.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ export class UserSyncWalletBalanceService {

return events;
}),
).then((res) => res.flat());
).then((res) => res.flat().filter((event) => event));

console.log(
`user-sync-wallet-balances-for-all-pools-${this.chainId} getLogs of ${poolAddresses.length} pools done`,
Expand Down