Skip to content

Commit

Permalink
chore: tune return msg (#2506)
Browse files Browse the repository at this point in the history
* chore: test return msg

* fix: test_child_error

Signed-off-by: Ruihang Xia <[email protected]>

* chore: fix test

* chore: minor fix grpc return value

* chore: format return msg

* chore: use root error as return value

* chore: fix empty err display

* chore: iter through external error

* chore: remove err msg

* chore: remove unused field

---------

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Ruihang Xia <[email protected]>
  • Loading branch information
shuiyisong and waynexia authored Sep 27, 2023
1 parent 9282e59 commit ee8d472
Show file tree
Hide file tree
Showing 32 changed files with 138 additions and 191 deletions.
3 changes: 1 addition & 2 deletions src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,8 @@ pub enum Error {
#[snafu(display("Illegal access to catalog: {} and schema: {}", catalog, schema))]
QueryAccessDenied { catalog: String, schema: String },

#[snafu(display("msg: {}", msg))]
#[snafu(display(""))]
Datafusion {
msg: String,
#[snafu(source)]
error: DataFusionError,
location: Location,
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use query::QueryEngine;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use session::context::QueryContext;
use snafu::{ErrorCompat, ResultExt};
use snafu::ResultExt;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};

use crate::cli::cmd::ReplCommand;
Expand Down Expand Up @@ -148,7 +148,7 @@ impl Repl {
.await
.map_err(|e| {
let status_code = e.status_code();
let root_cause = e.iter_chain().last().unwrap();
let root_cause = e.output_msg();
println!("Error: {}({status_code}), {root_cause}", status_code as u32)
})
.is_ok()
Expand Down
37 changes: 34 additions & 3 deletions src/common/error/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use crate::status_code::StatusCode;

/// Extension to [`Error`](std::error::Error) in std.
pub trait ErrorExt: std::error::Error + StackError {
pub trait ErrorExt: StackError {
/// Map this error to [StatusCode].
fn status_code(&self) -> StatusCode {
StatusCode::Unknown
Expand All @@ -34,12 +34,43 @@ pub trait ErrorExt: std::error::Error + StackError {
/// Returns the error as [Any](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

fn output_msg(&self) -> String
where
Self: Sized,
{
let error = self.last();
if let Some(external_error) = error.source() {
let external_root = external_error.sources().last().unwrap();

if error.to_string().is_empty() {
format!("{external_root}")
} else {
format!("{error}: {external_root}")
}
} else {
format!("{error}")
}
}
}

pub trait StackError {
pub trait StackError: std::error::Error {
fn debug_fmt(&self, layer: usize, buf: &mut Vec<String>);

fn next(&self) -> Option<&dyn StackError>;

fn last(&self) -> &dyn StackError
where
Self: Sized,
{
let Some(mut result) = self.next() else {
return self;
};
while let Some(err) = result.next() {
result = err;
}
result
}
}

impl<T: ?Sized + StackError> StackError for Arc<T> {
Expand All @@ -52,7 +83,7 @@ impl<T: ?Sized + StackError> StackError for Arc<T> {
}
}

impl<T: ?Sized + StackError> StackError for Box<T> {
impl<T: StackError> StackError for Box<T> {
fn debug_fmt(&self, layer: usize, buf: &mut Vec<String>) {
self.as_ref().debug_fmt(layer, buf)
}
Expand Down
1 change: 1 addition & 0 deletions src/common/error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(error_iter)]

pub mod ext;
pub mod format;
Expand Down
12 changes: 2 additions & 10 deletions src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,14 +453,13 @@ mod tests {
use std::sync::Arc;

use async_trait::async_trait;
use common_error::ext::PlainError;
use common_error::ext::{ErrorExt, PlainError};
use common_error::mock::MockError;
use common_error::status_code::StatusCode;
use common_test_util::temp_dir::create_temp_dir;
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use object_store::ObjectStore;
use snafu::ErrorCompat;

use super::*;
use crate::local::test_util;
Expand Down Expand Up @@ -943,14 +942,7 @@ mod tests {

// Run the runner and execute the procedure.
runner.run().await;
let err = meta
.state()
.error()
.unwrap()
.iter_chain()
.last()
.unwrap()
.to_string();
let err = meta.state().error().unwrap().output_msg();
assert!(err.contains("subprocedure failed"), "{err}");
}
}
2 changes: 1 addition & 1 deletion src/common/recordbatch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to poll stream"))]
#[snafu(display(""))]
PollStream {
#[snafu(source)]
error: datafusion::error::DataFusionError,
Expand Down
32 changes: 8 additions & 24 deletions src/query/src/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,7 @@ impl QueryEngine for DatafusionQueryEngine {
Ok(DataFrame::DataFusion(
self.state
.read_table(table)
.context(error::DatafusionSnafu {
msg: "Fail to create dataframe for table",
})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?,
))
Expand All @@ -295,9 +293,7 @@ impl LogicalOptimizer for DatafusionQueryEngine {
.state
.session_state()
.optimize(df_plan)
.context(error::DatafusionSnafu {
msg: "Fail to optimize logical plan",
})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;

Expand All @@ -321,9 +317,7 @@ impl PhysicalPlanner for DatafusionQueryEngine {
let physical_plan = state
.create_physical_plan(df_plan)
.await
.context(error::DatafusionSnafu {
msg: "Fail to create physical plan",
})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;

Expand Down Expand Up @@ -394,9 +388,7 @@ impl QueryExecutor for DatafusionQueryEngine {
assert_eq!(1, plan.output_partitioning().partition_count());
let df_stream = plan
.execute(0, task_ctx)
.context(error::DatafusionSnafu {
msg: "Failed to execute DataFusion merge exec",
})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let stream = RecordBatchStreamAdapter::try_new(df_stream)
Expand Down Expand Up @@ -447,35 +439,27 @@ pub async fn execute_show_with_filter(
let context = SessionContext::new();
context
.register_batch(table_name, record_batch.into_df_record_batch())
.context(error::DatafusionSnafu {
msg: "Fail to register a record batch as a table",
})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let mut dataframe = context
.sql(&format!("SELECT * FROM {table_name}"))
.await
.context(error::DatafusionSnafu {
msg: "Fail to execute a sql",
})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
if let Some(filter) = filter {
let filter = convert_filter_to_df_filter(filter)?;
dataframe = dataframe
.filter(filter)
.context(error::DatafusionSnafu {
msg: "Fail to filter",
})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?
}
let df_batches = dataframe
.collect()
.await
.context(error::DatafusionSnafu {
msg: "Fail to collect the record batches",
})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let mut batches = Vec::with_capacity(df_batches.len());
Expand Down
3 changes: 1 addition & 2 deletions src/query/src/datafusion/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ use snafu::{Location, Snafu};
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum InnerError {
#[snafu(display("msg: {}", msg))]
#[snafu(display(""))]
Datafusion {
msg: &'static str,
#[snafu(source)]
error: DataFusionError,
location: Location,
Expand Down
5 changes: 2 additions & 3 deletions src/query/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub enum Error {
location: Location,
},

#[snafu(display("DataFusion error"))]
#[snafu(display(""))]
DataFusion {
#[snafu(source)]
error: DataFusionError,
Expand All @@ -140,9 +140,8 @@ pub enum Error {
source: sql::error::Error,
},

#[snafu(display("Cannot plan SQL: {}", sql))]
#[snafu(display(""))]
PlanSql {
sql: String,
#[snafu(source)]
error: DataFusionError,
location: Location,
Expand Down
11 changes: 3 additions & 8 deletions src/query/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,9 @@ impl DfLogicalPlanner {

let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);

let result = sql_to_rel.statement_to_plan(df_stmt).with_context(|_| {
let sql = if let Statement::Query(query) = stmt {
query.inner.to_string()
} else {
format!("{stmt:?}")
};
PlanSqlSnafu { sql }
})?;
let result = sql_to_rel
.statement_to_plan(df_stmt)
.context(PlanSqlSnafu)?;
let plan = RangePlanRewriter::new(table_provider, context_provider)
.rewrite(result)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion src/script/src/python/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub enum Error {
error: ArrowError,
},

#[snafu(display("DataFusion error"))]
#[snafu(display(""))]
DataFusion {
location: SnafuLocation,
#[snafu(source)]
Expand Down
11 changes: 5 additions & 6 deletions src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use common_telemetry::logging;
use datatypes::prelude::ConcreteDataType;
use query::parser::PromQuery;
use serde_json::json;
use snafu::{ErrorCompat, Location, Snafu};
use snafu::{Location, Snafu};
use tonic::Code;

#[derive(Snafu)]
Expand Down Expand Up @@ -511,7 +511,6 @@ macro_rules! define_into_tonic_status {
impl From<$Error> for tonic::Status {
fn from(err: $Error) -> Self {
use common_error::{GREPTIME_ERROR_CODE, GREPTIME_ERROR_MSG};
use snafu::ErrorCompat;
use tonic::codegen::http::{HeaderMap, HeaderValue};
use tonic::metadata::MetadataMap;

Expand All @@ -521,16 +520,16 @@ macro_rules! define_into_tonic_status {
// (which is a very rare case), just ignore. Client will use Tonic status code and message.
let status_code = err.status_code();
headers.insert(GREPTIME_ERROR_CODE, HeaderValue::from(status_code as u32));
let root_error = err.iter_chain().last().unwrap();
let root_error = err.output_msg();

if let Ok(err_msg) = HeaderValue::from_bytes(root_error.to_string().as_bytes()) {
if let Ok(err_msg) = HeaderValue::from_bytes(root_error.as_bytes()) {
let _ = headers.insert(GREPTIME_ERROR_MSG, err_msg);
}

let metadata = MetadataMap::from_headers(headers);
tonic::Status::with_metadata(
$crate::error::status_to_tonic_code(status_code),
err.to_string(),
root_error,
metadata,
)
}
Expand All @@ -548,7 +547,7 @@ impl From<std::io::Error> for Error {

impl IntoResponse for Error {
fn into_response(self) -> Response {
let error_msg = self.iter_chain().last().unwrap().to_string();
let error_msg = self.output_msg();
let status = match self {
Error::InfluxdbLineProtocol { .. }
| Error::InfluxdbLinesWrite { .. }
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/grpc/prom_query_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl PrometheusGatewayService {
Err(err) => {
return PrometheusJsonResponse::error(
err.status_code().to_string(),
err.to_string(),
err.output_msg(),
)
.0
}
Expand Down
7 changes: 2 additions & 5 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use futures::FutureExt;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use snafu::{ensure, ErrorCompat, ResultExt};
use snafu::{ensure, ResultExt};
use tokio::sync::oneshot::{self, Sender};
use tokio::sync::Mutex;
use tower::timeout::TimeoutLayer;
Expand Down Expand Up @@ -314,10 +314,7 @@ impl JsonResponse {
}
},
Err(e) => {
return Self::with_error(
e.iter_chain().last().unwrap().to_string(),
e.status_code(),
);
return Self::with_error(e.output_msg(), e.status_code());
}
}
}
Expand Down
Loading

0 comments on commit ee8d472

Please sign in to comment.