Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

publish to prod #1394

Merged
merged 5 commits into from
Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# backend

## 1.26.27

### Patch Changes

- 7a0ae59: adding dynamic swap fee to fx pools
- a16406b: balance tables indexes
- 5011ef2: optimise multichain events query
- 580c5e7: db indexes

## 1.26.26

### Patch Changes
Expand Down
18 changes: 18 additions & 0 deletions apps/api/gql/resolvers/pool.resolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
EventsQueryController,
SnapshotsController,
PoolController,
FXPoolsController,
} from '../../../../modules/controllers';
import { chainIdToChain } from '../../../../modules/network/chain-id-to-chain';

Expand Down Expand Up @@ -207,6 +208,23 @@ const balancerResolvers: Resolvers = {
}
}

return result;
},
poolSyncFxQuoteTokens: async (parent, { chains }, context) => {
isAdminRoute(context);

const result: { type: string; chain: GqlChain; success: boolean; error: string | undefined }[] = [];

for (const chain of chains) {
try {
await FXPoolsController().syncQuoteTokens(chain);
result.push({ type: 'fx', chain, success: true, error: undefined });
} catch (e) {
result.push({ type: 'fx', chain, success: false, error: `${e}` });
console.log(`Could not sync fx quote tokens for chain ${chain}: ${e}`);
}
}

return result;
},
},
Expand Down
1 change: 1 addition & 0 deletions apps/api/gql/schema/pool.gql
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ extend type Mutation {
poolLoadOnChainDataForAllPools(chains: [GqlChain!]!): [GqlPoolMutationResult!]!
poolReloadPools(chains: [GqlChain!]!): [GqlPoolMutationResult!]!
poolSyncAllCowSnapshots(chains: [GqlChain!]!): [GqlPoolMutationResult!]!
poolSyncFxQuoteTokens(chains: [GqlChain!]!): [GqlPoolMutationResult!]!
}

