Skip to content

Commit

Permalink
wip put component network operation
Browse files Browse the repository at this point in the history
  • Loading branch information
gonpombo8 committed Oct 18, 2023
1 parent 3848cf2 commit 7f46d9a
Show file tree
Hide file tree
Showing 14 changed files with 297 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import {
CrdtMessageType,
CrdtMessageBody,
PutComponentOperation,
DeleteComponent
DeleteComponent,
PutNetworkComponentMessageBody
} from '../serialization/crdt'
import { dataCompare } from '../systems/crdt/utils'
import { LastWriteWinElementSetComponentDefinition, ComponentType } from './component'
Expand Down Expand Up @@ -63,7 +64,7 @@ export function createUpdateLwwFromCrdt(
* @public
*/
function crdtRuleForCurrentState(
message: PutComponentMessageBody | DeleteComponentMessageBody
message: PutComponentMessageBody | DeleteComponentMessageBody | PutNetworkComponentMessageBody
): ProcessMessageResultType {
const { entityId, timestamp } = message
const currentTimestamp = timestamps.get(entityId as Entity)
Expand Down Expand Up @@ -109,7 +110,11 @@ export function createUpdateLwwFromCrdt(

return (msg: CrdtMessageBody): [null | PutComponentMessageBody | DeleteComponentMessageBody, any] => {
/* istanbul ignore next */
if (msg.type !== CrdtMessageType.PUT_COMPONENT && msg.type !== CrdtMessageType.DELETE_COMPONENT)
if (
msg.type !== CrdtMessageType.PUT_COMPONENT &&
msg.type !== CrdtMessageType.DELETE_COMPONENT &&
msg.type !== CrdtMessageType.PUT_NETWORK_COMPONENT
)
/* istanbul ignore next */
return [null, data.get(msg.entityId)]

Expand All @@ -120,7 +125,7 @@ export function createUpdateLwwFromCrdt(
case ProcessMessageResultType.StateUpdatedTimestamp: {
timestamps.set(entity, msg.timestamp)

if (msg.type === CrdtMessageType.PUT_COMPONENT) {
if (msg.type === CrdtMessageType.PUT_COMPONENT || msg.type === CrdtMessageType.PUT_NETWORK_COMPONENT) {
const buf = new ReadWriteByteBuffer(msg.data!)
data.set(entity, schema.deserialize(buf))
} else {
Expand Down
3 changes: 3 additions & 0 deletions packages/@dcl/ecs/src/serialization/crdt/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import { PutComponentOperation } from './putComponent'
import { DeleteComponent } from './deleteComponent'
import { DeleteEntity } from './deleteEntity'
import { AppendValueOperation } from './appendValue'
import { PutNetworkComponentOperation } from './putComponentNetwork'

export function readMessage(buf: ByteBuffer): CrdtMessage | null {
const header = CrdtMessageProtocol.getHeader(buf)
if (!header) return null

if (header.type === CrdtMessageType.PUT_COMPONENT) {
return PutComponentOperation.read(buf)
} else if (header.type === CrdtMessageType.PUT_NETWORK_COMPONENT) {
return PutNetworkComponentOperation.read(buf)
} else if (header.type === CrdtMessageType.DELETE_COMPONENT) {
return DeleteComponent.read(buf)
} else if (header.type === CrdtMessageType.APPEND_VALUE) {
Expand Down
64 changes: 64 additions & 0 deletions packages/@dcl/ecs/src/serialization/crdt/putComponentNetwork.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { CrdtMessageProtocol } from './crdtMessageProtocol'
import { Entity } from '../../engine/entity'
import { ByteBuffer } from '../ByteBuffer'
import { CrdtMessageType, CRDT_MESSAGE_HEADER_LENGTH, PutComponentMessage } from './types'

/**
* @public
*/
export namespace PutNetworkComponentOperation {
export const MESSAGE_HEADER_LENGTH = 20

/**
* Call this function for an optimal writing data passing the ByteBuffer
* already allocated
*/
export function write(
entity: Entity,
timestamp: number,
componentId: number,
networkId: number,
data: Uint8Array,
buf: ByteBuffer
) {
// reserve the beginning
const startMessageOffset = buf.incrementWriteOffset(CRDT_MESSAGE_HEADER_LENGTH + MESSAGE_HEADER_LENGTH)

// write body
buf.writeBuffer(data, false)
const messageLength = buf.currentWriteOffset() - startMessageOffset

// Write CrdtMessage header
buf.setUint32(startMessageOffset, messageLength)
buf.setUint32(startMessageOffset + 4, CrdtMessageType.PUT_NETWORK_COMPONENT)

// Write ComponentOperation header
buf.setUint32(startMessageOffset + 8, entity as number)
buf.setUint32(startMessageOffset + 12, componentId)
buf.setUint32(startMessageOffset + 16, timestamp)
buf.setUint32(startMessageOffset + 20, networkId)
const newLocal = messageLength - MESSAGE_HEADER_LENGTH - CRDT_MESSAGE_HEADER_LENGTH
buf.setUint32(startMessageOffset + 24, newLocal)
}

export function read(buf: ByteBuffer): (PutComponentMessage & { networkId: number }) | null {
const header = CrdtMessageProtocol.readHeader(buf)

if (!header) {
return null
}

if (header.type !== CrdtMessageType.PUT_NETWORK_COMPONENT) {
throw new Error('PutComponentOperation tried to read another message type.')
}

return {
...header,
entityId: buf.readUint32() as Entity,
componentId: buf.readUint32(),
timestamp: buf.readUint32(),
networkId: buf.readUint32(),
data: buf.readBuffer()
}
}
}
22 changes: 21 additions & 1 deletion packages/@dcl/ecs/src/serialization/crdt/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export enum CrdtMessageType {

DELETE_ENTITY = 3,
APPEND_VALUE = 4,
PUT_NETWORK_COMPONENT = 5,

MAX_MESSAGE_TYPE
}
Expand Down Expand Up @@ -50,6 +51,15 @@ export type PutComponentMessageBody = {
data: Uint8Array
}

export type PutNetworkComponentMessageBody = {
type: CrdtMessageType.PUT_NETWORK_COMPONENT
entityId: Entity
componentId: number
timestamp: number
data: Uint8Array
networkId: number
}

/**
* Min. length = header (8 bytes) + 16 bytes = 24 bytes
*
Expand Down Expand Up @@ -97,6 +107,10 @@ export type AppendValueMessage = CrdtMessageHeader & AppendValueMessageBody
* @public
*/
export type PutComponentMessage = CrdtMessageHeader & PutComponentMessageBody
/**
* @public
*/
export type PutNetworkComponentMessage = CrdtMessageHeader & PutNetworkComponentMessageBody
/**
* @public
*/
Expand All @@ -109,12 +123,18 @@ export type DeleteEntityMessage = CrdtMessageHeader & DeleteEntityMessageBody
/**
* @public
*/
export type CrdtMessage = PutComponentMessage | DeleteComponentMessage | DeleteEntityMessage | AppendValueMessage
export type CrdtMessage =
| PutComponentMessage
| PutNetworkComponentMessage
| DeleteComponentMessage
| DeleteEntityMessage
| AppendValueMessage

/**
* @public
*/
export type CrdtMessageBody =
| PutNetworkComponentMessageBody
| PutComponentMessageBody
| DeleteComponentMessageBody
| DeleteEntityMessageBody
Expand Down
Loading

0 comments on commit 7f46d9a

Please sign in to comment.