diff --git a/packages/indexer-agent/src/commands/start.ts b/packages/indexer-agent/src/commands/start.ts index 7a6ebd868..2c8e95162 100644 --- a/packages/indexer-agent/src/commands/start.ts +++ b/packages/indexer-agent/src/commands/start.ts @@ -37,6 +37,11 @@ import { displayZodParsingError } from './error-handling' // eslint-disable-next-line @typescript-eslint/no-explicit-any export type AgentOptions = { [key: string]: any } & Argv['argv'] +const DEFAULT_SUBGRAPH_MAX_BLOCK_DISTANCE = 0 +const SUGGESTED_SUBGRAPH_MAX_BLOCK_DISTANCE_ON_L2 = + 50 + DEFAULT_SUBGRAPH_MAX_BLOCK_DISTANCE +const DEFAULT_SUBGRAPH_FRESHNESS_SLEEP_MILLISECONDS = 5_000 + export const start = { command: 'start', describe: 'Start the agent', @@ -157,6 +162,20 @@ export const start = { required: true, group: 'Protocol', }) + .option('subgraph-max-block-distance', { + description: + 'How many blocks subgraphs are allowed to stay behind chain head', + type: 'number', + default: DEFAULT_SUBGRAPH_MAX_BLOCK_DISTANCE, + group: 'Protocol', + }) + .option('subgraph-freshness-sleep-milliseconds', { + description: + 'How long to wait before retrying subgraph query if it is not fresh', + type: 'number', + default: DEFAULT_SUBGRAPH_FRESHNESS_SLEEP_MILLISECONDS, + group: 'Protocol', + }) .option('default-allocation-amount', { description: 'Default amount of GRT to allocate to a subgraph deployment', @@ -298,6 +317,7 @@ export const start = { export async function createNetworkSpecification( argv: AgentOptions, + logger: Logger, ): Promise { const gateway = { url: argv.gatewayEndpoint, @@ -333,6 +353,8 @@ export async function createNetworkSpecification( } const subgraphs = { + maxBlockDistance: argv.subgraphMaxBlockDistance, + freshnessSleepMilliseconds: argv.subgraphFreshnessSleepMilliseconds, networkSubgraph: { deployment: argv.networkSubgraphDeployment, url: argv.networkSubgraphEndpoint, @@ -359,6 +381,38 @@ export async function createNetworkSpecification( const chainId = await fetchChainId(networkProvider.url) const networkIdentifier = resolveChainId(chainId) + // Warn about inappropriate max block distance for subgraph threshold checks for given networks. + if (networkIdentifier.startsWith('eip155:42161')) { + // Arbitrum-One and Arbitrum-Goerli + if ( + subgraphs.maxBlockDistance <= SUGGESTED_SUBGRAPH_MAX_BLOCK_DISTANCE_ON_L2 + ) { + logger.warn( + `Consider increasing 'subgraph-max-block-distance' for Arbitrum networks`, + { + problem: + 'A low subgraph freshness threshold might cause the Agent to discard too many subgraph queries in fast-paced networks.', + hint: `Increase the 'subgraph-max-block-distance' parameter to a value that accomodates for block and indexing speeds.`, + configuredValue: subgraphs.maxBlockDistance, + }, + ) + } + if ( + subgraphs.freshnessSleepMilliseconds <= + DEFAULT_SUBGRAPH_FRESHNESS_SLEEP_MILLISECONDS + ) { + logger.warn( + `Consider increasing 'subgraph-freshness-sleep-milliseconds' for Arbitrum networks`, + { + problem: + 'A short subgraph freshness wait time might be insufficient for the subgraph to sync with fast-paced networks.', + hint: `Increase the 'subgraph-freshness-sleep-milliseconds' parameter to a value that accomodates for block and indexing speeds.`, + configuredValue: subgraphs.freshnessSleepMilliseconds, + }, + ) + } + } + try { return spec.NetworkSpecification.parse({ networkIdentifier, diff --git a/packages/indexer-agent/src/index.ts b/packages/indexer-agent/src/index.ts index 7a6c8bdee..ebe961689 100644 --- a/packages/indexer-agent/src/index.ts +++ b/packages/indexer-agent/src/index.ts @@ -60,7 +60,7 @@ async function processArgumentsAndRun(args: AgentOptions): Promise { await run(args, specifications, logger) } else { reviewArgumentsForWarnings(args, logger) - const specification = await createNetworkSpecification(args) + const specification = await createNetworkSpecification(args, logger) await run(args, [specification], logger) } } diff --git a/packages/indexer-agent/src/syncing-server.ts b/packages/indexer-agent/src/syncing-server.ts index abda6c5d7..4be36ef10 100644 --- a/packages/indexer-agent/src/syncing-server.ts +++ b/packages/indexer-agent/src/syncing-server.ts @@ -82,7 +82,7 @@ export const createSyncingServer = async ({ let result try { - result = await networkSubgraph.query(parsedQuery, variables) + result = await networkSubgraph.checkedQuery(parsedQuery, variables) } catch (err) { logger.error(err) return res.status(400).send({ error: err.message }) diff --git a/packages/indexer-common/package.json b/packages/indexer-common/package.json index 5fc472382..17d19c4af 100644 --- a/packages/indexer-common/package.json +++ b/packages/indexer-common/package.json @@ -25,6 +25,7 @@ "@graphprotocol/common-ts": "2.0.3", "@graphprotocol/cost-model": "0.1.16", "@thi.ng/heaps": "1.2.38", + "@types/lodash.clonedeep": "^4.5.7", "@types/lodash.intersection": "^4.4.7", "@types/lodash.xor": "^4.5.7", "@urql/core": "2.4.4", @@ -39,6 +40,7 @@ "graphql": "16.3.0", "graphql-tag": "2.12.6", "jayson": "3.6.6", + "lodash.clonedeep": "^4.5.0", "lodash.groupby": "^4.6.0", "lodash.intersection": "^4.4.0", "lodash.isequal": "^4.5.0", diff --git a/packages/indexer-common/src/__tests__/network-specification-files/invalid-negative-max-block-distance.yml b/packages/indexer-common/src/__tests__/network-specification-files/invalid-negative-max-block-distance.yml new file mode 100644 index 000000000..7cc09d111 --- /dev/null +++ b/packages/indexer-common/src/__tests__/network-specification-files/invalid-negative-max-block-distance.yml @@ -0,0 +1,35 @@ +networkIdentifier: mainnet +gateway: + url: http://gateway +indexerOptions: + address: "0x4e8a4C63Df58bf59Fef513aB67a76319a9faf448" + mnemonic: word ivory whale diesel slab pelican voyage oxygen chat find tobacco sport + url: http://indexer + geoCoordinates: [25.1, -71.2] + restakeRewards: true + rebateClaimThreshold: 400 + rebateClaimBatchThreshold: 5000 + rebateClaimMaxBatchSize: 10 + poiDisputeMonitoring: false + poiDisputableEpochs: 5 + defaultAllocationAmount: 0.05 + voucherRedemptionThreshold: 2 + voucherRedemptionBatchThreshold: 2000 + voucherRedemptionMaxBatchSize: 15 + allocationManagementMode: "auto" + autoAllocationMinBatchSize: 20 +transactionMonitoring: + gasIncreaseTimeout: 10 + gasIncreaseFactor: 10 + baseFeePerGasMax: 10 + maxTransactionAttempts: 10 +subgraphs: + maxBlockDistance: -10 + networkSubgraph: + deployment: QmPK1s3pNYLi9ERiq3BDxKa4XosgWwFRQUydHUtz4YgpqB + epochSubgraph: + url: http://subgraph +networkProvider: + url: http://provider +dai: + contractAddress: "0x4e8a4C63Df58bf59Fef513aB67a76319a9faf448" diff --git a/packages/indexer-common/src/__tests__/network-specification.test.ts b/packages/indexer-common/src/__tests__/network-specification.test.ts index 49e3a59a9..303c38c0d 100644 --- a/packages/indexer-common/src/__tests__/network-specification.test.ts +++ b/packages/indexer-common/src/__tests__/network-specification.test.ts @@ -70,6 +70,11 @@ describe('Failed deserialization', () => { path: ['dai', 'contractAddress'], message: 'Invalid contract address', }, + { + file: 'invalid-negative-max-block-distance.yml', + path: ['subgraphs', 'maxBlockDistance'], + message: 'Number must be greater than or equal to 0', + }, ] test.each(failedTests)( diff --git a/packages/indexer-common/src/__tests__/subgraph.test.ts b/packages/indexer-common/src/__tests__/subgraph.test.ts new file mode 100644 index 000000000..fb85fcb19 --- /dev/null +++ b/packages/indexer-common/src/__tests__/subgraph.test.ts @@ -0,0 +1,258 @@ +import { DocumentNode, print } from 'graphql' +import { + SubgraphFreshnessChecker, + LoggerInterface, + ProviderInterface, + SubgraphQueryInterface, +} from '../subgraphs' +import { QueryResult } from '../network-subgraph' +import gql from 'graphql-tag' +import { mergeSelectionSets } from '../utils' + +/* eslint-disable @typescript-eslint/no-explicit-any */ +export const mockProvider: ProviderInterface & any = { + getBlockNumber: jest.fn(), +} + +export const mockLogger: LoggerInterface & any = { + trace: jest.fn(), + error: jest.fn(), + warn: jest.fn(), +} + +const mockSubgraph: SubgraphQueryInterface & any = { + query: jest.fn(), +} + +const testSubgraphQuery: DocumentNode = gql` + query TestQuery { + foo { + id + } + } +` + +function mockQueryResult(blockNumber: number): QueryResult & { + data: { _meta: { block: { number: number } } } +} { + return { + data: { + foo: { + id: 1, + }, + _meta: { + block: { + number: blockNumber, + }, + }, + }, + } +} +/* eslint-enable @typescript-eslint/no-explicit-any */ + +const blockNumberQuery = gql` + { + _meta { + block { + number + } + } + } +` + +describe('mergeSelectionSets function tests', () => { + it('can merge two GraphQL queries', () => { + const firstQuery = gql` + query Foo { + graphNetworks(first: 5) { + id + controller + graphToken + epochManager + } + graphAccounts(first: 5) { + id + names { + id + } + defaultName { + id + } + createdAt + } + } + ` + const expected = gql` + query Foo { + graphNetworks(first: 5) { + id + controller + graphToken + epochManager + } + graphAccounts(first: 5) { + id + names { + id + } + defaultName { + id + } + createdAt + } + _meta { + block { + number + } + } + } + ` + const result = mergeSelectionSets(firstQuery, blockNumberQuery) + expect(result.definitions).toStrictEqual(expected.definitions) + expect(print(result)).toEqual(print(expected)) + }) + + it("doesn't mutate its input", () => { + const expectedMergedQuery = gql` + query TestQuery { + foo { + id + } + _meta { + block { + number + } + } + } + ` + let result: DocumentNode + // Repetition required to test `mergeSelectionSets` doesn't mutate its input + for (let i = 0; i < 3; i++) { + result = mergeSelectionSets(testSubgraphQuery, blockNumberQuery) + } + expect(result!.definitions).toStrictEqual(expectedMergedQuery.definitions) + expect(print(result!)).toEqual(print(expectedMergedQuery)) + }) +}) + +describe('SubgraphFreshnessChecker', () => { + beforeEach(jest.resetAllMocks) + + describe('checkedQuery method', () => { + beforeEach(jest.resetAllMocks) + + it('should throw an error if max retries reached', async () => { + const checker = new SubgraphFreshnessChecker( + 'Test Subgraph', + mockProvider, + 10, + 10, + mockLogger, + 1, + ) + + // Mocks never change value in this test, so the network will always be 100 blocks ahead and + // the checked query will timeout.f + mockProvider.getBlockNumber.mockResolvedValue(242) + mockSubgraph.query.mockResolvedValue(mockQueryResult(100)) + + await expect(checker.checkedQuery(mockSubgraph, testSubgraphQuery)).rejects.toThrow( + 'Max retries reached for Test Subgraph freshness check', + ) + + expect(mockLogger.trace).toHaveBeenCalledWith( + expect.stringContaining('Performing subgraph freshness check'), + { + blockDistance: 142, + freshnessThreshold: 10, + latestIndexedBlock: 100, + latestNetworkBlock: 242, + retriesLeft: 1, + subgraph: 'Test Subgraph', + }, + ) + }) + + it('should return query result if the subgraph is fresh', async () => { + const checker = new SubgraphFreshnessChecker( + 'Test Subgraph', + mockProvider, + 10, + 10, + mockLogger, + 1, + ) + + mockProvider.getBlockNumber.mockResolvedValue(105) + mockSubgraph.query.mockResolvedValue(mockQueryResult(100)) + + await expect( + checker.checkedQuery(mockSubgraph, testSubgraphQuery), + ).resolves.toEqual(mockQueryResult(100)) + + expect(mockLogger.trace).toHaveBeenCalledWith( + expect.stringContaining('Performing subgraph freshness check'), + { + blockDistance: 5, + freshnessThreshold: 10, + latestIndexedBlock: 100, + latestNetworkBlock: 105, + retriesLeft: 1, + subgraph: 'Test Subgraph', + }, + ) + }) + + it('should return query result if the subgraph becomes fresh after retries', async () => { + const checker = new SubgraphFreshnessChecker( + 'Test Subgraph', + mockProvider, + 10, + 100, + mockLogger, + 2, + ) + + // Advance the network by ten blocks between calls + mockProvider.getBlockNumber.mockResolvedValueOnce(150).mockResolvedValueOnce(160) + + // Advance the subgraph by 20 blocks between calls + // The first call should trigger a retry, which then shuld succeed + mockSubgraph.query + .mockResolvedValueOnce(mockQueryResult(130)) + .mockResolvedValueOnce(mockQueryResult(150)) + + const result = await checker.checkedQuery(mockSubgraph, testSubgraphQuery) + expect(result).toEqual(mockQueryResult(150)) + + // It should log this on retry + expect(mockLogger.warn).toHaveBeenCalledWith( + expect.stringContaining( + 'Test Subgraph is not fresh. Sleeping for 100 ms before retrying', + ), + { + blockDistance: 20, + freshnessThreshold: 10, + latestIndexedBlock: 130, + latestNetworkBlock: 150, + retriesLeft: 2, + subgraph: 'Test Subgraph', + }, + ) + // It should log this on success + expect(mockLogger.trace.mock.calls).toContainEqual( + expect.objectContaining([ + 'Test Subgraph is fresh', + { + blockDistance: 10, + freshnessThreshold: 10, + latestIndexedBlock: 150, + latestNetworkBlock: 160, + retriesLeft: 1, + subgraph: 'Test Subgraph', + }, + ]), + ) + }) + }) +}) diff --git a/packages/indexer-common/src/epoch-subgraph.ts b/packages/indexer-common/src/epoch-subgraph.ts index 0f4339888..d2248c4eb 100644 --- a/packages/indexer-common/src/epoch-subgraph.ts +++ b/packages/indexer-common/src/epoch-subgraph.ts @@ -2,12 +2,19 @@ import axios, { AxiosInstance, AxiosResponse } from 'axios' import { DocumentNode, print } from 'graphql' import { CombinedError } from '@urql/core' import { QueryResult } from './network-subgraph' - +import { Logger } from '@graphprotocol/common-ts' +import { SubgraphFreshnessChecker } from './subgraphs' export class EpochSubgraph { - private constructor(private endpointClient: AxiosInstance) {} + endpointClient: AxiosInstance + freshnessChecker: SubgraphFreshnessChecker + logger: Logger - public static async create(endpoint: string): Promise { - const endpointClient = axios.create({ + constructor( + endpoint: string, + freshnessChecker: SubgraphFreshnessChecker, + logger: Logger, + ) { + this.endpointClient = axios.create({ baseURL: endpoint, headers: { 'content-type': 'application/json' }, @@ -17,8 +24,17 @@ export class EpochSubgraph { // Don't transform responses transformResponse: (data) => data, }) - // Create the Epoch subgraph instance - return new EpochSubgraph(endpointClient) + this.freshnessChecker = freshnessChecker + this.logger = logger + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + async checkedQuery( + query: DocumentNode, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + variables?: Record, + ): Promise> { + return this.freshnessChecker.checkedQuery(this, query, variables) } // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -32,6 +48,7 @@ export class EpochSubgraph { variables, }) const data = JSON.parse(response.data) + this.logger.trace('Epoch Subgraph query', { data }) if (data.errors) { return { error: new CombinedError({ graphQLErrors: data.errors }) } } diff --git a/packages/indexer-common/src/indexer-management/__tests__/helpers.test.ts b/packages/indexer-common/src/indexer-management/__tests__/helpers.test.ts index 7ab97e4c0..906a70e31 100644 --- a/packages/indexer-common/src/indexer-management/__tests__/helpers.test.ts +++ b/packages/indexer-common/src/indexer-management/__tests__/helpers.test.ts @@ -16,7 +16,7 @@ import { import { defineQueryFeeModels, specification as spec } from '../../index' import { networkIsL1, networkIsL2 } from '../types' import { fetchIndexingRules, upsertIndexingRule } from '../rules' -import { SubgraphIdentifierType } from '../../subgraphs' +import { SubgraphFreshnessChecker, SubgraphIdentifierType } from '../../subgraphs' import { ActionManager } from '../actions' import { actionFilterToWhereOptions, ActionStatus, ActionType } from '../../actions' import { literal, Op, Sequelize } from 'sequelize' @@ -34,6 +34,7 @@ import { SubgraphDeployment, getTestProvider, } from '@graphprotocol/indexer-common' +import { mockLogger, mockProvider } from '../../__tests__/subgraph.test' import { BigNumber, ethers, utils } from 'ethers' // Make global Jest variable available @@ -72,14 +73,28 @@ const setupMonitor = async () => { }) ethereum = getTestProvider('goerli') contracts = await connectContracts(ethereum, 5) + + const subgraphFreshnessChecker = new SubgraphFreshnessChecker( + 'Test Subgraph', + mockProvider, + 10, + 10, + mockLogger, + 1, + ) + networkSubgraph = await NetworkSubgraph.create({ logger, endpoint: 'https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-goerli', deployment: undefined, + subgraphFreshnessChecker, }) - epochSubgraph = await EpochSubgraph.create( + + epochSubgraph = new EpochSubgraph( 'https://api.thegraph.com/subgraphs/name/graphprotocol/goerli-epoch-block-oracle', + subgraphFreshnessChecker, + logger, ) graphNode = new GraphNode( logger, diff --git a/packages/indexer-common/src/indexer-management/monitor.ts b/packages/indexer-common/src/indexer-management/monitor.ts index 737bc3abc..332173063 100644 --- a/packages/indexer-common/src/indexer-management/monitor.ts +++ b/packages/indexer-common/src/indexer-management/monitor.ts @@ -60,7 +60,7 @@ export class NetworkMonitor { } async allocation(allocationID: string): Promise { - const result = await this.networkSubgraph.query( + const result = await this.networkSubgraph.checkedQuery( gql` query allocation($allocation: String!) { allocation(id: $allocation) { @@ -100,7 +100,7 @@ export class NetworkMonitor { async allocations(status: AllocationStatus): Promise { try { this.logger.debug(`Fetch ${status} allocations`) - const result = await this.networkSubgraph.query( + const result = await this.networkSubgraph.checkedQuery( gql` query allocations($indexer: String!, $status: AllocationStatus!) { allocations( @@ -161,7 +161,7 @@ export class NetworkMonitor { async epochs(epochNumbers: number[]): Promise { try { - const result = await this.networkSubgraph.query( + const result = await this.networkSubgraph.checkedQuery( gql` query epochs($epochs: [Int!]!) { epoches(where: { id_in: $epochs }, first: 1000) { @@ -201,7 +201,7 @@ export class NetworkMonitor { ): Promise { try { this.logger.debug('Fetch recently closed allocations') - const result = await this.networkSubgraph.query( + const result = await this.networkSubgraph.checkedQuery( gql` query allocations($indexer: String!, $closedAtEpochThreshold: Int!) { allocations( @@ -264,7 +264,7 @@ export class NetworkMonitor { subgraphDeploymentId: SubgraphDeploymentID, ): Promise { try { - const result = await this.networkSubgraph.query( + const result = await this.networkSubgraph.checkedQuery( gql` query allocations($indexer: String!, $subgraphDeploymentId: String!) { allocations( @@ -350,7 +350,7 @@ export class NetworkMonitor { subgraphIds: ids, }) try { - const result = await this.networkSubgraph.query( + const result = await this.networkSubgraph.checkedQuery( gql` query subgraphs($first: Int!, $lastCreatedAt: Int!, $subgraphs: [String!]!) { subgraphs( @@ -436,7 +436,7 @@ export class NetworkMonitor { async subgraphDeployment(ipfsHash: string): Promise { try { - const result = await this.networkSubgraph.query( + const result = await this.networkSubgraph.checkedQuery( gql` query subgraphDeployments($ipfsHash: String!) { subgraphDeployments(where: { ipfsHash: $ipfsHash }) { @@ -494,7 +494,7 @@ export class NetworkMonitor { async transferredDeployments(): Promise { this.logger.debug('Querying the Network for transferred subgraph deployments') try { - const result = await this.networkSubgraph.query( + const result = await this.networkSubgraph.checkedQuery( // TODO: Consider querying for the same time range as the Agent's evaluation, limiting // results to recent transfers. gql` @@ -592,7 +592,7 @@ export class NetworkMonitor { queryProgress: queryProgress, }) try { - const result = await this.networkSubgraph.query( + const result = await this.networkSubgraph.checkedQuery( gql` query subgraphDeployments($first: Int!, $lastCreatedAt: Int!) { subgraphDeployments( @@ -670,7 +670,7 @@ export class NetworkMonitor { const queryEpochSubgraph = async () => { // We know it is non-null because of the check above for a null case that will end execution of fn if true // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const result = await this.epochSubgraph!.query( + const result = await this.epochSubgraph!.checkedQuery( gql` query network($networkID: String!) { network(id: $networkID) { @@ -689,11 +689,6 @@ export class NetworkMonitor { } } } - _meta { - block { - number - } - } } `, { @@ -940,7 +935,7 @@ Please submit an issue at https://github.com/graphprotocol/block-oracle/issues/n .reduce(async (currentlyPaused) => { try { logger.debug('Query network subgraph isPaused state') - const result = await networkSubgraph.query( + const result = await networkSubgraph.checkedQuery( gql` { graphNetworks { @@ -1018,7 +1013,7 @@ Please submit an issue at https://github.com/graphprotocol/block-oracle/issues/n closedAtEpoch_lte: disputableEpoch, queryFeesCollected_gte: this.indexerOptions.rebateClaimThreshold.toString(), }) - const result = await this.networkSubgraph.query( + const result = await this.networkSubgraph.checkedQuery( gql` query allocations( $indexer: String! @@ -1132,7 +1127,7 @@ Please submit an issue at https://github.com/graphprotocol/block-oracle/issues/n const disputableEpoch = currentEpoch - this.indexerOptions.poiDisputableEpochs let lastCreatedAt = 0 while (dataRemaining) { - const result = await this.networkSubgraph.query( + const result = await this.networkSubgraph.checkedQuery( gql` query allocations( $deployments: [String!]! diff --git a/packages/indexer-common/src/indexer-management/resolvers/allocations.ts b/packages/indexer-common/src/indexer-management/resolvers/allocations.ts index 2a852b4a5..81520f12a 100644 --- a/packages/indexer-common/src/indexer-management/resolvers/allocations.ts +++ b/packages/indexer-common/src/indexer-management/resolvers/allocations.ts @@ -241,7 +241,10 @@ async function queryAllocations( ) } - const result = await networkSubgraph.query(ALLOCATION_QUERIES[filterType], filterVars) + const result = await networkSubgraph.checkedQuery( + ALLOCATION_QUERIES[filterType], + filterVars, + ) if (result.data.allocations.length == 0) { logger.info(`No 'Claimable' allocations found`) diff --git a/packages/indexer-common/src/indexer-management/resolvers/indexer-status.ts b/packages/indexer-common/src/indexer-management/resolvers/indexer-status.ts index 6b4d83a1d..3564fe605 100644 --- a/packages/indexer-common/src/indexer-management/resolvers/indexer-status.ts +++ b/packages/indexer-common/src/indexer-management/resolvers/indexer-status.ts @@ -124,7 +124,7 @@ export default { const address = network.specification.indexerOptions.address try { - const result = await network.networkSubgraph.query( + const result = await network.networkSubgraph.checkedQuery( gql` query allocations($indexer: String!) { allocations( diff --git a/packages/indexer-common/src/network-specification.ts b/packages/indexer-common/src/network-specification.ts index 318f80173..620c7a39a 100644 --- a/packages/indexer-common/src/network-specification.ts +++ b/packages/indexer-common/src/network-specification.ts @@ -97,6 +97,8 @@ export type Subgraph = z.infer // All pertinent subgraphs in the protocol export const ProtocolSubgraphs = z .object({ + maxBlockDistance: z.number().nonnegative().finite().default(0), + freshnessSleepMilliseconds: positiveNumber().default(5_000), networkSubgraph: Subgraph, epochSubgraph: Subgraph, }) diff --git a/packages/indexer-common/src/network-subgraph.ts b/packages/indexer-common/src/network-subgraph.ts index 893fdcab3..589732272 100644 --- a/packages/indexer-common/src/network-subgraph.ts +++ b/packages/indexer-common/src/network-subgraph.ts @@ -4,6 +4,7 @@ import { DocumentNode, print } from 'graphql' import { OperationResult, CombinedError } from '@urql/core' import { BlockPointer, IndexingError } from './types' import { GraphNode } from './graph-node' +import { SubgraphFreshnessChecker } from './subgraphs' export interface NetworkSubgraphCreateOptions { logger: Logger @@ -12,6 +13,7 @@ export interface NetworkSubgraphCreateOptions { graphNode: GraphNode deployment: SubgraphDeploymentID } + subgraphFreshnessChecker: SubgraphFreshnessChecker } interface DeploymentStatus { @@ -30,6 +32,7 @@ interface NetworkSubgraphOptions { status: Eventual graphNode: GraphNode } + subgraphFreshnessChecker: SubgraphFreshnessChecker } export type QueryResult = Pick< @@ -39,7 +42,7 @@ export type QueryResult = Pick< export class NetworkSubgraph { logger: Logger - + freshnessChecker: SubgraphFreshnessChecker endpointClient?: AxiosInstance public readonly deployment?: { @@ -50,6 +53,7 @@ export class NetworkSubgraph { private constructor(options: NetworkSubgraphOptions) { this.logger = options.logger + this.freshnessChecker = options.subgraphFreshnessChecker if (options.endpoint) { this.endpointClient = axios.create({ @@ -83,6 +87,7 @@ export class NetworkSubgraph { logger: parentLogger, endpoint, deployment, + subgraphFreshnessChecker, }: NetworkSubgraphCreateOptions): Promise { // Either an endpoint or a deployment needs to be provided; the CLI // validation should already guarantee that but we're asserting this again @@ -122,6 +127,7 @@ export class NetworkSubgraph { logger, endpoint, deployment: deploymentInfo, + subgraphFreshnessChecker, }) // If we don't have a network subgraph endpoint configured, we @@ -160,6 +166,15 @@ export class NetworkSubgraph { } } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + async checkedQuery( + query: DocumentNode, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + variables?: Record, + ): Promise> { + return this.freshnessChecker.checkedQuery(this, query, variables) + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any async query( query: DocumentNode, diff --git a/packages/indexer-common/src/network.ts b/packages/indexer-common/src/network.ts index c363a035e..dae1345c4 100644 --- a/packages/indexer-common/src/network.ts +++ b/packages/indexer-common/src/network.ts @@ -18,6 +18,7 @@ import { EpochSubgraph, NetworkMonitor, AllocationReceiptCollector, + SubgraphFreshnessChecker, } from '.' import { BigNumber, providers, Wallet } from 'ethers' import { strict as assert } from 'assert' @@ -81,9 +82,29 @@ export class Network { protocolNetwork: specification.networkIdentifier, }) + // * ----------------------------------------------------------------------- + // * Network Provider + // * ----------------------------------------------------------------------- + const networkProvider = await Network.provider( + logger, + metrics, + specification.networkIdentifier, + specification.networkProvider.url, + specification.networkProvider.pollingInterval, + ) + // * ----------------------------------------------------------------------- // * Network Subgraph // * ----------------------------------------------------------------------- + const networkSubgraphFreshnessChecker = new SubgraphFreshnessChecker( + 'Network Subgraph', + networkProvider, + specification.subgraphs.maxBlockDistance, + specification.subgraphs.freshnessSleepMilliseconds, + logger.child({ component: 'FreshnessChecker' }), + Infinity, + ) + const networkSubgraphDeploymentId = specification.subgraphs.networkSubgraph.deployment ? new SubgraphDeploymentID(specification.subgraphs.networkSubgraph.deployment) : undefined @@ -98,19 +119,9 @@ export class Network { deployment: networkSubgraphDeploymentId, } : undefined, + subgraphFreshnessChecker: networkSubgraphFreshnessChecker, }) - // * ----------------------------------------------------------------------- - // * Network Provider - // * ----------------------------------------------------------------------- - const networkProvider = await Network.provider( - logger, - metrics, - specification.networkIdentifier, - specification.networkProvider.url, - specification.networkProvider.pollingInterval, - ) - // * ----------------------------------------------------------------------- // * Contracts // * ----------------------------------------------------------------------- @@ -138,13 +149,24 @@ export class Network { // * ----------------------------------------------------------------------- // * Epoch Subgraph // * ----------------------------------------------------------------------- - const epochSubgraph = await EpochSubgraph.create( + const epochSubgraphFreshnessChecker = new SubgraphFreshnessChecker( + 'Epoch Subgraph', + networkProvider, + specification.subgraphs.maxBlockDistance, + specification.subgraphs.freshnessSleepMilliseconds, + logger.child({ component: 'FreshnessChecker' }), + Infinity, + ) + + const epochSubgraph = new EpochSubgraph( /* eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- * Accept the non-null `url` property of the Epoch Subgraph, as it has * already been validated during parsing. Once indexing is supported, * initialize it in the same way as the NetworkSubgraph */ specification.subgraphs.epochSubgraph.url!, + epochSubgraphFreshnessChecker, + logger.child({ component: 'EpochSubgraph' }), ) // * ----------------------------------------------------------------------- diff --git a/packages/indexer-common/src/subgraphs.ts b/packages/indexer-common/src/subgraphs.ts index d9af8bdc4..0bed4cf18 100644 --- a/packages/indexer-common/src/subgraphs.ts +++ b/packages/indexer-common/src/subgraphs.ts @@ -7,6 +7,10 @@ import { IndexingDecisionBasis, IndexingRuleAttributes, } from './indexer-management' +import { DocumentNode, print } from 'graphql' +import gql from 'graphql-tag' +import { QueryResult } from './network-subgraph' +import { mergeSelectionSets, sleep } from './utils' export enum SubgraphIdentifierType { DEPLOYMENT = 'deployment', @@ -323,3 +327,161 @@ export function isDeploymentWorthAllocatingTowards( } } } + +export interface ProviderInterface { + getBlockNumber(): Promise +} + +/* eslint-disable @typescript-eslint/no-explicit-any */ +export interface LoggerInterface { + trace(msg: string, o?: object, ...args: any[]): void + error(msg: string, o?: object, ...args: any[]): void + warn(msg: string, o?: object, ...args: any[]): void +} + +export interface SubgraphQueryInterface { + query( + query: DocumentNode, + variables?: Record, + ): Promise> +} +/* eslint-enable @typescript-eslint/no-explicit-any */ + +const blockNumberQuery = gql` + { + _meta { + block { + number + } + } + } +` + +interface BlockNumberInterface { + data: { _meta: { block: { number: number } } } + errors?: any +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +type Variables = Record + +export class SubgraphFreshnessChecker { + subgraphName: string + provider: ProviderInterface + threshold: number + logger: LoggerInterface + sleepDurationMillis: number + retries: number + + constructor( + subgraphName: string, + provider: ProviderInterface, + freshnessThreshold: number, + sleepDurationMillis: number, + logger: LoggerInterface, + retries: number, + ) { + this.subgraphName = subgraphName + this.provider = provider + this.threshold = freshnessThreshold + this.sleepDurationMillis = sleepDurationMillis + this.logger = logger + this.retries = retries + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + public async checkedQuery( + subgraph: SubgraphQueryInterface, + query: DocumentNode, + variables?: Variables, + ): Promise> { + // Try to inject the latest block number into the original query. + let updatedQuery = query + try { + updatedQuery = mergeSelectionSets(query, blockNumberQuery) + } catch (err) { + const errorMsg = `Failed to append block number into ${this.subgraphName} query` + this.logger.error(errorMsg, { subgraph: this.subgraphName, query: print(query) }) + throw new Error(errorMsg) + } + + // Try obtaining a fresh subgraph query at most `this.retry` times + return this.checkedQueryRecursive( + updatedQuery, + subgraph, + this.retries, + variables, + ) + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + private async checkedQueryRecursive( + updatedQuery: DocumentNode, + subgraph: SubgraphQueryInterface, + retriesLeft: number, + variables?: Variables, + ): Promise> { + if (retriesLeft < 0) { + const errorMsg = `Max retries reached for ${this.subgraphName} freshness check` + this.logger.error(errorMsg, { + subgraph: this.subgraphName, + query: print(updatedQuery), + }) + throw new Error(errorMsg) + } + + // Obtain the latest network block number and subgraph query in parallel. + const subgraphQueryPromise = subgraph.query(updatedQuery, variables) as Promise< + QueryResult & BlockNumberInterface + > + const latestNetworkBlockPromise = this.provider.getBlockNumber() + const [subgraphQueryResult, latestNetworkBlock] = await Promise.all([ + subgraphQueryPromise, + latestNetworkBlockPromise, + ]) + + // Return it early if query results contains errors + if (subgraphQueryResult.errors) { + return subgraphQueryResult + } + + const latestIndexedBlock = subgraphQueryResult?.data?._meta?.block?.number + if (!latestIndexedBlock) { + const errorMsg = `Failed to infer block number for ${this.subgraphName} query` + this.logger.error(errorMsg, { query: print(updatedQuery) }) + // Check for unexpected missing block data as a precaution. + throw new Error(errorMsg) + } + + // Check subgraph freshness + const blockDistance = latestNetworkBlock - latestIndexedBlock + const logInfo = { + latestIndexedBlock, + latestNetworkBlock, + blockDistance, + freshnessThreshold: this.threshold, + subgraph: this.subgraphName, + retriesLeft, + } + this.logger.trace('Performing subgraph freshness check', logInfo) + + if (blockDistance < 0) { + // Invariant violated: Subgraph can't be ahead of network latest block + const errorMsg = `${this.subgraphName}'s latest indexed block (${latestIndexedBlock}) is higher than Network's latest block (${latestNetworkBlock})` + console.warn(errorMsg, logInfo) + } + + if (blockDistance > this.threshold) { + // Reenter function + this.logger.warn( + `${this.subgraphName} is not fresh. Sleeping for ${this.sleepDurationMillis} ms before retrying`, + logInfo, + ) + await sleep(this.sleepDurationMillis) + return this.checkedQueryRecursive(updatedQuery, subgraph, retriesLeft - 1) + } else { + this.logger.trace(`${this.subgraphName} is fresh`, logInfo) + } + return subgraphQueryResult + } +} diff --git a/packages/indexer-common/src/transactions.ts b/packages/indexer-common/src/transactions.ts index 1b0477f68..db08b8621 100644 --- a/packages/indexer-common/src/transactions.ts +++ b/packages/indexer-common/src/transactions.ts @@ -326,7 +326,7 @@ export class TransactionManager { return timer(60_000) .reduce(async (currentlyPaused) => { try { - const result = await networkSubgraph.query( + const result = await networkSubgraph.checkedQuery( gql` { graphNetworks { diff --git a/packages/indexer-common/src/utils.ts b/packages/indexer-common/src/utils.ts index fd660c982..a486424a0 100644 --- a/packages/indexer-common/src/utils.ts +++ b/packages/indexer-common/src/utils.ts @@ -6,7 +6,8 @@ import { } from '@ethersproject/providers' import { Logger, Metrics, timer } from '@graphprotocol/common-ts' import { indexerError, IndexerErrorCode } from './errors' -import { Sequelize } from 'sequelize' +import { DocumentNode, SelectionSetNode, Kind } from 'graphql' +import cloneDeep from 'lodash.clonedeep' export const parseBoolean = ( val: string | boolean | number | undefined | null, @@ -63,3 +64,58 @@ export async function monitorEthBalance( } }) } + +export function mergeSelectionSets( + first: DocumentNode, + second: DocumentNode, +): DocumentNode { + // Work on a copy to avoid mutating inupt + const copy = cloneDeep(first) + const firstSelectionSet = extractSelectionSet(copy) + const secondSelectionSet = extractSelectionSet(second) + firstSelectionSet.selections = [ + ...firstSelectionSet.selections, + ...secondSelectionSet.selections, + ] + return copy +} + +function extractSelectionSet(document: DocumentNode): SelectionSetNode { + // Ensure that the document contains at least one definition + if (document.definitions.length === 0) { + throw new Error('Document must contain at least one definition') + } + // Find the first SelectionSet in the document + const firstDefinition = document.definitions[0] + if (!firstDefinition || firstDefinition.kind !== Kind.OPERATION_DEFINITION) { + throw new Error('Invalid document definition') + } + const selectionSet = findFirstSelectionSet(firstDefinition.selectionSet) + if (!selectionSet) { + throw new Error('No SelectionSet found in the document') + } + if (!selectionSet.selections) { + throw new Error('SelectionSet has no selections') + } + + return selectionSet +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function findFirstSelectionSet(node: any): SelectionSetNode | null { + if (node.kind === Kind.SELECTION_SET) { + return node + } + for (const key of Object.keys(node)) { + const childNode = node[key] + if (childNode && typeof childNode === 'object') { + const result = findFirstSelectionSet(childNode) + if (result !== null) { + return result + } + } + } + return null +} + +export const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) diff --git a/packages/indexer-service/src/allocations.ts b/packages/indexer-service/src/allocations.ts index 87b238334..22aee525a 100644 --- a/packages/indexer-service/src/allocations.ts +++ b/packages/indexer-service/src/allocations.ts @@ -38,7 +38,7 @@ export const monitorEligibleAllocations = ({ logger.debug('Refresh eligible allocations') try { - const currentEpochResult = await networkSubgraph.query( + const currentEpochResult = await networkSubgraph.checkedQuery( gql` query { graphNetwork(id: "1") { @@ -61,7 +61,7 @@ export const monitorEligibleAllocations = ({ const currentEpoch = currentEpochResult.data.graphNetwork.currentEpoch - const result = await networkSubgraph.query( + const result = await networkSubgraph.checkedQuery( gql` query allocations($indexer: String!, $closedAtEpochThreshold: Int!) { indexer(id: $indexer) { diff --git a/packages/indexer-service/src/commands/start.ts b/packages/indexer-service/src/commands/start.ts index 1bc53cf59..37b373df4 100644 --- a/packages/indexer-service/src/commands/start.ts +++ b/packages/indexer-service/src/commands/start.ts @@ -5,6 +5,11 @@ import { BigNumber, Wallet } from 'ethers' import fs from 'fs' import { parse as yaml_parse } from 'yaml' +const DEFAULT_SUBGRAPH_MAX_BLOCK_DISTANCE = 0 +const SUGGESTED_SUBGRAPH_MAX_BLOCK_DISTANCE_ON_L2 = + 50 + DEFAULT_SUBGRAPH_MAX_BLOCK_DISTANCE +const DEFAULT_SUBGRAPH_FRESHNESS_SLEEP_MILLISECONDS = 5_000 + import { connectContracts, connectDatabase, @@ -26,6 +31,7 @@ import { registerIndexerErrorMetrics, resolveChainId, validateProviderNetworkIdentifier, + SubgraphFreshnessChecker, } from '@graphprotocol/indexer-common' import { createServer } from '../server' @@ -176,6 +182,19 @@ export default { type: 'string', required: false, }) + .option('subgraph-max-block-distance', { + description: 'How many blocks subgraphs are allowed to stay behind chain head', + type: 'number', + default: DEFAULT_SUBGRAPH_MAX_BLOCK_DISTANCE, + group: 'Protocol', + }) + .option('subgraph-freshness-sleep-milliseconds', { + description: 'How long to wait before retrying subgraph query if it is not fresh', + type: 'number', + default: DEFAULT_SUBGRAPH_FRESHNESS_SLEEP_MILLISECONDS, + group: 'Protocol', + }) + .check(argv => { if (!argv['network-subgraph-endpoint'] && !argv['network-subgraph-deployment']) { return `At least one of --network-subgraph-endpoint and --network-subgraph-deployment must be provided` @@ -287,6 +306,56 @@ export default { argv.graphNodeStatusEndpoint, argv.indexNodeIds, ) + + const networkProvider = await Network.provider( + logger, + metrics, + '_', + argv.networkProvider, + argv.ethereumPollingInterval, + ) + const networkIdentifier = await networkProvider.getNetwork() + const protocolNetwork = resolveChainId(networkIdentifier.chainId) + + // Warn about inappropriate max block distance for subgraph threshold checks for given networks. + if (protocolNetwork.startsWith('eip155:42161')) { + // Arbitrum-One and Arbitrum-Goerli + if (argv.subgraphMaxBlockDistance <= SUGGESTED_SUBGRAPH_MAX_BLOCK_DISTANCE_ON_L2) { + logger.warn( + `Consider increasing 'subgraph-max-block-distance' for Arbitrum networks`, + { + problem: + 'A low subgraph freshness threshold might cause the Agent to discard too many subgraph queries in fast-paced networks.', + hint: `Increase the 'subgraph-max-block-distance' parameter to a value that accomodates for block and indexing speeds.`, + configuredValue: argv.subgraphMaxBlockDistance, + }, + ) + } + if ( + argv.subgraphFreshnessSleepMilliseconds <= + DEFAULT_SUBGRAPH_FRESHNESS_SLEEP_MILLISECONDS + ) { + logger.warn( + `Consider increasing 'subgraph-freshness-sleep-milliseconds' for Arbitrum networks`, + { + problem: + 'A short subgraph freshness wait time might be insufficient for the subgraph to sync with fast-paced networks.', + hint: `Increase the 'subgraph-freshness-sleep-milliseconds' parameter to a value that accomodates for block and indexing speeds.`, + configuredValue: argv.subgraphFreshnessSleepMilliseconds, + }, + ) + } + } + + const subgraphFreshnessChecker = new SubgraphFreshnessChecker( + 'Network Subgraph', + networkProvider, + argv.subgraphMaxBlockDistance, + argv.subgraphFreshnessSleepMilliseconds, + logger.child({ component: 'FreshnessChecker' }), + Infinity, + ) + const networkSubgraph = await NetworkSubgraph.create({ logger, endpoint: argv.networkSubgraphEndpoint, @@ -296,19 +365,10 @@ export default { deployment: new SubgraphDeploymentID(argv.networkSubgraphDeployment), } : undefined, + subgraphFreshnessChecker, }) logger.info(`Successfully connected to network subgraph`) - const networkProvider = await Network.provider( - logger, - metrics, - '_', - argv.networkProvider, - argv.ethereumPollingInterval, - ) - const networkIdentifier = await networkProvider.getNetwork() - const protocolNetwork = resolveChainId(networkIdentifier.chainId) - // If the network subgraph deployment is present, validate if the `chainId` we get from our // provider is consistent. if (argv.networkSubgraphDeployment) {