From 50a5455eb47fefba4e2dc4c1a01a3caa9b7557e3 Mon Sep 17 00:00:00 2001 From: Noah Saso Date: Sun, 3 Sep 2023 13:29:53 -0700 Subject: [PATCH] Batch indexer requests. --- packages/state/indexer/query.ts | 24 ++++---- packages/utils/batch.ts | 106 ++++++++++++++++++++++++++++++++ packages/utils/index.ts | 1 + 3 files changed, 120 insertions(+), 11 deletions(-) create mode 100644 packages/utils/batch.ts diff --git a/packages/state/indexer/query.ts b/packages/state/indexer/query.ts index e1aa2ec5ba..7483729ed4 100644 --- a/packages/state/indexer/query.ts +++ b/packages/state/indexer/query.ts @@ -1,9 +1,9 @@ import { IndexerFormulaType, WithChainId } from '@dao-dao/types' import { + BatchClient, CommonError, INDEXER_DISABLED, INDEXER_URL, - fetchWithTimeout, } from '@dao-dao/utils' export type QueryIndexerOptions = WithChainId< @@ -31,6 +31,8 @@ export type QueryIndexerOptions = WithChainId< ) > +const indexerBatchClient = new BatchClient(INDEXER_URL + '/batch') + export const queryIndexer = async ({ type, address = '_', @@ -59,22 +61,22 @@ export const queryIndexer = async ({ ...(block ? { block: `${block.height}:${block.timeUnixMs ?? 1}` } : {}), }) - const response = await fetchWithTimeout( - // Timeout after 10 seconds. - 10 * 1000, - `${INDEXER_URL}/${chainId}/${type}/${address}/${formula}?${params.toString()}` - ) + const url = `/${chainId}/${type}/${address}/${formula}?${params.toString()}` + const { status, body } = await indexerBatchClient.execute({ + url, + }) - if (!response.ok) { - const errorResponse = await response.text().catch(() => undefined) + if (status >= 300) { throw new Error( - `Error querying indexer for ${type}/${address}/${formula}: ${response.status} ${errorResponse}`.trim() + `Error querying indexer for ${type}/${address}/${formula}: ${status} ${body}`.trim() ) - } else if (response.status === 204) { + } + + if (status === 204) { // If no content is returned, return undefined. This will happen if the // formula computed succesfully and outputted nothing (undefined or null). return undefined } - return response.json() + return JSON.parse(body) } diff --git a/packages/utils/batch.ts b/packages/utils/batch.ts new file mode 100644 index 0000000000..f1da038f88 --- /dev/null +++ b/packages/utils/batch.ts @@ -0,0 +1,106 @@ +// Generic batch client inspired by @cosmjs/tendermint-rpc's HttpBatchClient. + +export type BatchClientOptions = { + dispatchInterval: number + batchSizeLimit: number +} + +export type Request = { + url: string +} + +export type Response = { + status: number + body: any +} + +export type QueueItem = { + request: Request + resolve: (response: Response) => void + reject: (reason?: any) => void +} + +const defaultHttpBatchClientOptions = { + dispatchInterval: 20, + batchSizeLimit: 20, +} + +export class BatchClient { + protected readonly url: string + protected readonly headers: Record | undefined + protected readonly options: BatchClientOptions + private timer: NodeJS.Timer | undefined + private readonly queue: QueueItem[] + + constructor(endpoint: string, options: Partial = {}) { + this.queue = [] + this.options = { + batchSizeLimit: + options.batchSizeLimit ?? defaultHttpBatchClientOptions.batchSizeLimit, + dispatchInterval: + options.dispatchInterval ?? + defaultHttpBatchClientOptions.dispatchInterval, + } + this.url = endpoint + this.timer = setInterval(() => this.tick(), options.dispatchInterval) + this.validate() + } + + disconnect() { + this.timer && clearInterval(this.timer) + this.timer = undefined + } + + async execute(request: Request): Promise { + return new Promise((resolve, reject) => { + this.queue.push({ request, resolve, reject }) + if (this.queue.length >= this.options.batchSizeLimit) { + // this train is full, let's go + this.tick() + } + }) + } + + validate() { + if ( + !this.options.batchSizeLimit || + !Number.isSafeInteger(this.options.batchSizeLimit) || + this.options.batchSizeLimit < 1 + ) { + throw new Error('batchSizeLimit must be a safe integer >= 1') + } + } + + /** + * This is called in an interval where promise rejections cannot be handled. + * So this is not async and HTTP errors need to be handled by the queued promises. + */ + async tick() { + // Remove batch from queue. + const batch = this.queue.splice(0, this.options.batchSizeLimit) + if (!batch.length) { + return + } + + const response = await fetch(this.url, { + method: 'POST', + body: JSON.stringify(batch.map(({ request }) => request.url)), + headers: { + 'Content-Type': 'application/json', + }, + }) + + if (!response.ok) { + batch.forEach(({ reject }) => reject(new Error(response.statusText))) + return + } + + const responses = await response.json() + if (!Array.isArray(responses) || responses.length !== batch.length) { + batch.forEach(({ reject }) => reject(new Error('Invalid batch response'))) + return + } + + batch.forEach(({ resolve }, index) => resolve(responses[index])) + } +} diff --git a/packages/utils/index.ts b/packages/utils/index.ts index 3789902ddd..3c38d61332 100644 --- a/packages/utils/index.ts +++ b/packages/utils/index.ts @@ -4,6 +4,7 @@ export * from './validation' export * from './actions' export * from './address' export * from './assets' +export * from './batch' export * from './chain' export * from './claims' export * from './error'