
Commit

Merge pull request #376 from metrico/k6-fixes
K6 fixes
akvlad authored Nov 7, 2023
2 parents 070da3e + 39bc043 commit df14728
Showing 6 changed files with 100 additions and 66 deletions.

lib/bun_wrapper.js (3 changes: 2 additions & 1 deletion)
@@ -44,7 +44,8 @@ const wrapper = (handler, parsers) => {
raw: stream,
log: log,
params: ctx.params || {},
query: {}
query: {},
method: ctx.method
}
for (const [key, value] of (new URL(ctx.url)).searchParams) {
if (!(key in req.query)) {

lib/db/maintain/scripts.js (12 changes: 6 additions & 6 deletions)
@@ -40,8 +40,8 @@ module.exports.overall = [
AS SELECT date, pairs.1 as key, pairs.2 as val, fingerprint
FROM time_series ARRAY JOIN JSONExtractKeysAndValues(time_series.labels, 'String') as pairs`,

`INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('update_v3_5'), 'update',
'v3_1', toString(toUnixTimestamp(NOW())), NOW())`,
"INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('update_v3_5'), 'update', " +
"'v3_1', toString(toUnixTimestamp(NOW())), NOW())",

`CREATE TABLE IF NOT EXISTS {{DB}}.metrics_15s {{{OnCluster}}} (
fingerprint UInt64,
@@ -68,8 +68,8 @@ SELECT fingerprint,
FROM samples_v3 as samples
GROUP BY fingerprint, timestamp_ns;`,

`INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('update_v3_2'), 'update',
'v3_2', toString(toUnixTimestamp(NOW())), NOW())`
"INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('update_v3_2'), 'update', " +
"'v3_2', toString(toUnixTimestamp(NOW())), NOW())"
]

module.exports.traces = [
@@ -151,8 +151,8 @@ module.exports.traces = [
duration_ns as duration
FROM traces_input ARRAY JOIN tags`,

`INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('tempo_traces_v1'), 'update',
'tempo_traces_v2', toString(toUnixTimestamp(NOW())), NOW())`
"INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('tempo_traces_v1'), 'update', " +
"'tempo_traces_v2', toString(toUnixTimestamp(NOW())), NOW())"
]

module.exports.overall_dist = [
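The scripts.js hunks above only reformat the three settings INSERT statements from multi-line template literals into single-line string concatenation; apart from the embedded newline, the SQL text is unchanged. An illustrative JavaScript sketch of the difference (not code from this commit; the statement text is shortened here):

// A template literal that spans two source lines embeds a real newline in the SQL string.
const asTemplate = `INSERT INTO settings (name, value) VALUES ('v3_1',
 toString(toUnixTimestamp(NOW())))`

// String concatenation produces the same statement as a single line, with no embedded newline.
const asConcat = "INSERT INTO settings (name, value) VALUES ('v3_1', " +
  "toString(toUnixTimestamp(NOW())))"

console.log(asTemplate.includes('\n')) // true
console.log(asConcat.includes('\n')) // false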

lib/handlers/prom_query_range.js (11 changes: 7 additions & 4 deletions)
@@ -13,14 +13,17 @@ const { rangeQuery } = require('../../promql/index')

async function handler (req, res) {
req.log.debug('GET /api/v1/query_range')
const startMs = parseInt(req.query.start) * 1000 || Date.now() - 60000
const endMs = parseInt(req.query.end) * 1000 || Date.now()
const stepMs = parseInt(req.query.step) * 1000 || 15000
const query = req.query.query
const request = req.method === 'POST' ? req.body : req.query
const startMs = parseInt(request.start) * 1000 || Date.now() - 60000
const endMs = parseInt(request.end) * 1000 || Date.now()
const stepMs = parseInt(request.step) * 1000 || 15000
const query = request.query
try {
const result = await rangeQuery(query, startMs, endMs, stepMs)
return res.code(200).send(result)
} catch (err) {
console.log(req.query)
console.log(err)
return res.code(500).send(JSON.stringify({ status: 'error', error: err.message }))
}
}
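Together with the method field now exposed by bun_wrapper.js, the handler above reads range-query parameters from the POST body for POST requests and from the query string otherwise, matching how Prometheus-style clients (and the k6 scenarios this PR targets) may submit form-encoded queries. A minimal sketch of that selection logic, using an illustrative helper name and a Fastify-style req object (not code from this commit):

// Hypothetical helper mirroring the parameter-selection pattern in the handler above.
function extractRangeParams (req) {
  const request = req.method === 'POST' ? req.body : req.query
  return {
    startMs: parseInt(request.start) * 1000 || Date.now() - 60000,
    endMs: parseInt(request.end) * 1000 || Date.now(),
    stepMs: parseInt(request.step) * 1000 || 15000,
    query: request.query
  }
}

// A POST carrying a form-encoded body and a GET carrying the same query-string
// parameters now resolve to identical values.
const fromPost = extractRangeParams({
  method: 'POST',
  query: {},
  body: { start: '1699300000', end: '1699303600', step: '15', query: 'up' }
})
const fromGet = extractRangeParams({
  method: 'GET',
  query: { start: '1699300000', end: '1699303600', step: '15', query: 'up' }
})
console.log(JSON.stringify(fromPost) === JSON.stringify(fromGet)) // true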

wasm_parts/main.go (50 changes: 30 additions & 20 deletions)
@@ -108,17 +108,27 @@ func transpileTraceQL(id uint32) int {
return 0
}

var eng = promql.NewEngine(promql.EngineOpts{
Logger: TestLogger{},
Reg: nil,
MaxSamples: 100000,
Timeout: time.Second * 30,
ActiveQueryTracker: nil,
LookbackDelta: 0,
NoStepSubqueryIntervalFn: nil,
EnableAtModifier: false,
EnableNegativeOffset: false,
})
var eng *promql.Engine = nil
var engC = 0

func getEng() *promql.Engine {
if eng == nil || engC >= 5 {
eng = promql.NewEngine(promql.EngineOpts{
Logger: TestLogger{},
Reg: nil,
MaxSamples: 100000,
Timeout: time.Second * 30,
ActiveQueryTracker: nil,
LookbackDelta: 0,
NoStepSubqueryIntervalFn: nil,
EnableAtModifier: false,
EnableNegativeOffset: false,
})
engC = 0
}
engC++
return eng
}

//export stats
func stats() {
@@ -129,7 +139,7 @@
func pqlRangeQuery(id uint32, fromMS float64, toMS float64, stepMS float64) uint32 {
return pql(data[id], func() (promql.Query, error) {
queriable := &TestQueryable{id: id}
return eng.NewRangeQuery(
return getEng().NewRangeQuery(
queriable,
nil,
string(data[id].request),
@@ -144,7 +154,7 @@ func pqlRangeQuery(id uint32, fromMS float64, toMS float64, stepMS float64) uint
func pqlInstantQuery(id uint32, timeMS float64) uint32 {
return pql(data[id], func() (promql.Query, error) {
queriable := &TestQueryable{id: id}
return eng.NewInstantQuery(
return getEng().NewInstantQuery(
queriable,
nil,
string(data[id].request),
@@ -155,7 +165,7 @@ func pqlInstantQuery(id uint32, timeMS float64) uint32 {
//export pqlSeries
func pqlSeries(id uint32) uint32 {
queriable := &TestQueryable{id: id}
query, err := eng.NewRangeQuery(
query, err := getEng().NewRangeQuery(
queriable,
nil,
string(data[id].request),
@@ -388,13 +398,13 @@ func (t *TestSeries) Next() bool {
}

func (t *TestSeries) Seek(tmMS int64) bool {
t.i = 0
ms := *(*int64)(unsafe.Pointer(&t.data[t.i*16]))
for ms < tmMS && t.i*16 < len(t.data) {
t.i++
ms = *(*int64)(unsafe.Pointer(&t.data[t.i*16]))
for t.i = 0; t.i*16 < len(t.data); t.i++ {
ms := *(*int64)(unsafe.Pointer(&t.data[t.i*16]))
if ms >= tmMS {
return true
}
}
return t.i*16 < len(t.data)
return false
}

func (t *TestSeries) At() (int64, float64) {
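getEng() above and getWasm() in the next file apply the same defensive pattern: rebuild a long-lived object after a handful of uses so that whatever state it has accumulated inside the WASM runtime can be dropped. A generic JavaScript sketch of that recycling pattern (illustrative only, not code from this commit):

// Generic "recycle after N uses" factory, mirroring the shape of getEng()/getWasm().
function recycled (create, maxUses = 5) {
  let instance = null
  let uses = 0
  return () => {
    if (instance === null || uses >= maxUses) {
      instance = create() // the previous instance is dropped and becomes collectable
      uses = 0
    }
    uses++
    return instance
  }
}

// Usage sketch: const getEngine = recycled(() => buildPromQlEngine(), 5)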

wasm_parts/main.js (90 changes: 55 additions & 35 deletions)
@@ -9,26 +9,35 @@ module.exports.WasmError = WasmError

let counter = 0
var go = new Go();
var wasm

const getWasm = (() => {
let wasm = null;
let cnt = 0;
let run = false;
async function init () {
run = true;
const _wasm = await WebAssembly.instantiate(
gunzipSync(fs.readFileSync(WASM_URL)), go.importObject)
go.run(_wasm.instance)
wasm = _wasm.instance
cnt = 0
run = false;
}
init();
return () => {
if (cnt > 5 && !run){
init();
}
return wasm
}
})()

const newId = () => {
const id = counter
counter = (counter + 1) & 0xFFFFFFFF
return id
}

async function init () {
const _wasm = await WebAssembly.instantiate(
gunzipSync(fs.readFileSync(WASM_URL)), go.importObject)
go.run(_wasm.instance)
wasm = _wasm.instance
}

init()
setInterval(async () => {
init()
}, 300000)

/**
*
* @param query {string}
@@ -39,7 +48,7 @@
* @returns {Promise<string>}
*/
module.exports.pqlRangeQuery = async (query, startMs, endMs, stepMs, getData) => {
const _wasm = wasm
const _wasm = getWasm()
const start = startMs || Date.now() - 300000
const end = endMs || Date.now()
const step = stepMs || 15000
@@ -57,28 +66,28 @@ module.exports.pqlRangeQuery = async (query, startMs, endMs, stepMs, getData) =>
*/
module.exports.pqlInstantQuery = async (query, timeMs, getData) => {
const time = timeMs || Date.now()
const _wasm = wasm
const _wasm = getWasm()
return await pql(query,
(ctx) => _wasm.exports.pqlInstantQuery(ctx.id, time),
(matchers) => getData(matchers, time - 300000, time))
}

module.exports.pqlMatchers = (query) => {
const _wasm = wasm
const _wasm = getWasm()
const id = newId()
const ctx = new Ctx(id, _wasm)
ctx.create()
try {
ctx.write(query)
const res1 = _wasm.exports.pqlSeries(id)
if (res1 !== 0) {
throw new WasmError(ctx.read())
}
/** @type {[[[string]]]} */
const matchersObj = JSON.parse(ctx.read())
return matchersObj
ctx.write(query)
const res1 = _wasm.exports.pqlSeries(id)
if (res1 !== 0) {
throw new WasmError(ctx.read())
}
/** @type {[[[string]]]} */
const matchersObj = JSON.parse(ctx.read())
return matchersObj
} finally {
ctx.destroy()
ctx.destroy()
}
}

@@ -108,7 +117,7 @@ module.exports.TranspileTraceQL = (request) => {
let _ctx
try {
const id = newId()
const _wasm = wasm
const _wasm = getWasm()
_ctx = new Ctx(id, _wasm)
_ctx.create()
_ctx.write(JSON.stringify(request))
@@ -132,10 +141,10 @@
*/
const pql = async (query, wasmCall, getData) => {
const reqId = newId()
const _wasm = wasm
const _wasm = getWasm()
const ctx = new Ctx(reqId, _wasm)
ctx.create()
try {
ctx.create()
ctx.write(query)
const res1 = wasmCall(ctx)
if (res1 !== 0) {
@@ -155,26 +164,37 @@ const pql = async (query, wasmCall, getData) => {
writer.writeString(JSON.stringify(matchers))
writer.writeBytes([data])
}

fs.writeFileSync('req.txt', query)
fs.writeFileSync('data.bin', writer.buffer())
ctx.write(writer.buffer())
_wasm.exports.onDataLoad(reqId)
return ctx.read()
} finally {
ctx.destroy()
ctx && ctx.destroy()
}
}
class Ctx {
constructor (id, wasm) {
this.wasm = wasm
this.id = id
this.created = false
}

create () {
this.wasm.exports.createCtx(this.id)
try {
this.wasm.exports.createCtx(this.id)
this.created = true
} catch (err) {
throw err
}
}

destroy () {
this.wasm.exports.dealloc(this.id)
try {
if (this.created) this.wasm.exports.dealloc(this.id)
} catch (err) {
throw err
}
}

/**
@@ -186,8 +206,8 @@
data = (new TextEncoder()).encode(data)
}
this.wasm.exports.alloc(this.id, data.length)
const ptr = wasm.exports.alloc(this.id, data.length)
new Uint8Array(wasm.exports.memory.buffer).set(data, ptr)
const ptr = this.wasm.exports.alloc(this.id, data.length)
new Uint8Array(this.wasm.exports.memory.buffer).set(data, ptr)
}

/**
@@ -198,7 +218,7 @@
this.wasm.exports.getCtxResponse(this.id),
this.wasm.exports.getCtxResponseLen(this.id)
]
return new TextDecoder().decode(new Uint8Array(wasm.exports.memory.buffer).subarray(resPtr, resPtr + resLen))
return new TextDecoder().decode(new Uint8Array(this.wasm.exports.memory.buffer).subarray(resPtr, resPtr + resLen))
}
}

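For orientation, a minimal, hypothetical usage sketch of the reworked wasm_parts/main.js module follows. The getData callback shape (label matchers plus a start/end window in milliseconds, returning an encoded samples payload) is inferred from pqlInstantQuery and the pql helper above; the payload encoding is not part of this diff, so the empty buffer below is only a placeholder:

// Assumes this snippet sits next to wasm_parts/ in the qryn source tree.
const { pqlInstantQuery, WasmError } = require('./wasm_parts/main')

async function run () {
  try {
    const result = await pqlInstantQuery('up', Date.now(), async (matchers, startMs, endMs) => {
      // matchers is the nested string-array structure the WASM side extracts
      // (see the [[[string]]] annotation in pqlMatchers).
      console.log('data requested for', JSON.stringify(matchers), startMs, endMs)
      return Buffer.alloc(0) // placeholder; real callers return encoded samples
    })
    console.log(result) // JSON string, per the @returns {Promise<string>} annotation above
  } catch (err) {
    if (err instanceof WasmError) {
      console.error('PromQL error reported by the WASM module:', err.message)
    } else {
      throw err
    }
  }
}

// The WASM instance is instantiated asynchronously when the module is required,
// so a fresh process may need a moment before the first query succeeds.
setTimeout(run, 1000)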

wasm_parts/main.wasm.gz (binary file modified)
