Skip to content

Commit

Permalink
feat: safe waku
Browse files Browse the repository at this point in the history
  • Loading branch information
agazso committed Sep 18, 2023
1 parent 4485a1a commit c6f6452
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 158 deletions.
183 changes: 32 additions & 151 deletions src/lib/adapters/waku/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ import {
type Message,
isGroupChatId,
type DataMessage,
getLastMessageTime,
getLastSeenMessageTime,
type ChatData,
} from '$lib/stores/chat'
import type { User } from '$lib/types'
import { PageDirection, type LightNode, type TimeFilter } from '@waku/interfaces'
import { connectWaku, sendMessage } from './waku'
import type { TimeFilter } from '@waku/interfaces'
import type { BaseWallet, Wallet } from 'ethers'
import { get } from 'svelte/store'
import { objectStore, objectKey } from '$lib/stores/objects'
Expand All @@ -32,16 +30,10 @@ import { makeWakustore } from './wakustore'
import type { StorageChat, StorageChatEntry, StorageObjectEntry, StorageProfile } from './types'
import { genRandomHex } from '$lib/utils'
import { walletStore } from '$lib/stores/wallet'
import { SafeWaku } from './safe-waku'

const MAX_MESSAGES = 100

interface QueuedMessage {
message: Message
address: string
id: string
adapter: WakuObjectAdapter
}

