diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 250abc568fdc..2cf9cf8b89ae 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -382,6 +382,13 @@ pub enum Error { actual: i32, location: Location, }, + + #[snafu(display("Failed to convert to json"))] + ToJson { + #[snafu(source)] + error: serde_json::error::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -475,6 +482,8 @@ impl ErrorExt for Error { Metrics { source } => source.status_code(), ConvertScalarValue { source, .. } => source.status_code(), + + ToJson { .. } => StatusCode::Internal, } } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index f627fcd96637..2aa117eebce5 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -48,6 +48,7 @@ use common_error::status_code::StatusCode; use common_query::Output; use common_recordbatch::{util, RecordBatch}; use common_telemetry::logging::{self, info}; +use common_telemetry::{debug, error}; use datatypes::data_type::DataType; use futures::FutureExt; use schemars::JsonSchema; @@ -61,10 +62,10 @@ use tower::ServiceBuilder; use tower_http::auth::AsyncRequireAuthorizationLayer; use tower_http::trace::TraceLayer; -use self::authorize::HttpAuth; -use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2}; use crate::configurator::ConfiguratorRef; -use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu}; +use crate::error::{AlreadyStartedSnafu, Error, Result, StartHttpSnafu, ToJsonSnafu}; +use crate::http::authorize::HttpAuth; +use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2}; use crate::http::prometheus::{ format_query, instant_query, label_values_query, labels_query, range_query, series_query, }; @@ -185,7 +186,7 @@ impl HttpRecordsOutput { } impl TryFrom> for HttpRecordsOutput { - type Error = String; + type Error = Error; // TODO(sunng87): use schema from recordstreams when #366 fixed fn try_from( @@ -218,8 +219,9 @@ impl TryFrom> for HttpRecordsOutput { for row in recordbatch.rows() { let value_row = row .into_iter() - .map(|f| Value::try_from(f).map_err(|err| err.to_string())) - .collect::, _>>()?; + .map(Value::try_from) + .collect::, _>>() + .context(ToJsonSnafu)?; rows.push(value_row); } @@ -252,9 +254,25 @@ pub struct JsonResponse { } impl JsonResponse { - pub fn with_error(error: String, error_code: StatusCode) -> Self { + pub fn with_error(error: impl ErrorExt) -> Self { + let code = error.status_code(); + if code.should_log_error() { + error!(error; "Failed to handle HTTP request"); + } else { + debug!("Failed to handle HTTP request, err: {:?}", error); + } + + JsonResponse { + error: Some(error.output_msg()), + code: code as u32, + output: None, + execution_time_ms: None, + } + } + + fn with_error_message(err_msg: String, error_code: StatusCode) -> Self { JsonResponse { - error: Some(error), + error: Some(err_msg), code: error_code as u32, output: None, execution_time_ms: None, @@ -293,12 +311,12 @@ impl JsonResponse { results.push(JsonOutput::Records(rows)); } Err(err) => { - return Self::with_error(err, StatusCode::Internal); + return Self::with_error(err); } }, Err(e) => { - return Self::with_error(e.output_msg(), e.status_code()); + return Self::with_error(e); } } } @@ -307,11 +325,11 @@ impl JsonResponse { results.push(JsonOutput::Records(rows)); } Err(err) => { - return Self::with_error(err, StatusCode::Internal); + return Self::with_error(err); } }, Err(e) => { - return Self::with_error(e.output_msg(), e.status_code()); + return Self::with_error(e); } } } @@ -756,7 +774,7 @@ impl Server for HttpServer { async fn handle_error(err: BoxError) -> Json { logging::error!("Unhandled internal error: {}", err); - Json(JsonResponse::with_error( + Json(JsonResponse::with_error_message( format!("Unhandled internal error: {err}"), StatusCode::Unexpected, )) diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index d07210266b03..3a117d17b3a2 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -20,6 +20,7 @@ use aide::transform::TransformOperation; use axum::extract::{Json, Query, State}; use axum::response::{IntoResponse, Response}; use axum::{Extension, Form}; +use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use query::parser::PromQuery; use schemars::JsonSchema; @@ -60,7 +61,7 @@ pub async fn sql( JsonResponse::from_output(sql_handler.do_query(sql, query_ctx).await).await } else { - JsonResponse::with_error( + JsonResponse::with_error_message( "sql parameter is required.".to_string(), StatusCode::InvalidArguments, ) @@ -191,14 +192,15 @@ async fn validate_schema( .is_valid_schema(query_ctx.current_catalog(), query_ctx.current_schema()) .await { - Ok(false) => Some(JsonResponse::with_error( + Ok(false) => Some(JsonResponse::with_error_message( format!("Database not found: {}", query_ctx.get_db_string()), StatusCode::DatabaseNotFound, )), - Err(e) => Some(JsonResponse::with_error( + Err(e) => Some(JsonResponse::with_error_message( format!( - "Error checking database: {}, {e}", - query_ctx.get_db_string() + "Error checking database: {}, {}", + query_ctx.get_db_string(), + e.output_msg(), ), StatusCode::Internal, )), diff --git a/src/servers/src/http/script.rs b/src/servers/src/http/script.rs index 8be75dc07aee..ce7cbb2faeac 100644 --- a/src/servers/src/http/script.rs +++ b/src/servers/src/http/script.rs @@ -18,22 +18,22 @@ use std::time::Instant; use axum::extract::{Json, Query, RawBody, State}; use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use session::context::QueryContext; +use snafu::ResultExt; +use crate::error::{HyperSnafu, InvalidUtf8ValueSnafu}; use crate::http::{ApiState, JsonResponse}; macro_rules! json_err { ($e: expr) => {{ - return Json(JsonResponse::with_error( - format!("Invalid argument: {}", $e), - common_error::status_code::StatusCode::InvalidArguments, - )); + return Json(JsonResponse::with_error($e)); }}; ($msg: expr, $code: expr) => {{ - return Json(JsonResponse::with_error($msg.to_string(), $code)); + return Json(JsonResponse::with_error_message($msg.to_string(), $code)); }}; } @@ -60,18 +60,19 @@ pub async fn scripts( let schema = params.db.as_ref(); if schema.is_none() || schema.unwrap().is_empty() { - json_err!("invalid schema") + json_err!("invalid schema", StatusCode::InvalidArguments) } let name = params.name.as_ref(); if name.is_none() || name.unwrap().is_empty() { - json_err!("invalid name"); + json_err!("invalid name", StatusCode::InvalidArguments); } - let bytes = unwrap_or_json_err!(hyper::body::to_bytes(body).await); + let bytes = unwrap_or_json_err!(hyper::body::to_bytes(body).await.context(HyperSnafu)); - let script = unwrap_or_json_err!(String::from_utf8(bytes.to_vec())); + let script = + unwrap_or_json_err!(String::from_utf8(bytes.to_vec()).context(InvalidUtf8ValueSnafu)); // Safety: schema and name are already checked above. let query_ctx = QueryContext::with(&catalog, schema.unwrap()); @@ -80,12 +81,18 @@ pub async fn scripts( .await { Ok(()) => JsonResponse::with_output(None), - Err(e) => json_err!(format!("Insert script error: {e}"), e.status_code()), + Err(e) => json_err!( + format!("Insert script error: {}", e.output_msg()), + e.status_code() + ), }; Json(body) } else { - json_err!("Script execution not supported, missing script handler"); + json_err!( + "Script execution not supported, missing script handler", + StatusCode::Unsupported + ); } } @@ -112,13 +119,13 @@ pub async fn run_script( let schema = params.db.as_ref(); if schema.is_none() || schema.unwrap().is_empty() { - json_err!("invalid schema") + json_err!("invalid schema", StatusCode::InvalidArguments) } let name = params.name.as_ref(); if name.is_none() || name.unwrap().is_empty() { - json_err!("invalid name"); + json_err!("invalid name", StatusCode::InvalidArguments); } // Safety: schema and name are already checked above. @@ -130,6 +137,9 @@ pub async fn run_script( Json(resp.with_execution_time(start.elapsed().as_millis())) } else { - json_err!("Script execution not supported, missing script handler"); + json_err!( + "Script execution not supported, missing script handler", + StatusCode::Unsupported + ); } } diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs index 5b86593f428c..012a997b46ea 100644 --- a/src/servers/tests/http/http_handler_test.rs +++ b/src/servers/tests/http/http_handler_test.rs @@ -182,7 +182,7 @@ async fn insert_script( ) .await; assert!(!json.success(), "{json:?}"); - assert_eq!(json.error().unwrap(), "Invalid argument: invalid schema"); + assert_eq!(json.error().unwrap(), "invalid schema"); let body = RawBody(Body::from(script.clone())); let exec = create_script_query();