Merge branch 'master' into dependabot/npm_and_yarn/npm_and_yarn-13161…0d547
lmangani authored Dec 28, 2024
2 parents d48c6e9 + d7145ef commit 6799b09
Showing 13 changed files with 2,417 additions and 642 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -11,3 +11,4 @@ node_modules
 /wasm_parts/wasm_parts.iml
 /test/qryn_test_env/clickhouse/_data/
 /test/qryn_test_env/grafana/_data/
+/test/qryn_test_cluster_env/grafana/_data/

2 changes: 1 addition & 1 deletion lib/db/maintain/scripts.js
@@ -198,7 +198,7 @@ module.exports.traces = [
 ]

 module.exports.overall_dist = [
-  `CREATE TABLE {{DB}}.metrics_15s_dist {{{OnCluster}}} (
+  `CREATE TABLE IF NOT EXISTS {{DB}}.metrics_15s_dist {{{OnCluster}}} (
     \`fingerprint\` UInt64,
     \`timestamp_ns\` Int64 CODEC(DoubleDelta),
     \`last\` AggregateFunction(argMax, Float64, Int64),

4 changes: 2 additions & 2 deletions lib/handlers/push.js
@@ -71,7 +71,7 @@ function processStream (stream, labels, bulkLabels, bulk, toJSON, fingerPrint) {
       values.push([
         finger,
         ts,
-        (typeof entry.value === 'undefined') ? null : entry.value,
+        (typeof entry.value !== 'number') ? 0 : entry.value,
         entry.line || '',
         type === 3 ? bothType : type
       ])
@@ -98,7 +98,7 @@ function processStream (stream, labels, bulkLabels, bulk, toJSON, fingerPrint) {
       values.push([
         finger,
         BigInt(value[0]),
-        (typeof value[2] === 'undefined') ? null : value[2],
+        (typeof value[2] !== 'number') ? 0 : value[2],
         value[1] || '',
         type === 3 ? bothType : type
       ])

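The push.js change above replaces the old `undefined → null` check with a strict numeric check, so any non-numeric sample value is written as 0 instead of being passed through (or stored as null). A minimal standalone sketch of that normalization, using a hypothetical normalizeValue helper that is not part of qryn itself:

// Sketch: mirror the sample-value coercion used in the push handler above.
// normalizeValue is a hypothetical name for illustration only.
function normalizeValue (value) {
  // Only real numbers pass through; strings, undefined and null become 0.
  return (typeof value !== 'number') ? 0 : value
}

console.log(normalizeValue(42))        // 42
console.log(normalizeValue(undefined)) // 0 (previously stored as null)
console.log(normalizeValue('17'))      // 0 (previously passed through as a string)
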
31 changes: 11 additions & 20 deletions parser/bnf.js
@@ -81,32 +81,23 @@ compiler.ParseScript = function (script) {
   const aqLiterals = []
   let _script = script
   let res = ''
-  let qsMatch = _script.match(/^([^"]*)("([^"\\]|\\.)*")?/)
+  const re = /^([^"`]*)("(([^"\\]|\\.)*)"|`(([^`\\]|\\.)*)`)?/
+  let qsMatch = _script.match(re)
   while (qsMatch && qsMatch[0]) {
-    let repl = qsMatch[2] || ''
+    let repl = qsMatch[2] || qsMatch[4] || ''
     if (repl.length > 512) {
-      qLiterals.push(repl)
-      repl = `"QL_${qLiterals.length - 1}"`
+      if (repl.startsWith('"')) {
+        qLiterals.push(repl)
+        repl = `"QL_${qLiterals.length - 1}"`
+      } else {
+        aqLiterals.push(repl)
+        repl = `\`AL_${aqLiterals.length - 1}\``
+      }
     }
     res = res + qsMatch[1] + repl
     _script = _script.slice(qsMatch[0].length)
-    qsMatch = _script.match(/^([^"]*)("([^"\\]|\\.)*")?/)
+    qsMatch = _script.match(re)
   }
-
-  _script = res
-  res = ''
-  qsMatch = _script.match(/^([^`]*)(`([^`\\]|\\.)*`)?/)
-  while (qsMatch && qsMatch[0]) {
-    let repl = qsMatch[2] || ''
-    if (repl.length > 512) {
-      aqLiterals.push(repl)
-      repl = `\`AL_${qLiterals.length - 1}\``
-    }
-    res = res + qsMatch[1] + repl
-    _script = _script.slice(qsMatch[0].length)
-    qsMatch = _script.match(/^([^`]*)(`([^`\\]|\\.)*`)?/)
-  }
-
   const parsedScript = this._ParseScript(res)
   if (!parsedScript) {
     return parsedScript

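The parser/bnf.js rewrite above collapses the two literal-extraction passes into one: a single regular expression now consumes either a double-quoted or a backtick-quoted literal per iteration, and long literals are routed to qLiterals or aqLiterals depending on the opening quote. It also removes the old second pass, which pushed into aqLiterals but built its AL_ placeholders from qLiterals.length. A minimal standalone sketch of the same single-pass idea (the 512-character threshold and the QL_/AL_ placeholder names come from the code above; the surrounding parser is omitted):

// Sketch: replace long quoted/backquoted literals with placeholders in one pass.
const re = /^([^"`]*)("(([^"\\]|\\.)*)"|`(([^`\\]|\\.)*)`)?/
function extractLongLiterals (script, threshold = 512) {
  const qLiterals = []  // long "..." literals
  const aqLiterals = [] // long `...` literals
  let rest = script
  let res = ''
  let m = rest.match(re)
  while (m && m[0]) {
    let repl = m[2] || ''
    if (repl.length > threshold) {
      if (repl.startsWith('"')) {
        qLiterals.push(repl)
        repl = `"QL_${qLiterals.length - 1}"`
      } else {
        aqLiterals.push(repl)
        repl = `\`AL_${aqLiterals.length - 1}\``
      }
    }
    res += m[1] + repl // unchanged text plus the (possibly replaced) literal
    rest = rest.slice(m[0].length)
    m = rest.match(re)
  }
  return { res, qLiterals, aqLiterals }
}
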
10 changes: 6 additions & 4 deletions parser/registry/common.js
@@ -85,9 +85,11 @@ module.exports.querySelectorPostProcess = (query) => {
  * @returns {string}
  */
 module.exports.unquoteToken = (token) => {
-  let value = token.Child('quoted_str').value
-  value = `"${value.substr(1, value.length - 2)}"`
-  return JSON.parse(value)
+  const value = token.Child('quoted_str').value
+  if (value.startsWith('"')) {
+    return JSON.parse(value)
+  }
+  return value.substr(1, value.length - 2)
 }

 /**
@@ -445,7 +447,7 @@ module.exports.preJoinLabels = (token, query, dist) => {
   dist = dist || ''
   const timeSeriesReq = new Sql.Select()
     .select('fingerprint', 'labels')
-    .from([`${DATABASE_NAME()}.time_series${dist}`, 'time_series'])
+    .from([`${DATABASE_NAME()}.time_series`, 'time_series'])
     .where(new Sql.And(
       new Sql.In('time_series.fingerprint', 'in', inRightSide),
       Sql.Gte(new Sql.Raw('date'), sqlFrom),

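With the parser/registry/common.js change above, unquoteToken only runs JSON.parse for double-quoted tokens (so escape sequences are decoded), while backtick-quoted tokens are returned with the surrounding quotes stripped and their content kept verbatim. A small standalone sketch of that behaviour, operating directly on the quoted value string (the token plumbing around Child('quoted_str') is omitted):

// Sketch: unquote a value that is either "..." or `...`.
function unquote (value) {
  if (value.startsWith('"')) {
    // Double quotes: JSON.parse decodes \" , \\ , \n and similar escapes.
    return JSON.parse(value)
  }
  // Backticks: strip the quotes, keep the content as-is.
  return value.substr(1, value.length - 2)
}

console.log(unquote('"a\\"b"')) // a"b   (escape decoded)
console.log(unquote('`a\\"b`')) // a\"b  (kept verbatim)
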
119 changes: 78 additions & 41 deletions promql/index.js
@@ -45,8 +45,7 @@ module.exports.series = async (query, fromMs, toMs) => {
   const fromS = Math.floor(fromMs / 1000)
   const toS = Math.floor(toMs / 1000)
   const matchers = prometheus.pqlMatchers(query)
-  const conds = getMatchersIdxCond(matchers[0])
-  const idx = getIdxSubquery(conds, fromMs, toMs)
+  const idx = getIdxSubqueryV2(matchers[0], fromMs, toMs)
   const withIdx = new Sql.With('idx', idx, !!clusterName)
   const req = (new Sql.Select())
     .with(withIdx)
@@ -72,51 +71,90 @@ module.exports.series = async (query, fromMs, toMs) => {
   }
 }

+/**
+ *
+ * @param matcher {[string]}
+ */
+const getMatcherIdxCond = (matcher) => {
+  const res = [
+    Sql.Eq('key', matcher[0])
+  ]
+  switch (matcher[1]) {
+    case '=':
+      res.push(Sql.Eq('val', matcher[2]))
+      break
+    case '!=':
+      res.push(Sql.Ne('val', matcher[2]))
+      break
+    case '=~':
+      res.push(Sql.Eq(new Sql.Raw(`match(val, ${Sql.quoteVal(matcher[2])})`), 1))
+      break
+    case '!~':
+      res.push(Sql.Ne(new Sql.Raw(`match(val, ${Sql.quoteVal(matcher[2])})`), 1))
+  }
+  return res
+}
+
 /**
  *
  * @param matchers {[[string]]}
  */
 const getMatchersIdxCond = (matchers) => {
-  const matchesCond = []
-  for (const matcher of matchers) {
-    const _matcher = [
-      Sql.Eq('key', matcher[0])
-    ]
-    switch (matcher[1]) {
-      case '=':
-        _matcher.push(Sql.Eq('val', matcher[2]))
-        break
-      case '!=':
-        _matcher.push(Sql.Ne('val', matcher[2]))
-        break
-      case '=~':
-        _matcher.push(Sql.Eq(new Sql.Raw(`match(val, ${Sql.quoteVal(matcher[2])})`), 1))
-        break
-      case '!~':
-        _matcher.push(Sql.Ne(new Sql.Raw(`match(val, ${Sql.quoteVal(matcher[2])})`), 1))
-    }
-    matchesCond.push(Sql.And(..._matcher))
-  }
-  return matchesCond
+  return matchers.map(matcher => Sql.And(...getMatcherIdxCond(matcher)))
 }

-const getIdxSubquery = (conds, fromMs, toMs) => {
+const getIdxSubqueryV2 = (matchers, fromMs, toMs) => {
   const fromS = Math.floor(fromMs / 1000)
   const toS = Math.floor(toMs / 1000)
-  return (new Sql.Select())
-    .select('fingerprint')
-    .from([DATABASE_NAME() + '.time_series_gin', 'time_series_gin'])
-    .where(Sql.And(
-      Sql.Or(...conds),
-      Sql.Gte('date', new Sql.Raw(`toDate(fromUnixTimestamp(${fromS}))`)),
-      Sql.Lte('date', new Sql.Raw(`toDate(fromUnixTimestamp(${toS}))`)),
-      new Sql.In('type', 'in', [bothType, metricType])))
-    .having(
-      Sql.Eq(
-        new Sql.Raw('groupBitOr(' + conds.map(
-          (m, i) => new Sql.Raw(`bitShiftLeft((${m})::UInt64, ${i})`)
-        ).join('+') + ')'), (1 << conds.length) - 1)
-    ).groupBy('fingerprint')
+  const nonEmptyMatchers = matchers.filter(m => m[2] !== '')
+  const emptyMatchers = matchers.filter(m => m[2] === '' && ['=', '!='].includes(m[1]))
+  let req = null
+  if (nonEmptyMatchers.length) {
+    const nonEmptyConds = getMatchersIdxCond(nonEmptyMatchers)
+    req = (new Sql.Select())
+      .select('fingerprint')
+      .from([DATABASE_NAME() + '.time_series_gin', 'time_series_gin'])
+      .where(Sql.And(
+        Sql.Or(...nonEmptyConds),
+        Sql.Gte('date', new Sql.Raw(`toDate(fromUnixTimestamp(${fromS}))`)),
+        Sql.Lte('date', new Sql.Raw(`toDate(fromUnixTimestamp(${toS}))`)),
+        new Sql.In('type', 'in', [bothType, metricType])))
+      .having(
+        Sql.Eq(
+          new Sql.Raw('groupBitOr(' + nonEmptyConds.map(
+            (m, i) => new Sql.Raw(`bitShiftLeft((${m})::UInt64, ${i})`)
+          ).join('+') + ')'), (1 << nonEmptyConds.length) - 1)
+      ).groupBy('fingerprint')
+  }
+  if (emptyMatchers.length) {
+    const emptyConds = emptyMatchers.map(m => {
+      const visitParamHas = new Sql.Raw('')
+      visitParamHas.toString = function () {
+        return `visitParamHas(labels, ${Sql.quoteVal(m[0])})`
+      }
+      switch (m[1]) {
+        case '=':
+          return Sql.Eq(visitParamHas, new Sql.Raw('0'))
+        case '!=':
+          return Sql.Ne(visitParamHas, new Sql.Raw('1'))
+        default:
+          return null
+      }
+    }).filter(m => !!m)
+    const emptyReq = (new Sql.Select())
+      .select('fingerprint')
+      .from(`time_series${_dist}`)
+      .where(Sql.And(...emptyConds))
+    if (nonEmptyMatchers.length) {
+      const withNonEmptyIdx = new Sql.With('nonEmptyIdx', req, !!clusterName)
+      emptyReq.with(withNonEmptyIdx)
+        .where(
+          new Sql.In('fingerprint', 'in', new Sql.WithReference(withNonEmptyIdx))
+        )
+    }
+    req = emptyReq
+  }
+  return req
 }

 module.exports.getData = async (matchers, fromMs, toMs, subqueries) => {
@@ -128,8 +166,7 @@ module.exports.getData = async (matchers, fromMs, toMs, subqueries) => {
       null, db, { responseType: 'arraybuffer' })
     return new Uint8Array(data.data)
   }
-  const matches = getMatchersIdxCond(matchers)
-  const idx = getIdxSubquery(matches, fromMs, toMs)
+  const idx = getIdxSubqueryV2(matchers, fromMs, toMs)
   const withIdx = new Sql.With('idx', idx, !!clusterName)
   const timeSeries = (new Sql.Select())
     .select(
@@ -138,7 +175,7 @@ module.exports.getData = async (matchers, fromMs, toMs, subqueries) => {
     ).from(DATABASE_NAME() + '.time_series')
     .where(Sql.And(
       new Sql.In('fingerprint', 'in', new Sql.WithReference(withIdx)),
-      new Sql.In('type', 'in', [bothType,metricType])))
+      new Sql.In('type', 'in', [bothType, metricType])))
   const withTimeSeries = new Sql.With('timeSeries', timeSeries, !!clusterName)
   const raw = (new Sql.Select())
     .with(withIdx)

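The new getIdxSubqueryV2 above splits the matchers into two groups. Matchers with a non-empty value are resolved through the time_series_gin key/value index: the per-matcher conditions are OR-ed in the WHERE clause, rows are grouped by fingerprint, and the HAVING clause requires groupBitOr over the bit-shifted per-condition flags to equal (1 << n) - 1, i.e. every matcher must have matched at least one label row of that fingerprint. Matchers with an empty value (label="" or label!="") have no key/value row to find in the index, so they are evaluated with visitParamHas against the labels JSON of the time_series table, narrowed to the fingerprints from the non-empty branch when both groups are present. Below is a minimal plain-JavaScript sketch, over hypothetical in-memory rows rather than ClickHouse, of why the bitmask HAVING clause expresses "all matchers matched":

// rows stand in for time_series_gin: one (fingerprint, key, val) row per label.
const rows = [
  { fingerprint: 1, key: 'job', val: 'node' },
  { fingerprint: 1, key: 'env', val: 'prod' },
  { fingerprint: 2, key: 'job', val: 'node' }
]
// conds stand in for the per-matcher predicates, e.g. job="node" and env=~"pr.*".
const conds = [
  r => r.key === 'job' && r.val === 'node',
  r => r.key === 'env' && /^pr.*$/.test(r.val)
]

const masks = new Map() // fingerprint -> OR of matched-condition bits (groupBitOr)
for (const r of rows) {
  if (!conds.some(c => c(r))) continue // WHERE Or(...conds)
  let mask = masks.get(r.fingerprint) || 0
  conds.forEach((c, i) => { mask |= (c(r) ? 1 : 0) << i }) // bitShiftLeft per condition
  masks.set(r.fingerprint, mask)
}
const full = (1 << conds.length) - 1 // every matcher hit at least one row
const fingerprints = [...masks].filter(([, m]) => m === full).map(([f]) => f)
console.log(fingerprints) // [ 1 ]  (fingerprint 2 has no 'env' label and is dropped)
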
6 changes: 6 additions & 0 deletions pyroscope/pprof-bin/Cargo.toml
@@ -27,9 +27,15 @@ flate2 = "1.0"
 # code size when deploying.
 console_error_panic_hook = { version = "0.1.7", optional = true }
 base64 = "0.22.1"
+memchr = "2.7.4"

 [dev-dependencies]
 wasm-bindgen-test = "0.3.34"
+criterion = { version = "0.5.1", features = ["html_reports"] }
+
+[[bench]]
+name = "my_benchmark"
+harness = false

 [profile.release]
 # Tell `rustc` to optimize for small code size.

18 changes: 18 additions & 0 deletions pyroscope/pprof-bin/benches/my_benchmark.rs
@@ -0,0 +1,18 @@
+use pprof_bin::merge_prof;
+use pprof_bin::utest::get_test_pprof_data;
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+
+fn merge_bench(pprofs: &Vec<Vec<u8>>) {
+
+    for pprof in pprofs {
+        merge_prof(0, pprof.as_slice(), "process_cpu:samples:count:cpu:nanoseconds".to_string());
+    }
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let pprofs = get_test_pprof_data();
+    c.bench_function("merge", |b| b.iter(|| merge_bench(&pprofs)));
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);

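Because the [[bench]] target in Cargo.toml above sets harness = false, the criterion_main! macro supplies the benchmark entry point; the benchmark would typically be run with `cargo bench` inside pyroscope/pprof-bin, with its input profiles coming from pprof_bin::utest::get_test_pprof_data as imported above.
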
Binary file modified pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm