Skip to content

Commit

Permalink
feat(error): provide service name on the client side of gRPC (#16254)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Apr 12, 2024
1 parent 8b09aeb commit 089a997
Show file tree
Hide file tree
Showing 17 changed files with 154 additions and 69 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 0 additions & 7 deletions src/batch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,6 @@ impl From<ValueEncodingError> for BatchError {
}
}

impl From<tonic::Status> for BatchError {
fn from(status: tonic::Status) -> Self {
// Always wrap the status into a `RpcError`.
Self::from(RpcError::from(status))
}
}

impl<'a> From<&'a BatchError> for Status {
fn from(err: &'a BatchError) -> Self {
err.to_status(tonic::Code::Internal, "batch")
Expand Down
3 changes: 2 additions & 1 deletion src/batch/src/execution/grpc_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_expr::expr_context::capture_expr_context;
use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::{self, Plan};
use risingwave_pb::batch_plan::TaskOutputId;
use risingwave_pb::task_service::{ExecuteRequest, GetDataResponse};
use risingwave_rpc_client::error::RpcError;
use risingwave_rpc_client::ComputeClient;
use tonic::Streaming;

Expand Down Expand Up @@ -81,7 +82,7 @@ impl ExchangeSource for GrpcExchangeSource {
}
Some(r) => r,
};
let task_data = res?;
let task_data = res.map_err(RpcError::from_batch_status)?;
let data = DataChunk::from_protobuf(task_data.get_record_batch()?)?.compact();
trace!(
"Receiver taskOutput = {:?}, data = {:?}",
Expand Down
4 changes: 2 additions & 2 deletions src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ pub enum ObserverError {
}

impl From<tonic::Status> for ObserverError {
fn from(value: tonic::Status) -> Self {
Self::Rpc(value.into())
fn from(status: tonic::Status) -> Self {
Self::Rpc(RpcError::from_meta_status(status))
}
}

Expand Down
59 changes: 51 additions & 8 deletions src/error/src/tonic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,24 @@ where
/// A wrapper of [`tonic::Status`] that provides better error message and extracts
/// the source chain from the `details` field.
#[derive(Debug)]
pub struct TonicStatusWrapper(tonic::Status);
pub struct TonicStatusWrapper {
inner: tonic::Status,

/// Optional service name from the client side.
///
/// # Explanation
///
/// [`tonic::Status`] is used for both client and server side. When the error is created on
/// the server side, we encourage developers to provide the service name with
/// [`ToTonicStatus::to_status`], so that the info can be included in the HTTP response and
/// then extracted by the client side (in [`TonicStatusWrapper::new`]).
///
/// However, if there's something wrong with the server side and the error is directly
/// created on the client side, the approach above is not applicable. In this case, the
/// caller should set a "client side" service name to provide better error message. This is
/// achieved by [`TonicStatusWrapperExt::with_client_side_service_name`].
client_side_service_name: Option<ServiceName>,
}

impl TonicStatusWrapper {
/// Create a new [`TonicStatusWrapper`] from the given [`tonic::Status`] and extract
Expand All @@ -115,17 +132,21 @@ impl TonicStatusWrapper {
}
}
}
Self(status)

Self {
inner: status,
client_side_service_name: None,
}
}

/// Returns the reference to the inner [`tonic::Status`].
pub fn inner(&self) -> &tonic::Status {
&self.0
&self.inner
}

/// Consumes `self` and returns the inner [`tonic::Status`].
pub fn into_inner(self) -> tonic::Status {
self.0
self.inner
}
}

Expand All @@ -138,28 +159,50 @@ impl From<tonic::Status> for TonicStatusWrapper {
impl std::fmt::Display for TonicStatusWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "gRPC request")?;

if let Some(service_name) = self
.source()
.and_then(|s| s.downcast_ref::<ServerError>())
.and_then(|s| s.service_name.as_ref())
// if no service name from the server side, use the client side one
.or(self.client_side_service_name.as_ref())
{
write!(f, " to {} service", service_name)?;
}
write!(f, " failed: {}: ", self.0.code())?;
write!(f, " failed: {}: ", self.inner.code())?;

#[expect(rw::format_error)] // intentionally format the source itself
if let Some(source) = self.source() {
// Prefer the source chain from the `details` field.
write!(f, "{}", source)
} else {
write!(f, "{}", self.0.message())
write!(f, "{}", self.inner.message())
}
}
}

#[easy_ext::ext(TonicStatusWrapperExt)]
impl<T> T
where
T: Into<TonicStatusWrapper>,
{
/// Set the client side service name to provide better error message.
///
/// See the documentation on the field `client_side_service_name` for more details.
pub fn with_client_side_service_name(
self,
service_name: impl Into<ServiceName>,
) -> TonicStatusWrapper {
let mut this = self.into();
this.client_side_service_name = Some(service_name.into());
this
}
}

