From ce808c4bc4a58f796a5a1983d6ded4a469af25e6 Mon Sep 17 00:00:00 2001 From: akvlad Date: Tue, 23 Jan 2024 14:10:32 +0200 Subject: [PATCH] fix: configurable telegraf schema support --- lib/handlers/influx_write.js | 61 ++++++++++++++++++++++++++++++++++-- 1 file changed, 59 insertions(+), 2 deletions(-) diff --git a/lib/handlers/influx_write.js b/lib/handlers/influx_write.js index a9a86111..571d2a89 100644 --- a/lib/handlers/influx_write.js +++ b/lib/handlers/influx_write.js @@ -62,7 +62,9 @@ async function handler (req, res) { throw new errors.QrynBadRequest(e.toString()) } const promises = [] - if (streams) { + if (process.env.ADVANCED_TELEGRAF_METRICS_SCHEMA === 'telegraf-prometheus-v2') { + await Promise.all(telegrafPrometheusV1(streams)) + } else if (streams) { streams.forEach(function (stream) { let JSONLabels = {} let JSONFields = {} @@ -116,7 +118,7 @@ async function handler (req, res) { const values = [ finger, BigInt(pad('0000000000000000000', timestamp, true)), - _value || 0, + parseFloat(_value) || 0, key || '' ] bulk.add([values]) @@ -139,6 +141,61 @@ async function handler (req, res) { return res.code(204).send('') } +function telegrafPrometheusV1 (stream) { + const promises = [] + for (const entry of stream) { + const timestamp = BigInt(entry.timestamp) + if (entry.measurement === 'syslog' || entry.fields.message) { + const labels = { + ...entry.tags, + measurement: entry.measurement + } + const strLabels = stringify(Object.fromEntries(Object.entries(labels).sort())) + const fp = fingerPrint(strLabels) + promises.push(bulk_labels.add([[ + new Date().toISOString().split('T')[0], + fp, + strLabels, + entry.measurement || '' + ]])) + const values = [ + fp, + timestamp, + 0, + entry.fields.message || '' + ] + promises.push(bulk.add([values])) + } + for (const [key, value] of Object.entries(entry.fields)) { + const iValue = parseFloat(value) + if (typeof iValue !== 'number') { + continue + } + const labels = { + ...entry.tags, + measurement: entry.measurement, + __name__: key + } + const strLabels = stringify(Object.fromEntries(Object.entries(labels).sort())) + const fp = fingerPrint(strLabels) + promises.push(bulk_labels.add([[ + new Date().toISOString().split('T')[0], + fp, + strLabels, + entry.measurement || '' + ]])) + const values = [ + fp, + timestamp, + iValue || 0, + key || '' + ] + promises.push(bulk.add([values])) + } + } + return promises +} + function pad (pad, str, padLeft) { if (typeof str === 'undefined') { return pad