From d731921f5c45c9cb956c499dca944972db065aed Mon Sep 17 00:00:00 2001 From: Attila Gazso <230163+agazso@users.noreply.github.com> Date: Tue, 12 Dec 2023 11:09:31 +0100 Subject: [PATCH] fix: message queue (#506) --- src/lib/adapters/waku/safe-waku.ts | 12 ++++++++---- src/lib/adapters/waku/wakustore.ts | 11 ++++++----- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/lib/adapters/waku/safe-waku.ts b/src/lib/adapters/waku/safe-waku.ts index f1f30601..482d187f 100644 --- a/src/lib/adapters/waku/safe-waku.ts +++ b/src/lib/adapters/waku/safe-waku.ts @@ -304,7 +304,7 @@ export class SafeWaku { return true } - private async queueMessage( + private queueMessage( chatId: string, callback: Callback, chatMessage: ChatMessage, @@ -321,6 +321,10 @@ export class SafeWaku { return } + this.handleMessages().catch((e) => this.log(`⁉️ Error in handleMessage: ${e}`)) + } + + private async handleMessages() { this.isHandlingMessage = true while (this.queuedMessages.length > 0) { @@ -328,16 +332,16 @@ export class SafeWaku { if (queuedMessage) { // deduplicate already seen messages const message = queuedMessage.chatMessage - const lastMessage = this.lastMessages.get(chatId) + const lastMessage = this.lastMessages.get(queuedMessage.chatId) if (lastMessage && this.areMessagesEqual(lastMessage, message)) { this.log('🙈 ignoring duplicate message', { message, lastMessage }) continue } - this.lastMessages.set(chatId, message) + this.lastMessages.set(queuedMessage.chatId, message) try { - await callback(queuedMessage.chatMessage, queuedMessage.decodedMessage) + await queuedMessage.callback(queuedMessage.chatMessage, queuedMessage.decodedMessage) } catch (e) { this.log(`⁉️ Error in callback: ${e}`) } diff --git a/src/lib/adapters/waku/wakustore.ts b/src/lib/adapters/waku/wakustore.ts index 8758cc86..b202aba4 100644 --- a/src/lib/adapters/waku/wakustore.ts +++ b/src/lib/adapters/waku/wakustore.ts @@ -24,7 +24,7 @@ export interface Wakustore { collectionQuery: (decoder: IDecoder, queryOptions?: QueryOptions) => Query onSnapshot: ( query: Query, - callback: (value: T, decodedMessage: DecodedMessage) => void, + callback: (value: T, decodedMessage: DecodedMessage) => void | Promise, ) => Promise getDoc: (decoder: IDecoder) => Promise setDoc: (encoder: IEncoder, data: T) => Promise @@ -115,23 +115,24 @@ export function makeWakustore(waku: LightNode): Wakustore { async function onSnapshot( query: Query, - callback: (value: T, decodedMessage: DecodedMessage) => void, + callback: (value: T, decodedMessage: DecodedMessage) => void | Promise, ): Promise { - const subscription = await subscribe(waku, query.decoder, (msg: DecodedMessage) => { + const subscription = await subscribe(waku, query.decoder, async (msg: DecodedMessage) => { const typedResult = decodeDoc(msg) - callback(typedResult, msg) + await callback(typedResult, msg) }) const queryOptions = { ...query.queryOptions, pageSize: query.queryOptions?.pageSize ?? query.queryOptions?.limit, } + const result = await readStore(waku, query.decoder, queryOptions) const messages = await getQueryResults(result, queryOptions) for (const message of messages) { const value = decodeDoc(message) - callback(value, message) + await callback(value, message) } return subscription