From 75e26a0eff66e653924965b174204e50d28b8f2a Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 18 Oct 2023 22:13:12 +0000 Subject: [PATCH 1/8] feat: opentsdb row protocol --- src/frontend/src/instance/opentsdb.rs | 26 ++++++++++------- src/servers/src/http/opentsdb.rs | 31 +++++++++++--------- src/servers/src/opentsdb.rs | 38 +++++++++++++++++++++++++ src/servers/src/opentsdb/codec.rs | 2 +- src/servers/src/opentsdb/handler.rs | 8 +++--- src/servers/src/query_handler.rs | 2 +- src/servers/tests/http/opentsdb_test.rs | 10 +++---- src/servers/tests/opentsdb.rs | 6 ++-- tests-integration/src/opentsdb.rs | 27 +++++++++--------- 9 files changed, 97 insertions(+), 53 deletions(-) diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index 1ac8fe029048..b2957754d205 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::InsertRequests; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_error::ext::BoxedError; use servers::error as server_error; use servers::error::AuthSnafu; use servers::opentsdb::codec::DataPoint; +use servers::opentsdb::data_point_to_grpc_row_insert_requests; use servers::query_handler::OpentsdbProtocolHandler; use session::context::QueryContextRef; use snafu::prelude::*; @@ -27,23 +27,27 @@ use crate::instance::Instance; #[async_trait] impl OpentsdbProtocolHandler for Instance { - async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> server_error::Result<()> { + async fn exec( + &self, + data_points: &[DataPoint], + ctx: QueryContextRef, + ) -> server_error::Result { self.plugins .get::() .as_ref() .check_permission(ctx.current_user(), PermissionReq::Opentsdb) .context(AuthSnafu)?; - let requests = InsertRequests { - inserts: vec![data_point.as_grpc_insert()], - }; - let _ = self - .handle_inserts(requests, ctx) + let (requests, no_of_rows) = data_point_to_grpc_row_insert_requests(data_points)?; + let output = self + .handle_row_inserts(requests, ctx) .await .map_err(BoxedError::new) - .with_context(|_| server_error::ExecuteQuerySnafu { - query: format!("{data_point:?}"), - })?; - Ok(()) + .context(servers::error::ExecuteGrpcQuerySnafu)?; + + Ok(match output { + common_query::Output::AffectedRows(rows) => rows, + _ => no_of_rows, + }) } } diff --git a/src/servers/src/http/opentsdb.rs b/src/servers/src/http/opentsdb.rs index c5b90b42a438..ea5bb2b2183a 100644 --- a/src/servers/src/http/opentsdb.rs +++ b/src/servers/src/http/opentsdb.rs @@ -84,17 +84,19 @@ pub async fn put( let summary = params.contains_key("summary"); let details = params.contains_key("details"); - let data_points = parse_data_points(body).await?; + let data_point_requests = parse_data_points(body).await?; + let data_points = data_point_requests + .iter() + .map(|point| point.clone().into()) + .collect::>(); let response = if !summary && !details { - for data_point in data_points.into_iter() { - if let Err(e) = opentsdb_handler.exec(&data_point.into(), ctx.clone()).await { - // Not debugging purpose, failed fast. - return error::InternalSnafu { - err_msg: e.to_string(), - } - .fail(); + if let Err(e) = opentsdb_handler.exec(&data_points, ctx.clone()).await { + // Not debugging purpose, failed fast. + return error::InternalSnafu { + err_msg: e.to_string(), } + .fail(); } (HttpStatusCode::NO_CONTENT, Json(OpentsdbPutResponse::Empty)) } else { @@ -108,14 +110,15 @@ pub async fn put( }, }; - for data_point in data_points.into_iter() { + for (idx, data_point) in data_points.into_iter().enumerate() { let result = opentsdb_handler - .exec(&data_point.clone().into(), ctx.clone()) + .exec(&[data_point.clone()], ctx.clone()) .await; match result { - Ok(()) => response.on_success(), + Ok(affected_rows) => response.on_success(affected_rows), Err(e) => { - response.on_failed(data_point, e); + let data_point_request = data_point_requests.get(idx).unwrap(); + response.on_failed(data_point_request.clone(), e); } } } @@ -151,8 +154,8 @@ pub struct OpentsdbDebuggingResponse { } impl OpentsdbDebuggingResponse { - fn on_success(&mut self) { - self.success += 1; + fn on_success(&mut self, affected_rows: usize) { + self.success += affected_rows as i32; } fn on_failed(&mut self, datapoint: DataPointRequest, error: impl ErrorExt) { diff --git a/src/servers/src/opentsdb.rs b/src/servers/src/opentsdb.rs index 61ed84167064..fb8a83f84d95 100644 --- a/src/servers/src/opentsdb.rs +++ b/src/servers/src/opentsdb.rs @@ -20,16 +20,20 @@ use std::future::Future; use std::net::SocketAddr; use std::sync::Arc; +use api::v1::RowInsertRequests; use async_trait::async_trait; use common_runtime::Runtime; use common_telemetry::logging::error; use futures::StreamExt; use tokio::sync::broadcast; +use self::codec::DataPoint; use crate::error::Result; use crate::opentsdb::connection::Connection; use crate::opentsdb::handler::Handler; +use crate::prom_store::{FIELD_COLUMN_NAME, TIMESTAMP_COLUMN_NAME}; use crate::query_handler::OpentsdbProtocolHandlerRef; +use crate::row_writer::{self, MultiTableData}; use crate::server::{AbortableStream, BaseTcpServer, Server}; use crate::shutdown::Shutdown; @@ -126,3 +130,37 @@ impl Server for OpentsdbServer { OPENTSDB_SERVER } } + +pub fn data_point_to_grpc_row_insert_requests( + data_points: &[DataPoint], +) -> Result<(RowInsertRequests, usize)> { + let mut multi_table_data = MultiTableData::new(); + + for data_point in data_points { + let table_name = data_point.metric(); + let tags = data_point.tags().clone(); + let value = data_point.value(); + let timestamp = data_point.ts_millis(); + let num_columns = tags.len() + 1; + + let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 0); + let mut one_row = table_data.alloc_one_row(); + + //tags + row_writer::write_tags(table_data, tags.into_iter(), &mut one_row)?; + + // value + row_writer::write_f64(table_data, FIELD_COLUMN_NAME, value, &mut one_row)?; + // timestamp + row_writer::write_ts_millis( + table_data, + TIMESTAMP_COLUMN_NAME, + Some(timestamp), + &mut one_row, + )?; + + table_data.add_row(one_row); + } + + Ok(multi_table_data.into_row_insert_requests()) +} diff --git a/src/servers/src/opentsdb/codec.rs b/src/servers/src/opentsdb/codec.rs index 163e060adece..3ae28cf82b12 100644 --- a/src/servers/src/opentsdb/codec.rs +++ b/src/servers/src/opentsdb/codec.rs @@ -19,7 +19,7 @@ use crate::error::{self, Result}; pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp"; pub const OPENTSDB_FIELD_COLUMN_NAME: &str = "greptime_value"; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DataPoint { metric: String, ts_millis: i64, diff --git a/src/servers/src/opentsdb/handler.rs b/src/servers/src/opentsdb/handler.rs index 4cbe1731fe11..6ddad7252bb1 100644 --- a/src/servers/src/opentsdb/handler.rs +++ b/src/servers/src/opentsdb/handler.rs @@ -94,7 +94,7 @@ impl Handler { match DataPoint::try_create(&line) { Ok(data_point) => { let _timer = timer!(crate::metrics::METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED); - let result = self.query_handler.exec(&data_point, ctx.clone()).await; + let result = self.query_handler.exec(&[data_point], ctx.clone()).await; if let Err(e) = result { self.connection.write_line(e.output_msg()).await?; } @@ -128,8 +128,8 @@ mod tests { #[async_trait] impl OpentsdbProtocolHandler for DummyQueryHandler { - async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> { - let metric = data_point.metric(); + async fn exec(&self, data_points: &[DataPoint], _ctx: QueryContextRef) -> Result { + let metric = data_points.first().unwrap().metric(); if metric == "should_failed" { return error::InternalSnafu { err_msg: "expected", @@ -137,7 +137,7 @@ mod tests { .fail(); } self.tx.send(metric.to_string()).await.unwrap(); - Ok(()) + Ok(data_points.len()) } } diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index ef8f74575e7c..955be25426c2 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -74,7 +74,7 @@ pub trait InfluxdbLineProtocolHandler { pub trait OpentsdbProtocolHandler { /// A successful request will not return a response. /// Only on error will the socket return a line of data. - async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> Result<()>; + async fn exec(&self, data_points: &[DataPoint], ctx: QueryContextRef) -> Result; } pub struct PromStoreResponse { diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index e77143d3b3a1..cb11eef388ba 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -51,7 +51,8 @@ impl GrpcQueryHandler for DummyInstance { #[async_trait] impl OpentsdbProtocolHandler for DummyInstance { - async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> { + async fn exec(&self, data_points: &[DataPoint], _ctx: QueryContextRef) -> Result { + let data_point = data_points.first().unwrap(); if data_point.metric() == "should_failed" { return error::InternalSnafu { err_msg: "expected", @@ -59,7 +60,7 @@ impl OpentsdbProtocolHandler for DummyInstance { .fail(); } let _ = self.tx.send(data_point.metric().to_string()).await; - Ok(()) + Ok(data_points.len()) } } @@ -172,10 +173,7 @@ async fn test_opentsdb_put() { while let Ok(s) = rx.try_recv() { metrics.push(s); } - assert_eq!( - metrics, - vec!["m1".to_string(), "m2".to_string(), "m3".to_string()] - ); + assert_eq!(metrics, vec!["m1".to_string(), "m2".to_string()]); } #[tokio::test] diff --git a/src/servers/tests/opentsdb.rs b/src/servers/tests/opentsdb.rs index 145fdc07dbe6..8589e6637042 100644 --- a/src/servers/tests/opentsdb.rs +++ b/src/servers/tests/opentsdb.rs @@ -37,8 +37,8 @@ struct DummyOpentsdbInstance { #[async_trait] impl OpentsdbProtocolHandler for DummyOpentsdbInstance { - async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> { - let metric = data_point.metric(); + async fn exec(&self, data_points: &[DataPoint], _ctx: QueryContextRef) -> Result { + let metric = data_points.first().unwrap().metric(); if metric == "should_failed" { return server_error::InternalSnafu { err_msg: "expected", @@ -47,7 +47,7 @@ impl OpentsdbProtocolHandler for DummyOpentsdbInstance { } let i = metric.parse::().unwrap(); let _ = self.tx.send(i * i).await; - Ok(()) + Ok(data_points.len()) } } diff --git a/tests-integration/src/opentsdb.rs b/tests-integration/src/opentsdb.rs index 5d6338d94270..a1930592de72 100644 --- a/tests-integration/src/opentsdb.rs +++ b/tests-integration/src/opentsdb.rs @@ -46,6 +46,8 @@ mod tests { async fn test_exec(instance: &Arc) { let ctx = QueryContext::arc(); + + // should create new table "my_metric_1" directly let data_point1 = DataPoint::new( "my_metric_1".to_string(), 1000, @@ -55,9 +57,8 @@ mod tests { ("tagk2".to_string(), "tagv2".to_string()), ], ); - // should create new table "my_metric_1" directly - instance.exec(&data_point1, ctx.clone()).await.unwrap(); + // should create new column "tagk3" directly let data_point2 = DataPoint::new( "my_metric_1".to_string(), 2000, @@ -67,12 +68,12 @@ mod tests { ("tagk3".to_string(), "tagv3".to_string()), ], ); - // should create new column "tagk3" directly - instance.exec(&data_point2, ctx.clone()).await.unwrap(); - let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]); // should handle null tags properly - instance.exec(&data_point3, ctx.clone()).await.unwrap(); + let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]); + + let data_points = vec![data_point1, data_point2, data_point3]; + instance.exec(&data_points, ctx.clone()).await.unwrap(); let output = instance .do_query( @@ -87,13 +88,13 @@ mod tests { let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); let pretty_print = recordbatches.pretty_print().unwrap(); let expected = vec![ - "+---------------------+----------------+-------+-------+-------+", - "| greptime_timestamp | greptime_value | tagk1 | tagk2 | tagk3 |", - "+---------------------+----------------+-------+-------+-------+", - "| 1970-01-01T00:00:01 | 1.0 | tagv1 | tagv2 | |", - "| 1970-01-01T00:00:02 | 2.0 | | tagv2 | tagv3 |", - "| 1970-01-01T00:00:03 | 3.0 | | | |", - "+---------------------+----------------+-------+-------+-------+", + "+-------+-------+----------------+---------------------+-------+", + "| tagk1 | tagk2 | greptime_value | greptime_timestamp | tagk3 |", + "+-------+-------+----------------+---------------------+-------+", + "| tagv1 | tagv2 | 1.0 | 1970-01-01T00:00:01 | |", + "| | tagv2 | 2.0 | 1970-01-01T00:00:02 | tagv3 |", + "| | | 3.0 | 1970-01-01T00:00:03 | |", + "+-------+-------+----------------+---------------------+-------+", ] .into_iter() .join("\n"); From 62a251a55501ec70ce1a1fd740f77abab3bdbdd5 Mon Sep 17 00:00:00 2001 From: Lilit0x Date: Thu, 19 Oct 2023 06:15:53 +0000 Subject: [PATCH 2/8] fix: added commnets for num of rows and failure if output is not of affecetd rows --- src/frontend/src/instance/opentsdb.rs | 4 ++-- src/servers/src/opentsdb.rs | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index b2957754d205..21eda0ff1644 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -38,7 +38,7 @@ impl OpentsdbProtocolHandler for Instance { .check_permission(ctx.current_user(), PermissionReq::Opentsdb) .context(AuthSnafu)?; - let (requests, no_of_rows) = data_point_to_grpc_row_insert_requests(data_points)?; + let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?; let output = self .handle_row_inserts(requests, ctx) .await @@ -47,7 +47,7 @@ impl OpentsdbProtocolHandler for Instance { Ok(match output { common_query::Output::AffectedRows(rows) => rows, - _ => no_of_rows, + _ => unreachable!(), }) } } diff --git a/src/servers/src/opentsdb.rs b/src/servers/src/opentsdb.rs index fb8a83f84d95..aa1f195bc79b 100644 --- a/src/servers/src/opentsdb.rs +++ b/src/servers/src/opentsdb.rs @@ -141,9 +141,10 @@ pub fn data_point_to_grpc_row_insert_requests( let tags = data_point.tags().clone(); let value = data_point.value(); let timestamp = data_point.ts_millis(); + //length of tags + 1 extra column for greptime_timestamp let num_columns = tags.len() + 1; - let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 0); + let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 1); let mut one_row = table_data.alloc_one_row(); //tags From ab16756ef957f1d232160a62f90d86bab3ad1f6f Mon Sep 17 00:00:00 2001 From: Lilit0x Date: Thu, 19 Oct 2023 06:51:41 +0000 Subject: [PATCH 3/8] fix: added extra 1 to number of columns --- src/servers/src/opentsdb.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/servers/src/opentsdb.rs b/src/servers/src/opentsdb.rs index aa1f195bc79b..141511bb17aa 100644 --- a/src/servers/src/opentsdb.rs +++ b/src/servers/src/opentsdb.rs @@ -141,13 +141,13 @@ pub fn data_point_to_grpc_row_insert_requests( let tags = data_point.tags().clone(); let value = data_point.value(); let timestamp = data_point.ts_millis(); - //length of tags + 1 extra column for greptime_timestamp - let num_columns = tags.len() + 1; + //length of tags + 2 extra columns for greptime_timestamp and the value + let num_columns = tags.len() + 2; let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 1); let mut one_row = table_data.alloc_one_row(); - //tags + // tags row_writer::write_tags(table_data, tags.into_iter(), &mut one_row)?; // value From 2e250bc622b50f55053e0f490826859d11e50c28 Mon Sep 17 00:00:00 2001 From: Lilit0x Date: Thu, 19 Oct 2023 09:16:09 +0000 Subject: [PATCH 4/8] fix: avoided cloning datapoints, took ownership instead --- src/frontend/src/instance/opentsdb.rs | 2 +- src/servers/src/opentsdb.rs | 8 ++++---- src/servers/src/opentsdb/codec.rs | 4 ++++ 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index 21eda0ff1644..aeccce445b73 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -38,7 +38,7 @@ impl OpentsdbProtocolHandler for Instance { .check_permission(ctx.current_user(), PermissionReq::Opentsdb) .context(AuthSnafu)?; - let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?; + let (requests, _) = data_point_to_grpc_row_insert_requests(data_points.to_vec())?; let output = self .handle_row_inserts(requests, ctx) .await diff --git a/src/servers/src/opentsdb.rs b/src/servers/src/opentsdb.rs index 141511bb17aa..49f60e268ae1 100644 --- a/src/servers/src/opentsdb.rs +++ b/src/servers/src/opentsdb.rs @@ -132,16 +132,16 @@ impl Server for OpentsdbServer { } pub fn data_point_to_grpc_row_insert_requests( - data_points: &[DataPoint], + data_points: Vec, ) -> Result<(RowInsertRequests, usize)> { let mut multi_table_data = MultiTableData::new(); - for data_point in data_points { + for mut data_point in data_points { + let tags: Vec<(String, String)> = std::mem::take(&mut data_point.tags_mut()); let table_name = data_point.metric(); - let tags = data_point.tags().clone(); let value = data_point.value(); let timestamp = data_point.ts_millis(); - //length of tags + 2 extra columns for greptime_timestamp and the value + // length of tags + 2 extra columns for greptime_timestamp and the value let num_columns = tags.len() + 2; let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 1); diff --git a/src/servers/src/opentsdb/codec.rs b/src/servers/src/opentsdb/codec.rs index 3ae28cf82b12..55e160460554 100644 --- a/src/servers/src/opentsdb/codec.rs +++ b/src/servers/src/opentsdb/codec.rs @@ -115,6 +115,10 @@ impl DataPoint { &self.tags } + pub fn tags_mut(&mut self) -> &mut Vec<(String, String)> { + &mut self.tags + } + pub fn ts_millis(&self) -> i64 { self.ts_millis } From 8a7d1136f274b0e9092ae5d6453296a941fba389 Mon Sep 17 00:00:00 2001 From: Lilit0x Date: Thu, 19 Oct 2023 09:19:55 +0000 Subject: [PATCH 5/8] fix: avoided cloning datapoints, took ownership instead --- src/servers/src/opentsdb.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/servers/src/opentsdb.rs b/src/servers/src/opentsdb.rs index 49f60e268ae1..07cde1e14765 100644 --- a/src/servers/src/opentsdb.rs +++ b/src/servers/src/opentsdb.rs @@ -137,7 +137,7 @@ pub fn data_point_to_grpc_row_insert_requests( let mut multi_table_data = MultiTableData::new(); for mut data_point in data_points { - let tags: Vec<(String, String)> = std::mem::take(&mut data_point.tags_mut()); + let tags: Vec<(String, String)> = std::mem::take(data_point.tags_mut()); let table_name = data_point.metric(); let value = data_point.value(); let timestamp = data_point.ts_millis(); From 85270e0698baac36873f8377bc78399bfe2165c2 Mon Sep 17 00:00:00 2001 From: Lilit0x Date: Thu, 19 Oct 2023 10:09:56 +0000 Subject: [PATCH 6/8] fix: changed vecotr slice to vector --- src/frontend/src/instance/opentsdb.rs | 4 ++-- src/servers/src/http/opentsdb.rs | 4 ++-- src/servers/src/opentsdb/handler.rs | 4 ++-- src/servers/src/query_handler.rs | 2 +- src/servers/tests/http/opentsdb_test.rs | 2 +- src/servers/tests/opentsdb.rs | 2 +- tests-integration/src/opentsdb.rs | 2 +- 7 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index aeccce445b73..47bb940a1bb2 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -29,7 +29,7 @@ use crate::instance::Instance; impl OpentsdbProtocolHandler for Instance { async fn exec( &self, - data_points: &[DataPoint], + data_points: Vec, ctx: QueryContextRef, ) -> server_error::Result { self.plugins @@ -38,7 +38,7 @@ impl OpentsdbProtocolHandler for Instance { .check_permission(ctx.current_user(), PermissionReq::Opentsdb) .context(AuthSnafu)?; - let (requests, _) = data_point_to_grpc_row_insert_requests(data_points.to_vec())?; + let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?; let output = self .handle_row_inserts(requests, ctx) .await diff --git a/src/servers/src/http/opentsdb.rs b/src/servers/src/http/opentsdb.rs index ea5bb2b2183a..ed4fa66c38b6 100644 --- a/src/servers/src/http/opentsdb.rs +++ b/src/servers/src/http/opentsdb.rs @@ -91,7 +91,7 @@ pub async fn put( .collect::>(); let response = if !summary && !details { - if let Err(e) = opentsdb_handler.exec(&data_points, ctx.clone()).await { + if let Err(e) = opentsdb_handler.exec(data_points, ctx.clone()).await { // Not debugging purpose, failed fast. return error::InternalSnafu { err_msg: e.to_string(), @@ -112,7 +112,7 @@ pub async fn put( for (idx, data_point) in data_points.into_iter().enumerate() { let result = opentsdb_handler - .exec(&[data_point.clone()], ctx.clone()) + .exec(vec![data_point.clone()], ctx.clone()) .await; match result { Ok(affected_rows) => response.on_success(affected_rows), diff --git a/src/servers/src/opentsdb/handler.rs b/src/servers/src/opentsdb/handler.rs index 6ddad7252bb1..c260ccf1749f 100644 --- a/src/servers/src/opentsdb/handler.rs +++ b/src/servers/src/opentsdb/handler.rs @@ -94,7 +94,7 @@ impl Handler { match DataPoint::try_create(&line) { Ok(data_point) => { let _timer = timer!(crate::metrics::METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED); - let result = self.query_handler.exec(&[data_point], ctx.clone()).await; + let result = self.query_handler.exec(vec![data_point], ctx.clone()).await; if let Err(e) = result { self.connection.write_line(e.output_msg()).await?; } @@ -128,7 +128,7 @@ mod tests { #[async_trait] impl OpentsdbProtocolHandler for DummyQueryHandler { - async fn exec(&self, data_points: &[DataPoint], _ctx: QueryContextRef) -> Result { + async fn exec(&self, data_points: Vec, _ctx: QueryContextRef) -> Result { let metric = data_points.first().unwrap().metric(); if metric == "should_failed" { return error::InternalSnafu { diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 955be25426c2..4180c8469c8b 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -74,7 +74,7 @@ pub trait InfluxdbLineProtocolHandler { pub trait OpentsdbProtocolHandler { /// A successful request will not return a response. /// Only on error will the socket return a line of data. - async fn exec(&self, data_points: &[DataPoint], ctx: QueryContextRef) -> Result; + async fn exec(&self, data_points: Vec, ctx: QueryContextRef) -> Result; } pub struct PromStoreResponse { diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index cb11eef388ba..552ef6ecb940 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -51,7 +51,7 @@ impl GrpcQueryHandler for DummyInstance { #[async_trait] impl OpentsdbProtocolHandler for DummyInstance { - async fn exec(&self, data_points: &[DataPoint], _ctx: QueryContextRef) -> Result { + async fn exec(&self, data_points: Vec, _ctx: QueryContextRef) -> Result { let data_point = data_points.first().unwrap(); if data_point.metric() == "should_failed" { return error::InternalSnafu { diff --git a/src/servers/tests/opentsdb.rs b/src/servers/tests/opentsdb.rs index 8589e6637042..79ac2ba21939 100644 --- a/src/servers/tests/opentsdb.rs +++ b/src/servers/tests/opentsdb.rs @@ -37,7 +37,7 @@ struct DummyOpentsdbInstance { #[async_trait] impl OpentsdbProtocolHandler for DummyOpentsdbInstance { - async fn exec(&self, data_points: &[DataPoint], _ctx: QueryContextRef) -> Result { + async fn exec(&self, data_points: Vec, _ctx: QueryContextRef) -> Result { let metric = data_points.first().unwrap().metric(); if metric == "should_failed" { return server_error::InternalSnafu { diff --git a/tests-integration/src/opentsdb.rs b/tests-integration/src/opentsdb.rs index a1930592de72..c5474ea4e240 100644 --- a/tests-integration/src/opentsdb.rs +++ b/tests-integration/src/opentsdb.rs @@ -73,7 +73,7 @@ mod tests { let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]); let data_points = vec![data_point1, data_point2, data_point3]; - instance.exec(&data_points, ctx.clone()).await.unwrap(); + instance.exec(data_points, ctx.clone()).await.unwrap(); let output = instance .do_query( From ba8032c8985e1d0b9c496a45ff4fafe83b77d337 Mon Sep 17 00:00:00 2001 From: Lilit0x Date: Thu, 19 Oct 2023 10:36:53 +0000 Subject: [PATCH 7/8] fix: remove clone --- src/servers/src/http/opentsdb.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/servers/src/http/opentsdb.rs b/src/servers/src/http/opentsdb.rs index ed4fa66c38b6..472c55b45466 100644 --- a/src/servers/src/http/opentsdb.rs +++ b/src/servers/src/http/opentsdb.rs @@ -111,9 +111,7 @@ pub async fn put( }; for (idx, data_point) in data_points.into_iter().enumerate() { - let result = opentsdb_handler - .exec(vec![data_point.clone()], ctx.clone()) - .await; + let result = opentsdb_handler.exec(vec![data_point], ctx.clone()).await; match result { Ok(affected_rows) => response.on_success(affected_rows), Err(e) => { From 7337bcfd37c78655086047f6a03f9db0d6953167 Mon Sep 17 00:00:00 2001 From: Lilit0x Date: Fri, 20 Oct 2023 05:15:48 +0000 Subject: [PATCH 8/8] fix: combined datapoints and requests with zip instead of enumerating --- src/servers/src/http/opentsdb.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/servers/src/http/opentsdb.rs b/src/servers/src/http/opentsdb.rs index 472c55b45466..054595252ad3 100644 --- a/src/servers/src/http/opentsdb.rs +++ b/src/servers/src/http/opentsdb.rs @@ -110,14 +110,11 @@ pub async fn put( }, }; - for (idx, data_point) in data_points.into_iter().enumerate() { + for (data_point, request) in data_points.into_iter().zip(data_point_requests) { let result = opentsdb_handler.exec(vec![data_point], ctx.clone()).await; match result { Ok(affected_rows) => response.on_success(affected_rows), - Err(e) => { - let data_point_request = data_point_requests.get(idx).unwrap(); - response.on_failed(data_point_request.clone(), e); - } + Err(e) => response.on_failed(request, e), } } (