From 0aed6823504129ff3ef91f4c68b5eaffc64ebc82 Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Thu, 8 Aug 2024 17:19:44 +0800 Subject: [PATCH] fix(error): add sink_id in sinkerror (#17969) --- src/stream/src/error.rs | 8 -------- src/stream/src/executor/error.rs | 20 +++++++++----------- src/stream/src/executor/sink.rs | 7 +++++-- src/stream/src/from_proto/sink.rs | 25 +++++++++++++++++-------- src/stream/src/task/barrier_manager.rs | 3 +-- 5 files changed, 32 insertions(+), 31 deletions(-) diff --git a/src/stream/src/error.rs b/src/stream/src/error.rs index 98424f324965..725e7706c9fb 100644 --- a/src/stream/src/error.rs +++ b/src/stream/src/error.rs @@ -15,7 +15,6 @@ use risingwave_common::array::ArrayError; use risingwave_common::secret::SecretError; use risingwave_connector::error::ConnectorError; -use risingwave_connector::sink::SinkError; use risingwave_expr::ExprError; use risingwave_pb::PbFieldNotFound; use risingwave_rpc_client::error::ToTonicStatus; @@ -66,13 +65,6 @@ pub enum ErrorKind { StreamExecutorError, ), - #[error("Sink error: {0}")] - Sink( - #[from] - #[backtrace] - SinkError, - ), - #[error("Actor {actor_id} exited unexpectedly: {source}")] UnexpectedExit { actor_id: ActorId, diff --git a/src/stream/src/executor/error.rs b/src/stream/src/executor/error.rs index 41b8198b646a..fa625d8bb8ce 100644 --- a/src/stream/src/executor/error.rs +++ b/src/stream/src/executor/error.rs @@ -66,12 +66,8 @@ pub enum ErrorKind { BoxedError, ), - #[error("Sink error: {0}")] - SinkError( - #[from] - #[backtrace] - SinkError, - ), + #[error("Sink error: sink_id={1}, error: {0}")] + SinkError(SinkError, u32), #[error(transparent)] RpcError( @@ -94,11 +90,7 @@ pub enum ErrorKind { AlignBarrier(Box, Box), #[error("Connector error: {0}")] - ConnectorError( - #[source] - #[backtrace] - BoxedError, - ), + ConnectorError(BoxedError), #[error(transparent)] DmlError( @@ -152,6 +144,12 @@ impl From for StreamExecutorError { } } +impl From<(SinkError, u32)> for StreamExecutorError { + fn from((err, sink_id): (SinkError, u32)) -> Self { + ErrorKind::SinkError(err, sink_id).into() + } +} + impl StreamExecutorError { pub fn variant_name(&self) -> &str { self.0.inner().as_ref() diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index ca4fea1fdde4..ef8b57781cd9 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -93,7 +93,8 @@ impl SinkExecutor { chunk_size: usize, input_data_types: Vec, ) -> StreamExecutorResult { - let sink = build_sink(sink_param.clone())?; + let sink = build_sink(sink_param.clone()) + .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?; let sink_input_schema: Schema = columns .iter() .map(|column| Field::from(&column.column_desc)) @@ -481,6 +482,7 @@ impl SinkExecutor { warn!( error = %e.as_report(), executor_id = sink_writer_param.executor_id, + sink_id = sink_param.sink_id.sink_id, "rewind successfully after sink error" ); sink_writer_param.vnode_bitmap = curr_vnode_bitmap; @@ -494,7 +496,8 @@ impl SinkExecutor { ); Err(e) } - }?; + } + .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?; } Err(anyhow!("end of stream").into()) } diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 47b6422b2c6c..cff2009af783 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -33,7 +33,7 @@ use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory; use crate::common::log_store_impl::kv_log_store::{ KvLogStoreFactory, KvLogStoreMetrics, KvLogStorePkInfo, KV_LOG_STORE_V2_INFO, }; -use crate::executor::SinkExecutor; +use crate::executor::{SinkExecutor, StreamExecutorError}; use crate::telemetry::report_event; pub struct SinkExecutorBuilder; @@ -159,7 +159,10 @@ impl ExecutorBuilder for SinkExecutorBuilder { let connector = { let sink_type = properties.get(CONNECTOR_TYPE_KEY).ok_or_else(|| { - SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)) + StreamExecutorError::from(( + SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)), + sink_id.sink_id, + )) })?; match_sink_name_str!( @@ -167,19 +170,24 @@ impl ExecutorBuilder for SinkExecutorBuilder { SinkType, Ok(SinkType::SINK_NAME), |other| { - Err(SinkError::Config(anyhow!( - "unsupported sink connector {}", - other + Err(StreamExecutorError::from(( + SinkError::Config(anyhow!("unsupported sink connector {}", other)), + sink_id.sink_id, ))) } ) }?; let format_desc = match &sink_desc.format_desc { // Case A: new syntax `format ... encode ...` - Some(f) => Some(f.clone().try_into()?), + Some(f) => Some( + f.clone() + .try_into() + .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?, + ), None => match sink_desc.properties.get(SINK_TYPE_OPTION) { // Case B: old syntax `type = '...'` - Some(t) => SinkFormatDesc::from_legacy_type(connector, t)?, + Some(t) => SinkFormatDesc::from_legacy_type(connector, t) + .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?, // Case C: no format + encode required None => None, }, @@ -188,7 +196,8 @@ impl ExecutorBuilder for SinkExecutorBuilder { let properties_with_secret = LocalSecretManager::global().fill_secrets(properties, secret_refs)?; - let format_desc_with_secret = SinkParam::fill_secret_for_format_desc(format_desc)?; + let format_desc_with_secret = SinkParam::fill_secret_for_format_desc(format_desc) + .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?; let actor_id_str = format!("{}", params.actor_context.id); let sink_id_str = format!("{}", sink_id.sink_id); diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 94dbc0764497..654980db1c17 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -1070,7 +1070,7 @@ impl ScoredStreamError { | ErrorKind::ArrayError(_) | ErrorKind::ExprError(_) | ErrorKind::SerdeError(_) - | ErrorKind::SinkError(_) + | ErrorKind::SinkError(_, _) | ErrorKind::RpcError(_) | ErrorKind::AlignBarrier(_, _) | ErrorKind::ConnectorError(_) @@ -1096,7 +1096,6 @@ impl ScoredStreamError { | ErrorKind::Storage(_) | ErrorKind::Expression(_) | ErrorKind::Array(_) - | ErrorKind::Sink(_) | ErrorKind::Secret(_) => 1000, } }