function createPrivateChat(chatId: string, user: User, ownAddress: string): string {
const ownProfile = get(profile)
const ownUser = {
Expand Down Expand Up @@ -158,36 +150,25 @@ async function executeOnDataMessage(
}

export default class WakuAdapter implements Adapter {
private waku: LightNode | undefined
private safeWaku = new SafeWaku()
private subscriptions: Array<() => void> = []
private numWaitingSaveChats = 0
private isSavingChats = false
private queuedMessages: QueuedMessage[] = []
private isHandlingMessage = false

async onLogIn(wallet: BaseWallet): Promise<void> {
const address = wallet.address
this.waku = await connectWaku({
onDisconnect: () => {
console.debug('❌ disconnected from waku')
},
onConnect: () => {
console.debug('✅ connected to waku')
},
})

const ws = await this.makeWakustore()
const wakuObjectAdapter = makeWakuObjectAdapter(this, wallet)

const ws = makeWakustore(this.waku)

const storageProfile = await ws.getDoc<StorageProfile>('profile', address)
profile.update((state) => ({ ...state, ...storageProfile, address, loading: false }))

const storageChatEntries = await ws.getDoc<StorageChatEntry[]>('chats', address)
chats.update((state) => ({ ...state, chats: new Map(storageChatEntries), loading: false }))

// eslint-disable-next-line @typescript-eslint/no-this-alias
const adapter = this
console.debug({ storageChatEntries })

const allChats = Array.from(get(chats).chats)

// private chats
Expand All @@ -205,7 +186,13 @@ export default class WakuAdapter implements Adapter {
const groupChatIds = allChats.filter(([id]) => isGroupChatId(id)).map(([id]) => id)

for (const groupChatId of groupChatIds) {
await this.subscribeToGroupChat(groupChatId, address, wakuObjectAdapter)
const lastSeenMessageTime = getLastSeenMessageTime(privateChats)
const now = new Date()
const timeFilter = {
startTime: new Date(lastSeenMessageTime + 1),
endTime: now,
}
await this.subscribeToGroupChat(groupChatId, address, wakuObjectAdapter, timeFilter)
}

// chat store
Expand Down Expand Up @@ -236,9 +223,6 @@ export default class WakuAdapter implements Adapter {
}))

const subscribeObjectStore = objectStore.subscribe(async (objects) => {
if (!adapter.waku) {
return
}
await ws.setDoc<StorageObjectEntry[]>('objects', address, Array.from(objects.objects))
})
this.subscriptions.push(subscribeObjectStore)
Expand All @@ -249,41 +233,31 @@ export default class WakuAdapter implements Adapter {
}

async onLogOut() {
await this.safeWaku.unsubscribeAll()
this.subscriptions.forEach((s) => s())
this.subscriptions = []
profile.set({ loading: false })
}

async saveUserProfile(address: string, name?: string, avatar?: string): Promise<void> {
if (!this.waku) {
this.waku = await connectWaku()
}

const defaultProfile: StorageProfile = { name: name ?? address }
const storageProfile = (await this.fetchStorageProfile(address)) || defaultProfile

if (avatar) storageProfile.avatar = avatar
if (name) storageProfile.name = name

if (avatar || name) {
const ws = makeWakustore(this.waku)
const ws = await this.makeWakustore()
ws.setDoc<StorageProfile>('profile', address, storageProfile)
profile.update((state) => ({ ...state, address, name, avatar }))
}
}

async getUserProfile(address: string): Promise<User | undefined> {
if (!this.waku) {
this.waku = await connectWaku()
}
return this.storageProfileToUser(address)
}

async startChat(address: string, peerAddress: string): Promise<string> {
if (!this.waku) {
this.waku = await connectWaku()
}

const chatId = peerAddress
const user = await this.storageProfileToUser(chatId)
if (!user) {
Expand All @@ -301,9 +275,6 @@ export default class WakuAdapter implements Adapter {
name: string,
avatar?: string,
): Promise<string> {
if (!this.waku) {
this.waku = await connectWaku()
}
if (memberAddresses.length === 0) {
throw 'invalid chat'
}
Expand All @@ -321,19 +292,15 @@ export default class WakuAdapter implements Adapter {

createGroupChat(chatId, chat.users, name, avatar, true)

const ws = makeWakustore(this.waku)
const ws = await this.makeWakustore()
await ws.setDoc<StorageChat>('group-chats', chatId, storageChat)
await this.subscribeToGroupChat(chatId, wallet.address, wakuObjectAdapter)

return chatId
}

async addMemberToGroupChat(chatId: string, users: string[]): Promise<void> {
if (!this.waku) {
this.waku = await connectWaku()
}

const ws = makeWakustore(this.waku)
const ws = await this.makeWakustore()

const groupChat = await ws.getDoc<StorageChat>('group-chats', chatId)
if (!groupChat) {
Expand All @@ -348,11 +315,7 @@ export default class WakuAdapter implements Adapter {
}

async removeFromGroupChat(chatId: string, address: string): Promise<void> {
if (!this.waku) {
this.waku = await connectWaku()
}

const ws = makeWakustore(this.waku)
const ws = await this.makeWakustore()

const groupChat = await ws.getDoc<StorageChat>('group-chats', chatId)
if (!groupChat) {
Expand All @@ -367,11 +330,7 @@ export default class WakuAdapter implements Adapter {
}

async saveGroupChatProfile(chatId: string, name?: string, avatar?: string): Promise<void> {
if (!this.waku) {
this.waku = await connectWaku()
}

const ws = makeWakustore(this.waku)
const ws = await this.makeWakustore()

const groupChat = await ws.getDoc<StorageChat>('group-chats', chatId)
if (!groupChat) {
Expand All @@ -387,10 +346,6 @@ export default class WakuAdapter implements Adapter {
}

async sendChatMessage(wallet: BaseWallet, chatId: string, text: string): Promise<void> {
if (!this.waku) {
this.waku = await connectWaku()
}

const fromAddress = wallet.address
const message: Message = {
type: 'user',
Expand All @@ -402,7 +357,7 @@ export default class WakuAdapter implements Adapter {
const wakuObjectAdapter = makeWakuObjectAdapter(this, wallet)

await addMessageToChat(fromAddress, wakuObjectAdapter, chatId, message)
await sendMessage(this.waku, chatId, message)
await this.safeWaku.sendMessage(chatId, message)
}

async sendData(
Expand All @@ -412,10 +367,6 @@ export default class WakuAdapter implements Adapter {
instanceId: string,
data: JSONSerializable,
): Promise<void> {
if (!this.waku) {
this.waku = await connectWaku()
}

const fromAddress = wallet.address
const message: Message = {
type: 'data',
Expand All @@ -430,14 +381,10 @@ export default class WakuAdapter implements Adapter {
const send = (data: JSONValue) => this.sendData(wallet, chatId, objectId, instanceId, data)

await addMessageToChat(fromAddress, wakuObjectAdapter, chatId, message, send)
await sendMessage(this.waku, chatId, message)
await this.safeWaku.sendMessage(chatId, message)
}

async sendInvite(wallet: BaseWallet, chatId: string, users: string[]): Promise<void> {
if (!this.waku) {
this.waku = await connectWaku()
}

if (!isGroupChatId(chatId)) {
throw 'chat id is private'
}
Expand All @@ -451,7 +398,7 @@ export default class WakuAdapter implements Adapter {
}

for (const user of users) {
await sendMessage(this.waku, user, message)
await this.safeWaku.sendMessage(user, message)
}
}

Expand All @@ -462,10 +409,6 @@ export default class WakuAdapter implements Adapter {
instanceId: string,
updater: (state: JSONSerializable) => JSONSerializable,
): Promise<void> {
if (!this.waku) {
this.waku = await connectWaku()
}

const key = objectKey(objectId, instanceId)
const wakuObjectStore = get(objectStore)

Expand All @@ -492,11 +435,12 @@ export default class WakuAdapter implements Adapter {
}
}

private async storageChatToChat(chatId: string, storageChat: StorageChat): Promise<Chat> {
if (!this.waku) {
throw 'no waku'
}
private async makeWakustore() {
const waku = await this.safeWaku.connect()
return makeWakustore(waku)
}

private async storageChatToChat(chatId: string, storageChat: StorageChat): Promise<Chat> {
const userPromises = storageChat.users.map((user) => this.storageProfileToUser(user))
const allUsers = await Promise.all(userPromises)
const users = allUsers.filter((user) => user) as User[]
Expand All @@ -513,11 +457,7 @@ export default class WakuAdapter implements Adapter {

// fetches the profile from the network
private async fetchStorageProfile(address: string): Promise<StorageProfile | undefined> {
if (!this.waku) {
throw 'no waku'
}

const ws = makeWakustore(this.waku)
const ws = await this.makeWakustore()
const storageProfile = await ws.getDoc<StorageProfile>('profile', address)

return storageProfile
Expand Down Expand Up @@ -554,11 +494,7 @@ export default class WakuAdapter implements Adapter {
wakuObjectAdapter: WakuObjectAdapter,
timeFilter?: TimeFilter,
) {
if (!this.waku) {
return
}

const ws = makeWakustore(this.waku)
const ws = await this.makeWakustore()

const groupChatSubscription = await ws.onSnapshot<StorageChat>(
ws.docQuery('group-chats', groupChatId),
Expand Down Expand Up @@ -588,60 +524,9 @@ export default class WakuAdapter implements Adapter {
wakuObjectAdapter: WakuObjectAdapter,
timeFilter?: TimeFilter,
) {
if (!this.waku) {
return
}

const ws = makeWakustore(this.waku)

const startTime = new Date(getLastMessageTime(get(chats).chats.get(id)) + 1)
const endTime = new Date()

const subscription = await ws.onSnapshot<Message>(
ws.collectionQuery('private-message', id, {
timeFilter: timeFilter || { startTime, endTime },
pageDirection: PageDirection.BACKWARD,
pageSize: 1000,
}),
(message) => {
this.queueMessage(message, address, id, wakuObjectAdapter)
},
this.safeWaku.subscribe(id, timeFilter, (message, chatId) =>
this.handleMessage(message, address, chatId, wakuObjectAdapter),
)
this.subscriptions.push(subscription)
}

private async queueMessage(
message: Message,
address: string,
id: string,
adapter: WakuObjectAdapter,
) {
this.queuedMessages.push({
message,
address,
id,
adapter,
})

if (this.isHandlingMessage) {
return
}

this.isHandlingMessage = true

while (this.queuedMessages.length > 0) {
const queuedMessage = this.queuedMessages.shift()
if (queuedMessage) {
await this.handleMessage(
queuedMessage.message,
queuedMessage.address,
queuedMessage.id,
queuedMessage.adapter,
)
}
}

this.isHandlingMessage = false
}

private async handleMessage(
Expand Down Expand Up @@ -752,11 +637,7 @@ export default class WakuAdapter implements Adapter {
}

private async saveChatStore(address: string) {
if (!this.waku) {
return
}

const ws = makeWakustore(this.waku)
const ws = await this.makeWakustore()
const chatData: ChatData = get(chats)

const result = await ws.setDoc<StorageChatEntry[]>('chats', address, Array.from(chatData.chats))
Expand Down
Loading

0 comments on commit c6f6452

Please sign in to comment.