Skip to content

Commit

Permalink
wip sync parent entities
Browse files Browse the repository at this point in the history
  • Loading branch information
gonpombo8 committed Nov 22, 2023
1 parent 257fb1c commit 4a6f488
Show file tree
Hide file tree
Showing 13 changed files with 242 additions and 89 deletions.
13 changes: 11 additions & 2 deletions packages/@dcl/ecs/src/components/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import { defineTweenComponent, TweenComponentDefinitionExtended } from './extend
import { LwwComponentGetter, GSetComponentGetter } from './generated/index.gen'
import defineNameComponent, { NameType } from './manual/Name'
import defineSyncComponent, { ISyncComponentsType } from './manual/SyncComponents'
import defineEntityNetwork, { INetowrkEntityType } from './manual/NetworkEntity'
import defineNetworkEntity, { INetowrkEntityType } from './manual/NetworkEntity'
import defineNetworkParent, { INetowrkParentType } from './manual/NetworkParent'
import { defineTransformComponent, TransformComponentExtended } from './manual/Transform'

export * from './generated/index.gen'
Expand Down Expand Up @@ -64,4 +65,12 @@ export const SyncComponents: (
/* @__PURE__ */
export const NetworkEntity: (
engine: Pick<IEngine, 'defineComponent'>
) => LastWriteWinElementSetComponentDefinition<INetowrkEntityType> = (engine) => defineEntityNetwork(engine)
) => LastWriteWinElementSetComponentDefinition<INetowrkEntityType> = (engine) => defineNetworkEntity(engine)

/**
* @alpha
*/
/* @__PURE__ */
export const NetworkParent: (
engine: Pick<IEngine, 'defineComponent'>
) => LastWriteWinElementSetComponentDefinition<INetowrkParentType> = (engine) => defineNetworkParent(engine)
20 changes: 20 additions & 0 deletions packages/@dcl/ecs/src/components/manual/NetworkParent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Entity } from '../../engine'
import { IEngine, LastWriteWinElementSetComponentDefinition } from '../../engine/types'
import { Schemas } from '../../schemas'

export interface INetowrkParentType {
networkId: number
entityId: Entity
}

export type INetowrkParent = LastWriteWinElementSetComponentDefinition<INetowrkParentType>

function defineNetworkParentComponent(engine: Pick<IEngine, 'defineComponent'>) {
const EntityNetwork = engine.defineComponent('core-schema::Network-Parent', {
networkId: Schemas.Int64,
entityId: Schemas.Entity
})
return EntityNetwork
}

export default defineNetworkParentComponent
1 change: 1 addition & 0 deletions packages/@dcl/ecs/src/components/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ export type { TransformComponentExtended, TransformTypeWithOptionals } from './m
export type { NameComponent, NameType } from './manual/Name'
export type { ISyncComponents, ISyncComponentsType } from './manual/SyncComponents'
export type { INetowrkEntity, INetowrkEntityType } from './manual/NetworkEntity'
export type { INetowrkParent, INetowrkParentType } from './manual/NetworkParent'
8 changes: 7 additions & 1 deletion packages/@dcl/ecs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import {
AnimatorComponentDefinitionExtended,
ISyncComponents,
TweenComponentDefinitionExtended,
INetowrkEntity
INetowrkEntity,
INetowrkParent
} from './components/types'
import { NameComponent } from './components/manual/Name'

Expand All @@ -50,6 +51,11 @@ export const SyncComponents: ISyncComponents = /* @__PURE__*/ components.SyncCom
* Tag a entity to be syncronized through comms
*/
export const NetworkEntity: INetowrkEntity = /* @__PURE__*/ components.NetworkEntity(engine)
/**
* @alpha
* Tag a entity to be syncronized through comms
*/
export const NetworkParent: INetowrkParent = /* @__PURE__*/ components.NetworkParent(engine)

