v3 data cd #175
Conversation
gmbronco
commented
Feb 27, 2024
- some syntax cleanup and legacy code isolation (tokenService) to make jobs run in isolation
- will refactor the token service to be a prisma client instead
- will work on simplifying the tables schema for easier data syncing
- one action for syncing all the pool data
A few questions:
Pool.dynamicData? Idea was that we would never need to update pool table as it's static and only update dynamic data.
yes, makes sense.
True, it's a legacy product. Let's get rid of it
Ok, let's setup a nice pattern for how to use the viem multicaller.
Oh nice!
On the other hand, this makes sure a pool is either updated entirely or not at all. Wonder what is worse, having a pool with inconsistent data or with outdated data.
It's just nice to work with, but agree, it's cumbersome...
I think we don't need most of that and it will probably also still change. Would store as we need it.
🤣
with one table it's going to be easier to make db upserts – something for another PR – an added benefit would be native support in prisma > 4.6.0
i'll leave a todo note
Turns out AbiType works only with tuples as a return type, because they are decoded as objects. Arrays are just returned as arrays, so we need to parse them manually, and abitype doesn't provide a return type for that. But we don't have to worry much about it once we abstract it in a fetching function.
I think it's better to keep consistency and fetch everything in one multicall – even do decorations, eg. total liquidity. Revise only when it gets slow.
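As a minimal sketch of what that fetching abstraction could look like (the names here are hypothetical, not from this PR): a call whose ABI return type is an array comes back as a positional array, so a small helper casts the positions to named fields once, and callers only see a typed object.

```typescript
// Hypothetical helper: abitype infers named object types only for tuple
// returns, so for array returns we cast the positional result ourselves.
type PoolTokenInfo = { tokens: string[]; balancesRaw: bigint[] };

function parsePoolTokenInfo(raw: readonly unknown[]): PoolTokenInfo {
    // the positions are fixed by the ABI of the call being decoded
    const [tokens, balancesRaw] = raw as [string[], bigint[]];
    return { tokens, balancesRaw };
}
```

Once the cast lives inside one fetching function, the rest of the code never touches the raw array.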
@franzns – here is the updated V3 syncing with the following updates: Jobs:
as many times as needed (probably ~10 - 15x) Read:
New work:
@@ -6,8 +6,18 @@ const jobsController = JobsController();
async function run(job: string = process.argv[2], chain: string = process.argv[3]) {
    console.log('Running job', job, chain);

    if (job === 'sync-changed-pools-v3') {
        return jobsController.addMissingPoolsFromSubgraph(chain);
    if (job === 'add-pools-v3') {
Is this class just used so one can run the jobs locally using yarn task? If so, maybe add a comment.
};
};

export function QueriesController(tracer?: any) {
This won't scale, we'll need to split that into multiple queries controllers. Though I am not even sure whether these should be controllers or whether we just leave them in the pool service. Maybe we create another category "loaders" where we put the query logic divided by domain (pools, users, etc.). Same for mutations: have another category "mutators" divided by domain.
yes, agree, i consider this a placeholder, let's give it a revamp once we collect more actions. i like the loaders / mutations split by domain, same goes for the jobs controller.
const tokenPrices = await prisma.prismaTokenPrice.findMany({
    where: {
        timestamp: {
            gte: parseInt(timestamp),
shouldn't this be equal to? Otherwise you get a lot of prices and it also won't work properly for "refilling" old swaps.
yes, good catch.
modules/actions/pool/sync-swaps.ts
Outdated
});

// Get events since the latest event, or limit to the number of days we want to keep them in the DB
const since = Math.floor(+new Date(Date.now() - daysToSync * 24 * 60 * 60 * 1000) / 1000);
please use a var for this, no idea how many days this is
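One way to address this, sketched with hypothetical names (the actual retention window is not stated in this diff), is to pull the magic numbers into named constants and a small helper:

```typescript
const SECONDS_PER_DAY = 24 * 60 * 60;

// Cut-off timestamp (in seconds) for events we keep in the DB:
// "now" minus the retention window of daysToSync days.
function syncSinceTimestamp(daysToSync: number, nowMs: number = Date.now()): number {
    return Math.floor((nowMs - daysToSync * SECONDS_PER_DAY * 1000) / 1000);
}
```

The call site then reads as `syncSinceTimestamp(daysToSync)` and the unit of the window is visible at a glance.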
modules/actions/pool/sync-swaps.ts
Outdated
// Store only the events that are not already in the DB
const existingEvents = await prisma.poolEvent.findMany({
    where: {
        id: { in: swaps.map((event) => event.id) },
why do we need this? we already only get swaps with a greater blockNumber than what we have stored.
maybe we don't even need to check against existing events? it does create with skipDuplicates anyway
does prisma detect duplicates by ID? if so, probably not needed.
although it's useful, because we don't need to be getting USD prices for synced events.
I'd remove it completely, there might be some duplicate swaps but it would save another potentially big query (as id: { in: XXX } is a huge array)
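For reference, the in-memory check being debated here boils down to a filter like this (a hypothetical helper, not the PR's code); whether it is worth the extra findMany with a huge id: { in: ... } array is exactly the trade-off above:

```typescript
// Keep only events whose ids are not already stored; a Set makes the
// membership test O(1) instead of scanning the existing list per event.
function filterNewEvents<T extends { id: string }>(fetched: T[], existingIds: Iterable<string>): T[] {
    const known = new Set(existingIds);
    return fetched.filter((event) => !known.has(event.id));
}
```

Dropping the check and relying on createMany with skipDuplicates pushes the dedupe into the DB, at the cost of re-deriving USD prices for already-synced events, as noted above.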
modules/actions/pool/sync-swaps.ts
Outdated
const since = Math.floor(+new Date(Date.now() - daysToSync * 24 * 60 * 60 * 1000) / 1000);
const where =
    latestEvent?.blockTimestamp && latestEvent?.blockTimestamp > since
        ? { blockNumber_gte: String(latestEvent.blockNumber) }
I think this should be gt, not gte
initially i did it on purpose to overlap events in case we missed something, but it probably doesn't make sense, because most likely all events from the same block are available atomically.. (?)
Maybe adding id_not_in would make things more predictable, i mean we would be getting only missing swaps for sure?
Yes, events from the same block are available atomically.
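Under that assumption, the where-clause construction above can be sketched as a pure helper using a strict gt (the field names follow the diff; the helper itself is hypothetical):

```typescript
type EventPointer = { blockNumber: number; blockTimestamp: number };
type SwapsWhere = { blockNumber_gt: string } | { blockTimestamp_gte: number };

// Resume from the last synced block (exclusive, since a block's events
// arrive atomically), or fall back to the retention-window timestamp.
function buildSwapsWhere(latestEvent: EventPointer | undefined, since: number): SwapsWhere {
    if (latestEvent && latestEvent.blockTimestamp > since) {
        return { blockNumber_gt: String(latestEvent.blockNumber) };
    }
    return { blockTimestamp_gte: since };
}
```

With gt instead of gte, the already-synced block is never refetched, so the id_not_in overlap guard becomes unnecessary.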
// Prepare DB entries
const dbEntries = await Promise.all(
    events.map(async (event) => {
can you refactor this to how swaps are synced? No reason they are written so differently. Swaps sync seems super clean compared to this...
const events = joinExits.filter((event) => !existingEvents.some((existing) => existing.id === event.id));

// Prepare DB entries
const dbEntries = await Promise.all(
same as for sync join exits v3: can it be refactored to how swaps are done?
 *
 * @param chainId
 */
async reloadPools(chainId: string) {
would move this to the pool controller, as it is not triggered by a job
i think we start to see a pattern emerging with organising things around models. looks like jobs becomes pretty much pools at the moment. how about forgetting about a jobs-specific controller and using "model/domain" naming instead?
I see what you mean. We can evolve it later
 *
 * @param chainId
 */
async updatePools(chainId: string) {
isn't this the same as "reloadPools" above?
true, removing
const pools = await prisma.prismaPool.findMany();
const ids = pools.map((pool) => pool.id);
const client = getV3JoinedSubgraphClient(balancerV3, balancerPoolsV3);
const newPools = await client.getAllInitializedPools({ id_not_in: ids });
this is clever! But we need to be wary of the max return limit. The subgraph returns at most 1000 entities.
maybe safer to get all pools with paging and then filter here on the node?
getV3JoinedSubgraphClient should handle paging – the pools subgraph client needs to have the same paginated query as the vault has, i'll add it.
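A paginated fetch that stays under the 1000-entity limit typically cursors on id_gt; a sketch of what such a client could do internally (the helper and its signature are assumptions, not this PR's code):

```typescript
const PAGE_SIZE = 1000; // the subgraph's max entities per query

// Repeatedly fetch pages ordered by id, using the last id as a cursor,
// until a short page signals the end of the collection.
async function getAllPaginated<T extends { id: string }>(
    fetchPage: (args: { first: number; where: { id_gt: string } }) => Promise<T[]>,
): Promise<T[]> {
    const all: T[] = [];
    let lastId = '';
    for (;;) {
        const page = await fetchPage({ first: PAGE_SIZE, where: { id_gt: lastId } });
        all.push(...page);
        if (page.length < PAGE_SIZE) return all;
        lastId = page[page.length - 1].id;
    }
}
```

The query passed as fetchPage must be ordered by id ascending for the cursor to be correct; with paging handled here, the node-side id_not_in filter above becomes safe.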
modules/actions/pool/upsert-pools.ts
Outdated
}

// Get the token prices needed for calculating token balances and total liquidity
const dbPrices = await prisma.prismaTokenPrice.findMany({
this should be prismaTokenCurrentPrice
modules/actions/pool/upsert-pools.ts
Outdated
await prisma.prismaPoolToken.deleteMany({ where: { poolId: pool.id } });
await prisma.prismaPoolTokenDynamicData.deleteMany({ where: { poolTokenId: { startsWith: pool.id } } });
await prisma.prismaPoolExpandedTokens.deleteMany({ where: { poolId: pool.id } });
is that needed? Wonder if we could have unexpected side effects if upsertPools is called regularly. Then pools end up having no tokens quite often.
Ok, I see the problem. There is no upsertMany that works properly. hmm, need to add a todo here
we can wrap it in a transaction, so there won't be intermittent "empty" state
    isPoolPaused: configResult[i].result!.isPoolPaused,
    isPoolInRecoveryMode: configResult[i].result!.isPoolInRecoveryMode,
} as PoolData;
const parsedResults = pools.map((pool, i) => {
this whole parsing is so ugly. We need a way to abstract that properly.
i agree that something similar to the multicaller would be great. I added this here: web3/multicaller-viem.ts – but making it work with the types is another challenge and i don't think we need to fight for this one. There are just a few places where we are fetching onchain data, and they are always opinionated, i mean we always know what we are fetching, so it's easy to match inputs. if anything i'd consider that for a separate task.
modules/actions/pool/sync-pools.ts
Outdated
        swapFee: String(onchainPoolData.swapFee ?? '0'),
    },
},
poolTokenDynamicData: onchainPoolData.tokens.map((tokenData) => ({
shouldn't this use the transformer?
modules/actions/pool/sync-pools.ts
Outdated
/** TODO: enrich updates with USD values
const tokenAddresses = Array.from(
    new Set(Object.values(onchainData).flatMap((pool) => pool.tokens.map((token) => token.address))),
);

// Get the token prices needed for calculating token balances and total liquidity
const dbPrices = await prisma.prismaTokenCurrentPrice.findMany({
    where: {
        tokenAddress: { in: tokenAddresses },
        chain: chain,
    },
    include: {
        token: true,
    },
});

// Build helper maps for token prices and decimals
const decimals = Object.fromEntries(dbPrices.map(({ token }) => [token.address, token.decimals]));
const prices = Object.fromEntries(dbPrices.map((price) => [price.tokenAddress, price.price]));
*/
Would be great to actually add this to this PR :)
@@ -801,28 +791,78 @@ enum GqlPoolStakingGaugeStatus {
input GqlPoolJoinExitFilter {
    poolIdIn: [String!]
    chainIn: [GqlChain!]
    userAddress: String
Gotta triple check if any of these changes break the current UI. I think they don't, but need to check it!
i was paying attention to just add new attributes, so join/exits have all the same attrs as swaps.