Skip to content

Commit

Permalink
Add Trace Analytics Data Prepper Tooling (opensearch-project#2)
Browse files Browse the repository at this point in the history
* Added traces tooling

Signed-off-by: Daniel Dong <[email protected]>

* Started new trace group tool

Signed-off-by: Daniel Dong <[email protected]>

* Changed query get methods to not import from public

Signed-off-by: Daniel Dong <[email protected]>

* Added tools for getting traces and services

Signed-off-by: Daniel Dong <[email protected]>

* Cleaned up comments + removed broken services query

Signed-off-by: Daniel Dong <[email protected]>

* Removed getServices import

Signed-off-by: Daniel Dong <[email protected]>

* Deleted comments

Signed-off-by: Daniel Dong <[email protected]>

* Converted output from tools to be in csv rather than JSON format

Signed-off-by: Daniel Dong <[email protected]>

* Created flattenObject to convert data into csv

Signed-off-by: Daniel Dong <[email protected]>

* Deleted print statements

Signed-off-by: Daniel Dong <[email protected]>

* Updated description of traces tool

Signed-off-by: Daniel Dong <[email protected]>

* Added unit testing for flattening objects

Signed-off-by: Daniel Dong <[email protected]>

* Updated calls to use opensearchClient.search

Signed-off-by: Daniel Dong <[email protected]>

* Removed unnecessary export

Signed-off-by: Daniel Dong <[email protected]>

* Changed description

Signed-off-by: Daniel Dong <[email protected]>

---------

Signed-off-by: Daniel Dong <[email protected]>
Co-authored-by: Daniel Dong <[email protected]>
  • Loading branch information
danieldong51 and Daniel Dong authored Aug 23, 2023
1 parent b7b9770 commit 267b518
Show file tree
Hide file tree
Showing 5 changed files with 355 additions and 2 deletions.
223 changes: 223 additions & 0 deletions server/langchain/tools/tool_sets/trace_tools/queries.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { TraceAnalyticsMode } from '../../../utils/utils';
import { OpenSearchClient } from '../../../../../../../src/core/server';
import { TRACES_MAX_NUM } from '../../../../../common/constants/trace_analytics';

export async function getMode(opensearchClient: OpenSearchClient) {
const indexExistsResponse = await opensearchClient.indices.exists({
index: 'otel-v1-apm-span-*',
});
return indexExistsResponse ? 'data_prepper' : 'jaeger';
}

export const getDashboardQuery = () => {
return {
size: 0,
query: {
bool: {
must: [],
filter: [],
should: [],
must_not: [],
},
},
aggs: {
trace_group_name: {
terms: {
field: 'traceGroup',
size: 10000,
},
aggs: {
average_latency: {
scripted_metric: {
init_script: 'state.traceIdToLatencyMap = [:];',
map_script: `
if (doc.containsKey('traceGroupFields.durationInNanos') && !doc['traceGroupFields.durationInNanos'].empty) {
def traceId = doc['traceId'].value;
if (!state.traceIdToLatencyMap.containsKey(traceId)) {
state.traceIdToLatencyMap[traceId] = doc['traceGroupFields.durationInNanos'].value;
}
}
`,
combine_script: 'return state.traceIdToLatencyMap',
reduce_script: `
def seenTraceIdsMap = [:];
def totalLatency = 0.0;
def traceCount = 0.0;
for (s in states) {
if (s == null) {
continue;
}
for (entry in s.entrySet()) {
def traceId = entry.getKey();
def traceLatency = entry.getValue();
if (!seenTraceIdsMap.containsKey(traceId)) {
seenTraceIdsMap[traceId] = true;
totalLatency += traceLatency;
traceCount++;
}
}
}
def average_latency_nanos = totalLatency / traceCount;
return Math.round(average_latency_nanos / 10000) / 100.0;
`,
},
},
trace_count: {
cardinality: {
field: 'traceId',
},
},
error_count: {
filter: {
term: {
'traceGroupFields.statusCode': '2',
},
},
aggs: {
trace_count: {
cardinality: {
field: 'traceId',
},
},
},
},
error_rate: {
bucket_script: {
buckets_path: {
total: 'trace_count.value',
errors: 'error_count>trace_count.value',
},
script: 'params.errors / params.total * 100',
},
},
},
},
},
};
};

export const getTracesQuery = (mode: TraceAnalyticsMode) => {
const jaegerQuery = {
size: 0,
query: {
bool: {
must: [],
filter: [],
should: [],
must_not: [],
},
},
aggs: {
traces: {
terms: {
field: 'traceID',
size: TRACES_MAX_NUM,
},
aggs: {
latency: {
max: {
script: {
source: `
if (doc.containsKey('duration') && !doc['duration'].empty) {
return Math.round(doc['duration'].value) / 1000.0
}
return 0
`,
lang: 'painless',
},
},
},
trace_group: {
terms: {
field: 'traceGroup',
size: 1,
},
},
error_count: {
filter: {
term: {
'tag.error': true,
},
},
},
last_updated: {
max: {
script: {
source: `
if (doc.containsKey('startTime') && !doc['startTime'].empty && doc.containsKey('duration') && !doc['duration'].empty) {
return (Math.round(doc['duration'].value) + Math.round(doc['startTime'].value)) / 1000.0
}
return 0
`,
lang: 'painless',
},
},
},
},
},
},
};
const dataPrepperQuery = {
size: 0,
query: {
bool: {
must: [],
filter: [],
should: [],
must_not: [],
},
},
aggs: {
traces: {
terms: {
field: 'traceId',
size: TRACES_MAX_NUM,
},
aggs: {
latency: {
max: {
script: {
source: `
if (doc.containsKey('traceGroupFields.durationInNanos') && !doc['traceGroupFields.durationInNanos'].empty) {
return Math.round(doc['traceGroupFields.durationInNanos'].value / 10000) / 100.0
}
return 0
`,
lang: 'painless',
},
},
},
trace_group: {
terms: {
field: 'traceGroup',
size: 1,
},
},
error_count: {
filter: {
term: {
'traceGroupFields.statusCode': '2',
},
},
},
last_updated: {
max: {
field: 'traceGroupFields.endTime',
},
},
},
},
},
};
return mode === 'jaeger' ? jaegerQuery : dataPrepperQuery;
};
62 changes: 62 additions & 0 deletions server/langchain/tools/tool_sets/traces.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { DynamicTool } from 'langchain/tools';
import { PluginToolsFactory } from '../tools_factory/tools_factory';

import { flatten, jsonToCsv, swallowErrors } from '../../utils/utils';
import { getDashboardQuery, getMode, getTracesQuery } from './trace_tools/queries';
import {
DATA_PREPPER_INDEX_NAME,
JAEGER_INDEX_NAME,
} from '../../../../common/constants/trace_analytics';

export class TracesTools extends PluginToolsFactory {
static TOOL_NAMES = {
TRACE_GROUPS: 'Get trace groups',
SERVICES: 'Get trace services',
TRACES: 'Get traces',
} as const;

toolsList = [
new DynamicTool({
name: TracesTools.TOOL_NAMES.TRACE_GROUPS,
description:
'Use this to get information about each trace group. The tool response includes the key, doc_count, average_latency.value, trace_count.value, error_count.doc_count, error_count.trace_count.value, and error_rate.value. The key is the name of the trace group, the doc_count is the number of spans, the average_latency.value is the average latency of the trace group, measured in milliseconds. The trace_count.value is the number of traces in the trace group. The error_count.doc_count is the number of spans in the trace groups with errors, while the error_count.trace_count.value is the number of different traces in the trace group with errors. The error_rate.value is the percentage of traces in the trace group that has at least one error. This tool takes in no inputs.',
func: swallowErrors(async () => this.getTraceGroups()),
callbacks: this.callbacks,
}),
new DynamicTool({
name: TracesTools.TOOL_NAMES.TRACES,
description:
'Use this to get information about each trace. The tool response includes the key, doc_count, last_updated.value, last_updated.value_as_string, error_count.doc_count, trace_group.doc_count_error_upper_bound, trace_group.sum_other_doc_count, trace_group.buckets.0.key, and trace_groups.buckets.0.doc_count. The key is the ID of the trace. The doc_count is the number of spans in that particular trace. The last_updated.value_as_string is the last time that the trace was updated. The error_count.doc_count is how many spans in that trace has errors. The trace group.buckets.1.key is what trace group the trace belongs to. The other fields are mostly irrelevant data. This tool takes in no inputs.',
func: swallowErrors(async () => this.getTraces()),
callbacks: this.callbacks,
}),
];

public async getTraceGroups() {
const mode = await getMode(this.opensearchClient);
const query = getDashboardQuery();
console.log(DATA_PREPPER_INDEX_NAME);
const traceGroupsResponse = await this.opensearchClient.search({
index: DATA_PREPPER_INDEX_NAME,
body: query,
});
const traceGroups = traceGroupsResponse.body.aggregations.trace_group_name.buckets;
return jsonToCsv(flatten(traceGroups));
}

public async getTraces() {
const mode = await getMode(this.opensearchClient);
const query = getTracesQuery(mode);
const tracesResponse = await this.opensearchClient.search({
index: mode === 'data_prepper' ? DATA_PREPPER_INDEX_NAME : JAEGER_INDEX_NAME,
body: query,
});
const traces = tracesResponse.body.aggregations.traces.buckets;
return jsonToCsv(flatten(traces));
}
}
4 changes: 3 additions & 1 deletion server/langchain/tools/tools_helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { KnowledgeTools } from './tool_sets/knowledges';
import { OSAPITools } from './tool_sets/os_apis';
import { PPLTools } from './tool_sets/ppl';
import { SavedObjectsTools } from './tool_sets/saved_objects';
import { TracesTools } from './tool_sets/traces';

export const initTools = (
// proper way to get parameters possibly needs typescript 4.2 https://github.com/microsoft/TypeScript/issues/35576
Expand All @@ -19,5 +20,6 @@ export const initTools = (
const knowledgeTools = new KnowledgeTools(...args);
const opensearchTools = new OSAPITools(...args);
const savedObjectsTools = new SavedObjectsTools(...args);
return [pplTools, alertingTools, knowledgeTools, opensearchTools, savedObjectsTools];
const tracesTools = new TracesTools(...args);
return [pplTools, alertingTools, knowledgeTools, opensearchTools, savedObjectsTools, tracesTools];
};
27 changes: 26 additions & 1 deletion server/langchain/utils/__tests__/utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { jsonToCsv, swallowErrors } from '../utils';
import { flatten, jsonToCsv, swallowErrors } from '../utils';

describe('swallow errors', () => {
it('should swallow errors for sync functions', async () => {
Expand Down Expand Up @@ -41,4 +41,29 @@ describe('utils', () => {
const csv = jsonToCsv([]);
expect(csv).toEqual('row_number\n');
});

it('flattens nested objects', () => {
const flattened = flatten([
{
key1: { key2: 'value1' },
key3: {
key4: 'value2',
key5: { key6: 'value3', key7: [{ key8: 'value4' }, { key9: 'value5' }] },
},
},
{ key10: { key11: 'value6' } },
]);
expect(flattened).toEqual([
{
'key1.key2': 'value1',
'key3.key4': 'value2',
'key3.key5.key6': 'value3',
'key3.key5.key7.0.key8': 'value4',
'key3.key5.key7.1.key9': 'value5',
},
{
'key10.key11': 'value6',
},
]);
});
});
41 changes: 41 additions & 0 deletions server/langchain/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,44 @@ export const jsonToCsv = (json: object[]) => {

return csv;
};

export const flatten = (response: Array<Record<string, string | object>>) => {
// Flattens each bucket in the response
for (const bucket in response) {
if (response.hasOwnProperty(bucket)) {
response[bucket] = flattenObject(response[bucket]);
}
}
return response;
};

function flattenObject(object: Record<string, unknown>, prefix = '') {
const result: Record<string, string> = {};

// Recursively flattens object if it's an object or an array
for (const key in object) {
if (object.hasOwnProperty(key)) {
const combinedKey = prefix ? `${prefix}.${key}` : key;

if (typeof object[key] === 'object') {
if (Array.isArray(object[key])) {
for (let i = 0; i < object[key].length; i++) {
const nestedObject = flattenObject(object[key][i], `${combinedKey}.${i}`);
Object.assign(result, nestedObject);
}
} else {
const nestedObject = flattenObject(
object[key] as Record<string, string | object>,
combinedKey
);
Object.assign(result, nestedObject);
}
} else {
result[combinedKey] = object[key];
}
}
}
return result;
}

export type TraceAnalyticsMode = 'jaeger' | 'data_prepper';

0 comments on commit 267b518

Please sign in to comment.