diff --git a/server/langchain/tools/tool_sets/trace_tools/queries.ts b/server/langchain/tools/tool_sets/trace_tools/queries.ts new file mode 100644 index 00000000..f9d904dc --- /dev/null +++ b/server/langchain/tools/tool_sets/trace_tools/queries.ts @@ -0,0 +1,223 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { TraceAnalyticsMode } from '../../../utils/utils'; +import { OpenSearchClient } from '../../../../../../../src/core/server'; +import { TRACES_MAX_NUM } from '../../../../../common/constants/trace_analytics'; + +export async function getMode(opensearchClient: OpenSearchClient) { + const indexExistsResponse = await opensearchClient.indices.exists({ + index: 'otel-v1-apm-span-*', + }); + return indexExistsResponse ? 'data_prepper' : 'jaeger'; +} + +export const getDashboardQuery = () => { + return { + size: 0, + query: { + bool: { + must: [], + filter: [], + should: [], + must_not: [], + }, + }, + aggs: { + trace_group_name: { + terms: { + field: 'traceGroup', + size: 10000, + }, + aggs: { + average_latency: { + scripted_metric: { + init_script: 'state.traceIdToLatencyMap = [:];', + map_script: ` + if (doc.containsKey('traceGroupFields.durationInNanos') && !doc['traceGroupFields.durationInNanos'].empty) { + def traceId = doc['traceId'].value; + if (!state.traceIdToLatencyMap.containsKey(traceId)) { + state.traceIdToLatencyMap[traceId] = doc['traceGroupFields.durationInNanos'].value; + } + } + `, + combine_script: 'return state.traceIdToLatencyMap', + reduce_script: ` + def seenTraceIdsMap = [:]; + def totalLatency = 0.0; + def traceCount = 0.0; + + for (s in states) { + if (s == null) { + continue; + } + + for (entry in s.entrySet()) { + def traceId = entry.getKey(); + def traceLatency = entry.getValue(); + if (!seenTraceIdsMap.containsKey(traceId)) { + seenTraceIdsMap[traceId] = true; + totalLatency += traceLatency; + traceCount++; + } + } + } + + def average_latency_nanos = totalLatency / traceCount; + return Math.round(average_latency_nanos / 10000) / 100.0; + `, + }, + }, + trace_count: { + cardinality: { + field: 'traceId', + }, + }, + error_count: { + filter: { + term: { + 'traceGroupFields.statusCode': '2', + }, + }, + aggs: { + trace_count: { + cardinality: { + field: 'traceId', + }, + }, + }, + }, + error_rate: { + bucket_script: { + buckets_path: { + total: 'trace_count.value', + errors: 'error_count>trace_count.value', + }, + script: 'params.errors / params.total * 100', + }, + }, + }, + }, + }, + }; +}; + +export const getTracesQuery = (mode: TraceAnalyticsMode) => { + const jaegerQuery = { + size: 0, + query: { + bool: { + must: [], + filter: [], + should: [], + must_not: [], + }, + }, + aggs: { + traces: { + terms: { + field: 'traceID', + size: TRACES_MAX_NUM, + }, + aggs: { + latency: { + max: { + script: { + source: ` + if (doc.containsKey('duration') && !doc['duration'].empty) { + return Math.round(doc['duration'].value) / 1000.0 + } + + return 0 + `, + lang: 'painless', + }, + }, + }, + trace_group: { + terms: { + field: 'traceGroup', + size: 1, + }, + }, + error_count: { + filter: { + term: { + 'tag.error': true, + }, + }, + }, + last_updated: { + max: { + script: { + source: ` + if (doc.containsKey('startTime') && !doc['startTime'].empty && doc.containsKey('duration') && !doc['duration'].empty) { + return (Math.round(doc['duration'].value) + Math.round(doc['startTime'].value)) / 1000.0 + } + + return 0 + `, + lang: 'painless', + }, + }, + }, + }, + }, + }, + }; + const dataPrepperQuery = { + size: 0, + query: { + bool: { + must: [], + filter: [], + should: [], + must_not: [], + }, + }, + aggs: { + traces: { + terms: { + field: 'traceId', + size: TRACES_MAX_NUM, + }, + aggs: { + latency: { + max: { + script: { + source: ` + if (doc.containsKey('traceGroupFields.durationInNanos') && !doc['traceGroupFields.durationInNanos'].empty) { + return Math.round(doc['traceGroupFields.durationInNanos'].value / 10000) / 100.0 + } + return 0 + `, + lang: 'painless', + }, + }, + }, + trace_group: { + terms: { + field: 'traceGroup', + size: 1, + }, + }, + error_count: { + filter: { + term: { + 'traceGroupFields.statusCode': '2', + }, + }, + }, + last_updated: { + max: { + field: 'traceGroupFields.endTime', + }, + }, + }, + }, + }, + }; + return mode === 'jaeger' ? jaegerQuery : dataPrepperQuery; +}; diff --git a/server/langchain/tools/tool_sets/traces.ts b/server/langchain/tools/tool_sets/traces.ts new file mode 100644 index 00000000..f5e3e7d2 --- /dev/null +++ b/server/langchain/tools/tool_sets/traces.ts @@ -0,0 +1,62 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { DynamicTool } from 'langchain/tools'; +import { PluginToolsFactory } from '../tools_factory/tools_factory'; + +import { flatten, jsonToCsv, swallowErrors } from '../../utils/utils'; +import { getDashboardQuery, getMode, getTracesQuery } from './trace_tools/queries'; +import { + DATA_PREPPER_INDEX_NAME, + JAEGER_INDEX_NAME, +} from '../../../../common/constants/trace_analytics'; + +export class TracesTools extends PluginToolsFactory { + static TOOL_NAMES = { + TRACE_GROUPS: 'Get trace groups', + SERVICES: 'Get trace services', + TRACES: 'Get traces', + } as const; + + toolsList = [ + new DynamicTool({ + name: TracesTools.TOOL_NAMES.TRACE_GROUPS, + description: + 'Use this to get information about each trace group. The tool response includes the key, doc_count, average_latency.value, trace_count.value, error_count.doc_count, error_count.trace_count.value, and error_rate.value. The key is the name of the trace group, the doc_count is the number of spans, the average_latency.value is the average latency of the trace group, measured in milliseconds. The trace_count.value is the number of traces in the trace group. The error_count.doc_count is the number of spans in the trace groups with errors, while the error_count.trace_count.value is the number of different traces in the trace group with errors. The error_rate.value is the percentage of traces in the trace group that has at least one error. This tool takes in no inputs.', + func: swallowErrors(async () => this.getTraceGroups()), + callbacks: this.callbacks, + }), + new DynamicTool({ + name: TracesTools.TOOL_NAMES.TRACES, + description: + 'Use this to get information about each trace. The tool response includes the key, doc_count, last_updated.value, last_updated.value_as_string, error_count.doc_count, trace_group.doc_count_error_upper_bound, trace_group.sum_other_doc_count, trace_group.buckets.0.key, and trace_groups.buckets.0.doc_count. The key is the ID of the trace. The doc_count is the number of spans in that particular trace. The last_updated.value_as_string is the last time that the trace was updated. The error_count.doc_count is how many spans in that trace has errors. The trace group.buckets.1.key is what trace group the trace belongs to. The other fields are mostly irrelevant data. This tool takes in no inputs.', + func: swallowErrors(async () => this.getTraces()), + callbacks: this.callbacks, + }), + ]; + + public async getTraceGroups() { + const mode = await getMode(this.opensearchClient); + const query = getDashboardQuery(); + console.log(DATA_PREPPER_INDEX_NAME); + const traceGroupsResponse = await this.opensearchClient.search({ + index: DATA_PREPPER_INDEX_NAME, + body: query, + }); + const traceGroups = traceGroupsResponse.body.aggregations.trace_group_name.buckets; + return jsonToCsv(flatten(traceGroups)); + } + + public async getTraces() { + const mode = await getMode(this.opensearchClient); + const query = getTracesQuery(mode); + const tracesResponse = await this.opensearchClient.search({ + index: mode === 'data_prepper' ? DATA_PREPPER_INDEX_NAME : JAEGER_INDEX_NAME, + body: query, + }); + const traces = tracesResponse.body.aggregations.traces.buckets; + return jsonToCsv(flatten(traces)); + } +} diff --git a/server/langchain/tools/tools_helper.ts b/server/langchain/tools/tools_helper.ts index f184ac8f..a0124cfe 100644 --- a/server/langchain/tools/tools_helper.ts +++ b/server/langchain/tools/tools_helper.ts @@ -9,6 +9,7 @@ import { KnowledgeTools } from './tool_sets/knowledges'; import { OSAPITools } from './tool_sets/os_apis'; import { PPLTools } from './tool_sets/ppl'; import { SavedObjectsTools } from './tool_sets/saved_objects'; +import { TracesTools } from './tool_sets/traces'; export const initTools = ( // proper way to get parameters possibly needs typescript 4.2 https://github.com/microsoft/TypeScript/issues/35576 @@ -19,5 +20,6 @@ export const initTools = ( const knowledgeTools = new KnowledgeTools(...args); const opensearchTools = new OSAPITools(...args); const savedObjectsTools = new SavedObjectsTools(...args); - return [pplTools, alertingTools, knowledgeTools, opensearchTools, savedObjectsTools]; + const tracesTools = new TracesTools(...args); + return [pplTools, alertingTools, knowledgeTools, opensearchTools, savedObjectsTools, tracesTools]; }; diff --git a/server/langchain/utils/__tests__/utils.test.ts b/server/langchain/utils/__tests__/utils.test.ts index f4ee97e7..149e290f 100644 --- a/server/langchain/utils/__tests__/utils.test.ts +++ b/server/langchain/utils/__tests__/utils.test.ts @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { jsonToCsv, swallowErrors } from '../utils'; +import { flatten, jsonToCsv, swallowErrors } from '../utils'; describe('swallow errors', () => { it('should swallow errors for sync functions', async () => { @@ -41,4 +41,29 @@ describe('utils', () => { const csv = jsonToCsv([]); expect(csv).toEqual('row_number\n'); }); + + it('flattens nested objects', () => { + const flattened = flatten([ + { + key1: { key2: 'value1' }, + key3: { + key4: 'value2', + key5: { key6: 'value3', key7: [{ key8: 'value4' }, { key9: 'value5' }] }, + }, + }, + { key10: { key11: 'value6' } }, + ]); + expect(flattened).toEqual([ + { + 'key1.key2': 'value1', + 'key3.key4': 'value2', + 'key3.key5.key6': 'value3', + 'key3.key5.key7.0.key8': 'value4', + 'key3.key5.key7.1.key9': 'value5', + }, + { + 'key10.key11': 'value6', + }, + ]); + }); }); diff --git a/server/langchain/utils/utils.ts b/server/langchain/utils/utils.ts index 1bec7514..00e40b09 100644 --- a/server/langchain/utils/utils.ts +++ b/server/langchain/utils/utils.ts @@ -39,3 +39,44 @@ export const jsonToCsv = (json: object[]) => { return csv; }; + +export const flatten = (response: Array>) => { + // Flattens each bucket in the response + for (const bucket in response) { + if (response.hasOwnProperty(bucket)) { + response[bucket] = flattenObject(response[bucket]); + } + } + return response; +}; + +function flattenObject(object: Record, prefix = '') { + const result: Record = {}; + + // Recursively flattens object if it's an object or an array + for (const key in object) { + if (object.hasOwnProperty(key)) { + const combinedKey = prefix ? `${prefix}.${key}` : key; + + if (typeof object[key] === 'object') { + if (Array.isArray(object[key])) { + for (let i = 0; i < object[key].length; i++) { + const nestedObject = flattenObject(object[key][i], `${combinedKey}.${i}`); + Object.assign(result, nestedObject); + } + } else { + const nestedObject = flattenObject( + object[key] as Record, + combinedKey + ); + Object.assign(result, nestedObject); + } + } else { + result[combinedKey] = object[key]; + } + } + } + return result; +} + +export type TraceAnalyticsMode = 'jaeger' | 'data_prepper';