From c4ecca3169d052785af52fdb747007807d14f012 Mon Sep 17 00:00:00 2001 From: Daniel Rochetti Date: Tue, 23 Jul 2024 15:18:38 -0700 Subject: [PATCH 01/14] feat(client): binary chunk streaming support --- .../app/streaming/audio/page.tsx | 119 ++++++++++++++++++ libs/client/package.json | 2 +- libs/client/src/function.ts | 2 +- libs/client/src/streaming.ts | 45 +++++-- 4 files changed, 158 insertions(+), 10 deletions(-) create mode 100644 apps/demo-nextjs-app-router/app/streaming/audio/page.tsx diff --git a/apps/demo-nextjs-app-router/app/streaming/audio/page.tsx b/apps/demo-nextjs-app-router/app/streaming/audio/page.tsx new file mode 100644 index 0000000..e66f644 --- /dev/null +++ b/apps/demo-nextjs-app-router/app/streaming/audio/page.tsx @@ -0,0 +1,119 @@ +'use client'; + +import * as fal from '@fal-ai/serverless-client'; +import { useEffect, useRef, useState } from 'react'; + +fal.config({ + proxyUrl: '/api/fal/proxy', +}); + +type PlayHTInput = { + text: string; +}; + +export default function AudioStreamingDemo() { + const [streamStatus, setStreamStatus] = useState('idle'); + + const audioRef = useRef(null); + const mediaSourceRef = useRef(null); + const sourceBufferRef = useRef(null); + + useEffect(() => { + if (!audioRef.current) { + console.warn('Audio element not found or not ready'); + return; + } + const mimeCodec = 'audio/mpeg'; + + let sourceUrl: string | null = null; + if ('MediaSource' in window && MediaSource.isTypeSupported(mimeCodec)) { + const mediaSource = new MediaSource(); + mediaSourceRef.current = mediaSource; + sourceUrl = URL.createObjectURL(mediaSource); + console.log('sourceUrl', sourceUrl); + audioRef.current.src = sourceUrl; + + mediaSource.addEventListener('sourceopen', () => { + console.log('MediaSource ready - adding source buffer'); + const sourceBuffer = mediaSource.addSourceBuffer(mimeCodec); + // sourceBuffer.addEventListener('updateend', () => { + // mediaSource.endOfStream(); + // }); + sourceBufferRef.current = sourceBuffer; + }); + } else { + console.error( + 'Unsupported MIME type or MediaSource API is not available' + ); + } + + return () => { + if (sourceUrl) { + URL.revokeObjectURL(sourceUrl); + } + }; + }, []); + + const runInference = async () => { + const stream = await fal.stream( + 'fal-ai/playht-tts', + { + input: { + text: 'Do you know who drew this picture and what is the name of it?', + }, + } + ); + setStreamStatus('running'); + + stream.on('data', (data: Uint8Array) => { + const sourceBuffer = sourceBufferRef.current; + + audioRef.current?.play(); + + if (sourceBuffer) { + console.log('Appending buffer...'); + // sourceBuffer.appendBuffer(data); + // console.log('sourceBuffer', sourceBuffer); + } else { + console.warn('Source buffer not found or not ready'); + } + }); + + const result = await stream.done(); + console.log('result', result); + setStreamStatus('done'); + sourceBufferRef.current?.addEventListener('updateend', () => { + mediaSourceRef.current?.endOfStream(); + }); + }; + + return ( +
+
+

+ Hello fal +{' '} + streaming +

+ +
+ +
+ +
+
+

Answer

+ + streaming: {streamStatus} + +
+
+
+
+ ); +} diff --git a/libs/client/package.json b/libs/client/package.json index 6e979c5..c1433c3 100644 --- a/libs/client/package.json +++ b/libs/client/package.json @@ -1,7 +1,7 @@ { "name": "@fal-ai/serverless-client", "description": "The fal serverless JS/TS client", - "version": "0.14.0-alpha.3", + "version": "0.14.0-alpha.5", "license": "MIT", "repository": { "type": "git", diff --git a/libs/client/src/function.ts b/libs/client/src/function.ts index be10835..86e5e7f 100644 --- a/libs/client/src/function.ts +++ b/libs/client/src/function.ts @@ -390,7 +390,7 @@ export const queue: Queue = { ); }, timeout); } - status.on('message', (data: QueueStatus) => { + status.on('data', (data: QueueStatus) => { if (options.onQueueUpdate) { // accumulate logs to match previous polling behavior if ( diff --git a/libs/client/src/streaming.ts b/libs/client/src/streaming.ts index cd8d76d..e714d14 100644 --- a/libs/client/src/streaming.ts +++ b/libs/client/src/streaming.ts @@ -30,13 +30,19 @@ type StreamOptions = { * The HTTP method, defaults to `post`; */ readonly method?: 'get' | 'post' | 'put' | 'delete' | string; + + /** + * The content type the client accepts as response. + * By default this is set to `text/event-stream`. + */ + readonly accept?: string; }; const EVENT_STREAM_TIMEOUT = 15 * 1000; -type FalStreamEventType = 'message' | 'error' | 'done'; +type FalStreamEventType = 'data' | 'error' | 'done'; -type EventHandler = (event: any) => void; +type EventHandler = (event: T) => void; /** * The class representing a streaming response. With t @@ -91,7 +97,7 @@ export class FalStream { const response = await fetch(url, { method: method.toUpperCase(), headers: { - accept: 'text/event-stream', + accept: options.accept ?? 'text/event-stream', 'content-type': 'application/json', }, body: input && method !== 'get' ? JSON.stringify(input) : undefined, @@ -127,6 +133,26 @@ export class FalStream { ); return; } + + // any response that is not a text/event-stream will be handled as a binary stream + if (response.headers.get('content-type') !== 'text/event-stream') { + // pass the binary chunks to this.emit('data', chunk) + const reader = body.getReader(); + const emitRawChunk = () => { + reader.read().then(({ done, value }) => { + if (done) { + this.emit('done', this.currentData); + return; + } + this.currentData = value as Output; + this.emit('data', value); + emitRawChunk(); + }); + }; + emitRawChunk(); + return; + } + const decoder = new TextDecoder('utf-8'); const reader = response.body.getReader(); @@ -138,7 +164,10 @@ export class FalStream { const parsedData = JSON.parse(data); this.buffer.push(parsedData); this.currentData = parsedData; - this.emit('message', parsedData); + this.emit('data', parsedData); + + // also emit 'message'for backwards compatibility + this.emit('message' as any, parsedData); } catch (e) { this.emit('error', e); } @@ -242,16 +271,16 @@ export class FalStream { * object as a result, that can be used to get partial results through either * `AsyncIterator` or through an event listener. * - * @param appId the app id, e.g. `fal-ai/llavav15-13b`. + * @param endpointId the endpoint id, e.g. `fal-ai/llavav15-13b`. * @param options the request options, including the input payload. * @returns the `FalStream` instance. */ export async function stream, Output = any>( - appId: string, + endpointId: string, options: StreamOptions ): Promise> { - const token = await getTemporaryAuthToken(appId); - const url = buildUrl(appId, { path: '/stream' }); + const token = await getTemporaryAuthToken(endpointId); + const url = buildUrl(endpointId, { path: '/stream' }); const input = options.input && options.autoUpload !== false From 2c1a2e21d919e588aac0d13235a98143ac1c4094 Mon Sep 17 00:00:00 2001 From: Daniel Rochetti Date: Tue, 23 Jul 2024 15:30:16 -0700 Subject: [PATCH 02/14] fix: buffer update --- apps/demo-nextjs-app-router/app/streaming/audio/page.tsx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/demo-nextjs-app-router/app/streaming/audio/page.tsx b/apps/demo-nextjs-app-router/app/streaming/audio/page.tsx index e66f644..b624197 100644 --- a/apps/demo-nextjs-app-router/app/streaming/audio/page.tsx +++ b/apps/demo-nextjs-app-router/app/streaming/audio/page.tsx @@ -64,15 +64,14 @@ export default function AudioStreamingDemo() { } ); setStreamStatus('running'); + await audioRef.current?.play(); stream.on('data', (data: Uint8Array) => { const sourceBuffer = sourceBufferRef.current; - audioRef.current?.play(); - if (sourceBuffer) { console.log('Appending buffer...'); - // sourceBuffer.appendBuffer(data); + sourceBuffer.appendBuffer(data); // console.log('sourceBuffer', sourceBuffer); } else { console.warn('Source buffer not found or not ready'); @@ -99,6 +98,7 @@ export default function AudioStreamingDemo() { From 7cb65424054557185a7f1667cd6717a8a013f2d7 Mon Sep 17 00:00:00 2001 From: Daniel Rochetti Date: Tue, 23 Jul 2024 15:48:32 -0700 Subject: [PATCH 03/14] fix: audio playing --- .../app/streaming/audio/page.tsx | 55 +++++++++++++++---- 1 file changed, 44 insertions(+), 11 deletions(-) diff --git a/apps/demo-nextjs-app-router/app/streaming/audio/page.tsx b/apps/demo-nextjs-app-router/app/streaming/audio/page.tsx index b624197..3ca5616 100644 --- a/apps/demo-nextjs-app-router/app/streaming/audio/page.tsx +++ b/apps/demo-nextjs-app-router/app/streaming/audio/page.tsx @@ -11,8 +11,13 @@ type PlayHTInput = { text: string; }; +const DEFAULT_PROMPT = + 'Hey, this is Daniel from fal (dot) ai. Please hold on a moment, let me just um pull up your details real quick. Can you tell me your account email or, your phone number?'; + export default function AudioStreamingDemo() { + const [prompt, setPrompt] = useState(DEFAULT_PROMPT); const [streamStatus, setStreamStatus] = useState('idle'); + const [timeToFirstChunk, setTimeToFirstChunk] = useState(null); const audioRef = useRef(null); const mediaSourceRef = useRef(null); @@ -55,31 +60,44 @@ export default function AudioStreamingDemo() { }, []); const runInference = async () => { + setTimeToFirstChunk(null); + const startedAt = Date.now(); const stream = await fal.stream( 'fal-ai/playht-tts', { input: { - text: 'Do you know who drew this picture and what is the name of it?', + text: prompt, }, } ); setStreamStatus('running'); - await audioRef.current?.play(); + // await audioRef.current?.play(); + let firstChunk = true; stream.on('data', (data: Uint8Array) => { + if (audioRef.current?.paused) { + audioRef.current?.play(); + } + console.log('Received data', data); + if (firstChunk) { + setTimeToFirstChunk(Date.now() - startedAt); + firstChunk = false; + } const sourceBuffer = sourceBufferRef.current; + console.log('sourceBuffer before', sourceBuffer); if (sourceBuffer) { - console.log('Appending buffer...'); + // console.log('Appending buffer...'); sourceBuffer.appendBuffer(data); - // console.log('sourceBuffer', sourceBuffer); + // console.log('sourceBuffer after', sourceBuffer); } else { console.warn('Source buffer not found or not ready'); } }); const result = await stream.done(); - console.log('result', result); + // console.log('result', result); + sourceBufferRef.current?.appendBuffer(result); setStreamStatus('done'); sourceBufferRef.current?.addEventListener('updateend', () => { mediaSourceRef.current?.endOfStream(); @@ -94,10 +112,17 @@ export default function AudioStreamingDemo() { streaming -
+
+ -
- -
-
-

Result

-
- - time to first chunk:{' '} - - {timeToFirstChunk ? `${timeToFirstChunk}ms` : 'n/a'} - - - - streaming: {streamStatus} - -
-
-
- -
- ); -} diff --git a/libs/client/package.json b/libs/client/package.json index fa25e01..49deeb9 100644 --- a/libs/client/package.json +++ b/libs/client/package.json @@ -1,7 +1,7 @@ { "name": "@fal-ai/serverless-client", "description": "The fal serverless JS/TS client", - "version": "0.14.0-alpha.8", + "version": "0.14.0", "license": "MIT", "repository": { "type": "git", diff --git a/libs/proxy/package.json b/libs/proxy/package.json index 855a868..d5d55cb 100644 --- a/libs/proxy/package.json +++ b/libs/proxy/package.json @@ -1,6 +1,6 @@ { "name": "@fal-ai/serverless-proxy", - "version": "0.8.0-alpha.1", + "version": "0.8.0", "license": "MIT", "repository": { "type": "git",