Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync-latest-fx-prices #602

Merged
merged 2 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions modules/network/avalanche.ts
Original file line number Diff line number Diff line change
Expand Up @@ -346,5 +346,9 @@ export const avalancheNetworkConfig: NetworkConfig = {
name: 'feed-data-to-datastudio',
interval: (env.DEPLOYMENT_ENV as DeploymentEnv) === 'canary' ? every(5, 'minutes') : every(1, 'minutes'),
},
{
name: 'sync-latest-fx-prices',
interval: every(10, 'minutes'),
},
],
};
4 changes: 4 additions & 0 deletions modules/network/mainnet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -509,5 +509,9 @@ export const mainnetNetworkConfig: NetworkConfig = {
name: 'feed-data-to-datastudio',
interval: (env.DEPLOYMENT_ENV as DeploymentEnv) === 'canary' ? every(5, 'minutes') : every(1, 'minutes'),
},
{
name: 'sync-latest-fx-prices',
interval: every(10, 'minutes'),
},
],
};
5 changes: 4 additions & 1 deletion modules/network/optimism.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,10 @@ export const optimismNetworkConfig: NetworkConfig = {
],
userStakedBalanceServices: [new UserSyncGaugeBalanceService()],
services: {
balancerSubgraphService: new BalancerSubgraphService(optimismNetworkData.subgraphs.balancer, optimismNetworkData.chain.id),
balancerSubgraphService: new BalancerSubgraphService(
optimismNetworkData.subgraphs.balancer,
optimismNetworkData.chain.id,
),
},
/*
For sub-minute jobs we set the alarmEvaluationPeriod and alarmDatapointsToAlarm to 1 instead of the default 3.
Expand Down
4 changes: 4 additions & 0 deletions modules/network/polygon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,5 +391,9 @@ export const polygonNetworkConfig: NetworkConfig = {
name: 'feed-data-to-datastudio',
interval: (env.DEPLOYMENT_ENV as DeploymentEnv) === 'canary' ? every(5, 'minutes') : every(1, 'minutes'),
},
{
name: 'sync-latest-fx-prices',
interval: every(10, 'minutes'),
},
],
};
1 change: 1 addition & 0 deletions modules/pool/pool.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ model PrismaPoolTokenDynamicData {
balanceUSD Float
weight String?
priceRate String
latestFxPrice Float?
}

model PrismaPoolSwap {
Expand Down
1 change: 0 additions & 1 deletion modules/sor/sorV2/sorV2.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,6 @@ export class SorV2Service implements SwapService {
'ELEMENT', // not supported by b-sdk
'UNKNOWN', // not supported by b-sdk
'INVESTMENT', // not supported by b-sdk
'FX', // TODO: FX pool tokens are missing latestFXPrice - needs to be added to the DB
],
},
AND: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ fragment BalancerToken on Token {
id
symbol
address
latestFXPrice
latestUSDPrice
totalVolumeNotional
totalVolumeUSD
Expand Down Expand Up @@ -213,6 +214,9 @@ fragment BalancerPoolToken on PoolToken {
weight
priceRate
index
token {
latestFXPrice
}
}

query BalancerPools(
Expand Down
49 changes: 49 additions & 0 deletions modules/token/latest-fx-price.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { GraphQLClient } from 'graphql-request';
import { getSdk } from '../subgraphs/balancer-subgraph/generated/balancer-subgraph-types';
import { prisma } from '../../prisma/prisma-client';
import { Chain } from '@prisma/client';

/**
* 'Latest FX Price' is relevant only to FX pools. It is sourced from offchain platforms, like Chainlink.
* The subgraph actively indexes this price by listening to the 'Answer Updated' events emitted by Chainlink oracles.
* For reference and more details, see the code at:
* https://github.com/balancer/balancer-subgraph-v2/blob/master/src/mappings/pricing.ts#L373
*
* Note: 'LatestFXPrice' is a dependency of SORv2.
*/
export const syncLatestFXPrices = async (subgraphUrl: string, chain: Chain) => {
const { pools } = await fetchFxPools(subgraphUrl);

for (const pool of pools) {
const { tokens } = pool;
if (!tokens) continue;

for (const token of tokens) {
try {
await prisma.prismaPoolTokenDynamicData.update({
where: {
id_chain: {
id: token.id,
chain,
},
},
data: {
latestFxPrice: token.token.latestFXPrice ? parseFloat(token.token.latestFXPrice) : undefined,
},
});
} catch (e) {
console.error(`Error updating latest FX price for token ${token.id} on chain ${chain}: ${e}`);
}
}
}

return true;
};

