diff --git a/pages/api/generate.ts b/pages/api/generate.ts index c8439ca8..89619590 100644 --- a/pages/api/generate.ts +++ b/pages/api/generate.ts @@ -30,7 +30,19 @@ const handler = async (req: Request): Promise => { }; const stream = await OpenAIStream(payload); - return new Response(stream); + // return stream response (SSE) + return new Response( + stream, { + headers: new Headers({ + // since we don't use browser's EventSource interface, specifying content-type is optional. + // the eventsource-parser library can handle the stream response as SSE, as long as the data format complies with SSE: + // https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#sending_events_from_the_server + + // 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + }) + } + ); }; export default handler; diff --git a/pages/index.tsx b/pages/index.tsx index 8b7679de..ce54c69e 100644 --- a/pages/index.tsx +++ b/pages/index.tsx @@ -8,6 +8,11 @@ import Footer from "../components/Footer"; import Github from "../components/GitHub"; import Header from "../components/Header"; import LoadingDots from "../components/LoadingDots"; +import { + createParser, + ParsedEvent, + ReconnectInterval, +} from "eventsource-parser"; const Home: NextPage = () => { const [loading, setLoading] = useState(false); @@ -56,15 +61,28 @@ const Home: NextPage = () => { return; } + const onParse = (event: ParsedEvent | ReconnectInterval) => { + if (event.type === "event") { + const data = event.data; + try { + const text = JSON.parse(data).text ?? "" + setGeneratedBios((prev) => prev + text); + } catch (e) { + console.error(e); + } + } + } + + // https://web.dev/streams/#the-getreader-and-read-methods const reader = data.getReader(); const decoder = new TextDecoder(); + const parser = createParser(onParse); let done = false; - while (!done) { const { value, done: doneReading } = await reader.read(); done = doneReading; const chunkValue = decoder.decode(value); - setGeneratedBios((prev) => prev + chunkValue); + parser.feed(chunkValue); } scrollToBios(); setLoading(false); diff --git a/utils/OpenAIStream.ts b/utils/OpenAIStream.ts index b0b0d73a..6186c144 100644 --- a/utils/OpenAIStream.ts +++ b/utils/OpenAIStream.ts @@ -27,8 +27,6 @@ export async function OpenAIStream(payload: OpenAIStreamPayload) { const encoder = new TextEncoder(); const decoder = new TextDecoder(); - let counter = 0; - const res = await fetch("https://api.openai.com/v1/chat/completions", { headers: { "Content-Type": "application/json", @@ -38,34 +36,28 @@ export async function OpenAIStream(payload: OpenAIStreamPayload) { body: JSON.stringify(payload), }); - const stream = new ReadableStream({ + const readableStream = new ReadableStream({ async start(controller) { // callback - function onParse(event: ParsedEvent | ReconnectInterval) { + const onParse = (event: ParsedEvent | ReconnectInterval) => { if (event.type === "event") { const data = event.data; - // https://beta.openai.com/docs/api-reference/completions/create#completions/create-stream - if (data === "[DONE]") { - controller.close(); - return; - } - try { - const json = JSON.parse(data); - const text = json.choices[0].delta?.content || ""; - if (counter < 2 && (text.match(/\n/) || []).length) { - // this is a prefix character (i.e., "\n\n"), do nothing - return; - } - const queue = encoder.encode(text); - controller.enqueue(queue); - counter++; - } catch (e) { - // maybe parse error - controller.error(e); - } + controller.enqueue(encoder.encode(data)); } } + // optimistic error handling + if (res.status !== 200) { + const data = { + status: res.status, + statusText: res.statusText, + body: await res.text(), + } + console.log(`Error: recieved non-200 status code, ${JSON.stringify(data)}`); + controller.close(); + return + } + // stream response (SSE) from OpenAI may be fragmented into multiple chunks // this ensures we properly read chunks and invoke an event for each SSE event stream const parser = createParser(onParse); @@ -76,5 +68,35 @@ export async function OpenAIStream(payload: OpenAIStreamPayload) { }, }); - return stream; + let counter = 0; + const transformStream = new TransformStream({ + async transform(chunk, controller) { + const data = decoder.decode(chunk); + // https://beta.openai.com/docs/api-reference/completions/create#completions/create-stream + if (data === "[DONE]") { + controller.terminate(); + return; + } + try { + const json = JSON.parse(data); + const text = json.choices[0].delta?.content || ""; + if (counter < 2 && (text.match(/\n/) || []).length) { + // this is a prefix character (i.e., "\n\n"), do nothing + return; + } + // stream transformed JSON resposne as SSE + const payload = {text: text}; + // https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format + controller.enqueue( + encoder.encode(`data: ${JSON.stringify(payload)}\n\n`) + ); + counter++; + } catch (e) { + // maybe parse error + controller.error(e); + } + }, + }); + + return readableStream.pipeThrough(transformStream); }