diff --git a/Cargo.lock b/Cargo.lock index c835d484fb1e0..41332d02bdf02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10360,6 +10360,7 @@ dependencies = [ "madsim-tokio", "madsim-tonic", "moka", + "paste", "rand", "risingwave_common", "risingwave_error", diff --git a/src/batch/src/error.rs b/src/batch/src/error.rs index 13803437bb075..5751ab86ea9aa 100644 --- a/src/batch/src/error.rs +++ b/src/batch/src/error.rs @@ -150,13 +150,6 @@ impl From for BatchError { } } -impl From for BatchError { - fn from(status: tonic::Status) -> Self { - // Always wrap the status into a `RpcError`. - Self::from(RpcError::from(status)) - } -} - impl<'a> From<&'a BatchError> for Status { fn from(err: &'a BatchError) -> Self { err.to_status(tonic::Code::Internal, "batch") diff --git a/src/batch/src/execution/grpc_exchange.rs b/src/batch/src/execution/grpc_exchange.rs index 727a005eb2bf0..27f64b1d5ed48 100644 --- a/src/batch/src/execution/grpc_exchange.rs +++ b/src/batch/src/execution/grpc_exchange.rs @@ -20,6 +20,7 @@ use risingwave_expr::expr_context::capture_expr_context; use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::{self, Plan}; use risingwave_pb::batch_plan::TaskOutputId; use risingwave_pb::task_service::{ExecuteRequest, GetDataResponse}; +use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::ComputeClient; use tonic::Streaming; @@ -81,7 +82,7 @@ impl ExchangeSource for GrpcExchangeSource { } Some(r) => r, }; - let task_data = res?; + let task_data = res.map_err(RpcError::from_batch_status)?; let data = DataChunk::from_protobuf(task_data.get_record_batch()?)?.compact(); trace!( "Receiver taskOutput = {:?}, data = {:?}", diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs index 53225ab3515e4..aa1eeed8f0fa1 100644 --- a/src/common/common_service/src/observer_manager.rs +++ b/src/common/common_service/src/observer_manager.rs @@ -94,8 +94,8 @@ pub enum ObserverError { } impl From for ObserverError { - fn from(value: tonic::Status) -> Self { - Self::Rpc(value.into()) + fn from(status: tonic::Status) -> Self { + Self::Rpc(RpcError::from_meta_status(status)) } } diff --git a/src/error/src/tonic.rs b/src/error/src/tonic.rs index 8aec1837da973..11cbb71063652 100644 --- a/src/error/src/tonic.rs +++ b/src/error/src/tonic.rs @@ -98,7 +98,24 @@ where /// A wrapper of [`tonic::Status`] that provides better error message and extracts /// the source chain from the `details` field. #[derive(Debug)] -pub struct TonicStatusWrapper(tonic::Status); +pub struct TonicStatusWrapper { + inner: tonic::Status, + + /// Optional service name from the client side. + /// + /// # Explanation + /// + /// [`tonic::Status`] is used for both client and server side. When the error is created on + /// the server side, we encourage developers to provide the service name with + /// [`ToTonicStatus::to_status`], so that the info can be included in the HTTP response and + /// then extracted by the client side (in [`TonicStatusWrapper::new`]). + /// + /// However, if there's something wrong with the server side and the error is directly + /// created on the client side, the approach above is not applicable. In this case, the + /// caller should set a "client side" service name to provide better error message. This is + /// achieved by [`TonicStatusWrapperExt::with_client_side_service_name`]. + client_side_service_name: Option, +} impl TonicStatusWrapper { /// Create a new [`TonicStatusWrapper`] from the given [`tonic::Status`] and extract @@ -115,17 +132,21 @@ impl TonicStatusWrapper { } } } - Self(status) + + Self { + inner: status, + client_side_service_name: None, + } } /// Returns the reference to the inner [`tonic::Status`]. pub fn inner(&self) -> &tonic::Status { - &self.0 + &self.inner } /// Consumes `self` and returns the inner [`tonic::Status`]. pub fn into_inner(self) -> tonic::Status { - self.0 + self.inner } } @@ -138,28 +159,50 @@ impl From for TonicStatusWrapper { impl std::fmt::Display for TonicStatusWrapper { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "gRPC request")?; + if let Some(service_name) = self .source() .and_then(|s| s.downcast_ref::()) .and_then(|s| s.service_name.as_ref()) + // if no service name from the server side, use the client side one + .or(self.client_side_service_name.as_ref()) { write!(f, " to {} service", service_name)?; } - write!(f, " failed: {}: ", self.0.code())?; + write!(f, " failed: {}: ", self.inner.code())?; + #[expect(rw::format_error)] // intentionally format the source itself if let Some(source) = self.source() { // Prefer the source chain from the `details` field. write!(f, "{}", source) } else { - write!(f, "{}", self.0.message()) + write!(f, "{}", self.inner.message()) } } } +#[easy_ext::ext(TonicStatusWrapperExt)] +impl T +where + T: Into, +{ + /// Set the client side service name to provide better error message. + /// + /// See the documentation on the field `client_side_service_name` for more details. + pub fn with_client_side_service_name( + self, + service_name: impl Into, + ) -> TonicStatusWrapper { + let mut this = self.into(); + this.client_side_service_name = Some(service_name.into()); + this + } +} + impl std::error::Error for TonicStatusWrapper { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - // Delegate to `self.0` as if we're transparent. - self.0.source() + // Delegate to `self.inner` as if we're transparent. + self.inner.source() } } diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index e957c81483fa1..9a5e00fbfd702 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -45,6 +45,7 @@ use risingwave_pb::batch_plan::{ use risingwave_pb::common::{BatchQueryEpoch, HostAddress, WorkerNode}; use risingwave_pb::plan_common::ExprContext; use risingwave_pb::task_service::{CancelTaskRequest, TaskInfoResponse}; +use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::ComputeClientPoolRef; use rw_futures_util::select_all; use thiserror_ext::AsReport; @@ -525,7 +526,7 @@ impl StageRunner { |_| StageState::Failed, QueryMessage::Stage(Failed { id: self.stage.id, - reason: SchedulerError::from(e), + reason: RpcError::from_batch_status(e).into(), }), ) .await; diff --git a/src/frontend/src/scheduler/error.rs b/src/frontend/src/scheduler/error.rs index f841a27b6b42d..3fefe1233cbda 100644 --- a/src/frontend/src/scheduler/error.rs +++ b/src/frontend/src/scheduler/error.rs @@ -17,7 +17,6 @@ use risingwave_common::session_config::QueryMode; use risingwave_connector::error::ConnectorError; use risingwave_rpc_client::error::RpcError; use thiserror::Error; -use tonic::{Code, Status}; use crate::error::{ErrorCode, RwError}; use crate::scheduler::plan_fragmenter::QueryId; @@ -69,16 +68,6 @@ pub enum SchedulerError { ), } -/// Only if the code is Internal, change it to Execution Error. Otherwise convert to Rpc Error. -impl From for SchedulerError { - fn from(s: Status) -> Self { - match s.code() { - Code::Internal => Self::TaskExecutionError(s.message().to_string()), - _ => Self::RpcError(s.into()), - } - } -} - impl From for RwError { fn from(s: SchedulerError) -> Self { ErrorCode::SchedulerError(Box::new(s)).into() diff --git a/src/rpc_client/Cargo.toml b/src/rpc_client/Cargo.toml index 5a85c2ac4bdc0..6852fb11f717b 100644 --- a/src/rpc_client/Cargo.toml +++ b/src/rpc_client/Cargo.toml @@ -24,6 +24,7 @@ hyper = "0.14" # required by tonic itertools = { workspace = true } lru = { workspace = true } moka = { version = "0.12", features = ["future"] } +paste = "1" rand = { workspace = true } risingwave_common = { workspace = true } risingwave_error = { workspace = true } diff --git a/src/rpc_client/src/compactor_client.rs b/src/rpc_client/src/compactor_client.rs index ed0061beb48fc..e571dbd504d4b 100644 --- a/src/rpc_client/src/compactor_client.rs +++ b/src/rpc_client/src/compactor_client.rs @@ -30,7 +30,7 @@ use tokio::sync::RwLock; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tonic::transport::{Channel, Endpoint}; -use crate::error::Result; +use crate::error::{Result, RpcError}; use crate::retry_rpc; const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60; const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60; @@ -59,7 +59,8 @@ impl CompactorClient { .monitor_client .to_owned() .stack_trace(StackTraceRequest::default()) - .await? + .await + .map_err(RpcError::from_compactor_status)? .into_inner()) } } diff --git a/src/rpc_client/src/compute_client.rs b/src/rpc_client/src/compute_client.rs index 8e96f7a81702d..f908bb21aa3a2 100644 --- a/src/rpc_client/src/compute_client.rs +++ b/src/rpc_client/src/compute_client.rs @@ -45,7 +45,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::transport::{Channel, Endpoint}; use tonic::Streaming; -use crate::error::Result; +use crate::error::{Result, RpcError}; use crate::{RpcClient, RpcClientPool}; #[derive(Clone)] @@ -98,7 +98,8 @@ impl ComputeClient { .get_data(GetDataRequest { task_output_id: Some(output_id), }) - .await? + .await + .map_err(RpcError::from_compute_status)? .into_inner()) } @@ -149,7 +150,8 @@ impl ComputeClient { up_actor_id, down_actor_id ) - })? + }) + .map_err(RpcError::from_compute_status)? .into_inner(); Ok((response_stream, permits_tx)) @@ -172,12 +174,19 @@ impl ComputeClient { tracing_context: TracingContext::from_current_span().to_protobuf(), expr_context: Some(expr_context), }) - .await? + .await + .map_err(RpcError::from_compute_status)? .into_inner()) } pub async fn execute(&self, req: ExecuteRequest) -> Result> { - Ok(self.task_client.to_owned().execute(req).await?.into_inner()) + Ok(self + .task_client + .to_owned() + .execute(req) + .await + .map_err(RpcError::from_compute_status)? + .into_inner()) } pub async fn cancel(&self, req: CancelTaskRequest) -> Result { @@ -185,7 +194,8 @@ impl ComputeClient { .task_client .to_owned() .cancel_task(req) - .await? + .await + .map_err(RpcError::from_compute_status)? .into_inner()) } @@ -194,7 +204,8 @@ impl ComputeClient { .monitor_client .to_owned() .stack_trace(StackTraceRequest::default()) - .await? + .await + .map_err(RpcError::from_compute_status)? .into_inner()) } @@ -203,7 +214,8 @@ impl ComputeClient { .monitor_client .to_owned() .get_back_pressure(GetBackPressureRequest::default()) - .await? + .await + .map_err(RpcError::from_compute_status)? .into_inner()) } @@ -212,7 +224,8 @@ impl ComputeClient { .monitor_client .to_owned() .profiling(ProfilingRequest { sleep_s }) - .await? + .await + .map_err(RpcError::from_compute_status)? .into_inner()) } @@ -221,7 +234,8 @@ impl ComputeClient { .monitor_client .to_owned() .heap_profiling(HeapProfilingRequest { dir }) - .await? + .await + .map_err(RpcError::from_compute_status)? .into_inner()) } @@ -230,7 +244,8 @@ impl ComputeClient { .monitor_client .to_owned() .list_heap_profiling(ListHeapProfilingRequest {}) - .await? + .await + .map_err(RpcError::from_compute_status)? .into_inner()) } @@ -239,7 +254,8 @@ impl ComputeClient { .monitor_client .to_owned() .analyze_heap(AnalyzeHeapRequest { path }) - .await? + .await + .map_err(RpcError::from_compute_status)? .into_inner()) } @@ -248,7 +264,8 @@ impl ComputeClient { .config_client .to_owned() .show_config(ShowConfigRequest {}) - .await? + .await + .map_err(RpcError::from_compute_status)? .into_inner()) } } diff --git a/src/rpc_client/src/connector_client.rs b/src/rpc_client/src/connector_client.rs index 1acc538eca23a..896f198b8a5f1 100644 --- a/src/rpc_client/src/connector_client.rs +++ b/src/rpc_client/src/connector_client.rs @@ -223,7 +223,8 @@ impl ConnectorClient { source_id, err.message() ) - })? + }) + .map_err(RpcError::from_connector_status)? .into_inner()) } @@ -251,7 +252,8 @@ impl ConnectorClient { .await .inspect_err(|err| { tracing::error!("failed to validate source#{}: {}", source_id, err.message()) - })? + }) + .map_err(RpcError::from_connector_status)? .into_inner(); response.error.map_or(Ok(()), |err| { @@ -281,8 +283,12 @@ impl ConnectorClient { rpc_client .sink_writer_stream(ReceiverStream::new(rx)) .await - .map(|response| response.into_inner().map_err(RpcError::from)) - .map_err(RpcError::from) + .map(|response| { + response + .into_inner() + .map_err(RpcError::from_connector_status) + }) + .map_err(RpcError::from_connector_status) }, ) .await?; @@ -313,8 +319,12 @@ impl ConnectorClient { rpc_client .sink_coordinator_stream(ReceiverStream::new(rx)) .await - .map(|response| response.into_inner().map_err(RpcError::from)) - .map_err(RpcError::from) + .map(|response| { + response + .into_inner() + .map_err(RpcError::from_connector_status) + }) + .map_err(RpcError::from_connector_status) }, ) .await?; @@ -340,7 +350,8 @@ impl ConnectorClient { .await .inspect_err(|err| { tracing::error!("failed to validate sink properties: {}", err.message()) - })? + }) + .map_err(RpcError::from_connector_status)? .into_inner(); response.error.map_or_else( || Ok(()), // If there is no error message, return Ok here. diff --git a/src/rpc_client/src/error.rs b/src/rpc_client/src/error.rs index e44d3ac8c0b9f..5626912c2f88b 100644 --- a/src/rpc_client/src/error.rs +++ b/src/rpc_client/src/error.rs @@ -13,14 +13,16 @@ // limitations under the License. use risingwave_common::util::meta_addr::MetaAddressStrategyParseError; +use risingwave_error::tonic::TonicStatusWrapperExt as _; use thiserror::Error; +use thiserror_ext::Construct; pub type Result = std::result::Result; // Re-export these types as they're commonly used together with `RpcError`. pub use risingwave_error::tonic::{ToTonicStatus, TonicStatusWrapper}; -#[derive(Error, Debug)] +#[derive(Error, Debug, Construct)] pub enum RpcError { #[error(transparent)] TransportError(Box), @@ -48,8 +50,23 @@ impl From for RpcError { } } -impl From for RpcError { - fn from(s: tonic::Status) -> Self { - RpcError::GrpcStatus(Box::new(TonicStatusWrapper::new(s))) - } +/// Intentionally not implemented to enforce using `RpcError::from_xxx_status`, so that +/// the service name can always be included in the error message. +impl !From for RpcError {} + +macro_rules! impl_from_status { + ($($service:ident),* $(,)?) => { + paste::paste! { + impl RpcError { + $( + #[doc = "Convert a gRPC status from " $service " service into an [`RpcError`]."] + pub fn [](s: tonic::Status) -> Self { + Self::grpc_status(s.with_client_side_service_name(stringify!($service))) + } + )* + } + } + }; } + +impl_from_status!(stream, batch, meta, compute, compactor, connector); diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index 6058371556b6a..fa276bdd0a5ce 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -26,6 +26,7 @@ #![feature(impl_trait_in_assoc_type)] #![feature(error_generic_member_access)] #![feature(panic_update_hook)] +#![feature(negative_impls)] use std::any::type_name; use std::fmt::{Debug, Formatter}; @@ -145,7 +146,7 @@ pub trait ExtraInfoSource: Send + Sync { pub type ExtraInfoSourceRef = Arc; #[macro_export] -macro_rules! rpc_client_method_impl { +macro_rules! stream_rpc_client_method_impl { ($( { $client:tt, $fn_name:ident, $req:ty, $resp:ty }),*) => { $( pub async fn $fn_name(&self, request: $req) -> $crate::Result<$resp> { @@ -153,7 +154,8 @@ macro_rules! rpc_client_method_impl { .$client .to_owned() .$fn_name(request) - .await? + .await + .map_err($crate::error::RpcError::from_stream_status)? .into_inner()) } )* @@ -170,7 +172,7 @@ macro_rules! meta_rpc_client_method_impl { Ok(resp) => Ok(resp.into_inner()), Err(e) => { self.refresh_client_if_needed(e.code()).await; - Err(RpcError::from(e)) + Err($crate::error::RpcError::from_meta_status(e)) } } } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index f0dcba7d729ed..0f7bfeb6bccb8 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1554,7 +1554,7 @@ impl GrpcMetaClientCore { /// Client to meta server. Cloning the instance is lightweight. /// -/// It is a wrapper of tonic client. See [`crate::rpc_client_method_impl`]. +/// It is a wrapper of tonic client. See [`crate::meta_rpc_client_method_impl`]. #[derive(Debug, Clone)] struct GrpcMetaClient { member_monitor_event_sender: mpsc::Sender>>, @@ -1591,7 +1591,11 @@ impl MetaMemberManagement { async fn refresh_members(&mut self) -> Result<()> { let leader_addr = match self.members.as_mut() { Either::Left(client) => { - let resp = client.to_owned().members(MembersRequest {}).await?; + let resp = client + .to_owned() + .members(MembersRequest {}) + .await + .map_err(RpcError::from_meta_status)?; let resp = resp.into_inner(); resp.members.into_iter().find(|member| member.is_leader) } diff --git a/src/rpc_client/src/sink_coordinate_client.rs b/src/rpc_client/src/sink_coordinate_client.rs index 5d021caa87d0f..74c05fa85de8e 100644 --- a/src/rpc_client/src/sink_coordinate_client.rs +++ b/src/rpc_client/src/sink_coordinate_client.rs @@ -67,8 +67,12 @@ impl CoordinatorStreamHandle { move |rx| async move { init_stream(rx) .await - .map(|response| response.into_inner().map_err(RpcError::from)) - .map_err(RpcError::from) + .map(|response| { + response + .into_inner() + .map_err(RpcError::from_connector_status) + }) + .map_err(RpcError::from_connector_status) }, ) .await?; diff --git a/src/rpc_client/src/stream_client.rs b/src/rpc_client/src/stream_client.rs index ae5af65f28220..4710be7085ef6 100644 --- a/src/rpc_client/src/stream_client.rs +++ b/src/rpc_client/src/stream_client.rs @@ -30,7 +30,7 @@ use tonic::transport::Endpoint; use crate::error::{Result, RpcError}; use crate::tracing::{Channel, TracingInjectedChannelExt}; -use crate::{rpc_client_method_impl, RpcClient, RpcClientPool, UnboundedBidiStreamHandle}; +use crate::{stream_rpc_client_method_impl, RpcClient, RpcClientPool, UnboundedBidiStreamHandle}; #[derive(Clone)] pub struct StreamClient(StreamServiceClient); @@ -79,7 +79,7 @@ macro_rules! for_all_stream_rpc { } impl StreamClient { - for_all_stream_rpc! { rpc_client_method_impl } + for_all_stream_rpc! { stream_rpc_client_method_impl } } pub type StreamingControlHandle = @@ -98,8 +98,8 @@ impl StreamClient { client .streaming_control_stream(UnboundedReceiverStream::new(rx)) .await - .map(|response| response.into_inner().map_err(RpcError::from)) - .map_err(RpcError::from) + .map(|response| response.into_inner().map_err(RpcError::from_stream_status)) + .map_err(RpcError::from_stream_status) }) .await?; match first_rsp { diff --git a/src/stream/src/executor/exchange/input.rs b/src/stream/src/executor/exchange/input.rs index 59bd3fe6e2d32..7b7cc151f46cd 100644 --- a/src/stream/src/executor/exchange/input.rs +++ b/src/stream/src/executor/exchange/input.rs @@ -217,7 +217,7 @@ impl RemoteInput { // TODO(error-handling): maintain the source chain return Err(StreamExecutorError::channel_closed(format!( "RemoteInput tonic error: {}", - TonicStatusWrapper::from(e).as_report() + TonicStatusWrapper::new(e).as_report() ))); } }