Skip to content

Commit

Permalink
clean up crdt code
Browse files Browse the repository at this point in the history
  • Loading branch information
gonpombo8 committed Oct 19, 2023
1 parent 7f46d9a commit d30fe7c
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 102 deletions.
6 changes: 1 addition & 5 deletions packages/@dcl/ecs/src/serialization/crdt/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,8 @@ export type PutComponentMessageBody = {
data: Uint8Array
}

export type PutNetworkComponentMessageBody = {
export type PutNetworkComponentMessageBody = Omit<PutComponentMessageBody, 'type'> & {
type: CrdtMessageType.PUT_NETWORK_COMPONENT
entityId: Entity
componentId: number
timestamp: number
data: Uint8Array
networkId: number
}

Expand Down
135 changes: 44 additions & 91 deletions packages/@dcl/ecs/src/systems/crdt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,8 @@ import { AppendValueOperation, CrdtMessageProtocol } from '../../serialization/c
import { DeleteComponent } from '../../serialization/crdt/deleteComponent'
import { DeleteEntity } from '../../serialization/crdt/deleteEntity'
import { PutComponentOperation } from '../../serialization/crdt/putComponent'
import {
CrdtMessageType,
CrdtMessageHeader,
PutComponentMessageBody,
PutNetworkComponentMessageBody
} from '../../serialization/crdt/types'
import { ReceiveMessage, Transport, TransportMessage } from './types'
import { CrdtMessageType, CrdtMessageHeader } from '../../serialization/crdt/types'
import { ReceiveMessage, Transport } from './types'
import { Schemas } from '../../schemas'
import { PutNetworkComponentOperation } from '../../serialization/crdt/putComponentNetwork'

Expand All @@ -22,8 +17,6 @@ export const NetworkEntityEngine = (engine: Pick<IEngine, 'defineComponent'>) =>
userId: Schemas.Int
})

type NetworkComponent = { entityId: number; userId: number }

/**
* @public
*/
Expand All @@ -43,9 +36,7 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang
// Messages that we received at transport.onMessage waiting to be processed
const receivedMessages: ReceiveMessage[] = []
// Messages already processed by the engine but that we need to broadcast to other transports.
const broadcastMessages: TransportMessage[] = []
// Messages receieved by a transport that were outdated. We need to correct them
const outdatedMessages: TransportMessage[] = []
const broadcastMessages: ReceiveMessage[] = []

