Skip to content

Commit

Permalink
feat(influxdb): add db query param support for v2 write api (#3445)
Browse files Browse the repository at this point in the history
* feat(influxdb): add db query param support for v2 write api

* fix(influxdb): update authorize logic to get catalog and schema from query string

* fix(influxdb): address CR suggestions

* fix(influxdb): use the correct import
  • Loading branch information
etolbakov authored Mar 8, 2024
1 parent a309cd0 commit aeca0d8
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/servers/src/grpc/authorize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ async fn do_auth<T>(
return Ok(());
};

let (username, password) = extract_username_and_password(false, req)
let (username, password) = extract_username_and_password(req)
.map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;

let id = auth::Identity::UserId(&username, None);
Expand Down
42 changes: 29 additions & 13 deletions src/servers/src/http/authorize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::error::{
};
use crate::http::error_result::ErrorResponse;
use crate::http::HTTP_API_PREFIX;
use crate::influxdb::{is_influxdb_request, is_influxdb_v2_request};

/// AuthState is a holder state for [`UserProviderRef`]
/// during [`check_http_auth`] function in axum's middleware
Expand Down Expand Up @@ -69,7 +70,6 @@ pub async fn inner_auth<B>(

let query_ctx = query_ctx_builder.build();
let need_auth = need_auth(&req);
let is_influxdb = req.uri().path().contains("influxdb");

// 2. check if auth is needed
let user_provider = if let Some(user_provider) = user_provider.filter(|_| need_auth) {
Expand All @@ -81,14 +81,14 @@ pub async fn inner_auth<B>(
};

// 3. get username and pwd
let (username, password) = match extract_username_and_password(is_influxdb, &req) {
let (username, password) = match extract_username_and_password(&req) {
Ok((username, password)) => (username, password),
Err(e) => {
warn!("extract username and password failed: {}", e);
crate::metrics::METRIC_AUTH_FAILURE
.with_label_values(&[e.status_code().as_ref()])
.inc();
return Err(err_response(is_influxdb, e).into_response());
return Err(err_response(is_influxdb_request(&req), e).into_response());
}
};

Expand All @@ -112,7 +112,7 @@ pub async fn inner_auth<B>(
crate::metrics::METRIC_AUTH_FAILURE
.with_label_values(&[e.status_code().as_ref()])
.inc();
Err(err_response(is_influxdb, e).into_response())
Err(err_response(is_influxdb_request(&req), e).into_response())
}
}
}
Expand Down Expand Up @@ -146,7 +146,11 @@ pub fn extract_catalog_and_schema<B>(request: &Request<B>) -> (&str, &str) {
.and_then(|header| header.to_str().ok())
.or_else(|| {
let query = request.uri().query().unwrap_or_default();
extract_db_from_query(query)
if is_influxdb_v2_request(request) {
extract_db_from_query(query).or_else(|| extract_bucket_from_query(query))
} else {
extract_db_from_query(query)
}
})
.unwrap_or(DEFAULT_SCHEMA_NAME);

Expand Down Expand Up @@ -208,11 +212,8 @@ fn get_influxdb_credentials<B>(request: &Request<B>) -> Result<Option<(Username,
}
}

pub fn extract_username_and_password<B>(
is_influxdb: bool,
request: &Request<B>,
) -> Result<(Username, Password)> {
Ok(if is_influxdb {
pub fn extract_username_and_password<B>(request: &Request<B>) -> Result<(Username, Password)> {
Ok(if is_influxdb_request(request) {
// compatible with influxdb auth
get_influxdb_credentials(request)?.context(NotFoundInfluxAuthSnafu)?
} else {
Expand Down Expand Up @@ -290,15 +291,26 @@ fn need_auth<B>(req: &Request<B>) -> bool {
path.starts_with(HTTP_API_PREFIX)
}

fn extract_db_from_query(query: &str) -> Option<&str> {
fn extract_param_from_query<'a>(query: &'a str, param: &'a str) -> Option<&'a str> {
let prefix = format!("{}=", param);
for pair in query.split('&') {
if let Some(db) = pair.strip_prefix("db=") {
return if db.is_empty() { None } else { Some(db) };
if let Some(param) = pair.strip_prefix(&prefix) {
return if param.is_empty() { None } else { Some(param) };
}
}
None
}

fn extract_db_from_query(query: &str) -> Option<&str> {
extract_param_from_query(query, "db")
}

/// InfluxDB v2 uses "bucket" instead of "db"
/// https://docs.influxdata.com/influxdb/v1/tools/api/#apiv2write-http-endpoint
fn extract_bucket_from_query(query: &str) -> Option<&str> {
extract_param_from_query(query, "bucket")
}

fn extract_influxdb_user_from_query(query: &str) -> (Option<&str>, Option<&str>) {
let mut username = None;
let mut password = None;
Expand Down Expand Up @@ -422,10 +434,14 @@ mod tests {
assert_matches!(extract_db_from_query(""), None);
assert_matches!(extract_db_from_query("&"), None);
assert_matches!(extract_db_from_query("db="), None);
assert_matches!(extract_bucket_from_query("bucket="), None);
assert_matches!(extract_bucket_from_query("db=foo"), None);
assert_matches!(extract_db_from_query("db=foo"), Some("foo"));
assert_matches!(extract_bucket_from_query("bucket=foo"), Some("foo"));
assert_matches!(extract_db_from_query("name=bar"), None);
assert_matches!(extract_db_from_query("db=&name=bar"), None);
assert_matches!(extract_db_from_query("db=foo&name=bar"), Some("foo"));
assert_matches!(extract_bucket_from_query("db=foo&bucket=bar"), Some("bar"));
assert_matches!(extract_db_from_query("name=bar&db="), None);
assert_matches!(extract_db_from_query("name=bar&db=foo"), Some("foo"));
assert_matches!(extract_db_from_query("name=bar&db=&name=bar"), None);
Expand Down
8 changes: 5 additions & 3 deletions src/servers/src/http/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ pub async fn influxdb_write_v2(
Extension(query_ctx): Extension<QueryContextRef>,
lines: String,
) -> Result<impl IntoResponse> {
let db = params
.remove("bucket")
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let db = match (params.remove("db"), params.remove("bucket")) {
(_, Some(bucket)) => bucket.clone(),
(Some(db), None) => db.clone(),
_ => DEFAULT_SCHEMA_NAME.to_string(),
};

let precision = params
.get("precision")
Expand Down
17 changes: 15 additions & 2 deletions src/servers/src/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,27 @@
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests};
use common_grpc::writer::Precision;
use hyper::Request;
use influxdb_line_protocol::{parse_lines, FieldValue};
use snafu::ResultExt;

use crate::error::{Error, InfluxdbLineProtocolSnafu};
use crate::row_writer::{self, MultiTableData};

pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts";
pub const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond;
const INFLUXDB_API_PATH_NAME: &str = "influxdb";
const INFLUXDB_API_V2_PATH_NAME: &str = "influxdb/api/v2";
const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts";
const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond;

#[inline]
pub(crate) fn is_influxdb_request<T>(req: &Request<T>) -> bool {
req.uri().path().contains(INFLUXDB_API_PATH_NAME)
}

#[inline]
pub(crate) fn is_influxdb_v2_request<T>(req: &Request<T>) -> bool {
req.uri().path().contains(INFLUXDB_API_V2_PATH_NAME)
}

#[derive(Debug)]
pub struct InfluxdbRequest {
Expand Down
120 changes: 120 additions & 0 deletions src/servers/tests/http/influxdb_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use query::parser::PromQuery;
use query::plan::LogicalPlan;
use query::query_engine::DescribeResult;
use servers::error::{Error, Result};
use servers::http::header::constants::GREPTIME_DB_HEADER_NAME;
use servers::http::header::GREPTIME_DB_HEADER_FORMAT;
use servers::http::{HttpOptions, HttpServerBuilder};
use servers::influxdb::InfluxdbRequest;
Expand Down Expand Up @@ -244,3 +245,122 @@ async fn test_influxdb_write() {
]
);
}

#[tokio::test]
async fn test_influxdb_write_v2() {
let (tx, mut rx) = mpsc::channel(100);
let tx = Arc::new(tx);

let public_db_app = make_test_app(tx.clone(), None);
let public_db_client = TestClient::new(public_db_app);

let result = public_db_client.get("/v1/influxdb/health").send().await;
assert_eq!(result.status(), 200);

let result = public_db_client.get("/v1/influxdb/ping").send().await;
assert_eq!(result.status(), 204);

// right request with no query string
let result = public_db_client
.post("/v1/influxdb/api/v2/write")
.body("monitor,host=host1 cpu=1.2 1664370459457010101")
.header(http::header::AUTHORIZATION, "token greptime:greptime")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());

// right request with `bucket` query string
let result = public_db_client
.post("/v1/influxdb/api/v2/write?bucket=public")
.body("monitor,host=host1 cpu=1.2 1664370459457010101")
.header(http::header::AUTHORIZATION, "token greptime:greptime")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());

// right request with `db` query string
let result = public_db_client
.post("/v1/influxdb/api/v2/write?db=public")
.body("monitor,host=host1 cpu=1.2 1664370459457010101")
.header(http::header::AUTHORIZATION, "token greptime:greptime")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());

// make new app for 'influxdb' database
let app = make_test_app(tx, Some("influxdb"));
let client = TestClient::new(app);

// right request with `bucket` query string
let result = client
.post("/v1/influxdb/api/v2/write?bucket=influxdb")
.body("monitor,host=host1 cpu=1.2 1664370459457010101")
.header(http::header::AUTHORIZATION, "token greptime:greptime")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());

// right request with `db` query string
let result = client
.post("/v1/influxdb/api/v2/write?db=influxdb")
.body("monitor,host=host1 cpu=1.2 1664370459457010101")
.header(http::header::AUTHORIZATION, "token greptime:greptime")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());

// right request with no query string, `public_db_client` is used otherwise the auth will fail
let result = public_db_client
.post("/v1/influxdb/api/v2/write")
.body("monitor,host=host1 cpu=1.2 1664370459457010101")
.header(http::header::AUTHORIZATION, "token greptime:greptime")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());

// right request with the 'greptime' header and 'db' query string
let result = client
.post("/v1/influxdb/api/v2/write?db=influxdbv2")
.body("monitor,host=host1 cpu=1.2 1664370459457010101")
.header(http::header::AUTHORIZATION, "token greptime:greptime")
.header(GREPTIME_DB_HEADER_NAME, "influxdb")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());

// right request with the 'greptime' header and 'bucket' query string
let result = client
.post("/v1/influxdb/api/v2/write?bucket=influxdbv2")
.body("monitor,host=host1 cpu=1.2 1664370459457010101")
.header(http::header::AUTHORIZATION, "token greptime:greptime")
.header(GREPTIME_DB_HEADER_NAME, "influxdb")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());

let mut metrics = vec![];
while let Ok(s) = rx.try_recv() {
metrics.push(s);
}
assert_eq!(
metrics,
vec![
("public".to_string(), "monitor".to_string()),
("public".to_string(), "monitor".to_string()),
("public".to_string(), "monitor".to_string()),
("influxdb".to_string(), "monitor".to_string()),
("influxdb".to_string(), "monitor".to_string()),
("public".to_string(), "monitor".to_string()),
("influxdb".to_string(), "monitor".to_string()),
("influxdb".to_string(), "monitor".to_string()),
]
);
}

0 comments on commit aeca0d8

Please sign in to comment.