diff --git a/.github/workflows/bump_version.yml b/.github/workflows/bump_version.yml
index 7fa0222e..c05f2a1b 100644
--- a/.github/workflows/bump_version.yml
+++ b/.github/workflows/bump_version.yml
@@ -1,39 +1,25 @@
name: 'Build Multi-Arch'
on:
- push:
- branches:
- - 'master'
- paths-ignore:
- - '**.md'
- - '**.yml'
- - '**.yaml'
+ release:
+ types: [created]
jobs:
- bump-version:
- name: 'Bump Version on master & Publish'
+ build:
+ name: 'Build & Publish'
runs-on: ubuntu-latest
steps:
- name: 'Checkout source code'
- uses: 'actions/checkout@v2'
+ uses: 'actions/checkout@v3'
with:
ref: ${{ github.ref }}
- - name: 'cat package.json'
- run: cat ./package.json
- - name: 'Automated Version Bump'
- id: version-bump
- uses: 'phips28/gh-action-bump-version@master'
- with:
- tag-prefix: 'v'
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: 'cat package.json'
- run: cat ./package.json
- - name: 'Output Step'
- env:
- NEW_TAG: ${{ steps.version-bump.outputs.newTag }}
- run: echo "new tag $NEW_TAG"
+ - uses: MYXOMOPX/modify-pkg-json@master
+ id: setcmnver
+ with:
+ target: ./package.json
+ action: "set_version"
+ argument: "${{ github.ref_name }}"
- name: Check NPM secret presence
id: checksecrets
shell: bash
@@ -45,10 +31,10 @@ jobs:
fi
env:
SECRET: ${{ secrets.NPM_TOKEN }}
- - uses: actions/setup-node@v1
+ - uses: actions/setup-node@v4.0.0
if: ${{ steps.checksecrets.outputs.secretspresent }}
with:
- node-version: 14
+ node-version: 18
- name: Publish to NPM
if: ${{ steps.checksecrets.outputs.secretspresent }}
run: |
@@ -93,9 +79,9 @@ jobs:
push: true
tags: |
qxip/qryn:latest
- qxip/qryn:${{ steps.version-bump.outputs.newTag }}
+ qxip/qryn:${{ github.ref_name }}
qxip/cloki:latest
- qxip/cloki:${{ steps.version-bump.outputs.newTag }}
+ qxip/cloki:${{ github.ref_name }}
- name: Log in to the GHCR registry
uses: docker/login-action@v2.0.0
@@ -111,4 +97,4 @@ jobs:
push: true
tags: |
ghcr.io/metrico/qryn:latest
- ghcr.io/metrico/qryn:${{ steps.version-bump.outputs.newTag }}
+ ghcr.io/metrico/qryn:${{ github.ref_name }}
diff --git a/.github/workflows/bump_version_beta.yml b/.github/workflows/bump_version_beta.yml
deleted file mode 100644
index 96cab544..00000000
--- a/.github/workflows/bump_version_beta.yml
+++ /dev/null
@@ -1,95 +0,0 @@
-name: 'Bump & Publish'
-
-on:
- push:
- branches:
- - 'beta'
- paths-ignore:
- - '**.md'
- - '**.yml'
- - '**.yaml'
-
-jobs:
- bump-version:
- name: 'Bump Version on master & Publish'
- runs-on: ubuntu-latest
-
- steps:
- - name: 'Checkout source code'
- uses: 'actions/checkout@v2'
- with:
- ref: ${{ github.ref }}
- - name: 'cat package.json'
- run: cat ./package.json
- - name: 'Automated Version Bump'
- id: version-bump
- uses: 'phips28/gh-action-bump-version@master'
- with:
- tag-prefix: 'v'
- tag-suffix: '-beta'
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: 'cat package.json'
- run: cat ./package.json
- - name: 'Output Step'
- env:
- NEW_TAG: ${{ steps.version-bump.outputs.newTag }}
- run: echo "new tag $NEW_TAG"
- - name: Check NPM secret presence
- id: checksecrets
- shell: bash
- run: |
- if [ "$SECRET" == "" ]; then
- echo ::set-output name=secretspresent::false
- else
- echo ::set-output name=secretspresent::true
- fi
- env:
- SECRET: ${{ secrets.NPM_TOKEN }}
- - uses: actions/setup-node@v1
- if: ${{ steps.checksecrets.outputs.secretspresent }}
- with:
- node-version: 14
- - name: Publish to NPM
- if: ${{ steps.checksecrets.outputs.secretspresent }}
- run: |
- npm config set //registry.npmjs.org/:_authToken ${NPM_TOKEN}
- npm install
- npm publish --access public
- env:
- NPM_TOKEN: ${{ secrets.NPM_TOKEN }}
-
- - name: Check Docker secret presence
- id: checkdocker
- shell: bash
- run: |
- if [ "$SECRET" == "" ]; then
- echo ::set-output name=secretspresent::false
- else
- echo ::set-output name=secretspresent::true
- fi
- env:
- SECRET: ${{ secrets.DOCKERHUB_TOKEN }}
- - name: Set up Docker QEMU
- if: ${{ steps.checkdocker.outputs.secretspresent }}
- uses: docker/setup-qemu-action@v1
- - name: Set up Docker Buildx
- if: ${{ steps.checkdocker.outputs.secretspresent }}
- uses: docker/setup-buildx-action@v1
- - name: Login to DockerHub
- if: ${{ steps.checkdocker.outputs.secretspresent }}
- uses: docker/login-action@v1
- with:
- username: ${{ secrets.DOCKERHUB_USERNAME }}
- password: ${{ secrets.DOCKERHUB_TOKEN }}
- - name: Build and push
- if: ${{ steps.checkdocker.outputs.secretspresent }}
- id: docker_build
- uses: docker/build-push-action@v2
- with:
- push: true
- tags: |
- qxip/qryn:latest
- qxip/qryn:${{ steps.version-bump.outputs.newTag }}
- qxip/cloki:latest
- qxip/cloki:${{ steps.version-bump.outputs.newTag }}
diff --git a/.github/workflows/node-clickhouse.js.yml b/.github/workflows/node-clickhouse.js.yml
index 0fee32e5..9e5fdf41 100644
--- a/.github/workflows/node-clickhouse.js.yml
+++ b/.github/workflows/node-clickhouse.js.yml
@@ -23,7 +23,7 @@ jobs:
strategy:
matrix:
- node-version: [18, 16.x]
+ node-version: [18, 16.x, 20]
# See supported Node.js release schedule at https://nodejs.org/en/about/releases/
steps:
@@ -43,4 +43,4 @@ jobs:
CLICKHOUSE_TSDB: loki
INTEGRATION_E2E: 1
CLOKI_EXT_URL: 127.0.0.1:3100
- run: node qryn.js >/dev/stdout & npm run test --forceExit
+ run: node qryn.mjs >/dev/stdout & npm run test --forceExit
diff --git a/.gitignore b/.gitignore
index f838c061..c2bb70cb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,8 @@ node_modules
/test/e2e/
/lib/influx/.idea/
/lib/influx/influx.iml
+/wasm_parts/_vendor.zip
+/wasm_parts/.idea/
+/wasm_parts/vendor/
+/wasm_parts/main.wasm
+/wasm_parts/wasm_parts.iml
diff --git a/Dockerfile_bun b/Dockerfile_bun
new file mode 100644
index 00000000..2bd49ab1
--- /dev/null
+++ b/Dockerfile_bun
@@ -0,0 +1,15 @@
+# Qryn
+FROM oven/bun:1-alpine
+
+# BUILD FORCE
+ENV BUILD 703030
+ENV PORT 3100
+
+COPY . /app
+WORKDIR /app
+RUN bun install
+
+# Expose Ports
+EXPOSE 3100
+
+CMD [ "bun", "qryn.mjs" ]
diff --git a/README.md b/README.md
index c555f51d..1e62aeee 100644
--- a/README.md
+++ b/README.md
@@ -4,53 +4,60 @@
[![Build Status](https://github.com/metrico/qryn/actions/workflows/bump_version.yml/badge.svg)](https://github.com/metrico/qryn/actions/workflows/bump_version.yml)
![CodeQL](https://github.com/lmangani/cLoki/workflows/CodeQL/badge.svg)
-
-
-
+[![Stand With Ukraine](https://raw.githubusercontent.com/vshymanskyy/StandWithUkraine/main/badges/StandWithUkraine.svg)](https://stand-with-ukraine.pp.ua)
-# [qryn.dev](https://qryn.dev) :cloud: [qryn.cloud](https://qryn.cloud) :heart:
-> ... it's pronounced /ˈkwɪr..ɪŋ/ or just querying
+# [qryn 3.x](https://qryn.dev) :cloud: [qryn.cloud](https://qryn.cloud)
+> ... it's pronounced /ˈkwɪr..ɪŋ/ or just _querying_
![image](https://user-images.githubusercontent.com/1423657/232089970-c4536f16-5967-4051-85a5-8ad94fcde67c.png)
-:rocket: **qryn** is a _drop-in Grafana compatible_ **polyglot observability** framework
-- **Logs, Metrics and Traces** living happily together. Drop-in compatible with multiple vendors formats.
-- Native [LogQL/PromQL/TempoQL APIs](https://qryn.cloud) support for [querying](https://github.com/lmangani/qryn/wiki/LogQL-for-Beginners), [processing](https://github.com/lmangani/qryn/wiki/LogQL-Supported-Queries), [tracing](https://github.com/lmangani/qryn/wiki/Tempo-Tracing) and [alerting](https://github.com/lmangani/qryn/wiki/Ruler---Alerts) [^2] in [Grafana](http://docs.grafana.org/features/explore/) [^3]
-- Search, filter and extract metrics from _logs, events, spans and traces_ using familiar languages. _SQL Optional_.
-- Ingestion [APIs](https://qryn.metrico.in/#/support) transparently compatible with [Opentelemetry, Loki, Prometheus, InfluxDB, Elastic](https://qryn.dev) _and [more](https://github.com/metrico/otel-collector)_
-- Ready to use with popular Agents such as [Promtail, Grafana-Agent, Vector, Logstash, Telegraf](https://qryn.metrico.in/#/ingestion) _and more_
-- Built in [Explore UI](https://github.com/metrico/cloki-view) and [CLI](https://github.com/lmangani/vLogQL) for querying supported datasources
-- Designed for edge _(js/bun/wasm)_ and core/backend deployments _(golang/rust)_.
-- Total data control. Compatible with [ClickHouse](https://clickhouse.com/) or [InfluxDB IOx](https://influxdata.com) with S3 object storage.
-
-:rocket: **qryn.cloud** is the _supercharged_ **qryn** version developed in _go_ with additional _functionality, speed and features!_
+:rocket: _lighweight, multi-standard drop-in compatible_ **polyglot observability** framework for _**Logs, Metrics and Traces**_.
+- **Polyglot**: Use **LogQL, PromQL**, and **TempoQL** languages to query, process and alert _any data_.
+- **Voracious**: Ingest anything compatible with **Opentelemetry, Loki, Prometheus, Influx, Datadog, Elastic** _& more_.
+- **Versatile**: Explore data with qryn's built-in **Explore UI** and **CLI** or _native_ **Grafana** compatibility.
+- **Secure**: Keep control of your data, using **ClickHouse** or **InfluxDB IOx** on top of disposable **S3** storage.
+- **Unmetered**: Unlimited FOSS deployments or **qryn.cloud** option with advanced features and high performance.
+- **Effective**: Do more with less, replace lock-in competitors for a fractions of the cost and complexity.
+
+
+## 🚀 [Get Started](https://qryn.metrico.in/#/installation)
+
+* Deploy qryn OSS using the [documentation](https://qryn.metrico.in/#/installation) or get help in our [Matrix room](https://matrix.to/#/#qryn:matrix.org) :octocat:
+* Create a free account on [qryn.cloud](https://qryn.cloud) and get polyglot without moving a finger ☁️
+
-## 🚀 [Get Started](https://qryn.metrico.in/#/installation)
+
-:octocat: Get qryn OSS up and running on-prem in no time using the [Documentation](https://qryn.metrico.in/#/installation) or join our [Matrix Room](https://matrix.to/#/#qryn:matrix.org)
+## Features
-☁️ Create a free account on [qryn.cloud](https://qryn.cloud) and go straight to production at any scale with **polyglot confidence**.
+💡 _**qryn** independently implements popular observability standards, protocols and query languages:_
+
+
+### 📚 OpenTelemetry
+
+⚡ **qryn** is officially integrated with [opentelemetry](https://github.com/metrico/otel-collector) supports _any log, trace or metric format_
+
+Ingested data can be queried using any of the avialable qryn APIs _(LogQL, PromQL, TraceQL)_
-## Supported Features
+### 📚 Loki + LogQL
-### 📚 OpenTelemetry
-qryn fully supports opentelemetry and comes with a powerful [otel-collector](https://github.com/metrico/otel-collector) distribution supporting _any log, trace or metric format_ and writing directly to ClickHouse _qryn tables_ ready to be consumed through any query API.
+> Any Loki compatible client or application can be used with qryn out of the box
+
+⚡ **qryn** implements the [Loki API](https://github.com/lmangani/qryn/wiki/LogQL-Supported-Queries) for transparent compatibility with **[LogQL](https://grafana.com/docs/loki/latest/query/)** clients
-### 📚 LogQL
-qryn implements a complete [LogQL API](https://github.com/lmangani/qryn/wiki/LogQL-Supported-Queries) to provide transparent compatibility with Loki clients
The Grafana Loki datasource can be used to natively browse and query _logs_ and display extracted _timeseries_
@@ -59,11 +66,16 @@ The Grafana Loki datasource can be used to natively browse and query _logs_ and
:tada: _No plugins needed_
+
-### 📈 Prometheus
-qryn implements a complete [Prometheus API](https://github.com/lmangani/qryn/wiki/LogQL-Supported-Queries) to provide transparent compatibility with Prometheus clients
-The Grafana Prometheus datasource can be used to natively browse and query _metrics_ and display extracted _timeseries_
+### 📈 Prometheus + PromQL
+
+> Any Prometheus compatible client or application can be used with qryn out of the box
+
+⚡ **qryn** implements the [Prometheus API](https://prometheus.io/docs/prometheus/latest/querying/api/) for transparent **[PromQL](https://prometheus.io/docs/prometheus/latest/querying/basics/)** compatibility using WASM 🏆
+
+The Grafana Prometheus datasource can be used to natively to query _metrics_ and display _timeseries_
@@ -71,11 +83,16 @@ The Grafana Prometheus datasource can be used to natively browse and query _metr
:tada: _No plugins needed_
+
-### 🕛 Tempo
-qryn implements the [Tempo API](https://github.com/lmangani/qryn/wiki/LogQL-Supported-Queries) to provide transparent compatibility with Tempo/OTLP clients.
-The Tempo datasource can be used to natively query _traces_ including _beta search_ and _service graphs_
+### 🕛 Tempo + TraceQL
+
+⚡ **qryn** implements the [Tempo API](https://github.com/lmangani/qryn/wiki/LogQL-Supported-Queries) for transparent compatibility with **[TraceQL](https://grafana.com/docs/tempo/latest/traceql/)** clients.
+
+> Any Tempo/Opentelemetry compatible client or application can be used with qryn out of the box
+
+The Tempo datasource can be used to natively query _traces_ including _**TraceQL**_ and supporting _service graphs_
@@ -85,8 +102,20 @@ The Tempo datasource can be used to natively query _traces_ including _beta sear
-### ↔️ Correlation
-Data correlation made simple with dynamic **links** between _logs, metrics and traces_
+### 📚 Other Vendors
+
+**qryn** can ingest data using the [InfluxDB, DataDog, Elastic](https://qryn.metrico.in/#/support) and other vendors.
+
+
+
+
+With **qryn** and **grafana** everything _just works_ right out of the box:
+
+- Native datasource support without any plugin or extension
+- Advanced Correlation between Logs, Metrics and Traces
+- Service Graphs and Service Status Panels, and all the cool features
+
+
@@ -94,7 +123,9 @@ Data correlation made simple with dynamic **links** between _logs, metrics and t
-### :eye: View
+
+
+### :eye: Explore View
No Grafana? No Problem. **qryn** ships with **view** - it's own lightweight data exploration tool
@@ -121,9 +152,9 @@ Whether it's code, documentation or grammar, we ❤️ all contributions. Not su
[![Contributors for @metrico/qryn](https://contributors-img.web.app/image?repo=lmangani/cloki)](https://github.com/metrico/qryn/graphs/contributors)
-[![Stargazers repo roster for @metrico/qryn](https://reporoster.com/stars/metrico/qryn)](https://github.com/metrico/qryn/stargazers)
+[![Stargazers repo roster for @metrico/qryn](https://bytecrank.com/nastyox/reporoster/php/stargazersSVG.php?user=metrico&repo=qryn)](https://github.com/metrico/qryn/stargazers)
-[![Forkers repo roster for @metrico/qryn](https://reporoster.com/forks/metrico/qryn)](https://github.com/metrico/qryn/network/members)
+[![Forkers repo roster for @metrico/qryn](https://bytecrank.com/nastyox/reporoster/php/forkersSVG.php?user=metrico&repo=qryn)](https://github.com/metrico/qryn/network/members)
#### License
diff --git a/common.js b/common.js
index acf354a2..5936707d 100644
--- a/common.js
+++ b/common.js
@@ -125,3 +125,13 @@ module.exports.isCustomSamplesOrderingRule = () => {
module.exports.CORS = process.env.CORS_ALLOW_ORIGIN || '*'
module.exports.clusterName = process.env.CLUSTER_NAME
+
+module.exports.readonly = process.env.READONLY || false
+
+module.exports.bun = () => {
+ try {
+ return Bun
+ } catch (err) {
+ return false
+ }
+}
diff --git a/docker/docker-compose-centos.yml b/docker/docker-compose-centos.yml
index 7a142343..9beb607f 100644
--- a/docker/docker-compose-centos.yml
+++ b/docker/docker-compose-centos.yml
@@ -39,4 +39,4 @@ services:
container_name: centos
volumes:
- ../:/opt/qryn
- entrypoint: bash -c 'cd ~ ; cp -rf /opt/qryn . ; cd qryn; ls -la ; rm -rf node_modules ; npm install ; CLICKHOUSE_DB=loki CLICKHOUSE_TSDB=loki INTEGRATION_E2E=1 CLICKHOUSE_SERVER=clickhouse-seed node qryn.js'
+ entrypoint: bash -c 'cd ~ ; cp -rf /opt/qryn . ; cd qryn; ls -la ; rm -rf node_modules ; npm install ; CLICKHOUSE_DB=loki CLICKHOUSE_TSDB=loki INTEGRATION_E2E=1 CLICKHOUSE_SERVER=clickhouse-seed node qryn.mjs'
diff --git a/lib/bun_wrapper.js b/lib/bun_wrapper.js
new file mode 100644
index 00000000..6fe38a2c
--- /dev/null
+++ b/lib/bun_wrapper.js
@@ -0,0 +1,168 @@
+const { Transform } = require('stream')
+const log = require('./logger')
+const { EventEmitter } = require('events')
+
+class BodyStream extends Transform {
+ _transform (chunk, encoding, callback) {
+ callback(null, chunk)
+ }
+
+ once (event, listerer) {
+ const self = this
+ const _listener = (e) => {
+ listerer(e)
+ self.removeListener(event, _listener)
+ }
+ this.on(event, _listener)
+ }
+}
+
+const wrapper = (handler, parsers) => {
+ /**
+ * @param ctx {Request}
+ */
+ const res = async (ctx, server) => {
+ let response = ''
+ let status = 200
+ let reqBody = ''
+ let headers = {}
+ log.info(`${ctx.url}`)
+
+ const stream = new BodyStream()
+ setTimeout(async () => {
+ if (!ctx.body) {
+ stream.end()
+ return
+ }
+ for await (const chunk of ctx.body) {
+ stream.write(chunk)
+ }
+ stream.end()
+ })
+ const req = {
+ headers: Object.fromEntries(ctx.headers.entries()),
+ raw: stream,
+ log: log,
+ params: ctx.params || {},
+ query: {}
+ }
+ for (const [key, value] of (new URL(ctx.url)).searchParams) {
+ if (!(key in req.query)) {
+ req.query[key] = value
+ continue
+ }
+ req.query[key] = Array.isArray(req.query[key])
+ ? [...req.query[key], value]
+ : [req.query[key], value]
+ }
+ const res = {
+ send: (msg) => {
+ response = msg
+ },
+ code: (code) => {
+ status = code
+ return res
+ },
+ header: (key, value) => {
+ headers[key] = value
+ return res
+ },
+ headers: (hdrs) => {
+ headers = { ...headers, ...hdrs }
+ return res
+ }
+ }
+
+ if (parsers) {
+ const contentType = (ctx.headers.get('Content-Type') || '')
+ let ok = false
+ for (const [type, parser] of Object.entries(parsers)) {
+ if (type !== '*' && contentType.indexOf(type) > -1) {
+ log.debug(`parsing ${type}`)
+ reqBody = await parser(req, stream)
+ ok = true
+ log.debug(`parsing ${type} ok`)
+ }
+ }
+ if (!ok && parsers['*']) {
+ log.debug('parsing *')
+ reqBody = await parsers['*'](req, stream)
+ ok = true
+ log.debug('parsing * ok')
+ }
+ if (!ok) {
+ throw new Error('undefined content type ' + contentType)
+ }
+ }
+
+ req.body = reqBody || stream
+
+ let result = handler(req, res)
+ if (result && result.then) {
+ result = await result
+ }
+ if (result && result.on) {
+ response = ''
+ result.on('data', (d) => {
+ response += d
+ })
+ await new Promise((resolve, reject) => {
+ result.on('end', resolve)
+ result.on('error', reject)
+ result.on('close', resolve)
+ })
+ result = null
+ }
+ if (result) {
+ response = result
+ }
+ if (response instanceof Object && typeof response !== 'string' && !Buffer.isBuffer(response)) {
+ response = JSON.stringify(response)
+ }
+ return new Response(response, { status: status, headers: headers })
+ }
+ return res
+}
+
+const wsWrapper = (handler) => {
+ /**
+ * @param ctx {Request}
+ */
+ const res = {
+ open: async (ctx, server) => {
+ const req = {
+ headers: Object.fromEntries(ctx.data.ctx.headers.entries()),
+ log: log,
+ query: {}
+ }
+ for (const [key, value] of (new URL(ctx.data.ctx.url)).searchParams) {
+ if (!(key in req.query)) {
+ req.query[key] = value
+ continue
+ }
+ req.query[key] = Array.isArray(req.query[key])
+ ? [...req.query[key], value]
+ : [req.query[key], value]
+ }
+
+ ctx.closeEmitter = new EventEmitter()
+ ctx.closeEmitter.send = ctx.send.bind(ctx)
+
+ const ws = {
+ socket: ctx.closeEmitter
+ }
+
+ const result = handler(ws, { query: req.query })
+ if (result && result.then) {
+ await result
+ }
+ },
+ close: (ctx) => { ctx.closeEmitter.emit('close') }
+ }
+ return res
+}
+
+module.exports = {
+ wrapper,
+ wsWrapper
+}
diff --git a/lib/db/clickhouse.js b/lib/db/clickhouse.js
index 6e7ab8bf..a704fe5f 100644
--- a/lib/db/clickhouse.js
+++ b/lib/db/clickhouse.js
@@ -17,14 +17,6 @@ const dist = clusterName ? '_dist' : ''
/* DB Helper */
const ClickHouse = require('@apla/clickhouse')
-const clickhouseOptions = {
- host: process.env.CLICKHOUSE_SERVER || 'localhost',
- port: process.env.CLICKHOUSE_PORT || 8123,
- auth: process.env.CLICKHOUSE_AUTH || 'default:',
- protocol: process.env.CLICKHOUSE_PROTO ? process.env.CLICKHOUSE_PROTO + ':' : 'http:',
- readonly: !!process.env.READONLY,
- queryOptions: { database: process.env.CLICKHOUSE_DB || 'cloki' }
-}
const transpiler = require('../../parser/transpiler')
const rotationLabels = process.env.LABELS_DAYS || 7
@@ -33,9 +25,9 @@ const axios = require('axios')
const { samplesTableName, samplesReadTableName } = UTILS
const path = require('path')
const { Transform } = require('stream')
-const { CORS } = require('../../common')
-
-const protocol = process.env.CLICKHOUSE_PROTO || 'http'
+const { CORS, bun } = require('../../common')
+const clickhouseOptions = require('./clickhouse_options').databaseOptions
+const { getClickhouseUrl } = require('./clickhouse_options')
// External Storage Policy for Tables (S3, MINIO)
const storagePolicy = process.env.STORAGE_POLICY || false
@@ -76,7 +68,8 @@ const conveyor = {
let throttler = null
const resolvers = {}
const rejectors = {}
-if (isMainThread) {
+let first = false
+if (isMainThread && !bun()) {
throttler = new Worker(path.join(__dirname, 'throttler.js'))
throttler.on('message', (msg) => {
switch (msg.status) {
@@ -90,8 +83,29 @@ if (isMainThread) {
delete resolvers[msg.id]
delete rejectors[msg.id]
})
+} else if (isMainThread && !first) {
+ first = true
+ const _throttler = require('./throttler')
+ throttler = {
+ on: _throttler.on,
+ postMessage: _throttler.postMessage,
+ removeAllListeners: _throttler.removeAllListeners,
+ terminate: _throttler.terminate
+ }
+ _throttler.init()
+ throttler.on('message', (msg) => {
+ switch (msg.status) {
+ case 'ok':
+ resolvers[msg.id]()
+ break
+ case 'err':
+ rejectors[msg.id](new Error('Database push error'))
+ break
+ }
+ delete resolvers[msg.id]
+ delete rejectors[msg.id]
+ })
}
-
// timeSeriesv2Throttler.start();
/* Cache Helper */
@@ -348,10 +362,6 @@ function pushOTLP (traces) {
})
}
-function getClickhouseUrl () {
- return `${protocol}://${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}`
-}
-
/**
* @param query {{
* query: string,
@@ -455,6 +465,7 @@ const queryTempoScanV2 = async function (query) {
}
const limit = query.limit ? `LIMIT ${parseInt(query.limit)}` : ''
const sql = `${select} ${from} WHERE ${where.join(' AND ')} ORDER BY timestamp_ns DESC ${limit} FORMAT JSON`
+ console.log(sql)
const resp = await rawRequest(sql, null, process.env.CLICKHOUSE_DB || 'cloki')
return resp.data.data ? resp.data.data : JSON.parse(resp.data).data
}
@@ -815,7 +826,13 @@ const outputTempoSearch = async (dataStream, res) => {
*/
const preprocessStream = (rawStream, processors) => {
let dStream = StringStream.from(rawStream.data).lines().endWith(JSON.stringify({ EOF: true }))
- .map(chunk => chunk ? JSON.parse(chunk) : ({}), DataStream)
+ .map(chunk => {
+ try {
+ return chunk ? JSON.parse(chunk) : ({})
+ } catch (e) {
+ return {}
+ }
+ }, DataStream)
.map(chunk => {
try {
if (!chunk || !chunk.labels) {
@@ -1333,15 +1350,16 @@ const samplesReadTable = {
* @param query {string}
* @param data {string | Buffer | Uint8Array}
* @param database {string}
+ * @param config {Object?}
* @returns {Promise>}
*/
-const rawRequest = (query, data, database) => {
+const rawRequest = (query, data, database, config) => {
const getParams = [
(database ? `database=${encodeURIComponent(database)}` : null),
(data ? `query=${encodeURIComponent(query)}` : null)
].filter(p => p)
const url = `${getClickhouseUrl()}/${getParams.length ? `?${getParams.join('&')}` : ''}`
- return axios.post(url, data || query)
+ return axios.post(url, data || query, config)
}
/**
diff --git a/lib/db/clickhouse_options.js b/lib/db/clickhouse_options.js
new file mode 100644
index 00000000..2db4510b
--- /dev/null
+++ b/lib/db/clickhouse_options.js
@@ -0,0 +1,22 @@
+const UTILS = require('../utils')
+const { samplesTableName, samplesReadTableName } = UTILS
+
+const clickhouseOptions = {
+ host: process.env.CLICKHOUSE_SERVER || 'localhost',
+ port: process.env.CLICKHOUSE_PORT || 8123,
+ auth: process.env.CLICKHOUSE_AUTH || 'default:',
+ protocol: process.env.CLICKHOUSE_PROTO ? process.env.CLICKHOUSE_PROTO + ':' : 'http:',
+ readonly: !!process.env.READONLY,
+ queryOptions: { database: process.env.CLICKHOUSE_DB || 'cloki' }
+}
+
+function getClickhouseUrl () {
+ return `${clickhouseOptions.protocol}//${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}`
+}
+
+module.exports = {
+ samplesTableName,
+ samplesReadTableName,
+ getClickhouseUrl,
+ databaseOptions: clickhouseOptions
+}
diff --git a/lib/db/throttler.js b/lib/db/throttler.js
index 21a3250c..8d84495e 100644
--- a/lib/db/throttler.js
+++ b/lib/db/throttler.js
@@ -1,10 +1,11 @@
const { isMainThread, parentPort } = require('worker_threads')
const axios = require('axios')
-const { getClickhouseUrl, samplesTableName } = require('./clickhouse')
-const clickhouseOptions = require('./clickhouse').databaseOptions
+const { getClickhouseUrl, samplesTableName } = require('./clickhouse_options')
+const clickhouseOptions = require('./clickhouse_options').databaseOptions
const logger = require('../logger')
const clusterName = require('../../common').clusterName
const dist = clusterName ? '_dist' : ''
+const { EventEmitter } = require('events')
const axiosError = async (err) => {
try {
@@ -71,14 +72,45 @@ const tracesThottler = new TimeoutThrottler(
(trace_id, span_id, parent_id, name, timestamp_ns, duration_ns, service_name, payload_type, payload, tags)
FORMAT JSONEachRow`)
-if (isMainThread) {
- module.exports = {
- samplesThrottler,
- timeSeriesThrottler,
- TimeoutThrottler
+const emitter = new EventEmitter()
+let on = true
+const postMessage = message => {
+ const genericRequest = (throttler) => {
+ throttler.queue.push(message.data)
+ throttler.resolvers.push(() => {
+ if (isMainThread) {
+ emitter.emit('message', { status: 'ok', id: message.id })
+ return
+ }
+ parentPort.postMessage({ status: 'ok', id: message.id })
+ })
+ throttler.rejects.push(() => {
+ if (isMainThread) {
+ emitter.emit('message', { status: 'err', id: message.id })
+ return
+ }
+ parentPort.postMessage({ status: 'err', id: message.id })
+ })
}
-} else {
- let on = true
+ switch (message.type) {
+ case 'end':
+ on = false
+ if (!isMainThread) {
+ parentPort.removeAllListeners('message')
+ }
+ break
+ case 'values':
+ genericRequest(samplesThrottler)
+ break
+ case 'labels':
+ genericRequest(timeSeriesThrottler)
+ break
+ case 'traces':
+ genericRequest(tracesThottler)
+ }
+}
+
+const init = () => {
setTimeout(async () => {
// eslint-disable-next-line no-unmodified-loop-condition
while (on) {
@@ -96,29 +128,25 @@ if (isMainThread) {
}
}
}, 0)
- parentPort.on('message', message => {
- const genericRequest = (throttler) => {
- throttler.queue.push(message.data)
- throttler.resolvers.push(() => {
- parentPort.postMessage({ status: 'ok', id: message.id })
- })
- throttler.rejects.push(() => {
- parentPort.postMessage({ status: 'err', id: message.id })
- })
- }
- switch (message.type) {
- case 'end':
- on = false
- parentPort.removeAllListeners('message')
- break
- case 'values':
- genericRequest(samplesThrottler)
- break
- case 'labels':
- genericRequest(timeSeriesThrottler)
- break
- case 'traces':
- genericRequest(tracesThottler)
+}
+
+if (isMainThread) {
+ module.exports = {
+ samplesThrottler,
+ timeSeriesThrottler,
+ tracesThottler,
+ TimeoutThrottler,
+ postMessage,
+ on: emitter.on.bind(emitter),
+ removeAllListeners: emitter.removeAllListeners.bind(emitter),
+ init,
+ terminate: () => {
+ postMessage({ type: 'end' })
}
+ }
+} else {
+ init()
+ parentPort.on('message', message => {
+ postMessage(message)
})
}
diff --git a/lib/db/zipkin.js b/lib/db/zipkin.js
index 2837c727..3920dcfc 100644
--- a/lib/db/zipkin.js
+++ b/lib/db/zipkin.js
@@ -26,7 +26,13 @@ module.exports = class {
* @returns {string}
*/
toJson () {
- return JSON.stringify(this, (k, val) => typeof val === 'bigint' ? val.toString() : val)
+ const res = {
+ ...this,
+ timestamp_ns: this.timestamp_ns.toString(),
+ duration_ns: this.duration_ns.toString()
+ }
+ return JSON.stringify(res)
+ //return JSON.stringify(this, (k, val) => typeof val === 'bigint' ? val.toString() : val)
}
/**
diff --git a/lib/handlers/404.js b/lib/handlers/404.js
index bf1cb337..daba3363 100644
--- a/lib/handlers/404.js
+++ b/lib/handlers/404.js
@@ -1,6 +1,6 @@
function handler (req, res) {
req.log.debug('unsupported', req.url)
- return res.send('404 Not Supported')
+ return res.code(404).send('404 Not Supported')
}
module.exports = handler
diff --git a/lib/handlers/datadog_log_push.js b/lib/handlers/datadog_log_push.js
index cbb89883..a1cd8677 100644
--- a/lib/handlers/datadog_log_push.js
+++ b/lib/handlers/datadog_log_push.js
@@ -18,6 +18,11 @@
*/
const stringify = require('../utils').stringify
+const DATABASE = require('../db/clickhouse')
+const { bulk_labels, bulk, labels } = DATABASE.cache
+const { fingerPrint } = require('../utils')
+const { readonly } = require('../../common')
+
const tagsToObject = (data, delimiter = ',') =>
Object.fromEntries(data.split(',').map(v => {
const fields = v.split(':')
@@ -25,13 +30,12 @@ const tagsToObject = (data, delimiter = ',') =>
}))
async function handler (req, res) {
- const self = this
req.log.debug('Datadog Log Index Request')
if (!req.body) {
req.log.error('No Request Body or Target!')
return res.code(400).send('{"status":400, "error": { "reason": "No Request Body" } }')
}
- if (this.readonly) {
+ if (readonly) {
req.log.error('Readonly! No push support.')
return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }')
}
@@ -69,9 +73,9 @@ async function handler (req, res) {
}
// Calculate Fingerprint
const strJson = stringify(JSONLabels)
- finger = self.fingerPrint(strJson)
+ finger = fingerPrint(strJson)
// Store Fingerprint
- promises.push(self.bulk_labels.add([[
+ promises.push(bulk_labels.add([[
new Date().toISOString().split('T')[0],
finger,
strJson,
@@ -79,8 +83,8 @@ async function handler (req, res) {
]]))
for (const key in JSONLabels) {
req.log.debug({ key, data: JSONLabels[key] }, 'Storing label')
- self.labels.add('_LABELS_', key)
- self.labels.add(key, JSONLabels[key])
+ labels.add('_LABELS_', key)
+ labels.add(key, JSONLabels[key])
}
} catch (err) {
req.log.error({ err }, 'failed ingesting datadog log')
@@ -94,7 +98,7 @@ async function handler (req, res) {
stream.message
]
req.log.debug({ finger, values }, 'store')
- promises.push(self.bulk.add([values]))
+ promises.push(bulk.add([values]))
})
}
await Promise.all(promises)
diff --git a/lib/handlers/datadog_series_push.js b/lib/handlers/datadog_series_push.js
index f7f92420..58cf1863 100644
--- a/lib/handlers/datadog_series_push.js
+++ b/lib/handlers/datadog_series_push.js
@@ -25,16 +25,19 @@
*/
const stringify = require('../utils').stringify
+const DATABASE = require('../db/clickhouse')
+const { bulk_labels, bulk, labels } = DATABASE.cache
+const { fingerPrint } = require('../utils')
+const { readonly } = require('../../common')
async function handler (req, res) {
- const self = this
req.log.debug('Datadog Series Index Request')
if (!req.body) {
req.log.error('No Request Body!')
res.code(500).send()
return
}
- if (this.readonly) {
+ if (readonly) {
req.log.error('Readonly! No push support.')
res.code(500).send()
return
@@ -63,18 +66,18 @@ async function handler (req, res) {
}
// Calculate Fingerprint
const strJson = stringify(JSONLabels)
- finger = self.fingerPrint(strJson)
- self.labels.add(finger.toString(), stream.labels)
+ finger = fingerPrint(strJson)
+ labels.add(finger.toString(), stream.labels)
// Store Fingerprint
- promises.push(self.bulk_labels.add([[
+ promises.push(bulk_labels.add([[
new Date().toISOString().split('T')[0],
finger,
strJson,
JSONLabels.__name__ || 'undefined'
]]))
for (const key in JSONLabels) {
- self.labels.add('_LABELS_', key)
- self.labels.add(key, JSONLabels[key])
+ labels.add('_LABELS_', key)
+ labels.add(key, JSONLabels[key])
}
} catch (err) {
req.log.error({ err })
@@ -97,7 +100,7 @@ async function handler (req, res) {
entry.value,
JSONLabels.__name__ || 'undefined'
]
- promises.push(self.bulk.add([values]))
+ promises.push(bulk.add([values]))
})
}
})
diff --git a/lib/handlers/elastic_bulk.js b/lib/handlers/elastic_bulk.js
index afa3a418..f7668539 100644
--- a/lib/handlers/elastic_bulk.js
+++ b/lib/handlers/elastic_bulk.js
@@ -8,15 +8,18 @@
const { asyncLogError } = require('../../common')
const stringify = require('../utils').stringify
+const DATABASE = require('../db/clickhouse')
+const { bulk_labels, bulk, labels } = DATABASE.cache
+const { fingerPrint } = require('../utils')
+const { readonly } = require('../../common')
async function handler (req, res) {
- const self = this
req.log.debug('ELASTIC Bulk Request')
if (!req.body) {
asyncLogError('No Request Body or Target!' + req.body, req.log)
return res.code(400).send('{"status":400, "error": { "reason": "No Request Body" } }')
}
- if (this.readonly) {
+ if (readonly) {
asyncLogError('Readonly! No push support.', req.log)
return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }')
}
@@ -38,6 +41,9 @@ async function handler (req, res) {
const promises = []
if (streams) {
streams.forEach(function (stream) {
+ if (!stream) {
+ return
+ }
try {
stream = JSON.parse(stream)
} catch (err) { asyncLogError(err, req.log); return };
@@ -67,10 +73,10 @@ async function handler (req, res) {
}
// Calculate Fingerprint
const strJson = stringify(JSONLabels)
- finger = self.fingerPrint(strJson)
+ finger = fingerPrint(strJson)
req.log.debug({ JSONLabels, finger }, 'LABELS FINGERPRINT')
// Store Fingerprint
- promises.push(self.bulk_labels.add([[
+ promises.push(bulk_labels.add([[
new Date().toISOString().split('T')[0],
finger,
strJson,
@@ -78,8 +84,8 @@ async function handler (req, res) {
]]))
for (const key in JSONLabels) {
req.log.debug({ key, data: JSONLabels[key] }, 'Storing label')
- self.labels.add('_LABELS_', key)
- self.labels.add(key, JSONLabels[key])
+ labels.add('_LABELS_', key)
+ labels.add(key, JSONLabels[key])
}
} catch (err) {
asyncLogError(err, req.log)
@@ -93,7 +99,7 @@ async function handler (req, res) {
JSON.stringify(stream) || stream
]
req.log.debug({ finger, values }, 'store')
- promises.push(self.bulk.add([values]))
+ promises.push(bulk.add([values]))
// Reset State, Expect Command
lastTags = false
diff --git a/lib/handlers/elastic_index.js b/lib/handlers/elastic_index.js
index 19528092..ee314c45 100644
--- a/lib/handlers/elastic_index.js
+++ b/lib/handlers/elastic_index.js
@@ -11,15 +11,19 @@
const { asyncLogError } = require('../../common')
const stringify = require('../utils').stringify
+const DATABASE = require('../db/clickhouse')
+const { bulk_labels, bulk, labels } = DATABASE.cache
+const { fingerPrint } = require('../utils')
+const { readonly } = require('../../common')
+
async function handler (req, res) {
- const self = this
req.log.debug('ELASTIC Index Request')
if (!req.body || !req.params.target) {
asyncLogError('No Request Body or Target!', req.log)
return res.code(400).send('{"status":400, "error": { "reason": "No Request Body" } }')
}
- if (this.readonly) {
+ if (readonly) {
asyncLogError('Readonly! No push support.', req.log)
return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }')
}
@@ -57,9 +61,9 @@ async function handler (req, res) {
}
// Calculate Fingerprint
const strJson = stringify(JSONLabels)
- finger = self.fingerPrint(strJson)
+ finger = fingerPrint(strJson)
// Store Fingerprint
- promises.push(self.bulk_labels.add([[
+ promises.push(bulk_labels.add([[
new Date().toISOString().split('T')[0],
finger,
strJson,
@@ -67,8 +71,8 @@ async function handler (req, res) {
]]))
for (const key in JSONLabels) {
req.log.debug({ key, data: JSONLabels[key] }, 'Storing label')
- self.labels.add('_LABELS_', key)
- self.labels.add(key, JSONLabels[key])
+ labels.add('_LABELS_', key)
+ labels.add(key, JSONLabels[key])
}
} catch (err) {
asyncLogError(err, req.log)
@@ -87,7 +91,7 @@ async function handler (req, res) {
JSON.stringify(stream) || stream
]
req.log.debug({ finger, values }, 'store')
- promises.push(self.bulk.add([values]))
+ promises.push(bulk.add([values]))
})
}
await Promise.all(promises)
diff --git a/lib/handlers/influx_write.js b/lib/handlers/influx_write.js
index 5563f8a9..42a93103 100644
--- a/lib/handlers/influx_write.js
+++ b/lib/handlers/influx_write.js
@@ -39,14 +39,17 @@
const stringify = require('../utils').stringify
const influxParser = require('../influx')
const { asyncLogError, errors } = require('../../common')
+const DATABASE = require('../db/clickhouse')
+const { bulk_labels, bulk, labels } = DATABASE.cache
+const { fingerPrint } = require('../utils')
+const { readonly } = require('../../common')
async function handler (req, res) {
- const self = this
if (!req.body && !req.body.metrics) {
asyncLogError('No Request Body!', req.log)
return
}
- if (self.readonly) {
+ if (readonly) {
asyncLogError('Readonly! No push support.', req.log)
return res.code(500).send('')
}
@@ -75,10 +78,10 @@ async function handler (req, res) {
}
// Calculate Fingerprint
const strLabels = stringify(Object.fromEntries(Object.entries(JSONLabels).sort()))
- finger = self.fingerPrint(strLabels)
- self.labels.add(finger.toString(), stream.labels)
+ finger = fingerPrint(strLabels)
+ labels.add(finger.toString(), stream.labels)
// Store Fingerprint
- self.bulk_labels.add([[
+ bulk_labels.add([[
new Date().toISOString().split('T')[0],
finger,
strLabels,
@@ -86,8 +89,8 @@ async function handler (req, res) {
]])
for (const key in JSONLabels) {
// req.log.debug({ key, data: JSONLabels[key] }, 'Storing label');
- self.labels.add('_LABELS_', key)
- self.labels.add(key, JSONLabels[key])
+ labels.add('_LABELS_', key)
+ labels.add(key, JSONLabels[key])
}
} catch (err) {
asyncLogError(err, req.log)
@@ -111,7 +114,7 @@ async function handler (req, res) {
value || 0,
key || ''
]
- self.bulk.add([values])
+ bulk.add([values])
}
/* logs or syslog */
} else if (stream.measurement === 'syslog' || JSONFields.message) {
@@ -123,7 +126,7 @@ async function handler (req, res) {
null,
JSONFields.message
]
- self.bulk.add([values])
+ bulk.add([values])
}
})
}
diff --git a/lib/handlers/newrelic_log_push.js b/lib/handlers/newrelic_log_push.js
index c4b6fb6a..dda46c96 100644
--- a/lib/handlers/newrelic_log_push.js
+++ b/lib/handlers/newrelic_log_push.js
@@ -31,15 +31,18 @@
const { QrynBadRequest } = require('./errors')
const stringify = require('../utils').stringify
+const DATABASE = require('../db/clickhouse')
+const { bulk_labels, bulk, labels } = DATABASE.cache
+const { fingerPrint } = require('../utils')
+const { readonly } = require('../../common')
async function handler (req, res) {
- const self = this
req.log.debug('NewRelic Log Index Request')
if (!req.body) {
req.log.error('No Request Body')
throw new QrynBadRequest('No request body')
}
- if (this.readonly) {
+ if (readonly) {
req.log.error('Readonly! No push support.')
throw new QrynBadRequest('Read only mode')
}
@@ -77,12 +80,12 @@ async function handler (req, res) {
// Calculate Fingerprint
const strJson = stringify(JSONLabels)
- finger = self.fingerPrint(strJson)
+ finger = fingerPrint(strJson)
// Store Fingerprint
for (const key in JSONLabels) {
req.log.debug({ key, data: JSONLabels[key] }, 'Storing label')
- self.labels.add('_LABELS_', key)
- self.labels.add(key, JSONLabels[key])
+ labels.add('_LABELS_', key)
+ labels.add(key, JSONLabels[key])
}
const dates = {}
@@ -99,11 +102,11 @@ async function handler (req, res) {
null,
log.message
]
- promises.push(self.bulk.add([values]))
+ promises.push(bulk.add([values]))
})
}
for (const d of Object.keys(dates)) {
- promises.push(self.bulk_labels.add([[
+ promises.push(bulk_labels.add([[
d,
finger,
strJson,
diff --git a/lib/handlers/otlp_push.js b/lib/handlers/otlp_push.js
index 1c93d30d..73a62d1e 100644
--- a/lib/handlers/otlp_push.js
+++ b/lib/handlers/otlp_push.js
@@ -17,16 +17,9 @@
}]
*/
-const { Transform } = require('stream')
const { asyncLogError } = require('../../common')
-
-function handleOne (req, streams, promises) {
- const self = this
- streams.on('data', function (stream) {
- stream = stream.value
- promises.push(self.pushZipkin([stream]))
- })
-}
+const { pushOTLP } = require('../db/clickhouse')
+const { readonly } = require('../../common')
async function handler (req, res) {
req.log.debug('POST /tempo/api/push')
@@ -34,7 +27,7 @@ async function handler (req, res) {
asyncLogError('No Request Body!', req.log)
return res.code(500).send()
}
- if (this.readonly) {
+ if (readonly) {
asyncLogError('Readonly! No push support.', req.log)
return res.code(500).send()
}
@@ -53,7 +46,7 @@ async function handler (req, res) {
spans.push.apply(spans, scope.spans)
}
}
- await this.pushOTLP(spans)
+ await pushOTLP(spans)
return res.code(200).send('OK')
}
diff --git a/lib/handlers/prom_push.js b/lib/handlers/prom_push.js
index 9fcf36ae..b841b2fe 100644
--- a/lib/handlers/prom_push.js
+++ b/lib/handlers/prom_push.js
@@ -13,6 +13,10 @@
*/
const { asyncLogError } = require('../../common')
const stringify = require('../utils').stringify
+const DATABASE = require('../db/clickhouse')
+const { bulk_labels, bulk, labels } = DATABASE.cache
+const { fingerPrint } = require('../utils')
+const { readonly } = require('../../common')
async function handler (req, res) {
const self = this
@@ -21,7 +25,7 @@ async function handler (req, res) {
asyncLogError('No Request Body!', req.log)
return res.code(500).send()
}
- if (this.readonly) {
+ if (readonly) {
asyncLogError('Readonly! No push support.', req.log)
return res.code(500).send()
}
@@ -41,14 +45,12 @@ async function handler (req, res) {
}, {})
// Calculate Fingerprint
const strJson = stringify(JSONLabels)
- finger = self.fingerPrint(strJson)
- req.log.debug({ labels: stream.labels, finger }, 'LABELS FINGERPRINT')
- self.labels.add(finger.toString(), stream.labels)
+ finger = fingerPrint(strJson)
+ labels.add(finger.toString(), stream.labels)
const dates = {}
if (stream.samples) {
stream.samples.forEach(function (entry) {
- req.log.debug({ entry, finger }, 'BULK ROW')
if (
!entry &&
!entry.timestamp &&
@@ -67,20 +69,20 @@ async function handler (req, res) {
dates[
new Date(parseInt((ts / BigInt('1000000')).toString())).toISOString().split('T')[0]
] = 1
- promises.push(self.bulk.add([values]))
+ promises.push(bulk.add([values]))
})
}
for (const d of Object.keys(dates)) {
// Store Fingerprint
- promises.push(self.bulk_labels.add([[
+ promises.push(bulk_labels.add([[
d,
finger,
strJson,
JSONLabels.__name__ || 'undefined'
]]))
for (const key in JSONLabels) {
- self.labels.add('_LABELS_', key)
- self.labels.add(key, JSONLabels[key])
+ labels.add('_LABELS_', key)
+ labels.add(key, JSONLabels[key])
}
}
} catch (err) {
diff --git a/lib/handlers/prom_query.js b/lib/handlers/prom_query.js
index 91a1ec70..59935bef 100644
--- a/lib/handlers/prom_query.js
+++ b/lib/handlers/prom_query.js
@@ -1,12 +1,11 @@
/* Emulated PromQL Query Handler */
-const { p2l } = require('@qxip/promql2logql');
const { asyncLogError, CORS } = require('../../common')
+const { instantQuery } = require('../../promql')
const empty = '{"status" : "success", "data" : {"resultType" : "scalar", "result" : []}}'; // to be removed
const test = () => `{"status" : "success", "data" : {"resultType" : "scalar", "result" : [${Math.floor(Date.now() / 1000)}, "2"]}}`; // to be removed
const exec = (val) => `{"status" : "success", "data" : {"resultType" : "scalar", "result" : [${Math.floor(Date.now() / 1000)}, val]}}`; // to be removed
-
async function handler (req, res) {
req.log.debug('GET /loki/api/v1/query')
const resp = {
@@ -24,34 +23,23 @@ async function handler (req, res) {
}
if (req.query.query === '1+1') {
return res.status(200).send(test())
- }
- else if (!isNaN(parseInt(req.query.query))) {
+ } else if (!isNaN(parseInt(req.query.query))) {
return res.status(200).send(exec(parseInt(req.query.query)))
}
/* remove newlines */
req.query.query = req.query.query.replace(/\n/g, ' ')
+ req.query.time = req.query.time ? parseInt(req.query.time) * 1000 : Date.now()
/* transpile to logql */
try {
- req.query.query = p2l(req.query.query)
- } catch(e) {
- asyncLogError({ e }, req.log)
- return res.send(empty)
- }
- /* scan fingerprints */
- /* TODO: handle time tag + direction + limit to govern the query output */
- try {
- const response = await this.instantQueryScan(
- req.query
- )
- res.code(200)
- res.headers({
- 'Content-Type': 'application/json',
- 'Access-Control-Allow-Origin': CORS
- })
- return response
+ const response = await instantQuery(req.query.query, req.query.time)
+ return res.code(200)
+ .headers({
+ 'Content-Type': 'application/json',
+ 'Access-Control-Allow-Origin': CORS
+ }).send(response)
} catch (err) {
asyncLogError(err, req.log)
- return res.send(empty)
+ return res.code(500).send(JSON.stringify({ status: 'error', error: err.message }))
}
}
diff --git a/lib/handlers/prom_query_range.js b/lib/handlers/prom_query_range.js
index 74cd3460..df37c1ab 100644
--- a/lib/handlers/prom_query_range.js
+++ b/lib/handlers/prom_query_range.js
@@ -9,48 +9,19 @@
regexp: a regex to filter the returned results, will eventually be rolled into the query language
*/
-const { p2l } = require('@qxip/promql2logql')
-const { asyncLogError, CORS } = require('../../common')
+const { rangeQuery } = require('../../promql/index')
async function handler (req, res) {
req.log.debug('GET /api/v1/query_range')
- const resp = {
- status: "success",
- data: {
- resultType: "vector",
- result: []
- }
- }
- if (req.method === 'POST') {
- req.query = req.body
- }
- if (!req.query.query) {
- return res.send(resp)
- }
- /* remove newlines */
- req.query.query = req.query.query.replace(/\n/g, ' ')
- if (!req.query.query) {
- return res.code(400).send('invalid query')
- }
- // Convert PromQL to LogQL and execute
+ const startMs = parseInt(req.query.start) * 1000 || Date.now() - 60000
+ const endMs = parseInt(req.query.end) * 1000 || Date.now()
+ const stepMs = parseInt(req.query.step) * 1000 || 15000
+ const query = req.query.query
try {
- req.query.query = p2l(req.query.query)
- const response = await this.scanFingerprints(
- {
- ...req.query,
- start: parseInt(req.query.start) * 1e9,
- end: parseInt(req.query.end) * 1e9
- }
- )
- res.code(200)
- res.headers({
- 'Content-Type': 'application/json',
- 'Access-Control-Allow-Origin': CORS
- })
- return response
+ const result = await rangeQuery(query, startMs, endMs, stepMs)
+ return res.code(200).send(result)
} catch (err) {
- asyncLogError(err, req.log)
- return res.send(resp)
+ return res.code(500).send(JSON.stringify({ status: 'error', error: err.message }))
}
}
diff --git a/lib/handlers/prom_series.js b/lib/handlers/prom_series.js
index 50f2c995..d6862b7d 100644
--- a/lib/handlers/prom_series.js
+++ b/lib/handlers/prom_series.js
@@ -1,21 +1,8 @@
const { scanSeries } = require('../db/clickhouse')
const { CORS } = require('../../common')
-const { Compiler } = require('bnf')
const { isArray } = require('handlebars-helpers/lib/array')
const { QrynError } = require('./errors')
-
-const promqlSeriesBnf = `
- ::= | "{" "}" | "{" [] "}"
-label ::= ( | "_") *( | "." | "_" | )
-operator ::= "=~" | "!~" | "!=" | "="
-quoted_str ::= () | () | |
-metric_name ::= label
-label_selector ::=