Skip to content

Commit

Permalink
Merge pull request #437 from metrico/fix_433
Browse files Browse the repository at this point in the history
fix: configurable telegraf schema support
  • Loading branch information
akvlad authored Jan 23, 2024
2 parents 526b45c + ce808c4 commit 59e8a39
Showing 1 changed file with 59 additions and 2 deletions.
61 changes: 59 additions & 2 deletions lib/handlers/influx_write.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ async function handler (req, res) {
throw new errors.QrynBadRequest(e.toString())
}
const promises = []
if (streams) {
if (process.env.ADVANCED_TELEGRAF_METRICS_SCHEMA === 'telegraf-prometheus-v2') {
await Promise.all(telegrafPrometheusV1(streams))
} else if (streams) {
streams.forEach(function (stream) {
let JSONLabels = {}
let JSONFields = {}
Expand Down Expand Up @@ -116,7 +118,7 @@ async function handler (req, res) {
const values = [
finger,
BigInt(pad('0000000000000000000', timestamp, true)),
_value || 0,
parseFloat(_value) || 0,
key || ''
]
bulk.add([values])
Expand All @@ -139,6 +141,61 @@ async function handler (req, res) {
return res.code(204).send('')
}

function telegrafPrometheusV1 (stream) {
const promises = []
for (const entry of stream) {
const timestamp = BigInt(entry.timestamp)
if (entry.measurement === 'syslog' || entry.fields.message) {
const labels = {
...entry.tags,
measurement: entry.measurement
}
const strLabels = stringify(Object.fromEntries(Object.entries(labels).sort()))
const fp = fingerPrint(strLabels)
promises.push(bulk_labels.add([[
new Date().toISOString().split('T')[0],
fp,
strLabels,
entry.measurement || ''
]]))
const values = [
fp,
timestamp,
0,
entry.fields.message || ''
]
promises.push(bulk.add([values]))
}
for (const [key, value] of Object.entries(entry.fields)) {
const iValue = parseFloat(value)
if (typeof iValue !== 'number') {
continue
}
const labels = {
...entry.tags,
measurement: entry.measurement,
__name__: key
}
const strLabels = stringify(Object.fromEntries(Object.entries(labels).sort()))
const fp = fingerPrint(strLabels)
promises.push(bulk_labels.add([[
new Date().toISOString().split('T')[0],
fp,
strLabels,
entry.measurement || ''
]]))
const values = [
fp,
timestamp,
iValue || 0,
key || ''
]
promises.push(bulk.add([values]))
}
}
return promises
}

function pad (pad, str, padLeft) {
if (typeof str === 'undefined') {
return pad
Expand Down

0 comments on commit 59e8a39

Please sign in to comment.