diff --git a/examples/demo/demo.ts b/examples/demo/demo.ts index 228a613e30..bdaabbc9bb 100644 --- a/examples/demo/demo.ts +++ b/examples/demo/demo.ts @@ -264,6 +264,7 @@ const appActions = { room.getParticipantByIdentity(participant?.identity), ); } + appendLog('text stream finished'); } }) .on(RoomEvent.FileStreamReceived, async (reader, participant) => { @@ -568,7 +569,7 @@ async function sendGreetingTo(participant: Participant) { await streamWriter.write(char); await sleep(20); } - await streamWriter.releaseLock(); + await streamWriter.close(); } async function participantConnected(participant: Participant) { diff --git a/src/room/StreamWriter.ts b/src/room/StreamWriter.ts new file mode 100644 index 0000000000..6df9e779c8 --- /dev/null +++ b/src/room/StreamWriter.ts @@ -0,0 +1,24 @@ +class BaseStreamWriter { + protected writableStream: WritableStream; + + protected defaultWriter: WritableStreamDefaultWriter; + + constructor(writableStream: WritableStream) { + this.writableStream = writableStream; + this.defaultWriter = writableStream.getWriter(); + } + + write(chunk: T): Promise { + return this.defaultWriter.write(chunk); + } + + async close() { + await this.defaultWriter.close(); + this.defaultWriter.releaseLock(); + console.log('stream status', this.writableStream.locked); + } +} + +export class TextStreamWriter extends BaseStreamWriter {} + +export class BinaryStreamWriter extends BaseStreamWriter {} diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index 25d5d987d2..1cf1ca9a9c 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -28,6 +28,7 @@ import { import type { InternalRoomOptions } from '../../options'; import { PCTransportState } from '../PCTransportManager'; import type RTCEngine from '../RTCEngine'; +import { TextStreamWriter } from '../StreamWriter'; import { defaultVideoCodec } from '../defaults'; import { DeviceUnsupportedError, @@ -1581,7 +1582,7 @@ export default class LocalParticipant extends Participant { async streamText(options?: { topic?: string; destinationIdentities?: Array; - }): Promise> { + }): Promise { const streamId = crypto.randomUUID(); const header = new DataStream_Header({ streamId, @@ -1607,7 +1608,18 @@ export default class LocalParticipant extends Participant { let chunkId = 0; const localP = this; + let onEngineClose = async () => { + if (writableStream.locked) { + console.warn('writable stream still locked'); + await writableStream.abort('engine closed'); + } else { + await writableStream.close(); + } + }; const writableStream = new WritableStream({ + start() { + localP.engine.once(EngineEvent.Closing, onEngineClose); + }, // Implement the sink write(textChunk) { const textInBytes = new TextEncoder().encode(textChunk); @@ -1637,17 +1649,21 @@ export default class LocalParticipant extends Participant { resolve(); }); }, - close() { + async close() { const chunk = new DataStream_Chunk({ streamId, chunkIndex: numberToBigInt(chunkId), complete: true, }); - localP.publishData(chunk.toBinary(), { - reliable: true, - topic: 'streamchunk', - destinationIdentities: options?.destinationIdentities, + const chunkPacket = new DataPacket({ + destinationIdentities, + value: { + case: 'streamChunk', + value: chunk, + }, }); + await localP.engine.sendDataPacket(chunkPacket, DataPacket_Kind.RELIABLE); + localP.engine.off(EngineEvent.Closing, onEngineClose); }, abort(err) { console.log('Sink error:', err); @@ -1655,13 +1671,9 @@ export default class LocalParticipant extends Participant { }, }); - this.engine.once(EngineEvent.Closing, () => { - if (writableStream.locked) { - writableStream.abort(); - } - }); + const writer = new TextStreamWriter(writableStream); - return writableStream.getWriter(); + return writer; } async sendFile(