/**
*
Expand Down Expand Up @@ -119,13 +110,18 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang
return messagesToProcess
}

/**
* Find the local entityId associated to the network component message.
* It's a mapping Network -> to Local
* If it's not a network message, return the entityId received by the message
*/
function findNetworkId(msg: ReceiveMessage): { entityId: Entity; network?: ReturnType<typeof NetworkEntity.get> } {
if (msg.type !== CrdtMessageType.PUT_NETWORK_COMPONENT) {
return { entityId: msg.entityId }
}

for (const [entityId, network] of engine.getEntitiesWith(NetworkEntity)) {
if (network.userId === msg.networkId && network.entityId === msg.entityId) {
console.log('[NetworkId]: ', { entityId, network })
return { entityId, network }
}
}
Expand All @@ -138,14 +134,13 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang
*/
async function receiveMessages() {
const messagesToProcess = getMessages(receivedMessages)
// const bufferForOutdated = new ReadWriteByteBuffer()
const entitiesShouldBeCleaned: Entity[] = []

for (const msg of messagesToProcess) {
// eslint-disable-next-line prefer-const
let { entityId, network } = findNetworkId(msg)
// We receive a new Entity. Create the localEntity and map it to the NetworkEntity component
if (msg.type === CrdtMessageType.PUT_NETWORK_COMPONENT && !network) {
console.log('[CRDT New] New message without network', msg)
entityId = engine.addEntity()
NetworkEntity.createOrReplace(entityId, { entityId: msg.entityId, userId: msg.networkId })
}
Expand All @@ -169,28 +164,9 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang
if (component) {
const [conflictMessage, value] = component.updateFromCrdt({ ...msg, entityId })

if (conflictMessage) {
// const offset = bufferForOutdated.currentWriteOffset()
// if (conflictMessage.type === CrdtMessageType.PUT_COMPONENT) {
// PutComponentOperation.write(
// msg.entityId,
// conflictMessage.timestamp,
// conflictMessage.componentId,
// conflictMessage.networkId,
// conflictMessage.data,
// bufferForOutdated
// )
// } else if (conflictMessage.type === CrdtMessageType.DELETE_COMPONENT) {
// DeleteComponent.write(entityId, component.componentId, conflictMessage.timestamp, bufferForOutdated)
// }
// outdatedMessages.push({
// ...msg,
// messageBuffer: bufferForOutdated.buffer().subarray(offset, bufferForOutdated.currentWriteOffset())
// })
} else {
if (!conflictMessage) {
// Add message to transport queue to be processed by others transports
broadcastMessages.push(msg)

onProcessEntityComponentChange && onProcessEntityComponentChange(msg.entityId, msg.type, component, value)
}
} else {
Expand All @@ -201,19 +177,11 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang
}
// the last stage of the syncrhonization is to delete the entities
for (const entity of entitiesShouldBeCleaned) {
// If we tried to resend outdated message and the entity was deleted before, we avoid sending them.
for (let i = outdatedMessages.length - 1; i >= 0; i--) {
if (outdatedMessages[i].entityId === entity && outdatedMessages[i].type !== CrdtMessageType.DELETE_ENTITY) {
outdatedMessages.splice(i, 1)
}
}
for (const definition of engine.componentsIter()) {
// TODO: check this with pato/pravus
definition.entityDeleted(entity, true)
}

engine.entityContainer.updateRemovedEntity(entity)

onProcessEntityComponentChange && onProcessEntityComponentChange(entity, CrdtMessageType.DELETE_ENTITY)
}
}
Expand All @@ -223,14 +191,12 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang
*/
async function sendMessages(entitiesDeletedThisTick: Entity[]) {
// CRDT Messages will be the merge between the recieved transport messages and the new crdt messages
const crdtMessages = getMessages<TransportMessage & { network?: NetworkComponent }>(broadcastMessages)
const outdatedMessagesBkp = getMessages(outdatedMessages)
const crdtMessages = getMessages(broadcastMessages)
const buffer = new ReadWriteByteBuffer()

for (const component of engine.componentsIter()) {
for (const message of component.getCrdtUpdates()) {
const offset = buffer.currentWriteOffset()
const network = NetworkEntity.getOrNull(message.entityId) || undefined
// Avoid creating messages if there is no transport that will handle it
if (transports.some((t) => t.filter(message))) {
if (message.type === CrdtMessageType.PUT_COMPONENT) {
Expand All @@ -242,8 +208,7 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang
}
crdtMessages.push({
...message,
messageBuffer: buffer.buffer().subarray(offset, buffer.currentWriteOffset()),
network
messageBuffer: buffer.buffer().subarray(offset, buffer.currentWriteOffset())
})

if (onProcessEntityComponentChange) {
Expand Down Expand Up @@ -272,65 +237,53 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang

// Send CRDT messages to transports
const transportBuffer = new ReadWriteByteBuffer()
if (crdtMessages.find((a) => a.entityId > 500)) {
console.log(crdtMessages)
}
for (const index in transports) {
const transportIndex = Number(index)
const transport = transports[transportIndex]
const isRendererTransport = !!(transport as any).isRenderer
const isRendererTransport = transport.type === 'renderer'
const isNetworkTransport = transport.type === 'network'
transportBuffer.resetBuffer()
// First we need to send all the messages that were outdated from a transport
// So we can fix their crdt state
for (const message of outdatedMessagesBkp) {
if (
message.transportId === transportIndex &&
// TODO: This is an optimization, the state should converge anyway, whatever the message is sent.
// Avoid sending multiple messages for the same entity-componentId
!crdtMessages.find(
(m) =>
m.entityId === message.entityId &&
// TODO: as any, with multiple type of messages, it should have many checks before the check for similar messages
(m as any).componentId &&
(m as any).componentId === (message as any).componentId
)
) {
transportBuffer.writeBuffer(message.messageBuffer, false)
}
}
const buffer = new ReadWriteByteBuffer()

// Then we send all the new crdtMessages that the transport needs to process
for (const message of crdtMessages) {
if (message.transportId !== transportIndex && transport.filter(message)) {
if (isRendererTransport && message.type === CrdtMessageType.PUT_NETWORK_COMPONENT) {
const msg = message as any as PutNetworkComponentMessageBody
const { entityId } = findNetworkId(message as any)
const offset = buffer.currentWriteOffset()

PutComponentOperation.write(entityId, msg.timestamp, msg.componentId, msg.data, buffer)
transportBuffer.writeBuffer(buffer.buffer().subarray(offset, buffer.currentWriteOffset()), false)
} else if (!isRendererTransport && message.type === CrdtMessageType.PUT_COMPONENT && message.network) {
const msg = message as any as PutComponentMessageBody
// Avoid echo messages
if (message.transportId === transportIndex) continue
// Redundant message for the transport
if (!transport.filter(message)) continue
// If it's the renderer transport and its a NetworkMessage, we need to fix the entityId field and convert it to a known Message.
// PUT_NETWORK_COMPONENT -> PUT_COMPONENT
if (isRendererTransport && message.type === CrdtMessageType.PUT_NETWORK_COMPONENT) {
const { entityId } = findNetworkId(message)
const offset = buffer.currentWriteOffset()
PutComponentOperation.write(entityId, message.timestamp, message.componentId, message.data, buffer)
transportBuffer.writeBuffer(buffer.buffer().subarray(offset, buffer.currentWriteOffset()), false)
// Iterate the next message
continue
}
// If its a network transport and its a PUT_COMPONENT that has a NetworkEntity component, we need to send this message
// through comms with the EntityID and NetworkID from ther NetworkEntity so everyone can recieve this message and map to their custom entityID.
if (isNetworkTransport && message.type === CrdtMessageType.PUT_COMPONENT) {
const networkData = NetworkEntity.getOrNull(message.entityId)
// If it has networkData convert the message to PUT_NETWORK_COMPONENT.
if (networkData) {
const offset = buffer.currentWriteOffset()
PutNetworkComponentOperation.write(
message.network.entityId as Entity,
msg.timestamp,
msg.componentId,
message.network.userId ?? 0,
msg.data,
networkData.entityId as Entity,
message.timestamp,
message.componentId,
networkData.userId,
message.data,
buffer
)
transportBuffer.writeBuffer(buffer.buffer().subarray(offset, buffer.currentWriteOffset()), false)
} else {
transportBuffer.writeBuffer(message.messageBuffer, false)
// Iterate the next message
continue
}
}
transportBuffer.writeBuffer(message.messageBuffer, false)
}
const message = transportBuffer.currentWriteOffset() ? transportBuffer.toBinary() : new Uint8Array([])
if (!isRendererTransport && message.byteLength) {
console.log('Sending', message)
}

await transport.send(message)
}
}
Expand Down
1 change: 1 addition & 0 deletions packages/@dcl/ecs/src/systems/crdt/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ export type Transport = {
send(message: Uint8Array): Promise<void>
onmessage?(message: Uint8Array): void
filter(message: Omit<TransportMessage, 'messageBuffer'>): boolean
type?: string
}
1 change: 0 additions & 1 deletion packages/@dcl/sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { compositeProvider } from './composite-provider'
// Attach CRDT transport
// @internal
export const rendererTransport = createRendererTransport({ crdtSendToRenderer })
;(rendererTransport as any).isRenderer = true
engine.addTransport(rendererTransport)

export async function onUpdate(deltaTime: number) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ export function createRendererTransport(engineApi: EngineApiForTransport): Trans
}

return !!message
}
},
type: 'renderer'
}

return rendererTransport
Expand Down
2 changes: 0 additions & 2 deletions packages/@dcl/sdk/src/network-transport/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,10 @@ export function syncFilter(message: Omit<TransportMessage, 'messageBuffer'>) {
}

if (componentId === NetworkEntity.componentId) {
console.log('Message discarded', message)
return false
}

if (componentId && sync.componentIds.includes(componentId)) {
console.log('[SYNC COMPONENT]', message)
return true
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ export function addSyncTransport() {
const transport: Transport = {
filter: syncFilter,
send: async (message: Uint8Array) => {
message.byteLength && console.log(Array.from(serializeCrdtMessages('[CRDT Send]: ', message, engine)))
const messagesToProcess = await sendBinary({ data: message })
if (messagesToProcess.data.length) {
if (transport.onmessage) {
Expand All @@ -20,7 +19,8 @@ export function addSyncTransport() {
}
}
}
}
},
type: 'network'
}
engine.addTransport(transport)
}
Expand Down

0 comments on commit d30fe7c

Please sign in to comment.