
v3 data cd #175

Merged
merged 24 commits into from
Mar 11, 2024

Conversation

gmbronco
Collaborator

  • some syntax cleanup and isolation of legacy code (tokenService) so jobs can run separately
  • will refactor the token service to be a prisma client instead
  • will work on simplifying the table schema for easier data syncing
  • one action for syncing all the pool data

@gmbronco
Collaborator Author

gmbronco commented Feb 27, 2024

A few questions:

  • I’d like to merge and drop the “dynamic data” tables, because they’re 1:1. Is there a reason they’re separated that I’m not aware of?
  • Batch multicalls together logically, eg: if we are syncing pool tokens, let’s get all the info in one call logically matching what we need for the DB schema.
  • When would we need a block number in onchain calls? Should we always be getting the latest?
  • Easier to get types from multicall directly when we know what we want to get, rather than wrapping in a multicaller class.
  • Typecasting read function return types using AbiType
  • The original onchain syncing has a huge fn call. I find it problematic because it’s trying to coordinate a lot of data sources into one update, which makes it error prone. We can split it into smaller chunks to make it easier to handle.
  • Most of the time we don’t need specific return types, unless we need strong guarantees something isn’t broken between two different domains – for now, trying to set them up is slowing down iterations
  • Don’t we want to store all config params from the vault getPoolConfig?
  • I cannot get over not using arrow notation, it’s hard to go back to the 2000s :)

@franzns
Collaborator

franzns commented Feb 28, 2024

A few questions:

  • I’d like to merge and drop the “dynamic data” tables, because they’re 1:1. Is there a reason they’re separated that I’m not aware of?

Pool.dynamicData? The idea was that we would never need to update the pool table, as it's static, and only update the dynamic data.

  • Batch multicalls together logically, eg: if we are syncing pool tokens, let’s get all the info in one call logically matching what we need for the DB schema.

yes, makes sense.

  • When would we need a block number in onchain calls? Should we always be getting the latest?

True, it's a legacy product. Let's get rid of it

  • Easier to get types from multicall directly when we know what we want to get, rather than wrapping in a multicaller class.

Ok, let's set up a nice pattern for how to use the viem multicaller.

  • Typecasting read function return types using AbiType

Oh nice!

  • The original onchain syncing has a huge fn call. I find it problematic because it’s trying to coordinate a lot of data sources into one update, which makes it error prone. We can split it into smaller chunks to make it easier to handle.

On the other hand, this makes sure a pool is either updated entirely or not at all. Wonder what is worse, having a pool with inconsistent data or with outdated data.

  • Most of the time we don’t need specific return types, unless we need strong guarantees something isn’t broken between two different domains – for now, trying to set them up is slowing down iterations

It's just nice to work with, but agree, it's cumbersome...

  • Don’t we want to store all config params from the vault getPoolConfig?

I think we don't need most of that, and it will probably still change. I would store it as we need it.

  • I cannot get over not using arrow notation, it's hard to go back to the 2000s :)

🤣

@gmbronco
Collaborator Author

Pool.dynamicData? The idea was that we would never need to update the pool table, as it's static, and only update the dynamic data.

with one table it's going to be easier to do DB upserts – something for another PR – an added benefit would be native upsert support in prisma > 4.6.0
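For illustration, a minimal sketch of what a single-table upsert could look like – the helper, column names, and compound key below are hypothetical, not the actual schema:

```typescript
// Hypothetical sketch: with static and dynamic columns merged into one
// table, a single upsert per pool replaces the insert + dynamicData
// update pair. Field and key names are illustrative only.
type PoolRow = { id: string; chain: string; name: string; totalShares: string };

function buildPoolUpsertArgs(row: PoolRow) {
    const { id, chain, ...mutable } = row;
    return {
        where: { id_chain: { id, chain } }, // assumed compound unique key
        create: row, // full row the first time we see the pool
        update: mutable, // only mutable columns on conflict
    };
}

// Usage (sketch): await prisma.prismaPool.upsert(buildPoolUpsertArgs(row));
```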

block number
True, it's a legacy product. Let's get rid of it

i'll leave a todo note

Typecasting read functions in multicall

Turns out AbiType works only with tuples as a return type, because they are decoded as objects. Arrays are just returned as arrays, so we need to parse them manually, and AbiType doesn't provide a return type for that. But we don't have to worry much about it once we abstract it in a fetching function.
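As a rough illustration of the manual parsing mentioned above – the return shape and field names are made up, not the real vault ABI (and viem would give bigints; strings keep the sketch simple):

```typescript
// Sketch: viem/AbiType decodes struct (tuple) returns into named objects,
// but a function returning plain arrays just yields positional arrays,
// so we name the fields ourselves in a small parsing helper.
type RawTokenInfo = readonly [readonly string[], readonly string[]]; // [tokens, raw balances]

function parseTokenInfo(raw: RawTokenInfo) {
    const [tokens, balances] = raw;
    // zip the positional arrays into objects matching the DB schema
    return tokens.map((address, i) => ({ address, balance: balances[i] }));
}

const parsed = parseTokenInfo([['0xaaa', '0xbbb'], ['1', '2']]);
// parsed[1] → { address: '0xbbb', balance: '2' }
```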

  • The original onchain syncing has a huge fn call. I find it problematic because it’s trying to coordinate a lot of data sources into one update, which makes it error prone. We can split it into smaller chunks to make it easier to handle.

On the other hand, this makes sure a pool is either updated entirely or not at all. Wonder what is worse, having a pool with inconsistent data or with outdated data.

I think it's better to keep consistency and fetch everything in one multicall – even do decorations, eg: total liquidity. We can revise this only when it gets slow.

@gmbronco
Collaborator Author

gmbronco commented Mar 4, 2024

@franzns – here is the updated V3 syncing with the following updates:

Jobs:

  • sync pools — takes all the pools from the subgraph and upserts current data
  • sync join exits — subgraph to DB – all events on v3 and just 100 days for v2
    syncing is based on subgraph pagination and fetches 1000 records per call; to fully sync missing events run:
yarn task sync-join-exits-v2 1

as many times as needed (probably ~10 - 15x)

Read:

  • poolGetJoinExits – lists events, filterable by chain, pool, user

@gmbronco gmbronco marked this pull request as ready for review March 4, 2024 21:36
@gmbronco
Collaborator Author

gmbronco commented Mar 6, 2024

New work:

  •  Jobs for syncing:
    • V3 swaps
    • V3 Join/exits
    • V2 Join/exits
  • Adding pools – renamed this to addPools, which finds new pools in the subgraph and adds them to the DB
  • Syncing pools – syncPools checks logs, fetches onchain data for affected pools, and upserts it to the DB
  • When we need to do a full sync with subgraph and onchain data, we can run addPools with specific IDs and it will handle all the data with upserts

@@ -6,8 +6,18 @@ const jobsController = JobsController();
async function run(job: string = process.argv[2], chain: string = process.argv[3]) {
console.log('Running job', job, chain);

if (job === 'sync-changed-pools-v3') {
return jobsController.addMissingPoolsFromSubgraph(chain);
if (job === 'add-pools-v3') {
Collaborator

Is this class just used so one can run the jobs locally using yarn task? Maybe add a comment if so.

};
};

export function QueriesController(tracer?: any) {
Collaborator

@franzns franzns Mar 7, 2024

This won't scale; we'll need to split that into multiple queries controllers. Though I am not even sure whether these should be controllers or whether we just leave them in the pool service. Maybe we create another category "loaders" where we put the query logic divided by domain (pools, users, etc.). Same for mutations: have another category "mutators" divided by domain.

Collaborator Author

yes, agree – i consider this a placeholder; let's give it a revamp once we collect more actions. i like the loaders / mutators split by domain, same goes for the jobs controller.

const tokenPrices = await prisma.prismaTokenPrice.findMany({
where: {
timestamp: {
gte: parseInt(timestamp),
Collaborator

shouldn't this be equal to? Otherwise you get a lot of prices, and it also won't work properly for "refilling" old swaps.

Collaborator Author

yes, good catch.

});

// Get events since the latest event, or limit to the number of days we want to keep them in the DB
const since = Math.floor(+new Date(Date.now() - daysToSync * 24 * 60 * 60 * 1000) / 1000);
Collaborator

please use a variable for this; it's not obvious how many days this is
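For example, the suggested refactor could look something like this (the constant and helper names are suggestions, not existing code):

```typescript
// Sketch: name the magic numbers so the retention window reads clearly.
const SECONDS_PER_DAY = 24 * 60 * 60;

// Unix timestamp (seconds) for "daysToSync days ago"; equivalent to the
// inline Math.floor(+new Date(Date.now() - daysToSync * ...) / 1000).
function sinceTimestamp(daysToSync: number, nowMs: number = Date.now()): number {
    return Math.floor(nowMs / 1000) - daysToSync * SECONDS_PER_DAY;
}
```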

// Store only the events that are not already in the DB
const existingEvents = await prisma.poolEvent.findMany({
where: {
id: { in: swaps.map((event) => event.id) },
Collaborator

why do we need this? we already only get swaps with a greater blockNumber than what we have stored.

Collaborator

@franzns franzns Mar 7, 2024

maybe we don't even need to check against existing events? The create already runs with skipDuplicates.

Collaborator Author

does prisma detect duplicates by ID? If so, this is probably not needed.

Collaborator Author

although it's useful, because then we don't need to fetch USD prices for already-synced events.

Collaborator

I'd remove it completely; there might be some duplicate swaps, but it would save another potentially big query (as id: { in: XXX } is a huge array)

const since = Math.floor(+new Date(Date.now() - daysToSync * 24 * 60 * 60 * 1000) / 1000);
const where =
latestEvent?.blockTimestamp && latestEvent?.blockTimestamp > since
? { blockNumber_gte: String(latestEvent.blockNumber) }
Collaborator

I think this should be gt not gte

Collaborator Author

initially i did it on purpose to overlap events in case we missed something, but it probably doesn't make sense, because most likely all events from the same block are available atomically (?)

Maybe adding id_not_in would make things more predictable – i mean, we would be getting only the missing swaps for sure?

Collaborator

Yes, events from the same block are available atomically.
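Taken together, the resume logic with the suggested gt fix might look like this – the shapes and filter names are a sketch, not the actual code:

```typescript
// Sketch: resume from the last stored event's block only when it falls
// inside the retention window; otherwise fall back to the timestamp cutoff.
// blockNumber_gt (not gte) is safe because a block's events land atomically.
type LatestEvent = { blockNumber: number; blockTimestamp: number } | null;

function nextSyncWhere(latest: LatestEvent, since: number): Record<string, string> {
    return latest && latest.blockTimestamp > since
        ? { blockNumber_gt: String(latest.blockNumber) }
        : { blockTimestamp_gte: String(since) };
}
```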


// Prepare DB entries
const dbEntries = await Promise.all(
events.map(async (event) => {
Collaborator

@franzns franzns Mar 8, 2024

can you refactor this to how swaps are synced? No reason they are written so differently. The swaps sync seems super clean compared to this...

const events = joinExits.filter((event) => !existingEvents.some((existing) => existing.id === event.id));

// Prepare DB entries
const dbEntries = await Promise.all(
Collaborator

same as for sync join exits v3 – can it be refactored to how swaps are done?

*
* @param chainId
*/
async reloadPools(chainId: string) {
Collaborator

would move this to the pool controller, as it is not triggered by a job

Collaborator Author

i think we're starting to see a pattern emerge of organising things around models. looks like jobs is pretty much pools at the moment. how about forgetting about a jobs-specific controller and using "model/domain" naming instead?

Collaborator

I see what you mean. We can evolve it later

*
* @param chainId
*/
async updatePools(chainId: string) {
Collaborator

isn't this the same as "reloadPools" above?

Collaborator Author

true, removing

const pools = await prisma.prismaPool.findMany();
const ids = pools.map((pool) => pool.id);
const client = getV3JoinedSubgraphClient(balancerV3, balancerPoolsV3);
const newPools = await client.getAllInitializedPools({ id_not_in: ids });
Collaborator

@franzns franzns Mar 8, 2024

this is clever! But we need to be wary of the max return limit – the subgraph returns at most 1000 entities.

Collaborator

maybe safer to get all pools with paging and then filter here on node?

Collaborator Author

getV3JoinedSubgraphClient should handle paging – the pools subgraph client needs to have the same paginated query as the vault has; i'll add it.
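The paging discussed here can be sketched as id-based cursoring to stay under the 1000-entity cap; fetchPage below is a stand-in for the real subgraph client call, not an existing function:

```typescript
// Sketch: page through a subgraph using id_gt as a cursor. Each page asks
// for at most pageSize entities, so the server-side return limit is never hit.
type Entity = { id: string };

async function fetchAllPaginated<T extends Entity>(
    fetchPage: (lastId: string, first: number) => Promise<T[]>, // where: { id_gt: lastId }
    pageSize = 1000,
): Promise<T[]> {
    const all: T[] = [];
    let lastId = '';
    while (true) {
        const page = await fetchPage(lastId, pageSize);
        all.push(...page);
        if (page.length < pageSize) break; // short page means we're done
        lastId = page[page.length - 1].id; // advance the cursor
    }
    return all;
}
```

With all pools fetched, the `id_not_in` filtering can then happen node-side without worrying about the limit.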

}

// Get the token prices needed for calculating token balances and total liquidity
const dbPrices = await prisma.prismaTokenPrice.findMany({
Collaborator

this should be prismaTokenCurrentPrice

Comment on lines 118 to 120
await prisma.prismaPoolToken.deleteMany({ where: { poolId: pool.id } });
await prisma.prismaPoolTokenDynamicData.deleteMany({ where: { poolTokenId: { startsWith: pool.id } } });
await prisma.prismaPoolExpandedTokens.deleteMany({ where: { poolId: pool.id } });
Collaborator

is that needed? I wonder if we could have unexpected side effects if upsertPools is called regularly – then pools would end up having no tokens quite often.

Collaborator

Ok, I see the problem. There is no upsertMany that works properly. Hmm, need to add a todo here.

Collaborator Author

we can wrap it in a transaction, so there won't be an intermittent "empty" state

isPoolPaused: configResult[i].result!.isPoolPaused,
isPoolInRecoveryMode: configResult[i].result!.isPoolInRecoveryMode,
} as PoolData;
const parsedResults = pools.map((pool, i) => {
Collaborator

this whole parsing is so ugly. We need a way to abstract that properly.

Collaborator Author

i agree that something similar to the multicaller would be great. I added this here: web3/multicaller-viem.ts – but making it work with the types is another challenge, and i don't think we need to fight for this one. There are just a few places where we are fetching onchain data, and they are always opinionated – i mean we always know what we are fetching, so it's easy to match inputs. if anything i'd consider that a separate task.

swapFee: String(onchainPoolData.swapFee ?? '0'),
},
},
poolTokenDynamicData: onchainPoolData.tokens.map((tokenData) => ({
Collaborator

shouldn't this use the transformer?

Comment on lines 58 to 77
/** TODO: enrich updates with USD values
const tokenAddresses = Array.from(
new Set(Object.values(onchainData).flatMap((pool) => pool.tokens.map((token) => token.address))),
);

// Get the token prices needed for calculating token balances and total liquidity
const dbPrices = await prisma.prismaTokenCurrentPrice.findMany({
where: {
tokenAddress: { in: tokenAddresses },
chain: chain,
},
include: {
token: true,
},
});

// Build helper maps for token prices and decimals
const decimals = Object.fromEntries(dbPrices.map(({ token }) => [token.address, token.decimals]));
const prices = Object.fromEntries(dbPrices.map((price) => [price.tokenAddress, price.price]));
*/
Collaborator

Would be great to actually add this to this PR :)

@@ -801,28 +791,78 @@ enum GqlPoolStakingGaugeStatus {
input GqlPoolJoinExitFilter {
poolIdIn: [String!]
chainIn: [GqlChain!]
userAddress: String
Collaborator

Gotta triple check whether any of these changes break the current UI. I think they don't, but need to check it!

Collaborator Author

@gmbronco gmbronco Mar 8, 2024

i was careful to only add new attributes, so join/exits have all the same attrs as swaps

@gmbronco gmbronco merged commit db04691 into v3-canary Mar 11, 2024
1 check passed