Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: row protocol support for opentsdb #2623

Merged
merged 9 commits into from
Oct 20, 2023
26 changes: 15 additions & 11 deletions src/frontend/src/instance/opentsdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::InsertRequests;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_error::ext::BoxedError;
use servers::error as server_error;
use servers::error::AuthSnafu;
use servers::opentsdb::codec::DataPoint;
use servers::opentsdb::data_point_to_grpc_row_insert_requests;
use servers::query_handler::OpentsdbProtocolHandler;
use session::context::QueryContextRef;
use snafu::prelude::*;
Expand All @@ -27,23 +27,27 @@ use crate::instance::Instance;

#[async_trait]
impl OpentsdbProtocolHandler for Instance {
async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> server_error::Result<()> {
async fn exec(
&self,
data_points: Vec<DataPoint>,
ctx: QueryContextRef,
) -> server_error::Result<usize> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::Opentsdb)
.context(AuthSnafu)?;

let requests = InsertRequests {
inserts: vec![data_point.as_grpc_insert()],
};
let _ = self
.handle_inserts(requests, ctx)
let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;
let output = self
.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{data_point:?}"),
})?;
Ok(())
.context(servers::error::ExecuteGrpcQuerySnafu)?;

Ok(match output {
common_query::Output::AffectedRows(rows) => rows,
_ => unreachable!(),
})
}
}
34 changes: 16 additions & 18 deletions src/servers/src/http/opentsdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,19 @@ pub async fn put(
let summary = params.contains_key("summary");
let details = params.contains_key("details");

let data_points = parse_data_points(body).await?;
let data_point_requests = parse_data_points(body).await?;
let data_points = data_point_requests
Lilit0x marked this conversation as resolved.
Show resolved Hide resolved
.iter()
.map(|point| point.clone().into())
.collect::<Vec<_>>();

let response = if !summary && !details {
for data_point in data_points.into_iter() {
if let Err(e) = opentsdb_handler.exec(&data_point.into(), ctx.clone()).await {
// Not debugging purpose, failed fast.
return error::InternalSnafu {
err_msg: e.to_string(),
}
.fail();
if let Err(e) = opentsdb_handler.exec(data_points, ctx.clone()).await {
// Not debugging purpose, failed fast.
return error::InternalSnafu {
err_msg: e.to_string(),
}
.fail();
}
(HttpStatusCode::NO_CONTENT, Json(OpentsdbPutResponse::Empty))
} else {
Expand All @@ -108,15 +110,11 @@ pub async fn put(
},
};

for data_point in data_points.into_iter() {
let result = opentsdb_handler
.exec(&data_point.clone().into(), ctx.clone())
.await;
for (data_point, request) in data_points.into_iter().zip(data_point_requests) {
let result = opentsdb_handler.exec(vec![data_point], ctx.clone()).await;
match result {
Ok(()) => response.on_success(),
Err(e) => {
response.on_failed(data_point, e);
}
Ok(affected_rows) => response.on_success(affected_rows),
Err(e) => response.on_failed(request, e),
}
}
(
Expand Down Expand Up @@ -151,8 +149,8 @@ pub struct OpentsdbDebuggingResponse {
}

impl OpentsdbDebuggingResponse {
fn on_success(&mut self) {
self.success += 1;
fn on_success(&mut self, affected_rows: usize) {
self.success += affected_rows as i32;
}

fn on_failed(&mut self, datapoint: DataPointRequest, error: impl ErrorExt) {
Expand Down
39 changes: 39 additions & 0 deletions src/servers/src/opentsdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@ use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;

use api::v1::RowInsertRequests;
use async_trait::async_trait;
use common_runtime::Runtime;
use common_telemetry::logging::error;
use futures::StreamExt;
use tokio::sync::broadcast;

use self::codec::DataPoint;
use crate::error::Result;
use crate::opentsdb::connection::Connection;
use crate::opentsdb::handler::Handler;
use crate::prom_store::{FIELD_COLUMN_NAME, TIMESTAMP_COLUMN_NAME};
use crate::query_handler::OpentsdbProtocolHandlerRef;
use crate::row_writer::{self, MultiTableData};
use crate::server::{AbortableStream, BaseTcpServer, Server};
use crate::shutdown::Shutdown;

Expand Down Expand Up @@ -126,3 +130,38 @@ impl Server for OpentsdbServer {
OPENTSDB_SERVER
}
}

pub fn data_point_to_grpc_row_insert_requests(
data_points: Vec<DataPoint>,
) -> Result<(RowInsertRequests, usize)> {
let mut multi_table_data = MultiTableData::new();

for mut data_point in data_points {
let tags: Vec<(String, String)> = std::mem::take(data_point.tags_mut());
let table_name = data_point.metric();
let value = data_point.value();
let timestamp = data_point.ts_millis();
// length of tags + 2 extra columns for greptime_timestamp and the value
let num_columns = tags.len() + 2;

let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 1);
let mut one_row = table_data.alloc_one_row();

// tags
row_writer::write_tags(table_data, tags.into_iter(), &mut one_row)?;

// value
row_writer::write_f64(table_data, FIELD_COLUMN_NAME, value, &mut one_row)?;
// timestamp
row_writer::write_ts_millis(
table_data,
TIMESTAMP_COLUMN_NAME,
Some(timestamp),
&mut one_row,
)?;

table_data.add_row(one_row);
}

Ok(multi_table_data.into_row_insert_requests())
}
6 changes: 5 additions & 1 deletion src/servers/src/opentsdb/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::error::{self, Result};
pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
pub const OPENTSDB_FIELD_COLUMN_NAME: &str = "greptime_value";

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DataPoint {
metric: String,
ts_millis: i64,
Expand Down Expand Up @@ -115,6 +115,10 @@ impl DataPoint {
&self.tags
}

pub fn tags_mut(&mut self) -> &mut Vec<(String, String)> {
&mut self.tags
}

pub fn ts_millis(&self) -> i64 {
self.ts_millis
}
Expand Down
8 changes: 4 additions & 4 deletions src/servers/src/opentsdb/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl<S: AsyncWrite + AsyncRead + Unpin> Handler<S> {
match DataPoint::try_create(&line) {
Ok(data_point) => {
let _timer = timer!(crate::metrics::METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED);
let result = self.query_handler.exec(&data_point, ctx.clone()).await;
let result = self.query_handler.exec(vec![data_point], ctx.clone()).await;
if let Err(e) = result {
self.connection.write_line(e.output_msg()).await?;
}
Expand Down Expand Up @@ -128,16 +128,16 @@ mod tests {

#[async_trait]
impl OpentsdbProtocolHandler for DummyQueryHandler {
async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
let metric = data_point.metric();
async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
let metric = data_points.first().unwrap().metric();
if metric == "should_failed" {
return error::InternalSnafu {
err_msg: "expected",
}
.fail();
}
self.tx.send(metric.to_string()).await.unwrap();
Ok(())
Ok(data_points.len())
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/query_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub trait InfluxdbLineProtocolHandler {
pub trait OpentsdbProtocolHandler {
/// A successful request will not return a response.
/// Only on error will the socket return a line of data.
async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> Result<()>;
async fn exec(&self, data_points: Vec<DataPoint>, ctx: QueryContextRef) -> Result<usize>;
}

pub struct PromStoreResponse {
Expand Down
10 changes: 4 additions & 6 deletions src/servers/tests/http/opentsdb_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ impl GrpcQueryHandler for DummyInstance {

#[async_trait]
impl OpentsdbProtocolHandler for DummyInstance {
async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
let data_point = data_points.first().unwrap();
if data_point.metric() == "should_failed" {
return error::InternalSnafu {
err_msg: "expected",
}
.fail();
}
let _ = self.tx.send(data_point.metric().to_string()).await;
Ok(())
Ok(data_points.len())
}
}

Expand Down Expand Up @@ -172,10 +173,7 @@ async fn test_opentsdb_put() {
while let Ok(s) = rx.try_recv() {
metrics.push(s);
}
assert_eq!(
metrics,
vec!["m1".to_string(), "m2".to_string(), "m3".to_string()]
);
assert_eq!(metrics, vec!["m1".to_string(), "m2".to_string()]);
}

#[tokio::test]
Expand Down
6 changes: 3 additions & 3 deletions src/servers/tests/opentsdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ struct DummyOpentsdbInstance {

#[async_trait]
impl OpentsdbProtocolHandler for DummyOpentsdbInstance {
async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
let metric = data_point.metric();
async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
let metric = data_points.first().unwrap().metric();
if metric == "should_failed" {
return server_error::InternalSnafu {
err_msg: "expected",
Expand All @@ -47,7 +47,7 @@ impl OpentsdbProtocolHandler for DummyOpentsdbInstance {
}
let i = metric.parse::<i32>().unwrap();
let _ = self.tx.send(i * i).await;
Ok(())
Ok(data_points.len())
}
}

Expand Down
27 changes: 14 additions & 13 deletions tests-integration/src/opentsdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ mod tests {

async fn test_exec(instance: &Arc<Instance>) {
let ctx = QueryContext::arc();

// should create new table "my_metric_1" directly
let data_point1 = DataPoint::new(
"my_metric_1".to_string(),
1000,
Expand All @@ -55,9 +57,8 @@ mod tests {
("tagk2".to_string(), "tagv2".to_string()),
],
);
// should create new table "my_metric_1" directly
instance.exec(&data_point1, ctx.clone()).await.unwrap();

// should create new column "tagk3" directly
let data_point2 = DataPoint::new(
"my_metric_1".to_string(),
2000,
Expand All @@ -67,12 +68,12 @@ mod tests {
("tagk3".to_string(), "tagv3".to_string()),
],
);
// should create new column "tagk3" directly
instance.exec(&data_point2, ctx.clone()).await.unwrap();

let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]);
// should handle null tags properly
instance.exec(&data_point3, ctx.clone()).await.unwrap();
let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]);

let data_points = vec![data_point1, data_point2, data_point3];
instance.exec(data_points, ctx.clone()).await.unwrap();

let output = instance
.do_query(
Expand All @@ -87,13 +88,13 @@ mod tests {
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let pretty_print = recordbatches.pretty_print().unwrap();
let expected = vec![
"+---------------------+----------------+-------+-------+-------+",
"| greptime_timestamp | greptime_value | tagk1 | tagk2 | tagk3 |",
"+---------------------+----------------+-------+-------+-------+",
"| 1970-01-01T00:00:01 | 1.0 | tagv1 | tagv2 | |",
"| 1970-01-01T00:00:02 | 2.0 | | tagv2 | tagv3 |",
"| 1970-01-01T00:00:03 | 3.0 | | | |",
"+---------------------+----------------+-------+-------+-------+",
"+-------+-------+----------------+---------------------+-------+",
"| tagk1 | tagk2 | greptime_value | greptime_timestamp | tagk3 |",
"+-------+-------+----------------+---------------------+-------+",
"| tagv1 | tagv2 | 1.0 | 1970-01-01T00:00:01 | |",
"| | tagv2 | 2.0 | 1970-01-01T00:00:02 | tagv3 |",
"| | | 3.0 | 1970-01-01T00:00:03 | |",
"+-------+-------+----------------+---------------------+-------+",
]
.into_iter()
.join("\n");
Expand Down
Loading