From c0a7b161f9a4730ae77e898827b0c10484d9bf1a Mon Sep 17 00:00:00 2001 From: Antoine Pultier <45740+fungiboletus@users.noreply.github.com> Date: Mon, 26 Aug 2024 11:24:59 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20=F0=9F=8C=88=20Work=20in=20progress=20w?= =?UTF-8?q?ith=20Prometheus=20syntax?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 47 ++++ Cargo.toml | 2 + src/datamodel/matchers.rs | 220 ++++++++++++++++++ src/datamodel/mod.rs | 1 + src/importers/lazy_csv.rs | 12 - src/ingestors/http/app_error.rs | 2 + src/ingestors/http/crud.rs | 57 ++++- src/ingestors/http/mod.rs | 2 + src/ingestors/http/prometheus.rs | 33 +++ src/ingestors/http/senml.rs | 136 +++++++++++ src/ingestors/http/server.rs | 10 +- src/ingestors/http/utils.rs | 23 ++ src/parsing/compressed.rs | 70 ++++++ src/parsing/mod.rs | 25 ++ src/storage/bigquery/bigquery_crud.rs | 6 +- .../migrations/20240223133248_init.sql | 5 +- src/storage/bigquery/mod.rs | 5 +- src/storage/duckdb/duckdb_crud.rs | 3 +- src/storage/duckdb/mod.rs | 4 +- .../migrations/20240110123122_init.sql | 7 +- src/storage/postgresql/postgresql.rs | 4 +- src/storage/postgresql/postgresql_crud.rs | 2 + src/storage/rrdcached/mod.rs | 3 + src/storage/sqlite/sqlite.rs | 10 +- src/storage/sqlite/sqlite_crud.rs | 6 +- src/storage/storage.rs | 6 +- .../migrations/20240223133248_init.sql | 7 +- src/storage/timescaledb/timescaledb.rs | 5 +- 28 files changed, 672 insertions(+), 41 deletions(-) create mode 100644 src/datamodel/matchers.rs delete mode 100644 src/importers/lazy_csv.rs create mode 100644 src/ingestors/http/senml.rs create mode 100644 src/ingestors/http/utils.rs create mode 100644 src/parsing/compressed.rs diff --git a/Cargo.lock b/Cargo.lock index dda6155..9e19966 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1476,6 +1476,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "enquote" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06c36cb11dbde389f4096111698d8b567c0720e3452fd5ac3e6b4e47e1939932" +dependencies = [ + "thiserror", +] + [[package]] name = "enum_dispatch" version = "0.3.13" @@ -3180,6 +3189,28 @@ dependencies = [ "ucd-trie", ] +[[package]] +name = "pest_consume" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79447402d15d18e7142e14c72f2e63fa3d155be1bc5b70b3ccbb610ac55f536b" +dependencies = [ + "pest", + "pest_consume_macros", + "pest_derive", +] + +[[package]] +name = "pest_consume_macros" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d8630a7a899cb344ec1c16ba0a6b24240029af34bdc0a21f84e411d7f793f29" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "pest_derive" version = "2.7.11" @@ -3810,6 +3841,20 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus-parser" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95ca50b208ff38f0e5f5a2843e81502ca1613d3d224a4d06722ea947b75fe27b" +dependencies = [ + "enquote", + "lazy_static", + "pest", + "pest_consume", + "pest_consume_macros", + "pest_derive", +] + [[package]] name = "prost" version = "0.13.1" @@ -4601,6 +4646,7 @@ dependencies = [ "once_cell", "opcua", "polars", + "prometheus-parser", "prost", "protobuf", "rand", @@ -4634,6 +4680,7 @@ dependencies = [ "utoipa", "utoipa-scalar", "uuid", + "zstd", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ed016ad..10b3c62 100644 --- 
a/Cargo.toml +++ b/Cargo.toml @@ -100,3 +100,5 @@ rustls = "0.23" base64 = "0.22" rot13 = "0.1" time = "0.3" +zstd = "0.13" +prometheus-parser = "0.4" diff --git a/src/datamodel/matchers.rs b/src/datamodel/matchers.rs new file mode 100644 index 0000000..7dddfd4 --- /dev/null +++ b/src/datamodel/matchers.rs @@ -0,0 +1,220 @@ +use anyhow::{bail, Result}; +use prometheus_parser::{ + Expression as PrometheusExpression, Label as PrometheusLabel, LabelOp as PrometheusLabelOp, +}; + +#[derive(Debug, Default, PartialEq)] +pub enum StringMatcher { + #[default] + All, + Equal(String), + NotEqual(String), + Match(String), + NotMatch(String), +} + +impl StringMatcher { + fn from_prometheus_label_op(op: PrometheusLabelOp, value: String) -> Self { + match op { + PrometheusLabelOp::Equal => StringMatcher::Equal(value), + PrometheusLabelOp::NotEqual => StringMatcher::NotEqual(value), + PrometheusLabelOp::RegexEqual => StringMatcher::Match(value), + PrometheusLabelOp::RegexNotEqual => StringMatcher::NotMatch(value), + } + } +} + +#[derive(Debug, PartialEq)] +pub struct LabelMatcher { + name: String, + matcher: StringMatcher, +} + +impl LabelMatcher { + fn from_prometheus_label(label: PrometheusLabel) -> Self { + let name = label.key; + let matcher = StringMatcher::from_prometheus_label_op(label.op, label.value); + Self { name, matcher } + } +} + +#[derive(Debug, Default, PartialEq)] +pub struct SensorMatcher { + name_matcher: StringMatcher, + label_matchers: Option<Vec<LabelMatcher>>, +} + +impl SensorMatcher { + /// Create a new sensor matcher from a prometheus query. + /// + /// Please note that this is a subset of the prometheus query language. + /// I find the label selection syntax neat. The more advanced features, + /// perhaps not so much. + pub fn from_prometheus_query(query: &str) -> Result<Self> { + let ast = prometheus_parser::parse_expr(query)?; + Self::from_prometheus_query_ast(ast) + } + + pub fn from_prometheus_query_ast(ast: PrometheusExpression) -> Result<Self> { + let selector = match ast { + PrometheusExpression::Selector(selector) => selector, + _ => bail!("Invalid query: it must be a prometheus query selector"), + }; + + if selector.subquery.is_some() || selector.offset.is_some() || selector.range.is_some() { + bail!( + "Invalid query: it must be a simple prometheus query selector, nothing more. 
sorry" + ); + } + + let mut name_matcher = match selector.metric { + Some(metric_name) => StringMatcher::Equal(metric_name), + None => StringMatcher::All, + }; + + let mut label_matchers = Vec::with_capacity(selector.labels.len()); + + for label in selector.labels { + if label.key == "__name__" { + name_matcher = StringMatcher::from_prometheus_label_op(label.op, label.value); + } else { + label_matchers.push(LabelMatcher::from_prometheus_label(label)); + } + } + + Ok(Self { + name_matcher, + label_matchers: match label_matchers.is_empty() { + true => None, + false => Some(label_matchers), + }, + }) + } +} + +// Test with: +// http_requests_total +// http_requests_total{job="prometheus",group="canary"} +// http_requests_total{environment=~"staging|testing|development",method!="GET"} +// http_requests_total{environment=""} +// http_requests_total{replica!="rep-a",replica=~"rep.*"} +// {__name__=~"job:.*"} +// +// should fail: +// http_requests_total{job="prometheus"}[5m] +// rate(http_requests_total[5m] offset 1w) +// http_requests_total @ 1609746000 +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_sensor_matcher_from_prometheus_query() { + let matcher = SensorMatcher::from_prometheus_query("http_requests_total").unwrap(); + assert_eq!( + matcher.name_matcher, + StringMatcher::Equal("http_requests_total".to_string()) + ); + assert!(matcher.label_matchers.is_none()); + + let matcher = SensorMatcher::from_prometheus_query( + "http_requests_total{job=\"prometheus\",group=\"canary\"}", + ) + .unwrap(); + assert_eq!( + matcher.name_matcher, + StringMatcher::Equal("http_requests_total".to_string()) + ); + assert_eq!( + matcher.label_matchers.unwrap(), + vec![ + LabelMatcher { + name: "job".to_string(), + matcher: StringMatcher::Equal("prometheus".to_string()) + }, + LabelMatcher { + name: "group".to_string(), + matcher: StringMatcher::Equal("canary".to_string()) + } + ] + ); + + let matcher = SensorMatcher::from_prometheus_query( + "http_requests_total{environment=~\"staging|testing|development\",method!=\"GET\"}", + ) + .unwrap(); + assert_eq!( + matcher.name_matcher, + StringMatcher::Equal("http_requests_total".to_string()) + ); + assert_eq!( + matcher.label_matchers.unwrap(), + vec![ + LabelMatcher { + name: "environment".to_string(), + matcher: StringMatcher::Match("staging|testing|development".to_string()) + }, + LabelMatcher { + name: "method".to_string(), + matcher: StringMatcher::NotEqual("GET".to_string()) + } + ] + ); + + let matcher = SensorMatcher::from_prometheus_query( + "http_requests_total{environment=\"\",replica!=\"rep-a\",replica=~\"rep.*\"}", + ) + .unwrap(); + assert_eq!( + matcher.name_matcher, + StringMatcher::Equal("http_requests_total".to_string()) + ); + assert_eq!( + matcher.label_matchers.unwrap(), + vec![ + LabelMatcher { + name: "environment".to_string(), + matcher: StringMatcher::Equal("".to_string()) + }, + LabelMatcher { + name: "replica".to_string(), + matcher: StringMatcher::NotEqual("rep-a".to_string()) + }, + LabelMatcher { + name: "replica".to_string(), + matcher: StringMatcher::Match("rep.*".to_string()) + } + ] + ); + + let matcher = SensorMatcher::from_prometheus_query("{__name__=~\"job:.*\"}").unwrap(); + assert_eq!( + matcher.name_matcher, + StringMatcher::Match("job:.*".to_string()) + ); + assert!(matcher.label_matchers.is_none()); + + let matcher = + SensorMatcher::from_prometheus_query("{__name__=\"\\\"quoted_named\\\"\"}").unwrap(); + assert_eq!( + matcher.name_matcher, + StringMatcher::Equal("\"quoted_named\"".to_string()) + ); + 
assert!(matcher.label_matchers.is_none()); + } + + #[test] + fn test_sensor_matcher_errors() { + assert!(SensorMatcher::from_prometheus_query("").is_err()); + assert!(SensorMatcher::from_prometheus_query("\"wrong{[(@").is_err()); + assert!(SensorMatcher::from_prometheus_query( + "http_requests_total{job=\"prometheus\"}[5m]" + ) + .is_err()); + assert!( + SensorMatcher::from_prometheus_query("rate(http_requests_total[5m] offset 1w)") + .is_err() + ); + assert!(SensorMatcher::from_prometheus_query("http_requests_total @ 1609746000").is_err()); + } +} diff --git a/src/datamodel/mod.rs b/src/datamodel/mod.rs index ff4f389..20cb930 100644 --- a/src/datamodel/mod.rs +++ b/src/datamodel/mod.rs @@ -1,5 +1,6 @@ pub mod batch; pub mod batch_builder; +pub mod matchers; pub mod sample; pub mod sensapp_datetime; pub mod sensapp_labels; diff --git a/src/importers/lazy_csv.rs b/src/importers/lazy_csv.rs deleted file mode 100644 index 7672422..0000000 --- a/src/importers/lazy_csv.rs +++ /dev/null @@ -1,12 +0,0 @@ -use crate::bus::EventBus; -use anyhow::Result; -use futures::io; -use std::sync::Arc; - -pub async fn publish_csv_async( - mut async_reader: R, - batch_size: usize, - event_bus: Arc, -) -> Result<()> { - Ok(()) -} diff --git a/src/ingestors/http/app_error.rs b/src/ingestors/http/app_error.rs index a5dca7a..c220495 100644 --- a/src/ingestors/http/app_error.rs +++ b/src/ingestors/http/app_error.rs @@ -10,6 +10,7 @@ use serde_json::json; pub enum AppError { InternalServerError(anyhow::Error), BadRequest(anyhow::Error), + NotFound(anyhow::Error), } impl IntoResponse for AppError { @@ -22,6 +23,7 @@ impl IntoResponse for AppError { "Internal Server Error".to_string(), ) } + AppError::NotFound(error) => (StatusCode::NOT_FOUND, error.to_string()), AppError::BadRequest(error) => (StatusCode::BAD_REQUEST, error.to_string()), }; let body = Json(json!({ "error": message })); diff --git a/src/ingestors/http/crud.rs b/src/ingestors/http/crud.rs index 45059f0..9e4167b 100644 --- a/src/ingestors/http/crud.rs +++ b/src/ingestors/http/crud.rs @@ -1,14 +1,16 @@ use crate::crud::list_cursor::ListCursor; use crate::crud::viewmodel::sensor_viewmodel::SensorViewModel; +use crate::datamodel::matchers::SensorMatcher; use crate::ingestors::http::app_error::AppError; use crate::ingestors::http::state::HttpServerState; use anyhow::Result; -use axum::extract::{Query, State}; +use axum::extract::{Path, Query, State}; use axum::Json; use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize)] pub struct ListSensorsQuery { + pub query: Option, pub cursor: Option, pub limit: Option, } @@ -28,6 +30,7 @@ pub struct ListSensorsResponse { (status = 200, description = "List of sensors", body = Vec) ), params( + ("query" = Option, Query, description = "Prometheus like query string to filter sensors. 
All sensors are returned if not specified"), ("cursor" = Option, Query, description = "Cursor to start listing from"), ("limit" = Option, Query, description = "Limit the number of sensors to return, 1000 by default", maximum = 100_000, minimum = 1), ), @@ -49,18 +52,52 @@ pub async fn list_sensors( ))); } - let (sensors, next_cursor) = - state - .storage - .list_sensors(cursor, limit) - .await - .map_err(|error| { - eprintln!("Failed to list sensors: {:?}", error); - AppError::InternalServerError(error) - })?; + let matcher = match query.query { + Some(query_str) => SensorMatcher::from_prometheus_query(&query_str)?, + None => SensorMatcher::default(), + }; + + let (sensors, next_cursor) = state + .storage + .list_sensors(matcher, cursor, limit) + .await + .map_err(|error| { + eprintln!("Failed to list sensors: {:?}", error); + AppError::InternalServerError(error) + })?; Ok(Json(ListSensorsResponse { sensors, cursor: next_cursor.map(|cursor| cursor.to_string()), })) } + +#[utoipa::path( + get, + path = "/api/v1/sensors/:uuid", + tag = "SensApp", + responses( + (status = 200, description = "Sensor metadata", body = SensorViewModel), + (status = 400, description = "Bad Request", body = AppError), + (status = 404, description = "Not Found", body = AppError), + (status = 500, description = "Internal Server Error", body = AppError), + ), + params( + ("uuid" = String, Path, description = "Sensor UUID", example = "20115fa5-aecd-8271-835d-07bfee981d6a"), + ) +)] +pub async fn get_sensor( + State(state): State, + Path(uuid): Path, +) -> Result, AppError> { + todo!(); + /*let sensor = state.storage.get_sensor(uuid).await.map_err(|error| { + eprintln!("Failed to get sensor: {:?}", error); + AppError::InternalServerError(error) + })?; + + match sensor { + Some(sensor) => Ok(Json(sensor)), + None => Err(AppError::NotFound(anyhow::anyhow!("Sensor not found"))), + }*/ +} diff --git a/src/ingestors/http/mod.rs b/src/ingestors/http/mod.rs index 1b81320..af8707b 100644 --- a/src/ingestors/http/mod.rs +++ b/src/ingestors/http/mod.rs @@ -2,5 +2,7 @@ pub mod app_error; pub mod crud; pub mod influxdb; pub mod prometheus; +pub mod senml; pub mod server; pub mod state; +pub mod utils; diff --git a/src/ingestors/http/prometheus.rs b/src/ingestors/http/prometheus.rs index 1450f50..75bb16a 100644 --- a/src/ingestors/http/prometheus.rs +++ b/src/ingestors/http/prometheus.rs @@ -115,3 +115,36 @@ pub async fn publish_prometheus( Ok(StatusCode::NO_CONTENT) } + +/// Prometheus Remote Read API. +/// +/// Read data from SensApp in Prometheus. +/// +/// It follows the [Prometheus Remote Read specification](https://prometheus.io/docs/prometheus/latest/querying/remote_read_api/). +#[utoipa::path( + post, + path = "/api/v1/prometheus_remote_read", + tag = "Prometheus", + request_body( + content = String, + content_type = "application/x-protobuf", + description = "Prometheus Remote Read endpoint. 
[Reference](https://prometheus.io/docs/prometheus/latest/querying/remote_read_api/).", + ), + params( + ("content-encoding" = String, Header, format = "snappy", description = "Content encoding, must be snappy"), + ("content-type" = String, Header, format = "application/x-protobuf", description = "Content type, must be application/x-protobuf"), + ), + responses( + (status = 200, description = "Prometheus Remote Read endpoint"), + (status = 400, description = "Bad Request", body = AppError), + (status = 500, description = "Internal Server Error", body = AppError), + ) +)] +#[debug_handler] +pub async fn read( + State(state): State, + headers: HeaderMap, + bytes: Bytes, +) -> Result { + todo!(); +} diff --git a/src/ingestors/http/senml.rs b/src/ingestors/http/senml.rs new file mode 100644 index 0000000..e33ec98 --- /dev/null +++ b/src/ingestors/http/senml.rs @@ -0,0 +1,136 @@ +use super::utils::get_potentiall_compressed_data; +use super::{app_error::AppError, state::HttpServerState}; +use crate::parsing::senml::SenMLParser; +use crate::{datamodel::batch_builder::BatchBuilder, parsing::compressed::CompressedParser}; +use anyhow::Result; +use axum::{ + debug_handler, + extract::State, + http::{HeaderMap, StatusCode}, +}; +use std::str; +use tokio_util::bytes::Bytes; + +/// SenML JSON Write API. +/// +/// Push SenML JSON data to SensApp. +/// +/// SenML data can be compressed. Use the `Content-Encoding` header to specify the compression algorithm. +/// `snappy`, `gzip` and `zstd` are supported. +/// +/// [SenML](https://www.rfc-editor.org/rfc/rfc8428) is a proposed standard for exchanging time-series data. +#[utoipa::path( + post, + path = "/api/v1/senml", + tag = "SenML", + request_body( + content = String, + content_type = "text/plain", + description = "SenML data. 
[Reference](https://www.rfc-editor.org/rfc/rfc8428).", + example = "[{\"n\":\"urn:dev:ow:10e2073a01080063\",\"u\":\"Cel\",\"v\":23.1}]" + ), + responses( + (status = 204, description = "No Content"), + (status = 400, description = "Bad Request", body = AppError), + (status = 500, description = "Internal Server Error", body = AppError), + ) +)] +#[debug_handler] +pub async fn publish_senml( + State(state): State, + headers: HeaderMap, + bytes: Bytes, +) -> Result { + let mut batch_builder = BatchBuilder::new()?; + let parser = CompressedParser::new_if_needed( + Box::new(SenMLParser), + get_potentiall_compressed_data(&headers)?, + ); + parser + .parse_data(&bytes, None, &mut batch_builder) + .await + .map_err(|error| AppError::BadRequest(anyhow::anyhow!(error)))?; + + match batch_builder.send_what_is_left(state.event_bus).await { + Ok(Some(mut receiver)) => { + receiver.wait().await?; + } + Ok(None) => {} + Err(error) => { + return Err(AppError::InternalServerError(anyhow::anyhow!(error))); + } + } + + Ok(StatusCode::NO_CONTENT) +} + +#[cfg(test)] +mod tests { + use crate::bus::{self, message}; + use crate::config::load_configuration; + use crate::storage::sqlite::SqliteStorage; + + use super::*; + use std::io::Write; + use std::sync::Arc; + + #[tokio::test] + async fn test_publish_influxdb() { + _ = load_configuration(); + let event_bus = Arc::new(bus::event_bus::EventBus::new()); + let mut wololo = event_bus.main_bus_receiver.activate_cloned(); + tokio::spawn(async move { + while let Ok(message) = wololo.recv().await { + match message { + message::Message::Publish(message::PublishMessage { + batch: _, + sync_receiver: _, + sync_sender, + }) => { + println!("Received publish message"); + sync_sender.broadcast(()).await.unwrap(); + } + } + } + }); + let state = State(HttpServerState { + name: Arc::new("influxdb test".to_string()), + event_bus: event_bus.clone(), + storage: Arc::new(SqliteStorage::connect("sqlite::memory:").await.unwrap()), + }); + let headers = HeaderMap::new(); + let bytes = + Bytes::from("[{\"n\":\"urn:dev:ow:10e2073a01080063\",\"u\":\"Cel\",\"v\":23.1}]"); + let result = publish_senml(state.clone(), headers, bytes).await.unwrap(); + assert_eq!(result, StatusCode::NO_CONTENT); + + // with good snappy encoding + let mut headers = HeaderMap::new(); + headers.insert("content-encoding", "snappy".parse().unwrap()); + let bytes = "[{\"n\":\"urn:dev:ow:10e2073a01080063\",\"u\":\"Cel\",\"v\":23.1}]".as_bytes(); + let mut buffer = Vec::new(); + let mut encoder = snap::write::FrameEncoder::new(&mut buffer); + encoder.write_all(bytes).unwrap(); + let data = encoder.into_inner().unwrap(); + let bytes = Bytes::from(data.to_vec()); + let result = publish_senml(state.clone(), headers, bytes).await.unwrap(); + assert_eq!(result, StatusCode::NO_CONTENT); + + // with wrong gzip encoding + let mut headers = HeaderMap::new(); + headers.insert("content-encoding", "gzip".parse().unwrap()); + let bytes = Bytes::from("definetely not gzip"); + let result = publish_senml(state.clone(), headers, bytes).await; + assert!(result.is_err()); + // Check it's an AppError::BadRequest + assert!(matches!(result, Err(AppError::BadRequest(_)))); + + // With wrong protocol + let headers = HeaderMap::new(); + let bytes = Bytes::from("{\"notsenml\":true}"); + let result = publish_senml(state.clone(), headers, bytes).await; + assert!(result.is_err()); + // Check it's an AppError::BadRequest + assert!(matches!(result, Err(AppError::BadRequest(_)))); + } +} diff --git a/src/ingestors/http/server.rs 
b/src/ingestors/http/server.rs index 8b95ec3..5e3d1ab 100644 --- a/src/ingestors/http/server.rs +++ b/src/ingestors/http/server.rs @@ -2,6 +2,7 @@ use super::app_error::AppError; use super::crud::list_sensors; use super::influxdb::publish_influxdb; use super::prometheus::publish_prometheus; +use super::senml::publish_senml; use super::state::HttpServerState; use crate::config; use crate::importers::csv::publish_csv_async; @@ -12,6 +13,7 @@ use axum::extract::DefaultBodyLimit; use crate::ingestors::http::crud::__path_list_sensors; use crate::ingestors::http::influxdb::__path_publish_influxdb; use crate::ingestors::http::prometheus::__path_publish_prometheus; +use crate::ingestors::http::senml::__path_publish_senml; use axum::extract::State; use axum::http::header; use axum::http::StatusCode; @@ -40,8 +42,9 @@ use utoipa_scalar::{Scalar, Servable as ScalarServable}; (name = "SensApp", description = "SensApp API"), (name = "InfluxDB", description = "InfluxDB Write API"), (name = "Prometheus", description = "Prometheus Remote Write API"), + (name = "SenML", description = "SenML API"), ), - paths(frontpage, list_sensors, vacuum, publish_influxdb, publish_prometheus), + paths(frontpage, list_sensors, vacuum, publish_influxdb, publish_prometheus, publish_senml), )] struct ApiDoc; @@ -104,6 +107,11 @@ pub async fn run_http_server(state: HttpServerState, address: SocketAddr) -> Res "/api/v1/prometheus_remote_write", post(publish_prometheus).layer(max_body_layer.clone()), ) + // SenML legacy SensApp API + .route( + "/api/v1/senml", + post(publish_senml).layer(max_body_layer.clone()), + ) .layer(middleware) .with_state(state); diff --git a/src/ingestors/http/utils.rs b/src/ingestors/http/utils.rs new file mode 100644 index 0000000..8b37dc1 --- /dev/null +++ b/src/ingestors/http/utils.rs @@ -0,0 +1,23 @@ +use crate::parsing::compressed::Compression; + +use super::app_error::AppError; +use anyhow::Result; +use axum::http::HeaderMap; + +pub fn get_potentiall_compressed_data( + headers: &HeaderMap, +) -> Result, AppError> { + match headers.get("content-encoding") { + Some(content_encoding) => match content_encoding.to_str() { + Ok("gzip") => Ok(Some(Compression::Gzip)), + Ok("snappy") | Ok("snappy-framed") => Ok(Some(Compression::Snappy)), + Ok("zstd") => Ok(Some(Compression::Zstd)), + Ok("plain") => Ok(None), + _ => Err(AppError::BadRequest(anyhow::anyhow!( + "Unsupported content-encoding: {:?}", + content_encoding + ))), + }, + None => Ok(None), + } +} diff --git a/src/parsing/compressed.rs b/src/parsing/compressed.rs new file mode 100644 index 0000000..b3593d6 --- /dev/null +++ b/src/parsing/compressed.rs @@ -0,0 +1,70 @@ +use anyhow::Result; +use axum::async_trait; +use hybridmap::HybridMap; +use std::io::Read; + +use crate::datamodel::batch_builder::BatchBuilder; + +use super::ParseData; + +#[derive(PartialEq)] +pub enum Compression { + Gzip, + Snappy, + Zstd, +} + +pub struct CompressedParser { + parent_parser: Box, + compression: Compression, +} + +impl CompressedParser { + pub fn new(parent_parser: Box, compression: Compression) -> Self { + Self { + parent_parser, + compression, + } + } + + pub fn new_if_needed( + parent_parser: Box, + compression: Option, + ) -> Box { + match compression { + Some(compression) => Box::new(CompressedParser::new(parent_parser, compression)), + None => parent_parser, + } + } +} + +#[async_trait] +impl ParseData for CompressedParser { + async fn parse_data( + &self, + data: &[u8], + context: Option>, + batch_builder: &mut BatchBuilder, + ) -> Result<()> { + let mut 
uncompressed_data = Vec::new(); + + match self.compression { + Compression::Gzip => { + let mut d = flate2::read::GzDecoder::new(data); + d.read_to_end(&mut uncompressed_data)?; + } + Compression::Snappy => { + let mut d = snap::read::FrameDecoder::new(data); + d.read_to_end(&mut uncompressed_data)?; + } + Compression::Zstd => { + let mut d = zstd::Decoder::new(data)?; + d.read_to_end(&mut uncompressed_data)?; + } + }; + + self.parent_parser + .parse_data(&uncompressed_data, context, batch_builder) + .await + } +} diff --git a/src/parsing/mod.rs b/src/parsing/mod.rs index b1719d2..1835d61 100644 --- a/src/parsing/mod.rs +++ b/src/parsing/mod.rs @@ -4,6 +4,7 @@ use hybridmap::HybridMap; use crate::datamodel::batch_builder::BatchBuilder; +pub mod compressed; pub mod geobuf; pub mod influx; pub mod prometheus; @@ -23,8 +24,32 @@ pub fn get_parser_from_name(name: &str) -> Result> { match name { "prometheus_remote_write" => Ok(Box::new(prometheus::PrometheusParser)), "senml_json" => Ok(Box::new(senml::SenMLParser)), + "senml_json_gzip" => Ok(Box::new(compressed::CompressedParser::new( + Box::new(senml::SenMLParser), + compressed::Compression::Gzip, + ))), + "senml_json_snappy" => Ok(Box::new(compressed::CompressedParser::new( + Box::new(senml::SenMLParser), + compressed::Compression::Snappy, + ))), + "senml_json_zstd" => Ok(Box::new(compressed::CompressedParser::new( + Box::new(senml::SenMLParser), + compressed::Compression::Zstd, + ))), "geobuf" => Ok(Box::new(geobuf::GeobufParser)), "influx_line_protocol" => Ok(Box::new(influx::InfluxParser::default())), + "influx_line_protocol_gzip" => Ok(Box::new(compressed::CompressedParser::new( + Box::new(influx::InfluxParser::default()), + compressed::Compression::Gzip, + ))), + "influx_line_protocol_snappy" => Ok(Box::new(compressed::CompressedParser::new( + Box::new(influx::InfluxParser::default()), + compressed::Compression::Snappy, + ))), + "influx_line_protocol_zstd" => Ok(Box::new(compressed::CompressedParser::new( + Box::new(influx::InfluxParser::default()), + compressed::Compression::Zstd, + ))), _ => bail!("Unsupported parser: {}", name), } } diff --git a/src/storage/bigquery/bigquery_crud.rs b/src/storage/bigquery/bigquery_crud.rs index 7d2b202..bb4a209 100644 --- a/src/storage/bigquery/bigquery_crud.rs +++ b/src/storage/bigquery/bigquery_crud.rs @@ -1,6 +1,9 @@ use std::collections::BTreeMap; -use crate::crud::{list_cursor::ListCursor, viewmodel::sensor_viewmodel::SensorViewModel}; +use crate::{ + crud::{list_cursor::ListCursor, viewmodel::sensor_viewmodel::SensorViewModel}, + datamodel::matchers::SensorMatcher, +}; use anyhow::{anyhow, Result}; use gcp_bigquery_client::model::{ query_parameter::QueryParameter, query_parameter_type::QueryParameterType, @@ -12,6 +15,7 @@ use super::BigQueryStorage; pub async fn list_sensors( bqs: &BigQueryStorage, + matcher: SensorMatcher, cursor: ListCursor, limit: usize, ) -> Result<(Vec, Option)> { diff --git a/src/storage/bigquery/migrations/20240223133248_init.sql b/src/storage/bigquery/migrations/20240223133248_init.sql index f7a9812..1c3cc7a 100644 --- a/src/storage/bigquery/migrations/20240223133248_init.sql +++ b/src/storage/bigquery/migrations/20240223133248_init.sql @@ -132,9 +132,10 @@ ON s.sensor_id = nv.sensor_id; CREATE OR REPLACE VIEW `{dataset_id}.sensor_labels_view` AS -SELECT sensors.uuid, sensors.created_at, sensors.name, type, units.name as unit, JSON_OBJECT( +SELECT sensors.uuid, sensors.created_at, sensors.name, type, units.name as unit, +CASE WHEN COUNT(labels.sensor_id) = 0 THEN 
JSON_OBJECT() ELSE JSON_OBJECT( ARRAY_AGG(labels_name_dictionary.name), ARRAY_AGG(labels_description_dictionary.description) -) AS labels +) END AS labels FROM `{dataset_id}.sensors` as sensors LEFT JOIN `{dataset_id}.units` as units on sensors.unit = units.id LEFT JOIN `{dataset_id}.labels` as labels on sensors.sensor_id = labels.sensor_id diff --git a/src/storage/bigquery/mod.rs b/src/storage/bigquery/mod.rs index 4205542..3684f2d 100644 --- a/src/storage/bigquery/mod.rs +++ b/src/storage/bigquery/mod.rs @@ -22,6 +22,8 @@ use std::{future::Future, pin::Pin, sync::Arc, time::Duration}; use tokio::{sync::RwLock, time::timeout}; use url::Url; +use crate::datamodel::matchers::SensorMatcher; + mod bigquery_crud; mod bigquery_labels_utilities; mod bigquery_prost_structs; @@ -240,9 +242,10 @@ impl StorageInstance for BigQueryStorage { async fn list_sensors( &self, + matcher: SensorMatcher, cursor: ListCursor, limit: usize, ) -> Result<(Vec, Option)> { - list_sensors(self, cursor, limit).await + list_sensors(self, matcher, cursor, limit).await } } diff --git a/src/storage/duckdb/duckdb_crud.rs b/src/storage/duckdb/duckdb_crud.rs index d27ecc1..9234620 100644 --- a/src/storage/duckdb/duckdb_crud.rs +++ b/src/storage/duckdb/duckdb_crud.rs @@ -6,11 +6,12 @@ use tokio::sync::Mutex; use crate::{ crud::{list_cursor::ListCursor, viewmodel::sensor_viewmodel::SensorViewModel}, - datamodel::{sensapp_datetime::SensAppDateTimeExt, SensAppDateTime}, + datamodel::{matchers::SensorMatcher, sensapp_datetime::SensAppDateTimeExt, SensAppDateTime}, }; pub async fn list_sensors( connection: Arc>, + matcher: SensorMatcher, cursor: ListCursor, limit: usize, ) -> Result<(Vec, Option)> { diff --git a/src/storage/duckdb/mod.rs b/src/storage/duckdb/mod.rs index 372ee42..31fd67f 100644 --- a/src/storage/duckdb/mod.rs +++ b/src/storage/duckdb/mod.rs @@ -1,6 +1,7 @@ use crate::crud::list_cursor::ListCursor; use crate::crud::viewmodel::sensor_viewmodel::SensorViewModel; use crate::datamodel::batch::{Batch, SingleSensorBatch}; +use crate::datamodel::matchers::SensorMatcher; use crate::datamodel::TypedSamples; use anyhow::{bail, Context, Result}; use async_broadcast::Sender; @@ -98,10 +99,11 @@ impl StorageInstance for DuckDBStorage { async fn list_sensors( &self, + matcher: SensorMatcher, cursor: ListCursor, limit: usize, ) -> Result<(Vec, Option)> { - list_sensors(self.connection.clone(), cursor, limit).await + list_sensors(self.connection.clone(), matcher, cursor, limit).await } } diff --git a/src/storage/postgresql/migrations/20240110123122_init.sql b/src/storage/postgresql/migrations/20240110123122_init.sql index b8b04b7..46dd746 100644 --- a/src/storage/postgresql/migrations/20240110123122_init.sql +++ b/src/storage/postgresql/migrations/20240110123122_init.sql @@ -133,9 +133,10 @@ CREATE INDEX index_json_values ON json_values USING brin (sensor_id, timestamp_m CREATE INDEX index_blob_values ON blob_values USING brin (sensor_id, timestamp_ms) WITH (pages_per_range = 32); CREATE VIEW sensor_labels_view AS -SELECT sensors.uuid, sensors.created_at, sensors."name", type, units.name as unit, jsonb_object_agg( - labels_name_dictionary."name",labels_description_dictionary."description" -) AS labels +SELECT sensors.uuid, sensors.created_at, sensors."name", type, units.name as unit, +CASE WHEN COUNT(labels.sensor_id) = 0 THEN '{}' ELSE jsonb_object_agg( + COALESCE(labels_name_dictionary."name",'whatever_this_is_a_bug_workaround'),labels_description_dictionary."description") +END AS labels FROM sensors LEFT JOIN units on 
sensors.unit = units.id LEFT JOIN Labels on sensors.sensor_id = labels.sensor_id diff --git a/src/storage/postgresql/postgresql.rs b/src/storage/postgresql/postgresql.rs index fe26842..3b04bb4 100644 --- a/src/storage/postgresql/postgresql.rs +++ b/src/storage/postgresql/postgresql.rs @@ -5,6 +5,7 @@ use super::{ }; use crate::crud::list_cursor::ListCursor; use crate::crud::viewmodel::sensor_viewmodel::SensorViewModel; +use crate::datamodel::matchers::SensorMatcher; use crate::datamodel::{batch::Batch, TypedSamples}; use anyhow::{Context, Result}; use async_broadcast::Sender; @@ -84,10 +85,11 @@ impl StorageInstance for PostgresStorage { async fn list_sensors( &self, + matcher: SensorMatcher, cursor: ListCursor, limit: usize, ) -> Result<(Vec, Option)> { - list_sensors(&self.pool, cursor, limit).await + list_sensors(&self.pool, matcher, cursor, limit).await } } diff --git a/src/storage/postgresql/postgresql_crud.rs b/src/storage/postgresql/postgresql_crud.rs index ca97614..f37ca6b 100644 --- a/src/storage/postgresql/postgresql_crud.rs +++ b/src/storage/postgresql/postgresql_crud.rs @@ -7,9 +7,11 @@ use sqlx::PgPool; use sqlx::Row; use crate::crud::{list_cursor::ListCursor, viewmodel::sensor_viewmodel::SensorViewModel}; +use crate::datamodel::matchers::SensorMatcher; pub async fn list_sensors( pool: &PgPool, + matcher: SensorMatcher, cursor: ListCursor, limit: usize, ) -> Result<(Vec, Option)> { diff --git a/src/storage/rrdcached/mod.rs b/src/storage/rrdcached/mod.rs index 935a8f4..24eef85 100644 --- a/src/storage/rrdcached/mod.rs +++ b/src/storage/rrdcached/mod.rs @@ -17,6 +17,8 @@ use tokio::{sync::RwLock, time::timeout}; use url::Url; use uuid::Uuid; +use crate::datamodel::matchers::SensorMatcher; + #[derive(Debug, Clone, PartialEq)] pub enum Preset { Munin, @@ -324,6 +326,7 @@ impl StorageInstance for RrdCachedStorage { async fn list_sensors( &self, + _matcher: SensorMatcher, _cursor: ListCursor, _limit: usize, ) -> Result<(Vec, Option)> { diff --git a/src/storage/sqlite/sqlite.rs b/src/storage/sqlite/sqlite.rs index 7c8e0e9..7f5fc21 100644 --- a/src/storage/sqlite/sqlite.rs +++ b/src/storage/sqlite/sqlite.rs @@ -4,6 +4,7 @@ use super::sqlite_utilities::get_sensor_id_or_create_sensor; use crate::crud::list_cursor::ListCursor; use crate::crud::viewmodel::sensor_viewmodel::SensorViewModel; use crate::datamodel::batch::{Batch, SingleSensorBatch}; +use crate::datamodel::matchers::SensorMatcher; use crate::datamodel::TypedSamples; use crate::storage::storage::StorageInstance; use anyhow::{Context, Result}; @@ -81,8 +82,13 @@ impl StorageInstance for SqliteStorage { Ok(()) } - async fn list_sensors(&self, cursor: ListCursor, limit: usize) -> Result<(Vec, Option)> { - list_sensors(&self.pool, cursor, limit).await + async fn list_sensors( + &self, + matcher: SensorMatcher, + cursor: ListCursor, + limit: usize, + ) -> Result<(Vec, Option)> { + list_sensors(&self.pool, matcher, cursor, limit).await } } diff --git a/src/storage/sqlite/sqlite_crud.rs b/src/storage/sqlite/sqlite_crud.rs index 8167470..c5ddbe1 100644 --- a/src/storage/sqlite/sqlite_crud.rs +++ b/src/storage/sqlite/sqlite_crud.rs @@ -3,10 +3,14 @@ use std::collections::BTreeMap; use anyhow::Result; use sqlx::SqlitePool; -use crate::crud::{list_cursor::ListCursor, viewmodel::sensor_viewmodel::SensorViewModel}; +use crate::{ + crud::{list_cursor::ListCursor, viewmodel::sensor_viewmodel::SensorViewModel}, + datamodel::matchers::SensorMatcher, +}; pub async fn list_sensors( pool: &SqlitePool, + matcher: SensorMatcher, cursor: ListCursor, 
limit: usize, ) -> Result<(Vec, Option)> { diff --git a/src/storage/storage.rs b/src/storage/storage.rs index f366365..f43258c 100644 --- a/src/storage/storage.rs +++ b/src/storage/storage.rs @@ -2,7 +2,10 @@ use anyhow::Result; use async_trait::async_trait; use std::fmt::Debug; -use crate::crud::{list_cursor::ListCursor, viewmodel::sensor_viewmodel::SensorViewModel}; +use crate::{ + crud::{list_cursor::ListCursor, viewmodel::sensor_viewmodel::SensorViewModel}, + datamodel::matchers::SensorMatcher, +}; #[async_trait] pub trait StorageInstance: Send + Sync + Debug { @@ -17,6 +20,7 @@ pub trait StorageInstance: Send + Sync + Debug { async fn list_sensors( &self, + matcher: SensorMatcher, cursor: ListCursor, limit: usize, ) -> Result<(Vec, Option)>; diff --git a/src/storage/timescaledb/migrations/20240223133248_init.sql b/src/storage/timescaledb/migrations/20240223133248_init.sql index 9e7e5e2..6e59abb 100644 --- a/src/storage/timescaledb/migrations/20240223133248_init.sql +++ b/src/storage/timescaledb/migrations/20240223133248_init.sql @@ -196,9 +196,10 @@ SELECT add_compression_policy('blob_values', INTERVAL '7 days'); SELECT add_dimension('blob_values', by_hash('sensor_id', 2)); CREATE VIEW sensor_labels_view AS -SELECT sensors.uuid, sensors.created_at, sensors."name", type, units.name as unit, jsonb_object_agg( - labels_name_dictionary."name",labels_description_dictionary."description" -) AS labels +SELECT sensors.uuid, sensors.created_at, sensors."name", type, units.name as unit, +CASE WHEN COUNT(labels.sensor_id) = 0 THEN '{}' ELSE jsonb_object_agg( + COALESCE(labels_name_dictionary."name",'whatever_this_is_a_bug_workaround'),labels_description_dictionary."description") +END AS labels FROM sensors LEFT JOIN units on sensors.unit = units.id LEFT JOIN Labels on sensors.sensor_id = labels.sensor_id diff --git a/src/storage/timescaledb/timescaledb.rs b/src/storage/timescaledb/timescaledb.rs index 2d63bbf..d56a56c 100644 --- a/src/storage/timescaledb/timescaledb.rs +++ b/src/storage/timescaledb/timescaledb.rs @@ -4,6 +4,7 @@ use super::{ }; use crate::crud::list_cursor::ListCursor; use crate::crud::viewmodel::sensor_viewmodel::SensorViewModel; +use crate::datamodel::matchers::SensorMatcher; use crate::datamodel::{batch::Batch, TypedSamples}; use anyhow::{Context, Result}; use async_broadcast::Sender; @@ -68,10 +69,12 @@ impl StorageInstance for TimeScaleDBStorage { async fn list_sensors( &self, + matcher: SensorMatcher, cursor: ListCursor, limit: usize, ) -> Result<(Vec, Option)> { - super::super::postgresql::postgresql_crud::list_sensors(&self.pool, cursor, limit).await + super::super::postgresql::postgresql_crud::list_sensors(&self.pool, matcher, cursor, limit) + .await } }
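
The patch threads a `SensorMatcher` argument through every `list_sensors` implementation, but none of the storage backends consume it yet. As a rough illustration of where this is headed, here is a minimal sketch of how a SQL-flavoured backend might turn the public `StringMatcher` type into a parameterised predicate. The `to_sql_predicate` helper and the PostgreSQL regex operators (`~`, `!~`) are assumptions, not part of the patch.

// Hypothetical helper, imagined as living next to StringMatcher in src/datamodel/matchers.rs.
impl StringMatcher {
    /// Render this matcher as a SQL predicate on `column`, plus the value to bind, if any.
    /// PostgreSQL syntax is assumed; Match/NotMatch map to the `~` / `!~` regex operators.
    pub fn to_sql_predicate(&self, column: &str) -> (String, Option<String>) {
        match self {
            StringMatcher::All => ("TRUE".to_string(), None),
            StringMatcher::Equal(value) => (format!("{} = $1", column), Some(value.clone())),
            StringMatcher::NotEqual(value) => (format!("{} <> $1", column), Some(value.clone())),
            StringMatcher::Match(value) => (format!("{} ~ $1", column), Some(value.clone())),
            StringMatcher::NotMatch(value) => (format!("{} !~ $1", column), Some(value.clone())),
        }
    }
}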
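The new /api/v1/senml route accepts gzip, snappy and zstd request bodies, selected by the Content-Encoding header (see get_potentiall_compressed_data and the zstd dependency added to Cargo.toml). Below is a client-side sketch of pushing a zstd-compressed SenML payload; the host and port are assumptions, and reqwest is used purely for illustration.

use std::error::Error;

fn main() -> Result<(), Box<dyn Error>> {
    let senml = r#"[{"n":"urn:dev:ow:10e2073a01080063","u":"Cel","v":23.1}]"#;
    // Compress the payload; level 3 is the zstd default.
    let compressed = zstd::encode_all(senml.as_bytes(), 3)?;

    // The handler only looks at the Content-Encoding header before decompressing.
    let response = reqwest::blocking::Client::new()
        .post("http://localhost:3000/api/v1/senml") // host and port are an assumption
        .header("content-encoding", "zstd")
        .body(compressed)
        .send()?;

    // On success the handler replies 204 No Content once the batch is acknowledged.
    assert_eq!(response.status(), reqwest::StatusCode::NO_CONTENT);
    Ok(())
}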
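Finally, the sensor list endpoint now takes an optional Prometheus-style selector in its query parameter. A client-side sketch follows, assuming the route is /api/v1/sensors (the existing list route is not shown in this diff) and again using reqwest only for illustration.

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Only the label-selection subset is accepted; ranges, offsets and functions are rejected.
    let selector = r#"http_requests_total{environment=~"staging|testing",method!="GET"}"#;

    let body = reqwest::blocking::Client::new()
        .get("http://localhost:3000/api/v1/sensors") // route and port are assumptions
        .query(&[("query", selector), ("limit", "100")])
        .send()?
        .text()?;

    // The response body is a ListSensorsResponse: {"sensors": [...], "cursor": ...}.
    println!("{}", body);
    Ok(())
}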