diff --git a/packages/indexer-common/src/__tests__/subgraph.test.ts b/packages/indexer-common/src/__tests__/subgraph.test.ts index af90f78f3..a8cef2594 100644 --- a/packages/indexer-common/src/__tests__/subgraph.test.ts +++ b/packages/indexer-common/src/__tests__/subgraph.test.ts @@ -1,129 +1,172 @@ -import gql from 'graphql-tag' +import { DocumentNode } from 'graphql' import { SubgraphFreshnessChecker, LoggerInterface, ProviderInterface, + SubgraphQueryInterface, } from '../subgraphs' -import { mergeSelectionSets } from '../utils' +import { QueryResult } from '../network-subgraph' +import gql from 'graphql-tag' +/* eslint-disable @typescript-eslint/no-explicit-any */ const mockProvider: ProviderInterface & any = { getBlockNumber: jest.fn(), } -const mockLogger: LoggerInterface = { +const mockLogger: LoggerInterface & any = { trace: jest.fn(), + error: jest.fn(), + warn: jest.fn(), +} + +const mockSubgraph: SubgraphQueryInterface & any = { + query: jest.fn(), +} + +const testSubgraphQuery: DocumentNode = gql` + query TestQuery { + foo { + id + } + } +` + +function mockQueryResult(blockNumber: number): QueryResult & { + data: { _meta: { block: { number: number } } } +} { + return { + data: { + foo: { + id: 1, + }, + _meta: { + block: { + number: blockNumber, + }, + }, + }, + } } +/* eslint-enable @typescript-eslint/no-explicit-any */ describe('SubgraphFreshnessChecker', () => { - let freshnessChecker: SubgraphFreshnessChecker - - beforeEach(() => { - freshnessChecker = new SubgraphFreshnessChecker( - mockProvider, - 10, // Freshness threshold - mockLogger, - ) - }) + beforeEach(jest.resetAllMocks) - afterEach(() => { - jest.clearAllMocks() - }) + describe('checkedQuery method', () => { + beforeEach(jest.resetAllMocks) - it('Returns `true` when subgraph is fresh', async () => { - mockProvider.getBlockNumber.mockResolvedValue(100) - const isFresh = await freshnessChecker.checkSubgraphFreshness(90) - expect(isFresh).toBe(true) - expect(mockLogger.trace).toHaveBeenCalledWith('Performing subgraph freshness check', { - latestIndexedBlock: 90, - latestNetworkBlock: 100, - blockDistance: 10, - freshnessThreshold: 10, - }) - }) + it('should throw an error if max retries reached', async () => { + const checker = new SubgraphFreshnessChecker( + 'Test Subgraph', + mockProvider, + 10, + 10, + mockLogger, + 1, + ) + + // Mocks never change value in this test, so the network will always be 100 blocks ahead and + // the checked query will timeout.f + mockProvider.getBlockNumber.mockResolvedValue(242) + mockSubgraph.query.mockResolvedValue(mockQueryResult(100)) - it('Returns `false` when subgraph is not fresh', async () => { - mockProvider.getBlockNumber.mockResolvedValue(100) // - const isFresh = await freshnessChecker.checkSubgraphFreshness(80) - expect(isFresh).toBe(false) - expect(mockLogger.trace).toHaveBeenCalledWith('Performing subgraph freshness check', { - latestIndexedBlock: 80, - latestNetworkBlock: 100, - blockDistance: 20, - freshnessThreshold: 10, + await expect(checker.checkedQuery(testSubgraphQuery, mockSubgraph)).rejects.toThrow( + 'Max retries reached for Test Subgraph freshness check', + ) + + expect(mockLogger.trace).toHaveBeenCalledWith( + expect.stringContaining('Performing subgraph freshness check'), + { + blockDistance: 142, + freshnessThreshold: 10, + latestIndexedBlock: 100, + latestNetworkBlock: 242, + retriesLeft: 1, + subgraph: 'Test Subgraph', + }, + ) }) - }) - it('Throws error when subgraph is ahead of network', async () => { - mockProvider.getBlockNumber.mockResolvedValue(90) - await expect(freshnessChecker.checkSubgraphFreshness(100)).rejects.toThrowError( - "Subgraph's latest indexed block is higher than Network's latest block", - ) - expect(mockLogger.trace).toHaveBeenCalledWith('Performing subgraph freshness check', { - latestIndexedBlock: 100, - latestNetworkBlock: 90, - blockDistance: -10, - freshnessThreshold: 10, + it('should return query result if the subgraph is fresh', async () => { + const checker = new SubgraphFreshnessChecker( + 'Test Subgraph', + mockProvider, + 10, + 10, + mockLogger, + 1, + ) + + mockProvider.getBlockNumber.mockResolvedValue(105) + mockSubgraph.query.mockResolvedValue(mockQueryResult(100)) + + await expect( + checker.checkedQuery(testSubgraphQuery, mockSubgraph), + ).resolves.toEqual(mockQueryResult(100)) + + expect(mockLogger.trace).toHaveBeenCalledWith( + expect.stringContaining('Performing subgraph freshness check'), + { + blockDistance: 5, + freshnessThreshold: 10, + latestIndexedBlock: 100, + latestNetworkBlock: 105, + retriesLeft: 1, + subgraph: 'Test Subgraph', + }, + ) }) - }) -}) -describe('mergeSelectionSets tests', () => { - test('mergeSelectionSets can merge two GraphQL queries', () => { - const firstQuery = gql` - query Foo { - graphNetworks(first: 5) { - id - controller - graphToken - epochManager - } - graphAccounts(first: 5) { - id - names { - id - } - defaultName { - id - } - createdAt - } - } - ` - const secondQuery = gql` - { - _meta { - block { - number - } - } - } - ` - const expected = gql` - query Foo { - graphNetworks(first: 5) { - id - controller - graphToken - epochManager - } - graphAccounts(first: 5) { - id - names { - id - } - defaultName { - id - } - createdAt - } - _meta { - block { - number - } - } - } - ` - const merged = mergeSelectionSets(firstQuery, secondQuery) - expect(merged.definitions).toStrictEqual(expected.definitions) + it('should return query result if the subgraph becomes fresh after retries', async () => { + const checker = new SubgraphFreshnessChecker( + 'Test Subgraph', + mockProvider, + 10, + 100, + mockLogger, + 2, + ) + + // Advance the network by ten blocks between calls + mockProvider.getBlockNumber.mockResolvedValueOnce(150).mockResolvedValueOnce(160) + + // Advance the subgraph by 20 blocks between calls + // The first call should trigger a retry, which then shuld succeed + mockSubgraph.query + .mockResolvedValueOnce(mockQueryResult(130)) + .mockResolvedValueOnce(mockQueryResult(150)) + + const result = await checker.checkedQuery(testSubgraphQuery, mockSubgraph) + expect(result).toEqual(mockQueryResult(150)) + + // It should log this on retry + expect(mockLogger.warn).toHaveBeenCalledWith( + expect.stringContaining( + 'Test Subgraph is not fresh. Sleeping for 100 ms before retrying', + ), + { + blockDistance: 20, + freshnessThreshold: 10, + latestIndexedBlock: 130, + latestNetworkBlock: 150, + retriesLeft: 2, + subgraph: 'Test Subgraph', + }, + ) + // It should log this on success + expect(mockLogger.trace.mock.calls).toContainEqual( + expect.objectContaining([ + 'Test Subgraph is fresh', + { + blockDistance: 10, + freshnessThreshold: 10, + latestIndexedBlock: 150, + latestNetworkBlock: 160, + retriesLeft: 1, + subgraph: 'Test Subgraph', + }, + ]), + ) + }) }) }) diff --git a/packages/indexer-common/src/epoch-subgraph.ts b/packages/indexer-common/src/epoch-subgraph.ts index ee0af7e5c..becea302c 100644 --- a/packages/indexer-common/src/epoch-subgraph.ts +++ b/packages/indexer-common/src/epoch-subgraph.ts @@ -2,20 +2,8 @@ import axios, { AxiosInstance, AxiosResponse } from 'axios' import { DocumentNode, print } from 'graphql' import { CombinedError } from '@urql/core' import { QueryResult } from './network-subgraph' -import gql from 'graphql-tag' -import { mergeSelectionSets } from './utils' import { Logger } from '@graphprotocol/common-ts' -const blockNumberQuery = gql` - { - _meta { - block { - number - } - } - } -` - export class EpochSubgraph { endpointClient: AxiosInstance logger: Logger @@ -40,16 +28,8 @@ export class EpochSubgraph { // eslint-disable-next-line @typescript-eslint/no-explicit-any variables?: Record, ): Promise> { - // Include block number in all queries - let updatedQuery - try { - updatedQuery = mergeSelectionSets(query, blockNumberQuery) - } catch (e) { - updatedQuery = query - } - const response = await this.endpointClient.post('', { - query: print(updatedQuery), + query: print(query), variables, }) const data = JSON.parse(response.data) diff --git a/packages/indexer-common/src/subgraphs.ts b/packages/indexer-common/src/subgraphs.ts index cdbedf2d1..77d8ade18 100644 --- a/packages/indexer-common/src/subgraphs.ts +++ b/packages/indexer-common/src/subgraphs.ts @@ -1,5 +1,5 @@ import { base58 } from 'ethers/lib/utils' -import { BigNumber, providers, utils } from 'ethers' +import { BigNumber, utils } from 'ethers' import { Logger, SubgraphDeploymentID } from '@graphprotocol/common-ts' import { SubgraphDeployment } from './types' import { @@ -7,6 +7,10 @@ import { IndexingDecisionBasis, IndexingRuleAttributes, } from './indexer-management' +import { DocumentNode, print } from 'graphql' +import gql from 'graphql-tag' +import { QueryResult } from './network-subgraph' +import { mergeSelectionSets, sleep } from './utils' export enum SubgraphIdentifierType { DEPLOYMENT = 'deployment', @@ -328,40 +332,140 @@ export interface ProviderInterface { getBlockNumber(): Promise } +/* eslint-disable @typescript-eslint/no-explicit-any */ export interface LoggerInterface { trace(msg: string, o?: object, ...args: any[]): void + error(msg: string, o?: object, ...args: any[]): void + warn(msg: string, o?: object, ...args: any[]): void +} + +export interface SubgraphQueryInterface { + query( + query: DocumentNode, + variables?: Record, + ): Promise> +} +/* eslint-enable @typescript-eslint/no-explicit-any */ + +const blockNumberQuery = gql` + { + _meta { + block { + number + } + } + } +` + +interface BlockNumberInterface { + data: { _meta: { block: { number: number } } } } export class SubgraphFreshnessChecker { + subgraphName: string provider: ProviderInterface threshold: number logger: LoggerInterface + sleepDurationMillis: number + retries: number constructor( + subgraphName: string, provider: ProviderInterface, freshnessThreshold: number, + sleepDurationMillis: number, logger: LoggerInterface, + retries: number, ) { + this.subgraphName = subgraphName this.provider = provider this.threshold = freshnessThreshold + this.sleepDurationMillis = sleepDurationMillis this.logger = logger + this.retries = retries + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + public async checkedQuery( + query: DocumentNode, + subgraph: SubgraphQueryInterface, + ): Promise> { + // Try to inject the latest block number into the original query. + let updatedQuery = query + try { + updatedQuery = mergeSelectionSets(query, blockNumberQuery) + } catch (err) { + throw new Error(`Failed to append block number into ${this.subgraphName} query`) + } + + // Try obtaining a fresh subgraph query at most 10 times + return this.checkedQueryRecursive(updatedQuery, subgraph, this.retries) } - public async checkSubgraphFreshness(latestIndexedBlock: number): Promise { - const latestNetworkBlock = await this.provider.getBlockNumber() + // eslint-disable-next-line @typescript-eslint/no-explicit-any + private async checkedQueryRecursive( + updatedQuery: DocumentNode, + subgraph: SubgraphQueryInterface, + retriesLeft: number, + ): Promise> { + this.logger.trace('Performing a checked subgraph query', { + query: print(updatedQuery), + subgraph, + retriesLeft, + }) + + if (retriesLeft < 0) { + const errorMsg = `Max retries reached for ${this.subgraphName} freshness check` + this.logger.error(errorMsg) + throw new Error(errorMsg) + } + + // Obtain the latest network block number and subgraph query in parallel. + const subgraphQueryPromise = subgraph.query(updatedQuery) as Promise< + QueryResult & BlockNumberInterface + > + const latestNetworkBLockPromise = this.provider.getBlockNumber() + const [subgraphQueryResult, latestNetworkBlock] = await Promise.all([ + subgraphQueryPromise, + latestNetworkBLockPromise, + ]) + + const latestIndexedBlock = subgraphQueryResult?.data?._meta?.block?.number + if (!latestIndexedBlock) { + // Check for unexpected missing block data as a precaution. + throw new Error(`Failed to append block number into ${this.subgraphName} query`) + } + + // Check subgraph freshness const blockDistance = latestNetworkBlock - latestIndexedBlock - this.logger.trace('Performing subgraph freshness check', { + const logInfo = { latestIndexedBlock, latestNetworkBlock, blockDistance, freshnessThreshold: this.threshold, - }) + subgraph: this.subgraphName, + retriesLeft, + } + this.logger.trace('Performing subgraph freshness check', logInfo) + if (blockDistance < 0) { // Invariant violated: Subgraph can't be ahead of network latest block throw new Error( - "Subgraph's latest indexed block is higher than Network's latest block", + `${this.subgraphName}'s latest indexed block (${latestIndexedBlock}) is higher than Network's latest block (${latestNetworkBlock})`, + ) + } + + if (blockDistance > this.threshold) { + // Reenter function + this.logger.warn( + `${this.subgraphName} is not fresh. Sleeping for ${this.sleepDurationMillis} ms before retrying`, + logInfo, ) + await sleep(this.sleepDurationMillis) + return this.checkedQueryRecursive(updatedQuery, subgraph, retriesLeft - 1) + } else { + this.logger.trace(`${this.subgraphName} is fresh`, logInfo) } - return blockDistance <= this.threshold + return subgraphQueryResult } } diff --git a/packages/indexer-common/src/utils.ts b/packages/indexer-common/src/utils.ts index de61b89fa..b83bb9d2c 100644 --- a/packages/indexer-common/src/utils.ts +++ b/packages/indexer-common/src/utils.ts @@ -98,6 +98,7 @@ function extractSelectionSet(document: DocumentNode): SelectionSetNode { return selectionSet } +// eslint-disable-next-line @typescript-eslint/no-explicit-any function findFirstSelectionSet(node: any): SelectionSetNode | null { if (node.kind === Kind.SELECTION_SET) { return node @@ -113,3 +114,5 @@ function findFirstSelectionSet(node: any): SelectionSetNode | null { } return null } + +export const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))