// export components for global engine
export * from './components/generated/global.gen'
Expand Down
33 changes: 30 additions & 3 deletions packages/@dcl/ecs/src/serialization/crdt/network/utils.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
import { Entity } from '../../../engine'
import { ReceiveMessage } from '../../../runtime/types'
import { ReceiveMessage, TransformType } from '../../../runtime/types'
import { ReceiveNetworkMessage } from '../../../systems/crdt/types'
import { ByteBuffer } from '../../ByteBuffer'
import { ByteBuffer, ReadWriteByteBuffer } from '../../ByteBuffer'
import { PutComponentOperation } from '../putComponent'
import { CrdtMessageType } from '../types'
import {
CrdtMessageType,
PutComponentMessageBody,

Check warning on line 8 in packages/@dcl/ecs/src/serialization/crdt/network/utils.ts

View workflow job for this annotation

GitHub Actions / lint

'PutComponentMessageBody' is defined but never used. Allowed unused vars must match /^_|ReactEcs/u
PutNetworkComponentMessage,

Check warning on line 9 in packages/@dcl/ecs/src/serialization/crdt/network/utils.ts

View workflow job for this annotation

GitHub Actions / lint

'PutNetworkComponentMessage' is defined but never used. Allowed unused vars must match /^_|ReactEcs/u
PutNetworkComponentMessageBody

Check warning on line 10 in packages/@dcl/ecs/src/serialization/crdt/network/utils.ts

View workflow job for this annotation

GitHub Actions / lint

'PutNetworkComponentMessageBody' is defined but never used. Allowed unused vars must match /^_|ReactEcs/u
} from '../types'
import { DeleteComponent } from '../deleteComponent'
import { DeleteEntity } from '../deleteEntity'
import { INetowrkEntityType } from '../../../components/types'
import { PutNetworkComponentOperation } from './putComponentNetwork'
import { DeleteComponentNetwork } from './deleteComponentNetwork'
import { DeleteEntityNetwork } from './deleteEntityNetwork'
import { TransformSchema } from '../../../components/manual/Transform'

