From 6b70340595674866ed58c18f9727300560b43f42 Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 10 Jun 2024 17:41:05 +0300 Subject: [PATCH 1/7] Traceql DNF optimization init --- test/e2e | 2 +- .../clickhouse_transpiler/attr_condition.js | 58 +++-- .../attr_condition_eval.js | 4 +- traceql/clickhouse_transpiler/index.js | 222 ++++++++++++++++-- traceql/clickhouse_transpiler/init.js | 1 + traceql/index.js | 31 ++- 6 files changed, 265 insertions(+), 53 deletions(-) diff --git a/test/e2e b/test/e2e index 8847ca00..a2203f48 160000 --- a/test/e2e +++ b/test/e2e @@ -1 +1 @@ -Subproject commit 8847ca00a0deda194008bb2d8dccc02d879267d2 +Subproject commit a2203f48234322d703b60dc1478a531771024467 diff --git a/traceql/clickhouse_transpiler/attr_condition.js b/traceql/clickhouse_transpiler/attr_condition.js index 131cacd6..6831b4a8 100644 --- a/traceql/clickhouse_transpiler/attr_condition.js +++ b/traceql/clickhouse_transpiler/attr_condition.js @@ -3,6 +3,7 @@ const Sql = require('@cloki/clickhouse-sql') module.exports = class Builder { constructor () { this.main = null + this.precondition = null this.terms = [] this.conds = null this.aggregatedAttr = '' @@ -51,6 +52,11 @@ module.exports = class Builder { return this } + withPrecondition (precondition) { + this.precondition = precondition + return this + } + /** * @returns {ProcessFn} */ @@ -58,7 +64,10 @@ module.exports = class Builder { const self = this /** @type {BuiltProcessFn} */ const res = (ctx) => { - const sel = this.main(ctx) + const sel = self.main(ctx) + const withPreconditionSel = self.precondition + ? new Sql.With('precond', self.buildPrecondition(ctx)) + : null self.alias = 'bsCond' for (const term of self.terms) { const sqlTerm = self.getTerm(term) @@ -83,37 +92,52 @@ module.exports = class Builder { sel.conditions, Sql.Eq(new Sql.Raw(`cityHash64(trace_id) % ${ctx.randomFilter[0]}`), Sql.val(ctx.randomFilter[1]))) } + if (withPreconditionSel) { + sel.with(withPreconditionSel) + sel.conditions = Sql.And( + sel.conditions, + new Sql.In(new Sql.Raw('(trace_id, span_id)'), 'in', new Sql.WithReference(withPreconditionSel))) + } sel.having(having) return sel } return res } + buildPrecondition (ctx) { + if (!this.precondition) { + return null + } + const sel = this.precondition(ctx) + sel.select_list = sel.select_list.filter(x => Array.isArray(x) && (x[1] === 'trace_id' || x[1] === 'span_id')) + sel.order_expressions = [] + return sel + } + /** * @typedef {{simpleIdx: number, op: string, complex: [Condition]}} Condition */ /** - * @param c {Condition} + * @param c {Token || [any]} */ getCond (c) { - if (c.simpleIdx === -1) { - const subs = [] - for (const s of c.complex) { - subs.push(this.getCond(s)) - } - switch (c.op) { - case '&&': - return Sql.And(...subs) - case '||': - return Sql.Or(...subs) + if (c.name) { + let left = new Sql.Raw(this.alias) + if (!this.isAliased) { + left = groupBitOr(bitSet(this.sqlConditions), this.alias) } - throw new Error(`unsupported condition operator ${c.op}`) + const termIdx = this.terms.findIndex(x => x.value === c.value) + return Sql.Ne(bitAnd(left, new Sql.Raw((BigInt(1) << BigInt(termIdx)).toString())), Sql.val(0)) } - let left = new Sql.Raw(this.alias) - if (!this.isAliased) { - left = groupBitOr(bitSet(this.sqlConditions), this.alias) + const op = c[0] + const subs = c.slice(1).map(x => this.getCond(x)) + switch (op) { + case '&&': + return Sql.And(...subs) + case '||': + return Sql.Or(...subs) } - return Sql.Ne(bitAnd(left, new Sql.Raw((BigInt(1) << BigInt(c.simpleIdx)).toString())), Sql.val(0)) + 
throw new Error(`unsupported condition operator ${c.op}`) } /** diff --git a/traceql/clickhouse_transpiler/attr_condition_eval.js b/traceql/clickhouse_transpiler/attr_condition_eval.js index 43cf2a02..2512ebc0 100644 --- a/traceql/clickhouse_transpiler/attr_condition_eval.js +++ b/traceql/clickhouse_transpiler/attr_condition_eval.js @@ -1,5 +1,5 @@ const attrCondition = require('./attr_condition') -const {bitSet} = require('./shared') +const { bitSet } = require('./shared') const Sql = require('@cloki/clickhouse-sql') module.exports = class Builder extends attrCondition { build () { @@ -10,7 +10,7 @@ module.exports = class Builder extends attrCondition { const sel = superBuild(ctx) sel.having_conditions = [] sel.aggregations = [bitSet(self.sqlConditions)] - sel.select_list = [[new Sql.Raw('count()'), 'count']] + sel.select_list = [[bitSet(self.sqlConditions), 'cond'], [new Sql.Raw('count()'), 'count']] sel.order_expressions = [] return sel } diff --git a/traceql/clickhouse_transpiler/index.js b/traceql/clickhouse_transpiler/index.js index f34c009f..44b07ea8 100644 --- a/traceql/clickhouse_transpiler/index.js +++ b/traceql/clickhouse_transpiler/index.js @@ -5,33 +5,42 @@ const IndexGroupByPlanner = require('./group_by') const AggregatorPlanner = require('./aggregator') const IndexLimitPlanner = require('./limit') const TracesDataPlanner = require('./traces_data') +const { th } = require('date-fns/locale') /** * @param script {Token} */ module.exports.transpile = (script) => { - return new Planner(script).plan() + return new module.exports.Planner(script).plan() } /** * @param script {Token} */ module.exports.evaluateCmpl = (script) => { - return new Planner(script).planEval() + return new module.exports.Planner(script).planEval() } -class Planner { +module.exports.Planner = class Planner { /** * * @param script {Token} */ constructor (script) { this.script = script - this.termIdx = [] this.cond = null + this.terms = {} + this.termIdx = [] + + this.eval = null + + this.preCond = null + this.preCondTerms = {} + this.precondTermsIdx = [] + this.aggregatedAttr = '' this.cmpVal = '' - this.terms = {} + this.aggFn = '' } @@ -43,7 +52,14 @@ class Planner { .withConditions(this.cond) .withAggregatedAttr(this.aggregatedAttr) .withMain((new InitIndexPlanner()).build()) - .build() + if (this.preCond) { + const preCond = (new AttrConditionPlanner()) + .withTerms(this.precondTermsIdx) + .withConditions(this.preCond) + .withMain((new InitIndexPlanner()).build()) + res = res.withPrecondition(preCond.build()) + } + res = res.build() res = (new IndexGroupByPlanner()).withMain(res).build() if (this.aggFn) { res = (new AggregatorPlanner()) @@ -74,6 +90,35 @@ class Planner { return res } + setEvaluationResult (result) { + this.eval = {} + for (const row of result) { + this.eval[row.cond] = row.count + } + } + + minify () { + const subcost = {} + for (let i = 0; i < this.termIdx.length; i++) { + subcost[this.termIdx[i].value] = Object.entries(this.eval) + .find(x => parseInt(x[0]) === i + 1) + subcost[this.termIdx[i].value] = subcost[this.termIdx[i].value] + ? parseInt(subcost[this.termIdx[i].value][1]) + : 0 + } + if (!this.isDNF(this.cond)) { + return this.estimateCost(this.cond, subcost) + } + this.preCond = this.getSimplePrecondition(this.cond, subcost) + if (this.preCond) { + this.extractTermsIdx(this.preCond, this.precondTermsIdx, this.preCondTerms) + } + + return this.preCond + ? 
this.estimateCost(this.preCond, subcost) + : this.estimateCost(this.cond, subcost) + } + check () { if (this.script.Children('SYNTAX').length > 1) { throw new Error('more than one selector is not supported') @@ -86,35 +131,160 @@ class Planner { this.analyzeAgg() } + /** + * + * @param token {Token} + * @param tree {{root: any}} + * @param place {{ref: any}} + */ + buildExpressionTree (token, tree, place) { + if (token.name !== 'attr_selector_exp') { + throw new Error('unsupported selector') + } + let leftHand = token.tokens[0] + if (token.tokens[0].name === 'complex_head') { + const newTree = { root: { ref: null } } + this.buildExpressionTree(token.tokens[0].tokens.find(x => x.name === 'attr_selector_exp'), + newTree, + newTree.root + ) + leftHand = newTree.root + } + const tail = token.tokens.find(x => x.name === 'tail') + if (!tail) { + // if we have `a` + place.ref = leftHand + return + } + const andOr = token.tokens.find(x => x.name === 'and_or').value + const newPlace = { ref: null } + switch (andOr) { + case '&&': + place.ref = ['&&', { ref: leftHand }, newPlace] + this.buildExpressionTree(tail.tokens[0], tree, newPlace) + return + case '||': + place.ref = leftHand + tree.root = ['||', { ref: tree.root }, newPlace] + this.buildExpressionTree(tail.tokens[0], tree, newPlace) + } + } + + /** + * + * @param t {{ref: any} | Token | Array} + * @returns {Token | Array} + */ + minimizeTree (t) { + while (t.ref) { + t = t.ref + } + if (!Array.isArray(t)) { + return t + } + for (let i = t.length - 1; i > 0; i--) { + t[i] = this.minimizeTree(t[i]) + if (Array.isArray(t[i]) && t[i][0] === t[0]) { + t.splice(i, 1, ...t[i].slice(1)) + } + } + return t + } + + /** + * @param t {Token | Array} + * @returns {boolean} + */ + isDNF (t) { + if (t.name) { + return true + } + const fn = t[0] + for (let i = 1; i < t.length; i++) { + if (!this.isDNF(t[i])) { + return false + } + if (Array.isArray(t[i]) && fn === '&&' && t[i][0] === '||') { + return false + } + } + return true + } + + /** + * + * @param t {Token | Array} + * @param subcosts {{[key: string]: number}} + * @returns number + */ + estimateCost (t, subcosts) { + if (t.name) { + return subcosts[t.value] + } + const fn = t[0] + const costs = t.slice(1).map(x => this.estimateCost(x, subcosts)) + switch (fn) { + case '&&': + return Math.min(...costs) + case '||': + return costs.reduce((a, b) => a + b) + } + throw new Error('unsupported function') + } + + /** + * + * @param t {Token | Array} + * @param subcosts {{[key: string]: number}} + */ + getSimplePrecondition (t, subcosts) { + if (!this.isDNF(t)) { + return null + } + if (t.name) { + return subcosts[t.value] < 10000000 ? t : null + } + const fn = t[0] + const self = this + const simplify = x => x.length === 2 ? x[1] : x + if (fn === '&&') { + const res = t.slice(1).filter(x => self.estimateCost(x, subcosts) < 10000000) + return res.length > 0 ? simplify(['&&', ...res]) : null + } + if (fn === '||') { + const res = t.slice(1).map(x => self.getSimplePrecondition(x, subcosts)).filter(x => x) + return res.length === t.length - 1 ? 
simplify(['||', ...res]) : null + } + throw new Error('unsupported function') + } + /** * * @param token {Token} */ analyzeCond (token) { - let res = {} - const complexHead = token.tokens.find(x => x.name === 'complex_head') - const simpleHead = token.tokens.find(x => x.name === 'attr_selector') - if (complexHead) { - res = this.analyzeCond(complexHead.tokens.find(x => x.name === 'attr_selector_exp')) - } else if (simpleHead) { - const term = simpleHead.value - if (this.terms[term]) { - res = { simpleIdx: this.terms[term] - 1, op: '', complex: [] } + const tree = { root: { ref: null } } + this.buildExpressionTree(token, tree, tree.root) + tree.root = this.minimizeTree(tree.root) + this.extractTermsIdx(tree.root, this.termIdx, this.terms) + return tree.root + } + + extractTermsIdx (t, termIdx, terms) { + const self = this + if (t.name) { + if (!terms[t.value]) { + termIdx.push(t) + terms[t.value] = termIdx.length + t.termIdx = termIdx.length - 1 } else { - this.termIdx.push(simpleHead) - this.terms[term] = this.termIdx.length - res = { simpleIdx: this.termIdx.length - 1, op: '', complex: [] } + t.termIdx = terms[t.value] - 1 } + return } - const tail = token.tokens.find(x => x.name === 'tail') - if (tail) { - res = { - simpleIdx: -1, - op: token.tokens.find(x => x.name === 'and_or').value, - complex: [res, this.analyzeCond(tail.tokens.find(x => x.name === 'attr_selector_exp'))] - } + if (Array.isArray(t)) { + t.forEach(x => self.extractTermsIdx(x, termIdx, terms)) } - return res } analyzeAgg () { diff --git a/traceql/clickhouse_transpiler/init.js b/traceql/clickhouse_transpiler/init.js index 5d3cf131..1deec485 100644 --- a/traceql/clickhouse_transpiler/init.js +++ b/traceql/clickhouse_transpiler/init.js @@ -12,6 +12,7 @@ const { standardBuilder } = require('./shared') * tracesDistTable: string, * randomFilter: number[]|undefined, * cachedTraceIds: string[]|undefined, + * planner: Planner * }} Context */ /** diff --git a/traceql/index.js b/traceql/index.js index 37beedf9..17fd3999 100644 --- a/traceql/index.js +++ b/traceql/index.js @@ -1,5 +1,5 @@ const parser = require('./parser') -const { transpile, evaluateCmpl } = require('./clickhouse_transpiler') +const { Planner } = require('./clickhouse_transpiler') const logger = require('../lib/logger') const { DATABASE_NAME } = require('../lib/utils') const { clusterName } = require('../common') @@ -15,6 +15,7 @@ const { rawRequest } = require('../lib/db/clickhouse') */ const search = async (query, limit, from, to) => { const _dbname = DATABASE_NAME() + const scrpit = parser.ParseScript(query) /** @type {Context} */ const ctx = { tracesDistTable: `${_dbname}.tempo_traces_dist`, @@ -24,11 +25,15 @@ const search = async (query, limit, from, to) => { from: from, to: to, limit: limit, - randomFilter: null + randomFilter: null, + planner: new Planner(scrpit.rootToken) } - const scrpit = parser.ParseScript(query) - const complexity = await evaluateComplexity(ctx, scrpit.rootToken) + + let complexity = await evaluateComplexity(ctx, scrpit.rootToken) let res = [] + if (complexity > 10000000) { + complexity = ctx.planner.minify() + } if (complexity > 10000000) { res = await processComplexResult(ctx, scrpit.rootToken, complexity) } else { @@ -49,9 +54,10 @@ const search = async (query, limit, from, to) => { * @param script {Token} */ const evaluateComplexity = async (ctx, script) => { - const evaluator = evaluateCmpl(script) + const evaluator = ctx.planner.planEval() const sql = evaluator(ctx) const response = await rawRequest(sql + ' FORMAT JSON', null, 
DATABASE_NAME()) + ctx.planner.setEvaluationResult(response.data.data) return response.data.data.reduce((acc, row) => Math.max(acc, row.count), 0) } @@ -62,7 +68,7 @@ const evaluateComplexity = async (ctx, script) => { * @param complexity {number} */ async function processComplexResult (ctx, script, complexity) { - const planner = transpile(script) + const planner = ctx.planner.plan() const maxFilter = Math.floor(complexity / 10000000) let traces = [] for (let i = 0; i < maxFilter; i++) { @@ -110,7 +116,7 @@ async function processComplexResult (ctx, script, complexity) { * @param script {Token} */ async function processSmallResult (ctx, script) { - const planner = transpile(script) + const planner = ctx.planner.plan() const sql = planner(ctx) const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME()) const traces = response.data.data.map(row => ({ @@ -119,11 +125,22 @@ async function processSmallResult (ctx, script) { rootTraceName: row.root_trace_name, startTimeUnixNano: row.start_time_unix_nano, durationMs: row.duration_ms, + spanSet: { + spans: row.span_id.map((spanId, i) => ({ + spanID: spanId, + startTimeUnixNano: row.timestamp_ns[i], + spanStartTime: row.timestamp_ns[i], + durationNanos: row.duration[i], + attributes: [] + })), + matched: row.span_id.length + }, spanSets: [ { spans: row.span_id.map((spanId, i) => ({ spanID: spanId, startTimeUnixNano: row.timestamp_ns[i], + spanStartTime: row.timestamp_ns[i], durationNanos: row.duration[i], attributes: [] })), From 374b9bba0fb93601417ad80c430a130dd077c122 Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 10 Jun 2024 17:46:20 +0300 Subject: [PATCH 2/7] debug --- test/e2e | 2 +- traceql/index.js | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/test/e2e b/test/e2e index a2203f48..14260227 160000 --- a/test/e2e +++ b/test/e2e @@ -1 +1 @@ -Subproject commit a2203f48234322d703b60dc1478a531771024467 +Subproject commit 142602272531e9cb4548d6d4bbd5ffe0329a9ca4 diff --git a/traceql/index.js b/traceql/index.js index 17fd3999..bec8b969 100644 --- a/traceql/index.js +++ b/traceql/index.js @@ -140,7 +140,6 @@ async function processSmallResult (ctx, script) { spans: row.span_id.map((spanId, i) => ({ spanID: spanId, startTimeUnixNano: row.timestamp_ns[i], - spanStartTime: row.timestamp_ns[i], durationNanos: row.duration[i], attributes: [] })), From 364707756993b628250b77afecdbaf203c3e9265 Mon Sep 17 00:00:00 2001 From: akvlad Date: Tue, 11 Jun 2024 18:29:40 +0300 Subject: [PATCH 3/7] init only mode --- qryn_bun.mjs | 13 +++++++++++-- qryn_node.js | 8 ++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/qryn_bun.mjs b/qryn_bun.mjs index 098ba091..1af58844 100644 --- a/qryn_bun.mjs +++ b/qryn_bun.mjs @@ -58,7 +58,7 @@ import handlerTempoLabelV2 from './lib/handlers/tempo_v2_tags.js' import handlerTempoLabelV2Values from './lib/handlers/tempo_v2_values.js' import {init as pyroscopeInit } from './pyroscope/pyroscope.js' -import { readonly } from './common.js' +import { boolEnv, readonly } from './common.js' import DATABASE, { init } from './lib/db/clickhouse.js' import { startAlerting } from './lib/db/alerting/index.js' import fs from 'fs' @@ -66,12 +66,21 @@ import path from 'path' import { file, dir, group, CORS } from '@stricjs/utils'; import auth from 'basic-auth' import * as errors from 'http-errors' +import logger from './lib/logger.js' const http_user = process.env.QRYN_LOGIN || process.env.CLOKI_LOGIN || undefined const http_password = process.env.QRYN_PASSWORD || process.env.CLOKI_PASSWORD 
|| undefined export default async() => { - await init(process.env.CLICKHOUSE_DB || 'cloki') + try { + await init(process.env.CLICKHOUSE_DB || 'cloki') + if (process.env.MODE === 'init_only') { + process.exit(0) + } + } catch (err) { + logger.error(err, 'Error starting qryn') + process.exit(1) + } if (!readonly) { await startAlerting() } diff --git a/qryn_node.js b/qryn_node.js index 62f08dae..2a7ba4ec 100755 --- a/qryn_node.js +++ b/qryn_node.js @@ -73,6 +73,14 @@ let fastify = require('fastify')({ (async () => { try { await init(process.env.CLICKHOUSE_DB || 'cloki') + if (process.env.MODE === 'init_only') { + process.exit(0) + } + } catch (err) { + logger.error(err, 'Error starting qryn') + process.exit(1) + } + try { if (!this.readonly) { await startAlerting() } From 85f293b57f0ddd3b992a471b6f6b2bebf0a08636 Mon Sep 17 00:00:00 2001 From: akvlad Date: Tue, 11 Jun 2024 19:46:48 +0300 Subject: [PATCH 4/7] reader/writer modes --- common.js | 3 ++ qryn_bun.mjs | 140 +++++++++++++++++++++++++-------------------------- qryn_node.js | 130 ++++++++++++++++++++++++----------------------- 3 files changed, 139 insertions(+), 134 deletions(-) diff --git a/common.js b/common.js index e8748973..b3bf76ab 100644 --- a/common.js +++ b/common.js @@ -159,3 +159,6 @@ module.exports.logType = boolEnv('DISTINGUISH_LOGS_METRICS') ? 1 : 0 module.exports.metricType = boolEnv('DISTINGUISH_LOGS_METRICS') ? 2 : 0 module.exports.bothType = 0 + +module.exports.writerMode = (process.env.MODE === 'writer' || !process.env.MODE) && !boolEnv('READONLY') +module.exports.readerMode = process.env.MODE === 'reader' || boolEnv('READONLY') || !process.env.MODE diff --git a/qryn_bun.mjs b/qryn_bun.mjs index 1af58844..992ed4e7 100644 --- a/qryn_bun.mjs +++ b/qryn_bun.mjs @@ -58,7 +58,7 @@ import handlerTempoLabelV2 from './lib/handlers/tempo_v2_tags.js' import handlerTempoLabelV2Values from './lib/handlers/tempo_v2_values.js' import {init as pyroscopeInit } from './pyroscope/pyroscope.js' -import { boolEnv, readonly } from './common.js' +import { boolEnv, readonly, readerMode, writerMode } from './common.js' import DATABASE, { init } from './lib/db/clickhouse.js' import { startAlerting } from './lib/db/alerting/index.js' import fs from 'fs' @@ -110,61 +110,61 @@ export default async() => { }); app.get('/hello', wrapper(handlerHello)) - .get('/ready', wrapper(handlerHello)) - .post('/loki/api/v1/push', wrapper(handlerPush, { + app.get('/ready', wrapper(handlerHello)) + writerMode && app.post('/loki/api/v1/push', wrapper(handlerPush, { 'application/json': lokiPushJSONParser, 'application/x-protobuf': lokiPushProtoParser, '*': lokiPushJSONParser })) - .post('/:target/_doc', wrapper(handlerElasticPush, { + writerMode && app.post('/:target/_doc', wrapper(handlerElasticPush, { 'application/json': jsonParser, '*': rawStringParser })) - .post('/:target/_create/:id', wrapper(handlerElasticPush, { + writerMode && app.post('/:target/_create/:id', wrapper(handlerElasticPush, { 'application/json': jsonParser, '*': rawStringParser })) - .put('/:target/_doc/:id', wrapper(handlerElasticPush, { + writerMode && app.put('/:target/_doc/:id', wrapper(handlerElasticPush, { 'application/json': jsonParser, '*': rawStringParser })) - .put('/:target/_create/:id', wrapper(handlerElasticPush, { + writerMode && app.put('/:target/_create/:id', wrapper(handlerElasticPush, { 'application/json': jsonParser, '*': rawStringParser })) - .post('/_bulk', wrapper(handlerElasticBulk, { + writerMode && app.post('/_bulk', wrapper(handlerElasticBulk, { 'application/json': 
jsonParser, '*': rawStringParser })) - .post('/:target/_bulk', wrapper(handlerElasticBulk, { + writerMode && app.post('/:target/_bulk', wrapper(handlerElasticBulk, { 'application/json': jsonParser, '*': rawStringParser })) - .post('/tempo/api/push', wrapper(handlerTempoPush, { + writerMode && app.post('/tempo/api/push', wrapper(handlerTempoPush, { 'application/json': tempoPushParser, 'application/x-ndjson': tempoPushNDJSONParser, '*': tempoPushParser })) - .post('/tempo/spans', wrapper(handlerTempoPush, { + writerMode && app.post('/tempo/spans', wrapper(handlerTempoPush, { 'application/json': tempoPushParser, 'application/x-ndjson': tempoPushNDJSONParser, '*': tempoPushParser })) - .post('/api/v2/spans', wrapper(handlerTempoPush, { + writerMode && app.post('/api/v2/spans', wrapper(handlerTempoPush, { 'application/json': tempoPushParser, 'application/x-ndjson': tempoPushNDJSONParser, '*': tempoPushParser })) - .get('/api/traces/:traceId', wrapper(handlerTempoTraces)) - .get('/api/traces/:traceId/:json', wrapper(handlerTempoTraces)) - .get('/tempo/api/traces/:traceId', wrapper(handlerTempoTraces)) - .get('/tempo/api/traces/:traceId/:json', wrapper(handlerTempoTraces)) - .get('/api/echo', wrapper(handlerTempoEcho)) - .get('/tempo/api/echo', wrapper(handlerTempoEcho)) - .ws('/loki/api/v1/tail', wsWrapper(handlerTail)) - .get('/config', () => new Response('not supported')) - .get('/metrics', () => new Response('not supported')) - .get('/influx/api/v2/write/health', () => new Response('ok')) + readerMode && app.get('/api/traces/:traceId', wrapper(handlerTempoTraces)) + readerMode && app.get('/api/traces/:traceId/:json', wrapper(handlerTempoTraces)) + readerMode && app.get('/tempo/api/traces/:traceId', wrapper(handlerTempoTraces)) + readerMode && app.get('/tempo/api/traces/:traceId/:json', wrapper(handlerTempoTraces)) + readerMode && app.get('/api/echo', wrapper(handlerTempoEcho)) + readerMode && app.get('/tempo/api/echo', wrapper(handlerTempoEcho)) + readerMode && app.ws('/loki/api/v1/tail', wsWrapper(handlerTail)) + app.get('/config', () => new Response('not supported')) + app.get('/metrics', () => new Response('not supported')) + app.get('/influx/api/v2/write/health', () => new Response('ok')) const fastify = { @@ -182,74 +182,74 @@ export default async() => { } } - fastify.get('/api/search/tags', handlerTempoLabel) - fastify.get('/tempo/api/search/tags', handlerTempoLabel) + readerMode && fastify.get('/api/search/tags', handlerTempoLabel) + readerMode && fastify.get('/tempo/api/search/tags', handlerTempoLabel) /* Tempo Tag Value Handler */ - fastify.get('/api/search/tag/:name/values', handlerTempoLabelValues) - fastify.get('/tempo/api/search/tag/:name/values', handlerTempoLabelValues) + readerMode && fastify.get('/api/search/tag/:name/values', handlerTempoLabelValues) + readerMode && fastify.get('/tempo/api/search/tag/:name/values', handlerTempoLabelValues) /* Tempo Traces Query Handler */ - fastify.get('/api/search', handlerTempoSearch) - fastify.get('/tempo/api/search', handlerTempoSearch) + readerMode && fastify.get('/api/search', handlerTempoSearch) + readerMode && fastify.get('/tempo/api/search', handlerTempoSearch) /* Tempo Echo Handler */ fastify.get('/api/echo', handlerTempoEcho) fastify.get('/tempo/api/echo', handlerTempoEcho) /* Telegraf HTTP Bulk handler */ - fastify.post('/telegraf', handlerTelegraf, { + writerMode && fastify.post('/telegraf', handlerTelegraf, { '*': jsonParser }) /* Datadog Log Push Handler */ - fastify.post('/api/v2/logs', handlerDatadogLogPush, { + writerMode 
&& fastify.post('/api/v2/logs', handlerDatadogLogPush, { 'application/json': jsonParser, '*': rawStringParser }) /* Datadog Series Push Handler */ - fastify.post('/api/v2/series', handlerDatadogSeriesPush, { + writerMode && fastify.post('/api/v2/series', handlerDatadogSeriesPush, { 'application/json': jsonParser, '*': rawStringParser }) /* Query Handler */ - fastify.get('/loki/api/v1/query_range', handlerQueryRange) + readerMode && fastify.get('/loki/api/v1/query_range', handlerQueryRange) /* Label Handlers */ /* Label Value Handler via query (test) */ - fastify.get('/loki/api/v1/query', handlerQuery) + readerMode && fastify.get('/loki/api/v1/query', handlerQuery) /* Label Handlers */ - fastify.get('/loki/api/v1/label', handlerLabel) - fastify.get('/loki/api/v1/labels', handlerLabel) + readerMode && fastify.get('/loki/api/v1/label', handlerLabel) + readerMode && fastify.get('/loki/api/v1/labels', handlerLabel) /* Label Value Handler */ - fastify.get('/loki/api/v1/label/:name/values', handlerLabelValues) + readerMode && fastify.get('/loki/api/v1/label/:name/values', handlerLabelValues) /* Series Handler - experimental support for both Loki and Prometheus */ - fastify.get('/loki/api/v1/series', handlerSeries) + readerMode && fastify.get('/loki/api/v1/series', handlerSeries) - fastify.get('/api/v1/series', handlerPromSeries) - fastify.post('/api/v1/series', handlerPromSeries, { + readerMode && fastify.get('/api/v1/series', handlerPromSeries) + readerMode && fastify.post('/api/v1/series', handlerPromSeries, { 'application/x-www-form-urlencoded': wwwFormParser }) /* ALERT MANAGER Handlers */ - fastify.get('/api/prom/rules', handlerGetRules) - fastify.get('/api/prom/rules/:ns/:group', handlerGetGroup) - fastify.post('/api/prom/rules/:ns', handlerPostGroup, { + readerMode && fastify.get('/api/prom/rules', handlerGetRules) + readerMode && fastify.get('/api/prom/rules/:ns/:group', handlerGetGroup) + readerMode && fastify.post('/api/prom/rules/:ns', handlerPostGroup, { '*': yamlParser }) - fastify.delete('/api/prom/rules/:ns/:group', handlerDelGroup) - fastify.delete('/api/prom/rules/:ns', handlerDelNS) - fastify.get('/prometheus/api/v1/rules', handlerPromGetRules) + readerMode && fastify.delete('/api/prom/rules/:ns/:group', handlerDelGroup) + readerMode && fastify.delete('/api/prom/rules/:ns', handlerDelNS) + readerMode && fastify.get('/prometheus/api/v1/rules', handlerPromGetRules) /* PROMETHEUS REMOTE WRITE Handlers */ const remoteWritePaths = [ @@ -260,59 +260,59 @@ export default async() => { '/api/prom/push' ] for (const path of remoteWritePaths) { - fastify.post(path, promWriteHandler, { + writerMode && fastify.post(path, promWriteHandler, { 'application/x-protobuf': prometheusPushProtoParser, 'application/json': jsonParser, '*': combinedParser(prometheusPushProtoParser, jsonParser) }) - fastify.get(path, handlerTempoEcho) + writerMode && fastify.get(path, handlerTempoEcho) } /* PROMQETHEUS API EMULATION */ - fastify.post('/api/v1/query_range', handlerPromQueryRange, { + readerMode && fastify.post('/api/v1/query_range', handlerPromQueryRange, { 'application/x-www-form-urlencoded': wwwFormParser }) - fastify.get('/api/v1/query_range', handlerPromQueryRange) + readerMode && fastify.get('/api/v1/query_range', handlerPromQueryRange) - fastify.post('/api/v1/query', handlerPromQuery, { + readerMode && fastify.post('/api/v1/query', handlerPromQuery, { 'application/x-www-form-urlencoded': wwwFormParser }) - fastify.get('/api/v1/query', handlerPromQuery) - fastify.get('/api/v1/labels', 
handlerPromLabel) // piggyback on qryn labels - fastify.get('/api/v1/label/:name/values', handlerPromLabelValues) // piggyback on qryn values - fastify.post('/api/v1/labels', handlerPromLabel, { + readerMode && fastify.get('/api/v1/query', handlerPromQuery) + readerMode && fastify.get('/api/v1/labels', handlerPromLabel) // piggyback on qryn labels + readerMode && fastify.get('/api/v1/label/:name/values', handlerPromLabelValues) // piggyback on qryn values + readerMode && fastify.post('/api/v1/labels', handlerPromLabel, { '*': rawStringParser }) // piggyback on qryn labels - fastify.post('/api/v1/label/:name/values', handlerPromLabelValues, { + readerMode && fastify.post('/api/v1/label/:name/values', handlerPromLabelValues, { '*': rawStringParser }) // piggyback on qryn values - fastify.get('/api/v1/metadata', handlerPromDefault.misc) // default handler TBD - fastify.get('/api/v1/rules', handlerPromDefault.rules) // default handler TBD - fastify.get('/api/v1/query_exemplars', handlerPromDefault.misc) // default handler TBD - fastify.post('/api/v1/query_exemplars', handlerPromDefault.misc, { + readerMode && fastify.get('/api/v1/metadata', handlerPromDefault.misc) // default handler TBD + readerMode && fastify.get('/api/v1/rules', handlerPromDefault.rules) // default handler TBD + readerMode && fastify.get('/api/v1/query_exemplars', handlerPromDefault.misc) // default handler TBD + readerMode && fastify.post('/api/v1/query_exemplars', handlerPromDefault.misc, { 'application/x-www-form-urlencoded': wwwFormParser }) // default handler TBD - fastify.get('/api/v1/format_query', handlerPromDefault.misc) // default handler TBD - fastify.post('/api/v1/format_query', handlerPromDefault.misc, { + readerMode && fastify.get('/api/v1/format_query', handlerPromDefault.misc) // default handler TBD + readerMode && fastify.post('/api/v1/format_query', handlerPromDefault.misc, { 'application/x-www-form-urlencoded': wwwFormParser }) // default handler TBD fastify.get('/api/v1/status/buildinfo', handlerPromDefault.buildinfo) // default handler TBD /* NewRelic Log Handler */ - fastify.post('/log/v1', handlerNewrelicLogPush, { + writerMode && fastify.post('/log/v1', handlerNewrelicLogPush, { 'text/plain': jsonParser, '*': jsonParser }) /* INFLUX WRITE Handlers */ - fastify.post('/write', handlerInfluxWrite, { + writerMode && fastify.post('/write', handlerInfluxWrite, { '*': rawStringParser }) - fastify.post('/influx/api/v2/write', handlerInfluxWrite, { + writerMode && fastify.post('/influx/api/v2/write', handlerInfluxWrite, { '*': rawStringParser }) /* INFLUX HEALTH Handlers */ @@ -321,16 +321,16 @@ export default async() => { fastify.get('/influx/health', handlerInfluxHealth) - fastify.post('/v1/traces', handlerOTLPPush, { + writerMode && fastify.post('/v1/traces', handlerOTLPPush, { '*': otlpPushProtoParser }) - fastify.get('/api/v2/search/tags', handlerTempoLabelV2) - fastify.get('/tempo/api/v2/search/tags', handlerTempoLabelV2) - fastify.get('/api/v2/search/tag/:name/values', handlerTempoLabelV2Values) - fastify.get('/tempo/api/v2/search/tag/:name/values', handlerTempoLabelV2Values) + readerMode && fastify.get('/api/v2/search/tags', handlerTempoLabelV2) + readerMode && fastify.get('/tempo/api/v2/search/tags', handlerTempoLabelV2) + readerMode && fastify.get('/api/v2/search/tag/:name/values', handlerTempoLabelV2Values) + readerMode && fastify.get('/tempo/api/v2/search/tag/:name/values', handlerTempoLabelV2Values) - pyroscopeInit(fastify) + readerMode && pyroscopeInit(fastify) const serveView = 
fs.existsSync(path.join(__dirname, 'view/index.html')) if (serveView) { diff --git a/qryn_node.js b/qryn_node.js index 2a7ba4ec..1b863b80 100755 --- a/qryn_node.js +++ b/qryn_node.js @@ -4,7 +4,7 @@ * qryn: polyglot observability API * (C) 2018-2024 QXIP BV */ -const { boolEnv } = require('./common') +const { boolEnv, readerMode, writerMode } = require('./common') this.readonly = boolEnv('READONLY') this.http_user = process.env.QRYN_LOGIN || process.env.CLOKI_LOGIN || undefined @@ -64,6 +64,8 @@ const { const fastifyPlugin = require('fastify-plugin') + + let fastify = require('fastify')({ logger, bodyLimit: parseInt(process.env.FASTIFY_BODYLIMIT) || 5242880, @@ -213,7 +215,7 @@ let fastify = require('fastify')({ /* Write Handler */ const handlerPush = require('./lib/handlers/push.js').bind(this) - fastify.post('/loki/api/v1/push', handlerPush, { + writerMode && fastify.post('/loki/api/v1/push', handlerPush, { 'application/json': lokiPushJSONParser, 'application/x-protobuf': lokiPushProtoParser, '*': lokiPushJSONParser @@ -221,28 +223,28 @@ let fastify = require('fastify')({ /* Elastic Write Handler */ const handlerElasticPush = require('./lib/handlers/elastic_index.js').bind(this) - fastify.post('/:target/_doc', handlerElasticPush, { + writerMode && fastify.post('/:target/_doc', handlerElasticPush, { 'application/json': jsonParser, '*': rawStringParser }) - fastify.post('/:target/_create/:id', handlerElasticPush, { + writerMode && fastify.post('/:target/_create/:id', handlerElasticPush, { 'application/json': jsonParser, '*': rawStringParser }) - fastify.put('/:target/_doc/:id', handlerElasticPush, { + writerMode && fastify.put('/:target/_doc/:id', handlerElasticPush, { 'application/json': jsonParser, '*': rawStringParser }) - fastify.put('/:target/_create/:id', handlerElasticPush, { + writerMode && fastify.put('/:target/_create/:id', handlerElasticPush, { 'application/json': jsonParser, '*': rawStringParser }) const handlerElasticBulk = require('./lib/handlers/elastic_bulk.js').bind(this) - fastify.post('/_bulk', handlerElasticBulk, { + writerMode && fastify.post('/_bulk', handlerElasticBulk, { 'application/json': jsonParser, '*': rawStringParser }) - fastify.post('/:target/_bulk', handlerElasticBulk, { + writerMode && fastify.post('/:target/_bulk', handlerElasticBulk, { 'application/json': jsonParser, '*': rawStringParser }) @@ -250,17 +252,17 @@ let fastify = require('fastify')({ /* Tempo Write Handler */ this.tempo_tagtrace = boolEnv('TEMPO_TAGTRACE') const handlerTempoPush = require('./lib/handlers/tempo_push.js').bind(this) - fastify.post('/tempo/api/push', handlerTempoPush, { + writerMode && fastify.post('/tempo/api/push', handlerTempoPush, { 'application/json': tempoPushParser, 'application/x-ndjson': tempoPushNDJSONParser, '*': tempoPushParser }) - fastify.post('/tempo/spans', handlerTempoPush, { + writerMode && fastify.post('/tempo/spans', handlerTempoPush, { 'application/json': tempoPushParser, 'application/x-ndjson': tempoPushNDJSONParser, '*': tempoPushParser }) - fastify.post('/api/v2/spans', handlerTempoPush, { + writerMode && fastify.post('/api/v2/spans', handlerTempoPush, { 'application/json': tempoPushParser, 'application/x-ndjson': tempoPushNDJSONParser, '*': tempoPushParser @@ -269,34 +271,34 @@ let fastify = require('fastify')({ /* Tempo Traces Query Handler */ this.tempo_span = process.env.TEMPO_SPAN || 24 const handlerTempoTraces = require('./lib/handlers/tempo_traces.js').bind(this) - fastify.get('/api/traces/:traceId', handlerTempoTraces) - 
fastify.get('/api/traces/:traceId/:json', handlerTempoTraces) - fastify.get('/tempo/api/traces/:traceId', handlerTempoTraces) - fastify.get('/tempo/api/traces/:traceId/:json', handlerTempoTraces) + readerMode && fastify.get('/api/traces/:traceId', handlerTempoTraces) + readerMode && fastify.get('/api/traces/:traceId/:json', handlerTempoTraces) + readerMode && fastify.get('/tempo/api/traces/:traceId', handlerTempoTraces) + readerMode && fastify.get('/tempo/api/traces/:traceId/:json', handlerTempoTraces) /* Tempo Tag Handlers */ const handlerTempoLabel = require('./lib/handlers/tempo_tags').bind(this) - fastify.get('/api/search/tags', handlerTempoLabel) - fastify.get('/tempo/api/search/tags', handlerTempoLabel) + readerMode && fastify.get('/api/search/tags', handlerTempoLabel) + readerMode && fastify.get('/tempo/api/search/tags', handlerTempoLabel) const handlerTempoLabelV2 = require('./lib/handlers/tempo_v2_tags').bind(this) - fastify.get('/api/v2/search/tags', handlerTempoLabelV2) - fastify.get('/tempo/api/v2/search/tags', handlerTempoLabelV2) + readerMode && fastify.get('/api/v2/search/tags', handlerTempoLabelV2) + readerMode && fastify.get('/tempo/api/v2/search/tags', handlerTempoLabelV2) /* Tempo Tag Value Handler */ const handlerTempoLabelValues = require('./lib/handlers/tempo_values').bind(this) - fastify.get('/api/search/tag/:name/values', handlerTempoLabelValues) - fastify.get('/tempo/api/search/tag/:name/values', handlerTempoLabelValues) + readerMode && fastify.get('/api/search/tag/:name/values', handlerTempoLabelValues) + readerMode && fastify.get('/tempo/api/search/tag/:name/values', handlerTempoLabelValues) const handlerTempoLabelV2Values = require('./lib/handlers/tempo_v2_values').bind(this) - fastify.get('/api/v2/search/tag/:name/values', handlerTempoLabelV2Values) - fastify.get('/tempo/api/v2/search/tag/:name/values', handlerTempoLabelV2Values) + readerMode && fastify.get('/api/v2/search/tag/:name/values', handlerTempoLabelV2Values) + readerMode && fastify.get('/tempo/api/v2/search/tag/:name/values', handlerTempoLabelV2Values) /* Tempo Traces Query Handler */ const handlerTempoSearch = require('./lib/handlers/tempo_search.js').bind(this) - fastify.get('/api/search', handlerTempoSearch) - fastify.get('/tempo/api/search', handlerTempoSearch) + readerMode && fastify.get('/api/search', handlerTempoSearch) + readerMode && fastify.get('/tempo/api/search', handlerTempoSearch) /* Tempo Echo Handler */ const handlerTempoEcho = require('./lib/handlers/echo.js').bind(this) @@ -305,64 +307,64 @@ let fastify = require('fastify')({ /* Telegraf HTTP Bulk handler */ const handlerTelegraf = require('./lib/handlers/telegraf.js').bind(this) - fastify.post('/telegraf', handlerTelegraf, { + writerMode && fastify.post('/telegraf', handlerTelegraf, { '*': jsonParser }) /* Datadog Log Push Handler */ const handlerDatadogLogPush = require('./lib/handlers/datadog_log_push.js').bind(this) - fastify.post('/api/v2/logs', handlerDatadogLogPush, { + writerMode && fastify.post('/api/v2/logs', handlerDatadogLogPush, { 'application/json': jsonParser, '*': rawStringParser }) /* Datadog Series Push Handler */ const handlerDatadogSeriesPush = require('./lib/handlers/datadog_series_push.js').bind(this) - fastify.post('/api/v2/series', handlerDatadogSeriesPush, { + writerMode && fastify.post('/api/v2/series', handlerDatadogSeriesPush, { 'application/json': jsonParser, '*': rawStringParser }) /* Query Handler */ const handlerQueryRange = require('./lib/handlers/query_range.js').bind(this) - 
fastify.get('/loki/api/v1/query_range', handlerQueryRange) + readerMode && fastify.get('/loki/api/v1/query_range', handlerQueryRange) /* Label Handlers */ /* Label Value Handler via query (test) */ const handlerQuery = require('./lib/handlers/query.js').bind(this) - fastify.get('/loki/api/v1/query', handlerQuery) + readerMode && fastify.get('/loki/api/v1/query', handlerQuery) /* Label Handlers */ const handlerLabel = require('./lib/handlers/label.js').bind(this) - fastify.get('/loki/api/v1/label', handlerLabel) - fastify.get('/loki/api/v1/labels', handlerLabel) + readerMode && fastify.get('/loki/api/v1/label', handlerLabel) + readerMode && fastify.get('/loki/api/v1/labels', handlerLabel) /* Label Value Handler */ const handlerLabelValues = require('./lib/handlers/label_values.js').bind(this) - fastify.get('/loki/api/v1/label/:name/values', handlerLabelValues) + readerMode && fastify.get('/loki/api/v1/label/:name/values', handlerLabelValues) /* Series Handler - experimental support for both Loki and Prometheus */ const handlerSeries = require('./lib/handlers/series.js').bind(this) - fastify.get('/loki/api/v1/series', handlerSeries) + readerMode && fastify.get('/loki/api/v1/series', handlerSeries) const handlerPromSeries = require('./lib/handlers/prom_series.js').bind(this) - fastify.get('/api/v1/series', handlerPromSeries) - fastify.post('/api/v1/series', handlerPromSeries, { + readerMode && fastify.get('/api/v1/series', handlerPromSeries) + readerMode && fastify.post('/api/v1/series', handlerPromSeries, { 'application/x-www-form-urlencoded': wwwFormParser }) - fastify.register(async (fastify) => { + readerMode && fastify.register(async (fastify) => { fastify.get('/loki/api/v1/tail', { websocket: true }, require('./lib/handlers/tail').bind(this)) }) /* ALERT MANAGER Handlers */ - fastify.get('/api/prom/rules', require('./lib/handlers/alerts/get_rules').bind(this)) - fastify.get('/api/prom/rules/:ns/:group', require('./lib/handlers/alerts/get_group').bind(this)) - fastify.post('/api/prom/rules/:ns', require('./lib/handlers/alerts/post_group').bind(this), { + readerMode && fastify.get('/api/prom/rules', require('./lib/handlers/alerts/get_rules').bind(this)) + readerMode && fastify.get('/api/prom/rules/:ns/:group', require('./lib/handlers/alerts/get_group').bind(this)) + readerMode && fastify.post('/api/prom/rules/:ns', require('./lib/handlers/alerts/post_group').bind(this), { '*': yamlParser }) - fastify.delete('/api/prom/rules/:ns/:group', require('./lib/handlers/alerts/del_group').bind(this)) - fastify.delete('/api/prom/rules/:ns', require('./lib/handlers/alerts/del_ns').bind(this)) - fastify.get('/prometheus/api/v1/rules', require('./lib/handlers/alerts/prom_get_rules').bind(this)) + readerMode && fastify.delete('/api/prom/rules/:ns/:group', require('./lib/handlers/alerts/del_group').bind(this)) + readerMode && fastify.delete('/api/prom/rules/:ns', require('./lib/handlers/alerts/del_ns').bind(this)) + readerMode && fastify.get('/prometheus/api/v1/rules', require('./lib/handlers/alerts/prom_get_rules').bind(this)) /* PROMETHEUS REMOTE WRITE Handlers */ const promWriteHandler = require('./lib/handlers/prom_push.js').bind(this) @@ -373,61 +375,61 @@ let fastify = require('fastify')({ '/api/v1/write' ] for (const path of remoteWritePaths) { - fastify.post(path, promWriteHandler, { + writerMode && fastify.post(path, promWriteHandler, { 'application/x-protobuf': prometheusPushProtoParser, 'application/json': jsonParser, '*': combinedParser(prometheusPushProtoParser, jsonParser) }) - 
fastify.get(path, handlerTempoEcho) + writerMode && fastify.get(path, handlerTempoEcho) } /* PROMQETHEUS API EMULATION */ const handlerPromQueryRange = require('./lib/handlers/prom_query_range.js').bind(this) - fastify.post('/api/v1/query_range', handlerPromQueryRange, { + readerMode && fastify.post('/api/v1/query_range', handlerPromQueryRange, { 'application/x-www-form-urlencoded': wwwFormParser }) - fastify.get('/api/v1/query_range', handlerPromQueryRange) + readerMode && fastify.get('/api/v1/query_range', handlerPromQueryRange) const handlerPromQuery = require('./lib/handlers/prom_query.js').bind(this) - fastify.post('/api/v1/query', handlerPromQuery, { + readerMode && fastify.post('/api/v1/query', handlerPromQuery, { 'application/x-www-form-urlencoded': wwwFormParser }) - fastify.get('/api/v1/query', handlerPromQuery) + readerMode && fastify.get('/api/v1/query', handlerPromQuery) const handlerPromLabel = require('./lib/handlers/promlabel.js').bind(this) const handlerPromLabelValues = require('./lib/handlers/promlabel_values.js').bind(this) - fastify.get('/api/v1/labels', handlerPromLabel) // piggyback on qryn labels - fastify.get('/api/v1/label/:name/values', handlerPromLabelValues) // piggyback on qryn values - fastify.post('/api/v1/labels', handlerPromLabel, { + readerMode && fastify.get('/api/v1/labels', handlerPromLabel) // piggyback on qryn labels + readerMode && fastify.get('/api/v1/label/:name/values', handlerPromLabelValues) // piggyback on qryn values + readerMode && fastify.post('/api/v1/labels', handlerPromLabel, { '*': rawStringParser }) // piggyback on qryn labels - fastify.post('/api/v1/label/:name/values', handlerPromLabelValues, { + readerMode && fastify.post('/api/v1/label/:name/values', handlerPromLabelValues, { '*': rawStringParser }) // piggyback on qryn values const handlerPromDefault = require('./lib/handlers/prom_default.js') - fastify.get('/api/v1/metadata', handlerPromDefault.misc.bind(this)) // default handler TBD - fastify.get('/api/v1/rules', handlerPromDefault.rules.bind(this)) // default handler TBD - fastify.get('/api/v1/query_exemplars', handlerPromDefault.misc.bind(this)) // default handler TBD - fastify.post('/api/v1/query_exemplars', handlerPromDefault.misc.bind(this), { + readerMode && fastify.get('/api/v1/metadata', handlerPromDefault.misc.bind(this)) // default handler TBD + readerMode && fastify.get('/api/v1/rules', handlerPromDefault.rules.bind(this)) // default handler TBD + readerMode && fastify.get('/api/v1/query_exemplars', handlerPromDefault.misc.bind(this)) // default handler TBD + readerMode && fastify.post('/api/v1/query_exemplars', handlerPromDefault.misc.bind(this), { 'application/x-www-form-urlencoded': wwwFormParser }) // default handler TBD - fastify.get('/api/v1/format_query', handlerPromDefault.misc.bind(this)) // default handler TBD - fastify.post('/api/v1/format_query', handlerPromDefault.misc.bind(this), { + readerMode && fastify.get('/api/v1/format_query', handlerPromDefault.misc.bind(this)) // default handler TBD + readerMode && fastify.post('/api/v1/format_query', handlerPromDefault.misc.bind(this), { 'application/x-www-form-urlencoded': wwwFormParser }) // default handler TBD fastify.get('/api/v1/status/buildinfo', handlerPromDefault.buildinfo.bind(this)) // default handler TBD /* NewRelic Log Handler */ const handlerNewrelicLogPush = require('./lib/handlers/newrelic_log_push.js').bind(this) - fastify.post('/log/v1', handlerNewrelicLogPush, { + writerMode && fastify.post('/log/v1', handlerNewrelicLogPush, { 'text/plain': 
jsonParser,
     '*': jsonParser
   })
 
   /* INFLUX WRITE Handlers */
   const handlerInfluxWrite = require('./lib/handlers/influx_write.js').bind(this)
-  fastify.post('/write', handlerInfluxWrite, {
+  writerMode && fastify.post('/write', handlerInfluxWrite, {
     '*': rawStringParser
   })
-  fastify.post('/influx/api/v2/write', handlerInfluxWrite, {
+  writerMode && fastify.post('/influx/api/v2/write', handlerInfluxWrite, {
     '*': rawStringParser
   })
   /* INFLUX HEALTH Handlers */
@@ -436,7 +438,7 @@ let fastify = require('fastify')({
   fastify.get('/influx/health', handlerInfluxHealth)
 
   const handlerOTLPPush = require('./lib/handlers/otlp_push').bind(this)
-  fastify.post('/v1/traces', handlerOTLPPush, {
+  writerMode && fastify.post('/v1/traces', handlerOTLPPush, {
     '*': otlpPushProtoParser
   })
 
@@ -456,7 +458,7 @@ let fastify = require('fastify')({
     }
   }
 
-  require('./pyroscope/pyroscope').init(fastify)
+  readerMode && require('./pyroscope/pyroscope').init(fastify)
 
   // Run API Service
   fastify.listen(

From 5464490fec94946dc8843f189cede25f39d7b8fc Mon Sep 17 00:00:00 2001
From: akvlad
Date: Wed, 12 Jun 2024 11:52:47 +0300
Subject: [PATCH 5/7] fix

---
 lib/utils.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/utils.js b/lib/utils.js
index d26b5bae..7931b44d 100644
--- a/lib/utils.js
+++ b/lib/utils.js
@@ -169,7 +169,7 @@ const flatOTLPAttrs = (attrs) => {
       return
     }
   }
-  if (val.arrayValue) {
+  if (val.arrayValue && val.arrayValue.values) {
     val.arrayValue.values.forEach((v, i) => {
       flatVal(`${i}`, v, `${prefix}${key}.`, res)
     })

From e5a3013007f8aa8f8723c213bc4dfaaa051b5e56 Mon Sep 17 00:00:00 2001
From: akvlad
Date: Wed, 12 Jun 2024 13:20:12 +0300
Subject: [PATCH 6/7] init

---
 lib/db/throttler.js | 68 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 48 insertions(+), 20 deletions(-)

diff --git a/lib/db/throttler.js b/lib/db/throttler.js
index 6e50db85..59104cbe 100644
--- a/lib/db/throttler.js
+++ b/lib/db/throttler.js
@@ -35,16 +35,42 @@ const axiosError = async (err) => {
   }
 }
 
-class TimeoutThrottler {
-  constructor (statement) {
+class TimeoutOrSizeThrottler {
+  constructor (statement, maxSizeB, maxAgeMS) {
     this.statement = statement
     this.queue = []
     this.resolvers = []
     this.rejects = []
+    this.size = 0
+
+    this.maxSizeB = maxSizeB
+    this.maxAgeMs = maxAgeMS
+    this.lastRequest = 0
+  }
+
+  /**
+   * @param message {string}
+   */
+  queuePush (message) {
+    this.queue.push(message)
+    this.size += message.length
+  }
+
+  willFlush () {
+    return (this.maxSizeB && this.size > this.maxSizeB) ||
+      (this.maxAgeMs && Date.now() - this.lastRequest > this.maxAgeMs)
   }
 
-  async flush () {
+  /**
+   * @param force {boolean}
+   * @returns {Promise}
+   */
+  async flush (force) {
     try {
+      if (!force && !this.willFlush()) {
+        return
+      }
+      this.lastRequest = Date.now()
       await this._flush()
       this.resolvers.forEach(r => r())
     } catch (err) {
@@ -70,12 +96,11 @@ class TimeoutThrottler {
   }
 }
 
-
 const emitter = new EventEmitter()
 let on = true
 const postMessage = message => {
   const genericRequest = (throttler) => {
-    throttler.queue.push(message.data)
+    throttler.queuePush(message.data)
     throttler.resolvers.push(() => {
       if (isMainThread) {
         emitter.emit('message', { status: 'ok', id: message.id })
@@ -115,30 +140,33 @@ const init = () => {
     require('./clickhouse').rawRequest
   ]
 
-  samplesThrottler = new TimeoutThrottler(
-    `INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}${dist}(fingerprint, timestamp_ns, value, string, type) FORMAT JSONEachRow`)
-  timeSeriesThrottler = new TimeoutThrottler(
-    `INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series${dist}(date, fingerprint, labels, name, type) FORMAT JSONEachRow`)
-  tracesThottler = new TimeoutThrottler(
+  samplesThrottler = new TimeoutOrSizeThrottler(
+    `INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}${dist}(fingerprint, timestamp_ns, value, string, type) FORMAT JSONEachRow`,
+    parseInt(process.env.BULK_MAX_SIZE_BYTES || 0), parseInt(process.env.BULK_MAX_AGE_MS || 100))
+  timeSeriesThrottler = new TimeoutOrSizeThrottler(
+    `INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series${dist}(date, fingerprint, labels, name, type) FORMAT JSONEachRow`,
+    parseInt(process.env.BULK_MAX_SIZE_BYTES || 0), parseInt(process.env.BULK_MAX_AGE_MS || 100))
+  tracesThottler = new TimeoutOrSizeThrottler(
     `INSERT INTO ${clickhouseOptions.queryOptions.database}.traces_input (trace_id, span_id, parent_id, name, timestamp_ns, duration_ns, service_name, payload_type, payload, tags)
-    FORMAT JSONEachRow`)
+    FORMAT JSONEachRow`,
+    parseInt(process.env.BULK_MAX_SIZE_BYTES || 0), parseInt(process.env.BULK_MAX_AGE_MS || 100))
 
   setTimeout(async () => {
     // eslint-disable-next-line no-unmodified-loop-condition
     while (on) {
-      const ts = Date.now()
       try {
-        await timeSeriesThrottler.flush()
-        await samplesThrottler.flush()
-        await tracesThottler.flush()
+        await Promise.all([
+          (async () => {
+            await timeSeriesThrottler.flush(samplesThrottler.willFlush())
+            await samplesThrottler.flush(false)
+          })(),
+          tracesThottler.flush(false)
+        ])
       } catch (err) {
         logger.error(await axiosError(err), 'AXIOS ERROR')
      }
-      const p = Date.now() - ts
-      if (p < 100) {
-        await new Promise((resolve) => setTimeout(resolve, 100 - p))
-      }
+      await new Promise((resolve) => setTimeout(resolve, 100))
     }
   }, 0)
 }
@@ -148,7 +176,7 @@ if (isMainThread) {
     samplesThrottler,
     timeSeriesThrottler,
     tracesThottler,
-    TimeoutThrottler,
+    TimeoutThrottler: TimeoutOrSizeThrottler,
     postMessage,
     on: emitter.on.bind(emitter),
     removeAllListeners: emitter.removeAllListeners.bind(emitter),

From d68d1bac5531b0c57ef29f31435b567856648bb0 Mon Sep 17 00:00:00 2001
From: akvlad
Date: Thu, 13 Jun 2024 16:28:21 +0300
Subject: [PATCH 7/7] debug BULK_MAX_SIZE_BYTES/BULK_MAX_AGE_MS

---
 lib/db/throttler.js | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lib/db/throttler.js b/lib/db/throttler.js
index 59104cbe..90fc492c 100644
--- a/lib/db/throttler.js
+++ b/lib/db/throttler.js
@@ -88,6 +88,7 @@ class TimeoutOrSizeThrottler {
     }
     const _queue = this.queue
     this.queue = []
+    this.size = 0
     await rawRequest(this.statement, _queue.join('\n'), DATABASE_NAME(), { maxBodyLength: Infinity })
   }
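The last two patches gate bulk inserts on both payload size and batch age. A minimal standalone sketch of that gating logic follows, assuming the same environment knobs (BULK_MAX_SIZE_BYTES, BULK_MAX_AGE_MS); the class and variable names below are illustrative only and are not part of the patches.

// Illustration of the size/age flush gating, not the project code itself.
// A batch is flushed once it exceeds maxSizeB bytes or is older than maxAgeMs milliseconds.
class BatchGate {
  constructor (maxSizeB, maxAgeMs) {
    // e.g. parseInt(process.env.BULK_MAX_SIZE_BYTES || 0) and parseInt(process.env.BULK_MAX_AGE_MS || 100)
    this.maxSizeB = maxSizeB
    this.maxAgeMs = maxAgeMs
    this.size = 0
    this.lastFlush = Date.now()
  }

  push (row) {
    // Size is tracked as accumulated string length, as TimeoutOrSizeThrottler.queuePush does.
    this.size += row.length
  }

  shouldFlush () {
    // Flush when either the byte budget or the age budget is exceeded; a zero budget disables that check.
    return (this.maxSizeB && this.size > this.maxSizeB) ||
      (this.maxAgeMs && Date.now() - this.lastFlush > this.maxAgeMs)
  }

  flush () {
    this.size = 0
    this.lastFlush = Date.now()
  }
}

// Example: a 1 MiB size cap combined with a 100 ms age cap.
const gate = new BatchGate(1024 * 1024, 100)
gate.push('{"example":"row"}')
if (gate.shouldFlush()) {
  gate.flush()
}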