impl std::error::Error for TonicStatusWrapper {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
// Delegate to `self.0` as if we're transparent.
self.0.source()
// Delegate to `self.inner` as if we're transparent.
self.inner.source()
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/scheduler/distributed/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use risingwave_pb::batch_plan::{
use risingwave_pb::common::{BatchQueryEpoch, HostAddress, WorkerNode};
use risingwave_pb::plan_common::ExprContext;
use risingwave_pb::task_service::{CancelTaskRequest, TaskInfoResponse};
use risingwave_rpc_client::error::RpcError;
use risingwave_rpc_client::ComputeClientPoolRef;
use rw_futures_util::select_all;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -525,7 +526,7 @@ impl StageRunner {
|_| StageState::Failed,
QueryMessage::Stage(Failed {
id: self.stage.id,
reason: SchedulerError::from(e),
reason: RpcError::from_batch_status(e).into(),
}),
)
.await;
Expand Down
11 changes: 0 additions & 11 deletions src/frontend/src/scheduler/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use risingwave_common::session_config::QueryMode;
use risingwave_connector::error::ConnectorError;
use risingwave_rpc_client::error::RpcError;
use thiserror::Error;
use tonic::{Code, Status};

use crate::error::{ErrorCode, RwError};
use crate::scheduler::plan_fragmenter::QueryId;
Expand Down Expand Up @@ -69,16 +68,6 @@ pub enum SchedulerError {
),
}

/// Only if the code is Internal, change it to Execution Error. Otherwise convert to Rpc Error.
impl From<tonic::Status> for SchedulerError {
fn from(s: Status) -> Self {
match s.code() {
Code::Internal => Self::TaskExecutionError(s.message().to_string()),
_ => Self::RpcError(s.into()),
}
}
}

impl From<SchedulerError> for RwError {
fn from(s: SchedulerError) -> Self {
ErrorCode::SchedulerError(Box::new(s)).into()
Expand Down
1 change: 1 addition & 0 deletions src/rpc_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ hyper = "0.14" # required by tonic
itertools = { workspace = true }
lru = { workspace = true }
moka = { version = "0.12", features = ["future"] }
paste = "1"
rand = { workspace = true }
risingwave_common = { workspace = true }
risingwave_error = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions src/rpc_client/src/compactor_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tokio::sync::RwLock;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tonic::transport::{Channel, Endpoint};

use crate::error::Result;
use crate::error::{Result, RpcError};
use crate::retry_rpc;
const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
Expand Down Expand Up @@ -59,7 +59,8 @@ impl CompactorClient {
.monitor_client
.to_owned()
.stack_trace(StackTraceRequest::default())
.await?
.await
.map_err(RpcError::from_compactor_status)?
.into_inner())
}
}
Expand Down
43 changes: 30 additions & 13 deletions src/rpc_client/src/compute_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::{Channel, Endpoint};
use tonic::Streaming;

use crate::error::Result;
use crate::error::{Result, RpcError};
use crate::{RpcClient, RpcClientPool};

#[derive(Clone)]
Expand Down Expand Up @@ -98,7 +98,8 @@ impl ComputeClient {
.get_data(GetDataRequest {
task_output_id: Some(output_id),
})
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

Expand Down Expand Up @@ -149,7 +150,8 @@ impl ComputeClient {
up_actor_id,
down_actor_id
)
})?
})
.map_err(RpcError::from_compute_status)?
.into_inner();

Ok((response_stream, permits_tx))
Expand All @@ -172,20 +174,28 @@ impl ComputeClient {
tracing_context: TracingContext::from_current_span().to_protobuf(),
expr_context: Some(expr_context),
})
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

pub async fn execute(&self, req: ExecuteRequest) -> Result<Streaming<GetDataResponse>> {
Ok(self.task_client.to_owned().execute(req).await?.into_inner())
Ok(self
.task_client
.to_owned()
.execute(req)
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

pub async fn cancel(&self, req: CancelTaskRequest) -> Result<CancelTaskResponse> {
Ok(self
.task_client
.to_owned()
.cancel_task(req)
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

Expand All @@ -194,7 +204,8 @@ impl ComputeClient {
.monitor_client
.to_owned()
.stack_trace(StackTraceRequest::default())
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

Expand All @@ -203,7 +214,8 @@ impl ComputeClient {
.monitor_client
.to_owned()
.get_back_pressure(GetBackPressureRequest::default())
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

Expand All @@ -212,7 +224,8 @@ impl ComputeClient {
.monitor_client
.to_owned()
.profiling(ProfilingRequest { sleep_s })
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

Expand All @@ -221,7 +234,8 @@ impl ComputeClient {
.monitor_client
.to_owned()
.heap_profiling(HeapProfilingRequest { dir })
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

Expand All @@ -230,7 +244,8 @@ impl ComputeClient {
.monitor_client
.to_owned()
.list_heap_profiling(ListHeapProfilingRequest {})
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

Expand All @@ -239,7 +254,8 @@ impl ComputeClient {
.monitor_client
.to_owned()
.analyze_heap(AnalyzeHeapRequest { path })
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}

Expand All @@ -248,7 +264,8 @@ impl ComputeClient {
.config_client
.to_owned()
.show_config(ShowConfigRequest {})
.await?
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}
}
Expand Down
Loading

0 comments on commit 089a997

Please sign in to comment.