From ed438c9e36db590a9e9f2684a0eda6989213c56f Mon Sep 17 00:00:00 2001 From: gaobinlong Date: Tue, 28 Nov 2023 10:46:59 +0800 Subject: [PATCH] Integrate Memeory APIs of agent framework (#15) * Integrate Memeory APIs of agent framework Signed-off-by: gaobinlong * Add TODO to getSessions() Signed-off-by: gaobinlong --------- Signed-off-by: gaobinlong --- .../agent_framework_storage_service.ts | 118 +++++++++++++++++- 1 file changed, 112 insertions(+), 6 deletions(-) diff --git a/server/services/storage/agent_framework_storage_service.ts b/server/services/storage/agent_framework_storage_service.ts index 5393105e..d80f5b7e 100644 --- a/server/services/storage/agent_framework_storage_service.ts +++ b/server/services/storage/agent_framework_storage_service.ts @@ -16,6 +16,12 @@ import { StorageService } from './storage_service'; import { MessageParser } from '../../types'; import { MessageParserRunner } from '../../utils/message_parser_runner'; +export interface SessionOptResponse { + success: boolean; + statusCode?: number | null; + message?: string; +} + export class AgentFrameworkStorageService implements StorageService { constructor( private readonly client: OpenSearchClient, @@ -58,8 +64,67 @@ export class AgentFrameworkStorageService implements StorageService { }; } + // TODO: return real update_time in the response once the agent framework supports update_time field async getSessions(query: GetSessionsSchema): Promise { - throw new Error('Method not implemented.'); + let sortField = ''; + if (query.sortField === 'updatedTimeMs') { + sortField = 'create_time'; + } + let searchFields: string[] = []; + if (query.search && query.searchFields) { + if (typeof query.searchFields === 'string') { + searchFields = [...searchFields, query.searchFields.replace('title', 'name')]; + } else { + searchFields = query.searchFields.map((item) => item.replace('title', 'name')); + } + } + + const requestParams = { + from: (query.page - 1) * query.perPage, + size: query.perPage, + ...(searchFields.length > 0 && { + query: { + multi_match: { + query: query.search, + fields: searchFields, + }, + }, + }), + ...(searchFields.length === 0 && { + query: { + match_all: {}, + }, + }), + ...(sortField && query.sortOrder && { sort: [{ [sortField]: query.sortOrder }] }), + }; + + const sessions = await this.client.transport.request({ + method: 'GET', + path: `/_plugins/_ml/memory/conversation/_search`, + body: requestParams, + }); + + return { + objects: sessions.body.hits.hits + .filter( + (hit: { + _source: { name: string; create_time: string }; + }): hit is RequiredKey => + hit._source !== null && hit._source !== undefined + ) + .map((item: { _id: string; _source: { name: string; create_time: string } }) => ({ + id: item._id, + title: item._source.name, + version: 1, + createdTimeMs: Date.parse(item._source.create_time), + updatedTimeMs: Date.parse(item._source.create_time), + messages: [] as IMessage[], + })), + total: + typeof sessions.body.hits.total === 'number' + ? sessions.body.hits.total + : sessions.body.hits.total.value, + }; } async saveMessages( @@ -67,12 +132,53 @@ export class AgentFrameworkStorageService implements StorageService { sessionId: string | undefined, messages: IMessage[] ): Promise<{ sessionId: string; messages: IMessage[] }> { - throw new Error('Method not implemented.'); + throw new Error('Method is not needed'); } - deleteSession(sessionId: string): Promise<{}> { - throw new Error('Method not implemented.'); + + async deleteSession(sessionId: string): Promise { + try { + const response = await this.client.transport.request({ + method: 'DELETE', + path: `/_plugins/_ml/memory/conversation/${sessionId}/_delete`, + }); + if (response.statusCode === 200) { + return { + success: true, + }; + } else { + return { + success: false, + statusCode: response.statusCode, + message: JSON.stringify(response.body), + }; + } + } catch (error) { + throw new Error('delete converstaion failed, reason:' + JSON.stringify(error.meta?.body)); + } } - updateSession(sessionId: string, title: string): Promise<{}> { - throw new Error('Method not implemented.'); + + async updateSession(sessionId: string, title: string): Promise { + try { + const response = await this.client.transport.request({ + method: 'PUT', + path: `/_plugins/_ml/memory/conversation/${sessionId}/_update`, + body: { + name: title, + }, + }); + if (response.statusCode === 200) { + return { + success: true, + }; + } else { + return { + success: false, + statusCode: response.statusCode, + message: JSON.stringify(response.body), + }; + } + } catch (error) { + throw new Error('update converstaion failed, reason:' + JSON.stringify(error.meta?.body)); + } } }