From bf960a6db94d892c8d914b5f68247f422bdd08c2 Mon Sep 17 00:00:00 2001 From: Harish Mohan Raj Date: Fri, 24 Nov 2023 12:31:06 +0530 Subject: [PATCH] Pool the server to get the status of all inprogress conversations (#41) --- main.wasp | 4 + .../migration.sql | 2 + src/client/components/ConversationWrapper.tsx | 25 +-- src/server/actions.ts | 166 ++++++++++-------- src/server/config.js | 2 + src/server/webSocket.js | 68 +++++++ 6 files changed, 183 insertions(+), 84 deletions(-) create mode 100644 migrations/20231123091639_add_status_to_conversation_model/migration.sql create mode 100644 src/server/config.js create mode 100644 src/server/webSocket.js diff --git a/main.wasp b/main.wasp index f63183d..e076da7 100644 --- a/main.wasp +++ b/main.wasp @@ -68,6 +68,9 @@ app chatApp { ("stripe", "11.15.0"), ("markdown-to-jsx", "7.3.2"), ], + webSocket: { + fn: import { webSocketFn } from "@server/webSocket.js" + }, } /* 💽 Wasp defines DB entities via Prisma Database Models: @@ -129,6 +132,7 @@ entity Conversation {=psl conversation Json createdAt DateTime @default(now()) updatedAt DateTime @updatedAt + status String? chat Chat? @relation(fields: [chatId], references: [id]) chatId Int? user User? @relation(fields: [userId], references: [id]) diff --git a/migrations/20231123091639_add_status_to_conversation_model/migration.sql b/migrations/20231123091639_add_status_to_conversation_model/migration.sql new file mode 100644 index 0000000..ae830f6 --- /dev/null +++ b/migrations/20231123091639_add_status_to_conversation_model/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "Conversation" ADD COLUMN "status" TEXT; diff --git a/src/client/components/ConversationWrapper.tsx b/src/client/components/ConversationWrapper.tsx index a0a93df..7e1326b 100644 --- a/src/client/components/ConversationWrapper.tsx +++ b/src/client/components/ConversationWrapper.tsx @@ -71,18 +71,18 @@ export default function ConversationWrapper() { { chatId: Number(id), }, - { enabled: !!id } + { enabled: !!id, refetchInterval: 1000 } ); useEffect(() => { - if (chatContainerRef.current) { - // Todo: remove the below ignore comment - // @ts-ignore - chatContainerRef.current.scrollTop = - // Todo: remove the below ignore comment - // @ts-ignore - chatContainerRef.current.scrollHeight; - } + // if (chatContainerRef.current) { + // // Todo: remove the below ignore comment + // // @ts-ignore + // chatContainerRef.current.scrollTop = + // // Todo: remove the below ignore comment + // // @ts-ignore + // chatContainerRef.current.scrollHeight; + // } }, [conversations]); async function callAgent(userQuery: string) { @@ -102,11 +102,11 @@ export default function ConversationWrapper() { // 2. call backend python server to get agent response setIsLoading(true); const response = await getAgentResponse({ - message: userQuery, + message: payload.conversations, conv_id: payload.conversation_id, }); // 3. add agent response as new conversation in the table - const openAIPayload = { + const openAIResponse = { conversation_id: conversations.id, conversations: [ ...payload.conversations, @@ -114,8 +114,9 @@ export default function ConversationWrapper() { // @ts-ignore ...[{ role: "assistant", content: response.content }], ], + ...(response.team_status && { status: response.team_status }), }; - await updateConversation(openAIPayload); + await updateConversation(openAIResponse); setIsLoading(false); } catch (err: any) { setIsLoading(false); diff --git a/src/server/actions.ts b/src/server/actions.ts index bc38b86..2f686e9 100644 --- a/src/server/actions.ts +++ b/src/server/actions.ts @@ -1,57 +1,66 @@ -import fetch from 'node-fetch'; -import HttpError from '@wasp/core/HttpError.js'; +import fetch from "node-fetch"; +import HttpError from "@wasp/core/HttpError.js"; // import type { RelatedObject } from '@wasp/entities'; -import type { Chat } from '@wasp/entities'; -import type { Conversation } from '@wasp/entities'; -import type { GenerateGptResponse, StripePayment, CreateChat, UpdateConversation, GetAgentResponse } from '@wasp/actions/types'; -import type { StripePaymentResult, OpenAIResponse } from './types'; -import Stripe from 'stripe'; +import type { Chat } from "@wasp/entities"; +import type { Conversation } from "@wasp/entities"; +import type { + GenerateGptResponse, + StripePayment, + CreateChat, + UpdateConversation, + GetAgentResponse, +} from "@wasp/actions/types"; +import type { StripePaymentResult, OpenAIResponse } from "./types"; +import Stripe from "stripe"; + +import { ADS_SERVER_URL } from "./config.js"; const stripe = new Stripe(process.env.STRIPE_KEY!, { - apiVersion: '2022-11-15', + apiVersion: "2022-11-15", }); // WASP_WEB_CLIENT_URL will be set up by Wasp when deploying to production: https://wasp-lang.dev/docs/deploying -const DOMAIN = process.env.WASP_WEB_CLIENT_URL || 'http://localhost:3000'; - -// Python ADS_SERVER_URL -const ADS_SERVER_URL = process.env.ADS_SERVER_URL || 'http://127.0.0.1:9000'; +const DOMAIN = process.env.WASP_WEB_CLIENT_URL || "http://localhost:3000"; -export const stripePayment: StripePayment = async (_args, context) => { +export const stripePayment: StripePayment = async ( + _args, + context +) => { if (!context.user) { throw new HttpError(401); } - + let customer: Stripe.Customer; const stripeCustomers = await stripe.customers.list({ email: context.user.email!, }); if (!stripeCustomers.data.length) { - console.log('creating customer'); + console.log("creating customer"); customer = await stripe.customers.create({ email: context.user.email!, }); } else { - console.log('using existing customer'); + console.log("using existing customer"); customer = stripeCustomers.data[0]; } - const session: Stripe.Checkout.Session = await stripe.checkout.sessions.create({ - line_items: [ - { - price: process.env.SUBSCRIPTION_PRICE_ID!, - quantity: 1, + const session: Stripe.Checkout.Session = + await stripe.checkout.sessions.create({ + line_items: [ + { + price: process.env.SUBSCRIPTION_PRICE_ID!, + quantity: 1, + }, + ], + mode: "subscription", + success_url: `${DOMAIN}/checkout?success=true`, + cancel_url: `${DOMAIN}/checkout?canceled=true`, + automatic_tax: { enabled: true }, + customer_update: { + address: "auto", }, - ], - mode: 'subscription', - success_url: `${DOMAIN}/checkout?success=true`, - cancel_url: `${DOMAIN}/checkout?canceled=true`, - automatic_tax: { enabled: true }, - customer_update: { - address: 'auto', - }, - customer: customer.id, - }); + customer: customer.id, + }); await context.entities.User.update({ where: { @@ -64,7 +73,7 @@ export const stripePayment: StripePayment = async (_a }); if (!session) { - throw new HttpError(402, 'Could not create a Stripe session'); + throw new HttpError(402, "Could not create a Stripe session"); } else { return { sessionUrl: session.url, @@ -93,11 +102,11 @@ export const generateGptResponse: GenerateGptResponse = async ( // engine:"airt-canada-gpt35-turbo-16k", messages: [ { - role: 'system', + role: "system", content: instructions, }, { - role: 'user', + role: "user", content: command, }, ], @@ -119,20 +128,23 @@ export const generateGptResponse: GenerateGptResponse = async ( // }); // } - console.log('fetching', payload); + console.log("fetching", payload); // https://api.openai.com/v1/chat/completions - const response = await fetch('https://airt-openai-canada.openai.azure.com/openai/deployments/airt-canada-gpt35-turbo-16k/chat/completions?api-version=2023-07-01-preview', { - headers: { - 'Content-Type': 'application/json', - // Authorization: `Bearer ${process.env.AZURE_OPENAI_API_KEY!}`, - 'api-key': `${process.env.AZURE_OPENAI_API_KEY!}`, - }, - method: 'POST', - body: JSON.stringify(payload), - }); + const response = await fetch( + "https://airt-openai-canada.openai.azure.com/openai/deployments/airt-canada-gpt35-turbo-16k/chat/completions?api-version=2023-07-01-preview", + { + headers: { + "Content-Type": "application/json", + // Authorization: `Bearer ${process.env.AZURE_OPENAI_API_KEY!}`, + "api-key": `${process.env.AZURE_OPENAI_API_KEY!}`, + }, + method: "POST", + body: JSON.stringify(payload), + } + ); const json = (await response.json()) as OpenAIResponse; - console.log('response json', json); + console.log("response json", json); // return context.entities.RelatedObject.create({ // data: { // content: json?.choices[0].message.content, @@ -141,7 +153,7 @@ export const generateGptResponse: GenerateGptResponse = async ( // }); return { content: json?.choices[0].message.content, - } + }; } catch (error: any) { if (!context.user.hasPaid && error?.statusCode != 402) { await context.entities.User.update({ @@ -156,12 +168,13 @@ export const generateGptResponse: GenerateGptResponse = async ( console.error(error); } - throw new HttpError(500, 'Something went wrong'); + throw new HttpError(500, "Something went wrong"); }; - - -export const createChat: CreateChat = async (_args, context) => { +export const createChat: CreateChat = async ( + _args, + context +) => { if (!context.user) { throw new HttpError(401); } @@ -175,35 +188,40 @@ export const createChat: CreateChat = async (_args, context) data: { conversation: [ { - role: 'assistant', - content: `Hi! I am Captn and I am here to help you with digital marketing. I can create and optimise marketing campaigns for you. But before I propose any activities, please let me know a little bit about your business and what your marketing goals are.`, + role: "assistant", + content: `Hi! I am Captn and I am here to help you with digital marketing. I can create and optimise marketing campaigns for you. But before I propose any activities, please let me know a little bit about your business and what your marketing goals are.`, }, - ], + ], chat: { connect: { id: chat.id } }, user: { connect: { id: context.user.id } }, }, }); -} +}; type UpdateConversationPayload = { conversation_id: number; conversations: any; + status: string; }; -export const updateConversation: UpdateConversation = async (args, context) => { +export const updateConversation: UpdateConversation< + UpdateConversationPayload, + Conversation +> = async (args, context) => { if (!context.user) { throw new HttpError(401); } return context.entities.Conversation.update({ where: { id: args.conversation_id }, data: { - conversation: args.conversations + conversation: args.conversations, + ...(args.status && { status: args.status }), }, - }) -} + }); +}; type AgentPayload = { - message: string; + message: any; conv_id: number; }; @@ -215,29 +233,33 @@ export const getAgentResponse: GetAgentResponse = async ( throw new HttpError(401); } - const payload = { message: message, conv_id: conv_id, user_id: context.user.id }; - console.log("===========") - console.log("Payload to Python server") - console.log(payload) - console.log("===========") + const payload = { + message: message, + conv_id: conv_id, + user_id: context.user.id, + }; + console.log("==========="); + console.log("Payload to Python server"); + console.log(payload); + console.log("==========="); try { - const response = await fetch(`${ADS_SERVER_URL}/chat`, { - method: 'POST', + const response = await fetch(`${ADS_SERVER_URL}/openai/chat`, { + method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify(payload), }); - const json = await response.json() as { detail?: string }; // Parse JSON once + const json: any = (await response.json()) as { detail?: string }; // Parse JSON once if (!response.ok) { - const errorMsg = json.detail || `HTTP error with status code ${response.status}`; - console.error('Server Error:', errorMsg); + const errorMsg = + json.detail || `HTTP error with status code ${response.status}`; + console.error("Server Error:", errorMsg); throw new Error(errorMsg); } - return { content: json }; - + return { content: json["content"], team_status: json["team_status"] }; } catch (error: any) { - throw new HttpError(500, 'Something went wrong. Please try again later'); + throw new HttpError(500, "Something went wrong. Please try again later"); } }; diff --git a/src/server/config.js b/src/server/config.js new file mode 100644 index 0000000..80ad453 --- /dev/null +++ b/src/server/config.js @@ -0,0 +1,2 @@ +export const ADS_SERVER_URL = + process.env.ADS_SERVER_URL || "http://127.0.0.1:9000"; diff --git a/src/server/webSocket.js b/src/server/webSocket.js new file mode 100644 index 0000000..05b25ae --- /dev/null +++ b/src/server/webSocket.js @@ -0,0 +1,68 @@ +import HttpError from "@wasp/core/HttpError.js"; + +import { ADS_SERVER_URL } from "./config.js"; + +export const webSocketFn = (io, context) => { + io.on("connection", async (socket) => { + if (socket.data.user) { + const userEmail = socket.data.user.email; + console.log("========"); + console.log("a user connected: ", userEmail); + + // Check for updates every 3 seconds + const updateInterval = setInterval(async () => { + console.log("Checking database for inprogress tasks"); + const conversations = await context.entities.Conversation.findMany({ + where: { userId: socket.data.user.id, status: "inprogress" }, + }); + + conversations.length > 0 && + conversations.forEach(async function (conversation) { + try { + const payload = { + conversation_id: conversation.id, + }; + const response = await fetch( + `${ADS_SERVER_URL}/openai/get-team-status`, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(payload), + } + ); + + const json = await response.json(); + + // Todo: check how throwing the below error affects the user experience + if (!response.ok) { + const errorMsg = + json.detail || + `HTTP error with status code ${response.status}`; + console.error("Server Error:", errorMsg); + throw new Error(errorMsg); + } + + const conversation_status = json["status"]; + if (conversation_status === "ready") { + const updated_conversation = conversation.conversation.concat([ + { role: "assistant", content: json["msg"] }, + ]); + await context.entities.Conversation.update({ + where: { + // userId: socket.data.user.id, + id: conversation.id, + }, + data: { + conversation: updated_conversation, + status: conversation_status, + }, + }); + } + } catch (error) { + throw new HttpError(500, error); + } + }); + }, 3000); + } + }); +};