From 7c0b2e74f45a08f3d7f75af39052a0c3af32b2ac Mon Sep 17 00:00:00 2001 From: Vyacheslav Matyukhin Date: Tue, 19 Nov 2024 20:33:54 -0300 Subject: [PATCH] streaming and serialization fixes --- packages/ai/src/LLMStepInstance.ts | 31 +++++++++++++++----------- packages/ai/src/workflows/Workflow.ts | 10 ++++----- packages/ai/src/workflows/streaming.ts | 1 + 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/packages/ai/src/LLMStepInstance.ts b/packages/ai/src/LLMStepInstance.ts index 938914bd8c..064e7f7145 100644 --- a/packages/ai/src/LLMStepInstance.ts +++ b/packages/ai/src/LLMStepInstance.ts @@ -404,23 +404,28 @@ export function serializeStepParams( params: StepParams, visitor: AiSerializationVisitor ): SerializedStep { + let serializedState: SerializedState; + if (params.state.kind === "DONE") { + const { outputs, ...stateWithoutOutputs } = params.state; + serializedState = { + ...stateWithoutOutputs, + outputIds: Object.fromEntries( + Object.entries(outputs) + .map(([key, output]) => + output ? [key, visitor.artifact(output)] : undefined + ) + .filter((x) => x !== undefined) + ), + }; + } else { + serializedState = params.state; + } + return { id: params.id, sequentialId: params.sequentialId, templateName: params.template.name, - state: - params.state.kind === "DONE" - ? { - ...params.state, - outputIds: Object.fromEntries( - Object.entries(params.state.outputs) - .map(([key, output]) => - output ? [key, visitor.artifact(output)] : undefined - ) - .filter((x) => x !== undefined) - ), - } - : params.state, + state: serializedState, startTime: params.startTime, conversationMessages: params.conversationMessages, llmMetricsList: params.llmMetricsList, diff --git a/packages/ai/src/workflows/Workflow.ts b/packages/ai/src/workflows/Workflow.ts index 402b75e6c7..a08adbbb09 100644 --- a/packages/ai/src/workflows/Workflow.ts +++ b/packages/ai/src/workflows/Workflow.ts @@ -148,6 +148,9 @@ export class Workflow { if (this.steps.length) { throw new Error("Workflow already started"); } + + this.dispatchEvent({ type: "workflowStarted" }); + // add the first step const initialStep = this.template.getInitialStep(this); this.addStep(initialStep); @@ -155,18 +158,13 @@ export class Workflow { // Run workflow to the ReadableStream, appropriate for streaming in Next.js routes runAsStream(): ReadableStream { - this.startOrThrow(); - const stream = new ReadableStream({ start: async (controller) => { addStreamingListeners(this, controller); - // We need to dispatch this event after we configured the event - // handlers, but before we add any steps. - this.dispatchEvent({ type: "workflowStarted" }); + this.startOrThrow(); await this.runUntilComplete(); - controller.close(); }, }); diff --git a/packages/ai/src/workflows/streaming.ts b/packages/ai/src/workflows/streaming.ts index 2076399a45..1cf69ba8dd 100644 --- a/packages/ai/src/workflows/streaming.ts +++ b/packages/ai/src/workflows/streaming.ts @@ -140,6 +140,7 @@ export function addStreamingListeners( kind: "finalResult", content: event.workflow.getFinalResult(), }); + controller.close(); }); }