Skip to content

Commit

Permalink
non-hook polling functions
Browse files Browse the repository at this point in the history
  • Loading branch information
arietrouw committed Jan 20, 2024
1 parent 1085ba5 commit db25047
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { NodeInstance } from '@xyo-network/node-model'
import { Payload } from '@xyo-network/payload-model'

import { IndexedResultsConfig, PollingConfig } from '../../interfaces'
import { createPollingFunction, DEFAULT_POLLING_CONFIG } from './createPollingFunction'
import { divineIndexedResults } from './divineIndexedResults'

/** Poll a set of diviners with various polling strategies */
export const createDivineIndexedResultsPollingFunction = <T extends Payload = Payload>(
node?: NodeInstance | null,
config?: IndexedResultsConfig,
pollDivinerConfig: PollingConfig = DEFAULT_POLLING_CONFIG,
onResult?: (result: T[] | null) => void,
) => {
return createPollingFunction(config, pollDivinerConfig, () => divineIndexedResults(node, config), onResult)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import { setTimeoutEx } from '@xylabs/timer'
import { Payload } from '@xyo-network/payload-model'

import { IndexedResultsConfig, PollingConfig } from '../../interfaces'

export type PollingFunction = () => Promise<Payload[] | null | undefined>

export const DEFAULT_POLLING_CONFIG: PollingConfig = {
initialDelay: 100 / 3, //First time will be zero, second time will be 100
maxDelay: 10_000,
maxRetries: 8,
}

export const createPollingFunction = <T extends Payload = Payload>(
config?: IndexedResultsConfig,
pollDivinerConfig: PollingConfig = DEFAULT_POLLING_CONFIG,
pollingFunction?: PollingFunction,
onResult?: (result: T[] | null) => void,
) => {
const { indexedQueries, processIndexedResults } = config ?? {}
const { isFresh } = processIndexedResults ?? {}
const { maxDelay = 10_000, maxRetries, initialDelay = 100, onFoundResult } = pollDivinerConfig

let activePolling = true

const freshTest = (result?: Payload[] | null) => (isFresh ? isFresh(result) : true)

const pollCompleteTest = (result?: Payload[] | null) => (onFoundResult ? onFoundResult(result) : false)

/** A polling function that runs on an increasing delay for a fixed number of times */
const pollDivinersWithDelay = async (newDelay: number, pollingFunction?: PollingFunction) => {
if (activePolling && maxRetries !== null && pollingFunction) {
let retries = 0
let result: Payload[] | undefined | null

const pollDivinersWithDelayInner = async (newDelay: number) => {
await new Promise((resolve) => setTimeoutEx(() => resolve(true), retries === 0 ? 0 : newDelay))
try {
// Try for a fixed number of times
if (retries < maxRetries) {
// logarithmic backoff till we hit the max, then we continue that delay for remaining tries
const updatedDelay = newDelay >= maxDelay ? newDelay : newDelay * 3
result = await pollingFunction()

const fresh = freshTest(result)

// have a result but its not fresh enough
if (result && !fresh) {
console.log(`Completed Retry ${retries} - Retrying in ${updatedDelay} milliseconds...`)
retries++
await pollDivinersWithDelayInner(updatedDelay)
}
onResult?.(result as T[] | null)
} else {
console.warn('Exceeded maximum retries.', JSON.stringify(indexedQueries))
onResult?.(result as T[] | null)
}
} catch (e) {
console.error('error retrying diviner', e)
throw e
}
}

return await pollDivinersWithDelayInner(newDelay)
}
}

/** A polling function that runs indefinitely on a set interval */
const pollDivinersIndefinitely = async (newDelay: number, pollingFunction?: PollingFunction) => {
// Uncomment to debug
// console.log('activePollingRef', activePollingRef)
if (activePolling && pollingFunction) {
let result: Payload[] | undefined | null

await new Promise((resolve) => setTimeoutEx(() => resolve(true), newDelay))
try {
result = await pollingFunction()

const fresh = freshTest(result)
const pollComplete = pollCompleteTest(result)

if ((result && fresh) || result === null) {
onResult?.(result as T[] | null)
}
pollComplete ? (activePolling = false) : await pollDivinersIndefinitely(initialDelay, pollingFunction)
} catch (e) {
console.error('error retrying diviner', e)
throw e
}
}
}

/** Function to invoke polling by determining a polling strategy */
const poll = async () => {
return await (maxRetries === null
? pollDivinersIndefinitely(initialDelay, pollingFunction)
: pollDivinersWithDelay(initialDelay, pollingFunction))
}

const setActive = (value: boolean) => {
activePolling = value
}

return { poll, setActive }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { DivinerInstance, isDivinerInstance } from '@xyo-network/diviner-model'
import { NodeInstance } from '@xyo-network/node-model'
import { Payload } from '@xyo-network/payload-model'

import { IndexedResultsConfig } from '../../interfaces'
import { divineSingleIndexedResults } from './divineSingleIndexedResults'

export const divineIndexedResults = async <T extends Payload = Payload>(node?: NodeInstance | null, config?: IndexedResultsConfig) => {
let result: T[] | undefined | null
let divinerCount = 0

const { indexedQueries, processIndexedResults } = config ?? {}
const parseIndexedResults = processIndexedResults?.parseIndexedResults

if (config?.diviners && node) {
const resolvedDiviners = await node.resolve({ name: config.diviners })
const diviners = resolvedDiviners.filter((module) => isDivinerInstance(module)) as DivinerInstance[]

if (diviners && diviners?.length > 0) {
while (divinerCount < diviners?.length && indexedQueries) {
const divinerResult = await divineSingleIndexedResults(diviners[divinerCount], indexedQueries, parseIndexedResults)
if (divinerResult && divinerResult?.length) {
result = divinerResult as T[]
break
}
divinerCount++
}
return result ?? null
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { DivinerInstance } from '@xyo-network/diviner-model'
import { Payload } from '@xyo-network/payload-model'

import { ParseIndexedResults } from '../../interfaces'

export const divineSingleIndexedResults = async (diviner: DivinerInstance, indexedQueries: Payload[], parseIndexedResults?: ParseIndexedResults) => {
const divinedResult = await diviner.divine(indexedQueries)
let results: Payload[] | undefined
if (divinedResult?.length > 0) {
results = parseIndexedResults ? await parseIndexedResults(divinedResult) : divinedResult
}
return results && results.length > 0 ? results : null
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
export * from './createDivineIndexedResultsPollingFunction'
export * from './createPollingFunction'
export * from './divineIndexedResults'
export * from './divineSingleIndexedResults'
export * from './useFetchDivinersFromNode'
export * from './usePollDiviners'
export * from './useTryDiviners'
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { Payload } from '@xyo-network/payload-model'

export type ParseIndexedResults = (payloads: Payload[]) => Promise<Payload[] | undefined>

export interface ProcessIndexedResults {
/** function to ensure the results meets a required level of freshness */
isFresh?: (payloads?: Payload[] | null) => boolean
/** Validate and parse results from the diviner(s) (i.e. validate and resolve the hashes that are in the query result) */
parseIndexedResults: (payloads: Payload[]) => Promise<Payload[] | undefined>
parseIndexedResults: ParseIndexedResults
}

export interface IndexedResultsConfig<TPayload extends Payload = Payload> {
Expand Down

0 comments on commit db25047

Please sign in to comment.