const fetchFxPools = (subgraphUrl: string) => {
const sdk = getSdk(new GraphQLClient(subgraphUrl));

return sdk.BalancerPools({
where: { poolType: 'FX' },
});
};
1 change: 1 addition & 0 deletions modules/token/token.gql
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ extend type Mutation {
tokenReloadTokenPrices: Boolean
tokenSyncTokenDefinitions: String!
tokenSyncTokenDynamicData: String!
tokenSyncLatestFxPrices(chain: GqlChain!): String!
tokenInitChartData(tokenAddress: String!): String!
tokenDeletePrice(tokenAddress: String!, timestamp: Int!): Boolean!
tokenDeleteTokenType(tokenAddress: String!, type: GqlTokenType!): String!
Expand Down
10 changes: 10 additions & 0 deletions modules/token/token.resolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import _ from 'lodash';
import { isAdminRoute } from '../auth/auth-context';
import { tokenService } from './token.service';
import { headerChain } from '../context/header-chain';
import { syncLatestFXPrices } from './latest-fx-price';
import { AllNetworkConfigsKeyedOnChain } from '../network/network-config';

const resolvers: Resolvers = {
Query: {
Expand Down Expand Up @@ -137,6 +139,14 @@ const resolvers: Resolvers = {

return 'success';
},
tokenSyncLatestFxPrices: async (parent, { chain }, context) => {
isAdminRoute(context);
const subgraphUrl = AllNetworkConfigsKeyedOnChain[chain].data.subgraphs.balancer;

await syncLatestFXPrices(subgraphUrl, chain);

return 'success';
},
tokenInitChartData: async (parent, { tokenAddress }, context) => {
isAdminRoute(context);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "PrismaPoolTokenDynamicData" ADD COLUMN "latestFxPrice" DOUBLE PRECISION;
1 change: 1 addition & 0 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ model PrismaPoolTokenDynamicData {
balanceUSD Float
weight String?
priceRate String
latestFxPrice Float?
}

model PrismaPoolSwap {
Expand Down
16 changes: 16 additions & 0 deletions worker/job-handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import { veBalVotingListService } from '../modules/vebal/vebal-voting-list.servi
import { cronsMetricPublisher } from '../modules/metrics/metrics.client';
import moment from 'moment';
import { cronsDurationMetricPublisher } from '../modules/metrics/cron-duration-metrics.client';
import { syncLatestFXPrices } from '../modules/token/latest-fx-price';
import { AllNetworkConfigs } from '../modules/network/network-config';

const runningJobs: Set<string> = new Set();

Expand Down Expand Up @@ -278,6 +280,20 @@ export function configureWorkerRoutes(app: Express) {
next,
);
break;
case 'sync-latest-fx-prices':
await runIfNotAlreadyRunning(
job.name,
chainId,
() => {
const config = AllNetworkConfigs[chainId].data;
const subgraphUrl = config.subgraphs.balancer;
const chain = config.chain.prismaId;
return syncLatestFXPrices(subgraphUrl, chain);
},
res,
next,
);
break;
default:
res.sendStatus(400);
throw new Error(`Unhandled job type ${job.name}`);
Expand Down