
Commit

fix: alerting for cluster; e2e tests
akvlad committed Oct 1, 2023
1 parent 6a6f6d6 commit e1b81d1
Showing 7 changed files with 51 additions and 26 deletions.
3 changes: 2 additions & 1 deletion lib/db/alerting/alertWatcher/index.js
@@ -4,6 +4,7 @@ const CallbackTimeSeriesAlertWatcher = require('./callbackTimeSeriesAlertWatcher
const CallbackCliqlAlertWatcher = require('./callbackCliqlAlertWatcher')
const MVAlertWatcher = require('./MVAlertWatcher')
const { parseCliQL } = require('../../../cliql')
const {clusterName} = require('../../../../common')
/**
* @param nsName {string}
* @param group {alerting.group | alerting.objGroup}
@@ -24,7 +25,7 @@ module.exports = (nsName, group, rule) => {
if (q.matrix) {
return new CallbackTimeSeriesAlertWatcher(nsName, group, rule)
}
if (q.stream && q.stream.length) {
if ((q.stream && q.stream.length) || clusterName) {
return new CallbackLogAlertWatcher(nsName, group, rule)
}
return new MVAlertWatcher(nsName, group, rule)
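Note on the hunk above: once a cluster name is configured, the factory no longer falls through to the materialized-view watcher; every non-matrix rule is handled by the callback (polling) log watcher instead. A minimal sketch of the resulting selection logic, reduced to the branches visible in this hunk (the real module returns watcher instances, not strings):

    // Sketch only - simplified from lib/db/alerting/alertWatcher/index.js after this commit.
    const pickWatcher = (q, clusterName) => {
      if (q.matrix) return 'CallbackTimeSeriesAlertWatcher'        // time-series rules
      if ((q.stream && q.stream.length) || clusterName) {
        return 'CallbackLogAlertWatcher'                           // log rules, or any rule when clustered
      }
      return 'MVAlertWatcher'                                      // single node: materialized-view watcher
    }
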
8 changes: 4 additions & 4 deletions lib/db/clickhouse.js
@@ -276,7 +276,7 @@ const tempoQueryScan = async function (query, res, traceId) {
const tempoQueryScanV2 = async function (query, res, traceId) {
logger.debug(`Scanning Tempo Fingerprints... ${traceId}`)
const _stream = await axios.post(getClickhouseUrl() + '/',
`SELECT payload_type, payload FROM ${DATABASE_NAME()}.tempo_traces WHERE oid='0' AND trace_id=unhex('${traceId}') ORDER BY timestamp_ns ASC LIMIT 2000 FORMAT JSONEachRow`,
`SELECT payload_type, payload FROM ${DATABASE_NAME()}.tempo_traces${dist} WHERE oid='0' AND trace_id=unhex('${traceId}') ORDER BY timestamp_ns ASC LIMIT 2000 FORMAT JSONEachRow`,
{
responseType: 'stream'
}
@@ -422,7 +422,7 @@ const queryTempoScanV2 = async function (query) {
const select = `SELECT hex(trace_id) as traceID, service_name as rootServiceName,
name as rootTraceName, timestamp_ns as startTimeUnixNano,
intDiv(duration_ns, 1000000) as durationMs`
const from = `FROM ${DATABASE_NAME()}.tempo_traces`
const from = `FROM ${DATABASE_NAME()}.tempo_traces${dist}`
const where = [
'oid = \'0\'',
`timestamp_ns >= ${parseInt(query.start)} AND timestamp_ns <= ${parseInt(query.end)}`,
@@ -461,7 +461,7 @@ const queryTempoScanV2 = async function (query) {

async function queryTempoTags () {
const q = `SELECT distinct key
FROM ${DATABASE_NAME()}.tempo_traces_kv
FROM ${DATABASE_NAME()}.tempo_traces_kv${dist}
WHERE oid='0' AND date >= toDate(NOW()) - interval '1 day'
FORMAT JSON`
const resp = await axios.post(getClickhouseUrl() + '/',q)
@@ -475,7 +475,7 @@ async function queryTempoTags () {
*/
async function queryTempoValues (tag) {
const q = `SELECT distinct val
FROM ${DATABASE_NAME()}.tempo_traces_kv
FROM ${DATABASE_NAME()}.tempo_traces_kv${dist}
WHERE oid='0' AND date >= toDate(NOW()) - interval '1 day' AND key = ${csql.quoteVal(tag)}
FORMAT JSON`
const resp = await axios.post(getClickhouseUrl() + '/', q)
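The ${dist} suffix interpolated into the tempo queries above follows the same convention introduced in lib/db/clickhouse_alerting.js below; its definition inside clickhouse.js is not part of this hunk, so the following is an assumption based on that pattern:

    // Assumed helper (not shown in this diff): empty string on a single node,
    // '_dist' when a cluster is configured, so reads target the Distributed tables.
    const { clusterName } = require('../../common')   // path as used elsewhere in this commit
    const dist = clusterName ? '_dist' : ''
    // `${DATABASE_NAME()}.tempo_traces${dist}` then resolves to e.g.
    // cloki.tempo_traces locally, or cloki.tempo_traces_dist on a cluster.
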
46 changes: 29 additions & 17 deletions lib/db/clickhouse_alerting.js
@@ -7,6 +7,9 @@ const { DATABASE_NAME } = require('../utils')
const UTILS = require('../utils')
const { getClickhouseUrl } = require('./clickhouse')
const Sql = require('@cloki/clickhouse-sql')
const { clusterName } = require('../../common')
const onCluster = clusterName ? `ON CLUSTER ${clusterName}` : ''
const dist = clusterName ? '_dist' : ''
/**
* @param ns {string}
* @param group {string}
@@ -18,7 +21,7 @@ module.exports.getAlertRule = async (ns, group, name) => {
const mark = Math.random()
const res = await axios.post(getClickhouseUrl(),
'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' +
`FROM ${DATABASE_NAME()}.settings ` +
`FROM ${DATABASE_NAME()}.settings${dist} ` +
`WHERE fingerprint = ${fp} AND ${mark} == ${mark} ` +
'GROUP BY fingerprint ' +
'HAVING name != \'\' ' +
@@ -47,7 +50,7 @@ module.exports.putAlertRule = async (namespace, group, rule) => {
const groupFp = getGroupFp(namespace, group.name)
const groupVal = JSON.stringify({ name: group.name, interval: group.interval })
await axios.post(getClickhouseUrl(),
`INSERT INTO ${DATABASE_NAME()}.settings (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow \n` +
`INSERT INTO ${DATABASE_NAME()}.settings${dist} (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow \n` +
JSON.stringify({ fingerprint: ruleFp, type: 'alert_rule', name: ruleName, value: JSON.stringify(ruleVal), inserted_at: Date.now() * 1000000 }) + '\n' +
JSON.stringify({ fingerprint: groupFp, type: 'alert_group', name: groupName, value: groupVal, inserted_at: Date.now() * 1000000 })
)
@@ -99,7 +102,7 @@ module.exports.getAlertRules = async (limit, offset) => {
const mark = Math.random()
const res = await axios.post(getClickhouseUrl(),
'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' +
`FROM ${DATABASE_NAME()}.settings ` +
`FROM ${DATABASE_NAME()}.settings${dist} ` +
`WHERE type == 'alert_rule' AND ${mark} == ${mark} ` +
`GROUP BY fingerprint HAVING name != '' ORDER BY name ${_limit} ${_offset} FORMAT JSON`)
return res.data.data.map(e => {
@@ -119,7 +122,7 @@ module.exports.getAlertGroups = async (limit, offset) => {
const mark = Math.random()
const res = await axios.post(getClickhouseUrl(),
'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' +
`FROM ${DATABASE_NAME()}.settings ` +
`FROM ${DATABASE_NAME()}.settings${dist} ` +
`WHERE type == 'alert_group' AND ${mark} == ${mark} ` +
`GROUP BY fingerprint HAVING name != '' ORDER BY name ${_limit} ${_offset} FORMAT JSON`)
return res.data.data.map(e => {
@@ -134,7 +137,7 @@ module.exports.getAlertRulesCount = async () => {
const mark = Math.random()
const res = await axios.post(getClickhouseUrl(),
'SELECT COUNT(1) as count FROM (SELECT fingerprint ' +
`FROM ${DATABASE_NAME()}.settings ` +
`FROM ${DATABASE_NAME()}.settings${dist} ` +
`WHERE type=\'alert_rule\' AND ${mark} == ${mark} ` +
'GROUP BY fingerprint ' +
'HAVING argMax(name, inserted_at) != \'\') FORMAT JSON')
@@ -153,8 +156,8 @@ module.exports.deleteAlertRule = async (ns, group, rule) => {
`INSERT INTO ${DATABASE_NAME()}.settings (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow\n` +
JSON.stringify({ fingerprint: fp, type: 'alert_rule', name: '', value: '', inserted_at: Date.now() })
)
await axios.post(getClickhouseUrl(),
`ALTER TABLE ${DATABASE_NAME()}.settings DELETE WHERE fingerprint=${fp} AND inserted_at <= now64(9, 'UTC')`
await axios.post(getClickhouseUrl() + '/?allow_nondeterministic_mutations=1&mutations_execute_nondeterministic_on_initiator=1',
`ALTER TABLE ${DATABASE_NAME()}.settings ${onCluster} DELETE WHERE fingerprint=${fp} AND inserted_at <= now64(9, 'UTC')`
)
}

@@ -166,11 +169,12 @@ module.exports.deleteAlertRule = async (ns, group, rule) => {
module.exports.deleteGroup = async (ns, group) => {
const fp = getGroupFp(ns, group)
await axios.post(getClickhouseUrl(),
`INSERT INTO ${DATABASE_NAME()}.settings (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow\n` +
`INSERT INTO ${DATABASE_NAME()}.settings${dist} (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow\n` +
JSON.stringify({ fingerprint: fp, type: 'alert_group', name: '', value: '', inserted_at: Date.now() })
)
await axios.post(getClickhouseUrl(),
`ALTER TABLE ${DATABASE_NAME()}.settings DELETE WHERE fingerprint=${fp} AND inserted_at <= now64(9, 'UTC')`
await axios.post(getClickhouseUrl() + '/?allow_nondeterministic_mutations=1&mutations_execute_nondeterministic_on_initiator=1',
`ALTER TABLE ${DATABASE_NAME()}.settings ${onCluster} DELETE WHERE fingerprint=${fp} AND inserted_at <= now64(9, 'UTC')
SETTINGS allow_nondeterministic_mutations=1`
)
}
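
The rewritten DELETE calls above are the cluster-sensitive part of deleteAlertRule/deleteGroup: ON CLUSTER fans the mutation out to every node, and because now64() is non-deterministic, replicated tables reject the mutation unless it is explicitly allowed - hence the extra URL parameters (and the inline SETTINGS clause in deleteGroup). Roughly what goes over the wire when a cluster is configured (host, database, cluster and fingerprint values are illustrative):

    // Illustration only - actual values come from configuration and the rule fingerprint.
    // POST http://clickhouse:8123/?allow_nondeterministic_mutations=1&mutations_execute_nondeterministic_on_initiator=1
    //   ALTER TABLE cloki.settings ON CLUSTER my_cluster
    //   DELETE WHERE fingerprint=1234567890 AND inserted_at <= now64(9, 'UTC')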

@@ -183,9 +187,11 @@ module.exports.deleteGroup = async (ns, group) => {
module.exports.dropAlertViews = async (ns, group, rule) => {
const fp = getRuleFP(ns, group, rule)
await axios.post(getClickhouseUrl(),
`DROP VIEW IF EXISTS ${DATABASE_NAME()}._alert_view_${fp}`)
`DROP VIEW IF EXISTS ${DATABASE_NAME()}._alert_view_${fp} ${onCluster}`)
await axios.post(getClickhouseUrl(),
`DROP TABLE IF EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark`)
`DROP TABLE IF EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark ${onCluster}`)
await axios.post(getClickhouseUrl(),
`DROP TABLE IF EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark_dist ${onCluster}`)
}

/**
Expand All @@ -197,9 +203,15 @@ module.exports.dropAlertViews = async (ns, group, rule) => {
module.exports.createMarksTable = async (ns, group, rule) => {
const fp = getRuleFP(ns, group, rule)
await axios.post(getClickhouseUrl(),
`CREATE TABLE IF NOT EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark ` +
`CREATE TABLE IF NOT EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark ${onCluster}` +
'(id UInt8 default 0,mark UInt64, inserted_at DateTime default now()) ' +
'ENGINE ReplacingMergeTree(mark) ORDER BY id')
`ENGINE ${clusterName ? 'Replicated' : ''}ReplacingMergeTree(mark) ORDER BY id`)
if (clusterName) {
await axios.post(getClickhouseUrl(),
`CREATE TABLE IF NOT EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark_dist ${onCluster}` +
'(id UInt8 default 0,mark UInt64, inserted_at DateTime default now()) ' +
`ENGINE=Distributed('${clusterName}', '${DATABASE_NAME()}', '_alert_view_${fp}_mark', id)`)
}
}

/**
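
createMarksTable now produces a pair of tables in cluster mode: a ReplicatedReplacingMergeTree local table created ON CLUSTER on every node, plus a _mark_dist Distributed table in front of it, which is what the ${dist}-suffixed reads and writes elsewhere in this file go through. With illustrative names (database cloki, cluster my_cluster, fingerprint 123) the rendered DDL looks roughly like:

    // Illustrative rendering of the two statements above:
    // CREATE TABLE IF NOT EXISTS cloki._alert_view_123_mark ON CLUSTER my_cluster
    //   (id UInt8 default 0, mark UInt64, inserted_at DateTime default now())
    //   ENGINE ReplicatedReplacingMergeTree(mark) ORDER BY id
    // CREATE TABLE IF NOT EXISTS cloki._alert_view_123_mark_dist ON CLUSTER my_cluster
    //   (id UInt8 default 0, mark UInt64, inserted_at DateTime default now())
    //   ENGINE=Distributed('my_cluster', 'cloki', '_alert_view_123_mark', id)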
@@ -235,7 +247,7 @@ module.exports.createAlertViews = async (ns, group, rule, request) => {
module.exports.getLastMark = async (ns, group, rule) => {
const fp = getRuleFP(ns, group, rule)
const mark = await axios.post(getClickhouseUrl(),
`SELECT max(mark) as mark FROM ${DATABASE_NAME()}._alert_view_${fp}_mark WHERE id = 0 FORMAT JSON`)
`SELECT max(mark) as mark FROM ${DATABASE_NAME()}._alert_view_${fp}_mark${dist} WHERE id = 0 FORMAT JSON`)
return parseInt(mark.data.data[0].mark)
}

@@ -253,7 +265,7 @@ module.exports.incAlertMark = async (ns, group, rule, newMark, id) => {
newMark = newMark || Date.now()
id = id || 0
await axios.post(getClickhouseUrl(),
`INSERT INTO ${DATABASE_NAME()}._alert_view_${fp}_mark (mark, id) VALUES (${newMark}, ${id})`)
`INSERT INTO ${DATABASE_NAME()}._alert_view_${fp}_mark${dist} (mark, id) VALUES (${newMark}, ${id})`)
return [mark, newMark]
}

@@ -285,7 +297,7 @@ module.exports.getAlerts = async (ns, group, rule, mark) => {
module.exports.dropOutdatedParts = async (ns, group, rule, mark) => {
const fp = getRuleFP(ns, group, rule)
const partitions = await axios.post(getClickhouseUrl(),
`SELECT DISTINCT mark FROM ${DATABASE_NAME()}._alert_view_${fp} WHERE mark <= ${mark} FORMAT JSON`)
`SELECT DISTINCT mark FROM ${DATABASE_NAME()}._alert_view_${fp}${dist} WHERE mark <= ${mark} FORMAT JSON`)
if (!partitions.data || !partitions.data.data || !partitions.data.data.length) {
return
}
9 changes: 8 additions & 1 deletion lib/db/maintain/scripts.js
@@ -187,7 +187,14 @@ module.exports.overall_dist = [
\`name\` String,
\`value\` String,
\`inserted_at\` DateTime64(9, 'UTC')
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'settings', rand());`
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'settings', rand());`,

`CREATE TABLE IF NOT EXISTS {{DB}}.time_series_gin_dist {{{OnCluster}}} (
date Date,
key String,
val String,
fingerprint UInt64
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series_gin', rand());`,
]

module.exports.traces_dist = [
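The new overall_dist entry adds the Distributed counterpart of time_series_gin, which the label handlers below read from in cluster mode. Assuming the usual placeholder substitution in lib/db/maintain ({{DB}} -> database name, {{CLUSTER}} -> cluster name, {{{OnCluster}}} -> the ON CLUSTER clause), the statement renders roughly as:

    // Illustrative rendering for database 'cloki' and cluster 'my_cluster':
    // CREATE TABLE IF NOT EXISTS cloki.time_series_gin_dist ON CLUSTER my_cluster (
    //   date Date, key String, val String, fingerprint UInt64
    // ) ENGINE = Distributed('my_cluster', 'cloki', 'time_series_gin', rand());
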
5 changes: 4 additions & 1 deletion lib/handlers/label.js
@@ -13,6 +13,8 @@

const clickhouse = require('../db/clickhouse')
const utils = require('../utils')
const { clusterName } = require('../../common')
const dist = clusterName ? '_dist' : ''

async function handler (req, res) {
req.log.debug('GET /loki/api/v1/label')
@@ -21,7 +23,8 @@ async function handler (req, res) {
req.query.end && !isNaN(parseInt(req.query.end)) ? `date <= toDate(FROM_UNIXTIME(intDiv(${parseInt(req.query.end)}, 1000000000)))` : null
].filter(w => w)
where = where.length ? `WHERE ${where.join(' AND ')}` : ''
const q = `SELECT DISTINCT key FROM time_series_gin ${where} FORMAT JSON`
const q = `SELECT DISTINCT key FROM time_series_gin${dist} ${where} FORMAT JSON`
console.log(q)
const allLabels = await clickhouse.rawRequest(q, null, utils.DATABASE_NAME())
const resp = { status: 'success', data: allLabels.data.data.map(r => r.key) }
return res.send(resp)
4 changes: 3 additions & 1 deletion lib/handlers/label_values.js
@@ -14,6 +14,8 @@
const clickhouse = require('../db/clickhouse')
const Sql = require('@cloki/clickhouse-sql')
const utils = require('../utils')
const { clusterName } = require('../../common')
const dist = clusterName ? '_dist' : ''

async function handler (req, res) {
req.log.debug(`GET /api/prom/label/${req.params.name}/values`)
@@ -23,7 +25,7 @@ async function handler (req, res) {
req.query.end && !isNaN(parseInt(req.query.end)) ? `date <= toDate(FROM_UNIXTIME(intDiv(${parseInt(req.query.end)}, 1000000000)))` : null
].filter(w => w)
where = `WHERE ${where.join(' AND ')}`
const q = `SELECT DISTINCT val FROM time_series_gin ${where} FORMAT JSON`
const q = `SELECT DISTINCT val FROM time_series_gin${dist} ${where} FORMAT JSON`
const allValues = await clickhouse.rawRequest(q, null, utils.DATABASE_NAME())
const resp = { status: 'success', data: allValues.data.data.map(r => r.val) }
return res.send(resp)
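Both label handlers now pick the table the same way: plain time_series_gin on a single node, time_series_gin_dist (the Distributed table created by the maintain script above) when a cluster is configured. For instance, the label-names query from lib/handlers/label.js renders roughly as follows in cluster mode (timestamps shortened):

    // Illustrative rendering when the request carries start and end parameters:
    // SELECT DISTINCT key FROM time_series_gin_dist
    //   WHERE date >= toDate(FROM_UNIXTIME(...)) AND date <= toDate(FROM_UNIXTIME(...))
    //   FORMAT JSON
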
2 changes: 1 addition & 1 deletion parser/transpiler.js
@@ -164,7 +164,7 @@ module.exports.transpile = (request) => {
step: step,
legacy: !checkVersion('v3_1', start),
joinLabels: joinLabels,
inline: clusterName !== ''
inline: !!clusterName
}
let duration = null
const matrixOp = [
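The transpiler tweak looks like a guard fix: if clusterName can be undefined when no cluster is configured (an assumption here), the old check clusterName !== '' evaluates to true and would wrongly enable inline mode, while !!clusterName is false for both undefined and the empty string. A quick check of the two expressions:

    // Assumes clusterName may be undefined or '' when no cluster is configured (assumption).
    for (const clusterName of [undefined, '', 'my_cluster']) {
      console.log(clusterName, { old: clusterName !== '', fixed: !!clusterName })
    }
    // undefined    -> { old: true,  fixed: false }   (old check was wrong here)
    // ''           -> { old: false, fixed: false }
    // 'my_cluster' -> { old: true,  fixed: true }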
