feat: http sql api return schema on empty resultset (#3237)
* feat: return schema on empty resultset

* refactor: make schema a required field in http output

* test: update integration test and add schema output
sunng87 authored Jan 25, 2024
1 parent 814924f commit 1bc4f25
Showing 4 changed files with 74 additions and 49 deletions.
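
Previously the `records` object omitted `schema` whenever a query returned no rows, so clients had to special-case the missing field. With this change the schema is taken from the result's `SchemaRef` and is always present. For illustration, the `records` portion of a `/v1/sql` response for an empty result set now looks roughly like this (column names and types taken from the updated integration test below; other response fields omitted):

"records": {
  "schema": {
    "column_schemas": [
      {"name": "cpu", "data_type": "Float64"},
      {"name": "ts", "data_type": "TimestampMillisecond"}
    ]
  },
  "rows": []
}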
2 changes: 1 addition & 1 deletion src/catalog/Cargo.toml
@@ -33,7 +33,7 @@ futures-util.workspace = true
 itertools.workspace = true
 lazy_static.workspace = true
 meta-client.workspace = true
-moka = { workspace = true, features = ["future"] }
+moka = { workspace = true, features = ["future", "sync"] }
 parking_lot = "0.12"
 partition.workspace = true
 paste = "1.0"
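
The added `sync` feature enables moka's synchronous `moka::sync::Cache` API in addition to the async `moka::future::Cache` that the `future` feature (already present) provides.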
100 changes: 61 additions & 39 deletions src/servers/src/http.rs
@@ -36,6 +36,7 @@ use common_telemetry::logging::{error, info};
 use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
 use datatypes::data_type::DataType;
+use datatypes::schema::SchemaRef;
 use futures::FutureExt;
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
@@ -150,21 +151,36 @@ impl ColumnSchema {
 }
 
 #[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq)]
-pub struct Schema {
+pub struct OutputSchema {
     column_schemas: Vec<ColumnSchema>,
 }
 
-impl Schema {
-    pub fn new(columns: Vec<ColumnSchema>) -> Schema {
-        Schema {
+impl OutputSchema {
+    pub fn new(columns: Vec<ColumnSchema>) -> OutputSchema {
+        OutputSchema {
             column_schemas: columns,
         }
     }
 }
 
+impl From<SchemaRef> for OutputSchema {
+    fn from(schema: SchemaRef) -> OutputSchema {
+        OutputSchema {
+            column_schemas: schema
+                .column_schemas()
+                .iter()
+                .map(|cs| ColumnSchema {
+                    name: cs.name.clone(),
+                    data_type: cs.data_type.name(),
+                })
+                .collect(),
+        }
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq)]
 pub struct HttpRecordsOutput {
-    schema: Option<Schema>,
+    schema: OutputSchema,
     rows: Vec<Vec<Value>>,
 }

@@ -174,48 +190,29 @@ impl HttpRecordsOutput {
     }
 
     pub fn num_cols(&self) -> usize {
-        self.schema
-            .as_ref()
-            .map(|x| x.column_schemas.len())
-            .unwrap_or(0)
+        self.schema.column_schemas.len()
     }
 
-    pub fn schema(&self) -> Option<&Schema> {
-        self.schema.as_ref()
+    pub fn schema(&self) -> &OutputSchema {
+        &self.schema
     }
 
     pub fn rows(&self) -> &Vec<Vec<Value>> {
         &self.rows
     }
 }
 
-impl TryFrom<Vec<RecordBatch>> for HttpRecordsOutput {
-    type Error = Error;
-
-    // TODO(sunng87): use schema from recordstreams when #366 fixed
-    fn try_from(
+impl HttpRecordsOutput {
+    pub(crate) fn try_new(
+        schema: SchemaRef,
         recordbatches: Vec<RecordBatch>,
-    ) -> std::result::Result<HttpRecordsOutput, Self::Error> {
+    ) -> std::result::Result<HttpRecordsOutput, Error> {
         if recordbatches.is_empty() {
             Ok(HttpRecordsOutput {
-                schema: None,
+                schema: OutputSchema::from(schema),
                 rows: vec![],
             })
         } else {
-            // safety ensured by previous empty check
-            let first = &recordbatches[0];
-            let schema = Schema {
-                column_schemas: first
-                    .schema
-                    .column_schemas()
-                    .iter()
-                    .map(|cs| ColumnSchema {
-                        name: cs.name.clone(),
-                        data_type: cs.data_type.name(),
-                    })
-                    .collect(),
-            };
-
             let mut rows =
                 Vec::with_capacity(recordbatches.iter().map(|r| r.num_rows()).sum::<usize>());
 
@@ -232,7 +229,7 @@ impl TryFrom<Vec<RecordBatch>> for HttpRecordsOutput {
             }
 
             Ok(HttpRecordsOutput {
-                schema: Some(schema),
+                schema: OutputSchema::from(schema),
                 rows,
             })
         }
@@ -937,6 +934,33 @@ mod test {
         assert_eq!(res.status(), StatusCode::REQUEST_TIMEOUT);
     }
 
+    #[tokio::test]
+    async fn test_schema_for_empty_response() {
+        let column_schemas = vec![
+            ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
+            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
+        ];
+        let schema = Arc::new(Schema::new(column_schemas));
+
+        let recordbatches = RecordBatches::try_new(schema.clone(), vec![]).unwrap();
+        let outputs = vec![Ok(Output::RecordBatches(recordbatches))];
+
+        let json_resp = GreptimedbV1Response::from_output(outputs).await;
+        if let HttpResponse::GreptimedbV1(json_resp) = json_resp {
+            let json_output = &json_resp.output[0];
+            if let GreptimeQueryOutput::Records(r) = json_output {
+                assert_eq!(r.num_rows(), 0);
+                assert_eq!(r.num_cols(), 2);
+                assert_eq!(r.schema.column_schemas[0].name, "numbers");
+                assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
+            } else {
+                panic!("invalid output type");
+            }
+        } else {
+            panic!("invalid format")
+        }
+    }
+
     #[tokio::test]
     async fn test_recordbatches_conversion() {
         let column_schemas = vec![
@@ -977,9 +1001,8 @@
         if let GreptimeQueryOutput::Records(r) = json_output {
             assert_eq!(r.num_rows(), 4);
             assert_eq!(r.num_cols(), 2);
-            let schema = r.schema.as_ref().unwrap();
-            assert_eq!(schema.column_schemas[0].name, "numbers");
-            assert_eq!(schema.column_schemas[0].data_type, "UInt32");
+            assert_eq!(r.schema.column_schemas[0].name, "numbers");
+            assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
             assert_eq!(r.rows[0][0], serde_json::Value::from(1));
             assert_eq!(r.rows[0][1], serde_json::Value::Null);
         } else {
@@ -1002,9 +1025,8 @@
         if let GreptimeQueryOutput::Records(r) = output {
             assert_eq!(r.num_rows(), 4);
             assert_eq!(r.num_cols(), 2);
-            let schema = r.schema.as_ref().unwrap();
-            assert_eq!(schema.column_schemas[0].name, "numbers");
-            assert_eq!(schema.column_schemas[0].data_type, "UInt32");
+            assert_eq!(r.schema.column_schemas[0].name, "numbers");
+            assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
             assert_eq!(r.rows[0][0], serde_json::Value::from(1));
             assert_eq!(r.rows[0][1], serde_json::Value::Null);
         } else {
19 changes: 11 additions & 8 deletions src/servers/src/http/handler.rs
@@ -135,9 +135,10 @@ pub async fn from_output(
             results.push(GreptimeQueryOutput::AffectedRows(rows));
         }
         Ok(Output::Stream(stream)) => {
+            let schema = stream.schema().clone();
             // TODO(sunng87): streaming response
             match util::collect(stream).await {
-                Ok(rows) => match HttpRecordsOutput::try_from(rows) {
+                Ok(rows) => match HttpRecordsOutput::try_new(schema, rows) {
                     Ok(rows) => {
                         results.push(GreptimeQueryOutput::Records(rows));
                     }
@@ -150,14 +151,16 @@
                     }
                 }
             }
-            Ok(Output::RecordBatches(rbs)) => match HttpRecordsOutput::try_from(rbs.take()) {
-                Ok(rows) => {
-                    results.push(GreptimeQueryOutput::Records(rows));
-                }
-                Err(err) => {
-                    return Err(ErrorResponse::from_error(ty, err));
-                }
-            },
+            Ok(Output::RecordBatches(rbs)) => {
+                match HttpRecordsOutput::try_new(rbs.schema(), rbs.take()) {
+                    Ok(rows) => {
+                        results.push(GreptimeQueryOutput::Records(rows));
+                    }
+                    Err(err) => {
+                        return Err(ErrorResponse::from_error(ty, err));
+                    }
+                }
+            }
             Err(err) => {
                 return Err(ErrorResponse::from_error(ty, err));
             }
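
In the `Output::Stream` arm the schema must be cloned before `util::collect` runs, because collecting consumes the stream, and an empty result would otherwise leave nothing to recover the column metadata from. A minimal self-contained sketch of that pattern, using stand-in types rather than GreptimeDB's real stream and `SchemaRef`:

use std::sync::Arc;

// Stand-ins for the real record-batch stream and schema types.
#[derive(Debug)]
struct Schema {
    column_names: Vec<String>,
}

struct RecordStream {
    schema: Arc<Schema>,
    batches: Vec<Vec<i64>>,
}

impl RecordStream {
    // Cheap handle to the schema, like `stream.schema()`.
    fn schema(&self) -> Arc<Schema> {
        self.schema.clone()
    }

    // Consumes the stream, like `util::collect(stream).await`.
    fn collect(self) -> Vec<Vec<i64>> {
        self.batches
    }
}

fn main() {
    let stream = RecordStream {
        schema: Arc::new(Schema {
            column_names: vec!["cpu".to_string(), "ts".to_string()],
        }),
        batches: vec![], // the query matched nothing
    };

    // Grab the schema first: `collect` takes ownership of the stream.
    let schema = stream.schema();
    let rows = stream.collect();

    // Even with zero rows, the column metadata survives for the response.
    assert!(rows.is_empty());
    assert_eq!(schema.column_names, ["cpu", "ts"]);
}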
2 changes: 1 addition & 1 deletion tests-integration/tests/http.rs
@@ -242,7 +242,7 @@ pub async fn test_sql_api(store_type: StorageType) {
     assert_eq!(
         outputs[1],
         serde_json::from_value::<GreptimeQueryOutput>(json!({
-            "records":{"rows":[]}
+            "records":{"rows":[], "schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]}}
         }))
         .unwrap()
     );
