Skip to content

Commit

Permalink
bigint helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasIO committed Nov 25, 2024
1 parent 9812835 commit e99aad0
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 43 deletions.
49 changes: 24 additions & 25 deletions src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import {
ConnectionQualityUpdate,
type DataPacket,
DataPacket_Kind,
DataStream_Chunk,
DataStream_Header,
DataStream_Packet,
DataStream_StreamType,
DisconnectReason,
JoinResponse,
LeaveRequest,
Expand Down Expand Up @@ -85,6 +84,7 @@ import {
} from './types';
import {
Future,
bigIntToNumber,
createDummyVideoStreamTrack,
extractChatMessage,
extractTranscriptionSegments,
Expand Down Expand Up @@ -942,7 +942,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
req = new SimulateScenario({
scenario: {
case: 'subscriberBandwidth',
value: BigInt(arg),
value: numberToBigInt(arg),
},
});
break;
Expand Down Expand Up @@ -1569,7 +1569,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
this.getParticipantByIdentity(packet.participantIdentity),
);
} else if (packet.value.value.topic === 'streamchunk') {
this.handleStreamChunk(DataStream_Packet.fromBinary(packet.value.value.payload));
this.handleStreamChunk(DataStream_Chunk.fromBinary(packet.value.value.payload));
}
} else if (packet.value.case === 'transcription') {
this.handleTranscription(participant, packet.value.value);
Expand All @@ -1594,7 +1594,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
{
start: (controller) => {
streamController = controller;
this.fileStreamBuffer.set(streamHeader.messageId, {
this.fileStreamBuffer.set(streamHeader.streamId, {
header: streamHeader,
chunks: [],
streamController,
Expand All @@ -1603,17 +1603,17 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
},
},
undefined,
Number(streamHeader.totalChunks),
bigIntToNumber(streamHeader.totalChunks),
);
this.emit(
RoomEvent.FileStreamReceived,
{
messageId: streamHeader.messageId,
id: streamHeader.streamId,
fileName: streamHeader.contentHeader.value.fileName ?? 'unknown',
mimeType: streamHeader.mimeType,
size: Number(streamHeader.totalLength),
size: streamHeader.totalLength ? Number(streamHeader.totalLength) : undefined,
topic: streamHeader.topic,
timestamp: Number(streamHeader.timestamp),
timestamp: bigIntToNumber(streamHeader.timestamp),
extensions: streamHeader.extensions,
},
stream,
Expand All @@ -1625,7 +1625,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
{
start: (controller) => {
streamController = controller;
this.textStreamBuffer.set(streamHeader.messageId, {
this.textStreamBuffer.set(streamHeader.streamId, {
header: streamHeader,
chunks: [],
streamController,
Expand All @@ -1634,16 +1634,15 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
},
},
undefined,
Number(streamHeader.totalChunks),
bigIntToNumber(streamHeader.totalChunks),
);
this.emit(
RoomEvent.TextStreamReceived,
{
messageId: streamHeader.messageId,
id: streamHeader.streamId,
mimeType: streamHeader.mimeType,
size: Number(streamHeader.totalLength),
size: streamHeader.totalLength ? Number(streamHeader.totalLength) : undefined,
topic: streamHeader.topic,
isFinite: streamHeader.streamType === DataStream_StreamType.FINITE,
timestamp: Number(streamHeader.timestamp),
extensions: streamHeader.extensions,
},
Expand All @@ -1653,35 +1652,35 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
}
}

private handleStreamChunk(chunk: DataStream_Packet) {
private handleStreamChunk(chunk: DataStream_Chunk) {
console.log('received chunk', chunk.chunkIndex);

const fileBuffer = this.fileStreamBuffer.get(chunk.messageId);
const fileBuffer = this.fileStreamBuffer.get(chunk.streamId);
if (fileBuffer) {
if (chunk.contentLength > 0) {
if (chunk.content.length > 0) {
fileBuffer.streamController.enqueue(chunk.content);
fileBuffer.chunks.push(Number(chunk.chunkIndex));
fileBuffer.chunks.push(bigIntToNumber(chunk.chunkIndex));
}
if (
fileBuffer.chunks.length === Number(fileBuffer.header.totalChunks) ||
fileBuffer.chunks.length === bigIntToNumber(fileBuffer.header.totalChunks) ||
chunk.complete === true
) {
fileBuffer.streamController.close();
this.fileStreamBuffer.delete(chunk.messageId);
this.fileStreamBuffer.delete(chunk.streamId);
}
}
const textBuffer = this.textStreamBuffer.get(chunk.messageId);
const textBuffer = this.textStreamBuffer.get(chunk.streamId);
if (textBuffer) {
if (chunk.contentLength > 0) {
if (chunk.content.length > 0) {
textBuffer.streamController.enqueue(new TextDecoder().decode(chunk.content));
textBuffer.chunks.push(Number(chunk.chunkIndex));
textBuffer.chunks.push(bigIntToNumber(chunk.chunkIndex));
}
if (
textBuffer.chunks.length === Number(textBuffer.header.totalChunks) ||
textBuffer.chunks.length === bigIntToNumber(textBuffer.header.totalChunks) ||
chunk.complete === true
) {
textBuffer.streamController.close();
this.fileStreamBuffer.delete(chunk.messageId);
this.fileStreamBuffer.delete(chunk.streamId);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/room/StreamReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export class StreamReader<T> extends ReadableStream<T> {
}

constructor(
underlyingSource?: UnderlyingSource<T>,
underlyingSource: UnderlyingDefaultSource<T>,
strategy?: QueuingStrategy<T>,
totalChunkCount?: number,
) {
Expand Down
27 changes: 14 additions & 13 deletions src/room/participant/LocalParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ import {
isSVCCodec,
isSafari17,
isWeb,
numberToBigInt,
sleep,
supportsAV1,
supportsVP9,
Expand Down Expand Up @@ -1490,11 +1491,11 @@ export default class LocalParticipant extends Participant {

const header = new DataStream_Header({
streamId,
totalChunks: BigInt(totalChunks),
totalLength: BigInt(totalLength),
mimeType: 'plain/text',
totalChunks: numberToBigInt(totalChunks),
totalLength: numberToBigInt(totalLength),
mimeType: 'text/plain',
topic: options?.topic,
timestamp: BigInt(Date.now()),
timestamp: numberToBigInt(Date.now()),
contentHeader: {
case: 'textHeader',
value: new DataStream_TextHeader({
Expand All @@ -1517,7 +1518,7 @@ export default class LocalParticipant extends Participant {
const chunk = new DataStream_Chunk({
content: chunkData,
streamId,
chunkIndex: BigInt(i),
chunkIndex: numberToBigInt(i),
complete: i === totalChunks - 1,
});
await this.publishData(chunk.toBinary(), {
Expand All @@ -1543,9 +1544,9 @@ export default class LocalParticipant extends Participant {
const streamId = crypto.randomUUID();
const header = new DataStream_Header({
streamId,
mimeType: 'plain/text',
mimeType: 'text/plain',
topic: options?.topic,
timestamp: BigInt(Date.now()),
timestamp: numberToBigInt(Date.now()),
contentHeader: {
case: 'textHeader',
value: new DataStream_TextHeader({
Expand Down Expand Up @@ -1575,7 +1576,7 @@ export default class LocalParticipant extends Participant {
const chunk = new DataStream_Chunk({
content: textInBytes,
streamId,
chunkIndex: BigInt(chunkId),
chunkIndex: numberToBigInt(chunkId),
});
await localP.publishData(chunk.toBinary(), {
reliable: true,
Expand All @@ -1589,7 +1590,7 @@ export default class LocalParticipant extends Participant {
close() {
const chunk = new DataStream_Chunk({
streamId,
chunkIndex: BigInt(chunkId),
chunkIndex: numberToBigInt(chunkId),
complete: true,
});
localP.publishData(chunk.toBinary(), {
Expand Down Expand Up @@ -1639,13 +1640,13 @@ export default class LocalParticipant extends Participant {
const totalLength = file.size;
const totalChunks = Math.ceil(totalLength / STREAM_CHUNK_SIZE);
const header = new DataStream_Header({
totalChunks: BigInt(totalChunks),
totalLength: BigInt(totalLength),
totalChunks: numberToBigInt(totalChunks),
totalLength: numberToBigInt(totalLength),
mimeType: options?.mimeType ?? file.type,
streamId,
topic: options?.topic,
encryptionType: options?.encryptionType,
timestamp: BigInt(Date.now()),
timestamp: numberToBigInt(Date.now()),
contentHeader: {
case: 'fileHeader',
value: new DataStream_FileHeader({
Expand Down Expand Up @@ -1676,7 +1677,7 @@ export default class LocalParticipant extends Participant {
const chunk = new DataStream_Chunk({
content: chunkData,
streamId,
chunkIndex: BigInt(i),
chunkIndex: numberToBigInt(i),
complete: i === totalChunks - 1,
});
await this.publishData(chunk.toBinary(), {
Expand Down
7 changes: 3 additions & 4 deletions src/room/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,16 @@ export interface StreamBuffer<T extends string | Uint8Array> {
}

interface BaseStreamInfo {
messageId: string;
id: string;
mimeType: string;
topic: string;
timestamp: number;
/** total size in bytes for finite streams and undefined for streams of unknown size */
size?: number;
extensions?: Record<string, string>;
}
export interface FileStreamInfo extends BaseStreamInfo {
fileName: string;
}

export interface TextStreamInfo extends BaseStreamInfo {
isFinite: boolean;
}
export interface TextStreamInfo extends BaseStreamInfo {}
14 changes: 14 additions & 0 deletions src/room/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -548,3 +548,17 @@ export function getDisconnectReasonFromConnectionError(e: ConnectionError) {
return DisconnectReason.UNKNOWN_REASON;
}
}

/** convert bigints to numbers preserving undefined values */
export function bigIntToNumber<T extends BigInt | undefined>(
value: T,
): T extends BigInt ? number : undefined {
return (value !== undefined ? Number(value) : undefined) as T extends BigInt ? number : undefined;
}

/** convert numbers to bigints preserving undefined values */
export function numberToBigInt<T extends number | undefined>(
value: T,
): T extends number ? bigint : undefined {
return (value !== undefined ? BigInt(value) : undefined) as T extends number ? bigint : undefined;
}

0 comments on commit e99aad0

Please sign in to comment.