Skip to content

Commit

Permalink
fix(error): add sink_id in sinkerror (#17969)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Aug 8, 2024
1 parent de32dab commit 0aed682
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 31 deletions.
8 changes: 0 additions & 8 deletions src/stream/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use risingwave_common::array::ArrayError;
use risingwave_common::secret::SecretError;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::SinkError;
use risingwave_expr::ExprError;
use risingwave_pb::PbFieldNotFound;
use risingwave_rpc_client::error::ToTonicStatus;
Expand Down Expand Up @@ -66,13 +65,6 @@ pub enum ErrorKind {
StreamExecutorError,
),

#[error("Sink error: {0}")]
Sink(
#[from]
#[backtrace]
SinkError,
),

#[error("Actor {actor_id} exited unexpectedly: {source}")]
UnexpectedExit {
actor_id: ActorId,
Expand Down
20 changes: 9 additions & 11 deletions src/stream/src/executor/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,8 @@ pub enum ErrorKind {
BoxedError,
),

#[error("Sink error: {0}")]
SinkError(
#[from]
#[backtrace]
SinkError,
),
#[error("Sink error: sink_id={1}, error: {0}")]
SinkError(SinkError, u32),

#[error(transparent)]
RpcError(
Expand All @@ -94,11 +90,7 @@ pub enum ErrorKind {
AlignBarrier(Box<Barrier>, Box<Barrier>),

#[error("Connector error: {0}")]
ConnectorError(
#[source]
#[backtrace]
BoxedError,
),
ConnectorError(BoxedError),

#[error(transparent)]
DmlError(
Expand Down Expand Up @@ -152,6 +144,12 @@ impl From<String> for StreamExecutorError {
}
}

impl From<(SinkError, u32)> for StreamExecutorError {
fn from((err, sink_id): (SinkError, u32)) -> Self {
ErrorKind::SinkError(err, sink_id).into()
}
}

impl StreamExecutorError {
pub fn variant_name(&self) -> &str {
self.0.inner().as_ref()
Expand Down
7 changes: 5 additions & 2 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
chunk_size: usize,
input_data_types: Vec<DataType>,
) -> StreamExecutorResult<Self> {
let sink = build_sink(sink_param.clone())?;
let sink = build_sink(sink_param.clone())
.map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
let sink_input_schema: Schema = columns
.iter()
.map(|column| Field::from(&column.column_desc))
Expand Down Expand Up @@ -481,6 +482,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
warn!(
error = %e.as_report(),
executor_id = sink_writer_param.executor_id,
sink_id = sink_param.sink_id.sink_id,
"rewind successfully after sink error"
);
sink_writer_param.vnode_bitmap = curr_vnode_bitmap;
Expand All @@ -494,7 +496,8 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
);
Err(e)
}
}?;
}
.map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
}
Err(anyhow!("end of stream").into())
}
Expand Down
25 changes: 17 additions & 8 deletions src/stream/src/from_proto/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
use crate::common::log_store_impl::kv_log_store::{
KvLogStoreFactory, KvLogStoreMetrics, KvLogStorePkInfo, KV_LOG_STORE_V2_INFO,
};
use crate::executor::SinkExecutor;
use crate::executor::{SinkExecutor, StreamExecutorError};
use crate::telemetry::report_event;

pub struct SinkExecutorBuilder;
Expand Down Expand Up @@ -159,27 +159,35 @@ impl ExecutorBuilder for SinkExecutorBuilder {

let connector = {
let sink_type = properties.get(CONNECTOR_TYPE_KEY).ok_or_else(|| {
SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY))
StreamExecutorError::from((
SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)),
sink_id.sink_id,
))
})?;

match_sink_name_str!(
sink_type.to_lowercase().as_str(),
SinkType,
Ok(SinkType::SINK_NAME),
|other| {
Err(SinkError::Config(anyhow!(
"unsupported sink connector {}",
other
Err(StreamExecutorError::from((
SinkError::Config(anyhow!("unsupported sink connector {}", other)),
sink_id.sink_id,
)))
}
)
}?;
let format_desc = match &sink_desc.format_desc {
// Case A: new syntax `format ... encode ...`
Some(f) => Some(f.clone().try_into()?),
Some(f) => Some(
f.clone()
.try_into()
.map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?,
),
None => match sink_desc.properties.get(SINK_TYPE_OPTION) {
// Case B: old syntax `type = '...'`
Some(t) => SinkFormatDesc::from_legacy_type(connector, t)?,
Some(t) => SinkFormatDesc::from_legacy_type(connector, t)
.map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?,
// Case C: no format + encode required
None => None,
},
Expand All @@ -188,7 +196,8 @@ impl ExecutorBuilder for SinkExecutorBuilder {
let properties_with_secret =
LocalSecretManager::global().fill_secrets(properties, secret_refs)?;

let format_desc_with_secret = SinkParam::fill_secret_for_format_desc(format_desc)?;
let format_desc_with_secret = SinkParam::fill_secret_for_format_desc(format_desc)
.map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?;

let actor_id_str = format!("{}", params.actor_context.id);
let sink_id_str = format!("{}", sink_id.sink_id);
Expand Down
3 changes: 1 addition & 2 deletions src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1070,7 +1070,7 @@ impl ScoredStreamError {
| ErrorKind::ArrayError(_)
| ErrorKind::ExprError(_)
| ErrorKind::SerdeError(_)
| ErrorKind::SinkError(_)
| ErrorKind::SinkError(_, _)
| ErrorKind::RpcError(_)
| ErrorKind::AlignBarrier(_, _)
| ErrorKind::ConnectorError(_)
Expand All @@ -1096,7 +1096,6 @@ impl ScoredStreamError {
| ErrorKind::Storage(_)
| ErrorKind::Expression(_)
| ErrorKind::Array(_)
| ErrorKind::Sink(_)
| ErrorKind::Secret(_) => 1000,
}
}
Expand Down

0 comments on commit 0aed682

Please sign in to comment.