Skip to content

Commit

Permalink
custom streamwriter
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasIO committed Dec 6, 2024
1 parent cde351e commit 2134144
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 13 deletions.
3 changes: 2 additions & 1 deletion examples/demo/demo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ const appActions = {
room.getParticipantByIdentity(participant?.identity),
);
}
appendLog('text stream finished');
}
})
.on(RoomEvent.FileStreamReceived, async (reader, participant) => {
Expand Down Expand Up @@ -568,7 +569,7 @@ async function sendGreetingTo(participant: Participant) {
await streamWriter.write(char);
await sleep(20);
}
await streamWriter.releaseLock();
await streamWriter.close();
}

async function participantConnected(participant: Participant) {
Expand Down
24 changes: 24 additions & 0 deletions src/room/StreamWriter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
class BaseStreamWriter<T> {
protected writableStream: WritableStream<T>;

protected defaultWriter: WritableStreamDefaultWriter<T>;

constructor(writableStream: WritableStream<T>) {
this.writableStream = writableStream;
this.defaultWriter = writableStream.getWriter();
}

write(chunk: T): Promise<void> {
return this.defaultWriter.write(chunk);
}

async close() {
await this.defaultWriter.close();
this.defaultWriter.releaseLock();
console.log('stream status', this.writableStream.locked);
}
}

export class TextStreamWriter extends BaseStreamWriter<string> {}

export class BinaryStreamWriter extends BaseStreamWriter<Uint8Array> {}
36 changes: 24 additions & 12 deletions src/room/participant/LocalParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
import type { InternalRoomOptions } from '../../options';
import { PCTransportState } from '../PCTransportManager';
import type RTCEngine from '../RTCEngine';
import { TextStreamWriter } from '../StreamWriter';
import { defaultVideoCodec } from '../defaults';
import {
DeviceUnsupportedError,
Expand Down Expand Up @@ -1581,7 +1582,7 @@ export default class LocalParticipant extends Participant {
async streamText(options?: {
topic?: string;
destinationIdentities?: Array<string>;
}): Promise<WritableStreamDefaultWriter<string>> {
}): Promise<TextStreamWriter> {
const streamId = crypto.randomUUID();
const header = new DataStream_Header({
streamId,
Expand All @@ -1607,7 +1608,18 @@ export default class LocalParticipant extends Participant {

let chunkId = 0;
const localP = this;
let onEngineClose = async () => {
if (writableStream.locked) {
console.warn('writable stream still locked');
await writableStream.abort('engine closed');
} else {
await writableStream.close();
}
};
const writableStream = new WritableStream<string>({
start() {
localP.engine.once(EngineEvent.Closing, onEngineClose);
},
// Implement the sink
write(textChunk) {
const textInBytes = new TextEncoder().encode(textChunk);
Expand Down Expand Up @@ -1637,31 +1649,31 @@ export default class LocalParticipant extends Participant {
resolve();
});
},
close() {
async close() {
const chunk = new DataStream_Chunk({
streamId,
chunkIndex: numberToBigInt(chunkId),
complete: true,
});
localP.publishData(chunk.toBinary(), {
reliable: true,
topic: 'streamchunk',
destinationIdentities: options?.destinationIdentities,
const chunkPacket = new DataPacket({
destinationIdentities,
value: {
case: 'streamChunk',
value: chunk,
},
});
await localP.engine.sendDataPacket(chunkPacket, DataPacket_Kind.RELIABLE);
localP.engine.off(EngineEvent.Closing, onEngineClose);
},
abort(err) {
console.log('Sink error:', err);
// TODO handle aborts to signal something to receiver side
},
});

this.engine.once(EngineEvent.Closing, () => {
if (writableStream.locked) {
writableStream.abort();
}
});
const writer = new TextStreamWriter(writableStream);

return writableStream.getWriter();
return writer;
}

async sendFile(
Expand Down

0 comments on commit 2134144

Please sign in to comment.