feat: sql with influxdb v1 result format (#2917)
* feat: sql with influxdb v1 result format

* chore: add unit tests

* feat: minor refactor

* chore: by comment

* chore: u128 to u64 since serde can't deserialize u128 in enum

* chore: by comment

* chore: apply suggestion

* chore: revert suggestion

* chore: try again

---------

Co-authored-by: dennis zhuang <[email protected]>
fengjiachun and killme2008 authored Dec 13, 2023
1 parent d3da128 commit 99dda93
Showing 6 changed files with 740 additions and 122 deletions.
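
As context for the diff below, here is a minimal sketch of how the pieces added by this commit are meant to fit together on the SQL query path: the handler picks a ResponseFormat (falling back to the GreptimeDB v1 format) and an optional Epoch, then hands both to JsonResponse::from_output. The `format` and `epoch` query-parameter names, the handler shape, and the fallback behavior are assumptions for illustration; ResponseFormat::parse, Epoch::parse, and JsonResponse::from_output are the APIs introduced in the diff.

// Sketch only (not part of this commit): how a SQL handler might drive the new
// response-format selection. The `format` and `epoch` query-parameter names and
// the fallback to the GreptimeDB v1 format are assumptions for illustration.
use std::collections::HashMap;

use common_query::Output;

use crate::error::Result;
use crate::http::{Epoch, JsonResponse, ResponseFormat};

async fn render_sql_response(
    params: &HashMap<String, String>,
    outputs: Vec<Result<Output>>,
) -> JsonResponse {
    let format = params
        .get("format")
        .and_then(|s| ResponseFormat::parse(s))
        .unwrap_or(ResponseFormat::GreptimedbV1);

    // Only meaningful for the InfluxDB v1 format: controls the time unit of
    // returned timestamps (ns, u/µ, ms, s).
    let epoch = params.get("epoch").and_then(|s| Epoch::parse(s));

    JsonResponse::from_output(outputs, format, epoch).await
}

The query duration could then be attached with the with_execution_time helper that appears further down in the diff.
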
225 changes: 189 additions & 36 deletions src/servers/src/http.rs
@@ -26,7 +26,9 @@ pub mod script;

#[cfg(feature = "dashboard")]
mod dashboard;
pub mod influxdb_result_v1;

use std::fmt::Display;
use std::net::SocketAddr;
use std::time::{Duration, Instant};

@@ -47,8 +49,9 @@ use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::{util, RecordBatch};
use common_telemetry::logging::{self, info};
use common_telemetry::{debug, error};
use common_telemetry::logging::{debug, error, info};
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datatypes::data_type::DataType;
use futures::FutureExt;
use schemars::JsonSchema;
@@ -66,6 +69,7 @@ use crate::configurator::ConfiguratorRef;
use crate::error::{AlreadyStartedSnafu, Error, Result, StartHttpSnafu, ToJsonSnafu};
use crate::http::authorize::HttpAuth;
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
use crate::http::influxdb_result_v1::InfluxdbV1Response;
use crate::http::prometheus::{
format_query, instant_query, label_values_query, labels_query, range_query, series_query,
};
@@ -243,17 +247,17 @@ pub enum JsonOutput {
}

#[derive(Serialize, Deserialize, Debug, JsonSchema)]
pub struct JsonResponse {
pub struct GreptimedbV1Response {
code: u32,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
output: Vec<JsonOutput>,
#[serde(skip_serializing_if = "Option::is_none")]
output: Option<Vec<JsonOutput>>,
#[serde(skip_serializing_if = "Option::is_none")]
execution_time_ms: Option<u128>,
execution_time_ms: Option<u64>,
}

impl JsonResponse {
impl GreptimedbV1Response {
pub fn with_error(error: impl ErrorExt) -> Self {
let code = error.status_code();
if code.should_log_error() {
@@ -262,35 +266,34 @@ impl JsonResponse {
debug!("Failed to handle HTTP request, err: {:?}", error);
}

JsonResponse {
GreptimedbV1Response {
error: Some(error.output_msg()),
code: code as u32,
output: None,
output: vec![],
execution_time_ms: None,
}
}

fn with_error_message(err_msg: String, error_code: StatusCode) -> Self {
JsonResponse {
GreptimedbV1Response {
error: Some(err_msg),
code: error_code as u32,
output: None,
output: vec![],
execution_time_ms: None,
}
}

fn with_output(output: Option<Vec<JsonOutput>>) -> Self {
JsonResponse {
fn with_output(output: Vec<JsonOutput>) -> Self {
GreptimedbV1Response {
error: None,
code: StatusCode::Success as u32,
output,
execution_time_ms: None,
}
}

fn with_execution_time(mut self, execution_time: u128) -> Self {
fn with_execution_time(&mut self, execution_time: u64) {
self.execution_time_ms = Some(execution_time);
self
}

/// Create a json response from query result
@@ -333,7 +336,7 @@ impl JsonResponse {
}
}
}
Self::with_output(Some(results))
Self::with_output(results)
}

pub fn code(&self) -> u32 {
@@ -348,15 +351,142 @@ impl JsonResponse {
self.error.as_ref()
}

pub fn output(&self) -> Option<&[JsonOutput]> {
self.output.as_deref()
pub fn output(&self) -> &[JsonOutput] {
&self.output
}

pub fn execution_time_ms(&self) -> Option<u128> {
pub fn execution_time_ms(&self) -> Option<u64> {
self.execution_time_ms
}
}

/// It allows the results of SQL queries to be presented in different formats.
/// Currently, `greptimedb_v1` and `influxdb_v1` are supported.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResponseFormat {
GreptimedbV1,
InfluxdbV1,
}

impl ResponseFormat {
pub fn parse(s: &str) -> Option<Self> {
match s {
"greptimedb_v1" => Some(ResponseFormat::GreptimedbV1),
"influxdb_v1" => Some(ResponseFormat::InfluxdbV1),
_ => None,
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Epoch {
Nanosecond,
Microsecond,
Millisecond,
Second,
}

impl Epoch {
pub fn parse(s: &str) -> Option<Epoch> {
// Both u and µ indicate microseconds.
// epoch = [ns,u,µ,ms,s],
// For details, see the Influxdb documents.
// https://docs.influxdata.com/influxdb/v1/tools/api/#query-string-parameters-1
match s {
"ns" => Some(Epoch::Nanosecond),
"u" | "µ" => Some(Epoch::Microsecond),
"ms" => Some(Epoch::Millisecond),
"s" => Some(Epoch::Second),
_ => None, // just returns None for other cases
}
}

pub fn convert_timestamp(&self, ts: Timestamp) -> Option<Timestamp> {
match self {
Epoch::Nanosecond => ts.convert_to(TimeUnit::Nanosecond),
Epoch::Microsecond => ts.convert_to(TimeUnit::Microsecond),
Epoch::Millisecond => ts.convert_to(TimeUnit::Millisecond),
Epoch::Second => ts.convert_to(TimeUnit::Second),
}
}
}

impl Display for Epoch {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Epoch::Nanosecond => write!(f, "Epoch::Nanosecond"),
Epoch::Microsecond => write!(f, "Epoch::Microsecond"),
Epoch::Millisecond => write!(f, "Epoch::Millisecond"),
Epoch::Second => write!(f, "Epoch::Second"),
}
}
}

#[derive(Serialize, Deserialize, Debug, JsonSchema)]
#[serde(tag = "type")]
pub enum JsonResponse {
GreptimedbV1(GreptimedbV1Response),
InfluxdbV1(InfluxdbV1Response),
}

impl From<GreptimedbV1Response> for JsonResponse {
fn from(value: GreptimedbV1Response) -> Self {
JsonResponse::GreptimedbV1(value)
}
}

impl From<InfluxdbV1Response> for JsonResponse {
fn from(value: InfluxdbV1Response) -> Self {
JsonResponse::InfluxdbV1(value)
}
}

impl JsonResponse {
pub fn with_error(error: impl ErrorExt, response_format: ResponseFormat) -> Self {
match response_format {
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::with_error(error).into(),
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::with_error(error).into(),
}
}

pub fn with_error_message(
err_msg: String,
error_code: StatusCode,
response_format: ResponseFormat,
) -> Self {
match response_format {
ResponseFormat::GreptimedbV1 => {
GreptimedbV1Response::with_error_message(err_msg, error_code).into()
}
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::with_error_message(err_msg).into(),
}
}
pub async fn from_output(
outputs: Vec<Result<Output>>,
response_format: ResponseFormat,
epoch: Option<Epoch>,
) -> Self {
match response_format {
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await.into(),
ResponseFormat::InfluxdbV1 => {
InfluxdbV1Response::from_output(outputs, epoch).await.into()
}
}
}

fn with_execution_time(mut self, execution_time: u128) -> Self {
match &mut self {
JsonResponse::GreptimedbV1(resp) => {
resp.with_execution_time(execution_time as u64);
}
JsonResponse::InfluxdbV1(resp) => {
resp.with_execution_time(execution_time as u64);
}
}
self
}
}

async fn serve_api(Extension(api): Extension<OpenApi>) -> impl IntoApiResponse {
Json(api)
}
@@ -772,11 +902,12 @@ impl Server for HttpServer {

/// handle error middleware
async fn handle_error(err: BoxError) -> Json<JsonResponse> {
logging::error!("Unhandled internal error: {}", err);
error!(err; "Unhandled internal error");

Json(JsonResponse::with_error_message(
format!("Unhandled internal error: {err}"),
StatusCode::Unexpected,
ResponseFormat::GreptimedbV1,
))
}

@@ -920,22 +1051,44 @@ mod test {
])),
];
let recordbatch = RecordBatch::new(schema.clone(), columns).unwrap();
let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch]).unwrap();

let json_resp =
JsonResponse::from_output(vec![Ok(Output::RecordBatches(recordbatches))]).await;

let json_output = &json_resp.output.unwrap()[0];
if let JsonOutput::Records(r) = json_output {
assert_eq!(r.num_rows(), 4);
assert_eq!(r.num_cols(), 2);
let schema = r.schema.as_ref().unwrap();
assert_eq!(schema.column_schemas[0].name, "numbers");
assert_eq!(schema.column_schemas[0].data_type, "UInt32");
assert_eq!(r.rows[0][0], serde_json::Value::from(1));
assert_eq!(r.rows[0][1], serde_json::Value::Null);
} else {
panic!("invalid output type");

for format in [ResponseFormat::GreptimedbV1, ResponseFormat::InfluxdbV1] {
let recordbatches =
RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]).unwrap();
let json_resp = JsonResponse::from_output(
vec![Ok(Output::RecordBatches(recordbatches))],
format,
None,
)
.await;

match json_resp {
JsonResponse::GreptimedbV1(json_resp) => {
let json_output = &json_resp.output[0];
if let JsonOutput::Records(r) = json_output {
assert_eq!(r.num_rows(), 4);
assert_eq!(r.num_cols(), 2);
let schema = r.schema.as_ref().unwrap();
assert_eq!(schema.column_schemas[0].name, "numbers");
assert_eq!(schema.column_schemas[0].data_type, "UInt32");
assert_eq!(r.rows[0][0], serde_json::Value::from(1));
assert_eq!(r.rows[0][1], serde_json::Value::Null);
} else {
panic!("invalid output type");
}
}
JsonResponse::InfluxdbV1(json_resp) => {
let json_output = &json_resp.results()[0];
assert_eq!(json_output.num_rows(), 4);
assert_eq!(json_output.num_cols(), 2);
assert_eq!(json_output.series[0].columns.clone()[0], "numbers");
assert_eq!(
json_output.series[0].values[0][0],
serde_json::Value::from(1)
);
assert_eq!(json_output.series[0].values[0][1], serde_json::Value::Null);
}
}
}
}
}
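
To make the epoch handling above concrete, here is a small in-crate test sketch of the expected behavior: parsing the InfluxDB-style precision strings and converting a timestamp to the requested unit. It only uses the Epoch API added in this diff, except for Timestamp::new_millisecond and Timestamp::value, which are assumed to be the usual common_time constructor and accessor.

// Sketch only (not part of this commit): exercises Epoch::parse and
// Epoch::convert_timestamp from the diff above.
use common_time::Timestamp;

use crate::http::Epoch;

#[test]
fn epoch_parse_and_convert_sketch() {
    // Both "u" and "µ" mean microseconds; unknown strings are rejected.
    assert_eq!(Epoch::parse("u"), Some(Epoch::Microsecond));
    assert_eq!(Epoch::parse("µ"), Some(Epoch::Microsecond));
    assert_eq!(Epoch::parse("15m"), None);

    // 1_700_000_000_000 ms is exactly 1_700_000_000 s, so this conversion is lossless.
    let ts = Timestamp::new_millisecond(1_700_000_000_000);
    let in_seconds = Epoch::Second.convert_timestamp(ts).unwrap();
    assert_eq!(in_seconds.value(), 1_700_000_000);
}
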