From 267b5180891c1cdcd14edd6e8b38d0ed48f12291 Mon Sep 17 00:00:00 2001 From: Dan Dong <58446449+danieldong51@users.noreply.github.com> Date: Wed, 23 Aug 2023 15:27:26 -0700 Subject: [PATCH] Add Trace Analytics Data Prepper Tooling (#2) * Added traces tooling Signed-off-by: Daniel Dong * Started new trace group tool Signed-off-by: Daniel Dong * Changed query get methods to not import from public Signed-off-by: Daniel Dong * Added tools for getting traces and services Signed-off-by: Daniel Dong * Cleaned up comments + removed broken services query Signed-off-by: Daniel Dong * Removed getServices import Signed-off-by: Daniel Dong * Deleted comments Signed-off-by: Daniel Dong * Converted output from tools to be in csv rather than JSON format Signed-off-by: Daniel Dong * Created flattenObject to convert data into csv Signed-off-by: Daniel Dong * Deleted print statements Signed-off-by: Daniel Dong * Updated description of traces tool Signed-off-by: Daniel Dong * Added unit testing for flattening objects Signed-off-by: Daniel Dong * Updated calls to use opensearchClient.search Signed-off-by: Daniel Dong * Removed unnecessary export Signed-off-by: Daniel Dong * Changed description Signed-off-by: Daniel Dong --------- Signed-off-by: Daniel Dong Co-authored-by: Daniel Dong --- .../tools/tool_sets/trace_tools/queries.ts | 223 ++++++++++++++++++ server/langchain/tools/tool_sets/traces.ts | 62 +++++ server/langchain/tools/tools_helper.ts | 4 +- .../langchain/utils/__tests__/utils.test.ts | 27 ++- server/langchain/utils/utils.ts | 41 ++++ 5 files changed, 355 insertions(+), 2 deletions(-) create mode 100644 server/langchain/tools/tool_sets/trace_tools/queries.ts create mode 100644 server/langchain/tools/tool_sets/traces.ts diff --git a/server/langchain/tools/tool_sets/trace_tools/queries.ts b/server/langchain/tools/tool_sets/trace_tools/queries.ts new file mode 100644 index 00000000..f9d904dc --- /dev/null +++ b/server/langchain/tools/tool_sets/trace_tools/queries.ts @@ -0,0 +1,223 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { TraceAnalyticsMode } from '../../../utils/utils'; +import { OpenSearchClient } from '../../../../../../../src/core/server'; +import { TRACES_MAX_NUM } from '../../../../../common/constants/trace_analytics'; + +export async function getMode(opensearchClient: OpenSearchClient) { + const indexExistsResponse = await opensearchClient.indices.exists({ + index: 'otel-v1-apm-span-*', + }); + return indexExistsResponse ? 'data_prepper' : 'jaeger'; +} + +export const getDashboardQuery = () => { + return { + size: 0, + query: { + bool: { + must: [], + filter: [], + should: [], + must_not: [], + }, + }, + aggs: { + trace_group_name: { + terms: { + field: 'traceGroup', + size: 10000, + }, + aggs: { + average_latency: { + scripted_metric: { + init_script: 'state.traceIdToLatencyMap = [:];', + map_script: ` + if (doc.containsKey('traceGroupFields.durationInNanos') && !doc['traceGroupFields.durationInNanos'].empty) { + def traceId = doc['traceId'].value; + if (!state.traceIdToLatencyMap.containsKey(traceId)) { + state.traceIdToLatencyMap[traceId] = doc['traceGroupFields.durationInNanos'].value; + } + } + `, + combine_script: 'return state.traceIdToLatencyMap', + reduce_script: ` + def seenTraceIdsMap = [:]; + def totalLatency = 0.0; + def traceCount = 0.0; + + for (s in states) { + if (s == null) { + continue; + } + + for (entry in s.entrySet()) { + def traceId = entry.getKey(); + def traceLatency = entry.getValue(); + if (!seenTraceIdsMap.containsKey(traceId)) { + seenTraceIdsMap[traceId] = true; + totalLatency += traceLatency; + traceCount++; + } + } + } + + def average_latency_nanos = totalLatency / traceCount; + return Math.round(average_latency_nanos / 10000) / 100.0; + `, + }, + }, + trace_count: { + cardinality: { + field: 'traceId', + }, + }, + error_count: { + filter: { + term: { + 'traceGroupFields.statusCode': '2', + }, + }, + aggs: { + trace_count: { + cardinality: { + field: 'traceId', + }, + }, + }, + }, + error_rate: { + bucket_script: { + buckets_path: { + total: 'trace_count.value', + errors: 'error_count>trace_count.value', + }, + script: 'params.errors / params.total * 100', + }, + }, + }, + }, + }, + }; +}; + +export const getTracesQuery = (mode: TraceAnalyticsMode) => { + const jaegerQuery = { + size: 0, + query: { + bool: { + must: [], + filter: [], + should: [], + must_not: [], + }, + }, + aggs: { + traces: { + terms: { + field: 'traceID', + size: TRACES_MAX_NUM, + }, + aggs: { + latency: { + max: { + script: { + source: ` + if (doc.containsKey('duration') && !doc['duration'].empty) { + return Math.round(doc['duration'].value) / 1000.0 + } + + return 0 + `, + lang: 'painless', + }, + }, + }, + trace_group: { + terms: { + field: 'traceGroup', + size: 1, + }, + }, + error_count: { + filter: { + term: { + 'tag.error': true, + }, + }, + }, + last_updated: { + max: { + script: { + source: ` + if (doc.containsKey('startTime') && !doc['startTime'].empty && doc.containsKey('duration') && !doc['duration'].empty) { + return (Math.round(doc['duration'].value) + Math.round(doc['startTime'].value)) / 1000.0 + } + + return 0 + `, + lang: 'painless', + }, + }, + }, + }, + }, + }, + }; + const dataPrepperQuery = { + size: 0, + query: { + bool: { + must: [], + filter: [], + should: [], + must_not: [], + }, + }, + aggs: { + traces: { + terms: { + field: 'traceId', + size: TRACES_MAX_NUM, + }, + aggs: { + latency: { + max: { + script: { + source: ` + if (doc.containsKey('traceGroupFields.durationInNanos') && !doc['traceGroupFields.durationInNanos'].empty) { + return Math.round(doc['traceGroupFields.durationInNanos'].value / 10000) / 100.0 + } + return 0 + `, + lang: 'painless', + }, + }, + }, + trace_group: { + terms: { + field: 'traceGroup', + size: 1, + }, + }, + error_count: { + filter: { + term: { + 'traceGroupFields.statusCode': '2', + }, + }, + }, + last_updated: { + max: { + field: 'traceGroupFields.endTime', + }, + }, + }, + }, + }, + }; + return mode === 'jaeger' ? jaegerQuery : dataPrepperQuery; +}; diff --git a/server/langchain/tools/tool_sets/traces.ts b/server/langchain/tools/tool_sets/traces.ts new file mode 100644 index 00000000..f5e3e7d2 --- /dev/null +++ b/server/langchain/tools/tool_sets/traces.ts @@ -0,0 +1,62 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { DynamicTool } from 'langchain/tools'; +import { PluginToolsFactory } from '../tools_factory/tools_factory'; + +import { flatten, jsonToCsv, swallowErrors } from '../../utils/utils'; +import { getDashboardQuery, getMode, getTracesQuery } from './trace_tools/queries'; +import { + DATA_PREPPER_INDEX_NAME, + JAEGER_INDEX_NAME, +} from '../../../../common/constants/trace_analytics'; + +export class TracesTools extends PluginToolsFactory { + static TOOL_NAMES = { + TRACE_GROUPS: 'Get trace groups', + SERVICES: 'Get trace services', + TRACES: 'Get traces', + } as const; + + toolsList = [ + new DynamicTool({ + name: TracesTools.TOOL_NAMES.TRACE_GROUPS, + description: + 'Use this to get information about each trace group. The tool response includes the key, doc_count, average_latency.value, trace_count.value, error_count.doc_count, error_count.trace_count.value, and error_rate.value. The key is the name of the trace group, the doc_count is the number of spans, the average_latency.value is the average latency of the trace group, measured in milliseconds. The trace_count.value is the number of traces in the trace group. The error_count.doc_count is the number of spans in the trace groups with errors, while the error_count.trace_count.value is the number of different traces in the trace group with errors. The error_rate.value is the percentage of traces in the trace group that has at least one error. This tool takes in no inputs.', + func: swallowErrors(async () => this.getTraceGroups()), + callbacks: this.callbacks, + }), + new DynamicTool({ + name: TracesTools.TOOL_NAMES.TRACES, + description: + 'Use this to get information about each trace. The tool response includes the key, doc_count, last_updated.value, last_updated.value_as_string, error_count.doc_count, trace_group.doc_count_error_upper_bound, trace_group.sum_other_doc_count, trace_group.buckets.0.key, and trace_groups.buckets.0.doc_count. The key is the ID of the trace. The doc_count is the number of spans in that particular trace. The last_updated.value_as_string is the last time that the trace was updated. The error_count.doc_count is how many spans in that trace has errors. The trace group.buckets.1.key is what trace group the trace belongs to. The other fields are mostly irrelevant data. This tool takes in no inputs.', + func: swallowErrors(async () => this.getTraces()), + callbacks: this.callbacks, + }), + ]; + + public async getTraceGroups() { + const mode = await getMode(this.opensearchClient); + const query = getDashboardQuery(); + console.log(DATA_PREPPER_INDEX_NAME); + const traceGroupsResponse = await this.opensearchClient.search({ + index: DATA_PREPPER_INDEX_NAME, + body: query, + }); + const traceGroups = traceGroupsResponse.body.aggregations.trace_group_name.buckets; + return jsonToCsv(flatten(traceGroups)); + } + + public async getTraces() { + const mode = await getMode(this.opensearchClient); + const query = getTracesQuery(mode); + const tracesResponse = await this.opensearchClient.search({ + index: mode === 'data_prepper' ? DATA_PREPPER_INDEX_NAME : JAEGER_INDEX_NAME, + body: query, + }); + const traces = tracesResponse.body.aggregations.traces.buckets; + return jsonToCsv(flatten(traces)); + } +} diff --git a/server/langchain/tools/tools_helper.ts b/server/langchain/tools/tools_helper.ts index f184ac8f..a0124cfe 100644 --- a/server/langchain/tools/tools_helper.ts +++ b/server/langchain/tools/tools_helper.ts @@ -9,6 +9,7 @@ import { KnowledgeTools } from './tool_sets/knowledges'; import { OSAPITools } from './tool_sets/os_apis'; import { PPLTools } from './tool_sets/ppl'; import { SavedObjectsTools } from './tool_sets/saved_objects'; +import { TracesTools } from './tool_sets/traces'; export const initTools = ( // proper way to get parameters possibly needs typescript 4.2 https://github.com/microsoft/TypeScript/issues/35576 @@ -19,5 +20,6 @@ export const initTools = ( const knowledgeTools = new KnowledgeTools(...args); const opensearchTools = new OSAPITools(...args); const savedObjectsTools = new SavedObjectsTools(...args); - return [pplTools, alertingTools, knowledgeTools, opensearchTools, savedObjectsTools]; + const tracesTools = new TracesTools(...args); + return [pplTools, alertingTools, knowledgeTools, opensearchTools, savedObjectsTools, tracesTools]; }; diff --git a/server/langchain/utils/__tests__/utils.test.ts b/server/langchain/utils/__tests__/utils.test.ts index f4ee97e7..149e290f 100644 --- a/server/langchain/utils/__tests__/utils.test.ts +++ b/server/langchain/utils/__tests__/utils.test.ts @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { jsonToCsv, swallowErrors } from '../utils'; +import { flatten, jsonToCsv, swallowErrors } from '../utils'; describe('swallow errors', () => { it('should swallow errors for sync functions', async () => { @@ -41,4 +41,29 @@ describe('utils', () => { const csv = jsonToCsv([]); expect(csv).toEqual('row_number\n'); }); + + it('flattens nested objects', () => { + const flattened = flatten([ + { + key1: { key2: 'value1' }, + key3: { + key4: 'value2', + key5: { key6: 'value3', key7: [{ key8: 'value4' }, { key9: 'value5' }] }, + }, + }, + { key10: { key11: 'value6' } }, + ]); + expect(flattened).toEqual([ + { + 'key1.key2': 'value1', + 'key3.key4': 'value2', + 'key3.key5.key6': 'value3', + 'key3.key5.key7.0.key8': 'value4', + 'key3.key5.key7.1.key9': 'value5', + }, + { + 'key10.key11': 'value6', + }, + ]); + }); }); diff --git a/server/langchain/utils/utils.ts b/server/langchain/utils/utils.ts index 1bec7514..00e40b09 100644 --- a/server/langchain/utils/utils.ts +++ b/server/langchain/utils/utils.ts @@ -39,3 +39,44 @@ export const jsonToCsv = (json: object[]) => { return csv; }; + +export const flatten = (response: Array>) => { + // Flattens each bucket in the response + for (const bucket in response) { + if (response.hasOwnProperty(bucket)) { + response[bucket] = flattenObject(response[bucket]); + } + } + return response; +}; + +function flattenObject(object: Record, prefix = '') { + const result: Record = {}; + + // Recursively flattens object if it's an object or an array + for (const key in object) { + if (object.hasOwnProperty(key)) { + const combinedKey = prefix ? `${prefix}.${key}` : key; + + if (typeof object[key] === 'object') { + if (Array.isArray(object[key])) { + for (let i = 0; i < object[key].length; i++) { + const nestedObject = flattenObject(object[key][i], `${combinedKey}.${i}`); + Object.assign(result, nestedObject); + } + } else { + const nestedObject = flattenObject( + object[key] as Record, + combinedKey + ); + Object.assign(result, nestedObject); + } + } else { + result[combinedKey] = object[key]; + } + } + } + return result; +} + +export type TraceAnalyticsMode = 'jaeger' | 'data_prepper';