Skip to content

Commit

Permalink
expose stateIsSyncronized flag to check if the crdt state is initialized
Browse files Browse the repository at this point in the history
  • Loading branch information
gonpombo8 committed Oct 17, 2024
1 parent aeb6a17 commit 780dcab
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 14 deletions.
9 changes: 3 additions & 6 deletions packages/@dcl/sdk/src/network/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ import { addSyncTransport } from './message-bus-sync'
import { getUserData } from '~system/UserIdentity'

// initialize sync transport for sdk engine
const { getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent, getFirstChild } = addSyncTransport(
engine,
sendBinary,
getUserData
)
const { getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent, getFirstChild, isStateSyncronized } =
addSyncTransport(engine, sendBinary, getUserData)

export { getFirstChild, getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent }
export { getFirstChild, getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent, isStateSyncronized }
56 changes: 48 additions & 8 deletions packages/@dcl/sdk/src/network/message-bus-sync.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IEngine, Transport, RealmInfo } from '@dcl/ecs'
import { IEngine, Transport, RealmInfo, PlayerIdentityData } from '@dcl/ecs'
import { type SendBinaryRequest, type SendBinaryResponse } from '~system/CommunicationsController'

import { syncFilter } from './filter'
Expand Down Expand Up @@ -61,23 +61,58 @@ export function addSyncTransport(
if (sender !== myProfile.userId) return
DEBUG_NETWORK_MESSAGES() && console.log('[Processing CRDT State]', data.byteLength)
transport.onmessage!(data)
stateIsSyncronized = true
})

binaryMessageBus.on(CommsMessage.REQ_CRDT_STATE, (message, userId) => {
binaryMessageBus.on(CommsMessage.REQ_CRDT_STATE, async (message, userId) => {
console.log(`Sending CRDT State to: ${userId}`)
transport.onmessage!(message)
binaryMessageBus.emit(CommsMessage.RES_CRDT_STATE, encodeCRDTState(userId, engineToCrdt(engine)))
})

async function sleep(ms: number) {
let timer = 0
function system(dt: number) {
timer += dt
if (timer >= ms) {
engine.removeSystem(system)
return Promise.resolve()
}
}
engine.addSystem(system)
}

const players = definePlayerHelper(engine)

let stateIsSyncronized = false

let requestCrdtStateWhenConnected = false

async function requestState() {
requestCrdtStateWhenConnected = false
let players = Array.from(engine.getEntitiesWith(PlayerIdentityData))
DEBUG_NETWORK_MESSAGES() && console.log(`Requesting state. Players connected: ${players.length - 1}`)
binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine))
await sleep(3000)
players = Array.from(engine.getEntitiesWith(PlayerIdentityData))

if (!stateIsSyncronized) {
if (players.length > 1) {
DEBUG_NETWORK_MESSAGES() &&
console.log(`Requesting state again (no response). Players connected: ${players.length - 1}`)
void requestState()
} else {
DEBUG_NETWORK_MESSAGES() && console.log('No active players. State syncronized')
stateIsSyncronized = true
}
}
}

players.onEnterScene((player) => {
DEBUG_NETWORK_MESSAGES() && console.log('[onEnterScene]', player.userId)
if (player.userId === myProfile.userId && !requestCrdtStateWhenConnected) {
if (RealmInfo.getOrNull(engine.RootEntity)?.isConnectedSceneRoom) {
DEBUG_NETWORK_MESSAGES() && console.log('Requesting state')
binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine))
void requestState()
} else {
DEBUG_NETWORK_MESSAGES() && console.log('Waiting to be conneted')
requestCrdtStateWhenConnected = true
Expand All @@ -87,15 +122,15 @@ export function addSyncTransport(

RealmInfo.onChange(engine.RootEntity, (value) => {
if (value?.isConnectedSceneRoom && requestCrdtStateWhenConnected) {
DEBUG_NETWORK_MESSAGES() && console.log('Requesting state.')
requestCrdtStateWhenConnected = false
binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine))
void requestState()
}
})

players.onLeaveScene((userId) => {
DEBUG_NETWORK_MESSAGES() && console.log('[onLeaveScene]', userId)
if (userId === myProfile.userId) {
DEBUG_NETWORK_MESSAGES() && console.log('Disconnected from comms')
stateIsSyncronized = false
requestCrdtStateWhenConnected = false
}
})
Expand All @@ -107,9 +142,14 @@ export function addSyncTransport(
transport.onmessage!(value)
})

function isStateSyncronized() {
return stateIsSyncronized
}

return {
...entityDefinitions,
myProfile
myProfile,
isStateSyncronized
}
}

Expand Down

0 comments on commit 780dcab

Please sign in to comment.