Skip to content

Commit

Permalink
fix: message queue (#506)
Browse files Browse the repository at this point in the history
  • Loading branch information
agazso authored Dec 12, 2023
1 parent 76e1cc5 commit d731921
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 9 deletions.
12 changes: 8 additions & 4 deletions src/lib/adapters/waku/safe-waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ export class SafeWaku {
return true
}

private async queueMessage(
private queueMessage(
chatId: string,
callback: Callback,
chatMessage: ChatMessage,
Expand All @@ -321,23 +321,27 @@ export class SafeWaku {
return
}

this.handleMessages().catch((e) => this.log(`⁉️ Error in handleMessage: ${e}`))
}

private async handleMessages() {
this.isHandlingMessage = true

while (this.queuedMessages.length > 0) {
const queuedMessage = this.queuedMessages.shift()
if (queuedMessage) {
// deduplicate already seen messages
const message = queuedMessage.chatMessage
const lastMessage = this.lastMessages.get(chatId)
const lastMessage = this.lastMessages.get(queuedMessage.chatId)
if (lastMessage && this.areMessagesEqual(lastMessage, message)) {
this.log('🙈 ignoring duplicate message', { message, lastMessage })
continue
}

this.lastMessages.set(chatId, message)
this.lastMessages.set(queuedMessage.chatId, message)

try {
await callback(queuedMessage.chatMessage, queuedMessage.decodedMessage)
await queuedMessage.callback(queuedMessage.chatMessage, queuedMessage.decodedMessage)
} catch (e) {
this.log(`⁉️ Error in callback: ${e}`)
}
Expand Down
11 changes: 6 additions & 5 deletions src/lib/adapters/waku/wakustore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export interface Wakustore {
collectionQuery: (decoder: IDecoder<DecodedMessage>, queryOptions?: QueryOptions) => Query
onSnapshot: <T>(
query: Query,
callback: (value: T, decodedMessage: DecodedMessage) => void,
callback: (value: T, decodedMessage: DecodedMessage) => void | Promise<void>,
) => Promise<Unsubscribe>
getDoc: <T>(decoder: IDecoder<DecodedMessage>) => Promise<T | undefined>
setDoc: <T>(encoder: IEncoder, data: T) => Promise<SendError[] | undefined>
Expand Down Expand Up @@ -115,23 +115,24 @@ export function makeWakustore(waku: LightNode): Wakustore {

async function onSnapshot<T>(
query: Query,
callback: (value: T, decodedMessage: DecodedMessage) => void,
callback: (value: T, decodedMessage: DecodedMessage) => void | Promise<void>,
): Promise<Unsubscribe> {
const subscription = await subscribe(waku, query.decoder, (msg: DecodedMessage) => {
const subscription = await subscribe(waku, query.decoder, async (msg: DecodedMessage) => {
const typedResult = decodeDoc<T>(msg)
callback(typedResult, msg)
await callback(typedResult, msg)
})

const queryOptions = {
...query.queryOptions,
pageSize: query.queryOptions?.pageSize ?? query.queryOptions?.limit,
}

const result = await readStore(waku, query.decoder, queryOptions)
const messages = await getQueryResults(result, queryOptions)

for (const message of messages) {
const value = decodeDoc<T>(message)
callback(value, message)
await callback(value, message)
}

return subscription
Expand Down

1 comment on commit d731921

@vercel
Copy link

@vercel vercel bot commented on d731921 Dec 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.