From 4c41ffcf12f60cb19d092a979d54317f0cc7b116 Mon Sep 17 00:00:00 2001 From: Attila Gazso <230163+agazso@users.noreply.github.com> Date: Mon, 18 Sep 2023 22:24:50 +0200 Subject: [PATCH] wip: fix queuing, add support for multiple peers --- src/lib/adapters/waku/index.ts | 37 ++++++++++++--------- src/lib/adapters/waku/safe-waku.ts | 43 ++++++++++++++++--------- src/lib/adapters/waku/waku.ts | 19 +++++++---- src/lib/stores/chat.ts | 3 +- src/routes/chat/[id]/+page.svelte | 8 ++--- src/routes/group/chat/[id]/+page.svelte | 9 +++--- 6 files changed, 71 insertions(+), 48 deletions(-) diff --git a/src/lib/adapters/waku/index.ts b/src/lib/adapters/waku/index.ts index 5d5a9c77..8c80b15f 100644 --- a/src/lib/adapters/waku/index.ts +++ b/src/lib/adapters/waku/index.ts @@ -8,6 +8,7 @@ import { type DataMessage, getLastSeenMessageTime, type ChatData, + getLastMessageTime, } from '$lib/stores/chat' import type { User } from '$lib/types' import type { TimeFilter } from '@waku/interfaces' @@ -152,8 +153,6 @@ async function executeOnDataMessage( export default class WakuAdapter implements Adapter { private safeWaku = new SafeWaku() private subscriptions: Array<() => void> = [] - private numWaitingSaveChats = 0 - private isSavingChats = false async onLogIn(wallet: BaseWallet): Promise { const address = wallet.address @@ -167,8 +166,6 @@ export default class WakuAdapter implements Adapter { const storageChatEntries = await ws.getDoc('chats', address) chats.update((state) => ({ ...state, chats: new Map(storageChatEntries), loading: false })) - console.debug({ storageChatEntries }) - const allChats = Array.from(get(chats).chats) // private chats @@ -183,10 +180,11 @@ export default class WakuAdapter implements Adapter { await this.subscribeToPrivateMessages(address, address, wakuObjectAdapter, timeFilter) // group chats - const groupChatIds = allChats.filter(([id]) => isGroupChatId(id)).map(([id]) => id) + const groupChats = allChats.filter(([id]) => isGroupChatId(id)).map(([, chat]) => chat) - for (const groupChatId of groupChatIds) { - const lastSeenMessageTime = getLastSeenMessageTime(privateChats) + for (const groupChat of groupChats) { + const groupChatId = groupChat.chatId + const lastSeenMessageTime = getLastMessageTime(groupChat) const now = new Date() const timeFilter = { startTime: new Date(lastSeenMessageTime + 1), @@ -196,20 +194,22 @@ export default class WakuAdapter implements Adapter { } // chat store + let firstChatStoreSave = true + let chatSaveTimeout: ReturnType | undefined = undefined const subscribeChatStore = chats.subscribe(async () => { - if (this.isSavingChats) { - this.numWaitingSaveChats++ + if (firstChatStoreSave) { + firstChatStoreSave = false return } + // debounce saving changes + if (chatSaveTimeout) { + clearTimeout(chatSaveTimeout) + } - this.isSavingChats = true - - do { - this.numWaitingSaveChats = 0 + chatSaveTimeout = setTimeout(async () => { + chatSaveTimeout = undefined await this.saveChatStore(address) - } while (this.numWaitingSaveChats > 0) - - this.isSavingChats = false + }, 1000) }) this.subscriptions.push(subscribeChatStore) @@ -222,7 +222,12 @@ export default class WakuAdapter implements Adapter { loading: false, })) + let firstObjectStoreSave = true const subscribeObjectStore = objectStore.subscribe(async (objects) => { + if (firstObjectStoreSave) { + firstObjectStoreSave = false + return + } await ws.setDoc('objects', address, Array.from(objects.objects)) }) this.subscriptions.push(subscribeObjectStore) diff --git a/src/lib/adapters/waku/safe-waku.ts b/src/lib/adapters/waku/safe-waku.ts index 553bfe4e..542141ef 100644 --- a/src/lib/adapters/waku/safe-waku.ts +++ b/src/lib/adapters/waku/safe-waku.ts @@ -29,6 +29,8 @@ export class SafeWaku { } private queuedMessages: QueuedMessage[] = [] private isHandlingMessage = false + private logging = true + private logDateTime = true constructor(public readonly options?: ConnectWakuOptions) {} @@ -59,13 +61,13 @@ export class SafeWaku { timeFilter = timeFilter || calculatedTimeFilter const talkEmoji = isGroupChatId(chatId) ? '🗫' : '🗩' - console.debug(`${talkEmoji} subscribe to ${chatId}`) + this.log(`${talkEmoji} subscribe to ${chatId}`) const ws = makeWakustore(this.lightNode) const unsubscribe = await ws.onSnapshot( ws.collectionQuery('private-message', chatId, { timeFilter, - pageDirection: PageDirection.BACKWARD, + pageDirection: PageDirection.FORWARD, pageSize: 1000, }), (message) => this.queueMessage(callback, message, chatId), @@ -110,8 +112,8 @@ export class SafeWaku { } finally { if (error) { this.errors.numSendError++ - console.debug(`⁉️ Error: ${error}`) - console.debug(`🕓 Waiting ${timeout} milliseconds...`) + this.log(`⁉️ Error: ${error}`) + this.log(`🕓 Waiting ${timeout} milliseconds...`) await new Promise((r) => setTimeout(r, timeout)) if (timeout < 120_000) { timeout += timeout @@ -122,19 +124,15 @@ export class SafeWaku { const elapsed = Date.now() - start - console.debug({ message, id }) - if (elapsed > 1000) { - console.debug(`⏰ sending message took ${elapsed} milliseconds`) + this.log(`⏰ sending message took ${elapsed} milliseconds`) } } private connectWaku() { - console.debug('connectWaku') - const waku = connectWaku({ onConnect: (connections) => { - console.debug('✅ connected to waku', { connections }) + this.log('✅ connected to waku', { connections }) this.safeResubscribe() if (this.options?.onConnect) { @@ -142,7 +140,7 @@ export class SafeWaku { } }, onDisconnect: () => { - console.debug('❌ disconnected from waku') + this.log('❌ disconnected from waku') this.errors.numDisconnect++ if (this.options?.onDisconnect) { @@ -210,18 +208,19 @@ export class SafeWaku { const queuedMessage = this.queuedMessages.shift() if (queuedMessage) { // deduplicate already seen messages + const message = queuedMessage.chatMessage const lastMessage = this.lastMessages.get(chatId) if ( lastMessage && - lastMessage.timestamp === chatMessage.timestamp && - lastMessage.type === chatMessage.type && - lastMessage.fromAddress === chatMessage.fromAddress + lastMessage.timestamp === message.timestamp && + lastMessage.type === message.type && + lastMessage.fromAddress === message.fromAddress ) { - console.debug('🙈 ignoring duplicate message', { chatMessage }) + this.log('🙈 ignoring duplicate message', { message, lastMessage }) continue } - this.lastMessages.set(chatId, chatMessage) + this.lastMessages.set(chatId, message) await callback(queuedMessage.chatMessage, queuedMessage.chatId) } @@ -229,4 +228,16 @@ export class SafeWaku { this.isHandlingMessage = false } + + private log(...args: unknown[]) { + if (!this.logging) { + return + } + if (!this.logDateTime) { + console.debug(...args) + return + } + const isoTime = new Date().toISOString().replace('T', ' ').replace('Z', '') + console.debug(isoTime, ...args) + } } diff --git a/src/lib/adapters/waku/waku.ts b/src/lib/adapters/waku/waku.ts index f6021c2b..464e41d3 100644 --- a/src/lib/adapters/waku/waku.ts +++ b/src/lib/adapters/waku/waku.ts @@ -16,13 +16,17 @@ import { type Unsubscribe, } from '@waku/interfaces' -const peerMultiaddr = multiaddr( +const peers = [ + // use this address for local testing + // '/ip4/127.0.0.1/tcp/8000/ws/p2p/16Uiu2HAm53sojJN72rFbYg6GV2LpRRER9XeWkiEAhjKy3aL9cN5Z', + // '/dns4/ws.waku.apyos.dev/tcp/443/wss/p2p/16Uiu2HAm5wH4dPAV6zDfrBHkWt9Wu9iiXT4ehHdUArDUbEevzmBY', '/dns4/ws.waku-1.apyos.dev/tcp/443/wss/p2p/16Uiu2HAm8gXHntr3SB5sde11pavjptaoiqyvwoX3GyEZWKMPiuBu', - // use this address for local testing - // '/ip4/127.0.0.1/tcp/8000/ws/p2p/16Uiu2HAm53sojJN72rFbYg6GV2LpRRER9XeWkiEAhjKy3aL9cN5Z', -) + // '/dns4/waku.gra.nomad.apyos.dev/tcp/443/wss/p2p/16Uiu2HAmDvywnsGaB32tFqwjTsg8sfC1ZV2EXo3xjxM4V2gvH6Up', + // '/dns4/waku.bhs.nomad.apyos.dev/tcp/443/wss/p2p/16Uiu2HAkvrRkEHRMfe26F8NCWUfzMuaCfyCzwoPSUYG7yminM5Bn', + // '/dns4/waku.de.nomad.apyos.dev/tcp/443/wss/p2p/16Uiu2HAmRgjA134DcoyK8r44pKWJQ69C7McLSWtRgxUVwkKAsbGx', +] export type ContentTopic = 'private-message' | 'profile' | 'chats' | 'objects' | 'group-chats' @@ -57,7 +61,10 @@ export async function connectWaku(options?: ConnectWakuOptions) { }) await waku.start() - await waku.dial(peerMultiaddr) + for (const peer of peers) { + const addr = multiaddr(peer) + await waku.dial(addr) + } await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush, Protocols.Store]) return waku @@ -87,7 +94,6 @@ export async function storeDocument( const payload = utf8ToBytes(json) const sendResult = await waku.lightPush.send(encoder, { payload }) - console.debug({ sendResult, contentTopic }) return sendResult.error } @@ -114,6 +120,5 @@ export async function sendMessage(waku: LightNode, id: string, message: unknown) const encoder = createEncoder({ contentTopic }) const sendResult = await waku.lightPush.send(encoder, { payload }) - console.debug({ sendResult, contentTopic }) return sendResult.error } diff --git a/src/lib/stores/chat.ts b/src/lib/stores/chat.ts index 7fa37545..9e122bf2 100644 --- a/src/lib/stores/chat.ts +++ b/src/lib/stores/chat.ts @@ -104,7 +104,8 @@ function createChatStore(): ChatStore { return state } const newMap = new Map(state.chats) - newMap.set(chatId, update(oldChat)) + const newChat = update(oldChat) + newMap.set(chatId, newChat) return { ...state, diff --git a/src/routes/chat/[id]/+page.svelte b/src/routes/chat/[id]/+page.svelte index 47e39f23..75ef73d3 100644 --- a/src/routes/chat/[id]/+page.svelte +++ b/src/routes/chat/[id]/+page.svelte @@ -55,14 +55,14 @@ }) $: messages = $chats.chats.get($page.params.id)?.messages || [] - let loading = false + let isSending = false let text = '' const sendMessage = async (wallet: HDNodeWallet) => { - loading = true + isSending = true await adapters.sendChatMessage(wallet, $page.params.id, text) text = '' - loading = false + isSending = false } @@ -133,7 +133,7 @@ }} /> {#if text.length > 0} - {/if} diff --git a/src/routes/group/chat/[id]/+page.svelte b/src/routes/group/chat/[id]/+page.svelte index a40b579d..2323a77b 100644 --- a/src/routes/group/chat/[id]/+page.svelte +++ b/src/routes/group/chat/[id]/+page.svelte @@ -65,15 +65,15 @@ }) $: messages = $chats.chats.get($page.params.id)?.messages || [] - let loading = false + let isSending = false let text = '' const sendMessage = async (wallet: HDNodeWallet) => { - loading = true + isSending = true const messageText = replaceNamesWithAddresses(text) await adapters.sendChatMessage(wallet, $page.params.id, messageText) text = '' - loading = false + isSending = false } $: inviter = chat?.users.find((user) => user.address === chat?.inviter) @@ -224,6 +224,7 @@