Skip to content

Commit

Permalink
rename Operator -> Task
Browse files Browse the repository at this point in the history
Signed-off-by: Yulong Ruan <[email protected]>
  • Loading branch information
ruanyl committed Dec 26, 2024
1 parent 15cfb51 commit 9bd7e17
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 24 deletions.
12 changes: 6 additions & 6 deletions public/components/visualization/text2viz.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import { useCallback } from 'react';
import { useObservable } from 'react-use';
import { useLocation, useParams } from 'react-router-dom';
import { Pipeline } from '../../utils/pipeline/pipeline';
import { Text2PPLOperator } from '../../utils/pipeline/text_to_ppl_operator';
import { PPLSampleOperator } from '../../utils/pipeline/ppl_sample_operator';
import { Text2PPLTask } from '../../utils/pipeline/text_to_ppl_task';
import { PPLSampleTask } from '../../utils/pipeline/ppl_sample_task';
import { SourceSelector } from './source_selector';
import type { IndexPattern } from '../../../../../src/plugins/data/public';
import chatIcon from '../../assets/chat.svg';
Expand Down Expand Up @@ -54,7 +54,7 @@ import { HeaderVariant } from '../../../../../src/core/public';
import { TEXT2VEGA_INPUT_SIZE_LIMIT } from '../../../common/constants/llm';
import { FeedbackThumbs } from '../feedback_thumbs';
import { VizStyleEditor } from './viz_style_editor';
import { Text2VegaOperator } from '../../utils/pipeline/text_to_vega_operator';
import { Text2VegaTask } from '../../utils/pipeline/text_to_vega_task';

export const INDEX_PATTERN_URL_SEARCH_KEY = 'indexPatternId';
export const ASSISTANT_INPUT_URL_SEARCH_KEY = 'assistantInput';
Expand Down Expand Up @@ -109,9 +109,9 @@ export const Text2Viz = () => {

if (text2vegaRef.current === null) {
text2vegaRef.current = new Pipeline([
new Text2PPLOperator(http),
new PPLSampleOperator(data.search),
new Text2VegaOperator(http, savedObjects),
new Text2PPLTask(http),
new PPLSampleTask(data.search),
new Text2VegaTask(http, savedObjects),
]);
}

Expand Down
22 changes: 11 additions & 11 deletions public/utils/pipeline/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@
import { BehaviorSubject, Observable, Subject, of } from 'rxjs';
import { switchMap, tap, catchError } from 'rxjs/operators';

import { Operator } from './operator';
import { Task } from './task';

export class Pipeline {
input$ = new Subject<any>();
output$: Observable<any>;
status$ = new BehaviorSubject<'RUNNING' | 'STOPPED'>('STOPPED');

constructor(private readonly operators: Array<Operator<any, any>>) {
constructor(private readonly tasks: Array<Task<any, any>>) {
this.output$ = this.input$
.pipe(tap(() => this.status$.next('RUNNING')))
.pipe(
switchMap((value) => {
return this.operators
.reduce((acc$, operator) => {
return acc$.pipe(switchMap((result) => operator.execute(result)));
return this.tasks
.reduce((acc$, task) => {
return acc$.pipe(switchMap((result) => task.execute(result)));
}, of(value))
.pipe(catchError((e) => of({ error: e })));
})
Expand All @@ -31,23 +31,23 @@ export class Pipeline {

/**
* Triggers the pipeline execution by emitting a new input value.
* This will start the processing of the provided input value through the pipeline's operators,
* with each operator transforming the input in sequence. The resulting value will be emitted
* This will start the processing of the provided input value through the pipeline's tasks,
* with each task transforming the input in sequence. The resulting value will be emitted
* through the `output$` observable.
*/
run(input: any) {
this.input$.next(input);
}

/**
* Synchronously processes the provided input value through the pipeline's operators in sequence.
* This method bypasses the reactive pipeline and executes each operator one by one,
* Synchronously processes the provided input value through the pipeline's tasks in sequence.
* This method bypasses the reactive pipeline and executes each task one by one,
* it suitable for use cases where you need a one-time, imperative-style execution.
*/
async runOnce(input: any) {
let nextInput = input;
for (const operator of this.operators) {
nextInput = await operator.execute(nextInput);
for (const task of this.tasks) {
nextInput = await task.execute(nextInput);
}
return nextInput;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { Operator } from './operator';
import { Task } from './task';
import { DataPublicPluginStart } from '../../../../../src/plugins/data/public';

interface Input {
Expand All @@ -14,7 +14,7 @@ interface Input {

const topN = (ppl: string, n: number = 2) => `${ppl} | head ${n}`;

export class PPLSampleOperator extends Operator<Input, Input & { sample: string }> {
export class PPLSampleTask extends Task<Input, Input & { sample: string }> {
searchClient: DataPublicPluginStart['search'];

constructor(searchClient: DataPublicPluginStart['search']) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/

export abstract class Operator<T, P> {
export abstract class Task<T, P> {
abstract execute(v: T): Promise<P>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

import { HttpSetup } from '../../../../../src/core/public';
import { Operator } from './operator';
import { Task } from './task';
import { TEXT2VIZ_API } from '../../../common/constants/llm';

interface Input {
Expand All @@ -13,7 +13,7 @@ interface Input {
dataSourceId?: string;
}

export class Text2PPLOperator extends Operator<Input, Input & { ppl: string }> {
export class Text2PPLTask extends Task<Input, Input & { ppl: string }> {
http: HttpSetup;

constructor(http: HttpSetup) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { Operator } from './operator';
import { Task } from './task';
import { HttpSetup, SavedObjectsStart } from '../../../../../src/core/public';
import { TEXT2VIZ_API } from '../../../common/constants/llm';
import { DataSourceAttributes } from '../../../../../src/plugins/data_source/common/data_sources';
Expand All @@ -18,7 +18,7 @@ interface Input {
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export class Text2VegaOperator extends Operator<Input, Input & { vega: any }> {
export class Text2VegaTask extends Task<Input, Input & { vega: any }> {
http: HttpSetup;
savedObjects: SavedObjectsStart;

Expand Down
3 changes: 3 additions & 0 deletions server/routes/summary_routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ export function registerData2SummaryRoutes(
});
}
} catch (e) {
console.log('Error: ', e);
console.log('status code: ', e.statusCode);
console.log('body: ', e.body);
context.assistant_plugin.logger.error('Execute agent failed!', e);
if (e.statusCode >= 400 && e.statusCode <= 499) {
return res.customError({
Expand Down

0 comments on commit 9bd7e17

Please sign in to comment.