From 39cbbf4ec7916a997f9ab48ccc70b47c4c03226b Mon Sep 17 00:00:00 2001
From: Damien de Lemeny
Date: Wed, 28 Feb 2024 14:11:32 -0500
Subject: [PATCH] Add logs query repagination mixin (WIP)

---
 src/datasource/index.ts       |   2 +
 src/datasource/queryLimits.ts | 159 ++++++++++++++++++++++++++++++++++
 2 files changed, 161 insertions(+)
 create mode 100644 src/datasource/queryLimits.ts

diff --git a/src/datasource/index.ts b/src/datasource/index.ts
index f818a21..c3c980c 100644
--- a/src/datasource/index.ts
+++ b/src/datasource/index.ts
@@ -2,10 +2,12 @@
 import { BaseQuickwitDataSource } from './base';
 import { withSupplementaryQueries } from './supplementaryQueries';
 import { withLogContext } from './logsContext';
+import { withSizeLimitedLogsRequests } from './queryLimits';
 
 const mixins = [
   withLogContext,
   withSupplementaryQueries,
+  withSizeLimitedLogsRequests(100),
 ]
 const qwds = mixins.reduce(( qwds, fn) => fn(qwds), BaseQuickwitDataSource)
 export class QuickwitDataSource extends qwds {
diff --git a/src/datasource/queryLimits.ts b/src/datasource/queryLimits.ts
new file mode 100644
index 0000000..924a5fb
--- /dev/null
+++ b/src/datasource/queryLimits.ts
@@ -0,0 +1,159 @@
+import { DataFrame, DataQueryRequest, DataQueryResponse } from "@grafana/data";
+import { Observable, pipe, map, from, toArray, lastValueFrom } from "rxjs";
+import { BaseQuickwitDataSourceConstructor } from "./base";
+import { ElasticsearchQuery, MetricAggregationWithSettings } from "types";
+
+// DataQueryRequest modifiers
+
+// Cap the number of log lines requested per page by setting the limit
+// setting on every logs metric of the request.
+function limitLogRequest(request: DataQueryRequest<ElasticsearchQuery>, limit: number) {
+  request.targets = request.targets.map((t) => {
+    if (t.metrics) {
+      t.metrics = t.metrics.map((m) => {
+        if (m.type === 'logs') {
+          m.settings = { ...m.settings, limit: limit.toString() };
+        }
+        return m;
+      });
+    }
+    return t;
+  });
+  return request;
+}
+
+// Attach the search_after cursor of each target so the next page starts
+// right after the last hit of the previous one.
+function addSearchAfter(request: DataQueryRequest<ElasticsearchQuery>, searchAfterValues: { [key: string]: any }) {
+  request.targets = request.targets.map((t) => {
+    if (t.metrics) {
+      const metricAgg = t.metrics[0] as MetricAggregationWithSettings;
+      metricAgg.settings = { ...metricAgg.settings, searchAfter: searchAfterValues[t.refId] };
+    }
+    return t;
+  });
+  return request;
+}
+
+// Extract the last 'sort' value of each dataframe, keyed by refId.
+function getSearchAfterValues(response: DataQueryResponse) {
+  const searchAfterValues: { [key: string]: any } = {};
+  response.data.forEach((df: DataFrame) => {
+    if (df.meta?.custom) {
+      const sortValues = df.fields.find((f) => f.name === 'sort')?.values;
+      if (sortValues && sortValues.length > 0) {
+        searchAfterValues[df.refId!] = sortValues[sortValues.length - 1];
+      }
+    }
+  });
+  return searchAfterValues;
+}
+
+// Pair each response with the request for the next page, or with null when
+// the response carries no search_after values to continue from.
+function getResponseWithNextRequest(request: DataQueryRequest<ElasticsearchQuery>, limit: number) {
+  return pipe(
+    map((response: DataQueryResponse) => {
+      let next: DataQueryRequest<ElasticsearchQuery> | null = null;
+      const searchAfterValues = getSearchAfterValues(response);
+      if (Object.entries(searchAfterValues).length > 0) {
+        next = addSearchAfter(limitLogRequest(request, limit), searchAfterValues);
+        console.log("Next request", next);
+      }
+      return { response, next };
+    })
+  );
+}
+
+// DataQueryResponses dataframes merging
+
+// Merge all paged responses into one response with a single dataframe per refId.
+function mergeResponses(responses: DataQueryResponse[]): DataQueryResponse {
+  const mergedPartial: { [key: string]: DataFrame } = {};
+  responses.forEach((response: DataQueryResponse) => {
+    response.data.forEach((newdf: DataFrame) => {
+      if (!newdf.refId) {
+        console.warn("Can't process dataframes without refId");
+        return;
+      }
+      if (newdf.length === 0) {
+        return; // nothing to merge from an empty page
+      }
+      const builtdf = mergedPartial[newdf.refId];
+      if (builtdf === undefined) {
+        mergedPartial[newdf.refId] = newdf;
+      } else {
+        extendDataFrame(builtdf, newdf);
+      }
+    });
+  });
+  const finalResponse: DataQueryResponse = {
+    ...responses[0],
+    data: Object.values(mergedPartial),
+  };
+  console.log("Final Response", finalResponse);
+  return finalResponse;
+}
+
+// Append the appendix frame's values (and nanos, when present) to the base frame.
+function extendDataFrame(base: DataFrame, appendix: DataFrame) {
+  base.length += appendix.length;
+  base.fields.forEach((baseField) => {
+    const sameField = appendix.fields.find((apdxField) => apdxField.name === baseField.name);
+    if (!sameField) {
+      return;
+    }
+    baseField.values = [...baseField.values, ...sameField.values];
+    if (baseField.nanos) {
+      baseField.nanos = [...baseField.nanos, ...(sameField.nanos ?? [])];
+    }
+  });
+  return base;
+}
+
+const DEFAULT_LIMIT = 100;
+
+// Datasource mixin
+
+// Wrap a datasource class so that logs queries are split into fixed-size pages.
+export function withSizeLimitedLogsRequests(limit = DEFAULT_LIMIT) {
+  return function <TBase extends BaseQuickwitDataSourceConstructor>(Base: TBase) {
+    return class extends Base {
+      // Expose the paged queries as an async iterable: each iteration runs one
+      // size-limited request and derives the next request from its search_after values.
+      getLimitedRequestIterable(request: DataQueryRequest<ElasticsearchQuery>) {
+        const doQuery = (req: DataQueryRequest<ElasticsearchQuery>) => super.query(req);
+        return {
+          [Symbol.asyncIterator](): AsyncIterator<DataQueryResponse> {
+            let nextRequest: DataQueryRequest<ElasticsearchQuery> | null = limitLogRequest(request, limit);
+            return {
+              next() {
+                if (!nextRequest) {
+                  return Promise.resolve({ done: true as const, value: null });
+                }
+                return lastValueFrom(doQuery(nextRequest).pipe(
+                  getResponseWithNextRequest(request, limit),
+                  map((res) => {
+                    nextRequest = res.next;
+                    return { done: false as const, value: res.response };
+                  })
+                ));
+              }
+            };
+          }
+        };
+      }
+
+      // Query fixed-size pages of logs until the selected time range is exhausted,
+      // then merge every page into a single response.
+      query(request: DataQueryRequest<ElasticsearchQuery>): Observable<DataQueryResponse> {
+        const metrics = request.targets[0]?.metrics;
+        if (metrics && metrics[0].type !== 'logs') {
+          return super.query(request);
+        }
+        const limitedQueries = from(this.getLimitedRequestIterable(request));
+        return limitedQueries.pipe(toArray(), map(mergeResponses));
+      }
+    };
+  };
+}
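
Note on the rxjs pattern used above (illustrative sketch only, not part of the patch): from() drains an
async iterable one page at a time, toArray() collects every emitted page, and a final map() merges them,
which is the role mergeResponses() plays for dataframes. fetchPage() and the Page shape below are made-up
stand-ins for super.query() and DataQueryResponse.

import { from, map, toArray, firstValueFrom } from "rxjs";

type Page = { items: number[]; next: number | null };

// Fake backend: pages of two items each, with a cursor that eventually runs out.
async function fetchPage(cursor: number): Promise<Page> {
  return { items: [cursor, cursor + 1], next: cursor >= 4 ? null : cursor + 2 };
}

// Stand-in for getLimitedRequestIterable(): keep fetching pages until the cursor is exhausted.
async function* pages(): AsyncGenerator<Page> {
  let cursor: number | null = 0;
  while (cursor !== null) {
    const page: Page = await fetchPage(cursor);
    cursor = page.next;
    yield page;
  }
}

// from() consumes the async iterable sequentially, toArray() gathers all pages,
// and the final map() merges them into one result.
firstValueFrom(from(pages()).pipe(toArray(), map((ps) => ps.flatMap((p) => p.items))))
  .then((all) => console.log(all)); // [0, 1, 2, 3, 4, 5]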