diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c649d81..9553127a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ### Features +- introduces Pipeline to execute asynchronous operations ([#376](https://github.com/opensearch-project/dashboards-assistant/pull/376)) + ### Enhancements - feat: Hide navigate to discover button if alert is not from visual editor monitor([#368](https://github.com/opensearch-project/dashboards-assistant/pull/368)) diff --git a/public/components/visualization/text2vega.ts b/public/components/visualization/text2vega.ts deleted file mode 100644 index 10108aa5..00000000 --- a/public/components/visualization/text2vega.ts +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -import { BehaviorSubject, Observable, of } from 'rxjs'; -import { debounceTime, switchMap, tap, filter, catchError } from 'rxjs/operators'; -import { TEXT2VIZ_API } from '.../../../common/constants/llm'; -import { HttpSetup, SavedObjectsStart } from '../../../../../src/core/public'; -import { DataPublicPluginStart } from '../../../../../src/plugins/data/public'; -import { DataSourceAttributes } from '../../../../../src/plugins/data_source/common/data_sources'; - -const topN = (ppl: string, n: number) => `${ppl} | head ${n}`; - -interface Input { - inputQuestion: string; - inputInstruction?: string; - index: string; - dataSourceId?: string; -} - -export class Text2Vega { - input$ = new BehaviorSubject({ inputQuestion: '', index: '' }); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - result$: Observable | { error: any }>; - status$ = new BehaviorSubject<'RUNNING' | 'STOPPED'>('STOPPED'); - http: HttpSetup; - searchClient: DataPublicPluginStart['search']; - savedObjects: SavedObjectsStart; - - constructor( - http: HttpSetup, - searchClient: DataPublicPluginStart['search'], - savedObjects: SavedObjectsStart - ) { - this.http = http; - this.searchClient = searchClient; - this.savedObjects = savedObjects; - this.result$ = this.input$ - .pipe( - filter((v) => v.inputQuestion.length > 0), - tap(() => this.status$.next('RUNNING')), - debounceTime(200) - ) - .pipe( - switchMap((v) => - of(v).pipe( - // text to ppl - switchMap(async (value) => { - const pplQuestion = value.inputQuestion; - const ppl = await this.text2ppl(pplQuestion, value.index, value.dataSourceId); - return { - ...value, - ppl, - }; - }), - // query sample data with ppl - switchMap(async (value) => { - const ppl = topN(value.ppl, 2); - const res = await this.searchClient - .search( - { params: { body: { query: ppl } }, dataSourceId: value.dataSourceId }, - { strategy: 'pplraw' } - ) - // eslint-disable-next-line @typescript-eslint/no-explicit-any - .toPromise(); - if (res.rawResponse.total === 0) { - throw new Error(`There is no result with the generated query: '${value.ppl}'.`); - } - return { ...value, sample: res.rawResponse }; - }), - // call llm to generate vega - switchMap(async (value) => { - const result = await this.text2vega({ - inputQuestion: value.inputQuestion, - inputInstruction: value.inputInstruction, - ppl: value.ppl, - sampleData: JSON.stringify(value.sample.jsonData), - dataSchema: JSON.stringify(value.sample.schema), - dataSourceId: value.dataSourceId, - }); - const dataSource = await this.getDataSourceById(value.dataSourceId); - const dataSourceName = dataSource?.attributes.title; - result.data = { - url: { - '%type%': 'ppl', - body: { query: value.ppl }, - data_source_name: dataSourceName, - }, - }; - return result; - }), - catchError((e) => of({ error: e })) - ) - ) - ) - .pipe(tap(() => this.status$.next('STOPPED'))); - } - - async text2vega({ - inputQuestion, - inputInstruction = '', - ppl, - sampleData, - dataSchema, - dataSourceId, - }: { - inputQuestion: string; - inputInstruction?: string; - ppl: string; - sampleData: string; - dataSchema: string; - dataSourceId?: string; - }) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const escapeField = (json: any, field: string) => { - if (json[field]) { - // Only escape field which name is 'field' - if (typeof json[field] === 'string' && field === 'field') { - json[field] = json[field].replace(/\./g, '\\.'); - } - if (typeof json[field] === 'object') { - Object.keys(json[field]).forEach((p) => { - escapeField(json[field], p); - }); - } - } - }; - const res = await this.http.post(TEXT2VIZ_API.TEXT2VEGA, { - body: JSON.stringify({ - input_question: inputQuestion.trim(), - input_instruction: inputInstruction.trim(), - ppl, - sampleData: JSON.stringify(sampleData), - dataSchema: JSON.stringify(dataSchema), - }), - query: { dataSourceId }, - }); - - // need to escape field: geo.city -> field: geo\\.city - escapeField(res, 'encoding'); - escapeField(res, 'layer'); - return res; - } - - async text2ppl(query: string, index: string, dataSourceId?: string) { - const pplResponse = await this.http.post(TEXT2VIZ_API.TEXT2PPL, { - body: JSON.stringify({ - question: query, - index, - }), - query: { dataSourceId }, - }); - return pplResponse.ppl; - } - - async getDataSourceById(id?: string) { - if (!id) { - return null; - } - - const res = await this.savedObjects.client.get('data-source', id); - if (res.error) { - return null; - } - return res; - } - - invoke(value: Input) { - this.input$.next(value); - } - - getStatus$() { - return this.status$; - } - - getResult$() { - return this.result$; - } -} diff --git a/public/components/visualization/text2viz.tsx b/public/components/visualization/text2viz.tsx index c4a76dec..51e466ff 100644 --- a/public/components/visualization/text2viz.tsx +++ b/public/components/visualization/text2viz.tsx @@ -23,6 +23,9 @@ import { v4 as uuidv4 } from 'uuid'; import { useCallback } from 'react'; import { useObservable } from 'react-use'; import { useLocation, useParams } from 'react-router-dom'; +import { Pipeline } from '../../utils/pipeline/pipeline'; +import { Text2PPLTask } from '../../utils/pipeline/text_to_ppl_task'; +import { PPLSampleTask } from '../../utils/pipeline/ppl_sample_task'; import { SourceSelector } from './source_selector'; import type { IndexPattern } from '../../../../../src/plugins/data/public'; import chatIcon from '../../assets/chat.svg'; @@ -36,7 +39,6 @@ import { StartServices } from '../../types'; import './text2viz.scss'; import { Text2VizEmpty } from './text2viz_empty'; import { Text2VizLoading } from './text2viz_loading'; -import { Text2Vega } from './text2vega'; import { OnSaveProps, SavedObjectSaveModalOrigin, @@ -52,6 +54,7 @@ import { HeaderVariant } from '../../../../../src/core/public'; import { TEXT2VEGA_INPUT_SIZE_LIMIT } from '../../../common/constants/llm'; import { FeedbackThumbs } from '../feedback_thumbs'; import { VizStyleEditor } from './viz_style_editor'; +import { Text2VegaTask } from '../../utils/pipeline/text_to_vega_task'; export const INDEX_PATTERN_URL_SEARCH_KEY = 'indexPatternId'; export const ASSISTANT_INPUT_URL_SEARCH_KEY = 'assistantInput'; @@ -102,7 +105,15 @@ export const Text2Viz = () => { ); const [currentInstruction, setCurrentInstruction] = useState(''); const [editorInput, setEditorInput] = useState(''); - const text2vegaRef = useRef(new Text2Vega(http, data.search, savedObjects)); + const text2vegaRef = useRef(null); + + if (text2vegaRef.current === null) { + text2vegaRef.current = new Pipeline([ + new Text2PPLTask(http), + new PPLSampleTask(data.search), + new Text2VegaTask(http, savedObjects), + ]); + } const status = useObservable(text2vegaRef.current.status$); @@ -129,7 +140,7 @@ export const Text2Viz = () => { */ useEffect(() => { const text2vega = text2vegaRef.current; - const subscription = text2vega.getResult$().subscribe((result) => { + const subscription = text2vega?.getResult$().subscribe((result) => { if (result) { if (result.error) { notifications.toasts.addError(result.error, { @@ -138,7 +149,7 @@ export const Text2Viz = () => { }), }); } else { - setEditorInput(JSON.stringify(result, undefined, 4)); + setEditorInput(JSON.stringify(result.vega, undefined, 4)); // Report metric when visualization generated successfully if (usageCollection) { @@ -153,7 +164,7 @@ export const Text2Viz = () => { }); return () => { - subscription.unsubscribe(); + subscription?.unsubscribe(); }; }, [http, notifications, usageCollection]); @@ -232,7 +243,7 @@ export const Text2Viz = () => { currentUsedIndexPatternRef.current = indexPattern; const text2vega = text2vegaRef.current; - text2vega.invoke({ + text2vega?.run({ index: indexPattern.title, inputQuestion, inputInstruction, diff --git a/public/utils/pipeline/pipeline.test.ts b/public/utils/pipeline/pipeline.test.ts new file mode 100644 index 00000000..5c2f20f4 --- /dev/null +++ b/public/utils/pipeline/pipeline.test.ts @@ -0,0 +1,76 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { Pipeline } from './pipeline'; + +describe('pipeline', () => { + it('should run pipeline', (done) => { + const fn1 = jest.fn().mockImplementation((input) => { + return Promise.resolve(Array().concat(input).concat('fn1')); + }); + const fn2 = jest.fn().mockImplementation((input) => { + return Promise.resolve(Array().concat(input).concat('fn2')); + }); + const pipeline = new Pipeline([{ execute: fn1 }, { execute: fn2 }]); + pipeline.getResult$().subscribe((result) => { + expect(result).toEqual(['input', 'fn1', 'fn2']); + expect(pipeline.status$.value).toBe('STOPPED'); + done(); + }); + + expect(pipeline.status$.value).toBe('STOPPED'); + pipeline.run('input'); + expect(pipeline.status$.value).toBe('RUNNING'); + }); + + it('should run pipeline with the latest input', (done) => { + const fn1 = jest.fn().mockImplementation((input) => { + return Promise.resolve(Array().concat(input).concat('fn1')); + }); + const fn2 = jest.fn().mockImplementation((input) => { + return Promise.resolve(Array().concat(input).concat('fn2')); + }); + const pipeline = new Pipeline([{ execute: fn1 }, { execute: fn2 }]); + pipeline.getResult$().subscribe((result) => { + expect(result).toEqual(['input2', 'fn1', 'fn2']); + expect(fn1).toHaveBeenCalledTimes(2); + // The fn2 should only be called onece because the first pipeline run should already be canceled + expect(fn2).toHaveBeenCalledTimes(1); + done(); + }); + // the pipeline run twice with different inputs + // the second pipeline.run should be make it to cancel the first pipeline run + pipeline.run('input1'); + pipeline.run('input2'); + }); + + it('should run pipeline once synchronously', async () => { + const fn1 = jest.fn().mockImplementation((input) => { + return Promise.resolve(Array().concat(input).concat('fn1')); + }); + const fn2 = jest.fn().mockImplementation((input) => { + return Promise.resolve(Array().concat(input).concat('fn2')); + }); + const pipeline = new Pipeline([{ execute: fn1 }, { execute: fn2 }]); + const result = await pipeline.runOnce('input'); + expect(result).toEqual(['input', 'fn1', 'fn2']); + }); + + it('should catch error', (done) => { + const fn1 = jest.fn().mockImplementation((input) => { + return Promise.resolve(Array().concat(input).concat('fn1')); + }); + const fn2 = jest.fn().mockImplementation(() => { + throw new Error('test'); + }); + + const pipeline = new Pipeline([{ execute: fn1 }, { execute: fn2 }]); + pipeline.getResult$().subscribe((result) => { + expect(result).toEqual({ error: new Error('test') }); + done(); + }); + pipeline.run('input'); + }); +}); diff --git a/public/utils/pipeline/pipeline.ts b/public/utils/pipeline/pipeline.ts new file mode 100644 index 00000000..1411dcfa --- /dev/null +++ b/public/utils/pipeline/pipeline.ts @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { BehaviorSubject, Observable, Subject, of } from 'rxjs'; +import { switchMap, tap, catchError } from 'rxjs/operators'; + +import { Task } from './task'; + +export class Pipeline { + input$ = new Subject(); + output$: Observable; + status$ = new BehaviorSubject<'RUNNING' | 'STOPPED'>('STOPPED'); + + constructor(private readonly tasks: Array>) { + this.output$ = this.input$ + .pipe(tap(() => this.status$.next('RUNNING'))) + .pipe( + switchMap((value) => { + return this.tasks + .reduce((acc$, task) => { + return acc$.pipe(switchMap((result) => task.execute(result))); + }, of(value)) + .pipe(catchError((e) => of({ error: e }))); + }) + ) + .pipe(tap(() => this.status$.next('STOPPED'))); + } + + /** + * Triggers the pipeline execution by emitting a new input value. + * This will start the processing of the provided input value through the pipeline's tasks, + * with each task transforming the input in sequence. The resulting value will be emitted + * through the `output$` observable. + */ + run(input: any) { + this.input$.next(input); + } + + /** + * Synchronously processes the provided input value through the pipeline's tasks in sequence. + * This method bypasses the reactive pipeline and executes each task one by one, + * it suitable for use cases where you need a one-time, imperative-style execution. + */ + async runOnce(input: any) { + let nextInput = input; + for (const task of this.tasks) { + nextInput = await task.execute(nextInput); + } + return nextInput; + } + + getResult$() { + return this.output$; + } +} diff --git a/public/utils/pipeline/ppl_sample_task.ts b/public/utils/pipeline/ppl_sample_task.ts new file mode 100644 index 00000000..b99dd081 --- /dev/null +++ b/public/utils/pipeline/ppl_sample_task.ts @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { Task } from './task'; +import { DataPublicPluginStart } from '../../../../../src/plugins/data/public'; + +interface Input { + ppl: string; + dataSourceId: string | undefined; + pplSampleSize?: number; +} + +const topN = (ppl: string, n: number = 2) => `${ppl} | head ${n}`; + +export class PPLSampleTask extends Task { + searchClient: DataPublicPluginStart['search']; + + constructor(searchClient: DataPublicPluginStart['search']) { + super(); + this.searchClient = searchClient; + } + + async execute(v: T) { + const ppl = topN(v.ppl, v.pplSampleSize); + const res = await this.searchClient + .search( + { params: { body: { query: ppl } }, dataSourceId: v.dataSourceId }, + { strategy: 'pplraw' } + ) + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .toPromise(); + if (res.rawResponse.total === 0) { + throw new Error(`There is no result with the generated query: '${v.ppl}'.`); + } + return { ...v, sample: res.rawResponse }; + } +} diff --git a/public/utils/pipeline/task.ts b/public/utils/pipeline/task.ts new file mode 100644 index 00000000..323e29bf --- /dev/null +++ b/public/utils/pipeline/task.ts @@ -0,0 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +export abstract class Task { + abstract execute(v: T): Promise

