Skip to content

Commit

Permalink
Integrate Memeory APIs of agent framework (#15)
Browse files Browse the repository at this point in the history
* Integrate Memeory APIs of agent framework

Signed-off-by: gaobinlong <[email protected]>

* Add TODO to getSessions()

Signed-off-by: gaobinlong <[email protected]>

---------

Signed-off-by: gaobinlong <[email protected]>
  • Loading branch information
gaobinlong authored and SuZhou-Joe committed Dec 5, 2023
1 parent ec80045 commit ed438c9
Showing 1 changed file with 112 additions and 6 deletions.
118 changes: 112 additions & 6 deletions server/services/storage/agent_framework_storage_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ import { StorageService } from './storage_service';
import { MessageParser } from '../../types';
import { MessageParserRunner } from '../../utils/message_parser_runner';

export interface SessionOptResponse {
success: boolean;
statusCode?: number | null;
message?: string;
}

export class AgentFrameworkStorageService implements StorageService {
constructor(
private readonly client: OpenSearchClient,
Expand Down Expand Up @@ -58,21 +64,121 @@ export class AgentFrameworkStorageService implements StorageService {
};
}

// TODO: return real update_time in the response once the agent framework supports update_time field
async getSessions(query: GetSessionsSchema): Promise<ISessionFindResponse> {
throw new Error('Method not implemented.');
let sortField = '';
if (query.sortField === 'updatedTimeMs') {
sortField = 'create_time';
}
let searchFields: string[] = [];
if (query.search && query.searchFields) {
if (typeof query.searchFields === 'string') {
searchFields = [...searchFields, query.searchFields.replace('title', 'name')];
} else {
searchFields = query.searchFields.map((item) => item.replace('title', 'name'));
}
}

const requestParams = {
from: (query.page - 1) * query.perPage,
size: query.perPage,
...(searchFields.length > 0 && {
query: {
multi_match: {
query: query.search,
fields: searchFields,
},
},
}),
...(searchFields.length === 0 && {
query: {
match_all: {},
},
}),
...(sortField && query.sortOrder && { sort: [{ [sortField]: query.sortOrder }] }),
};

const sessions = await this.client.transport.request({
method: 'GET',
path: `/_plugins/_ml/memory/conversation/_search`,
body: requestParams,
});

return {
objects: sessions.body.hits.hits
.filter(
(hit: {
_source: { name: string; create_time: string };
}): hit is RequiredKey<typeof hit, '_source'> =>
hit._source !== null && hit._source !== undefined
)
.map((item: { _id: string; _source: { name: string; create_time: string } }) => ({
id: item._id,
title: item._source.name,
version: 1,
createdTimeMs: Date.parse(item._source.create_time),
updatedTimeMs: Date.parse(item._source.create_time),
messages: [] as IMessage[],
})),
total:
typeof sessions.body.hits.total === 'number'
? sessions.body.hits.total
: sessions.body.hits.total.value,
};
}

async saveMessages(
title: string,
sessionId: string | undefined,
messages: IMessage[]
): Promise<{ sessionId: string; messages: IMessage[] }> {
throw new Error('Method not implemented.');
throw new Error('Method is not needed');
}
deleteSession(sessionId: string): Promise<{}> {
throw new Error('Method not implemented.');

async deleteSession(sessionId: string): Promise<SessionOptResponse> {
try {
const response = await this.client.transport.request({
method: 'DELETE',
path: `/_plugins/_ml/memory/conversation/${sessionId}/_delete`,
});
if (response.statusCode === 200) {
return {
success: true,
};
} else {
return {
success: false,
statusCode: response.statusCode,
message: JSON.stringify(response.body),
};
}
} catch (error) {
throw new Error('delete converstaion failed, reason:' + JSON.stringify(error.meta?.body));
}
}
updateSession(sessionId: string, title: string): Promise<{}> {
throw new Error('Method not implemented.');

async updateSession(sessionId: string, title: string): Promise<SessionOptResponse> {
try {
const response = await this.client.transport.request({
method: 'PUT',
path: `/_plugins/_ml/memory/conversation/${sessionId}/_update`,
body: {
name: title,
},
});
if (response.statusCode === 200) {
return {
success: true,
};
} else {
return {
success: false,
statusCode: response.statusCode,
message: JSON.stringify(response.body),
};
}
} catch (error) {
throw new Error('update converstaion failed, reason:' + JSON.stringify(error.meta?.body));
}
}
}

0 comments on commit ed438c9

Please sign in to comment.