Skip to content

Commit

Permalink
wip: fix queuing, add support for multiple peers
Browse files Browse the repository at this point in the history
  • Loading branch information
agazso committed Sep 18, 2023
1 parent c6f6452 commit 4c41ffc
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 48 deletions.
37 changes: 21 additions & 16 deletions src/lib/adapters/waku/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
type DataMessage,
getLastSeenMessageTime,
type ChatData,
getLastMessageTime,
} from '$lib/stores/chat'
import type { User } from '$lib/types'
import type { TimeFilter } from '@waku/interfaces'
Expand Down Expand Up @@ -152,8 +153,6 @@ async function executeOnDataMessage(
export default class WakuAdapter implements Adapter {
private safeWaku = new SafeWaku()
private subscriptions: Array<() => void> = []
private numWaitingSaveChats = 0
private isSavingChats = false

async onLogIn(wallet: BaseWallet): Promise<void> {
const address = wallet.address
Expand All @@ -167,8 +166,6 @@ export default class WakuAdapter implements Adapter {
const storageChatEntries = await ws.getDoc<StorageChatEntry[]>('chats', address)
chats.update((state) => ({ ...state, chats: new Map(storageChatEntries), loading: false }))

console.debug({ storageChatEntries })

const allChats = Array.from(get(chats).chats)

// private chats
Expand All @@ -183,10 +180,11 @@ export default class WakuAdapter implements Adapter {
await this.subscribeToPrivateMessages(address, address, wakuObjectAdapter, timeFilter)

// group chats
const groupChatIds = allChats.filter(([id]) => isGroupChatId(id)).map(([id]) => id)
const groupChats = allChats.filter(([id]) => isGroupChatId(id)).map(([, chat]) => chat)

for (const groupChatId of groupChatIds) {
const lastSeenMessageTime = getLastSeenMessageTime(privateChats)
for (const groupChat of groupChats) {
const groupChatId = groupChat.chatId
const lastSeenMessageTime = getLastMessageTime(groupChat)
const now = new Date()
const timeFilter = {
startTime: new Date(lastSeenMessageTime + 1),
Expand All @@ -196,20 +194,22 @@ export default class WakuAdapter implements Adapter {
}

// chat store
let firstChatStoreSave = true
let chatSaveTimeout: ReturnType<typeof setTimeout> | undefined = undefined
const subscribeChatStore = chats.subscribe(async () => {
if (this.isSavingChats) {
this.numWaitingSaveChats++
if (firstChatStoreSave) {
firstChatStoreSave = false
return
}
// debounce saving changes
if (chatSaveTimeout) {
clearTimeout(chatSaveTimeout)
}

this.isSavingChats = true

do {
this.numWaitingSaveChats = 0
chatSaveTimeout = setTimeout(async () => {
chatSaveTimeout = undefined
await this.saveChatStore(address)
} while (this.numWaitingSaveChats > 0)

this.isSavingChats = false
}, 1000)
})
this.subscriptions.push(subscribeChatStore)

Expand All @@ -222,7 +222,12 @@ export default class WakuAdapter implements Adapter {
loading: false,
}))

let firstObjectStoreSave = true
const subscribeObjectStore = objectStore.subscribe(async (objects) => {
if (firstObjectStoreSave) {
firstObjectStoreSave = false
return
}
await ws.setDoc<StorageObjectEntry[]>('objects', address, Array.from(objects.objects))
})
this.subscriptions.push(subscribeObjectStore)
Expand Down
43 changes: 27 additions & 16 deletions src/lib/adapters/waku/safe-waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export class SafeWaku {
}
private queuedMessages: QueuedMessage[] = []
private isHandlingMessage = false
private logging = true
private logDateTime = true

constructor(public readonly options?: ConnectWakuOptions) {}

Expand Down Expand Up @@ -59,13 +61,13 @@ export class SafeWaku {
timeFilter = timeFilter || calculatedTimeFilter

const talkEmoji = isGroupChatId(chatId) ? '🗫' : '🗩'
console.debug(`${talkEmoji} subscribe to ${chatId}`)
this.log(`${talkEmoji} subscribe to ${chatId}`)

const ws = makeWakustore(this.lightNode)
const unsubscribe = await ws.onSnapshot<Message>(
ws.collectionQuery('private-message', chatId, {
timeFilter,
pageDirection: PageDirection.BACKWARD,
pageDirection: PageDirection.FORWARD,
pageSize: 1000,
}),
(message) => this.queueMessage(callback, message, chatId),
Expand Down Expand Up @@ -110,8 +112,8 @@ export class SafeWaku {
} finally {
if (error) {
this.errors.numSendError++
console.debug(`⁉️ Error: ${error}`)
console.debug(`🕓 Waiting ${timeout} milliseconds...`)
this.log(`⁉️ Error: ${error}`)
this.log(`🕓 Waiting ${timeout} milliseconds...`)
await new Promise((r) => setTimeout(r, timeout))
if (timeout < 120_000) {
timeout += timeout
Expand All @@ -122,27 +124,23 @@ export class SafeWaku {

const elapsed = Date.now() - start

console.debug({ message, id })

if (elapsed > 1000) {
console.debug(`⏰ sending message took ${elapsed} milliseconds`)
this.log(`⏰ sending message took ${elapsed} milliseconds`)
}
}

private connectWaku() {
console.debug('connectWaku')

const waku = connectWaku({
onConnect: (connections) => {
console.debug('✅ connected to waku', { connections })
this.log('✅ connected to waku', { connections })
this.safeResubscribe()

if (this.options?.onConnect) {
this.options?.onConnect(connections)
}
},
onDisconnect: () => {
console.debug('❌ disconnected from waku')
this.log('❌ disconnected from waku')
this.errors.numDisconnect++

if (this.options?.onDisconnect) {
Expand Down Expand Up @@ -210,23 +208,36 @@ export class SafeWaku {
const queuedMessage = this.queuedMessages.shift()
if (queuedMessage) {
// deduplicate already seen messages
const message = queuedMessage.chatMessage
const lastMessage = this.lastMessages.get(chatId)
if (
lastMessage &&
lastMessage.timestamp === chatMessage.timestamp &&
lastMessage.type === chatMessage.type &&
lastMessage.fromAddress === chatMessage.fromAddress
lastMessage.timestamp === message.timestamp &&
lastMessage.type === message.type &&
lastMessage.fromAddress === message.fromAddress
) {
console.debug('🙈 ignoring duplicate message', { chatMessage })
this.log('🙈 ignoring duplicate message', { message, lastMessage })
continue
}

this.lastMessages.set(chatId, chatMessage)
this.lastMessages.set(chatId, message)

await callback(queuedMessage.chatMessage, queuedMessage.chatId)
}
}

this.isHandlingMessage = false
}

private log(...args: unknown[]) {
if (!this.logging) {
return
}
if (!this.logDateTime) {
console.debug(...args)
return
}
const isoTime = new Date().toISOString().replace('T', ' ').replace('Z', '')
console.debug(isoTime, ...args)
}
}
19 changes: 12 additions & 7 deletions src/lib/adapters/waku/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ import {
type Unsubscribe,
} from '@waku/interfaces'

const peerMultiaddr = multiaddr(
const peers = [
// use this address for local testing
// '/ip4/127.0.0.1/tcp/8000/ws/p2p/16Uiu2HAm53sojJN72rFbYg6GV2LpRRER9XeWkiEAhjKy3aL9cN5Z',

// '/dns4/ws.waku.apyos.dev/tcp/443/wss/p2p/16Uiu2HAm5wH4dPAV6zDfrBHkWt9Wu9iiXT4ehHdUArDUbEevzmBY',
'/dns4/ws.waku-1.apyos.dev/tcp/443/wss/p2p/16Uiu2HAm8gXHntr3SB5sde11pavjptaoiqyvwoX3GyEZWKMPiuBu',

// use this address for local testing
// '/ip4/127.0.0.1/tcp/8000/ws/p2p/16Uiu2HAm53sojJN72rFbYg6GV2LpRRER9XeWkiEAhjKy3aL9cN5Z',
)
// '/dns4/waku.gra.nomad.apyos.dev/tcp/443/wss/p2p/16Uiu2HAmDvywnsGaB32tFqwjTsg8sfC1ZV2EXo3xjxM4V2gvH6Up',
// '/dns4/waku.bhs.nomad.apyos.dev/tcp/443/wss/p2p/16Uiu2HAkvrRkEHRMfe26F8NCWUfzMuaCfyCzwoPSUYG7yminM5Bn',
// '/dns4/waku.de.nomad.apyos.dev/tcp/443/wss/p2p/16Uiu2HAmRgjA134DcoyK8r44pKWJQ69C7McLSWtRgxUVwkKAsbGx',
]

export type ContentTopic = 'private-message' | 'profile' | 'chats' | 'objects' | 'group-chats'

Expand Down Expand Up @@ -57,7 +61,10 @@ export async function connectWaku(options?: ConnectWakuOptions) {
})

await waku.start()
await waku.dial(peerMultiaddr)
for (const peer of peers) {
const addr = multiaddr(peer)
await waku.dial(addr)
}
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush, Protocols.Store])

return waku
Expand Down Expand Up @@ -87,7 +94,6 @@ export async function storeDocument(
const payload = utf8ToBytes(json)

const sendResult = await waku.lightPush.send(encoder, { payload })
console.debug({ sendResult, contentTopic })
return sendResult.error
}

Expand All @@ -114,6 +120,5 @@ export async function sendMessage(waku: LightNode, id: string, message: unknown)
const encoder = createEncoder({ contentTopic })

const sendResult = await waku.lightPush.send(encoder, { payload })
console.debug({ sendResult, contentTopic })
return sendResult.error
}
3 changes: 2 additions & 1 deletion src/lib/stores/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ function createChatStore(): ChatStore {
return state
}
const newMap = new Map(state.chats)
newMap.set(chatId, update(oldChat))
const newChat = update(oldChat)
newMap.set(chatId, newChat)

return {
...state,
Expand Down
8 changes: 4 additions & 4 deletions src/routes/chat/[id]/+page.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@
})
$: messages = $chats.chats.get($page.params.id)?.messages || []
let loading = false
let isSending = false
let text = ''
const sendMessage = async (wallet: HDNodeWallet) => {
loading = true
isSending = true
await adapters.sendChatMessage(wallet, $page.params.id, text)
text = ''
loading = false
isSending = false
}
</script>

Expand Down Expand Up @@ -133,7 +133,7 @@
}}
/>
{#if text.length > 0}
<Button variant="strong" disabled={loading} on:click={() => sendMessage(wallet)}>
<Button variant="strong" disabled={isSending} on:click={() => sendMessage(wallet)}>
<ArrowUp />
</Button>
{/if}
Expand Down
9 changes: 5 additions & 4 deletions src/routes/group/chat/[id]/+page.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@
})
$: messages = $chats.chats.get($page.params.id)?.messages || []
let loading = false
let isSending = false
let text = ''
const sendMessage = async (wallet: HDNodeWallet) => {
loading = true
isSending = true
const messageText = replaceNamesWithAddresses(text)
await adapters.sendChatMessage(wallet, $page.params.id, messageText)
text = ''
loading = false
isSending = false
}
$: inviter = chat?.users.find((user) => user.address === chat?.inviter)
Expand Down Expand Up @@ -224,6 +224,7 @@
</Button>
<Textarea
placeholder="Message"
autofocus
bind:value={text}
on:keypress={(e) => {
// When enter is pressed without modifier keys, send the message
Expand All @@ -239,7 +240,7 @@
}}
/>
{#if text.length > 0}
<Button variant="strong" disabled={loading} on:click={() => sendMessage(wallet)}>
<Button variant="strong" disabled={isSending} on:click={() => sendMessage(wallet)}>
<ArrowUp />
</Button>
{/if}
Expand Down

0 comments on commit 4c41ffc

Please sign in to comment.