; +} diff --git a/public/utils/pipeline/text_to_ppl_task.ts b/public/utils/pipeline/text_to_ppl_task.ts new file mode 100644 index 00000000..fb56100f --- /dev/null +++ b/public/utils/pipeline/text_to_ppl_task.ts @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { HttpSetup } from '../../../../../src/core/public'; +import { Task } from './task'; +import { TEXT2VIZ_API } from '../../../common/constants/llm'; + +interface Input { + inputQuestion: string; + index: string; + dataSourceId?: string; +} + +export class Text2PPLTask extends Task { + http: HttpSetup; + + constructor(http: HttpSetup) { + super(); + this.http = http; + } + + async execute(v: T) { + const ppl: string = await this.text2ppl(v.inputQuestion, v.index, v.dataSourceId); + return { ...v, ppl }; + } + + async text2ppl(query: string, index: string, dataSourceId?: string) { + const res = await this.http.post(TEXT2VIZ_API.TEXT2PPL, { + body: JSON.stringify({ + question: query, + index, + }), + query: { dataSourceId }, + }); + return res.ppl; + } +} diff --git a/public/utils/pipeline/text_to_vega_task.ts b/public/utils/pipeline/text_to_vega_task.ts new file mode 100644 index 00000000..21d2aa9d --- /dev/null +++ b/public/utils/pipeline/text_to_vega_task.ts @@ -0,0 +1,109 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { Task } from './task'; +import { HttpSetup, SavedObjectsStart } from '../../../../../src/core/public'; +import { TEXT2VIZ_API } from '../../../common/constants/llm'; +import { DataSourceAttributes } from '../../../../../src/plugins/data_source/common/data_sources'; + +interface Input { + inputQuestion: string; + inputInstruction: string; + ppl: string; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + sample: any; + dataSourceId: string | undefined; +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export class Text2VegaTask extends Task { + http: HttpSetup; + savedObjects: SavedObjectsStart; + + constructor(http: HttpSetup, savedObjects: SavedObjectsStart) { + super(); + this.http = http; + this.savedObjects = savedObjects; + } + + async execute(v: T) { + const result = await this.text2vega({ + inputQuestion: v.inputQuestion, + inputInstruction: v.inputInstruction, + ppl: v.ppl, + sampleData: JSON.stringify(v.sample.jsonData), + dataSchema: JSON.stringify(v.sample.schema), + dataSourceId: v.dataSourceId, + }); + const dataSource = await this.getDataSourceById(v.dataSourceId); + const dataSourceName = dataSource?.attributes.title; + result.data = { + url: { + '%type%': 'ppl', + body: { query: v.ppl }, + data_source_name: dataSourceName, + }, + }; + return { ...v, vega: result }; + } + + async text2vega({ + inputQuestion, + inputInstruction = '', + ppl, + sampleData, + dataSchema, + dataSourceId, + }: { + inputQuestion: string; + inputInstruction?: string; + ppl: string; + sampleData: string; + dataSchema: string; + dataSourceId?: string; + }) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const escapeField = (json: any, field: string) => { + if (json[field]) { + // Only escape field which name is 'field' + if (typeof json[field] === 'string' && field === 'field') { + json[field] = json[field].replace(/\./g, '\\.'); + } + if (typeof json[field] === 'object') { + Object.keys(json[field]).forEach((p) => { + escapeField(json[field], p); + }); + } + } + }; + const res = await this.http.post(TEXT2VIZ_API.TEXT2VEGA, { + body: JSON.stringify({ + input_question: inputQuestion.trim(), + input_instruction: inputInstruction.trim(), + ppl, + sampleData: JSON.stringify(sampleData), + dataSchema: JSON.stringify(dataSchema), + }), + query: { dataSourceId }, + }); + + // need to escape field: geo.city -> field: geo\\.city + escapeField(res, 'encoding'); + escapeField(res, 'layer'); + return res; + } + + async getDataSourceById(id?: string) { + if (!id) { + return null; + } + + const res = await this.savedObjects.client.get('data-source', id); + if (res.error) { + return null; + } + return res; + } +}