Skip to content

Commit

Permalink
Merge pull request #514 from metrico/release/3_2_23_beta
Browse files Browse the repository at this point in the history
Release 3.2.23 prep
  • Loading branch information
akvlad authored Jun 13, 2024
2 parents 2944762 + d68d1ba commit 0a2d6dd
Show file tree
Hide file tree
Showing 11 changed files with 471 additions and 209 deletions.
3 changes: 3 additions & 0 deletions common.js
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,6 @@ module.exports.logType = boolEnv('DISTINGUISH_LOGS_METRICS') ? 1 : 0
module.exports.metricType = boolEnv('DISTINGUISH_LOGS_METRICS') ? 2 : 0

module.exports.bothType = 0

module.exports.writerMode = (process.env.MODE === 'writer' || !process.env.MODE) && !boolEnv('READONLY')
module.exports.readerMode = process.env.MODE === 'reader' || boolEnv('READONLY') || !process.env.MODE
69 changes: 49 additions & 20 deletions lib/db/throttler.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,42 @@ const axiosError = async (err) => {
}
}

class TimeoutThrottler {
constructor (statement) {
class TimeoutOrSizeThrottler {
constructor (statement, maxSizeB, maxAgeMS) {
this.statement = statement
this.queue = []
this.resolvers = []
this.rejects = []
this.size = 0

this.maxSizeB = maxSizeB
this.maxAgeMs = maxAgeMS
this.lastRequest = 0
}

/**
* @param message {string}
*/
queuePush (message) {
this.queue.push(message)
this.size += message.length
}

willFlush () {
return (this.maxSizeB && this.size > this.maxSizeB) ||
(this.maxAgeMs && Date.now() - this.lastRequest > this.maxAgeMs)
}

async flush () {
/**
* @param force {boolean}
* @returns {Promise<void>}
*/
async flush (force) {
try {
if (!force && !this.willFlush()) {
return
}
this.lastRequest = Date.now()
await this._flush()
this.resolvers.forEach(r => r())
} catch (err) {
Expand All @@ -62,6 +88,7 @@ class TimeoutThrottler {
}
const _queue = this.queue
this.queue = []
this.size = 0
await rawRequest(this.statement, _queue.join('\n'), DATABASE_NAME(), { maxBodyLength: Infinity })
}

Expand All @@ -70,12 +97,11 @@ class TimeoutThrottler {
}
}


const emitter = new EventEmitter()
let on = true
const postMessage = message => {
const genericRequest = (throttler) => {
throttler.queue.push(message.data)
throttler.queuePush(message.data)
throttler.resolvers.push(() => {
if (isMainThread) {
emitter.emit('message', { status: 'ok', id: message.id })
Expand Down Expand Up @@ -115,30 +141,33 @@ const init = () => {
require('./clickhouse').rawRequest
]

samplesThrottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}${dist}(fingerprint, timestamp_ns, value, string, type) FORMAT JSONEachRow`)
timeSeriesThrottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series${dist}(date, fingerprint, labels, name, type) FORMAT JSONEachRow`)
tracesThottler = new TimeoutThrottler(
samplesThrottler = new TimeoutOrSizeThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}${dist}(fingerprint, timestamp_ns, value, string, type) FORMAT JSONEachRow`,
parseInt(process.env.BULK_MAX_SIZE_BYTES || 0), parseInt(process.env.BULK_MAX_AGE_MS || 100))
timeSeriesThrottler = new TimeoutOrSizeThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series${dist}(date, fingerprint, labels, name, type) FORMAT JSONEachRow`,
parseInt(process.env.BULK_MAX_SIZE_BYTES || 0), parseInt(process.env.BULK_MAX_AGE_MS || 100))
tracesThottler = new TimeoutOrSizeThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.traces_input
(trace_id, span_id, parent_id, name, timestamp_ns, duration_ns, service_name, payload_type, payload, tags)
FORMAT JSONEachRow`)
FORMAT JSONEachRow`,
parseInt(process.env.BULK_MAX_SIZE_BYTES || 0), parseInt(process.env.BULK_MAX_AGE_MS || 100))

setTimeout(async () => {
// eslint-disable-next-line no-unmodified-loop-condition
while (on) {
const ts = Date.now()
try {
await timeSeriesThrottler.flush()
await samplesThrottler.flush()
await tracesThottler.flush()
await Promise.all([
(async () => {
await timeSeriesThrottler.flush(samplesThrottler.willFlush())
await samplesThrottler.flush(false)
})(),
tracesThottler.flush(false)
])
} catch (err) {
logger.error(await axiosError(err), 'AXIOS ERROR')
}
const p = Date.now() - ts
if (p < 100) {
await new Promise((resolve) => setTimeout(resolve, 100 - p))
}
await new Promise((resolve) => setTimeout(resolve, 100))
}
}, 0)
}
Expand All @@ -148,7 +177,7 @@ if (isMainThread) {
samplesThrottler,
timeSeriesThrottler,
tracesThottler,
TimeoutThrottler,
TimeoutThrottler: TimeoutOrSizeThrottler,
postMessage,
on: emitter.on.bind(emitter),
removeAllListeners: emitter.removeAllListeners.bind(emitter),
Expand Down
2 changes: 1 addition & 1 deletion lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ const flatOTLPAttrs = (attrs) => {
return
}
}
if (val.arrayValue) {
if (val.arrayValue && val.arrayValue.values) {
val.arrayValue.values.forEach((v, i) => {
flatVal(`${i}`, v, `${prefix}${key}.`, res)
})
Expand Down
Loading

0 comments on commit 0a2d6dd

Please sign in to comment.