From e99aad0f603c1d772f55df070a78554a8ad4fbd6 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 25 Nov 2024 17:15:49 +0100 Subject: [PATCH] bigint helpers --- src/room/Room.ts | 49 ++++++++++++------------ src/room/StreamReader.ts | 2 +- src/room/participant/LocalParticipant.ts | 27 ++++++------- src/room/types.ts | 7 ++-- src/room/utils.ts | 14 +++++++ 5 files changed, 56 insertions(+), 43 deletions(-) diff --git a/src/room/Room.ts b/src/room/Room.ts index f66387cc86..5606050e6e 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -4,9 +4,8 @@ import { ConnectionQualityUpdate, type DataPacket, DataPacket_Kind, + DataStream_Chunk, DataStream_Header, - DataStream_Packet, - DataStream_StreamType, DisconnectReason, JoinResponse, LeaveRequest, @@ -85,6 +84,7 @@ import { } from './types'; import { Future, + bigIntToNumber, createDummyVideoStreamTrack, extractChatMessage, extractTranscriptionSegments, @@ -942,7 +942,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) req = new SimulateScenario({ scenario: { case: 'subscriberBandwidth', - value: BigInt(arg), + value: numberToBigInt(arg), }, }); break; @@ -1569,7 +1569,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) this.getParticipantByIdentity(packet.participantIdentity), ); } else if (packet.value.value.topic === 'streamchunk') { - this.handleStreamChunk(DataStream_Packet.fromBinary(packet.value.value.payload)); + this.handleStreamChunk(DataStream_Chunk.fromBinary(packet.value.value.payload)); } } else if (packet.value.case === 'transcription') { this.handleTranscription(participant, packet.value.value); @@ -1594,7 +1594,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) { start: (controller) => { streamController = controller; - this.fileStreamBuffer.set(streamHeader.messageId, { + this.fileStreamBuffer.set(streamHeader.streamId, { header: streamHeader, chunks: [], streamController, @@ -1603,17 +1603,17 @@ class Room extends (EventEmitter as new () => TypedEmitter) }, }, undefined, - Number(streamHeader.totalChunks), + bigIntToNumber(streamHeader.totalChunks), ); this.emit( RoomEvent.FileStreamReceived, { - messageId: streamHeader.messageId, + id: streamHeader.streamId, fileName: streamHeader.contentHeader.value.fileName ?? 'unknown', mimeType: streamHeader.mimeType, - size: Number(streamHeader.totalLength), + size: streamHeader.totalLength ? Number(streamHeader.totalLength) : undefined, topic: streamHeader.topic, - timestamp: Number(streamHeader.timestamp), + timestamp: bigIntToNumber(streamHeader.timestamp), extensions: streamHeader.extensions, }, stream, @@ -1625,7 +1625,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) { start: (controller) => { streamController = controller; - this.textStreamBuffer.set(streamHeader.messageId, { + this.textStreamBuffer.set(streamHeader.streamId, { header: streamHeader, chunks: [], streamController, @@ -1634,16 +1634,15 @@ class Room extends (EventEmitter as new () => TypedEmitter) }, }, undefined, - Number(streamHeader.totalChunks), + bigIntToNumber(streamHeader.totalChunks), ); this.emit( RoomEvent.TextStreamReceived, { - messageId: streamHeader.messageId, + id: streamHeader.streamId, mimeType: streamHeader.mimeType, - size: Number(streamHeader.totalLength), + size: streamHeader.totalLength ? Number(streamHeader.totalLength) : undefined, topic: streamHeader.topic, - isFinite: streamHeader.streamType === DataStream_StreamType.FINITE, timestamp: Number(streamHeader.timestamp), extensions: streamHeader.extensions, }, @@ -1653,35 +1652,35 @@ class Room extends (EventEmitter as new () => TypedEmitter) } } - private handleStreamChunk(chunk: DataStream_Packet) { + private handleStreamChunk(chunk: DataStream_Chunk) { console.log('received chunk', chunk.chunkIndex); - const fileBuffer = this.fileStreamBuffer.get(chunk.messageId); + const fileBuffer = this.fileStreamBuffer.get(chunk.streamId); if (fileBuffer) { - if (chunk.contentLength > 0) { + if (chunk.content.length > 0) { fileBuffer.streamController.enqueue(chunk.content); - fileBuffer.chunks.push(Number(chunk.chunkIndex)); + fileBuffer.chunks.push(bigIntToNumber(chunk.chunkIndex)); } if ( - fileBuffer.chunks.length === Number(fileBuffer.header.totalChunks) || + fileBuffer.chunks.length === bigIntToNumber(fileBuffer.header.totalChunks) || chunk.complete === true ) { fileBuffer.streamController.close(); - this.fileStreamBuffer.delete(chunk.messageId); + this.fileStreamBuffer.delete(chunk.streamId); } } - const textBuffer = this.textStreamBuffer.get(chunk.messageId); + const textBuffer = this.textStreamBuffer.get(chunk.streamId); if (textBuffer) { - if (chunk.contentLength > 0) { + if (chunk.content.length > 0) { textBuffer.streamController.enqueue(new TextDecoder().decode(chunk.content)); - textBuffer.chunks.push(Number(chunk.chunkIndex)); + textBuffer.chunks.push(bigIntToNumber(chunk.chunkIndex)); } if ( - textBuffer.chunks.length === Number(textBuffer.header.totalChunks) || + textBuffer.chunks.length === bigIntToNumber(textBuffer.header.totalChunks) || chunk.complete === true ) { textBuffer.streamController.close(); - this.fileStreamBuffer.delete(chunk.messageId); + this.fileStreamBuffer.delete(chunk.streamId); } } } diff --git a/src/room/StreamReader.ts b/src/room/StreamReader.ts index 6a6e07f347..274dbf6caa 100644 --- a/src/room/StreamReader.ts +++ b/src/room/StreamReader.ts @@ -14,7 +14,7 @@ export class StreamReader extends ReadableStream { } constructor( - underlyingSource?: UnderlyingSource, + underlyingSource: UnderlyingDefaultSource, strategy?: QueuingStrategy, totalChunkCount?: number, ) { diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index f61226b386..08e1d3f017 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -75,6 +75,7 @@ import { isSVCCodec, isSafari17, isWeb, + numberToBigInt, sleep, supportsAV1, supportsVP9, @@ -1490,11 +1491,11 @@ export default class LocalParticipant extends Participant { const header = new DataStream_Header({ streamId, - totalChunks: BigInt(totalChunks), - totalLength: BigInt(totalLength), - mimeType: 'plain/text', + totalChunks: numberToBigInt(totalChunks), + totalLength: numberToBigInt(totalLength), + mimeType: 'text/plain', topic: options?.topic, - timestamp: BigInt(Date.now()), + timestamp: numberToBigInt(Date.now()), contentHeader: { case: 'textHeader', value: new DataStream_TextHeader({ @@ -1517,7 +1518,7 @@ export default class LocalParticipant extends Participant { const chunk = new DataStream_Chunk({ content: chunkData, streamId, - chunkIndex: BigInt(i), + chunkIndex: numberToBigInt(i), complete: i === totalChunks - 1, }); await this.publishData(chunk.toBinary(), { @@ -1543,9 +1544,9 @@ export default class LocalParticipant extends Participant { const streamId = crypto.randomUUID(); const header = new DataStream_Header({ streamId, - mimeType: 'plain/text', + mimeType: 'text/plain', topic: options?.topic, - timestamp: BigInt(Date.now()), + timestamp: numberToBigInt(Date.now()), contentHeader: { case: 'textHeader', value: new DataStream_TextHeader({ @@ -1575,7 +1576,7 @@ export default class LocalParticipant extends Participant { const chunk = new DataStream_Chunk({ content: textInBytes, streamId, - chunkIndex: BigInt(chunkId), + chunkIndex: numberToBigInt(chunkId), }); await localP.publishData(chunk.toBinary(), { reliable: true, @@ -1589,7 +1590,7 @@ export default class LocalParticipant extends Participant { close() { const chunk = new DataStream_Chunk({ streamId, - chunkIndex: BigInt(chunkId), + chunkIndex: numberToBigInt(chunkId), complete: true, }); localP.publishData(chunk.toBinary(), { @@ -1639,13 +1640,13 @@ export default class LocalParticipant extends Participant { const totalLength = file.size; const totalChunks = Math.ceil(totalLength / STREAM_CHUNK_SIZE); const header = new DataStream_Header({ - totalChunks: BigInt(totalChunks), - totalLength: BigInt(totalLength), + totalChunks: numberToBigInt(totalChunks), + totalLength: numberToBigInt(totalLength), mimeType: options?.mimeType ?? file.type, streamId, topic: options?.topic, encryptionType: options?.encryptionType, - timestamp: BigInt(Date.now()), + timestamp: numberToBigInt(Date.now()), contentHeader: { case: 'fileHeader', value: new DataStream_FileHeader({ @@ -1676,7 +1677,7 @@ export default class LocalParticipant extends Participant { const chunk = new DataStream_Chunk({ content: chunkData, streamId, - chunkIndex: BigInt(i), + chunkIndex: numberToBigInt(i), complete: i === totalChunks - 1, }); await this.publishData(chunk.toBinary(), { diff --git a/src/room/types.ts b/src/room/types.ts index 1587330a33..287a2ed4e6 100644 --- a/src/room/types.ts +++ b/src/room/types.ts @@ -151,10 +151,11 @@ export interface StreamBuffer { } interface BaseStreamInfo { - messageId: string; + id: string; mimeType: string; topic: string; timestamp: number; + /** total size in bytes for finite streams and undefined for streams of unknown size */ size?: number; extensions?: Record; } @@ -162,6 +163,4 @@ export interface FileStreamInfo extends BaseStreamInfo { fileName: string; } -export interface TextStreamInfo extends BaseStreamInfo { - isFinite: boolean; -} +export interface TextStreamInfo extends BaseStreamInfo {} diff --git a/src/room/utils.ts b/src/room/utils.ts index 3b8c427310..8ac676c11f 100644 --- a/src/room/utils.ts +++ b/src/room/utils.ts @@ -548,3 +548,17 @@ export function getDisconnectReasonFromConnectionError(e: ConnectionError) { return DisconnectReason.UNKNOWN_REASON; } } + +/** convert bigints to numbers preserving undefined values */ +export function bigIntToNumber( + value: T, +): T extends BigInt ? number : undefined { + return (value !== undefined ? Number(value) : undefined) as T extends BigInt ? number : undefined; +} + +/** convert numbers to bigints preserving undefined values */ +export function numberToBigInt( + value: T, +): T extends number ? bigint : undefined { + return (value !== undefined ? BigInt(value) : undefined) as T extends number ? bigint : undefined; +}