export function isNetworkMessage(message: ReceiveMessage): message is ReceiveNetworkMessage {
return [
Expand Down Expand Up @@ -59,3 +65,24 @@ export function localMessageToNetwork(
}
destinationBuffer.writeBuffer(buffer.buffer().subarray(offset, buffer.currentWriteOffset()), false)
}

export function fixTransformParent(
message: ReceiveMessage,
transformValue?: TransformType,
parent?: Entity
): Uint8Array {
let transform = transformValue
const buffer = new ReadWriteByteBuffer()
if (!transform && 'data' in message) {
buffer.writeBuffer(message.data)
transform = TransformSchema.deserialize(buffer)
buffer.resetBuffer()
}
if (!transform) throw new Error('Invalid parent transform')
// Generate new transform raw data with the parent
const newTransform = { ...transform, parent }
console.log('Fix transform', message.entityId, newTransform)
buffer.resetBuffer()
TransformSchema.serialize(newTransform, buffer)
return buffer.toBinary()
}
76 changes: 62 additions & 14 deletions packages/@dcl/ecs/src/systems/crdt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ import { PutComponentOperation } from '../../serialization/crdt/putComponent'
import { CrdtMessageType, CrdtMessageHeader, CrdtMessage } from '../../serialization/crdt/types'
import { ReceiveMessage, Transport } from './types'
import { PutNetworkComponentOperation } from '../../serialization/crdt/network/putComponentNetwork'
import { NetworkEntity as defineNetworkEntity } from '../../components'
import {
NetworkEntity as defineNetworkEntity,
NetworkParent as defineNetworkParent,
Transform as defineTransform
} from '../../components'
import { INetowrkEntityType } from '../../components/types'
import * as networkUtils from '../../serialization/crdt/network/utils'

Expand All @@ -33,7 +37,12 @@ export type OnChangeFunction = (
*/
export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChange: OnChangeFunction | null) {
const transports: Transport[] = []

// Components that we used on this system
const NetworkEntity = defineNetworkEntity(engine)
const NetworkParent = defineNetworkParent(engine)
const Transform = defineTransform(engine)

// Messages that we received at transport.onMessage waiting to be processed
const receivedMessages: ReceiveMessage[] = []
// Messages already processed by the engine but that we need to broadcast to other transports.
Expand Down Expand Up @@ -101,16 +110,20 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang
* It's a mapping Network -> to Local
* If it's not a network message, return the entityId received by the message
*/
function findNetworkId(msg: ReceiveMessage): { entityId: Entity; network?: INetowrkEntityType } {
if (!networkUtils.isNetworkMessage(msg)) {
return { entityId: msg.entityId }
}
function findNetworkId(msg: { entityId: Entity; networkId?: number }): {
entityId: Entity
network?: INetowrkEntityType
} {
const hasNetworkId = 'networkId' in msg

for (const [entityId, network] of engine.getEntitiesWith(NetworkEntity)) {
if (network.networkId === msg.networkId && network.entityId === msg.entityId) {
return { entityId, network }
if (hasNetworkId) {
for (const [entityId, network] of engine.getEntitiesWith(NetworkEntity)) {
if (network.networkId === msg.networkId && network.entityId === msg.entityId) {
return { entityId, network }
}
}
}

return { entityId: msg.entityId }
}

Expand All @@ -128,7 +141,8 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang
// We receive a new Entity. Create the localEntity and map it to the NetworkEntity component
if (networkUtils.isNetworkMessage(msg) && !network) {
entityId = engine.addEntity()
NetworkEntity.createOrReplace(entityId, { entityId: msg.entityId, networkId: msg.networkId })
network = { entityId: msg.entityId, networkId: msg.networkId }
NetworkEntity.createOrReplace(entityId, network)
}
if (msg.type === CrdtMessageType.DELETE_ENTITY || msg.type === CrdtMessageType.DELETE_ENTITY_NETWORK) {
entitiesShouldBeCleaned.push(entityId)
Expand All @@ -148,8 +162,15 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang

/* istanbul ignore else */
if (component) {
if (
msg.type === CrdtMessageType.PUT_COMPONENT &&
component.componentId === Transform.componentId &&
NetworkEntity.has(entityId) &&
NetworkParent.has(entityId)
) {
msg.data = networkUtils.fixTransformParent(msg)
}
const [conflictMessage, value] = component.updateFromCrdt({ ...msg, entityId })

if (!conflictMessage) {
// Add message to transport queue to be processed by others transports
broadcastMessages.push(msg)
Expand Down Expand Up @@ -238,12 +259,39 @@ export function crdtSceneSystem(engine: PreEngine, onProcessEntityComponentChang

// Redundant message for the transport
if (!transport.filter(message)) continue
const { entityId } = findNetworkId(message)

const transformNeedsFix =
'componentId' in message &&
message.componentId === Transform.componentId &&
Transform.has(entityId) &&
NetworkParent.has(entityId) &&
NetworkEntity.has(entityId)

// If there was a LOCAL change in the transform. Add the parent to that transform
if (isRendererTransport && message.type === CrdtMessageType.PUT_COMPONENT && transformNeedsFix) {
const parent = findNetworkId(NetworkParent.get(entityId))
const transformData = networkUtils.fixTransformParent(message, Transform.get(entityId), parent.entityId)
const offset = buffer.currentWriteOffset()
PutComponentOperation.write(entityId, message.timestamp, message.componentId, transformData, buffer)
transportBuffer.writeBuffer(buffer.buffer().subarray(offset, buffer.currentWriteOffset()), false)
continue
}

// If it's the renderer transport and its a NetworkMessage, we need to fix the entityId field and convert it to a known Message.
// PUT_NETWORK_COMPONENT -> PUT_COMPONENT
if (isRendererTransport && networkUtils.isNetworkMessage(message)) {
const { entityId } = findNetworkId(message)
networkUtils.networkMessageToLocal(message, entityId, buffer, transportBuffer)
// If it's the renderer transport and its a NetworkMessage, we need to fix the entityId field and convert it to a known Message.
// PUT_NETWORK_COMPONENT -> PUT_COMPONENT
let transformData: Uint8Array = 'data' in message ? message.data : new Uint8Array()
if (transformNeedsFix) {
const parent = findNetworkId(NetworkParent.get(entityId))
transformData = networkUtils.fixTransformParent(message, Transform.get(entityId), parent.entityId)
}
networkUtils.networkMessageToLocal(
{ ...message, data: transformData } as any,
entityId,
buffer,
transportBuffer
)
// Iterate the next message
continue
}
Expand Down
58 changes: 30 additions & 28 deletions packages/@dcl/sdk/src/network/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,47 +74,49 @@ B creates sync entity 514 (child)
Transform.create({ parent, position: {} })
syncEntity(child, [Transform.componentId], SyncEntities.ChildDoor)

So now client A has different raw data for the Transform with client B, because they have different parents.
So now client A & B had different raw data for the Transform component, because they have different parents.
Meaning that we have an inconsistent CRDT State between two clients.
So if there is a new message comming from client C we could have conflicts for client A but maybe not for client B.
I want to cry.

Same problem would happen if a client after some interaction (i.e. a bullet) creates an entity with a parent. For the client A this could be { parent: 515, child: 516 } but for another user those entities are not going to be the same ones.

Solution 1:
What if we introduce a new ParentSync.
This ParentSync will be in charge of syncronizing the parenting. If we have ParentSync then we should always ignore the Transform.parent property.
The parentSync will have both entityId and networkId such as the PutNetworkMessage so we can map the entity in every client.
ParentSync.Schema = { parent: Entity; networkId: number }.
Being the networkId the id of the user that owns that parent entity.
const newParent: Entity = someEntityWeWantToUpdate() // 512
this new parent returns the entity of this client, but it may not be the client who owns this entity.
const networkParent = NetworkEntity.getOrNull(newParent) // { entityId: 1025, networkId: random }
ParentSync.create(child, networkParent)

Now imagine that client A want to create a new parent for a child that originally was created by Client B
What if we introduce a new ParentNetwork component.
This ParentNetwork will be in charge of syncronizing the parenting. If we have ParentNetwork then we should always ignore the Transform.parent property.
The ParentNetwork will have both entityId and networkId such as the PutNetworkMessage so we can map the entity in every client.

ParentNetwork.Schema = { entityId: Entity; networkId: number }.

Being the networkId the id of the user that owns that parent entity, and the entityId the parent entityId of the user that creates that entity.
So with this two values, we cant map the real parent entity id on every client.

```ts
const parent = engine.addEntity()
Transform.create(parent, { position: somePosition })
import { syncEntity, parentEntity } from '@dcl/sdk/network'
const parentEntity = engine.addEntity()
Transform.create(parentEntity, { position: somePosition })
syncEntity(parent, Transform.componentId)
const childEntity: Entity = someEntityWeWantToUpdate()
ParentSync.createOrReplace(childEntity, { parent, networkId: 'clientA' })
const childEntity: Entity = engine.addEntity()
syncEntity(childEntity, Transform.componentId)

// create parentNetwork component. This maybe could be done in a system and use the original parent. TBD
parentEntity(childEntity, parentEntity)
```
This will generate two PUT_NETWORK_COMPONENT messages.
One for the parent entity with the transform component ( networkId: clientA, entityId: parent )
And another for the ParentSync of the child entity that was originally created by client B (networkId: clientB, entityId: child).
Every client will know how to map this entity because the ParentSync has the pointers to the parent entity.
ParentSync will point to the entity that was created on the first message.

But we need to fix the parenting for the renderer, so it doesnt know about this logic
So every time we send a Transform component to the renderer, we should update the transform.parent property with the mapped Entity that we fetch from the ParentSync.
if (isTransform(message) && isRendererTransport && ParentSync.getOrNull(message.entityId)) {
Every client will know how to map this entity because the ParentNetwork has the pointers to the parent entity. But we are still having an issue, the parent is not defined. We need to tell the renderer that the child entity has a parent property.

So every time we send a Transform component to the renderer, we should update the transform.parent property with the mapped Entity that we fetch from the ParentNetwork.
if (isTransform(message) && isRendererTransport && ParentNetwork.getOrNull(message.entityId)) {
// Generate a new transform raw data with the parent property included
}

And every time we recieve a message from the renderer, we should remove the parent property to keep consistency in all CRDT state clients.
if (isTransform(message) && message.type === CrdtMessageType.PUT_COMPONENT && ParentSync.has(message.entityId)) {
if (isTransform(message) && message.type === CrdtMessageType.PUT_COMPONENT && ParentNetwork.has(message.entityId)) {
transform.parent = null
// Generate a new transform raw data without the parent property included
}

With this approach, all the clients will have the same Transform, so we avoid the inconsistency of crdt's state.
And when some user wants to update the transform, it has to modify the ParentSync and will update both values, the parent & the network.
And when some user wants to update the transform, it has to modify the ParentNetwork and will update both values, the parent & the network.

I think this will work but there are some developer experience issues, like using the `parentEntity(child, parent)` function instead of the transform.parent.
This could end up with a lot of unexepcted issues/bugs. Maybe we can have a system that iterates over every syncronized entity and when the transform.parent changes, add the parentEntity function automatically.
First I wanna try to implement all of this and then came up with this approach to avoid inconsistencies
2 changes: 1 addition & 1 deletion packages/@dcl/sdk/src/network/binary-message-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export function BinaryMessageBus<T extends CommsMessage>(send: (message: Uint8Ar
mapping.set(message, callback)
},
emit: <K extends T>(message: K, value: Uint8Array) => {
console.log('[EMIT]: ', message, value)
console.log('[EMIT]: ', message)
send(craftMessage<T>(message, value))
},
__processMessages: (messages: Uint8Array[]) => {
Expand Down
3 changes: 2 additions & 1 deletion packages/@dcl/sdk/src/network/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { syncEntity } from './sync-entity'
import { addSyncTransport } from './message-bus-sync'
import { getNetworkId } from './utils'
import { parentEntity } from './parent'

export { syncEntity, addSyncTransport, getNetworkId }
export { syncEntity, addSyncTransport, getNetworkId, parentEntity }
7 changes: 3 additions & 4 deletions packages/@dcl/sdk/src/network/message-bus-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { engineToCrdt } from './state'
import { serializeCrdtMessages } from '../internal/transports/logger'
import { BinaryMessageBus, CommsMessage } from './binary-message-bus'
import {
addOnLeaveSceneListener,
getOwnProfile,
oldestUser,
setInitialized,
Expand All @@ -33,6 +32,7 @@ export async function addSyncTransport() {
filter: syncFilter,
send: async (message: Uint8Array) => {
if (syncTransportIsReady() && message.byteLength) {
console.log(Array.from(serializeCrdtMessages('[send CRDT]: ', message, engine)))
binaryMessageBus.emit(CommsMessage.CRDT, message)
}
const messages = getMessagesToSend()
Expand All @@ -47,9 +47,6 @@ export async function addSyncTransport() {
// Add state intialized checker
engine.addSystem(stateInitializedChecker)

// Listener to have the oldest list up-to-date
addOnLeaveSceneListener()

// Request initial state
binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, new Uint8Array())

Expand All @@ -71,6 +68,8 @@ export async function addSyncTransport() {

// Process CRDT messages here
binaryMessageBus.on(CommsMessage.CRDT, (value) => {
console.log(Array.from(serializeCrdtMessages('[CRDT on]: ', value, engine)))

transport.onmessage!(value)
})
}
Loading

0 comments on commit 4a6f488

Please sign in to comment.