Skip to content

Commit

Permalink
Batch indexer requests.
Browse files Browse the repository at this point in the history
  • Loading branch information
NoahSaso committed Sep 3, 2023
1 parent c8ae0fb commit 50a5455
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 11 deletions.
24 changes: 13 additions & 11 deletions packages/state/indexer/query.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { IndexerFormulaType, WithChainId } from '@dao-dao/types'
import {
BatchClient,
CommonError,
INDEXER_DISABLED,
INDEXER_URL,
fetchWithTimeout,
} from '@dao-dao/utils'

export type QueryIndexerOptions = WithChainId<
Expand Down Expand Up @@ -31,6 +31,8 @@ export type QueryIndexerOptions = WithChainId<
)
>

const indexerBatchClient = new BatchClient(INDEXER_URL + '/batch')

export const queryIndexer = async <T = any>({
type,
address = '_',
Expand Down Expand Up @@ -59,22 +61,22 @@ export const queryIndexer = async <T = any>({
...(block ? { block: `${block.height}:${block.timeUnixMs ?? 1}` } : {}),
})

const response = await fetchWithTimeout(
// Timeout after 10 seconds.
10 * 1000,
`${INDEXER_URL}/${chainId}/${type}/${address}/${formula}?${params.toString()}`
)
const url = `/${chainId}/${type}/${address}/${formula}?${params.toString()}`
const { status, body } = await indexerBatchClient.execute({
url,
})

if (!response.ok) {
const errorResponse = await response.text().catch(() => undefined)
if (status >= 300) {
throw new Error(
`Error querying indexer for ${type}/${address}/${formula}: ${response.status} ${errorResponse}`.trim()
`Error querying indexer for ${type}/${address}/${formula}: ${status} ${body}`.trim()
)
} else if (response.status === 204) {
}

if (status === 204) {
// If no content is returned, return undefined. This will happen if the
// formula computed succesfully and outputted nothing (undefined or null).
return undefined
}

return response.json()
return JSON.parse(body)
}
106 changes: 106 additions & 0 deletions packages/utils/batch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Generic batch client inspired by @cosmjs/tendermint-rpc's HttpBatchClient.

export type BatchClientOptions = {
dispatchInterval: number
batchSizeLimit: number
}

export type Request = {
url: string
}

export type Response = {
status: number
body: any
}

export type QueueItem = {
request: Request
resolve: (response: Response) => void
reject: (reason?: any) => void
}

const defaultHttpBatchClientOptions = {
dispatchInterval: 20,
batchSizeLimit: 20,
}

export class BatchClient {
protected readonly url: string
protected readonly headers: Record<string, string> | undefined
protected readonly options: BatchClientOptions
private timer: NodeJS.Timer | undefined
private readonly queue: QueueItem[]

constructor(endpoint: string, options: Partial<BatchClientOptions> = {}) {
this.queue = []
this.options = {
batchSizeLimit:
options.batchSizeLimit ?? defaultHttpBatchClientOptions.batchSizeLimit,
dispatchInterval:
options.dispatchInterval ??
defaultHttpBatchClientOptions.dispatchInterval,
}
this.url = endpoint
this.timer = setInterval(() => this.tick(), options.dispatchInterval)
this.validate()
}

disconnect() {
this.timer && clearInterval(this.timer)
this.timer = undefined
}

async execute(request: Request): Promise<Response> {
return new Promise<Response>((resolve, reject) => {
this.queue.push({ request, resolve, reject })
if (this.queue.length >= this.options.batchSizeLimit) {
// this train is full, let's go
this.tick()
}
})
}

validate() {
if (
!this.options.batchSizeLimit ||
!Number.isSafeInteger(this.options.batchSizeLimit) ||
this.options.batchSizeLimit < 1
) {
throw new Error('batchSizeLimit must be a safe integer >= 1')
}
}

/**
* This is called in an interval where promise rejections cannot be handled.
* So this is not async and HTTP errors need to be handled by the queued promises.
*/
async tick() {
// Remove batch from queue.
const batch = this.queue.splice(0, this.options.batchSizeLimit)
if (!batch.length) {
return
}

const response = await fetch(this.url, {
method: 'POST',
body: JSON.stringify(batch.map(({ request }) => request.url)),
headers: {
'Content-Type': 'application/json',
},
})

if (!response.ok) {
batch.forEach(({ reject }) => reject(new Error(response.statusText)))
return
}

const responses = await response.json()
if (!Array.isArray(responses) || responses.length !== batch.length) {
batch.forEach(({ reject }) => reject(new Error('Invalid batch response')))
return
}

batch.forEach(({ resolve }, index) => resolve(responses[index]))
}
}
1 change: 1 addition & 0 deletions packages/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export * from './validation'
export * from './actions'
export * from './address'
export * from './assets'
export * from './batch'
export * from './chain'
export * from './claims'
export * from './error'
Expand Down

0 comments on commit 50a5455

Please sign in to comment.