Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Able to pretty print sql query result in http output #3539

Merged
merged 5 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use tower_http::decompression::RequestDecompressionLayer;
use tower_http::trace::TraceLayer;

use self::authorize::AuthState;
use self::table_result::TableResponse;
use crate::configurator::ConfiguratorRef;
use crate::error::{AlreadyStartedSnafu, Error, HyperSnafu, Result, ToJsonSnafu};
use crate::http::arrow_result::ArrowResponse;
Expand Down Expand Up @@ -90,6 +91,7 @@ mod dashboard;
pub mod error_result;
pub mod greptime_result_v1;
pub mod influxdb_result_v1;
pub mod table_result;

pub const HTTP_API_VERSION: &str = "v1";
pub const HTTP_API_PREFIX: &str = "/v1/";
Expand Down Expand Up @@ -254,6 +256,7 @@ pub enum GreptimeQueryOutput {
pub enum ResponseFormat {
Arrow,
Csv,
Table,
#[default]
GreptimedbV1,
InfluxdbV1,
Expand All @@ -264,6 +267,7 @@ impl ResponseFormat {
match s {
"arrow" => Some(ResponseFormat::Arrow),
"csv" => Some(ResponseFormat::Csv),
"table" => Some(ResponseFormat::Table),
"greptimedb_v1" => Some(ResponseFormat::GreptimedbV1),
"influxdb_v1" => Some(ResponseFormat::InfluxdbV1),
_ => None,
Expand All @@ -274,6 +278,7 @@ impl ResponseFormat {
match self {
ResponseFormat::Arrow => "arrow",
ResponseFormat::Csv => "csv",
ResponseFormat::Table => "table",
ResponseFormat::GreptimedbV1 => "greptimedb_v1",
ResponseFormat::InfluxdbV1 => "influxdb_v1",
}
Expand Down Expand Up @@ -328,6 +333,7 @@ impl Display for Epoch {
pub enum HttpResponse {
Arrow(ArrowResponse),
Csv(CsvResponse),
Table(TableResponse),
Error(ErrorResponse),
GreptimedbV1(GreptimedbV1Response),
InfluxdbV1(InfluxdbV1Response),
Expand All @@ -338,6 +344,7 @@ impl HttpResponse {
match self {
HttpResponse::Arrow(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::Csv(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::Table(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(),
Expand All @@ -350,6 +357,7 @@ impl IntoResponse for HttpResponse {
match self {
HttpResponse::Arrow(resp) => resp.into_response(),
HttpResponse::Csv(resp) => resp.into_response(),
HttpResponse::Table(resp) => resp.into_response(),
HttpResponse::GreptimedbV1(resp) => resp.into_response(),
HttpResponse::InfluxdbV1(resp) => resp.into_response(),
HttpResponse::Error(resp) => resp.into_response(),
Expand All @@ -373,6 +381,12 @@ impl From<CsvResponse> for HttpResponse {
}
}

impl From<TableResponse> for HttpResponse {
fn from(value: TableResponse) -> Self {
HttpResponse::Table(value)
}
}

impl From<ErrorResponse> for HttpResponse {
fn from(value: ErrorResponse) -> Self {
HttpResponse::Error(value)
Expand Down Expand Up @@ -971,6 +985,7 @@ mod test {
ResponseFormat::GreptimedbV1,
ResponseFormat::InfluxdbV1,
ResponseFormat::Csv,
ResponseFormat::Table,
ResponseFormat::Arrow,
] {
let recordbatches =
Expand All @@ -979,6 +994,7 @@ mod test {
let json_resp = match format {
ResponseFormat::Arrow => ArrowResponse::from_output(outputs).await,
ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
ResponseFormat::Table => TableResponse::from_output(outputs).await,
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await,
};
Expand Down Expand Up @@ -1021,6 +1037,21 @@ mod test {
panic!("invalid output type");
}
}

HttpResponse::Table(resp) => {
let output = &resp.output()[0];
if let GreptimeQueryOutput::Records(r) = output {
assert_eq!(r.num_rows(), 4);
assert_eq!(r.num_cols(), 2);
assert_eq!(r.schema.column_schemas[0].name, "numbers");
assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
assert_eq!(r.rows[0][0], serde_json::Value::from(1));
assert_eq!(r.rows[0][1], serde_json::Value::Null);
} else {
panic!("invalid output type");
}
}

HttpResponse::Arrow(resp) => {
let output = resp.data;
let mut reader =
Expand Down
2 changes: 2 additions & 0 deletions src/servers/src/http/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::http::csv_result::CsvResponse;
use crate::http::error_result::ErrorResponse;
use crate::http::greptime_result_v1::GreptimedbV1Response;
use crate::http::influxdb_result_v1::InfluxdbV1Response;
use crate::http::table_result::TableResponse;
use crate::http::{
ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput,
HttpResponse, ResponseFormat,
Expand Down Expand Up @@ -119,6 +120,7 @@ pub async fn sql(
let resp = match format {
ResponseFormat::Arrow => ArrowResponse::from_output(outputs).await,
ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
ResponseFormat::Table => TableResponse::from_output(outputs).await,
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
};
Expand Down
176 changes: 176 additions & 0 deletions src/servers/src/http/table_result.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::max;
use std::fmt::{Display, Write};

use axum::http::{header, HeaderValue};
use axum::response::{IntoResponse, Response};
use common_error::status_code::StatusCode;
use common_query::Output;
use itertools::Itertools;
use mime_guess::mime;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::http::error_result::ErrorResponse;
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat};

#[derive(Serialize, Deserialize, Debug, JsonSchema)]
pub struct TableResponse {
output: Vec<GreptimeQueryOutput>,
execution_time_ms: u64,
}

impl TableResponse {
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
match handler::from_output(ResponseFormat::Csv, outputs).await {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo csv.

But we don't need it at all.

I'm working on a patch to clean up all these unnecessary extra format param.

Err(err) => HttpResponse::Error(err),
Ok((output, _)) => {
if output.len() > 1 {
HttpResponse::Error(ErrorResponse::from_error_message(
ResponseFormat::Table,
StatusCode::InvalidArguments,
"Multi-statements are not allowed".to_string(),
))
} else {
HttpResponse::Table(TableResponse {
output,
execution_time_ms: 0,
})
}
}
}
}

pub fn output(&self) -> &[GreptimeQueryOutput] {
&self.output
}

pub fn with_execution_time(mut self, execution_time: u64) -> Self {
self.execution_time_ms = execution_time;
self
}

pub fn execution_time_ms(&self) -> u64 {
self.execution_time_ms
}
YCCDSZXH marked this conversation as resolved.
Show resolved Hide resolved
}

impl Display for TableResponse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let payload = match self.output.first() {
None => String::default(),
Some(GreptimeQueryOutput::AffectedRows(n)) => {
format!("{n}\n")
}
Some(GreptimeQueryOutput::Records(records)) => {
let mut max_width = vec![0; records.num_cols()];
let mut result = String::new();
// Determine maximum width for each column
for (i, column) in records.schema.column_schemas.iter().enumerate() {
max_width[i] = max(max_width[i], column.name.len());
}
for row in &records.rows {
for (i, v) in row.iter().enumerate() {
let s = v.to_string();
max_width[i] = max(max_width[i], s.len());
}
}

// Construct the header
let head: String = records
.schema
.column_schemas
.iter()
.enumerate()
.map(|(i, column)| format!("─{:─<1$}─", column.name, max_width[i]))
.join("┬");
writeln!(result, "┌{}┐", head).unwrap();

// Construct rows
for row in &records.rows {
let row = row
.iter()
.enumerate()
.map(|(i, v)| {
let s = v.to_string();
format!(" {:1$} ", s, max_width[i])
})
.join("│");
writeln!(result, "│{row}│").unwrap();
}

// Construct the footer
let footer: String = max_width.iter().map(|v| "─".repeat(*v + 2)).join("┴");
writeln!(result, "└{}┘", footer).unwrap();
result
}
};
write!(f, "{}", payload)
}
}

impl IntoResponse for TableResponse {
fn into_response(self) -> Response {
debug_assert!(
self.output.len() <= 1,
"self.output has extra elements: {}",
self.output.len()
);

let execution_time = self.execution_time_ms;

let mut resp = (
[(
header::CONTENT_TYPE,
HeaderValue::from_static(mime::PLAIN.as_ref()),
)],
self.to_string(),
)
.into_response();
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static("TABLE"),
);
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
HeaderValue::from(execution_time),
);
resp
}
}
#[cfg(test)]
mod test {

use super::TableResponse;

#[tokio::test]
async fn test_table_format() {
let data = r#"{"output":[{"records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"ts","data_type":"TimestampMillisecond"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"}]},"rows":[["127.0.0.1",1702433141000,0.5,0.2],["127.0.0.1",1702433146000,0.3,0.2],["127.0.0.1",1702433151000,0.4,0.3],["127.0.0.2",1702433141000,0.3,0.1],["127.0.0.2",1702433146000,0.2,0.4],["127.0.0.2",1702433151000,0.2,0.4]]}}],"execution_time_ms":13}"#;
let table_response: TableResponse = serde_json::from_str(data).unwrap();
let payload = table_response.to_string();
let expected_payload = r#"┌─host────────┬─ts────────────┬─cpu─┬─memory─┐
│ "127.0.0.1" │ 1702433141000 │ 0.5 │ 0.2 │
│ "127.0.0.1" │ 1702433146000 │ 0.3 │ 0.2 │
│ "127.0.0.1" │ 1702433151000 │ 0.4 │ 0.3 │
│ "127.0.0.2" │ 1702433141000 │ 0.3 │ 0.1 │
│ "127.0.0.2" │ 1702433146000 │ 0.2 │ 0.4 │
│ "127.0.0.2" │ 1702433151000 │ 0.2 │ 0.4 │
└─────────────┴───────────────┴─────┴────────┘
"#;
assert_eq!(payload, expected_payload);
}
}