diff --git a/lib/handlers/tempo_search.js b/lib/handlers/tempo_search.js index cf62c985..c47a2646 100644 --- a/lib/handlers/tempo_search.js +++ b/lib/handlers/tempo_search.js @@ -17,9 +17,13 @@ const logfmt = require('logfmt') const common = require('../../common') const { asyncLogError, CORS } = require('../../common') const { scanTempo } = require('../db/clickhouse') +const { search } = require('../../traceql') async function handler (req, res) { req.log.debug('GET /api/search') + if (req.query.q) { + return await searchV2(req, res) + } const resp = { data: [] } if (!req.query.tags) { return res.send(resp) @@ -58,4 +62,21 @@ async function handler (req, res) { } } +const searchV2 = async (req, res) => { + try { + const query = req.query.q + if (req.query.q === '{}') { + return res.code(200).send(JSON.stringify({ traces: [] })) + } + const limit = req.query.limit || 100 + const start = req.query.start || Math.floor(Date.now() / 1000) - 3600 + const end = req.query.end || Math.floor(Date.now() / 1000) + const traces = await search(query, limit, new Date(start * 1000), new Date(end * 1000)) + return res.code(200).send(JSON.stringify({ traces: traces })) + } catch (e) { + req.log.error(e) + return res.code(500).send(e.message) + } +} + module.exports = handler diff --git a/lib/handlers/tempo_v2_tags.js b/lib/handlers/tempo_v2_tags.js new file mode 100644 index 00000000..dafdef03 --- /dev/null +++ b/lib/handlers/tempo_v2_tags.js @@ -0,0 +1,13 @@ +const { asyncLogError } = require('../../common') +const { queryTempoTags } = require('../db/clickhouse') +async function handler (req, res) { + try { + const resp = await queryTempoTags() + return res.send({ scopes: [{ name: 'span', tags: resp.map(e => `${e.key}`) }] }) + } catch (e) { + asyncLogError(e, req.log) + res.code(500) + } +} + +module.exports = handler diff --git a/lib/handlers/tempo_v2_values.js b/lib/handlers/tempo_v2_values.js new file mode 100644 index 00000000..7df6cf82 --- /dev/null +++ 
b/lib/handlers/tempo_v2_values.js @@ -0,0 +1,33 @@ +/* Tag Value Handler V2 */ +/* + For retrieving the tag values tempo can query on. + Responses looks like this: +{ + "tagValues": [ + { + "type": "string", + "value": "a" + }, + .... + ] +} +*/ +const { asyncLogError } = require('../../common') +const { queryTempoValues } = require('../db/clickhouse') + +async function handler (req, res) { + req.log.debug(`GET /api/v2/search/tag/${req.params.name}/values`) + if (!req.params.name) { + return res.send({ tagValues: [] }) + } + try { + req.params.name = req.params.name.replace(/^resource\.|^span\./, '') + const vals = (await queryTempoValues(req.params.name)).map(e => e.val) + return res.send({ tagValues: vals.map(v => ({ type: 'string', value: v })) }) + } catch (e) { + asyncLogError(e, req.log) + res.code(500) + } +}; + +module.exports = handler diff --git a/qryn_node.js b/qryn_node.js index 183348ce..dd034ed7 100755 --- a/qryn_node.js +++ b/qryn_node.js @@ -269,11 +269,19 @@ let fastify = require('fastify')({ fastify.get('/api/search/tags', handlerTempoLabel) fastify.get('/tempo/api/search/tags', handlerTempoLabel) + const handlerTempoLabelV2 = require('./lib/handlers/tempo_v2_tags').bind(this) + fastify.get('/api/v2/search/tags', handlerTempoLabelV2) + fastify.get('/tempo/api/v2/search/tags', handlerTempoLabelV2) + /* Tempo Tag Value Handler */ const handlerTempoLabelValues = require('./lib/handlers/tempo_values').bind(this) fastify.get('/api/search/tag/:name/values', handlerTempoLabelValues) fastify.get('/tempo/api/search/tag/:name/values', handlerTempoLabelValues) + const handlerTempoLabelV2Values = require('./lib/handlers/tempo_v2_values').bind(this) + fastify.get('/api/v2/search/tag/:name/values', handlerTempoLabelV2Values) + fastify.get('/tempo/api/v2/search/tag/:name/values', handlerTempoLabelV2Values) + /* Tempo Traces Query Handler */ const handlerTempoSearch = require('./lib/handlers/tempo_search.js').bind(this) fastify.get('/api/search', 
handlerTempoSearch) diff --git a/traceql/index.js b/traceql/index.js new file mode 100644 index 00000000..b0d3d018 --- /dev/null +++ b/traceql/index.js @@ -0,0 +1,63 @@ +const { TranspileTraceQL } = require('../wasm_parts/main') +const { clusterName } = require('../common') +const { DATABASE_NAME } = require('../lib/utils') +const dist = clusterName ? '_dist' : '' +const { rawRequest } = require('../lib/db/clickhouse') + +/** + * + * @param query {string} + * @param limit {number} + * @param from {Date} + * @param to {Date} + * @returns {Promise<[]>} + */ +const search = async (query, limit, from, to) => { + const request = { + Request: query, + Ctx: { + IsCluster: !!clusterName, + OrgID: '0', + FromS: Math.floor(from.getTime() / 1000) - 600, + ToS: Math.floor(to.getTime() / 1000), + Limit: parseInt(limit), + + TimeSeriesGinTableName: 'time_series_gin', + SamplesTableName: `samples_v3${dist}`, + TimeSeriesTableName: 'time_series', + TimeSeriesDistTableName: 'time_series_dist', + Metrics15sTableName: `metrics_15s${dist}`, + + TracesAttrsTable: 'tempo_traces_attrs_gin', + TracesAttrsDistTable: 'tempo_traces_attrs_gin_dist', + TracesTable: 'tempo_traces', + TracesDistTable: 'tempo_traces_dist' + } + } + console.log(JSON.stringify(request)) + const sql = TranspileTraceQL(request) + const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME()) + const traces = response.data.data.map(row => ({ + traceID: row.trace_id, + rootServiceName: row.root_service_name, + rootTraceName: row.root_trace_name, + startTimeUnixNano: row.start_time_unix_nano, + durationMs: row.duration_ms, + spanSets: [ + { + spans: row.span_id.map((spanId, i) => ({ + spanID: spanId, + startTimeUnixNano: row.timestamps_ns[i], + durationNanos: row.duration[i], + attributes: [] + })), + matched: row.span_id.length + } + ] + })) + return traces +} + +module.exports = { + search +} diff --git a/wasm_parts/go.mod b/wasm_parts/go.mod index 457cff05..45769ad4 100644 --- a/wasm_parts/go.mod +++ 
b/wasm_parts/go.mod @@ -4,11 +4,15 @@ replace ( cloud.google.com/go v0.65.0 => cloud.google.com/go v0.102.1 github.com/InfluxCommunity/influxdb3-go v0.2.0 => github.com/akvlad/influxdb3-go v0.0.1 github.com/docker/distribution v2.7.1+incompatible => github.com/docker/distribution v2.8.0+incompatible - k8s.io/client-go v12.0.0+incompatible => k8s.io/client-go v0.22.1 github.com/json-iterator/go v1.1.12 => ./json.iterator + k8s.io/client-go v12.0.0+incompatible => k8s.io/client-go v0.22.1 ) -require github.com/prometheus/prometheus v1.8.2-0.20220714142409-b41e0750abf5 +require ( + github.com/alecthomas/participle/v2 v2.1.0 + github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7 + github.com/prometheus/prometheus v1.8.2-0.20220714142409-b41e0750abf5 +) require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect @@ -18,22 +22,16 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/dennwc/varint v1.0.0 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect - github.com/felixge/httpsnoop v1.0.3 // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect - github.com/json-iterator/go v1.1.12 // indirect - github.com/julienschmidt/httprouter v1.3.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect - github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect - github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/oklog/ulid v1.3.1 // 
indirect github.com/pkg/errors v0.9.1 // indirect @@ -44,9 +42,7 @@ require ( github.com/prometheus/common/sigv4 v0.1.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect github.com/stretchr/testify v1.8.0 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.32.0 // indirect go.opentelemetry.io/otel v1.7.0 // indirect - go.opentelemetry.io/otel/metric v0.30.0 // indirect go.opentelemetry.io/otel/trace v1.7.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/goleak v1.1.12 // indirect @@ -55,7 +51,6 @@ require ( golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b // indirect golang.org/x/text v0.3.7 // indirect - golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.28.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/wasm_parts/go.sum b/wasm_parts/go.sum index a064c7cb..4cc0835d 100644 --- a/wasm_parts/go.sum +++ b/wasm_parts/go.sum @@ -68,6 +68,10 @@ github.com/Microsoft/go-winio v0.5.1 h1:aPJp2QD7OOrhO5tQXqQoGSJc+DjDtWTGLOmNyAm6 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tNFfI= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= +github.com/alecthomas/assert/v2 v2.3.0 h1:mAsH2wmvjsuvyBvAmCtm7zFsBlb8mIHx5ySLVdDZXL0= +github.com/alecthomas/participle/v2 v2.1.0 h1:z7dElHRrOEEq45F2TG5cbQihMtNTv8vwldytDj7Wrz4= +github.com/alecthomas/participle/v2 v2.1.0/go.mod h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c= +github.com/alecthomas/repr v0.2.0 h1:HAzS41CIzNW5syS8Mf9UwXhNH1J9aix/BvDRf1Ml2Yk= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template 
v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -129,9 +133,6 @@ github.com/envoyproxy/go-control-plane v0.10.3 h1:xdCVXxEe0Y3FQith+0cj2irwZudqGY github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.6.7 h1:qcZcULcd/abmQg6dwigimCNEyi4gg31M/xaciQlDml8= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= -github.com/felixge/httpsnoop v1.0.2/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= -github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= @@ -161,7 +162,6 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= -github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt/v4 v4.2.0 h1:besgBTC8w8HjP6NzQdxwKH9Z5oQMZ24ThTrHp3cZ8eU= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -262,6 +262,7 @@ github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+l 
github.com/hashicorp/nomad/api v0.0.0-20220629141207-c2428e1673ec h1:jAF71e0KoaY2LJlRsRxxGz6MNQOG5gTBIc+rklxfNO0= github.com/hashicorp/serf v0.9.6 h1:uuEX1kLR6aoda1TBttmJQKDLZE1Ob7KN0NPdE7EtCDc= github.com/hetznercloud/hcloud-go v1.35.0 h1:sduXOrWM0/sJXwBty7EQd7+RXEJh5+CsAGQmHshChFg= +github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= @@ -276,14 +277,10 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= -github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= -github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= -github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kolo/xmlrpc 
v0.0.0-20201022064351-38db28db192b h1:iNjcivnc6lhbvJA3LD622NPrUponluJrBWPIwGG/3Bg= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -305,12 +302,9 @@ github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= -github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= @@ -325,6 +319,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7 
h1:xoIK0ctDddBMnc74udxJYBqlo9Ylnsp1waqjLsnef20= +github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7/go.mod h1:YARuvh7BUWHNhzDq2OM5tzR2RiCcN2D7sapiKyCel/M= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= @@ -387,12 +383,8 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.32.0 h1:mac9BKRqwaX6zxHPDe3pvmWpwuuIM0vuXv2juCnQevE= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.32.0/go.mod h1:5eCOqeGphOyz6TsY3ZDNjE33SM/TFAK3RGuCL2naTgY= go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM= go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk= -go.opentelemetry.io/otel/metric v0.30.0 h1:Hs8eQZ8aQgs0U49diZoaS6Uaxw3+bBE3lcMUKBFIk3c= -go.opentelemetry.io/otel/metric v0.30.0/go.mod h1:/ShZ7+TS4dHzDFmfi1kSXMhMVubNoP0oIaBp70J6UXU= go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJqC6o= go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= @@ -608,7 +600,6 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= 
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 h1:Dpdu/EMxGMFgq0CeYMh4fazTD2vtlZRYE7wyynxJb9U= -golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -646,13 +637,11 @@ golang.org/x/tools v0.0.0-20200501065659-ab2804fb9c9d/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200512131952-2bc93b1c0c88/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= -golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE= golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= diff --git 
a/wasm_parts/main.go b/wasm_parts/main.go index a32e2ed7..2aebc51f 100644 --- a/wasm_parts/main.go +++ b/wasm_parts/main.go @@ -12,6 +12,10 @@ import ( "strings" "time" "unsafe" + sql "wasm_parts/sql_select" + parser2 "wasm_parts/traceql/parser" + traceql_transpiler "wasm_parts/traceql/transpiler" + "wasm_parts/types" ) type ctx struct { @@ -58,6 +62,52 @@ func getCtxResponseLen(id uint32) uint32 { return uint32(len(data[id].response)) } +//export transpileTraceQL +func transpileTraceQL(id uint32) int { + request := types.TraceQLRequest{} + err := request.UnmarshalJSON(data[id].request) + if err != nil { + data[id].response = []byte(err.Error()) + return 1 + } + + script, err := parser2.Parse(request.Request) + if err != nil { + data[id].response = []byte(err.Error()) + return 1 + } + + planner, err := traceql_transpiler.Plan(script) + if err != nil { + data[id].response = []byte(err.Error()) + return 1 + } + request.Ctx.Ctx = context.Background() + request.Ctx.CancelCtx = func() {} + request.Ctx.CHSqlCtx = &sql.Ctx{ + Params: make(map[string]sql.SQLObject), + Result: make(map[string]sql.SQLObject), + } + request.Ctx.From = time.Unix(int64(request.Ctx.FromS), 0) + request.Ctx.To = time.Unix(int64(request.Ctx.ToS), 0) + sel, err := planner.Process(&request.Ctx) + if err != nil { + data[id].response = []byte(err.Error()) + return 1 + } + var options []int + if request.Ctx.IsCluster { + options = append(options, sql.STRING_OPT_INLINE_WITH) + } + str, err := sel.String(request.Ctx.CHSqlCtx, options...) 
+ if err != nil { + data[id].response = []byte(err.Error()) + return 1 + } + data[id].response = []byte(str) + return 0 +} + var eng = promql.NewEngine(promql.EngineOpts{ Logger: TestLogger{}, Reg: nil, diff --git a/wasm_parts/main.js b/wasm_parts/main.js index bf9feed0..48b6e7ee 100644 --- a/wasm_parts/main.js +++ b/wasm_parts/main.js @@ -11,6 +11,12 @@ let counter = 0 var go = new Go(); var wasm +const newId = () => { + const id = counter + counter = (counter + 1) & 0xFFFFFFFF + return id +} + async function init () { const _wasm = await WebAssembly.instantiate( gunzipSync(fs.readFileSync(WASM_URL)), go.importObject) @@ -59,8 +65,7 @@ module.exports.pqlInstantQuery = async (query, timeMs, getData) => { module.exports.pqlMatchers = (query) => { const _wasm = wasm - const id = counter - counter = (counter + 1) & 0xFFFFFFFF + const id = newId() const ctx = new Ctx(id, _wasm) ctx.create() try { @@ -77,6 +82,47 @@ module.exports.pqlMatchers = (query) => { } } +/** + * + * @param request {{ + * Request: string, + * Ctx: { + * IsCluster: boolean, + * OrgID: string, + * FromS: number, + * ToS: number, + * TimeSeriesGinTableName: string, + * SamplesTableName: string, + * TimeSeriesTableName: string, + * TimeSeriesDistTableName: string, + * Metrics15sTableName: string, + * TracesAttrsTable: string, + * TracesAttrsDistTable: string, + * TracesTable: string, + * TracesDistTable: string + * }}} + * @returns {String} + * @constructor + */ +module.exports.TranspileTraceQL = (request) => { + let _ctx + try { + const id = newId() + const _wasm = wasm + _ctx = new Ctx(id, _wasm) + _ctx.create() + _ctx.write(JSON.stringify(request)) + let res = _wasm.exports.transpileTraceQL(id) + if (res !== 0) { + throw new WasmError(_ctx.read()) + } + res = _ctx.read() + return res + } finally { + _ctx && _ctx.destroy() + } +} + /** * * @param query {string} @@ -85,8 +131,7 @@ module.exports.pqlMatchers = (query) => { * @returns {Promise} */ const pql = async (query, wasmCall, getData) => { - 
const reqId = counter - counter = (counter + 1) & 0xFFFFFFFF + const reqId = newId() const _wasm = wasm const ctx = new Ctx(reqId, _wasm) ctx.create() diff --git a/wasm_parts/main.wasm.gz b/wasm_parts/main.wasm.gz index 18058847..1aa94dea 100644 Binary files a/wasm_parts/main.wasm.gz and b/wasm_parts/main.wasm.gz differ diff --git a/wasm_parts/sql_select/condition.go b/wasm_parts/sql_select/condition.go new file mode 100644 index 00000000..ea66b06b --- /dev/null +++ b/wasm_parts/sql_select/condition.go @@ -0,0 +1,129 @@ +package sql + +import ( + "fmt" + "strings" +) + +type LogicalOp struct { + fn string + clauses []SQLObject +} + +func (op *LogicalOp) GetFunction() string { + return op.fn +} + +func (op *LogicalOp) GetEntity() []SQLObject { + return op.clauses +} + +func (op *LogicalOp) AppendEntity(clauses ...SQLCondition) { + for _, v := range clauses { + op.clauses = append(op.clauses, v) + } +} + +func (op *LogicalOp) String(ctx *Ctx, options ...int) (string, error) { + strClauses := make([]string, len(op.clauses)) + for i, c := range op.clauses { + s, err := c.String(ctx, options...) + if err != nil { + return "", err + } + strClauses[i] = "(" + s + ")" + } + return strings.Join(strClauses, " "+op.fn+" "), nil +} + +func NewGenericLogicalOp(fn string, clauses ...SQLCondition) *LogicalOp { + _clauses := make([]SQLObject, len(clauses)) + for i, c := range clauses { + _clauses[i] = c + } + return &LogicalOp{ + fn: fn, + clauses: _clauses, + } +} + +func And(clauses ...SQLCondition) *LogicalOp { + return NewGenericLogicalOp("and", clauses...) +} + +func Or(clauses ...SQLCondition) *LogicalOp { + return NewGenericLogicalOp("or", clauses...) 
+} + +func BinaryLogicalOp(fn string, left SQLObject, right SQLObject) *LogicalOp { + return &LogicalOp{ + fn: fn, + clauses: []SQLObject{left, right}, + } +} + +func Eq(left SQLObject, right SQLObject) *LogicalOp { + return BinaryLogicalOp("==", left, right) +} + +func Neq(left SQLObject, right SQLObject) *LogicalOp { + return BinaryLogicalOp("!=", left, right) +} + +func Lt(left SQLObject, right SQLObject) *LogicalOp { + return BinaryLogicalOp("<", left, right) +} + +func Le(left SQLObject, right SQLObject) *LogicalOp { + return BinaryLogicalOp("<=", left, right) +} + +func Gt(left SQLObject, right SQLObject) *LogicalOp { + return BinaryLogicalOp(">", left, right) +} + +func Ge(left SQLObject, right SQLObject) *LogicalOp { + return BinaryLogicalOp(">=", left, right) +} + +type CNot struct { + expr SQLObject +} + +func (n *CNot) GetFunction() string { + return "NOT" +} + +func (n *CNot) GetEntity() []SQLObject { + return []SQLObject{n.expr} +} + +func (n *CNot) String(ctx *Ctx, options ...int) (string, error) { + str, err := n.expr.String(ctx, options...) + return fmt.Sprintf("!(%s)", str), err +} + +func Not(expr SQLObject) SQLCondition { + return &CNot{expr: expr} +} + +type CNotNull struct { + expr SQLObject +} + +func (c *CNotNull) GetFunction() string { + return "IS NOT NULL" +} + +func (c *CNotNull) GetEntity() []SQLObject { + return []SQLObject{c.expr} +} + +func (c *CNotNull) String(ctx *Ctx, options ...int) (string, error) { + str, err := c.expr.String(ctx, options...) 
+ return fmt.Sprintf("%s IS NOT NULL", str), err +} + +func NotNull(obj SQLObject) SQLCondition { + return &CNotNull{expr: obj} +} diff --git a/wasm_parts/sql_select/iface.go b/wasm_parts/sql_select/iface.go new file mode 100644 index 00000000..a09a0a27 --- /dev/null +++ b/wasm_parts/sql_select/iface.go @@ -0,0 +1,72 @@ +package sql + +const ( + STRING_OPT_SKIP_WITH = 1 + STRING_OPT_INLINE_WITH = 2 + ORDER_BY_DIRECTION_ASC = 3 + ORDER_BY_DIRECTION_DESC = 4 + WITH_REF_NO_ALIAS = 5 +) + +type SQLObject interface { + String(ctx *Ctx, options ...int) (string, error) +} + +type SQLCondition interface { + GetFunction() string + GetEntity() []SQLObject + String(ctx *Ctx, options ...int) (string, error) +} + +type Ctx struct { + id int + Params map[string]SQLObject + Result map[string]SQLObject +} + +func (c *Ctx) Id() int { + c.id++ + return c.id +} + +type ISelect interface { + Distinct(distinct bool) ISelect + GetDistinct() bool + Select(cols ...SQLObject) ISelect + GetSelect() []SQLObject + From(table SQLObject) ISelect + GetFrom() SQLObject + AndWhere(clauses ...SQLCondition) ISelect + OrWhere(clauses ...SQLCondition) ISelect + GetWhere() SQLCondition + AndPreWhere(clauses ...SQLCondition) ISelect + OrPreWhere(clauses ...SQLCondition) ISelect + GetPreWhere() SQLCondition + AndHaving(clauses ...SQLCondition) ISelect + OrHaving(clauses ...SQLCondition) ISelect + GetHaving() SQLCondition + GroupBy(fields ...SQLObject) ISelect + GetGroupBy() []SQLObject + OrderBy(fields ...SQLObject) ISelect + GetOrderBy() []SQLObject + Limit(limit SQLObject) ISelect + GetLimit() SQLObject + Offset(offset SQLObject) ISelect + GetOffset() SQLObject + With(withs ...*With) ISelect + AddWith(withs ...*With) ISelect + DropWith(alias ...string) ISelect + GetWith() []*With + Join(joins ...*Join) ISelect + AddJoin(joins ...*Join) ISelect + GetJoin() []*Join + String(ctx *Ctx, options ...int) (string, error) + SetSetting(name string, value string) ISelect + GetSettings(table SQLObject) 
map[string]string +} + +type Aliased interface { + GetExpr() SQLObject + GetAlias() string + String(ctx *Ctx, options ...int) (string, error) +} diff --git a/wasm_parts/sql_select/objects.go b/wasm_parts/sql_select/objects.go new file mode 100644 index 00000000..944c9495 --- /dev/null +++ b/wasm_parts/sql_select/objects.go @@ -0,0 +1,324 @@ +package sql + +import ( + "fmt" + "strings" +) + +type RawObject struct { + val string +} + +func (r *RawObject) String(ctx *Ctx, options ...int) (string, error) { + return r.val, nil +} + +func NewRawObject(val string) *RawObject { + return &RawObject{ + val: val, + } +} + +func FmtRawObject(tmpl string, arg ...interface{}) *RawObject { + return &RawObject{fmt.Sprintf(tmpl, arg...)} +} + +type OrderBy struct { + col SQLObject + direction int +} + +func (o *OrderBy) String(ctx *Ctx, options ...int) (string, error) { + order := "desc" + if o.direction == ORDER_BY_DIRECTION_ASC { + order = "asc" + } + str, err := o.col.String(ctx, options...) + return fmt.Sprintf("%s %s", str, order), err +} + +func NewOrderBy(col SQLObject, direction int) *OrderBy { + return &OrderBy{ + col: col, + direction: direction, + } +} + +type With struct { + query ISelect + alias string +} + +func (w *With) GetQuery() ISelect { + return w.query +} + +func (w *With) GetAlias() string { + return w.alias +} + +func (w *With) String(ctx *Ctx, options ...int) (string, error) { + str, err := w.query.String(ctx, options...) 
+ return fmt.Sprintf("%s as (%s)", w.alias, str), err +} + +func NewWith(query ISelect, alias string) *With { + return &With{ + query: query, + alias: alias, + } +} + +type WithRef struct { + ref *With +} + +func (w *WithRef) String(ctx *Ctx, options ...int) (string, error) { + if w.ref.alias == "" { + return "", fmt.Errorf("alias is empty") + } + inline := false + noAlias := false + var _opts []int + for _, opt := range options { + inline = inline || opt == STRING_OPT_INLINE_WITH + noAlias = noAlias || opt == WITH_REF_NO_ALIAS + if opt != WITH_REF_NO_ALIAS { + _opts = append(_opts, opt) + } + } + res := w.ref.alias + if inline { + str, err := w.ref.GetQuery().String(ctx, _opts...) + if err != nil { + return "", err + } + res = "(" + str + ")" + if !noAlias { + res += " as " + w.ref.alias + } + } + return res, nil +} + +func NewWithRef(ref *With) *WithRef { + return &WithRef{ref: ref} +} + +type Join struct { + tp string + table SQLObject + on SQLCondition +} + +func (l *Join) String(ctx *Ctx, options ...int) (string, error) { + tbl, err := l.table.String(ctx, options...) + if err != nil { + return "", err + } + on := "" + if strings.ToLower(l.tp) != "array" { + _on, err := l.on.String(ctx, options...) + if err != nil { + return "", err + } + on += "ON " + _on + } + return fmt.Sprintf("%s %s", tbl, on), err +} + +func (l *Join) GetTable() SQLObject { + return l.table +} + +func (l *Join) GetOn() SQLCondition { + return l.on +} + +func NewJoin(tp string, table SQLObject, on SQLCondition) *Join { + return &Join{ + tp: tp, + table: table, + on: on, + } +} + +type CtxParam struct { + name string + def *string +} + +func (c *CtxParam) String(ctx *Ctx, options ...int) (string, error) { + if _, ok := ctx.Params[c.name]; !ok { + if c.def == nil { + return "", fmt.Errorf("undefined parameter %s", c.name) + } + return *c.def, nil + } + return ctx.Params[c.name].String(ctx, options...) 
+} + +func NewCtxParam(name string, def *string) *CtxParam { + return &CtxParam{ + name: name, + def: def, + } +} + +func NewCtxParamOrDef(name string, def string) *CtxParam { + return &CtxParam{ + name: name, + def: &def, + } +} + +type StringVal struct { + val string +} + +func (s *StringVal) String(ctx *Ctx, options ...int) (string, error) { + find := []string{"\\", "\000", "\n", "\r", "\b", "\t", "\x1a", "'"} + replace := []string{"\\\\", "\\0", "\\n", "\\r", "\\b", "\\t", "\\x1a", "\\'"} + res := s.val + for i, v := range find { + res = strings.Replace(res, v, replace[i], -1) + } + return "'" + res + "'", nil +} + +func NewStringVal(s string) SQLObject { + return &StringVal{ + val: s, + } +} + +type IntVal struct { + val int64 +} + +func (i *IntVal) String(ctx *Ctx, options ...int) (string, error) { + return fmt.Sprintf("%d", i.val), nil +} + +func NewIntVal(val int64) *IntVal { + return &IntVal{ + val: val, + } +} + +type BoolVal struct { + val bool +} + +func (b *BoolVal) String(ctx *Ctx, options ...int) (string, error) { + if b.val { + return "true", nil + } + return "false", nil +} + +func NewBoolVal(b bool) SQLObject { + return &BoolVal{ + val: b, + } +} + +type FloatVal struct { + val float64 +} + +func (f *FloatVal) String(ctx *Ctx, options ...int) (string, error) { + return fmt.Sprintf("%f", f.val), nil +} + +func NewFloatVal(f float64) SQLObject { + return &FloatVal{ + val: f, + } +} + +type Col struct { + expr SQLObject + alias string +} + +func (c *Col) GetExpr() SQLObject { + return c.expr +} + +func (c *Col) GetAlias() string { + return c.alias +} + +func (c *Col) String(ctx *Ctx, options ...int) (string, error) { + _opts := append(options, WITH_REF_NO_ALIAS) + expr, err := c.expr.String(ctx, _opts...) 
+ if c.alias == "" { + return fmt.Sprintf("%s", expr), err + } + return fmt.Sprintf("%s as %s", expr, c.alias), err +} + +func NewCol(expr SQLObject, alias string) SQLObject { + return &Col{ + expr: expr, + alias: alias, + } +} + +func NewSimpleCol(name string, alias string) SQLObject { + return &Col{ + expr: NewRawObject(name), + alias: alias, + } +} + +type In struct { + leftSide SQLObject + rightSide []SQLObject +} + +func (in *In) String(ctx *Ctx, options ...int) (string, error) { + parts := make([]string, len(in.rightSide)) + for i, e := range in.rightSide { + str, err := e.String(ctx, options...) + if err != nil { + return "", err + } + parts[i] = str + } + str, err := in.leftSide.String(ctx, options...) + return fmt.Sprintf("%s IN (%s)", str, strings.Join(parts, ",")), err +} + +func (in *In) GetFunction() string { + return "IN" +} + +func (in *In) GetEntity() []SQLObject { + ent := make([]SQLObject, len(in.rightSide)+1) + ent[0] = in.leftSide + for i, r := range in.rightSide { + ent[i+1] = r + } + return ent +} + +func NewIn(left SQLObject, right ...SQLObject) *In { + return &In{ + leftSide: left, + rightSide: right, + } +} + +type CustomCol struct { + stringify func(ctx *Ctx, options ...int) (string, error) +} + +func (c *CustomCol) String(ctx *Ctx, options ...int) (string, error) { + return c.stringify(ctx, options...) 
+} + +func NewCustomCol(fn func(ctx *Ctx, options ...int) (string, error)) SQLObject { + return &CustomCol{stringify: fn} +} diff --git a/wasm_parts/sql_select/select.go b/wasm_parts/sql_select/select.go new file mode 100644 index 00000000..c1669a70 --- /dev/null +++ b/wasm_parts/sql_select/select.go @@ -0,0 +1,426 @@ +package sql + +import ( + "fmt" + "strings" +) + +type Select struct { + distinct bool + columns []SQLObject + from SQLObject + where SQLCondition + preWhere SQLCondition + having SQLCondition + groupBy []SQLObject + orderBy []SQLObject + limit SQLObject + offset SQLObject + withs []*With + joins []*Join + settings map[string]string +} + +func (s *Select) Distinct(distinct bool) ISelect { + s.distinct = distinct + return s +} + +func (s *Select) GetDistinct() bool { + return s.distinct +} + +func (s *Select) Select(cols ...SQLObject) ISelect { + s.columns = cols + return s +} + +func (s *Select) GetSelect() []SQLObject { + return s.columns +} + +func (s *Select) From(table SQLObject) ISelect { + s.from = table + return s +} + +func (s *Select) SetSetting(name string, value string) ISelect { + if s.settings == nil { + s.settings = make(map[string]string) + } + s.settings[name] = value + return s +} + +func (s *Select) GetSettings(table SQLObject) map[string]string { + return s.settings +} + +func (s *Select) GetFrom() SQLObject { + return s.from +} + +func (s *Select) AndWhere(clauses ...SQLCondition) ISelect { + if s.where == nil { + s.where = And(clauses...) + return s + } + if _, ok := s.where.(*LogicalOp); ok && s.where.GetFunction() == "and" { + s.where.(*LogicalOp).AppendEntity(clauses...) + return s + } + _clauses := make([]SQLCondition, len(clauses)+1) + _clauses[0] = s.where + for i, v := range clauses { + _clauses[i+1] = v + } + s.where = And(_clauses...) + return s +} + +func (s *Select) OrWhere(clauses ...SQLCondition) ISelect { + if s.where == nil { + s.where = Or(clauses...) 
+		return s
+	}
+	if _, ok := s.where.(*LogicalOp); ok && s.where.GetFunction() == "or" {
+		s.where.(*LogicalOp).AppendEntity(clauses...)
+		return s
+	}
+	_clauses := make([]SQLCondition, len(clauses)+1)
+	_clauses[0] = s.where
+	for i, v := range clauses {
+		_clauses[i+1] = v
+	}
+	s.where = Or(_clauses...)
+	return s
+}
+
+func (s *Select) GetPreWhere() SQLCondition {
+	return s.preWhere
+}
+
+// AndPreWhere ANDs the given clauses into the PREWHERE condition,
+// mirroring AndWhere: append to an existing top-level "and", otherwise
+// wrap the old condition and the new clauses in a fresh And(...).
+func (s *Select) AndPreWhere(clauses ...SQLCondition) ISelect {
+	if s.preWhere == nil {
+		s.preWhere = And(clauses...)
+		return s
+	}
+	if _, ok := s.preWhere.(*LogicalOp); ok && s.preWhere.GetFunction() == "and" {
+		s.preWhere.(*LogicalOp).AppendEntity(clauses...)
+		return s
+	}
+	// len(clauses)+1: slot 0 holds the existing condition. Was len(clauses),
+	// so _clauses[i+1] below wrote past the end and panicked.
+	_clauses := make([]SQLCondition, len(clauses)+1)
+	_clauses[0] = s.preWhere
+	for i, v := range clauses {
+		_clauses[i+1] = v
+	}
+	// Was missing: without this assignment the new clauses were silently dropped.
+	s.preWhere = And(_clauses...)
+	return s
+}
+
+// OrPreWhere ORs the given clauses into the PREWHERE condition (see OrWhere).
+func (s *Select) OrPreWhere(clauses ...SQLCondition) ISelect {
+	if s.preWhere == nil {
+		s.preWhere = Or(clauses...)
+		return s
+	}
+	if _, ok := s.preWhere.(*LogicalOp); ok && s.preWhere.GetFunction() == "or" {
+		s.preWhere.(*LogicalOp).AppendEntity(clauses...)
+		return s
+	}
+	_clauses := make([]SQLCondition, len(clauses)+1)
+	_clauses[0] = s.preWhere
+	for i, v := range clauses {
+		_clauses[i+1] = v
+	}
+	// Was missing: without this assignment the new clauses were silently dropped.
+	s.preWhere = Or(_clauses...)
+	return s
+}
+
+func (s *Select) GetWhere() SQLCondition {
+	return s.where
+}
+
+func (s *Select) AndHaving(clauses ...SQLCondition) ISelect {
+	if s.having == nil {
+		s.having = And(clauses...)
+		return s
+	}
+	if _, ok := s.having.(*LogicalOp); ok && s.having.GetFunction() == "and" {
+		s.having.(*LogicalOp).AppendEntity(clauses...)
+		return s
+	}
+	_clauses := make([]SQLCondition, len(clauses)+1)
+	_clauses[0] = s.having
+	for i, v := range clauses {
+		_clauses[i+1] = v
+	}
+	s.having = And(_clauses...)
+	return s
+}
+
+func (s *Select) OrHaving(clauses ...SQLCondition) ISelect {
+	if s.having == nil {
+		s.having = Or(clauses...)
+		return s
+	}
+	// Inspect s.having here (was s.where — copy-paste bug from OrWhere that
+	// keyed the append-fast-path off the wrong condition).
+	if _, ok := s.having.(*LogicalOp); ok && s.having.GetFunction() == "or" {
+		s.having.(*LogicalOp).AppendEntity(clauses...)
+		return s
+	}
+	_clauses := make([]SQLCondition, len(clauses)+1)
+	_clauses[0] = s.having
+	for i, v := range clauses {
+		_clauses[i+1] = v
+	}
+	s.having = Or(_clauses...)
+	return s
+}
+
+func (s *Select) GetHaving() SQLCondition {
+	return s.having
+}
+
+func (s *Select) GroupBy(fields ...SQLObject) ISelect {
+	s.groupBy = fields
+	return s
+}
+
+func (s *Select) GetGroupBy() []SQLObject {
+	return s.groupBy
+}
+
+func (s *Select) OrderBy(fields ...SQLObject) ISelect {
+	s.orderBy = fields
+	return s
+}
+
+func (s *Select) GetOrderBy() []SQLObject {
+	return s.orderBy
+}
+
+func (s *Select) Limit(limit SQLObject) ISelect {
+	s.limit = limit
+	return s
+}
+
+func (s *Select) GetLimit() SQLObject {
+	return s.limit
+}
+
+func (s *Select) Offset(offset SQLObject) ISelect {
+	s.offset = offset
+	return s
+}
+
+func (s *Select) GetOffset() SQLObject {
+	return s.offset
+}
+
+func (s *Select) With(withs ...*With) ISelect {
+	s.withs = []*With{}
+	s.AddWith(withs...)
+	return s
+}
+
+func (s *Select) AddWith(withs ...*With) ISelect {
+	if s.withs == nil {
+		return s.With(withs...)
+	}
+	for _, w := range withs {
+		exists := false
+		for _, with := range s.withs {
+			if with.alias == w.alias {
+				exists = true
+			}
+		}
+		if exists {
+			continue
+		}
+
+		if _, ok := w.GetQuery().(*Select); ok {
+			s.AddWith(w.GetQuery().(*Select).GetWith()...)
+ } + s.withs = append(s.withs, w) + } + return s +} + +func (s *Select) DropWith(alias ...string) ISelect { + aliases := map[string]bool{} + for _, a := range alias { + aliases[a] = true + } + withs := make([]*With, 0, len(s.withs)) + for _, w := range s.withs { + if aliases[w.alias] { + continue + } + withs = append(withs, w) + } + s.withs = withs + return s +} + +func (s *Select) GetWith() []*With { + res := make([]*With, 0, len(s.withs)) + for _, w := range s.withs { + res = append(res, w) + } + return res +} + +func (s *Select) Join(joins ...*Join) ISelect { + s.joins = joins + return s +} + +func (s *Select) AddJoin(joins ...*Join) ISelect { + for _, lj := range joins { + s.joins = append(s.joins, lj) + } + return s +} + +func (s *Select) GetJoin() []*Join { + return s.joins +} + +func (s *Select) String(ctx *Ctx, options ...int) (string, error) { + res := strings.Builder{} + skipWith := false + for _, i := range options { + skipWith = skipWith || i == STRING_OPT_SKIP_WITH || i == STRING_OPT_INLINE_WITH + } + if !skipWith && len(s.withs) > 0 { + res.WriteString("WITH ") + i := 0 + _options := append(options, STRING_OPT_SKIP_WITH) + for _, w := range s.withs { + if i != 0 { + res.WriteRune(',') + } + str, err := w.String(ctx, _options...) + if err != nil { + return "", err + } + res.WriteString(str) + i++ + } + } + res.WriteString(" SELECT ") + if s.distinct { + res.WriteString(" DISTINCT ") + } + if s.columns == nil || len(s.columns) == 0 { + return "", fmt.Errorf("no 'SELECT' part") + } + for i, col := range s.columns { + if i != 0 { + res.WriteString(", ") + } + str, err := col.String(ctx, options...) + if err != nil { + return "", err + } + res.WriteString(str) + } + if s.from == nil { + return "", fmt.Errorf("no 'FROM' part") + } + res.WriteString(" FROM ") + str, err := s.from.String(ctx, options...) 
+ if err != nil { + return "", err + } + res.WriteString(str) + for _, lj := range s.joins { + res.WriteString(fmt.Sprintf(" %s JOIN ", lj.tp)) + str, err = lj.String(ctx, options...) + if err != nil { + return "", err + } + res.WriteString(str) + } + if s.preWhere != nil { + res.WriteString(" PREWHERE ") + str, err = s.preWhere.String(ctx, options...) + if err != nil { + return "", err + } + res.WriteString(str) + } + if s.where != nil { + res.WriteString(" WHERE ") + str, err = s.where.String(ctx, options...) + if err != nil { + return "", err + } + res.WriteString(str) + } + if s.groupBy != nil && len(s.groupBy) > 0 { + res.WriteString(" GROUP BY ") + for i, f := range s.groupBy { + if i != 0 { + res.WriteString(", ") + } + str, err = f.String(ctx, options...) + if err != nil { + return "", err + } + res.WriteString(str) + } + } + if s.having != nil { + res.WriteString(" HAVING ") + str, err = s.having.String(ctx, options...) + if err != nil { + return "", err + } + res.WriteString(str) + } + if s.orderBy != nil && len(s.orderBy) > 0 { + res.WriteString(" ORDER BY ") + for i, f := range s.orderBy { + if i != 0 { + res.WriteString(", ") + } + str, err = f.String(ctx, options...) + if err != nil { + return "", err + } + res.WriteString(str) + } + } + if s.limit != nil { + str, err = s.limit.String(ctx, options...) + if err != nil { + return "", err + } + if str != "" { + res.WriteString(" LIMIT ") + res.WriteString(str) + } + } + if s.offset != nil { + str, err = s.offset.String(ctx, options...) 
+ if err != nil { + return "", err + } + if str != "" { + res.WriteString(" OFFSET ") + res.WriteString(str) + } + } + if s.settings != nil { + res.WriteString(" SETTINGS ") + for k, v := range s.settings { + res.WriteString(k) + res.WriteString("=") + res.WriteString(v) + res.WriteString(" ") + } + } + return res.String(), nil +} + +func NewSelect() ISelect { + return &Select{} +} diff --git a/wasm_parts/traceql/parser/lexer_rules v2.go b/wasm_parts/traceql/parser/lexer_rules v2.go new file mode 100644 index 00000000..e48504f5 --- /dev/null +++ b/wasm_parts/traceql/parser/lexer_rules v2.go @@ -0,0 +1,41 @@ +package traceql_parser + +import ( + "github.com/alecthomas/participle/v2/lexer" +) + +var TraceQLLexerRulesV2 = []lexer.SimpleRule{ + {"Ocb", `\{`}, + {"Ccb", `\}`}, + + {"Ob", `\(`}, + {"Cb", `\)`}, + + {"Ge", `>=`}, + {"Le", `<=`}, + {"Gt", `>`}, + {"Lt", `<`}, + + {"Neq", `!=`}, + {"Re", `=~`}, + {"Nre", `!~`}, + {"Eq", `=`}, + + {"Label_name", `(\.[a-zA-Z_][.a-zA-Z0-9_]*|[a-zA-Z_][.a-zA-Z0-9_]*)`}, + {"Dot", `\.`}, + + {"And", `&&`}, + {"Or", `\|\|`}, + + {"Pipe", `\|`}, + + {"Quoted_string", `"([^"\\]|\\.)*"`}, + {"Ticked_string", "`([^`\\\\]|\\\\.)*`"}, + + {"Minus", "-"}, + {"Integer", "[0-9]+"}, + + {"space", `\s+`}, +} + +var TraceQLLexerDefinition lexer.Definition = lexer.MustSimple(TraceQLLexerRulesV2) diff --git a/wasm_parts/traceql/parser/model_v2.go b/wasm_parts/traceql/parser/model_v2.go new file mode 100644 index 00000000..d62deb83 --- /dev/null +++ b/wasm_parts/traceql/parser/model_v2.go @@ -0,0 +1,117 @@ +package traceql_parser + +import ( + "encoding/json" + "strings" +) + +type TraceQLScript struct { + Head Selector `@@` + AndOr string `@(And|Or)?` + Tail *TraceQLScript `@@?` +} + +func (l TraceQLScript) String() string { + var tail string + if l.AndOr != "" { + tail = " " + l.AndOr + " " + l.Tail.String() + } + return l.Head.String() + tail +} + +type Selector struct { + AttrSelector AttrSelectorExp `"{" @@ "}"` + Aggregator *Aggregator 
`@@?` +} + +func (s Selector) String() string { + res := "{" + s.AttrSelector.String() + "}" + if s.Aggregator != nil { + res += " " + s.Aggregator.String() + } + return res +} + +type AttrSelectorExp struct { + Head *AttrSelector `(@@` + ComplexHead *AttrSelectorExp `| "(" @@ ")" )` + AndOr string `@(And|Or)?` + Tail *AttrSelectorExp `@@?` +} + +func (a AttrSelectorExp) String() string { + res := "" + if a.Head != nil { + res += a.Head.String() + } + if a.ComplexHead != nil { + res += "(" + a.ComplexHead.String() + ")" + } + if a.AndOr != "" { + res += " " + a.AndOr + " " + a.Tail.String() + } + return res +} + +type Aggregator struct { + Fn string `"|" @("count"|"sum"|"min"|"max"|"avg")` + Attr string `"(" @Label_name? ")"` + Cmp string `@("="|"!="|"<"|"<="|">"|">=")` + Num string `@Minus? @Integer @Dot? @Integer?` + Measurement string `@("ns"|"us"|"ms"|"s"|"m"|"h"|"d")?` +} + +func (a Aggregator) String() string { + return "| " + a.Fn + "(" + a.Attr + ") " + a.Cmp + " " + a.Num + a.Measurement +} + +type AttrSelector struct { + Label string `@Label_name` + Op string `@("="|"!="|"<"|"<="|">"|">="|"=~"|"!~")` + Val Value `@@` +} + +func (a AttrSelector) String() string { + return a.Label + " " + a.Op + " " + a.Val.String() +} + +type Value struct { + TimeVal string `@Integer @("ns"|"us"|"ms"|"s"|"m"|"h"|"d")` + FVal string `| @Minus? @Integer @Dot? 
@Integer?` + StrVal *QuotedString `| @@` +} + +func (v Value) String() string { + if v.StrVal != nil { + return v.StrVal.Str + } + if v.FVal != "" { + return v.FVal + } + if v.TimeVal != "" { + return v.TimeVal + } + return "" +} + +type QuotedString struct { + Str string `@(Quoted_string|Ticked_string) ` +} + +func (q QuotedString) String() string { + return q.Str +} + +func (q *QuotedString) Unquote() (string, error) { + str := q.Str + if q.Str[0] == '`' { + str = str[1 : len(str)-1] + str = strings.ReplaceAll(str, "\\`", "`") + str = strings.ReplaceAll(str, `\`, `\\`) + str = strings.ReplaceAll(str, `"`, `\"`) + str = `"` + str + `"` + } + var res string = "" + err := json.Unmarshal([]byte(str), &res) + return res, err +} diff --git a/wasm_parts/traceql/parser/parser.go b/wasm_parts/traceql/parser/parser.go new file mode 100644 index 00000000..d7c28fc9 --- /dev/null +++ b/wasm_parts/traceql/parser/parser.go @@ -0,0 +1,15 @@ +package traceql_parser + +import ( + "github.com/alecthomas/participle/v2" +) + +func Parse(str string) (*TraceQLScript, error) { + res := &TraceQLScript{} + parser, err := participle.Build[TraceQLScript](participle.Lexer(TraceQLLexerDefinition), participle.UseLookahead(2)) + if err != nil { + return nil, err + } + res, err = parser.ParseString("", str+" ") + return res, err +} diff --git a/wasm_parts/traceql/shared/errors.go b/wasm_parts/traceql/shared/errors.go new file mode 100644 index 00000000..caf0e155 --- /dev/null +++ b/wasm_parts/traceql/shared/errors.go @@ -0,0 +1,14 @@ +package shared + +type NotSupportedError struct { + Msg string +} + +func (n *NotSupportedError) Error() string { + return n.Msg +} + +func isNotSupportedError(e error) bool { + _, ok := e.(*NotSupportedError) + return ok +} diff --git a/wasm_parts/traceql/shared/plannerCtx.go b/wasm_parts/traceql/shared/plannerCtx.go new file mode 100644 index 00000000..69d49885 --- /dev/null +++ b/wasm_parts/traceql/shared/plannerCtx.go @@ -0,0 +1,51 @@ +package shared + +import 
( + "context" + "time" + sql "wasm_parts/sql_select" +) + +type PlannerContext struct { + IsCluster bool + OrgID string + From time.Time + To time.Time + FromS int32 + ToS int32 + OrderASC bool + Limit int64 + + TimeSeriesGinTableName string + SamplesTableName string + TimeSeriesTableName string + TimeSeriesDistTableName string + Metrics15sTableName string + + TracesAttrsTable string + TracesAttrsDistTable string + TracesTable string + TracesDistTable string + + UseCache bool + + Ctx context.Context + CancelCtx context.CancelFunc + + CHFinalize bool + CHSqlCtx *sql.Ctx + + DDBSamplesTable string + DDBTSTable string + + Step time.Duration + + DeleteID string + + id int +} + +func (p *PlannerContext) Id() int { + p.id++ + return p.id +} diff --git a/wasm_parts/traceql/shared/plannerCtx_ffjson.go b/wasm_parts/traceql/shared/plannerCtx_ffjson.go new file mode 100644 index 00000000..0c2c928f --- /dev/null +++ b/wasm_parts/traceql/shared/plannerCtx_ffjson.go @@ -0,0 +1,1462 @@ +// Code generated by ffjson . DO NOT EDIT. 
+// source: plannerCtx.go + +package shared + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + fflib "github.com/pquerna/ffjson/fflib/v1" + "time" +) + +// MarshalJSON marshal bytes to json - template +func (j *PlannerContext) MarshalJSON() ([]byte, error) { + var buf fflib.Buffer + if j == nil { + buf.WriteString("null") + return buf.Bytes(), nil + } + err := j.MarshalJSONBuf(&buf) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +// MarshalJSONBuf marshal buff to json - template +func (j *PlannerContext) MarshalJSONBuf(buf fflib.EncodingBuffer) error { + if j == nil { + buf.WriteString("null") + return nil + } + var err error + var obj []byte + _ = obj + _ = err + if j.IsCluster { + buf.WriteString(`{"IsCluster":true`) + } else { + buf.WriteString(`{"IsCluster":false`) + } + buf.WriteString(`,"OrgID":`) + fflib.WriteJsonString(buf, string(j.OrgID)) + buf.WriteString(`,"From":`) + + { + + obj, err = j.From.MarshalJSON() + if err != nil { + return err + } + buf.Write(obj) + + } + buf.WriteString(`,"To":`) + + { + + obj, err = j.To.MarshalJSON() + if err != nil { + return err + } + buf.Write(obj) + + } + buf.WriteString(`,"FromS":`) + fflib.FormatBits2(buf, uint64(j.FromS), 10, j.FromS < 0) + buf.WriteString(`,"ToS":`) + fflib.FormatBits2(buf, uint64(j.ToS), 10, j.ToS < 0) + if j.OrderASC { + buf.WriteString(`,"OrderASC":true`) + } else { + buf.WriteString(`,"OrderASC":false`) + } + buf.WriteString(`,"Limit":`) + fflib.FormatBits2(buf, uint64(j.Limit), 10, j.Limit < 0) + buf.WriteString(`,"TimeSeriesGinTableName":`) + fflib.WriteJsonString(buf, string(j.TimeSeriesGinTableName)) + buf.WriteString(`,"SamplesTableName":`) + fflib.WriteJsonString(buf, string(j.SamplesTableName)) + buf.WriteString(`,"TimeSeriesTableName":`) + fflib.WriteJsonString(buf, string(j.TimeSeriesTableName)) + buf.WriteString(`,"TimeSeriesDistTableName":`) + fflib.WriteJsonString(buf, string(j.TimeSeriesDistTableName)) + buf.WriteString(`,"Metrics15sTableName":`) + 
fflib.WriteJsonString(buf, string(j.Metrics15sTableName)) + buf.WriteString(`,"TracesAttrsTable":`) + fflib.WriteJsonString(buf, string(j.TracesAttrsTable)) + buf.WriteString(`,"TracesAttrsDistTable":`) + fflib.WriteJsonString(buf, string(j.TracesAttrsDistTable)) + buf.WriteString(`,"TracesTable":`) + fflib.WriteJsonString(buf, string(j.TracesTable)) + buf.WriteString(`,"TracesDistTable":`) + fflib.WriteJsonString(buf, string(j.TracesDistTable)) + if j.UseCache { + buf.WriteString(`,"UseCache":true`) + } else { + buf.WriteString(`,"UseCache":false`) + } + buf.WriteString(`,"Ctx":`) + /* Interface types must use runtime reflection. type=context.Context kind=interface */ + err = buf.Encode(j.Ctx) + if err != nil { + return err + } + buf.WriteString(`,"CancelCtx":`) + /* Falling back. type=context.CancelFunc kind=func */ + err = buf.Encode(j.CancelCtx) + if err != nil { + return err + } + if j.CHFinalize { + buf.WriteString(`,"CHFinalize":true`) + } else { + buf.WriteString(`,"CHFinalize":false`) + } + if j.CHSqlCtx != nil { + /* Struct fall back. 
type=sql.Ctx kind=struct */ + buf.WriteString(`,"CHSqlCtx":`) + err = buf.Encode(j.CHSqlCtx) + if err != nil { + return err + } + } else { + buf.WriteString(`,"CHSqlCtx":null`) + } + buf.WriteString(`,"DDBSamplesTable":`) + fflib.WriteJsonString(buf, string(j.DDBSamplesTable)) + buf.WriteString(`,"DDBTSTable":`) + fflib.WriteJsonString(buf, string(j.DDBTSTable)) + buf.WriteString(`,"Step":`) + fflib.FormatBits2(buf, uint64(j.Step), 10, j.Step < 0) + buf.WriteString(`,"DeleteID":`) + fflib.WriteJsonString(buf, string(j.DeleteID)) + buf.WriteByte('}') + return nil +} + +const ( + ffjtPlannerContextbase = iota + ffjtPlannerContextnosuchkey + + ffjtPlannerContextIsCluster + + ffjtPlannerContextOrgID + + ffjtPlannerContextFrom + + ffjtPlannerContextTo + + ffjtPlannerContextFromS + + ffjtPlannerContextToS + + ffjtPlannerContextOrderASC + + ffjtPlannerContextLimit + + ffjtPlannerContextTimeSeriesGinTableName + + ffjtPlannerContextSamplesTableName + + ffjtPlannerContextTimeSeriesTableName + + ffjtPlannerContextTimeSeriesDistTableName + + ffjtPlannerContextMetrics15sTableName + + ffjtPlannerContextTracesAttrsTable + + ffjtPlannerContextTracesAttrsDistTable + + ffjtPlannerContextTracesTable + + ffjtPlannerContextTracesDistTable + + ffjtPlannerContextUseCache + + ffjtPlannerContextCtx + + ffjtPlannerContextCancelCtx + + ffjtPlannerContextCHFinalize + + ffjtPlannerContextCHSqlCtx + + ffjtPlannerContextDDBSamplesTable + + ffjtPlannerContextDDBTSTable + + ffjtPlannerContextStep + + ffjtPlannerContextDeleteID +) + +var ffjKeyPlannerContextIsCluster = []byte("IsCluster") + +var ffjKeyPlannerContextOrgID = []byte("OrgID") + +var ffjKeyPlannerContextFrom = []byte("From") + +var ffjKeyPlannerContextTo = []byte("To") + +var ffjKeyPlannerContextFromS = []byte("FromS") + +var ffjKeyPlannerContextToS = []byte("ToS") + +var ffjKeyPlannerContextOrderASC = []byte("OrderASC") + +var ffjKeyPlannerContextLimit = []byte("Limit") + +var ffjKeyPlannerContextTimeSeriesGinTableName = 
[]byte("TimeSeriesGinTableName") + +var ffjKeyPlannerContextSamplesTableName = []byte("SamplesTableName") + +var ffjKeyPlannerContextTimeSeriesTableName = []byte("TimeSeriesTableName") + +var ffjKeyPlannerContextTimeSeriesDistTableName = []byte("TimeSeriesDistTableName") + +var ffjKeyPlannerContextMetrics15sTableName = []byte("Metrics15sTableName") + +var ffjKeyPlannerContextTracesAttrsTable = []byte("TracesAttrsTable") + +var ffjKeyPlannerContextTracesAttrsDistTable = []byte("TracesAttrsDistTable") + +var ffjKeyPlannerContextTracesTable = []byte("TracesTable") + +var ffjKeyPlannerContextTracesDistTable = []byte("TracesDistTable") + +var ffjKeyPlannerContextUseCache = []byte("UseCache") + +var ffjKeyPlannerContextCtx = []byte("Ctx") + +var ffjKeyPlannerContextCancelCtx = []byte("CancelCtx") + +var ffjKeyPlannerContextCHFinalize = []byte("CHFinalize") + +var ffjKeyPlannerContextCHSqlCtx = []byte("CHSqlCtx") + +var ffjKeyPlannerContextDDBSamplesTable = []byte("DDBSamplesTable") + +var ffjKeyPlannerContextDDBTSTable = []byte("DDBTSTable") + +var ffjKeyPlannerContextStep = []byte("Step") + +var ffjKeyPlannerContextDeleteID = []byte("DeleteID") + +// UnmarshalJSON umarshall json - template of ffjson +func (j *PlannerContext) UnmarshalJSON(input []byte) error { + fs := fflib.NewFFLexer(input) + return j.UnmarshalJSONFFLexer(fs, fflib.FFParse_map_start) +} + +// UnmarshalJSONFFLexer fast json unmarshall - template ffjson +func (j *PlannerContext) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error { + var err error + currentKey := ffjtPlannerContextbase + _ = currentKey + tok := fflib.FFTok_init + wantedTok := fflib.FFTok_init + +mainparse: + for { + tok = fs.Scan() + // println(fmt.Sprintf("debug: tok: %v state: %v", tok, state)) + if tok == fflib.FFTok_error { + goto tokerror + } + + switch state { + + case fflib.FFParse_map_start: + if tok != fflib.FFTok_left_bracket { + wantedTok = fflib.FFTok_left_bracket + goto wrongtokenerror + } + state = 
fflib.FFParse_want_key + continue + + case fflib.FFParse_after_value: + if tok == fflib.FFTok_comma { + state = fflib.FFParse_want_key + } else if tok == fflib.FFTok_right_bracket { + goto done + } else { + wantedTok = fflib.FFTok_comma + goto wrongtokenerror + } + + case fflib.FFParse_want_key: + // json {} ended. goto exit. woo. + if tok == fflib.FFTok_right_bracket { + goto done + } + if tok != fflib.FFTok_string { + wantedTok = fflib.FFTok_string + goto wrongtokenerror + } + + kn := fs.Output.Bytes() + if len(kn) <= 0 { + // "" case. hrm. + currentKey = ffjtPlannerContextnosuchkey + state = fflib.FFParse_want_colon + goto mainparse + } else { + switch kn[0] { + + case 'C': + + if bytes.Equal(ffjKeyPlannerContextCtx, kn) { + currentKey = ffjtPlannerContextCtx + state = fflib.FFParse_want_colon + goto mainparse + + } else if bytes.Equal(ffjKeyPlannerContextCancelCtx, kn) { + currentKey = ffjtPlannerContextCancelCtx + state = fflib.FFParse_want_colon + goto mainparse + + } else if bytes.Equal(ffjKeyPlannerContextCHFinalize, kn) { + currentKey = ffjtPlannerContextCHFinalize + state = fflib.FFParse_want_colon + goto mainparse + + } else if bytes.Equal(ffjKeyPlannerContextCHSqlCtx, kn) { + currentKey = ffjtPlannerContextCHSqlCtx + state = fflib.FFParse_want_colon + goto mainparse + } + + case 'D': + + if bytes.Equal(ffjKeyPlannerContextDDBSamplesTable, kn) { + currentKey = ffjtPlannerContextDDBSamplesTable + state = fflib.FFParse_want_colon + goto mainparse + + } else if bytes.Equal(ffjKeyPlannerContextDDBTSTable, kn) { + currentKey = ffjtPlannerContextDDBTSTable + state = fflib.FFParse_want_colon + goto mainparse + + } else if bytes.Equal(ffjKeyPlannerContextDeleteID, kn) { + currentKey = ffjtPlannerContextDeleteID + state = fflib.FFParse_want_colon + goto mainparse + } + + case 'F': + + if bytes.Equal(ffjKeyPlannerContextFrom, kn) { + currentKey = ffjtPlannerContextFrom + state = fflib.FFParse_want_colon + goto mainparse + + } else if 
bytes.Equal(ffjKeyPlannerContextFromS, kn) { + currentKey = ffjtPlannerContextFromS + state = fflib.FFParse_want_colon + goto mainparse + } + + case 'I': + + if bytes.Equal(ffjKeyPlannerContextIsCluster, kn) { + currentKey = ffjtPlannerContextIsCluster + state = fflib.FFParse_want_colon + goto mainparse + } + + case 'L': + + if bytes.Equal(ffjKeyPlannerContextLimit, kn) { + currentKey = ffjtPlannerContextLimit + state = fflib.FFParse_want_colon + goto mainparse + } + + case 'M': + + if bytes.Equal(ffjKeyPlannerContextMetrics15sTableName, kn) { + currentKey = ffjtPlannerContextMetrics15sTableName + state = fflib.FFParse_want_colon + goto mainparse + } + + case 'O': + + if bytes.Equal(ffjKeyPlannerContextOrgID, kn) { + currentKey = ffjtPlannerContextOrgID + state = fflib.FFParse_want_colon + goto mainparse + + } else if bytes.Equal(ffjKeyPlannerContextOrderASC, kn) { + currentKey = ffjtPlannerContextOrderASC + state = fflib.FFParse_want_colon + goto mainparse + } + + case 'S': + + if bytes.Equal(ffjKeyPlannerContextSamplesTableName, kn) { + currentKey = ffjtPlannerContextSamplesTableName + state = fflib.FFParse_want_colon + goto mainparse + + } else if bytes.Equal(ffjKeyPlannerContextStep, kn) { + currentKey = ffjtPlannerContextStep + state = fflib.FFParse_want_colon + goto mainparse + } + + case 'T': + + if bytes.Equal(ffjKeyPlannerContextTo, kn) { + currentKey = ffjtPlannerContextTo + state = fflib.FFParse_want_colon + goto mainparse + + } else if bytes.Equal(ffjKeyPlannerContextToS, kn) { + currentKey = ffjtPlannerContextToS + state = fflib.FFParse_want_colon + goto mainparse + + } else if bytes.Equal(ffjKeyPlannerContextTimeSeriesGinTableName, kn) { + currentKey = ffjtPlannerContextTimeSeriesGinTableName + state = fflib.FFParse_want_colon + goto mainparse + + } else if bytes.Equal(ffjKeyPlannerContextTimeSeriesTableName, kn) { + currentKey = ffjtPlannerContextTimeSeriesTableName + state = fflib.FFParse_want_colon + goto mainparse + + } else if 
bytes.Equal(ffjKeyPlannerContextTimeSeriesDistTableName, kn) { + currentKey = ffjtPlannerContextTimeSeriesDistTableName + state = fflib.FFParse_want_colon + goto mainparse + + } else if bytes.Equal(ffjKeyPlannerContextTracesAttrsTable, kn) { + currentKey = ffjtPlannerContextTracesAttrsTable + state = fflib.FFParse_want_colon + goto mainparse + + } else if bytes.Equal(ffjKeyPlannerContextTracesAttrsDistTable, kn) { + currentKey = ffjtPlannerContextTracesAttrsDistTable + state = fflib.FFParse_want_colon + goto mainparse + + } else if bytes.Equal(ffjKeyPlannerContextTracesTable, kn) { + currentKey = ffjtPlannerContextTracesTable + state = fflib.FFParse_want_colon + goto mainparse + + } else if bytes.Equal(ffjKeyPlannerContextTracesDistTable, kn) { + currentKey = ffjtPlannerContextTracesDistTable + state = fflib.FFParse_want_colon + goto mainparse + } + + case 'U': + + if bytes.Equal(ffjKeyPlannerContextUseCache, kn) { + currentKey = ffjtPlannerContextUseCache + state = fflib.FFParse_want_colon + goto mainparse + } + + } + + if fflib.SimpleLetterEqualFold(ffjKeyPlannerContextDeleteID, kn) { + currentKey = ffjtPlannerContextDeleteID + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.EqualFoldRight(ffjKeyPlannerContextStep, kn) { + currentKey = ffjtPlannerContextStep + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.EqualFoldRight(ffjKeyPlannerContextDDBTSTable, kn) { + currentKey = ffjtPlannerContextDDBTSTable + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.EqualFoldRight(ffjKeyPlannerContextDDBSamplesTable, kn) { + currentKey = ffjtPlannerContextDDBSamplesTable + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.EqualFoldRight(ffjKeyPlannerContextCHSqlCtx, kn) { + currentKey = ffjtPlannerContextCHSqlCtx + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.SimpleLetterEqualFold(ffjKeyPlannerContextCHFinalize, kn) { + currentKey = ffjtPlannerContextCHFinalize + state = 
fflib.FFParse_want_colon + goto mainparse + } + + if fflib.SimpleLetterEqualFold(ffjKeyPlannerContextCancelCtx, kn) { + currentKey = ffjtPlannerContextCancelCtx + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.SimpleLetterEqualFold(ffjKeyPlannerContextCtx, kn) { + currentKey = ffjtPlannerContextCtx + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.EqualFoldRight(ffjKeyPlannerContextUseCache, kn) { + currentKey = ffjtPlannerContextUseCache + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.EqualFoldRight(ffjKeyPlannerContextTracesDistTable, kn) { + currentKey = ffjtPlannerContextTracesDistTable + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.EqualFoldRight(ffjKeyPlannerContextTracesTable, kn) { + currentKey = ffjtPlannerContextTracesTable + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.EqualFoldRight(ffjKeyPlannerContextTracesAttrsDistTable, kn) { + currentKey = ffjtPlannerContextTracesAttrsDistTable + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.EqualFoldRight(ffjKeyPlannerContextTracesAttrsTable, kn) { + currentKey = ffjtPlannerContextTracesAttrsTable + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.EqualFoldRight(ffjKeyPlannerContextMetrics15sTableName, kn) { + currentKey = ffjtPlannerContextMetrics15sTableName + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.EqualFoldRight(ffjKeyPlannerContextTimeSeriesDistTableName, kn) { + currentKey = ffjtPlannerContextTimeSeriesDistTableName + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.EqualFoldRight(ffjKeyPlannerContextTimeSeriesTableName, kn) { + currentKey = ffjtPlannerContextTimeSeriesTableName + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.EqualFoldRight(ffjKeyPlannerContextSamplesTableName, kn) { + currentKey = ffjtPlannerContextSamplesTableName + state = fflib.FFParse_want_colon + goto mainparse + } + + if 
fflib.EqualFoldRight(ffjKeyPlannerContextTimeSeriesGinTableName, kn) { + currentKey = ffjtPlannerContextTimeSeriesGinTableName + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.SimpleLetterEqualFold(ffjKeyPlannerContextLimit, kn) { + currentKey = ffjtPlannerContextLimit + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.EqualFoldRight(ffjKeyPlannerContextOrderASC, kn) { + currentKey = ffjtPlannerContextOrderASC + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.EqualFoldRight(ffjKeyPlannerContextToS, kn) { + currentKey = ffjtPlannerContextToS + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.EqualFoldRight(ffjKeyPlannerContextFromS, kn) { + currentKey = ffjtPlannerContextFromS + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.SimpleLetterEqualFold(ffjKeyPlannerContextTo, kn) { + currentKey = ffjtPlannerContextTo + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.SimpleLetterEqualFold(ffjKeyPlannerContextFrom, kn) { + currentKey = ffjtPlannerContextFrom + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.SimpleLetterEqualFold(ffjKeyPlannerContextOrgID, kn) { + currentKey = ffjtPlannerContextOrgID + state = fflib.FFParse_want_colon + goto mainparse + } + + if fflib.EqualFoldRight(ffjKeyPlannerContextIsCluster, kn) { + currentKey = ffjtPlannerContextIsCluster + state = fflib.FFParse_want_colon + goto mainparse + } + + currentKey = ffjtPlannerContextnosuchkey + state = fflib.FFParse_want_colon + goto mainparse + } + + case fflib.FFParse_want_colon: + if tok != fflib.FFTok_colon { + wantedTok = fflib.FFTok_colon + goto wrongtokenerror + } + state = fflib.FFParse_want_value + continue + case fflib.FFParse_want_value: + + if tok == fflib.FFTok_left_brace || tok == fflib.FFTok_left_bracket || tok == fflib.FFTok_integer || tok == fflib.FFTok_double || tok == fflib.FFTok_string || tok == fflib.FFTok_bool || tok == fflib.FFTok_null { + switch 
currentKey { + + case ffjtPlannerContextIsCluster: + goto handle_IsCluster + + case ffjtPlannerContextOrgID: + goto handle_OrgID + + case ffjtPlannerContextFrom: + goto handle_From + + case ffjtPlannerContextTo: + goto handle_To + + case ffjtPlannerContextFromS: + goto handle_FromS + + case ffjtPlannerContextToS: + goto handle_ToS + + case ffjtPlannerContextOrderASC: + goto handle_OrderASC + + case ffjtPlannerContextLimit: + goto handle_Limit + + case ffjtPlannerContextTimeSeriesGinTableName: + goto handle_TimeSeriesGinTableName + + case ffjtPlannerContextSamplesTableName: + goto handle_SamplesTableName + + case ffjtPlannerContextTimeSeriesTableName: + goto handle_TimeSeriesTableName + + case ffjtPlannerContextTimeSeriesDistTableName: + goto handle_TimeSeriesDistTableName + + case ffjtPlannerContextMetrics15sTableName: + goto handle_Metrics15sTableName + + case ffjtPlannerContextTracesAttrsTable: + goto handle_TracesAttrsTable + + case ffjtPlannerContextTracesAttrsDistTable: + goto handle_TracesAttrsDistTable + + case ffjtPlannerContextTracesTable: + goto handle_TracesTable + + case ffjtPlannerContextTracesDistTable: + goto handle_TracesDistTable + + case ffjtPlannerContextUseCache: + goto handle_UseCache + + case ffjtPlannerContextCtx: + goto handle_Ctx + + case ffjtPlannerContextCancelCtx: + goto handle_CancelCtx + + case ffjtPlannerContextCHFinalize: + goto handle_CHFinalize + + case ffjtPlannerContextCHSqlCtx: + goto handle_CHSqlCtx + + case ffjtPlannerContextDDBSamplesTable: + goto handle_DDBSamplesTable + + case ffjtPlannerContextDDBTSTable: + goto handle_DDBTSTable + + case ffjtPlannerContextStep: + goto handle_Step + + case ffjtPlannerContextDeleteID: + goto handle_DeleteID + + case ffjtPlannerContextnosuchkey: + err = fs.SkipField(tok) + if err != nil { + return fs.WrapErr(err) + } + state = fflib.FFParse_after_value + goto mainparse + } + } else { + goto wantedvalue + } + } + } + +handle_IsCluster: + + /* handler: j.IsCluster type=bool kind=bool 
quoted=false*/ + + { + if tok != fflib.FFTok_bool && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for bool", tok)) + } + } + + { + if tok == fflib.FFTok_null { + + } else { + tmpb := fs.Output.Bytes() + + if bytes.Compare([]byte{'t', 'r', 'u', 'e'}, tmpb) == 0 { + + j.IsCluster = true + + } else if bytes.Compare([]byte{'f', 'a', 'l', 's', 'e'}, tmpb) == 0 { + + j.IsCluster = false + + } else { + err = errors.New("unexpected bytes for true/false value") + return fs.WrapErr(err) + } + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_OrgID: + + /* handler: j.OrgID type=string kind=string quoted=false*/ + + { + + { + if tok != fflib.FFTok_string && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok)) + } + } + + if tok == fflib.FFTok_null { + + } else { + + outBuf := fs.Output.Bytes() + + j.OrgID = string(string(outBuf)) + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_From: + + /* handler: j.From type=time.Time kind=struct quoted=false*/ + + { + if tok == fflib.FFTok_null { + + } else { + + tbuf, err := fs.CaptureField(tok) + if err != nil { + return fs.WrapErr(err) + } + + err = j.From.UnmarshalJSON(tbuf) + if err != nil { + return fs.WrapErr(err) + } + } + state = fflib.FFParse_after_value + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_To: + + /* handler: j.To type=time.Time kind=struct quoted=false*/ + + { + if tok == fflib.FFTok_null { + + } else { + + tbuf, err := fs.CaptureField(tok) + if err != nil { + return fs.WrapErr(err) + } + + err = j.To.UnmarshalJSON(tbuf) + if err != nil { + return fs.WrapErr(err) + } + } + state = fflib.FFParse_after_value + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_FromS: + + /* handler: j.FromS type=int32 kind=int32 quoted=false*/ + + { + if tok != fflib.FFTok_integer && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot 
unmarshal %s into Go value for int32", tok)) + } + } + + { + + if tok == fflib.FFTok_null { + + } else { + + tval, err := fflib.ParseInt(fs.Output.Bytes(), 10, 32) + + if err != nil { + return fs.WrapErr(err) + } + + j.FromS = int32(tval) + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_ToS: + + /* handler: j.ToS type=int32 kind=int32 quoted=false*/ + + { + if tok != fflib.FFTok_integer && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for int32", tok)) + } + } + + { + + if tok == fflib.FFTok_null { + + } else { + + tval, err := fflib.ParseInt(fs.Output.Bytes(), 10, 32) + + if err != nil { + return fs.WrapErr(err) + } + + j.ToS = int32(tval) + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_OrderASC: + + /* handler: j.OrderASC type=bool kind=bool quoted=false*/ + + { + if tok != fflib.FFTok_bool && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for bool", tok)) + } + } + + { + if tok == fflib.FFTok_null { + + } else { + tmpb := fs.Output.Bytes() + + if bytes.Compare([]byte{'t', 'r', 'u', 'e'}, tmpb) == 0 { + + j.OrderASC = true + + } else if bytes.Compare([]byte{'f', 'a', 'l', 's', 'e'}, tmpb) == 0 { + + j.OrderASC = false + + } else { + err = errors.New("unexpected bytes for true/false value") + return fs.WrapErr(err) + } + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_Limit: + + /* handler: j.Limit type=int64 kind=int64 quoted=false*/ + + { + if tok != fflib.FFTok_integer && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for int64", tok)) + } + } + + { + + if tok == fflib.FFTok_null { + + } else { + + tval, err := fflib.ParseInt(fs.Output.Bytes(), 10, 64) + + if err != nil { + return fs.WrapErr(err) + } + + j.Limit = int64(tval) + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_TimeSeriesGinTableName: + + /* handler: 
j.TimeSeriesGinTableName type=string kind=string quoted=false*/ + + { + + { + if tok != fflib.FFTok_string && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok)) + } + } + + if tok == fflib.FFTok_null { + + } else { + + outBuf := fs.Output.Bytes() + + j.TimeSeriesGinTableName = string(string(outBuf)) + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_SamplesTableName: + + /* handler: j.SamplesTableName type=string kind=string quoted=false*/ + + { + + { + if tok != fflib.FFTok_string && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok)) + } + } + + if tok == fflib.FFTok_null { + + } else { + + outBuf := fs.Output.Bytes() + + j.SamplesTableName = string(string(outBuf)) + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_TimeSeriesTableName: + + /* handler: j.TimeSeriesTableName type=string kind=string quoted=false*/ + + { + + { + if tok != fflib.FFTok_string && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok)) + } + } + + if tok == fflib.FFTok_null { + + } else { + + outBuf := fs.Output.Bytes() + + j.TimeSeriesTableName = string(string(outBuf)) + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_TimeSeriesDistTableName: + + /* handler: j.TimeSeriesDistTableName type=string kind=string quoted=false*/ + + { + + { + if tok != fflib.FFTok_string && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok)) + } + } + + if tok == fflib.FFTok_null { + + } else { + + outBuf := fs.Output.Bytes() + + j.TimeSeriesDistTableName = string(string(outBuf)) + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_Metrics15sTableName: + + /* handler: j.Metrics15sTableName type=string kind=string quoted=false*/ + + { + + { + if tok != fflib.FFTok_string && tok != 
fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok)) + } + } + + if tok == fflib.FFTok_null { + + } else { + + outBuf := fs.Output.Bytes() + + j.Metrics15sTableName = string(string(outBuf)) + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_TracesAttrsTable: + + /* handler: j.TracesAttrsTable type=string kind=string quoted=false*/ + + { + + { + if tok != fflib.FFTok_string && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok)) + } + } + + if tok == fflib.FFTok_null { + + } else { + + outBuf := fs.Output.Bytes() + + j.TracesAttrsTable = string(string(outBuf)) + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_TracesAttrsDistTable: + + /* handler: j.TracesAttrsDistTable type=string kind=string quoted=false*/ + + { + + { + if tok != fflib.FFTok_string && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok)) + } + } + + if tok == fflib.FFTok_null { + + } else { + + outBuf := fs.Output.Bytes() + + j.TracesAttrsDistTable = string(string(outBuf)) + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_TracesTable: + + /* handler: j.TracesTable type=string kind=string quoted=false*/ + + { + + { + if tok != fflib.FFTok_string && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok)) + } + } + + if tok == fflib.FFTok_null { + + } else { + + outBuf := fs.Output.Bytes() + + j.TracesTable = string(string(outBuf)) + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_TracesDistTable: + + /* handler: j.TracesDistTable type=string kind=string quoted=false*/ + + { + + { + if tok != fflib.FFTok_string && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok)) + } + } + + if tok == fflib.FFTok_null { + + } else { + + outBuf 
:= fs.Output.Bytes() + + j.TracesDistTable = string(string(outBuf)) + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_UseCache: + + /* handler: j.UseCache type=bool kind=bool quoted=false*/ + + { + if tok != fflib.FFTok_bool && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for bool", tok)) + } + } + + { + if tok == fflib.FFTok_null { + + } else { + tmpb := fs.Output.Bytes() + + if bytes.Compare([]byte{'t', 'r', 'u', 'e'}, tmpb) == 0 { + + j.UseCache = true + + } else if bytes.Compare([]byte{'f', 'a', 'l', 's', 'e'}, tmpb) == 0 { + + j.UseCache = false + + } else { + err = errors.New("unexpected bytes for true/false value") + return fs.WrapErr(err) + } + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_Ctx: + + /* handler: j.Ctx type=context.Context kind=interface quoted=false*/ + + { + /* Falling back. type=context.Context kind=interface */ + tbuf, err := fs.CaptureField(tok) + if err != nil { + return fs.WrapErr(err) + } + + err = json.Unmarshal(tbuf, &j.Ctx) + if err != nil { + return fs.WrapErr(err) + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_CancelCtx: + + /* handler: j.CancelCtx type=context.CancelFunc kind=func quoted=false*/ + + { + /* Falling back. 
type=context.CancelFunc kind=func */ + tbuf, err := fs.CaptureField(tok) + if err != nil { + return fs.WrapErr(err) + } + + err = json.Unmarshal(tbuf, &j.CancelCtx) + if err != nil { + return fs.WrapErr(err) + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_CHFinalize: + + /* handler: j.CHFinalize type=bool kind=bool quoted=false*/ + + { + if tok != fflib.FFTok_bool && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for bool", tok)) + } + } + + { + if tok == fflib.FFTok_null { + + } else { + tmpb := fs.Output.Bytes() + + if bytes.Compare([]byte{'t', 'r', 'u', 'e'}, tmpb) == 0 { + + j.CHFinalize = true + + } else if bytes.Compare([]byte{'f', 'a', 'l', 's', 'e'}, tmpb) == 0 { + + j.CHFinalize = false + + } else { + err = errors.New("unexpected bytes for true/false value") + return fs.WrapErr(err) + } + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_CHSqlCtx: + + /* handler: j.CHSqlCtx type=sql.Ctx kind=struct quoted=false*/ + + { + /* Falling back. 
type=sql.Ctx kind=struct */ + tbuf, err := fs.CaptureField(tok) + if err != nil { + return fs.WrapErr(err) + } + + err = json.Unmarshal(tbuf, &j.CHSqlCtx) + if err != nil { + return fs.WrapErr(err) + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_DDBSamplesTable: + + /* handler: j.DDBSamplesTable type=string kind=string quoted=false*/ + + { + + { + if tok != fflib.FFTok_string && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok)) + } + } + + if tok == fflib.FFTok_null { + + } else { + + outBuf := fs.Output.Bytes() + + j.DDBSamplesTable = string(string(outBuf)) + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_DDBTSTable: + + /* handler: j.DDBTSTable type=string kind=string quoted=false*/ + + { + + { + if tok != fflib.FFTok_string && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok)) + } + } + + if tok == fflib.FFTok_null { + + } else { + + outBuf := fs.Output.Bytes() + + j.DDBTSTable = string(string(outBuf)) + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_Step: + + /* handler: j.Step type=time.Duration kind=int64 quoted=false*/ + + { + if tok != fflib.FFTok_integer && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for Duration", tok)) + } + } + + { + + if tok == fflib.FFTok_null { + + } else { + + tval, err := fflib.ParseInt(fs.Output.Bytes(), 10, 64) + + if err != nil { + return fs.WrapErr(err) + } + + j.Step = time.Duration(tval) + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +handle_DeleteID: + + /* handler: j.DeleteID type=string kind=string quoted=false*/ + + { + + { + if tok != fflib.FFTok_string && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok)) + } + } + + if tok == fflib.FFTok_null { + + } else { + + outBuf := fs.Output.Bytes() + + 
j.DeleteID = string(string(outBuf)) + + } + } + + state = fflib.FFParse_after_value + goto mainparse + +wantedvalue: + return fs.WrapErr(fmt.Errorf("wanted value token, but got token: %v", tok)) +wrongtokenerror: + return fs.WrapErr(fmt.Errorf("ffjson: wanted token: %v, but got token: %v output=%s", wantedTok, tok, fs.Output.String())) +tokerror: + if fs.BigError != nil { + return fs.WrapErr(fs.BigError) + } + err = fs.Error.ToError() + if err != nil { + return fs.WrapErr(err) + } + panic("ffjson-generated: unreachable, please report bug.") +done: + + return nil +} diff --git a/wasm_parts/traceql/shared/tempo_types.go b/wasm_parts/traceql/shared/tempo_types.go new file mode 100644 index 00000000..bdcb4c1e --- /dev/null +++ b/wasm_parts/traceql/shared/tempo_types.go @@ -0,0 +1,33 @@ +package shared + +type TraceInfo struct { + TraceID string `json:"traceID"` + RootServiceName string `json:"rootServiceName"` + RootTraceName string `json:"rootTraceName"` + StartTimeUnixNano string `json:"startTimeUnixNano"` + DurationMs float64 `json:"durationMs"` + SpanSet SpanSet `json:"spanSet"` +} + +type SpanInfo struct { + SpanID string `json:"spanID"` + StartTimeUnixNano string `json:"startTimeUnixNano"` + DurationNanos string `json:"durationNanos"` + Attributes []SpanAttr `json:"attributes"` +} + +type SpanSet struct { + Spans []SpanInfo `json:"spans"` + Matched int `json:"matched"` +} + +type SpanAttr struct { + Key string `json:"key"` + Value struct { + StringValue string `json:"stringValue"` + } `json:"value"` +} + +type TraceRequestProcessor interface { + Process(*PlannerContext) (chan []TraceInfo, error) +} diff --git a/wasm_parts/traceql/shared/types.go b/wasm_parts/traceql/shared/types.go new file mode 100644 index 00000000..cd1c80dd --- /dev/null +++ b/wasm_parts/traceql/shared/types.go @@ -0,0 +1,30 @@ +package shared + +import ( + "wasm_parts/sql_select" +) + +type RequestProcessor interface { + IsMatrix() bool + Process(*PlannerContext, chan []LogEntry) (chan 
[]LogEntry, error) +} + +type SQLRequestPlanner interface { + Process(ctx *PlannerContext) (sql.ISelect, error) +} + +type LogEntry struct { + TimestampNS int64 + Fingerprint uint64 + Labels map[string]string + Message string + Value float64 + + Err error +} + +type RequestProcessorChain []RequestProcessor + +type RequestPlanner interface { + Process(cnain RequestProcessorChain) (RequestProcessorChain, error) +} diff --git a/wasm_parts/traceql/transpiler/clickhouse_transpiler/aggregator.go b/wasm_parts/traceql/transpiler/clickhouse_transpiler/aggregator.go new file mode 100644 index 00000000..1ff7dac6 --- /dev/null +++ b/wasm_parts/traceql/transpiler/clickhouse_transpiler/aggregator.go @@ -0,0 +1,73 @@ +package clickhouse_transpiler + +import ( + "strconv" + "time" + sql "wasm_parts/sql_select" + "wasm_parts/traceql/shared" +) + +type AggregatorPlanner struct { + Main shared.SQLRequestPlanner + Fn string + Attr string + CompareFn string + CompareVal string + + fCmpVal float64 +} + +func (a *AggregatorPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) { + main, err := a.Main.Process(ctx) + if err != nil { + return nil, err + } + + agg, err := a.getAggregator() + if err != nil { + return nil, err + } + + fn, err := getComparisonFn(a.CompareFn) + if err != nil { + return nil, err + } + + err = a.cmpVal() + if err != nil { + return nil, err + } + + return main.AndHaving(fn(agg, sql.NewFloatVal(a.fCmpVal))), nil +} + +func (a *AggregatorPlanner) cmpVal() error { + if a.Attr == "duration" { + cmpDuration, err := time.ParseDuration(a.CompareVal) + if err != nil { + return err + } + a.fCmpVal = float64(cmpDuration.Nanoseconds()) + return nil + } + + var err error + a.fCmpVal, err = strconv.ParseFloat(a.CompareVal, 64) + return err +} + +func (a *AggregatorPlanner) getAggregator() (sql.SQLObject, error) { + switch a.Fn { + case "count": + return sql.NewRawObject("toFloat64(count(distinct index_search.span_id))"), nil + case "avg": + return 
sql.NewRawObject("avgIf(agg_val, isNotNull(agg_val))"), nil + case "max": + return sql.NewRawObject("maxIf(agg_val, isNotNull(agg_val))"), nil + case "min": + return sql.NewRawObject("minIf(agg_val, isNotNull(agg_val))"), nil + case "sum": + return sql.NewRawObject("sumIf(agg_val, isNotNull(agg_val))"), nil + } + return nil, &shared.NotSupportedError{"aggregator not supported: " + a.Fn} +} diff --git a/wasm_parts/traceql/transpiler/clickhouse_transpiler/attr_condition.go b/wasm_parts/traceql/transpiler/clickhouse_transpiler/attr_condition.go new file mode 100644 index 00000000..bc8ab898 --- /dev/null +++ b/wasm_parts/traceql/transpiler/clickhouse_transpiler/attr_condition.go @@ -0,0 +1,335 @@ +package clickhouse_transpiler + +import ( + "fmt" + "strconv" + "strings" + "time" + sql "wasm_parts/sql_select" + traceql_parser "wasm_parts/traceql/parser" + "wasm_parts/traceql/shared" +) + +type AttrConditionPlanner struct { + Main shared.SQLRequestPlanner + Terms []*traceql_parser.AttrSelector + Conds *condition + AggregatedAttr string + + sqlConds []sql.SQLCondition + isAliased bool + alias string + where []sql.SQLCondition +} + +func (a *AttrConditionPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) { + main, err := a.Main.Process(ctx) + if err != nil { + return nil, err + } + a.alias = "bsCond" + + for _, t := range a.Terms { + sqlTerm, err := a.getTerm(t) + if err != nil { + return nil, err + } + a.sqlConds = append(a.sqlConds, sqlTerm) + + if !strings.HasPrefix(t.Label, "span.") && + !strings.HasPrefix(t.Label, "resource.") && + !strings.HasPrefix(t.Label, ".") && + t.Label != "name" { + continue + } + a.where = append(a.where, sqlTerm) + } + + having, err := a.getCond(a.Conds) + if err != nil { + return nil, err + } + + err = a.aggregator(main) + if err != nil { + return nil, err + } + + return main.AndWhere(sql.Or(a.where...)).AndHaving(having), nil +} + +func (a *AttrConditionPlanner) aggregator(main sql.ISelect) error { + if a.AggregatedAttr == 
"" { + return nil + } + + s := main.GetSelect() + if a.AggregatedAttr == "duration" { + s = append(s, sql.NewSimpleCol("toFloat64(duration)", "agg_val")) + main.Select(s...) + return nil + } + + if strings.HasPrefix(a.AggregatedAttr, "span.") { + a.AggregatedAttr = a.AggregatedAttr[5:] + } + if strings.HasPrefix(a.AggregatedAttr, "resource.") { + a.AggregatedAttr = a.AggregatedAttr[9:] + } + if strings.HasPrefix(a.AggregatedAttr, ".") { + a.AggregatedAttr = a.AggregatedAttr[1:] + } + s = append(s, sql.NewCol(&sqlAttrValue{a.AggregatedAttr}, "agg_val")) + main.Select(s...) + a.where = append(a.where, sql.Eq(sql.NewRawObject("key"), sql.NewStringVal(a.AggregatedAttr))) + return nil +} + +func (a *AttrConditionPlanner) getCond(c *condition) (sql.SQLCondition, error) { + if c.simpleIdx == -1 { + subs := make([]sql.SQLCondition, len(c.complex)) + for i, s := range c.complex { + cond, err := a.getCond(s) + if err != nil { + return nil, err + } + subs[i] = cond + } + switch c.op { + case "&&": + return sql.And(subs...), nil + } + return sql.Or(subs...), nil + } + var left sql.SQLObject + if !a.isAliased { + left = &groupBitOr{&bitSet{ + terms: a.sqlConds, + }, a.alias} + a.isAliased = true + } else { + left = sql.NewRawObject(a.alias) + } + return sql.Neq(&bitAnd{left, sql.NewIntVal(int64(1) << c.simpleIdx)}, sql.NewIntVal(0)), nil +} + +func (a *AttrConditionPlanner) getTerm(t *traceql_parser.AttrSelector) (sql.SQLCondition, error) { + key := t.Label + if strings.HasPrefix(key, "span.") { + key = key[5:] + } else if strings.HasPrefix(key, "resource.") { + key = key[9:] + } else if strings.HasPrefix(key, ".") { + key = key[1:] + } else { + switch key { + case "duration": + return a.getTermDuration(t) + case "name": + key = "name" + default: + return nil, fmt.Errorf("unsupported attribute %s", key) + } + } + + if t.Val.StrVal != nil { + return a.getTermStr(t, key) + } else if t.Val.FVal != "" { + return a.getTermNum(t, key) + } + return nil, fmt.Errorf("unsupported 
statement `%s`", t.String()) +} + +func (a *AttrConditionPlanner) getTermNum(t *traceql_parser.AttrSelector, key string) (sql.SQLCondition, error) { + var fn func(left sql.SQLObject, right sql.SQLObject) *sql.LogicalOp + switch t.Op { + case "=": + fn = sql.Eq + case "!=": + fn = sql.Neq + case ">": + fn = sql.Gt + case "<": + fn = sql.Lt + case ">=": + fn = sql.Ge + case "<=": + fn = sql.Le + default: + return nil, &shared.NotSupportedError{Msg: "not supported operator: " + t.Op} + } + + if t.Val.FVal == "" { + return nil, fmt.Errorf("%s is not a number value (%s)", t.Val.FVal, t.String()) + } + fVal, err := strconv.ParseFloat(t.Val.FVal, 64) + if err != nil { + return nil, err + } + return sql.And( + sql.Eq(sql.NewRawObject("key"), sql.NewStringVal(key)), + sql.Eq(sql.NewRawObject("isNotNull(toFloat64OrNull(val))"), sql.NewIntVal(1)), + fn(sql.NewRawObject("toFloat64OrZero(val)"), sql.NewFloatVal(fVal)), + ), nil +} + +func (a *AttrConditionPlanner) getTermStr(t *traceql_parser.AttrSelector, key string) (sql.SQLCondition, error) { + switch t.Op { + case "=": + strVal, err := a.getString(t) + if err != nil { + return nil, err + } + return sql.And( + sql.Eq(sql.NewRawObject("key"), sql.NewStringVal(key)), + sql.Eq(sql.NewRawObject("val"), sql.NewStringVal(strVal)), + ), nil + case "!=": + strVal, err := a.getString(t) + if err != nil { + return nil, err + } + return sql.And( + sql.Eq(sql.NewRawObject("key"), sql.NewStringVal(key)), + sql.Neq(sql.NewRawObject("val"), sql.NewStringVal(strVal)), + ), nil + case "=~": + strVal, err := a.getString(t) + if err != nil { + return nil, err + } + return sql.And( + sql.Eq(sql.NewRawObject("key"), sql.NewStringVal(key)), + sql.Eq(&matchRe{sql.NewRawObject("val"), strVal}, sql.NewIntVal(1)), + ), nil + case "!~": + strVal, err := a.getString(t) + if err != nil { + return nil, err + } + return sql.And( + sql.Eq(sql.NewRawObject("key"), sql.NewStringVal(key)), + sql.Eq(&matchRe{sql.NewRawObject("val"), strVal}, sql.NewIntVal(0)), 
+ ), nil + } + return nil, &shared.NotSupportedError{Msg: "not supported operator: " + t.Op} +} + +func (a *AttrConditionPlanner) getTermDuration(t *traceql_parser.AttrSelector) (sql.SQLCondition, error) { + if t.Val.TimeVal == "" { + return nil, fmt.Errorf("%s is not a time duration value (%s)", t.Val.TimeVal, t.String()) + } + fVal, err := time.ParseDuration(t.Val.TimeVal) + if err != nil { + return nil, err + } + + fn, err := getComparisonFn(t.Op) + if err != nil { + return nil, err + } + + return fn(sql.NewRawObject("traces_idx.duration"), sql.NewIntVal(fVal.Nanoseconds())), nil +} + +func (a *AttrConditionPlanner) getString(t *traceql_parser.AttrSelector) (string, error) { + var ( + strVal string + err error + ) + if t.Val.StrVal != nil { + strVal, err = t.Val.StrVal.Unquote() + if err != nil { + return "", err + } + } else { + strVal = t.Val.FVal + } + return strVal, nil +} + +type bitSet struct { + terms []sql.SQLCondition +} + +func (b *bitSet) String(ctx *sql.Ctx, options ...int) (string, error) { + strTerms := make([]string, len(b.terms)) + for i, term := range b.terms { + strTerm, err := term.String(ctx, options...) + if err != nil { + return "", err + } + strTerms[i] = fmt.Sprintf("bitShiftLeft(toUInt64(%s),%d)", strTerm, i) + } + res := strings.Join(strTerms, "+") + + return res, nil +} + +type bitAnd struct { + left sql.SQLObject + right sql.SQLObject +} + +func (b *bitAnd) String(ctx *sql.Ctx, options ...int) (string, error) { + + strLeft, err := b.left.String(ctx, options...) + if err != nil { + return "", err + } + strRight, err := b.right.String(ctx, options...) + if err != nil { + return "", err + } + res := fmt.Sprintf("bitAnd(%s,%s)", strLeft, strRight) + + return res, nil +} + +type groupBitOr struct { + left sql.SQLObject + alias string +} + +func (b *groupBitOr) String(ctx *sql.Ctx, options ...int) (string, error) { + + strLeft, err := b.left.String(ctx, options...) 
+ if err != nil { + return "", err + } + res := fmt.Sprintf("groupBitOr(%s)", strLeft) + if b.alias != "" { + res = fmt.Sprintf("%s as %s", res, b.alias) + } + return res, nil +} + +type matchRe struct { + field sql.SQLObject + re string +} + +func (m matchRe) String(ctx *sql.Ctx, options ...int) (string, error) { + field, err := m.field.String(ctx, options...) + if err != nil { + return "", err + } + strRe, err := sql.NewStringVal(m.re).String(ctx, options...) + if err != nil { + return "", err + } + return fmt.Sprintf("match(%s,%s)", field, strRe), nil +} + +type sqlAttrValue struct { + attr string +} + +func (s *sqlAttrValue) String(ctx *sql.Ctx, options ...int) (string, error) { + attr, err := sql.NewStringVal(s.attr).String(ctx, options...) + if err != nil { + return "", err + } + + return fmt.Sprintf("anyIf(toFloat64OrNull(val), key == %s)", attr), nil +} diff --git a/wasm_parts/traceql/transpiler/clickhouse_transpiler/index_groupby.go b/wasm_parts/traceql/transpiler/clickhouse_transpiler/index_groupby.go new file mode 100644 index 00000000..2eab14fe --- /dev/null +++ b/wasm_parts/traceql/transpiler/clickhouse_transpiler/index_groupby.go @@ -0,0 +1,31 @@ +package clickhouse_transpiler + +import ( + sql "wasm_parts/sql_select" + "wasm_parts/traceql/shared" +) + +type IndexGroupByPlanner struct { + Main shared.SQLRequestPlanner +} + +func (i *IndexGroupByPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) { + main, err := i.Main.Process(ctx) + if err != nil { + return nil, err + } + + withMain := sql.NewWith(main, "index_search") + return sql.NewSelect(). + With(withMain). + Select( + sql.NewSimpleCol("trace_id", "trace_id"), + sql.NewSimpleCol("groupArray(span_id)", "span_id"), + sql.NewSimpleCol("groupArray(duration)", "duration"), + sql.NewSimpleCol("groupArray(timestamp_ns)", "timestamp_ns")). + From(sql.NewWithRef(withMain)). + GroupBy(sql.NewRawObject("trace_id")). 
+ OrderBy( + sql.NewOrderBy(sql.NewRawObject("max(index_search.timestamp_ns)"), sql.ORDER_BY_DIRECTION_DESC), + ), nil +} diff --git a/wasm_parts/traceql/transpiler/clickhouse_transpiler/index_limit.go b/wasm_parts/traceql/transpiler/clickhouse_transpiler/index_limit.go new file mode 100644 index 00000000..746a1231 --- /dev/null +++ b/wasm_parts/traceql/transpiler/clickhouse_transpiler/index_limit.go @@ -0,0 +1,23 @@ +package clickhouse_transpiler + +import ( + sql "wasm_parts/sql_select" + "wasm_parts/traceql/shared" +) + +type IndexLimitPlanner struct { + Main shared.SQLRequestPlanner +} + +func (i *IndexLimitPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) { + main, err := i.Main.Process(ctx) + if err != nil { + return nil, err + } + + if ctx.Limit == 0 { + return main, nil + } + + return main.Limit(sql.NewIntVal(ctx.Limit)), nil +} diff --git a/wasm_parts/traceql/transpiler/clickhouse_transpiler/init.go b/wasm_parts/traceql/transpiler/clickhouse_transpiler/init.go new file mode 100644 index 00000000..abb08a82 --- /dev/null +++ b/wasm_parts/traceql/transpiler/clickhouse_transpiler/init.go @@ -0,0 +1,26 @@ +package clickhouse_transpiler + +import ( + sql "wasm_parts/sql_select" + "wasm_parts/traceql/shared" +) + +type InitIndexPlanner struct { +} + +func (i *InitIndexPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) { + return sql.NewSelect().Select( + sql.NewSimpleCol("trace_id", "trace_id"), + sql.NewSimpleCol("lower(hex(span_id))", "span_id"), + sql.NewSimpleCol("any(duration)", "duration"), + sql.NewSimpleCol("any(timestamp_ns)", "timestamp_ns")). + From(sql.NewSimpleCol(ctx.TracesAttrsTable, "traces_idx")). 
+ AndWhere(sql.And( + sql.Eq(sql.NewRawObject("oid"), sql.NewStringVal(ctx.OrgID)), + sql.Ge(sql.NewRawObject("date"), sql.NewStringVal(ctx.From.Format("2006-01-02"))), + sql.Le(sql.NewRawObject("date"), sql.NewStringVal(ctx.To.Format("2006-01-02"))), + sql.Ge(sql.NewRawObject("traces_idx.timestamp_ns"), sql.NewIntVal(ctx.From.UnixNano())), + sql.Lt(sql.NewRawObject("traces_idx.timestamp_ns"), sql.NewIntVal(ctx.To.UnixNano())), + )).GroupBy(sql.NewRawObject("trace_id"), sql.NewRawObject("span_id")). + OrderBy(sql.NewOrderBy(sql.NewRawObject("timestamp_ns"), sql.ORDER_BY_DIRECTION_DESC)), nil +} diff --git a/wasm_parts/traceql/transpiler/clickhouse_transpiler/planner.go b/wasm_parts/traceql/transpiler/clickhouse_transpiler/planner.go new file mode 100644 index 00000000..48cb5639 --- /dev/null +++ b/wasm_parts/traceql/transpiler/clickhouse_transpiler/planner.go @@ -0,0 +1,115 @@ +package clickhouse_transpiler + +import ( + traceql_parser "wasm_parts/traceql/parser" + "wasm_parts/traceql/shared" +) + +func Plan(script *traceql_parser.TraceQLScript) (shared.SQLRequestPlanner, error) { + return (&planner{script: script}).plan() +} + +type planner struct { + script *traceql_parser.TraceQLScript + + //Analyze results + termIdx []*traceql_parser.AttrSelector + cond *condition + aggFn string + aggAttr string + cmpVal string + + terms map[string]int +} + +func (p *planner) plan() (shared.SQLRequestPlanner, error) { + err := p.check() + if err != nil { + return nil, err + } + + p.analyze() + + var res shared.SQLRequestPlanner = &AttrConditionPlanner{ + Main: &InitIndexPlanner{}, + Terms: p.termIdx, + Conds: p.cond, + AggregatedAttr: p.aggAttr, + } + + res = &IndexGroupByPlanner{res} + + if p.aggFn != "" { + res = &AggregatorPlanner{ + Main: res, + Fn: p.aggFn, + Attr: p.aggAttr, + CompareFn: p.script.Head.Aggregator.Cmp, + CompareVal: p.script.Head.Aggregator.Num + p.script.Head.Aggregator.Measurement, + } + } + + res = &IndexLimitPlanner{res} + + res = 
&TracesDataPlanner{Main: res} + + res = &IndexLimitPlanner{res} + + return res, nil +} + +func (p *planner) check() error { + if p.script.Tail != nil { + return &shared.NotSupportedError{Msg: "more than one selector not supported"} + } + return nil +} + +func (p *planner) analyze() { + p.terms = make(map[string]int) + p.cond = p.analyzeCond(&p.script.Head.AttrSelector) + p.analyzeAgg() +} + +func (p *planner) analyzeCond(exp *traceql_parser.AttrSelectorExp) *condition { + var res *condition + if exp.ComplexHead != nil { + res = p.analyzeCond(exp.ComplexHead) + } else if exp.Head != nil { + term := exp.Head.String() + if p.terms[term] != 0 { + res = &condition{simpleIdx: p.terms[term] - 1} + } else { + p.termIdx = append(p.termIdx, exp.Head) + p.terms[term] = len(p.termIdx) + res = &condition{simpleIdx: len(p.termIdx) - 1} + } + } + if exp.Tail != nil { + res = &condition{ + simpleIdx: -1, + op: exp.AndOr, + complex: []*condition{res, p.analyzeCond(exp.Tail)}, + } + } + return res +} + +func (p *planner) analyzeAgg() { + if p.script.Head.Aggregator == nil { + return + } + + p.aggFn = p.script.Head.Aggregator.Fn + p.aggAttr = p.script.Head.Aggregator.Attr + + p.cmpVal = p.script.Head.Aggregator.Num + p.script.Head.Aggregator.Measurement + return +} + +type condition struct { + simpleIdx int // index of term; -1 means complex + + op string + complex []*condition +} diff --git a/wasm_parts/traceql/transpiler/clickhouse_transpiler/planner_test.go b/wasm_parts/traceql/transpiler/clickhouse_transpiler/planner_test.go new file mode 100644 index 00000000..b0565251 --- /dev/null +++ b/wasm_parts/traceql/transpiler/clickhouse_transpiler/planner_test.go @@ -0,0 +1,49 @@ +package clickhouse_transpiler + +import ( + "fmt" + "math/rand" + "testing" + "time" + sql "wasm_parts/sql_select" + traceql_parser "wasm_parts/traceql/parser" + "wasm_parts/traceql/shared" +) + +func TestPlanner(t *testing.T) { + script, err := traceql_parser.Parse(`{.randomContainer=~"admiring" && 
.randomFloat > 10}`) + if err != nil { + t.Fatal(err) + } + plan, err := Plan(script) + if err != nil { + t.Fatal(err) + } + + req, err := plan.Process(&shared.PlannerContext{ + IsCluster: false, + OrgID: "0", + From: time.Now().Add(time.Hour * -44), + To: time.Now(), + Limit: 3, + TracesAttrsTable: "tempo_traces_attrs_gin", + TracesAttrsDistTable: "tempo_traces_attrs_gin_dist", + TracesTable: "tempo_traces", + TracesDistTable: "tempo_traces_dist", + }) + if err != nil { + t.Fatal(err) + } + res, err := req.String(&sql.Ctx{ + Params: map[string]sql.SQLObject{}, + Result: map[string]sql.SQLObject{}, + }) + if err != nil { + t.Fatal(err) + } + fmt.Println(res) +} + +func TestRandom(t *testing.T) { + fmt.Sprintf("%f", 50+(rand.Float64()*100-50)) +} diff --git a/wasm_parts/traceql/transpiler/clickhouse_transpiler/shared.go b/wasm_parts/traceql/transpiler/clickhouse_transpiler/shared.go new file mode 100644 index 00000000..7bb1c911 --- /dev/null +++ b/wasm_parts/traceql/transpiler/clickhouse_transpiler/shared.go @@ -0,0 +1,24 @@ +package clickhouse_transpiler + +import ( + sql "wasm_parts/sql_select" + "wasm_parts/traceql/shared" +) + +func getComparisonFn(op string) (func(left sql.SQLObject, right sql.SQLObject) *sql.LogicalOp, error) { + switch op { + case "=": + return sql.Eq, nil + case ">": + return sql.Gt, nil + case "<": + return sql.Lt, nil + case ">=": + return sql.Ge, nil + case "<=": + return sql.Le, nil + case "!=": + return sql.Neq, nil + } + return nil, &shared.NotSupportedError{Msg: "not supported operator: " + op} +} diff --git a/wasm_parts/traceql/transpiler/clickhouse_transpiler/traces_data.go b/wasm_parts/traceql/transpiler/clickhouse_transpiler/traces_data.go new file mode 100644 index 00000000..89f70a2d --- /dev/null +++ b/wasm_parts/traceql/transpiler/clickhouse_transpiler/traces_data.go @@ -0,0 +1,50 @@ +package clickhouse_transpiler + +import ( + sql "wasm_parts/sql_select" + "wasm_parts/traceql/shared" +) + +type TracesDataPlanner struct { + 
// ---- wasm_parts/traceql/transpiler/clickhouse_transpiler/traces_data.go ----

package clickhouse_transpiler

import (
	sql "wasm_parts/sql_select"
	"wasm_parts/traceql/shared"
)

// TracesDataPlanner wraps an index-level planner and joins its output back
// to the traces table to produce trace-level result rows.
type TracesDataPlanner struct {
	Main shared.SQLRequestPlanner // produces the "index_grouped" subquery
}

// Process builds the final SELECT: the Main planner's result becomes a WITH
// subquery ("index_grouped"), its trace ids a second WITH ("trace_ids"),
// and the traces table is aggregated per trace_id over those ids.
func (t *TracesDataPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
	main, err := t.Main.Process(ctx)
	if err != nil {
		return nil, err
	}

	// Cluster deployments read from the distributed table instead.
	table := ctx.TracesTable
	if ctx.IsCluster {
		table = ctx.TracesDistTable
	}

	withMain := sql.NewWith(main, "index_grouped")
	withTraceIds := sql.NewWith(
		sql.NewSelect().Select(sql.NewRawObject("trace_id")).From(sql.NewWithRef(withMain)),
		"trace_ids")
	return sql.NewSelect().
		With(withMain, withTraceIds).
		Select(
			sql.NewSimpleCol("lower(hex(traces.trace_id))", "trace_id"),
			sql.NewSimpleCol("any(index_grouped.span_id)", "span_id"),
			sql.NewSimpleCol("any(index_grouped.duration)", "duration"),
			// NOTE(review): alias is "timestamps_ns" (plural) while other
			// columns use "timestamp_ns" — looks like a typo, but consumers
			// may rely on this spelling; confirm before renaming.
			sql.NewSimpleCol("any(index_grouped.timestamp_ns)", "timestamps_ns"),
			sql.NewSimpleCol("min(traces.timestamp_ns)", "start_time_unix_nano"),
			// Full trace duration in milliseconds: latest span end minus
			// earliest span start.
			sql.NewSimpleCol(
				"toFloat64(max(traces.timestamp_ns + traces.duration_ns) - min(traces.timestamp_ns)) / 1000000",
				"duration_ms"),
			// Root service/name approximated by the earliest span per trace.
			sql.NewSimpleCol("argMin(traces.service_name, traces.timestamp_ns)", "root_service_name"),
			sql.NewSimpleCol("argMin(traces.name, traces.timestamp_ns)", "root_trace_name"),
		).
		From(sql.NewSimpleCol(table, "traces")).
		Join(sql.NewJoin("LEFT ANY",
			sql.NewWithRef(withMain),
			sql.Eq(sql.NewRawObject("traces.trace_id"), sql.NewRawObject("index_grouped.trace_id")))).
		AndWhere(
			sql.Eq(sql.NewRawObject("oid"), sql.NewStringVal(ctx.OrgID)),
			sql.NewIn(sql.NewRawObject("traces.trace_id"), sql.NewWithRef(withTraceIds))).
		GroupBy(sql.NewRawObject("traces.trace_id")).
		OrderBy(sql.NewOrderBy(sql.NewRawObject("start_time_unix_nano"), sql.ORDER_BY_DIRECTION_DESC)), nil
}

// ---- wasm_parts/traceql/transpiler/planner.go ----

package traceql_transpiler

import (
	traceql_parser "wasm_parts/traceql/parser"
	"wasm_parts/traceql/shared"
	"wasm_parts/traceql/transpiler/clickhouse_transpiler"
)

// Plan delegates to the ClickHouse transpiler. Kept as a thin indirection
// point so alternative SQL backends can be slotted in later.
func Plan(script *traceql_parser.TraceQLScript) (shared.SQLRequestPlanner, error) {
	sqlPlanner, err := clickhouse_transpiler.Plan(script)
	if err != nil {
		return nil, err
	}
	return sqlPlanner, nil
}

// ---- wasm_parts/types/traceQLRequest.go ----

package types

import (
	"wasm_parts/traceql/shared"
)

// TraceQLRequest is the payload crossing the wasm boundary: the raw TraceQL
// query string plus the planner context it should execute under.
type TraceQLRequest struct {
	Request string                // raw TraceQL query text
	Ctx     shared.PlannerContext // time range, org id, table names, limit
}
// Code generated by ffjson . DO NOT EDIT.
// source: traceQLRequest.go
//
// NOTE(review): generated JSON (un)marshalling for types.TraceQLRequest.
// Do not hand-edit this state machine — regenerate with ffjson after
// changing traceQLRequest.go. Quirks below (double state assignments,
// empty branches) are generator artifacts and are intentionally untouched.

package types

import (
	"bytes"
	"fmt"
	fflib "github.com/pquerna/ffjson/fflib/v1"
)

// MarshalJSON marshal bytes to json - template
func (j *TraceQLRequest) MarshalJSON() ([]byte, error) {
	var buf fflib.Buffer
	if j == nil {
		buf.WriteString("null")
		return buf.Bytes(), nil
	}
	err := j.MarshalJSONBuf(&buf)
	if err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// MarshalJSONBuf marshal buff to json - template
func (j *TraceQLRequest) MarshalJSONBuf(buf fflib.EncodingBuffer) error {
	if j == nil {
		buf.WriteString("null")
		return nil
	}
	var err error
	var obj []byte
	_ = obj
	_ = err
	buf.WriteString(`{"Request":`)
	fflib.WriteJsonString(buf, string(j.Request))
	buf.WriteString(`,"Ctx":`)

	{

		err = j.Ctx.MarshalJSONBuf(buf)
		if err != nil {
			return err
		}

	}
	buf.WriteByte('}')
	return nil
}

// Key-token constants for the unmarshal state machine below.
const (
	ffjtTraceQLRequestbase = iota
	ffjtTraceQLRequestnosuchkey

	ffjtTraceQLRequestRequest

	ffjtTraceQLRequestCtx
)

var ffjKeyTraceQLRequestRequest = []byte("Request")

var ffjKeyTraceQLRequestCtx = []byte("Ctx")

// UnmarshalJSON umarshall json - template of ffjson
func (j *TraceQLRequest) UnmarshalJSON(input []byte) error {
	fs := fflib.NewFFLexer(input)
	return j.UnmarshalJSONFFLexer(fs, fflib.FFParse_map_start)
}

// UnmarshalJSONFFLexer fast json unmarshall - template ffjson
func (j *TraceQLRequest) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error {
	var err error
	currentKey := ffjtTraceQLRequestbase
	_ = currentKey
	tok := fflib.FFTok_init
	wantedTok := fflib.FFTok_init

mainparse:
	for {
		tok = fs.Scan()
		// println(fmt.Sprintf("debug: tok: %v state: %v", tok, state))
		if tok == fflib.FFTok_error {
			goto tokerror
		}

		switch state {

		case fflib.FFParse_map_start:
			if tok != fflib.FFTok_left_bracket {
				wantedTok = fflib.FFTok_left_bracket
				goto wrongtokenerror
			}
			state = fflib.FFParse_want_key
			continue

		case fflib.FFParse_after_value:
			if tok == fflib.FFTok_comma {
				state = fflib.FFParse_want_key
			} else if tok == fflib.FFTok_right_bracket {
				goto done
			} else {
				wantedTok = fflib.FFTok_comma
				goto wrongtokenerror
			}

		case fflib.FFParse_want_key:
			// json {} ended. goto exit. woo.
			if tok == fflib.FFTok_right_bracket {
				goto done
			}
			if tok != fflib.FFTok_string {
				wantedTok = fflib.FFTok_string
				goto wrongtokenerror
			}

			kn := fs.Output.Bytes()
			if len(kn) <= 0 {
				// "" case. hrm.
				currentKey = ffjtTraceQLRequestnosuchkey
				state = fflib.FFParse_want_colon
				goto mainparse
			} else {
				switch kn[0] {

				case 'C':

					if bytes.Equal(ffjKeyTraceQLRequestCtx, kn) {
						currentKey = ffjtTraceQLRequestCtx
						state = fflib.FFParse_want_colon
						goto mainparse
					}

				case 'R':

					if bytes.Equal(ffjKeyTraceQLRequestRequest, kn) {
						currentKey = ffjtTraceQLRequestRequest
						state = fflib.FFParse_want_colon
						goto mainparse
					}

				}

				if fflib.SimpleLetterEqualFold(ffjKeyTraceQLRequestCtx, kn) {
					currentKey = ffjtTraceQLRequestCtx
					state = fflib.FFParse_want_colon
					goto mainparse
				}

				if fflib.EqualFoldRight(ffjKeyTraceQLRequestRequest, kn) {
					currentKey = ffjtTraceQLRequestRequest
					state = fflib.FFParse_want_colon
					goto mainparse
				}

				currentKey = ffjtTraceQLRequestnosuchkey
				state = fflib.FFParse_want_colon
				goto mainparse
			}

		case fflib.FFParse_want_colon:
			if tok != fflib.FFTok_colon {
				wantedTok = fflib.FFTok_colon
				goto wrongtokenerror
			}
			state = fflib.FFParse_want_value
			continue
		case fflib.FFParse_want_value:

			if tok == fflib.FFTok_left_brace || tok == fflib.FFTok_left_bracket || tok == fflib.FFTok_integer || tok == fflib.FFTok_double || tok == fflib.FFTok_string || tok == fflib.FFTok_bool || tok == fflib.FFTok_null {
				switch currentKey {

				case ffjtTraceQLRequestRequest:
					goto handle_Request

				case ffjtTraceQLRequestCtx:
					goto handle_Ctx

				case ffjtTraceQLRequestnosuchkey:
					err = fs.SkipField(tok)
					if err != nil {
						return fs.WrapErr(err)
					}
					state = fflib.FFParse_after_value
					goto mainparse
				}
			} else {
				goto wantedvalue
			}
		}
	}

handle_Request:

	/* handler: j.Request type=string kind=string quoted=false*/

	{

		{
			if tok != fflib.FFTok_string && tok != fflib.FFTok_null {
				return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok))
			}
		}

		if tok == fflib.FFTok_null {

		} else {

			outBuf := fs.Output.Bytes()

			j.Request = string(string(outBuf))

		}
	}

	state = fflib.FFParse_after_value
	goto mainparse

handle_Ctx:

	/* handler: j.Ctx type=shared.PlannerContext kind=struct quoted=false*/

	{
		if tok == fflib.FFTok_null {

		} else {

			err = j.Ctx.UnmarshalJSONFFLexer(fs, fflib.FFParse_want_key)
			if err != nil {
				return err
			}
		}
		state = fflib.FFParse_after_value
	}

	state = fflib.FFParse_after_value
	goto mainparse

wantedvalue:
	return fs.WrapErr(fmt.Errorf("wanted value token, but got token: %v", tok))
wrongtokenerror:
	return fs.WrapErr(fmt.Errorf("ffjson: wanted token: %v, but got token: %v output=%s", wantedTok, tok, fs.Output.String()))
tokerror:
	if fs.BigError != nil {
		return fs.WrapErr(fs.BigError)
	}
	err = fs.Error.ToError()
	if err != nil {
		return fs.WrapErr(err)
	}
	panic("ffjson-generated: unreachable, please report bug.")
done:

	return nil
}