From 1bc4f25de2a64bf6a600a9246e11d27a77cf5016 Mon Sep 17 00:00:00 2001
From: Ning Sun
Date: Thu, 25 Jan 2024 14:44:28 +0800
Subject: [PATCH] feat: http sql api return schema on empty resultset (#3237)

* feat: return schema on empty resultset

* refactor: make schema a required field in http output

* test: update integration test and add schema output
---
 src/catalog/Cargo.toml          |   2 +-
 src/servers/src/http.rs         | 100 +++++++++++++++++++-------------
 src/servers/src/http/handler.rs |  19 +++---
 tests-integration/tests/http.rs |   2 +-
 4 files changed, 74 insertions(+), 49 deletions(-)

diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml
index 656cc9fd1ee2..16e407a6995c 100644
--- a/src/catalog/Cargo.toml
+++ b/src/catalog/Cargo.toml
@@ -33,7 +33,7 @@ futures-util.workspace = true
 itertools.workspace = true
 lazy_static.workspace = true
 meta-client.workspace = true
-moka = { workspace = true, features = ["future"] }
+moka = { workspace = true, features = ["future", "sync"] }
 parking_lot = "0.12"
 partition.workspace = true
 paste = "1.0"
diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs
index d9371cab992d..8a58d0454859 100644
--- a/src/servers/src/http.rs
+++ b/src/servers/src/http.rs
@@ -36,6 +36,7 @@ use common_telemetry::logging::{error, info};
 use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
 use datatypes::data_type::DataType;
+use datatypes::schema::SchemaRef;
 use futures::FutureExt;
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
@@ -150,21 +151,36 @@ impl ColumnSchema {
 }
 
 #[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq)]
-pub struct Schema {
+pub struct OutputSchema {
     column_schemas: Vec<ColumnSchema>,
 }
 
-impl Schema {
-    pub fn new(columns: Vec<ColumnSchema>) -> Schema {
-        Schema {
+impl OutputSchema {
+    pub fn new(columns: Vec<ColumnSchema>) -> OutputSchema {
+        OutputSchema {
             column_schemas: columns,
         }
     }
 }
 
+impl From<SchemaRef> for OutputSchema {
+    fn from(schema: SchemaRef) -> OutputSchema {
+        OutputSchema {
+            column_schemas: schema
+                .column_schemas()
+                .iter()
+                .map(|cs| ColumnSchema {
+                    name: cs.name.clone(),
+                    data_type: cs.data_type.name(),
+                })
+                .collect(),
+        }
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq)]
 pub struct HttpRecordsOutput {
-    schema: Option<Schema>,
+    schema: OutputSchema,
     rows: Vec<Vec<Value>>,
 }
 
@@ -174,14 +190,11 @@ impl HttpRecordsOutput {
     }
 
     pub fn num_cols(&self) -> usize {
-        self.schema
-            .as_ref()
-            .map(|x| x.column_schemas.len())
-            .unwrap_or(0)
+        self.schema.column_schemas.len()
     }
 
-    pub fn schema(&self) -> Option<&Schema> {
-        self.schema.as_ref()
+    pub fn schema(&self) -> &OutputSchema {
+        &self.schema
     }
 
     pub fn rows(&self) -> &Vec<Vec<Value>> {
         &self.rows
     }
 }
 
-impl TryFrom<Vec<RecordBatch>> for HttpRecordsOutput {
-    type Error = Error;
-
-    // TODO(sunng87): use schema from recordstreams when #366 fixed
-    fn try_from(
+impl HttpRecordsOutput {
+    pub(crate) fn try_new(
+        schema: SchemaRef,
         recordbatches: Vec<RecordBatch>,
-    ) -> std::result::Result<HttpRecordsOutput, Self::Error> {
+    ) -> std::result::Result<HttpRecordsOutput, Error> {
         if recordbatches.is_empty() {
             Ok(HttpRecordsOutput {
-                schema: None,
+                schema: OutputSchema::from(schema),
                 rows: vec![],
             })
         } else {
-            // safety ensured by previous empty check
-            let first = &recordbatches[0];
-            let schema = Schema {
-                column_schemas: first
-                    .schema
-                    .column_schemas()
-                    .iter()
-                    .map(|cs| ColumnSchema {
-                        name: cs.name.clone(),
-                        data_type: cs.data_type.name(),
-                    })
-                    .collect(),
-            };
-
             let mut rows =
                 Vec::with_capacity(recordbatches.iter().map(|r| r.num_rows()).sum::<usize>());
@@ -232,7 +229,7 @@ impl TryFrom<Vec<RecordBatch>> for HttpRecordsOutput {
             }
 
             Ok(HttpRecordsOutput {
-                schema: Some(schema),
+                schema: OutputSchema::from(schema),
                 rows,
             })
         }
@@ -937,6 +934,33 @@ mod test {
         assert_eq!(res.status(), StatusCode::REQUEST_TIMEOUT);
     }
 
+    #[tokio::test]
+    async fn test_schema_for_empty_response() {
+        let column_schemas = vec![
+            ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
+            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
+        ];
+        let schema = Arc::new(Schema::new(column_schemas));
+
+        let recordbatches = RecordBatches::try_new(schema.clone(), vec![]).unwrap();
+        let outputs = vec![Ok(Output::RecordBatches(recordbatches))];
+
+        let json_resp = GreptimedbV1Response::from_output(outputs).await;
+        if let HttpResponse::GreptimedbV1(json_resp) = json_resp {
+            let json_output = &json_resp.output[0];
+            if let GreptimeQueryOutput::Records(r) = json_output {
+                assert_eq!(r.num_rows(), 0);
+                assert_eq!(r.num_cols(), 2);
+                assert_eq!(r.schema.column_schemas[0].name, "numbers");
+                assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
+            } else {
+                panic!("invalid output type");
+            }
+        } else {
+            panic!("invalid format")
+        }
+    }
+
     #[tokio::test]
     async fn test_recordbatches_conversion() {
         let column_schemas = vec![
@@ -977,9 +1001,8 @@
         if let GreptimeQueryOutput::Records(r) = json_output {
             assert_eq!(r.num_rows(), 4);
             assert_eq!(r.num_cols(), 2);
-            let schema = r.schema.as_ref().unwrap();
-            assert_eq!(schema.column_schemas[0].name, "numbers");
-            assert_eq!(schema.column_schemas[0].data_type, "UInt32");
+            assert_eq!(r.schema.column_schemas[0].name, "numbers");
+            assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
             assert_eq!(r.rows[0][0], serde_json::Value::from(1));
             assert_eq!(r.rows[0][1], serde_json::Value::Null);
         } else {
@@ -1002,9 +1025,8 @@
         if let GreptimeQueryOutput::Records(r) = output {
             assert_eq!(r.num_rows(), 4);
             assert_eq!(r.num_cols(), 2);
-            let schema = r.schema.as_ref().unwrap();
-            assert_eq!(schema.column_schemas[0].name, "numbers");
-            assert_eq!(schema.column_schemas[0].data_type, "UInt32");
+            assert_eq!(r.schema.column_schemas[0].name, "numbers");
+            assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
             assert_eq!(r.rows[0][0], serde_json::Value::from(1));
             assert_eq!(r.rows[0][1], serde_json::Value::Null);
         } else {
diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs
index 88b32427557d..a1f5baab4918 100644
--- a/src/servers/src/http/handler.rs
+++ b/src/servers/src/http/handler.rs
@@ -135,9 +135,10 @@ pub async fn from_output(
             results.push(GreptimeQueryOutput::AffectedRows(rows));
         }
         Ok(Output::Stream(stream)) => {
+            let schema = stream.schema().clone();
             // TODO(sunng87): streaming response
             match util::collect(stream).await {
-                Ok(rows) => match HttpRecordsOutput::try_from(rows) {
+                Ok(rows) => match HttpRecordsOutput::try_new(schema, rows) {
                     Ok(rows) => {
                         results.push(GreptimeQueryOutput::Records(rows));
                     }
@@ -150,14 +151,16 @@ pub async fn from_output(
                 }
             }
         }
-        Ok(Output::RecordBatches(rbs)) => match HttpRecordsOutput::try_from(rbs.take()) {
-            Ok(rows) => {
-                results.push(GreptimeQueryOutput::Records(rows));
-            }
-            Err(err) => {
-                return Err(ErrorResponse::from_error(ty, err));
+        Ok(Output::RecordBatches(rbs)) => {
+            match HttpRecordsOutput::try_new(rbs.schema(), rbs.take()) {
+                Ok(rows) => {
+                    results.push(GreptimeQueryOutput::Records(rows));
+                }
+                Err(err) => {
+                    return Err(ErrorResponse::from_error(ty, err));
+                }
             }
-        },
+        }
         Err(err) => {
             return Err(ErrorResponse::from_error(ty, err));
         }
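Why the handler now captures the schema up front: once util::collect has drained the stream, an empty result leaves no RecordBatch from which column metadata could be recovered, which is exactly why the old TryFrom<Vec<RecordBatch>> impl had to fall back to schema: None. The snippet below is a self-contained toy model of that design change, not GreptimeDB code; the Schema and RecordBatch types here are simplified stand-ins for the real SchemaRef and RecordBatch.

    // Toy model of the change: take the schema from the producer of the
    // batches instead of from the first batch, so that zero-row results
    // keep their column metadata.
    #[derive(Clone, Debug, PartialEq)]
    struct Schema {
        column_names: Vec<String>,
    }

    #[allow(dead_code)]
    struct RecordBatch {
        schema: Schema,
        rows: Vec<Vec<i64>>,
    }

    // Old shape: the schema is only recoverable when at least one batch exists.
    fn schema_from_first_batch(batches: &[RecordBatch]) -> Option<Schema> {
        batches.first().map(|b| b.schema.clone())
    }

    // New shape: the caller passes the schema it already holds, so an empty
    // resultset still carries full column metadata.
    fn schema_from_caller(schema: Schema, _batches: &[RecordBatch]) -> Schema {
        schema
    }

    fn main() {
        let schema = Schema {
            column_names: vec!["cpu".into(), "ts".into()],
        };
        let empty: Vec<RecordBatch> = vec![];

        assert_eq!(schema_from_first_batch(&empty), None); // old: schema lost
        assert_eq!(schema_from_caller(schema.clone(), &empty), schema); // new: kept
        println!("empty result keeps columns: {:?}", schema.column_names);
    }

Sourcing the schema from the caller is also what allows the HTTP layer to drop the Option and make schema a required, always-serialized field.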
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index da41dd300e82..cb48533823ef 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -242,7 +242,7 @@ pub async fn test_sql_api(store_type: StorageType) {
     assert_eq!(
         outputs[1],
         serde_json::from_value::<GreptimeQueryOutput>(json!({
-            "records":{"rows":[]}
+            "records":{"rows":[], "schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]}}
        }))
        .unwrap()
    );
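For illustration, the payload a client now receives for a query that matches no rows, written in the same json! notation the test above uses. The cpu/ts column pair mirrors the integration test; surrounding envelope fields such as execution_time_ms are omitted, so treat this as a sketch of the records payload rather than a verbatim server response:

    json!({
        "output": [{
            "records": {
                "schema": {
                    "column_schemas": [
                        {"name": "cpu", "data_type": "Float64"},
                        {"name": "ts", "data_type": "TimestampMillisecond"}
                    ]
                },
                "rows": []
            }
        }]
    })

Before this patch the same query serialized as "records":{"rows":[]}, leaving a client with no way to learn the column names or types of an empty resultset.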