"""
Expand Down
1 change: 1 addition & 0 deletions graphql_schema_generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3684,6 +3684,7 @@ export const schema = gql`
poolReloadStakingForAllPools(stakingTypes: [GqlPoolStakingType!]!): String!
poolSyncAllCowSnapshots(chains: [GqlChain!]!): [GqlPoolMutationResult!]!
poolSyncAllPoolsFromSubgraph: [String!]!
poolSyncFxQuoteTokens(chains: [GqlChain!]!): [GqlPoolMutationResult!]!
poolUpdateLifetimeValuesForAllPools: String!
poolUpdateLiquidityValuesForAllPools: String!
protocolCacheMetrics: String!
Expand Down
19 changes: 12 additions & 7 deletions modules/actions/pool/v2/add-pools.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Chain } from '@prisma/client';
import { Chain, PrismaPool } from '@prisma/client';
import { prisma } from '../../../../prisma/prisma-client';
import { nestedPoolWithSingleLayerNesting } from '../../../../prisma/prisma-types';
import { V2SubgraphClient } from '../../../subgraphs/balancer-subgraph';
import { BalancerPoolFragment } from '../../../subgraphs/balancer-subgraph/generated/balancer-subgraph-types';
import { subgraphToPrismaCreate } from '../../../pool/subgraph-mapper';
import { upsertBptBalancesV2 } from '../../user/upsert-bpt-balances-v2';
import _ from 'lodash';
import { syncPoolTypeOnchainData } from './sync-pool-type-onchain-data';

export const addPools = async (subgraphService: V2SubgraphClient, chain: Chain): Promise<string[]> => {
const { block } = await subgraphService.legacyService.getMetadata();
Expand All @@ -25,9 +26,13 @@ export const addPools = async (subgraphService: V2SubgraphClient, chain: Chain):

const createdPools: string[] = [];
for (const subgraphPool of newPools) {
const created = await createPoolRecord(subgraphPool, chain, block.number, allNestedTypePools);
if (created) {
const dbPool = await createPoolRecord(subgraphPool, chain, block.number, allNestedTypePools);
if (dbPool) {
createdPools.push(subgraphPool.id);
// When new FX pool is added, we need to get the quote token
if (subgraphPool.poolType === 'FX') {
await syncPoolTypeOnchainData([dbPool], chain);
}
}
}

Expand All @@ -48,7 +53,7 @@ const createPoolRecord = async (
chain: Chain,
blockNumber: number,
nestedPools: { id: string; address: string }[],
): Promise<Boolean> => {
): Promise<PrismaPool | undefined> => {
const poolTokens = pool.tokens || [];

await prisma.prismaToken.createMany({
Expand All @@ -74,14 +79,14 @@ const createPoolRecord = async (
const prismaPoolRecordWithAssociations = subgraphToPrismaCreate(pool, chain, blockNumber, nestedPools);

try {
await prisma.prismaPool.create(prismaPoolRecordWithAssociations);
const pool = await prisma.prismaPool.create(prismaPoolRecordWithAssociations);

await createAllTokensRelationshipForPool(pool.id, chain);

return pool;
} catch (e) {
console.error(`Could not create pool ${pool.id} on chain ${chain}. Skipping.`, e);
return false;
}
return true;
};

const createAllTokensRelationshipForPool = async (poolId: string, chain: Chain): Promise<void> => {
Expand Down
64 changes: 64 additions & 0 deletions modules/actions/pool/v2/sync-pool-type-onchain-data.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { Abi } from 'abitype';
import FX from '../../../pool/abi/FxPool.json';
import { getViemClient, ViemClient } from '../../../sources/viem-client';
import { Chain, PrismaPoolType } from '@prisma/client';
import { prisma } from '../../../../prisma/prisma-client';
import { prismaBulkExecuteOperations } from '../../../../prisma/prisma-util';

const update = async (data: { id: string; chain: Chain; typeData: any }[]) => {
// Update the pool type data
const updates = data.map(({ id, chain, typeData }) =>
prisma.prismaPool.update({
where: { id_chain: { id, chain } },
data: { typeData },
}),
);

await prismaBulkExecuteOperations(updates, false);
};

export const syncPoolTypeOnchainData = async (
pools: { id: string; chain: Chain; address: string; type: PrismaPoolType; typeData: any }[],
chain: Chain,
) => {
const viemClient = getViemClient(chain);

// Get FX pools
const fxPools = pools.filter((pool) => pool.type === 'FX');
const quoteTokens = await fetchFxQuoteTokens(fxPools, viemClient);
await update(quoteTokens);

return true;
};

export const fetchFxQuoteTokens = async (
pools: { id: string; chain: Chain; address: string; typeData: any }[],
viemClient: ViemClient,
) => {
// Fetch the tokens from the subgraph
const contracts = pools.map(({ address }) => {
return {
address: address as `0x${string}`,
abi: FX as Abi,
functionName: 'derivatives',
args: [1],
};
});

const results = await viemClient.multicall({ contracts, allowFailure: true });

return results
.map((call, index) => {
// If the call failed, return null
if (call.status === 'failure') return null;

const typeData = { ...pools[index].typeData, quoteToken: (call.result as string).toLowerCase() };

return {
id: pools[index].id,
chain: pools[index].chain,
typeData,
};
})
.filter((quoteToken): quoteToken is { id: string; chain: Chain; typeData: any } => quoteToken !== null);
};
16 changes: 14 additions & 2 deletions modules/actions/pool/v2/sync-swaps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ export async function syncSwaps(subgraphClient: V2SubgraphClient, chain: Chain):
},
});

// Get list of FX pool addresses for the fee calculation
const fxPools = (await prisma.prismaPool.findMany({
where: {
chain: chain,
type: 'FX',
},
select: {
id: true,
typeData: true, // contains the quote token address
},
})) as { id: string; typeData: { quoteToken: string } }[];

// Querying by timestamp of Fantom, because it has events without a block number in the DB
const where = latestEvent
? chain === Chain.FANTOM
Expand All @@ -47,14 +59,14 @@ export async function syncSwaps(subgraphClient: V2SubgraphClient, chain: Chain):
console.time('BalancerSwaps');
const { swaps } = await subgraphClient.BalancerSwaps({
first: 1000,
where,
where: where,
orderBy: chain === Chain.FANTOM ? Swap_OrderBy.Timestamp : Swap_OrderBy.Block,
orderDirection: OrderDirection.Asc,
});
console.timeEnd('BalancerSwaps');

console.time('swapV2Transformer');
const dbSwaps = swaps.map((swap) => swapV2Transformer(swap, chain));
const dbSwaps = swaps.map((swap) => swapV2Transformer(swap, chain, fxPools));
console.timeEnd('swapV2Transformer');

// TODO: parse batchSwaps, if needed
Expand Down
47 changes: 47 additions & 0 deletions modules/controllers/event-query-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,48 @@ const rangeToTimestamp = (range: GqlPoolEventsDataRange): number => {
}
};

const getMultichainEvents = async (chainIn: Chain[]) => {
const results = await Promise.all(
chainIn.map(async (chain) => {
return (
await prisma.prismaPoolEvent.findMany({
where: {
chain,
},
take: 100,
orderBy: [
{
blockTimestamp: 'desc',
},
{
blockNumber: 'desc',
},
{
logIndex: 'desc',
},
],
})
).map((event) =>
event.type === 'SWAP' && (event as SwapEvent).payload?.surplus
? parseCowAmmSwap(event as SwapEvent)
: event.type === 'SWAP'
? parseSwap(event as SwapEvent)
: parseJoinExit(event as JoinExitEvent),
);
}),
);

return results.flat().sort((a, b) => {
if (a.blockTimestamp === b.blockTimestamp) {
if (a.blockNumber === b.blockNumber) {
return a.logIndex - b.logIndex;
}
return a.blockNumber - b.blockNumber;
}
return a.blockTimestamp - b.blockTimestamp;
});
};

export function EventsQueryController(tracer?: any) {
return {
/**
Expand All @@ -93,6 +135,11 @@ export function EventsQueryController(tracer?: any) {

const conditions: Prisma.PrismaPoolEventWhereInput = {};

// Table is partitioned by chain, so querying by many chains is extermenly inefficient.
if (chainIn && chainIn.length > 1) {
return getMultichainEvents(chainIn as Chain[]);
}

if (chainIn && chainIn.length) {
conditions.chain = {
in: chainIn as Chain[],
Expand Down
9 changes: 9 additions & 0 deletions modules/controllers/fx-pools-controller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import config from '../../config';
import { prisma } from '../../prisma/prisma-client';
import { syncPoolTypeOnchainData } from '../actions/pool/v2/sync-pool-type-onchain-data';
import { syncLatestFXPrices } from '../token/latest-fx-price';
import { Chain } from '@prisma/client';

Expand All @@ -11,5 +13,12 @@ export function FXPoolsController() {

return syncLatestFXPrices(balancer, chain);
},
async syncQuoteTokens(chain: Chain) {
const pools = await prisma.prismaPool.findMany({
where: { chain, type: 'FX' },
});

return syncPoolTypeOnchainData(pools, chain);
},
};
}
3 changes: 2 additions & 1 deletion modules/sources/enrichers/swaps-usd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ export async function swapsUsd(swaps: SwapEvent[], chain: Chain): Promise<SwapEv
const tokenOut = tokenPrices.find((price) => price.tokenAddress === swap.payload.tokenOut.address);
const feeToken = tokenPrices.find((price) => price.tokenAddress === swap.payload.fee.address);
const surplusToken = tokenPrices.find((price) => price.tokenAddress === swap.payload.surplus?.address);
const feeValueUSD = parseFloat(swap.payload.fee.amount) * (feeToken?.price || 0);

const payload = {
fee: {
...swap.payload.fee,
valueUSD: String((feeToken?.price || 0) * parseFloat(swap.payload.fee.amount)),
valueUSD: String(feeValueUSD > 0 ? feeValueUSD : swap.payload.fee.valueUSD),
},
tokenIn: {
...swap.payload.tokenIn,
Expand Down
47 changes: 43 additions & 4 deletions modules/sources/transformers/swap-v2-transformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,53 @@ import { SwapEvent } from '../../../prisma/prisma-types';
* @param chain
* @returns
*/
export function swapV2Transformer(swap: BalancerSwapFragment, chain: Chain): SwapEvent {
export function swapV2Transformer(
swap: BalancerSwapFragment,
chain: Chain,
fxPools: { id: string; typeData: { quoteToken: string } }[] = [],
): SwapEvent {
// Avoiding scientific notation
const feeFloat = parseFloat(swap.tokenAmountIn) * parseFloat(swap.poolId.swapFee ?? 0);
const fee = feeFloat < 1e6 ? feeFloat.toFixed(18).replace(/0+$/, '').replace(/\.$/, '') : String(feeFloat);
const feeFloatUSD = parseFloat(swap.valueUSD) * parseFloat(swap.poolId.swapFee ?? 0);
const feeUSD =
let fee = feeFloat < 1e6 ? feeFloat.toFixed(18).replace(/0+$/, '').replace(/\.$/, '') : String(feeFloat);
let feeFloatUSD = parseFloat(swap.valueUSD) * parseFloat(swap.poolId.swapFee ?? 0);
let feeUSD =
feeFloatUSD < 1e6 ? feeFloatUSD.toFixed(18).replace(/0+$/, '').replace(/\.$/, '') : String(feeFloatUSD);

// FX pools have a different fee calculation
// Replica of the subgraph logic:
// https://github.com/balancer/balancer-subgraph-v2/blob/60453224453bd07a0a3a22a8ad6cc26e65fd809f/src/mappings/vault.ts#L551-L564
if (swap.poolId.poolType === 'FX') {
// Find the pool that has the quote token
const fxPool = fxPools.find((pool) => pool.id === swap.poolId.id);
if (fxPool && [swap.tokenOut, swap.tokenIn].includes(fxPool.typeData.quoteToken)) {
const quoteTokenAddress = fxPool.typeData.quoteToken;
const baseTokenAddress = swap.tokenIn === quoteTokenAddress ? swap.tokenOut : swap.tokenIn;
let isTokenInBase = swap.tokenOut === quoteTokenAddress;
let baseToken = swap.poolId.tokens?.find(({ token }) => token.address == baseTokenAddress);
let quoteToken = swap.poolId.tokens?.find(({ token }) => token.address == quoteTokenAddress);
let baseRate = baseToken != null ? baseToken.token.latestFXPrice : null;
let quoteRate = quoteToken != null ? quoteToken.token.latestFXPrice : null;

if (baseRate && quoteRate) {
if (isTokenInBase) {
feeFloatUSD +=
parseFloat(swap.tokenAmountIn) * parseFloat(baseRate) -
parseFloat(swap.tokenAmountOut) * parseFloat(quoteRate);
// Need to set the fee in the tokenIn price, because it's later recalculated based on the DB prices
fee = String(feeFloatUSD / parseFloat(baseRate)); // fee / tokenIn price
} else {
feeFloatUSD +=
parseFloat(swap.tokenAmountIn) * parseFloat(quoteRate) -
parseFloat(swap.tokenAmountOut) * parseFloat(baseRate);
// Need to set the fee in the tokenIn price, because it's later recalculated based on the DB prices
fee = String(feeFloatUSD / parseFloat(quoteRate)); // fee / tokenIn price
}
}

feeUSD = String(feeFloatUSD);
}
}

return {
id: swap.id, // tx + logIndex
tx: swap.tx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,13 @@ fragment BalancerSwap on Swap {
poolId {
id
swapFee
poolType
tokens {
token {
address
latestFXPrice
}
}
}
userAddress {
id
Expand Down
Loading
Loading