From 7f46d9aa4967b70f3d0bc8400e1482da43bf67ab Mon Sep 17 00:00:00 2001 From: Gonzalo DCL Date: Wed, 18 Oct 2023 16:11:38 -0300 Subject: [PATCH] wip put component network operation --- .../lww-element-set-component-definition.ts | 13 +- .../ecs/src/serialization/crdt/message.ts | 3 + .../serialization/crdt/putComponentNetwork.ts | 64 +++++++++ .../@dcl/ecs/src/serialization/crdt/types.ts | 22 ++- packages/@dcl/ecs/src/systems/crdt/index.ts | 125 ++++++++++++++---- packages/@dcl/sdk/src/index.ts | 1 + .../sdk/src/internal/transports/logger.ts | 1 + .../@dcl/sdk/src/network-transport/client.ts | 2 +- .../@dcl/sdk/src/network-transport/server.ts | 2 +- .../@dcl/sdk/src/network-transport/utils.ts | 19 ++- .../src/create-cube.ts | 40 +++++- .../sdk7-humming-birds-sync/src/index.ts | 19 +-- .../src/message-bus-sync.ts | 25 +++- .../sdk7-humming-birds-sync/src/ui.tsx | 21 +++ 14 files changed, 297 insertions(+), 60 deletions(-) create mode 100644 packages/@dcl/ecs/src/serialization/crdt/putComponentNetwork.ts diff --git a/packages/@dcl/ecs/src/engine/lww-element-set-component-definition.ts b/packages/@dcl/ecs/src/engine/lww-element-set-component-definition.ts index 983a803d1..1842c2237 100644 --- a/packages/@dcl/ecs/src/engine/lww-element-set-component-definition.ts +++ b/packages/@dcl/ecs/src/engine/lww-element-set-component-definition.ts @@ -7,7 +7,8 @@ import { CrdtMessageType, CrdtMessageBody, PutComponentOperation, - DeleteComponent + DeleteComponent, + PutNetworkComponentMessageBody } from '../serialization/crdt' import { dataCompare } from '../systems/crdt/utils' import { LastWriteWinElementSetComponentDefinition, ComponentType } from './component' @@ -63,7 +64,7 @@ export function createUpdateLwwFromCrdt( * @public */ function crdtRuleForCurrentState( - message: PutComponentMessageBody | DeleteComponentMessageBody + message: PutComponentMessageBody | DeleteComponentMessageBody | PutNetworkComponentMessageBody ): ProcessMessageResultType { const { entityId, timestamp } = message const currentTimestamp = timestamps.get(entityId as Entity) @@ -109,7 +110,11 @@ export function createUpdateLwwFromCrdt( return (msg: CrdtMessageBody): [null | PutComponentMessageBody | DeleteComponentMessageBody, any] => { /* istanbul ignore next */ - if (msg.type !== CrdtMessageType.PUT_COMPONENT && msg.type !== CrdtMessageType.DELETE_COMPONENT) + if ( + msg.type !== CrdtMessageType.PUT_COMPONENT && + msg.type !== CrdtMessageType.DELETE_COMPONENT && + msg.type !== CrdtMessageType.PUT_NETWORK_COMPONENT + ) /* istanbul ignore next */ return [null, data.get(msg.entityId)] @@ -120,7 +125,7 @@ export function createUpdateLwwFromCrdt( case ProcessMessageResultType.StateUpdatedTimestamp: { timestamps.set(entity, msg.timestamp) - if (msg.type === CrdtMessageType.PUT_COMPONENT) { + if (msg.type === CrdtMessageType.PUT_COMPONENT || msg.type === CrdtMessageType.PUT_NETWORK_COMPONENT) { const buf = new ReadWriteByteBuffer(msg.data!) data.set(entity, schema.deserialize(buf)) } else { diff --git a/packages/@dcl/ecs/src/serialization/crdt/message.ts b/packages/@dcl/ecs/src/serialization/crdt/message.ts index 3bfef6e31..47fe901ab 100644 --- a/packages/@dcl/ecs/src/serialization/crdt/message.ts +++ b/packages/@dcl/ecs/src/serialization/crdt/message.ts @@ -5,6 +5,7 @@ import { PutComponentOperation } from './putComponent' import { DeleteComponent } from './deleteComponent' import { DeleteEntity } from './deleteEntity' import { AppendValueOperation } from './appendValue' +import { PutNetworkComponentOperation } from './putComponentNetwork' export function readMessage(buf: ByteBuffer): CrdtMessage | null { const header = CrdtMessageProtocol.getHeader(buf) @@ -12,6 +13,8 @@ export function readMessage(buf: ByteBuffer): CrdtMessage | null { if (header.type === CrdtMessageType.PUT_COMPONENT) { return PutComponentOperation.read(buf) + } else if (header.type === CrdtMessageType.PUT_NETWORK_COMPONENT) { + return PutNetworkComponentOperation.read(buf) } else if (header.type === CrdtMessageType.DELETE_COMPONENT) { return DeleteComponent.read(buf) } else if (header.type === CrdtMessageType.APPEND_VALUE) { diff --git a/packages/@dcl/ecs/src/serialization/crdt/putComponentNetwork.ts b/packages/@dcl/ecs/src/serialization/crdt/putComponentNetwork.ts new file mode 100644 index 000000000..767ebcfa6 --- /dev/null +++ b/packages/@dcl/ecs/src/serialization/crdt/putComponentNetwork.ts @@ -0,0 +1,64 @@ +import { CrdtMessageProtocol } from './crdtMessageProtocol' +import { Entity } from '../../engine/entity' +import { ByteBuffer } from '../ByteBuffer' +import { CrdtMessageType, CRDT_MESSAGE_HEADER_LENGTH, PutComponentMessage } from './types' + +/** + * @public + */ +export namespace PutNetworkComponentOperation { + export const MESSAGE_HEADER_LENGTH = 20 + + /** + * Call this function for an optimal writing data passing the ByteBuffer + * already allocated + */ + export function write( + entity: Entity, + timestamp: number, + componentId: number, + networkId: number, + data: Uint8Array, + buf: ByteBuffer + ) { + // reserve the beginning + const startMessageOffset = buf.incrementWriteOffset(CRDT_MESSAGE_HEADER_LENGTH + MESSAGE_HEADER_LENGTH) + + // write body + buf.writeBuffer(data, false) + const messageLength = buf.currentWriteOffset() - startMessageOffset + + // Write CrdtMessage header + buf.setUint32(startMessageOffset, messageLength) + buf.setUint32(startMessageOffset + 4, CrdtMessageType.PUT_NETWORK_COMPONENT) + + // Write ComponentOperation header + buf.setUint32(startMessageOffset + 8, entity as number) + buf.setUint32(startMessageOffset + 12, componentId) + buf.setUint32(startMessageOffset + 16, timestamp) + buf.setUint32(startMessageOffset + 20, networkId) + const newLocal = messageLength - MESSAGE_HEADER_LENGTH - CRDT_MESSAGE_HEADER_LENGTH + buf.setUint32(startMessageOffset + 24, newLocal) + } + + export function read(buf: ByteBuffer): (PutComponentMessage & { networkId: number }) | null { + const header = CrdtMessageProtocol.readHeader(buf) + + if (!header) { + return null + } + + if (header.type !== CrdtMessageType.PUT_NETWORK_COMPONENT) { + throw new Error('PutComponentOperation tried to read another message type.') + } + + return { + ...header, + entityId: buf.readUint32() as Entity, + componentId: buf.readUint32(), + timestamp: buf.readUint32(), + networkId: buf.readUint32(), + data: buf.readBuffer() + } + } +} diff --git a/packages/@dcl/ecs/src/serialization/crdt/types.ts b/packages/@dcl/ecs/src/serialization/crdt/types.ts index 74aa6b527..71dfd78b7 100644 --- a/packages/@dcl/ecs/src/serialization/crdt/types.ts +++ b/packages/@dcl/ecs/src/serialization/crdt/types.ts @@ -12,6 +12,7 @@ export enum CrdtMessageType { DELETE_ENTITY = 3, APPEND_VALUE = 4, + PUT_NETWORK_COMPONENT = 5, MAX_MESSAGE_TYPE } @@ -50,6 +51,15 @@ export type PutComponentMessageBody = { data: Uint8Array } +export type PutNetworkComponentMessageBody = { + type: CrdtMessageType.PUT_NETWORK_COMPONENT + entityId: Entity + componentId: number + timestamp: number + data: Uint8Array + networkId: number +} + /** * Min. length = header (8 bytes) + 16 bytes = 24 bytes * @@ -97,6 +107,10 @@ export type AppendValueMessage = CrdtMessageHeader & AppendValueMessageBody * @public */ export type PutComponentMessage = CrdtMessageHeader & PutComponentMessageBody +/** + * @public + */ +export type PutNetworkComponentMessage = CrdtMessageHeader & PutNetworkComponentMessageBody /** * @public */ @@ -109,12 +123,18 @@ export type DeleteEntityMessage = CrdtMessageHeader & DeleteEntityMessageBody /** * @public */ -export type CrdtMessage = PutComponentMessage | DeleteComponentMessage | DeleteEntityMessage | AppendValueMessage +export type CrdtMessage = + | PutComponentMessage + | PutNetworkComponentMessage + | DeleteComponentMessage + | DeleteEntityMessage + | AppendValueMessage /** * @public */ export type CrdtMessageBody = + | PutNetworkComponentMessageBody | PutComponentMessageBody | DeleteComponentMessageBody | DeleteEntityMessageBody diff --git a/packages/@dcl/ecs/src/systems/crdt/index.ts b/packages/@dcl/ecs/src/systems/crdt/index.ts index 0546131bd..109d35c26 100644 --- a/packages/@dcl/ecs/src/systems/crdt/index.ts +++ b/packages/@dcl/ecs/src/systems/crdt/index.ts @@ -1,13 +1,28 @@ import { Entity, EntityState } from '../../engine/entity' import type { ComponentDefinition } from '../../engine' -import type { PreEngine } from '../../engine/types' +import type { IEngine, PreEngine } from '../../engine/types' import { ReadWriteByteBuffer } from '../../serialization/ByteBuffer' import { AppendValueOperation, CrdtMessageProtocol } from '../../serialization/crdt' import { DeleteComponent } from '../../serialization/crdt/deleteComponent' import { DeleteEntity } from '../../serialization/crdt/deleteEntity' import { PutComponentOperation } from '../../serialization/crdt/putComponent' -import { CrdtMessageType, CrdtMessageHeader } from '../../serialization/crdt/types' +import { + CrdtMessageType, + CrdtMessageHeader, + PutComponentMessageBody, + PutNetworkComponentMessageBody +} from '../../serialization/crdt/types' import { ReceiveMessage, Transport, TransportMessage } from './types' +import { Schemas } from '../../schemas' +import { PutNetworkComponentOperation } from '../../serialization/crdt/putComponentNetwork' + +export const NetworkEntityEngine = (engine: Pick) => + engine.defineComponent('chore:network-entity', { + entityId: Schemas.Int, + userId: Schemas.Int + }) + +type NetworkComponent = { entityId: number; userId: number } /** * @public @@ -24,7 +39,7 @@ export type OnChangeFunction = ( */ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChange: OnChangeFunction | null) { const transports: Transport[] = [] - + const NetworkEntity = NetworkEntityEngine(engine) // Messages that we received at transport.onMessage waiting to be processed const receivedMessages: ReceiveMessage[] = [] // Messages already processed by the engine but that we need to broadcast to other transports. @@ -64,6 +79,13 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang transportId, messageBuffer: buffer.buffer().subarray(offset, buffer.currentReadOffset()) }) + } else if (header.type === CrdtMessageType.PUT_NETWORK_COMPONENT) { + const message = PutNetworkComponentOperation.read(buffer)! + receivedMessages.push({ + ...message, + transportId, + messageBuffer: buffer.buffer().subarray(offset, buffer.currentReadOffset()) + }) } else if (header.type === CrdtMessageType.DELETE_ENTITY) { const message = DeleteEntity.read(buffer)! receivedMessages.push({ @@ -97,51 +119,70 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang return messagesToProcess } + function findNetworkId(msg: ReceiveMessage): { entityId: Entity; network?: ReturnType } { + if (msg.type !== CrdtMessageType.PUT_NETWORK_COMPONENT) { + return { entityId: msg.entityId } + } + for (const [entityId, network] of engine.getEntitiesWith(NetworkEntity)) { + if (network.userId === msg.networkId && network.entityId === msg.entityId) { + console.log('[NetworkId]: ', { entityId, network }) + return { entityId, network } + } + } + return { entityId: msg.entityId } + } + /** * This fn will be called on every tick. * Process all the messages queue received by the transport */ async function receiveMessages() { const messagesToProcess = getMessages(receivedMessages) - const bufferForOutdated = new ReadWriteByteBuffer() + // const bufferForOutdated = new ReadWriteByteBuffer() const entitiesShouldBeCleaned: Entity[] = [] for (const msg of messagesToProcess) { + // eslint-disable-next-line prefer-const + let { entityId, network } = findNetworkId(msg) + if (msg.type === CrdtMessageType.PUT_NETWORK_COMPONENT && !network) { + console.log('[CRDT New] New message without network', msg) + entityId = engine.addEntity() + NetworkEntity.createOrReplace(entityId, { entityId: msg.entityId, userId: msg.networkId }) + } if (msg.type === CrdtMessageType.DELETE_ENTITY) { - entitiesShouldBeCleaned.push(msg.entityId) + entitiesShouldBeCleaned.push(entityId) broadcastMessages.push(msg) } else { - const entityState = engine.entityContainer.getEntityState(msg.entityId) + const entityState = engine.entityContainer.getEntityState(entityId) // Skip updates from removed entityes if (entityState === EntityState.Removed) continue // Entities with unknown entities should update its entity state if (entityState === EntityState.Unknown) { - engine.entityContainer.updateUsedEntity(msg.entityId) + engine.entityContainer.updateUsedEntity(entityId) } const component = engine.getComponentOrNull(msg.componentId) /* istanbul ignore else */ if (component) { - const [conflictMessage, value] = component.updateFromCrdt(msg) + const [conflictMessage, value] = component.updateFromCrdt({ ...msg, entityId }) if (conflictMessage) { - const offset = bufferForOutdated.currentWriteOffset() - - if (conflictMessage.type === CrdtMessageType.PUT_COMPONENT) { - PutComponentOperation.write( - msg.entityId, - conflictMessage.timestamp, - conflictMessage.componentId, - conflictMessage.data, - bufferForOutdated - ) - } else if (conflictMessage.type === CrdtMessageType.DELETE_COMPONENT) { - DeleteComponent.write(msg.entityId, component.componentId, conflictMessage.timestamp, bufferForOutdated) - } - + // const offset = bufferForOutdated.currentWriteOffset() + // if (conflictMessage.type === CrdtMessageType.PUT_COMPONENT) { + // PutComponentOperation.write( + // msg.entityId, + // conflictMessage.timestamp, + // conflictMessage.componentId, + // conflictMessage.networkId, + // conflictMessage.data, + // bufferForOutdated + // ) + // } else if (conflictMessage.type === CrdtMessageType.DELETE_COMPONENT) { + // DeleteComponent.write(entityId, component.componentId, conflictMessage.timestamp, bufferForOutdated) + // } // outdatedMessages.push({ // ...msg, // messageBuffer: bufferForOutdated.buffer().subarray(offset, bufferForOutdated.currentWriteOffset()) @@ -182,14 +223,14 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang */ async function sendMessages(entitiesDeletedThisTick: Entity[]) { // CRDT Messages will be the merge between the recieved transport messages and the new crdt messages - const crdtMessages = getMessages(broadcastMessages) + const crdtMessages = getMessages(broadcastMessages) const outdatedMessagesBkp = getMessages(outdatedMessages) const buffer = new ReadWriteByteBuffer() for (const component of engine.componentsIter()) { for (const message of component.getCrdtUpdates()) { const offset = buffer.currentWriteOffset() - + const network = NetworkEntity.getOrNull(message.entityId) || undefined // Avoid creating messages if there is no transport that will handle it if (transports.some((t) => t.filter(message))) { if (message.type === CrdtMessageType.PUT_COMPONENT) { @@ -199,10 +240,10 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang } else if (message.type === CrdtMessageType.APPEND_VALUE) { AppendValueOperation.write(message.entityId, message.timestamp, message.componentId, message.data, buffer) } - crdtMessages.push({ ...message, - messageBuffer: buffer.buffer().subarray(offset, buffer.currentWriteOffset()) + messageBuffer: buffer.buffer().subarray(offset, buffer.currentWriteOffset()), + network }) if (onProcessEntityComponentChange) { @@ -231,9 +272,13 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang // Send CRDT messages to transports const transportBuffer = new ReadWriteByteBuffer() + if (crdtMessages.find((a) => a.entityId > 500)) { + console.log(crdtMessages) + } for (const index in transports) { const transportIndex = Number(index) const transport = transports[transportIndex] + const isRendererTransport = !!(transport as any).isRenderer transportBuffer.resetBuffer() // First we need to send all the messages that were outdated from a transport // So we can fix their crdt state @@ -253,13 +298,39 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang transportBuffer.writeBuffer(message.messageBuffer, false) } } + const buffer = new ReadWriteByteBuffer() // Then we send all the new crdtMessages that the transport needs to process for (const message of crdtMessages) { if (message.transportId !== transportIndex && transport.filter(message)) { - transportBuffer.writeBuffer(message.messageBuffer, false) + if (isRendererTransport && message.type === CrdtMessageType.PUT_NETWORK_COMPONENT) { + const msg = message as any as PutNetworkComponentMessageBody + const { entityId } = findNetworkId(message as any) + const offset = buffer.currentWriteOffset() + + PutComponentOperation.write(entityId, msg.timestamp, msg.componentId, msg.data, buffer) + transportBuffer.writeBuffer(buffer.buffer().subarray(offset, buffer.currentWriteOffset()), false) + } else if (!isRendererTransport && message.type === CrdtMessageType.PUT_COMPONENT && message.network) { + const msg = message as any as PutComponentMessageBody + const offset = buffer.currentWriteOffset() + PutNetworkComponentOperation.write( + message.network.entityId as Entity, + msg.timestamp, + msg.componentId, + message.network.userId ?? 0, + msg.data, + buffer + ) + transportBuffer.writeBuffer(buffer.buffer().subarray(offset, buffer.currentWriteOffset()), false) + } else { + transportBuffer.writeBuffer(message.messageBuffer, false) + } } } const message = transportBuffer.currentWriteOffset() ? transportBuffer.toBinary() : new Uint8Array([]) + if (!isRendererTransport && message.byteLength) { + console.log('Sending', message) + } + await transport.send(message) } } diff --git a/packages/@dcl/sdk/src/index.ts b/packages/@dcl/sdk/src/index.ts index 7c8e8e098..3c82f5266 100644 --- a/packages/@dcl/sdk/src/index.ts +++ b/packages/@dcl/sdk/src/index.ts @@ -8,6 +8,7 @@ import { compositeProvider } from './composite-provider' // Attach CRDT transport // @internal export const rendererTransport = createRendererTransport({ crdtSendToRenderer }) +;(rendererTransport as any).isRenderer = true engine.addTransport(rendererTransport) export async function onUpdate(deltaTime: number) { diff --git a/packages/@dcl/sdk/src/internal/transports/logger.ts b/packages/@dcl/sdk/src/internal/transports/logger.ts index 88fd8736c..f98c610b7 100644 --- a/packages/@dcl/sdk/src/internal/transports/logger.ts +++ b/packages/@dcl/sdk/src/internal/transports/logger.ts @@ -16,6 +16,7 @@ export function* serializeCrdtMessages(prefix: string, data: Uint8Array, engine: if ( message.type === CrdtMessageType.PUT_COMPONENT || + message.type === CrdtMessageType.PUT_NETWORK_COMPONENT || message.type === CrdtMessageType.DELETE_COMPONENT || message.type === CrdtMessageType.APPEND_VALUE ) { diff --git a/packages/@dcl/sdk/src/network-transport/client.ts b/packages/@dcl/sdk/src/network-transport/client.ts index 9a41ed8ad..d395561ab 100644 --- a/packages/@dcl/sdk/src/network-transport/client.ts +++ b/packages/@dcl/sdk/src/network-transport/client.ts @@ -1,4 +1,4 @@ -import { craftMessage, createNetworkManager, encodeString, syncFilter } from './utils' +import { syncFilter, craftMessage, createNetworkManager, encodeString } from './utils' import { Transport, engine } from '@dcl/ecs' import { getHeaders } from '~system/SignedFetch' diff --git a/packages/@dcl/sdk/src/network-transport/server.ts b/packages/@dcl/sdk/src/network-transport/server.ts index 11192c3ae..962281a57 100644 --- a/packages/@dcl/sdk/src/network-transport/server.ts +++ b/packages/@dcl/sdk/src/network-transport/server.ts @@ -1,6 +1,6 @@ import { engine, SyncComponents, Transport } from '@dcl/ecs' import { engineToCrdt } from './state' -import { syncFilter, createNetworkManager } from './utils' +import { createNetworkManager, syncFilter } from './utils' import { NetworkManager, ServerTransportConfig } from './types' import { PlayersConnected } from '.' diff --git a/packages/@dcl/sdk/src/network-transport/utils.ts b/packages/@dcl/sdk/src/network-transport/utils.ts index c761f36de..bc9df4882 100644 --- a/packages/@dcl/sdk/src/network-transport/utils.ts +++ b/packages/@dcl/sdk/src/network-transport/utils.ts @@ -7,7 +7,8 @@ import { SyncComponents, CrdtMessageType, EntityUtils, - GltfContainerLoadingState + GltfContainerLoadingState, + Schemas } from '@dcl/ecs' import { MessageType } from './types' import { connected, reservedLocalEntities } from '.' @@ -52,18 +53,28 @@ export function syncFilter(message: Omit) { return true } + const sync = SyncComponents.getOrNull(message.entityId) + if (!sync) return false + // TBD: First component if ((message as any).timestamp <= 1) { return true } - const sync = SyncComponents.getOrNull(message.entityId) - if (!sync) return false + if (componentId === NetworkEntity.componentId) { + console.log('Message discarded', message) + return false + } - if ((message as any).componentId && sync.componentIds.includes((message as any).componentId)) { + if (componentId && sync.componentIds.includes(componentId)) { console.log('[SYNC COMPONENT]', message) return true } return false } + +export const NetworkEntity = engine.defineComponent('chore:network-entity', { + entityId: Schemas.Int, + userId: Schemas.Int +}) diff --git a/test/build-ecs/fixtures/sdk7-humming-birds-sync/src/create-cube.ts b/test/build-ecs/fixtures/sdk7-humming-birds-sync/src/create-cube.ts index 55f01aca7..db5f0180a 100644 --- a/test/build-ecs/fixtures/sdk7-humming-birds-sync/src/create-cube.ts +++ b/test/build-ecs/fixtures/sdk7-humming-birds-sync/src/create-cube.ts @@ -16,7 +16,27 @@ import { NetworkManager } from '@dcl/sdk/network-transport/types' // Cube factory export const Cube = engine.defineComponent('cube', {}) -export function createCube(entityFactory: NetworkManager, x: number, y: number, z: number): Entity { +export function createTriangle(entityFactory: NetworkManager, x: number, y: number, z: number, sync: boolean = true) { + const entity = createCube(entityFactory, x, y, z, sync) + MeshRenderer.setCylinder(entity, 1, 0) + MeshCollider.setCylinder(entity, 1, 0) + return entity +} + +export function createCircle(entityFactory: NetworkManager, x: number, y: number, z: number, sync: boolean = true) { + const entity = createCube(entityFactory, x, y, z, sync) + MeshRenderer.setCylinder(entity, 3, 3) + MeshCollider.setCylinder(entity, 3, 3) + return entity +} + +export function createCube( + entityFactory: NetworkManager, + x: number, + y: number, + z: number, + sync: boolean = true +): Entity { const entity = entityFactory.addEntity(engine) // Used to track the cubes Cube.create(entity) @@ -25,9 +45,8 @@ export function createCube(entityFactory: NetworkManager, x: number, y: number, // set how the cube looks and collides MeshRenderer.setBox(entity) MeshCollider.setBox(entity) - Material.setPbrMaterial(entity, { albedoColor: Color4.fromHexString(getRandomHexColor()) }) - SyncComponents.create(entity, { componentIds: [Material.componentId] }) + sync && SyncComponents.create(entity, { componentIds: [Material.componentId] }) PointerEvents.create(entity, { pointerEvents: [ @@ -54,3 +73,18 @@ export function changeColorSystem() { } } } + +export function createCubes(networkManager: NetworkManager) { + for (const [x, y, z] of [ + [44, 1, 26], + [36, 2, 37], + [20, 3, 40], + [19, 1, 23], + [31, 5, 8], + [43, 4, 6], + [37, 3, 24], + [5, 8, 2] + ]) { + createCube(networkManager, x, y, z) + } +} diff --git a/test/build-ecs/fixtures/sdk7-humming-birds-sync/src/index.ts b/test/build-ecs/fixtures/sdk7-humming-birds-sync/src/index.ts index d3ee11c32..0e68470ec 100644 --- a/test/build-ecs/fixtures/sdk7-humming-birds-sync/src/index.ts +++ b/test/build-ecs/fixtures/sdk7-humming-birds-sync/src/index.ts @@ -19,8 +19,7 @@ import { isServer } from '~system/EngineApi' import { getUserData } from '~system/UserIdentity' import { NetworkManager } from '@dcl/sdk/network-transport/types' import { createMovingPlatforms } from './moving-platforms' -import { changeColorSystem, createCube } from './create-cube' -import { createMovingPlatformsOld } from './moving-platforms-old' +import { changeColorSystem, createCube, createCubes } from './create-cube' import { addSyncTransport } from './message-bus-sync' export const GameStatus = engine.defineComponent('game-status', { paused: Schemas.Boolean }) @@ -47,24 +46,12 @@ export async function main() { const userId = (await getUserData({})).data?.userId ?? '' setupUi(userId) + if (server || true) { engine.addSystem(moveHummingBirds) gameStatusServer(networkManager) createMovingPlatforms(networkManager) - // createMovingPlatformsOld(networkManager) - for (const [x, y, z] of [ - [44, 1, 26], - [36, 2, 37], - [20, 3, 40], - [19, 1, 23], - [31, 5, 8], - [43, 4, 6], - [37, 3, 24], - [5, 8, 2] - ]) { - createCube(networkManager, x, y, z) - } - // return + createCubes(networkManager) } if (!server) { diff --git a/test/build-ecs/fixtures/sdk7-humming-birds-sync/src/message-bus-sync.ts b/test/build-ecs/fixtures/sdk7-humming-birds-sync/src/message-bus-sync.ts index cfca7737d..c8a22fa4f 100644 --- a/test/build-ecs/fixtures/sdk7-humming-birds-sync/src/message-bus-sync.ts +++ b/test/build-ecs/fixtures/sdk7-humming-birds-sync/src/message-bus-sync.ts @@ -1,18 +1,21 @@ -import { Transport, engine } from '@dcl/sdk/ecs' +import { Entity, Transport, engine } from '@dcl/sdk/ecs' +import { componentNumberFromName } from '@dcl/ecs/dist/components/component-number' -import { syncFilter } from '@dcl/sdk/network-transport/utils' +import { syncFilter, NetworkEntity } from '@dcl/sdk/network-transport/utils' import { serializeCrdtMessages } from '@dcl/sdk/internal/transports/logger' import { sendBinary } from '~system/CommunicationsController' +import { getUserData } from '~system/UserIdentity' export function addSyncTransport() { const transport: Transport = { filter: syncFilter, send: async (message: Uint8Array) => { + message.byteLength && console.log(Array.from(serializeCrdtMessages('[CRDT Send]: ', message, engine))) const messagesToProcess = await sendBinary({ data: message }) if (messagesToProcess.data.length) { if (transport.onmessage) { for (const byteArray of messagesToProcess.data) { - console.log(Array.from(serializeCrdtMessages('[CRDT]: ', byteArray, engine))) + console.log(Array.from(serializeCrdtMessages('[CRDT Receive]: ', byteArray, engine))) transport.onmessage(byteArray) } } @@ -21,3 +24,19 @@ export function addSyncTransport() { } engine.addTransport(transport) } + +let userId: number +async function getUser() { + const data = await getUserData({}) + if (data.data?.userId) { + userId = componentNumberFromName(data.data?.userId) + } +} +void getUser() + +export const addNetworkEntity = (entity: Entity) => { + if (!userId) { + throw new Error('Invalid user address') + } + NetworkEntity.create(entity, { entityId: entity, userId }) +} diff --git a/test/build-ecs/fixtures/sdk7-humming-birds-sync/src/ui.tsx b/test/build-ecs/fixtures/sdk7-humming-birds-sync/src/ui.tsx index f7c0c18cc..b0b5ae1f8 100644 --- a/test/build-ecs/fixtures/sdk7-humming-birds-sync/src/ui.tsx +++ b/test/build-ecs/fixtures/sdk7-humming-birds-sync/src/ui.tsx @@ -4,6 +4,8 @@ import ReactEcs, { Button, Label, ReactEcsRenderer, UiEntity } from '@dcl/sdk/re import { Bird, BirdKilled } from './hummingBird' import { PlayersConnected } from '@dcl/sdk/network-transport' import { GameStatus } from '.' +import { createCircle, createCube, createTriangle } from './create-cube' +import { addNetworkEntity } from './message-bus-sync' let scoreBoard: [string, number][] = [] let scoreInterval = 0 @@ -116,6 +118,20 @@ export function setupUi(userId: string) { fontSize={18} uiTransform={{ width: '100%', height: 40 }} /> +