From aeca0d8e8a9241db1e807aeb75d15de77e999a90 Mon Sep 17 00:00:00 2001 From: Eugene Tolbakov Date: Fri, 8 Mar 2024 08:17:57 +0000 Subject: [PATCH] feat(influxdb): add db query param support for v2 write api (#3445) * feat(influxdb): add db query param support for v2 write api * fix(influxdb): update authorize logic to get catalog and schema from query string * fix(influxdb): address CR suggestions * fix(influxdb): use the correct import --- src/servers/src/grpc/authorize.rs | 2 +- src/servers/src/http/authorize.rs | 42 ++++++--- src/servers/src/http/influxdb.rs | 8 +- src/servers/src/influxdb.rs | 17 +++- src/servers/tests/http/influxdb_test.rs | 120 ++++++++++++++++++++++++ 5 files changed, 170 insertions(+), 19 deletions(-) diff --git a/src/servers/src/grpc/authorize.rs b/src/servers/src/grpc/authorize.rs index 878a45f1693c..84e203d3730e 100644 --- a/src/servers/src/grpc/authorize.rs +++ b/src/servers/src/grpc/authorize.rs @@ -112,7 +112,7 @@ async fn do_auth( return Ok(()); }; - let (username, password) = extract_username_and_password(false, req) + let (username, password) = extract_username_and_password(req) .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; let id = auth::Identity::UserId(&username, None); diff --git a/src/servers/src/http/authorize.rs b/src/servers/src/http/authorize.rs index 843cdae183d3..135d1027308b 100644 --- a/src/servers/src/http/authorize.rs +++ b/src/servers/src/http/authorize.rs @@ -40,6 +40,7 @@ use crate::error::{ }; use crate::http::error_result::ErrorResponse; use crate::http::HTTP_API_PREFIX; +use crate::influxdb::{is_influxdb_request, is_influxdb_v2_request}; /// AuthState is a holder state for [`UserProviderRef`] /// during [`check_http_auth`] function in axum's middleware @@ -69,7 +70,6 @@ pub async fn inner_auth( let query_ctx = query_ctx_builder.build(); let need_auth = need_auth(&req); - let is_influxdb = req.uri().path().contains("influxdb"); // 2. check if auth is needed let user_provider = if let Some(user_provider) = user_provider.filter(|_| need_auth) { @@ -81,14 +81,14 @@ pub async fn inner_auth( }; // 3. get username and pwd - let (username, password) = match extract_username_and_password(is_influxdb, &req) { + let (username, password) = match extract_username_and_password(&req) { Ok((username, password)) => (username, password), Err(e) => { warn!("extract username and password failed: {}", e); crate::metrics::METRIC_AUTH_FAILURE .with_label_values(&[e.status_code().as_ref()]) .inc(); - return Err(err_response(is_influxdb, e).into_response()); + return Err(err_response(is_influxdb_request(&req), e).into_response()); } }; @@ -112,7 +112,7 @@ pub async fn inner_auth( crate::metrics::METRIC_AUTH_FAILURE .with_label_values(&[e.status_code().as_ref()]) .inc(); - Err(err_response(is_influxdb, e).into_response()) + Err(err_response(is_influxdb_request(&req), e).into_response()) } } } @@ -146,7 +146,11 @@ pub fn extract_catalog_and_schema(request: &Request) -> (&str, &str) { .and_then(|header| header.to_str().ok()) .or_else(|| { let query = request.uri().query().unwrap_or_default(); - extract_db_from_query(query) + if is_influxdb_v2_request(request) { + extract_db_from_query(query).or_else(|| extract_bucket_from_query(query)) + } else { + extract_db_from_query(query) + } }) .unwrap_or(DEFAULT_SCHEMA_NAME); @@ -208,11 +212,8 @@ fn get_influxdb_credentials(request: &Request) -> Result( - is_influxdb: bool, - request: &Request, -) -> Result<(Username, Password)> { - Ok(if is_influxdb { +pub fn extract_username_and_password(request: &Request) -> Result<(Username, Password)> { + Ok(if is_influxdb_request(request) { // compatible with influxdb auth get_influxdb_credentials(request)?.context(NotFoundInfluxAuthSnafu)? } else { @@ -290,15 +291,26 @@ fn need_auth(req: &Request) -> bool { path.starts_with(HTTP_API_PREFIX) } -fn extract_db_from_query(query: &str) -> Option<&str> { +fn extract_param_from_query<'a>(query: &'a str, param: &'a str) -> Option<&'a str> { + let prefix = format!("{}=", param); for pair in query.split('&') { - if let Some(db) = pair.strip_prefix("db=") { - return if db.is_empty() { None } else { Some(db) }; + if let Some(param) = pair.strip_prefix(&prefix) { + return if param.is_empty() { None } else { Some(param) }; } } None } +fn extract_db_from_query(query: &str) -> Option<&str> { + extract_param_from_query(query, "db") +} + +/// InfluxDB v2 uses "bucket" instead of "db" +/// https://docs.influxdata.com/influxdb/v1/tools/api/#apiv2write-http-endpoint +fn extract_bucket_from_query(query: &str) -> Option<&str> { + extract_param_from_query(query, "bucket") +} + fn extract_influxdb_user_from_query(query: &str) -> (Option<&str>, Option<&str>) { let mut username = None; let mut password = None; @@ -422,10 +434,14 @@ mod tests { assert_matches!(extract_db_from_query(""), None); assert_matches!(extract_db_from_query("&"), None); assert_matches!(extract_db_from_query("db="), None); + assert_matches!(extract_bucket_from_query("bucket="), None); + assert_matches!(extract_bucket_from_query("db=foo"), None); assert_matches!(extract_db_from_query("db=foo"), Some("foo")); + assert_matches!(extract_bucket_from_query("bucket=foo"), Some("foo")); assert_matches!(extract_db_from_query("name=bar"), None); assert_matches!(extract_db_from_query("db=&name=bar"), None); assert_matches!(extract_db_from_query("db=foo&name=bar"), Some("foo")); + assert_matches!(extract_bucket_from_query("db=foo&bucket=bar"), Some("bar")); assert_matches!(extract_db_from_query("name=bar&db="), None); assert_matches!(extract_db_from_query("name=bar&db=foo"), Some("foo")); assert_matches!(extract_db_from_query("name=bar&db=&name=bar"), None); diff --git a/src/servers/src/http/influxdb.rs b/src/servers/src/http/influxdb.rs index 9ced1557bf0c..4b8a606b6835 100644 --- a/src/servers/src/http/influxdb.rs +++ b/src/servers/src/http/influxdb.rs @@ -67,9 +67,11 @@ pub async fn influxdb_write_v2( Extension(query_ctx): Extension, lines: String, ) -> Result { - let db = params - .remove("bucket") - .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()); + let db = match (params.remove("db"), params.remove("bucket")) { + (_, Some(bucket)) => bucket.clone(), + (Some(db), None) => db.clone(), + _ => DEFAULT_SCHEMA_NAME.to_string(), + }; let precision = params .get("precision") diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs index 870573679370..36a3fce1faa1 100644 --- a/src/servers/src/influxdb.rs +++ b/src/servers/src/influxdb.rs @@ -15,14 +15,27 @@ use api::v1::value::ValueData; use api::v1::{ColumnDataType, RowInsertRequests}; use common_grpc::writer::Precision; +use hyper::Request; use influxdb_line_protocol::{parse_lines, FieldValue}; use snafu::ResultExt; use crate::error::{Error, InfluxdbLineProtocolSnafu}; use crate::row_writer::{self, MultiTableData}; -pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts"; -pub const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond; +const INFLUXDB_API_PATH_NAME: &str = "influxdb"; +const INFLUXDB_API_V2_PATH_NAME: &str = "influxdb/api/v2"; +const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts"; +const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond; + +#[inline] +pub(crate) fn is_influxdb_request(req: &Request) -> bool { + req.uri().path().contains(INFLUXDB_API_PATH_NAME) +} + +#[inline] +pub(crate) fn is_influxdb_v2_request(req: &Request) -> bool { + req.uri().path().contains(INFLUXDB_API_V2_PATH_NAME) +} #[derive(Debug)] pub struct InfluxdbRequest { diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index 5779e377e591..ff5f4c85afbd 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -26,6 +26,7 @@ use query::parser::PromQuery; use query::plan::LogicalPlan; use query::query_engine::DescribeResult; use servers::error::{Error, Result}; +use servers::http::header::constants::GREPTIME_DB_HEADER_NAME; use servers::http::header::GREPTIME_DB_HEADER_FORMAT; use servers::http::{HttpOptions, HttpServerBuilder}; use servers::influxdb::InfluxdbRequest; @@ -244,3 +245,122 @@ async fn test_influxdb_write() { ] ); } + +#[tokio::test] +async fn test_influxdb_write_v2() { + let (tx, mut rx) = mpsc::channel(100); + let tx = Arc::new(tx); + + let public_db_app = make_test_app(tx.clone(), None); + let public_db_client = TestClient::new(public_db_app); + + let result = public_db_client.get("/v1/influxdb/health").send().await; + assert_eq!(result.status(), 200); + + let result = public_db_client.get("/v1/influxdb/ping").send().await; + assert_eq!(result.status(), 204); + + // right request with no query string + let result = public_db_client + .post("/v1/influxdb/api/v2/write") + .body("monitor,host=host1 cpu=1.2 1664370459457010101") + .header(http::header::AUTHORIZATION, "token greptime:greptime") + .send() + .await; + assert_eq!(result.status(), 204); + assert!(result.text().await.is_empty()); + + // right request with `bucket` query string + let result = public_db_client + .post("/v1/influxdb/api/v2/write?bucket=public") + .body("monitor,host=host1 cpu=1.2 1664370459457010101") + .header(http::header::AUTHORIZATION, "token greptime:greptime") + .send() + .await; + assert_eq!(result.status(), 204); + assert!(result.text().await.is_empty()); + + // right request with `db` query string + let result = public_db_client + .post("/v1/influxdb/api/v2/write?db=public") + .body("monitor,host=host1 cpu=1.2 1664370459457010101") + .header(http::header::AUTHORIZATION, "token greptime:greptime") + .send() + .await; + assert_eq!(result.status(), 204); + assert!(result.text().await.is_empty()); + + // make new app for 'influxdb' database + let app = make_test_app(tx, Some("influxdb")); + let client = TestClient::new(app); + + // right request with `bucket` query string + let result = client + .post("/v1/influxdb/api/v2/write?bucket=influxdb") + .body("monitor,host=host1 cpu=1.2 1664370459457010101") + .header(http::header::AUTHORIZATION, "token greptime:greptime") + .send() + .await; + assert_eq!(result.status(), 204); + assert!(result.text().await.is_empty()); + + // right request with `db` query string + let result = client + .post("/v1/influxdb/api/v2/write?db=influxdb") + .body("monitor,host=host1 cpu=1.2 1664370459457010101") + .header(http::header::AUTHORIZATION, "token greptime:greptime") + .send() + .await; + assert_eq!(result.status(), 204); + assert!(result.text().await.is_empty()); + + // right request with no query string, `public_db_client` is used otherwise the auth will fail + let result = public_db_client + .post("/v1/influxdb/api/v2/write") + .body("monitor,host=host1 cpu=1.2 1664370459457010101") + .header(http::header::AUTHORIZATION, "token greptime:greptime") + .send() + .await; + assert_eq!(result.status(), 204); + assert!(result.text().await.is_empty()); + + // right request with the 'greptime' header and 'db' query string + let result = client + .post("/v1/influxdb/api/v2/write?db=influxdbv2") + .body("monitor,host=host1 cpu=1.2 1664370459457010101") + .header(http::header::AUTHORIZATION, "token greptime:greptime") + .header(GREPTIME_DB_HEADER_NAME, "influxdb") + .send() + .await; + assert_eq!(result.status(), 204); + assert!(result.text().await.is_empty()); + + // right request with the 'greptime' header and 'bucket' query string + let result = client + .post("/v1/influxdb/api/v2/write?bucket=influxdbv2") + .body("monitor,host=host1 cpu=1.2 1664370459457010101") + .header(http::header::AUTHORIZATION, "token greptime:greptime") + .header(GREPTIME_DB_HEADER_NAME, "influxdb") + .send() + .await; + assert_eq!(result.status(), 204); + assert!(result.text().await.is_empty()); + + let mut metrics = vec![]; + while let Ok(s) = rx.try_recv() { + metrics.push(s); + } + assert_eq!( + metrics, + vec![ + ("public".to_string(), "monitor".to_string()), + ("public".to_string(), "monitor".to_string()), + ("public".to_string(), "monitor".to_string()), + ("influxdb".to_string(), "monitor".to_string()), + ("influxdb".to_string(), "monitor".to_string()), + ("public".to_string(), "monitor".to_string()), + ("influxdb".to_string(), "monitor".to_string()), + ("influxdb".to_string(), "monitor".to_string()), + ] + ); +}