Skip to content

Commit

Permalink
Refactor multi agents execution (#376)
Browse files Browse the repository at this point in the history
* refactor(t2viz): introduced pipeline to run async operations

Signed-off-by: Yulong Ruan <[email protected]>

* add tests

Signed-off-by: Yulong Ruan <[email protected]>

* update changelog entry

Signed-off-by: Yulong Ruan <[email protected]>

* rename Operator -> Task

Signed-off-by: Yulong Ruan <[email protected]>

* cleanup console.log

Signed-off-by: Yulong Ruan <[email protected]>

---------

Signed-off-by: Yulong Ruan <[email protected]>
  • Loading branch information
ruanyl authored Dec 27, 2024
1 parent ffa509b commit fb47ce8
Show file tree
Hide file tree
Showing 9 changed files with 348 additions and 186 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)

### Features

- introduces Pipeline to execute asynchronous operations ([#376](https://github.com/opensearch-project/dashboards-assistant/pull/376))

### Enhancements

- feat: Hide navigate to discover button if alert is not from visual editor monitor([#368](https://github.com/opensearch-project/dashboards-assistant/pull/368))
Expand Down
180 changes: 0 additions & 180 deletions public/components/visualization/text2vega.ts

This file was deleted.

23 changes: 17 additions & 6 deletions public/components/visualization/text2viz.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import { v4 as uuidv4 } from 'uuid';
import { useCallback } from 'react';
import { useObservable } from 'react-use';
import { useLocation, useParams } from 'react-router-dom';
import { Pipeline } from '../../utils/pipeline/pipeline';
import { Text2PPLTask } from '../../utils/pipeline/text_to_ppl_task';
import { PPLSampleTask } from '../../utils/pipeline/ppl_sample_task';
import { SourceSelector } from './source_selector';
import type { IndexPattern } from '../../../../../src/plugins/data/public';
import chatIcon from '../../assets/chat.svg';
Expand All @@ -36,7 +39,6 @@ import { StartServices } from '../../types';
import './text2viz.scss';
import { Text2VizEmpty } from './text2viz_empty';
import { Text2VizLoading } from './text2viz_loading';
import { Text2Vega } from './text2vega';
import {
OnSaveProps,
SavedObjectSaveModalOrigin,
Expand All @@ -52,6 +54,7 @@ import { HeaderVariant } from '../../../../../src/core/public';
import { TEXT2VEGA_INPUT_SIZE_LIMIT } from '../../../common/constants/llm';
import { FeedbackThumbs } from '../feedback_thumbs';
import { VizStyleEditor } from './viz_style_editor';
import { Text2VegaTask } from '../../utils/pipeline/text_to_vega_task';

export const INDEX_PATTERN_URL_SEARCH_KEY = 'indexPatternId';
export const ASSISTANT_INPUT_URL_SEARCH_KEY = 'assistantInput';
Expand Down Expand Up @@ -102,7 +105,15 @@ export const Text2Viz = () => {
);
const [currentInstruction, setCurrentInstruction] = useState('');
const [editorInput, setEditorInput] = useState('');
const text2vegaRef = useRef(new Text2Vega(http, data.search, savedObjects));
const text2vegaRef = useRef<Pipeline | null>(null);

if (text2vegaRef.current === null) {
text2vegaRef.current = new Pipeline([
new Text2PPLTask(http),
new PPLSampleTask(data.search),
new Text2VegaTask(http, savedObjects),
]);
}

const status = useObservable(text2vegaRef.current.status$);

Expand All @@ -129,7 +140,7 @@ export const Text2Viz = () => {
*/
useEffect(() => {
const text2vega = text2vegaRef.current;
const subscription = text2vega.getResult$().subscribe((result) => {
const subscription = text2vega?.getResult$().subscribe((result) => {
if (result) {
if (result.error) {
notifications.toasts.addError(result.error, {
Expand All @@ -138,7 +149,7 @@ export const Text2Viz = () => {
}),
});
} else {
setEditorInput(JSON.stringify(result, undefined, 4));
setEditorInput(JSON.stringify(result.vega, undefined, 4));

// Report metric when visualization generated successfully
if (usageCollection) {
Expand All @@ -153,7 +164,7 @@ export const Text2Viz = () => {
});

return () => {
subscription.unsubscribe();
subscription?.unsubscribe();
};
}, [http, notifications, usageCollection]);

Expand Down Expand Up @@ -232,7 +243,7 @@ export const Text2Viz = () => {
currentUsedIndexPatternRef.current = indexPattern;

const text2vega = text2vegaRef.current;
text2vega.invoke({
text2vega?.run({
index: indexPattern.title,
inputQuestion,
inputInstruction,
Expand Down
76 changes: 76 additions & 0 deletions public/utils/pipeline/pipeline.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { Pipeline } from './pipeline';

describe('pipeline', () => {
it('should run pipeline', (done) => {
const fn1 = jest.fn().mockImplementation((input) => {
return Promise.resolve(Array<string>().concat(input).concat('fn1'));
});
const fn2 = jest.fn().mockImplementation((input) => {
return Promise.resolve(Array<string>().concat(input).concat('fn2'));
});
const pipeline = new Pipeline([{ execute: fn1 }, { execute: fn2 }]);
pipeline.getResult$().subscribe((result) => {
expect(result).toEqual(['input', 'fn1', 'fn2']);
expect(pipeline.status$.value).toBe('STOPPED');
done();
});

expect(pipeline.status$.value).toBe('STOPPED');
pipeline.run('input');
expect(pipeline.status$.value).toBe('RUNNING');
});

it('should run pipeline with the latest input', (done) => {
const fn1 = jest.fn().mockImplementation((input) => {
return Promise.resolve(Array<string>().concat(input).concat('fn1'));
});
const fn2 = jest.fn().mockImplementation((input) => {
return Promise.resolve(Array<string>().concat(input).concat('fn2'));
});
const pipeline = new Pipeline([{ execute: fn1 }, { execute: fn2 }]);
pipeline.getResult$().subscribe((result) => {
expect(result).toEqual(['input2', 'fn1', 'fn2']);
expect(fn1).toHaveBeenCalledTimes(2);
// The fn2 should only be called onece because the first pipeline run should already be canceled
expect(fn2).toHaveBeenCalledTimes(1);
done();
});
// the pipeline run twice with different inputs
// the second pipeline.run should be make it to cancel the first pipeline run
pipeline.run('input1');
pipeline.run('input2');
});

it('should run pipeline once synchronously', async () => {
const fn1 = jest.fn().mockImplementation((input) => {
return Promise.resolve(Array<string>().concat(input).concat('fn1'));
});
const fn2 = jest.fn().mockImplementation((input) => {
return Promise.resolve(Array<string>().concat(input).concat('fn2'));
});
const pipeline = new Pipeline([{ execute: fn1 }, { execute: fn2 }]);
const result = await pipeline.runOnce('input');
expect(result).toEqual(['input', 'fn1', 'fn2']);
});

it('should catch error', (done) => {
const fn1 = jest.fn().mockImplementation((input) => {
return Promise.resolve(Array<string>().concat(input).concat('fn1'));
});
const fn2 = jest.fn().mockImplementation(() => {
throw new Error('test');
});

const pipeline = new Pipeline([{ execute: fn1 }, { execute: fn2 }]);
pipeline.getResult$().subscribe((result) => {
expect(result).toEqual({ error: new Error('test') });
done();
});
pipeline.run('input');
});
});
Loading

0 comments on commit fb47ce8

Please sign in to comment.