diff --git a/packages/modules/packages/bridge/packages/pub-sub/src/AsyncQueryBus/AsyncQueryBusClient.ts b/packages/modules/packages/bridge/packages/pub-sub/src/AsyncQueryBus/AsyncQueryBusClient.ts index 5296ad6b43..a3e35b33e7 100644 --- a/packages/modules/packages/bridge/packages/pub-sub/src/AsyncQueryBus/AsyncQueryBusClient.ts +++ b/packages/modules/packages/bridge/packages/pub-sub/src/AsyncQueryBus/AsyncQueryBusClient.ts @@ -155,19 +155,19 @@ export class AsyncQueryBusClient status === Pending) // TODO: Do in throttled batches await Promise.allSettled( - pendingCommands.map(async ([query, status]) => { + pendingCommands.map(async ([sourceQuery, status]) => { if (status === Pending) { const divinerQuery: BoundWitnessDivinerQueryPayload = { - limit: 1, order: 'desc', schema: BoundWitnessDivinerQuerySchema, query, + limit: 1, order: 'desc', schema: BoundWitnessDivinerQuerySchema, sourceQuery, } const result = await responseBoundWitnessDiviner.divine([divinerQuery]) if (result && result.length > 0) { const response = result.find(isBoundWitnessWithMeta) - if (response && (response?.$meta as unknown as { sourceQuery: string })?.sourceQuery === query) { - this.logger?.debug(`Found response to query: ${query}`) + if (response && (response?.$meta as unknown as { sourceQuery: string })?.sourceQuery === sourceQuery) { + this.logger?.debug(`Found response to query: ${sourceQuery}`) // Get any payloads associated with the response const payloads: PayloadWithMeta[] = response.payload_hashes?.length > 0 ? await responseArchivist.get(response.payload_hashes) : [] - this.queryCache.set(query, [response, payloads, []]) + this.queryCache.set(sourceQuery, [response, payloads, []]) } } } diff --git a/packages/modules/packages/diviner/packages/boundwitness/packages/memory/src/applyBoundWitnessDivinerQueryPayload.ts b/packages/modules/packages/diviner/packages/boundwitness/packages/memory/src/applyBoundWitnessDivinerQueryPayload.ts index f31c42a808..2542420d23 100644 --- a/packages/modules/packages/diviner/packages/boundwitness/packages/memory/src/applyBoundWitnessDivinerQueryPayload.ts +++ b/packages/modules/packages/diviner/packages/boundwitness/packages/memory/src/applyBoundWitnessDivinerQueryPayload.ts @@ -8,13 +8,15 @@ import type { BoundWitnessDivinerQueryPayload } from '@xyo-network/diviner-bound import type { Payload, WithMeta } from '@xyo-network/payload-model' type WithTimestamp = BoundWitness & { timestamp: number } +type WithBlock = BoundWitness & { block: number } const hasTimestamp = (bw: BoundWitness): bw is WithTimestamp => bw.timestamp !== undefined +const hasBlock = (bw: BoundWitness): bw is WithBlock => bw.timestamp !== undefined // eslint-disable-next-line complexity export const applyBoundWitnessDivinerQueryPayload = (filter?: BoundWitnessDivinerQueryPayload, payloads: Payload[] = []) => { if (!filter) return [] const { - addresses, payload_hashes, payload_schemas, limit, offset, order = 'desc', query, destination, timestamp, + addresses, payload_hashes, payload_schemas, block, limit, offset, order = 'desc', sourceQuery, destination, timestamp, } = filter let bws = payloads.filter(isBoundWitness) as WithMeta[] @@ -23,7 +25,7 @@ export const applyBoundWitnessDivinerQueryPayload = (filter?: BoundWitnessDivine if (allAddresses?.length) bws = bws.filter(bw => containsAll(bw.addresses, allAddresses)) if (payload_hashes?.length) bws = bws.filter(bw => containsAll(bw.payload_hashes, payload_hashes)) if (payload_schemas?.length) bws = bws.filter(bw => containsAll(bw.payload_schemas, payload_schemas)) - if (query) bws = bws.filter(bw => (bw?.$meta as { sourceQuery?: string })?.sourceQuery === query) + if (sourceQuery) bws = bws.filter(bw => (bw?.$meta as { sourceQuery?: string })?.sourceQuery === sourceQuery) // If there's a destination filter of the right kind if (destination && Array.isArray(destination) && destination?.length > 0) { const targetFilter = assertEx(destination, () => 'Missing destination') @@ -38,11 +40,17 @@ export const applyBoundWitnessDivinerQueryPayload = (filter?: BoundWitnessDivine : false }) } + if (block !== undefined) { + bws + = order === 'desc' + ? bws.filter(hasBlock).filter(bw => bw.block && bw.block <= block) + : bws.filter(hasBlock).filter(bw => bw.block && bw.block >= block) + } if (timestamp !== undefined) { bws = order === 'desc' - ? bws.filter(hasTimestamp).filter(bw => bw.timestamp <= timestamp) - : bws.filter(hasTimestamp).filter(bw => bw.timestamp >= timestamp) + ? bws.filter(hasTimestamp).filter(bw => bw.timestamp && bw.timestamp <= timestamp) + : bws.filter(hasTimestamp).filter(bw => bw.timestamp && bw.timestamp >= timestamp) } const parsedLimit = limit ?? bws.length const parsedOffset = offset ?? 0 diff --git a/packages/modules/packages/diviner/packages/boundwitness/packages/model/src/Predicate.ts b/packages/modules/packages/diviner/packages/boundwitness/packages/model/src/Predicate.ts index 4c0e47270b..8badf31d41 100644 --- a/packages/modules/packages/diviner/packages/boundwitness/packages/model/src/Predicate.ts +++ b/packages/modules/packages/diviner/packages/boundwitness/packages/model/src/Predicate.ts @@ -4,5 +4,14 @@ import type { Payload } from '@xyo-network/payload-model' export type WithoutSchemas = Omit, 'schemas'> +export type BoundWitnessDivinerPredicateFields = { + destination: string[] + sourceQuery: string +} + // TODO: Should we just accept "schema"/"schemas" here and infer that they mean "payload_schemas"? -export type BoundWitnessDivinerPredicate = WithoutSchemas> +export type BoundWitnessDivinerPredicate = WithoutSchemas +> diff --git a/packages/protocol/packages/boundwitness/packages/model/src/BoundWitness/BoundWitness.ts b/packages/protocol/packages/boundwitness/packages/model/src/BoundWitness/BoundWitness.ts index 7f7215621e..df2243f060 100644 --- a/packages/protocol/packages/boundwitness/packages/model/src/BoundWitness/BoundWitness.ts +++ b/packages/protocol/packages/boundwitness/packages/model/src/BoundWitness/BoundWitness.ts @@ -14,16 +14,20 @@ export type BoundWitnessFields = { /** @field Array of signatures by the accounts that are listed in addresses */ addresses: Address[] /** @field sequential number (if this boundwitness is part of a multi-party chain) */ - block?: Hex + block?: number /** @field unique id of a multi-party chain */ chain?: Hex error_hashes?: Hash[] payload_hashes: Hash[] payload_schemas: Schema[] previous_hashes: (Hash | null)[] - /** @field Hash of the QueryBoundWitness that caused this BoundWitness to be created */ - query?: Hash - timestamp: number + timestamp?: number + /** + * @field sequential number of the tower (if this boundwitness is part of a multi-party chain) + * The tower should always be zero until block reaches 2^32 which then causes it to rollover to 0 + * and increases the tower by 1 + */ + tower?: number } export type BoundWitness = Payload<