Skip to content

Commit

Permalink
optimize workflow db updates
Browse files Browse the repository at this point in the history
  • Loading branch information
berekuk committed Nov 25, 2024
1 parent f84baa1 commit 8306c4e
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 53 deletions.
127 changes: 74 additions & 53 deletions packages/hub/src/app/ai/api/create/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,55 +10,96 @@ import {
import { auth } from "@/auth";
import { getSelf, isSignedIn } from "@/graphql/helpers/userHelpers";
import { prisma } from "@/prisma";
import { getAiCodec } from "@/server/ai/utils";
import { V2WorkflowData } from "@/server/ai/v2_0";
import { workflowToV2_0Json } from "@/server/ai/v2_0";

import { AiRequestBody, aiRequestBodySchema } from "../../utils";

// https://nextjs.org/docs/app/api-reference/file-conventions/route-segment-config#maxduration
export const maxDuration = 300;

async function upsertWorkflow(
user: Awaited<ReturnType<typeof getSelf>>,
workflow: Workflow<any>
async function updateDbWorkflow(
workflow: Workflow<any>,
opts: { final?: boolean } = {}
) {
const codec = getAiCodec();
const serializer = codec.makeSerializer();
const entrypoint = serializer.serialize("workflow", workflow);
const bundle = serializer.getBundle();

const v2Workflow: V2WorkflowData = {
entrypoint,
bundle,
};
const v2Workflow = workflowToV2_0Json(workflow);

await prisma.aiWorkflow.upsert({
console.log(
`Update workflow ${workflow.id}: ${workflow.getStepCount()} steps, final: ${opts.final ?? false}`
);
const startTime = Date.now();

// We used to do both create and update with a single `upsert`, but Claude recommended separating them for performance.
await prisma.aiWorkflow.update({
select: {
id: true,
},
where: {
id: workflow.id,
},
update: {
format: "V2_0",
workflow: v2Workflow,
},
create: {
id: workflow.id,
user: {
connect: { id: user.id },
},
data: {
format: "V2_0",
workflow: v2Workflow,
...(opts.final && { markdown: workflow.getFinalResult().logSummary }),
},
});
console.log(
`Updated workflow ${workflow.id}: ${workflow.getStepCount()} steps, final: ${opts.final ?? false}, ${Date.now() - startTime}ms`
);
}

async function updateWorkflowLog(workflow: Workflow<any>) {
const result = workflow.getFinalResult();
await prisma.aiWorkflow.update({
where: { id: workflow.id },
data: { markdown: result.logSummary },
/**
 * Subscribes to workflow events so that the workflow's database row is
 * refreshed while the workflow runs.
 *
 * - `stepAdded`: saves the current workflow state.
 * - `allStepsFinished`: saves once more with `final: true`.
 *
 * The event listeners are synchronous callbacks, so every write is
 * fire-and-forget. A rejection handler is attached to each write so that a
 * failed update is logged instead of becoming an unhandled promise rejection.
 *
 * @param workflow The workflow to observe and persist.
 */
function saveWorkflowToDbOnUpdates(workflow: Workflow<any>) {
  // Shared rejection handler for the fire-and-forget writes below.
  const logUpdateFailure = (error: unknown) => {
    console.error(`Error updating DB workflow ${workflow.id}:`, error);
  };

  // Save workflow to the database on each update.
  workflow.addEventListener("stepAdded", () => {
    updateDbWorkflow(workflow).catch(logUpdateFailure);
  });

  /*
   * We save the markdown log after all steps are finished. This means that if
   * the workflow fails or this route dies, there'd be no log summary. Should
   * we save the log summary after each step? It'd be more expensive but more
   * robust.
   * (this is important only in case we decide to roll back our fully
   * deserializable workflows; if deserialization works well then this doesn't
   * matter, the log is redundant)
   */
  workflow.addEventListener("allStepsFinished", () => {
    updateDbWorkflow(workflow, { final: true }).catch(logUpdateFailure);
  });
}

/**
 * Creates the initial `aiWorkflow` database row for a workflow, connected to
 * the given user and stored in the `V2_0` format.
 *
 * This function is best-effort: any failure (serialization or the database
 * write) is logged and swallowed, so the returned promise never rejects. That
 * makes it safe to call without awaiting.
 *
 * @param workflow The workflow to serialize and store.
 * @param user The user the new row is connected to (by `user.id`).
 */
async function createDbWorkflow(
  workflow: Workflow<any>,
  user: Awaited<ReturnType<typeof getSelf>>
): Promise<void> {
  const startTime = Date.now();

  // try/catch is not necessary, but Next.js is logging some weird errors that I tried to debug here.
  // See this thread: https://www.reddit.com/r/nextjs/comments/1gkxdqe/typeerror_the_payload_argument_must_be_of_type/
  // "Created DB workflow" is logged, though, so it's _probably_ working correctly.
  try {
    // Serialization happens inside the try block so that a serializer failure
    // is logged like a database failure instead of rejecting the promise.
    const v2Workflow = workflowToV2_0Json(workflow);

    console.log(
      `Create DB workflow ${workflow.id}: ${workflow.getStepCount()} steps`
    );

    await prisma.aiWorkflow.create({
      data: {
        id: workflow.id,
        user: {
          connect: { id: user.id },
        },
        format: "V2_0",
        workflow: v2Workflow,
      },
    });
    console.log(
      `Created DB workflow ${workflow.id}: ${workflow.getStepCount()} steps, ${Date.now() - startTime}ms`
    );
  } catch (error) {
    // Pass the error as a separate argument rather than interpolating it:
    // interpolation keeps only the message and drops the stack trace.
    console.error(`Error creating DB workflow ${workflow.id}:`, error);
  }
}

function aiRequestToWorkflow(request: AiRequestBody) {
// Create a SquiggleWorkflow instance
const llmConfig: LlmConfig = {
Expand Down Expand Up @@ -98,29 +139,6 @@ function aiRequestToWorkflow(request: AiRequestBody) {
return workflow;
}

/**
 * Subscribes to workflow events so that the workflow is written to the
 * database while it runs.
 *
 * - `stepAdded`: upserts the workflow row for the given user.
 * - `allStepsFinished`: stores the markdown log summary.
 *
 * The event listeners are synchronous callbacks, so every write is
 * fire-and-forget. A rejection handler is attached to each write so that a
 * failed write is logged instead of becoming an unhandled promise rejection.
 *
 * @param workflow The workflow to observe and persist.
 * @param user The user that owns the workflow row.
 */
function saveWorkflowToDbOnUpdates(
  workflow: Workflow<any>,
  user: Awaited<ReturnType<typeof getSelf>>
) {
  // Shared rejection handler for the fire-and-forget writes below.
  const logSaveFailure = (error: unknown) => {
    console.error(`Error saving workflow ${workflow.id} to DB:`, error);
  };

  // Save workflow to the database on each update.
  workflow.addEventListener("stepAdded", () => {
    upsertWorkflow(user, workflow).catch(logSaveFailure);
  });

  /*
   * We save the markdown log after all steps are finished. This means that if
   * the workflow fails or this route dies, there'd be no log summary. Should
   * we save the log summary after each step? It'd be more expensive but more
   * robust.
   * (this is important only in case we decide to roll back our fully
   * deserializable workflows; if deserialization works well then this doesn't
   * matter, the log is redundant)
   */
  workflow.addEventListener("allStepsFinished", () => {
    updateWorkflowLog(workflow).catch(logSaveFailure);
  });
}

export async function POST(req: Request) {
const session = await auth();

Expand All @@ -136,10 +154,13 @@ export async function POST(req: Request) {

const workflow = aiRequestToWorkflow(request);

saveWorkflowToDbOnUpdates(workflow, user);
saveWorkflowToDbOnUpdates(workflow);

const stream = workflow.runAsStream();

// this is async but we don't need to wait for it
createDbWorkflow(workflow, user);

return new Response(stream as ReadableStream, {
headers: {
"Content-Type": "text/event-stream",
Expand Down
10 changes: 10 additions & 0 deletions packages/hub/src/server/ai/v2_0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Prisma } from "@prisma/client";
import { z } from "zod";

import { ClientWorkflow } from "@quri/squiggle-ai";
import { Workflow } from "@quri/squiggle-ai/server";

import { getAiCodec } from "./utils";

Expand All @@ -19,6 +20,15 @@ export const v2WorkflowDataSchema = z.object({

export type V2WorkflowData = z.infer<typeof v2WorkflowDataSchema>;

export function workflowToV2_0Json(workflow: Workflow<any>): V2WorkflowData {

This comment has been minimized.

Copy link
@OAGr

OAGr Dec 3, 2024

Contributor

Minor, but the method name implies it outputs "V2.0", while the type name implies it's just outputting "V2". Maybe it would be good to modify the type to be more explicit.

const codec = getAiCodec();
const serializer = codec.makeSerializer();
const entrypoint = serializer.serialize("workflow", workflow);
const bundle = serializer.getBundle();

return { entrypoint, bundle };
}

export function decodeV2_0JsonToClientWorkflow(
json: Prisma.JsonValue
): ClientWorkflow {
Expand Down

0 comments on commit 8306c4e

Please sign in to comment.