From 12531c87d8b38fe3779d0b98f6dbb661073661f6 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 24 Jan 2024 15:15:40 +0800 Subject: [PATCH] refactor(error): dedicated error type for `dml` crate Signed-off-by: Bugen Zhao --- Cargo.lock | 3 +- src/batch/src/error.rs | 8 ++++ src/dml/Cargo.toml | 3 +- src/dml/src/dml_manager.rs | 12 ++---- src/dml/src/error.rs | 28 +++++++++++++ src/dml/src/lib.rs | 2 + src/dml/src/table.rs | 25 +++++------- src/stream/src/executor/dml.rs | 19 +++++---- src/stream/src/executor/error.rs | 7 ++-- .../src/executor/source/fetch_executor.rs | 5 ++- .../src/executor/source/fs_source_executor.rs | 8 ++-- .../src/executor/source/list_executor.rs | 16 ++++---- .../src/executor/source/source_executor.rs | 8 ++-- src/stream/src/executor/stream_reader.rs | 40 ++++++++----------- 14 files changed, 109 insertions(+), 75 deletions(-) create mode 100644 src/dml/src/error.rs diff --git a/Cargo.lock b/Cargo.lock index 3cddc5738f867..cab3f9e6cfdab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9088,7 +9088,6 @@ dependencies = [ name = "risingwave_dml" version = "1.7.0-alpha" dependencies = [ - "anyhow", "assert_matches", "criterion", "futures", @@ -9103,6 +9102,8 @@ dependencies = [ "risingwave_pb", "rw_futures_util", "tempfile", + "thiserror", + "thiserror-ext", "tracing", "workspace-hack", ] diff --git a/src/batch/src/error.rs b/src/batch/src/error.rs index d5efbbef9dd94..1251c1d9dbe09 100644 --- a/src/batch/src/error.rs +++ b/src/batch/src/error.rs @@ -20,6 +20,7 @@ pub use anyhow::anyhow; use risingwave_common::array::ArrayError; use risingwave_common::error::{BoxedError, ErrorCode, RwError}; use risingwave_common::util::value_encoding::error::ValueEncodingError; +use risingwave_dml::error::DmlError; use risingwave_expr::ExprError; use risingwave_pb::PbFieldNotFound; use risingwave_rpc_client::error::{RpcError, ToTonicStatus}; @@ -101,6 +102,13 @@ pub enum BatchError { BoxedError, ), + #[error(transparent)] + Dml( + #[from] + #[backtrace] + DmlError, + ), + // Make the ref-counted type to be a variant for easier code structuring. #[error(transparent)] Shared( diff --git a/src/dml/Cargo.toml b/src/dml/Cargo.toml index 742b9f5297558..c429ad9238649 100644 --- a/src/dml/Cargo.toml +++ b/src/dml/Cargo.toml @@ -14,7 +14,6 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] -anyhow = "1" futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } itertools = "0.12" @@ -24,6 +23,8 @@ risingwave_common = { workspace = true } risingwave_connector = { workspace = true } risingwave_pb = { workspace = true } rw_futures_util = { workspace = true } +thiserror = "1" +thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal", "fs"] } tracing = { version = "0.1" } diff --git a/src/dml/src/dml_manager.rs b/src/dml/src/dml_manager.rs index b4b03f9798c56..f6611b6c94c21 100644 --- a/src/dml/src/dml_manager.rs +++ b/src/dml/src/dml_manager.rs @@ -17,14 +17,12 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::{Arc, Weak}; -use anyhow::Context; use parking_lot::RwLock; -use risingwave_common::bail; use risingwave_common::catalog::{ColumnDesc, TableId, TableVersionId}; -use risingwave_common::error::Result; use risingwave_common::transaction::transaction_id::{TxnId, TxnIdGenerator}; use risingwave_common::util::worker_util::WorkerNodeId; +use crate::error::{DmlError, Result}; use crate::{TableDmlHandle, TableDmlHandleRef}; pub type DmlManagerRef = Arc; @@ -112,9 +110,7 @@ impl DmlManager { "dml handler registers with same version but different schema" ) }) - .with_context(|| { - format!("fail to register reader for table with key `{table_id:?}`") - })?, + .expect("the first dml executor is gone"), // this should never happen // A new version of the table is activated, overwrite the old reader. Ordering::Greater => new_handle!(o), @@ -139,7 +135,7 @@ impl DmlManager { // A new version of the table is activated, but the DML request is still on // the old version. Ordering::Less => { - bail!("schema changed for table `{table_id:?}`, please retry later") + return Err(DmlError::SchemaChanged); } // Write the chunk of correct version to the table. @@ -155,7 +151,7 @@ impl DmlManager { None => None, } } - .with_context(|| format!("no reader for dml in table `{table_id:?}`"))?; + .ok_or(DmlError::NoReader)?; Ok(table_dml_handle) } diff --git a/src/dml/src/error.rs b/src/dml/src/error.rs new file mode 100644 index 0000000000000..d8c4ea41a03ce --- /dev/null +++ b/src/dml/src/error.rs @@ -0,0 +1,28 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// The error type for DML operations. +#[derive(thiserror::Error, Debug)] +pub enum DmlError { + #[error("table schema has changed, please try again later")] + SchemaChanged, + + #[error("no available table reader in streaming executors")] + NoReader, + + #[error("table reader closed")] + ReaderClosed, +} + +pub type Result = std::result::Result; diff --git a/src/dml/src/lib.rs b/src/dml/src/lib.rs index 268f3d5006f7a..a15a4dfb3fba9 100644 --- a/src/dml/src/lib.rs +++ b/src/dml/src/lib.rs @@ -20,9 +20,11 @@ #![feature(type_alias_impl_trait)] #![feature(box_patterns)] #![feature(stmt_expr_attributes)] +#![feature(error_generic_member_access)] pub use table::*; pub mod dml_manager; +pub mod error; mod table; mod txn_channel; diff --git a/src/dml/src/table.rs b/src/dml/src/table.rs index ba0292e4f6caf..ad20b1c13f12a 100644 --- a/src/dml/src/table.rs +++ b/src/dml/src/table.rs @@ -14,17 +14,16 @@ use std::sync::Arc; -use anyhow::{anyhow, Context}; use futures_async_stream::try_stream; use parking_lot::RwLock; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::ColumnDesc; -use risingwave_common::error::{Result, RwError}; use risingwave_common::transaction::transaction_id::TxnId; use risingwave_common::transaction::transaction_message::TxnMsg; use risingwave_connector::source::StreamChunkWithState; use tokio::sync::oneshot; +use crate::error::{DmlError, Result}; use crate::txn_channel::{txn_channel, Receiver, Sender}; pub type TableDmlHandleRef = Arc; @@ -89,9 +88,7 @@ impl TableDmlHandle { loop { let guard = self.core.read(); if guard.changes_txs.is_empty() { - return Err(RwError::from(anyhow!( - "no available table reader in streaming source executors" - ))); + return Err(DmlError::NoReader); } let len = guard.changes_txs.len(); // Use session id instead of txn_id to choose channel so that we can preserve transaction order in the same session. @@ -99,7 +96,7 @@ impl TableDmlHandle { let sender = guard .changes_txs .get((session_id % len as u32) as usize) - .context("no available table reader in streaming source executors")? + .unwrap() .clone(); drop(guard); @@ -183,7 +180,8 @@ impl WriteHandle { pub async fn write_chunk(&self, chunk: StreamChunk) -> Result<()> { assert_eq!(self.txn_state, TxnState::Begin); // Ignore the notifier. - self.write_txn_data_msg(TxnMsg::Data(self.txn_id, chunk)) + let _notifier = self + .write_txn_data_msg(TxnMsg::Data(self.txn_id, chunk)) .await?; Ok(()) } @@ -192,9 +190,8 @@ impl WriteHandle { assert_eq!(self.txn_state, TxnState::Begin); self.txn_state = TxnState::Committed; // Await the notifier. - self.write_txn_control_msg(TxnMsg::End(self.txn_id))? - .await - .context("failed to wait the end message")?; + let notifier = self.write_txn_control_msg(TxnMsg::End(self.txn_id))?; + notifier.await.map_err(|_| DmlError::ReaderClosed)?; Ok(()) } @@ -221,7 +218,7 @@ impl WriteHandle { // It's possible that the source executor is scaled in or migrated, so the channel // is closed. To guarantee the transactional atomicity, bail out. - Err(_) => Err(RwError::from("write txn_msg channel closed".to_string())), + Err(_) => Err(DmlError::ReaderClosed), } } @@ -235,7 +232,7 @@ impl WriteHandle { // It's possible that the source executor is scaled in or migrated, so the channel // is closed. To guarantee the transactional atomicity, bail out. - Err(_) => Err(RwError::from("write txn_msg channel closed".to_string())), + Err(_) => Err(DmlError::ReaderClosed), } } } @@ -251,7 +248,7 @@ pub struct TableStreamReader { } impl TableStreamReader { - #[try_stream(boxed, ok = StreamChunkWithState, error = RwError)] + #[try_stream(boxed, ok = StreamChunkWithState, error = DmlError)] pub async fn into_data_stream_for_test(mut self) { while let Some((txn_msg, notifier)) = self.rx.recv().await { // Notify about that we've taken the chunk. @@ -267,7 +264,7 @@ impl TableStreamReader { } } - #[try_stream(boxed, ok = TxnMsg, error = RwError)] + #[try_stream(boxed, ok = TxnMsg, error = DmlError)] pub async fn into_stream(mut self) { while let Some((txn_msg, notifier)) = self.rx.recv().await { // Notify about that we've taken the chunk. diff --git a/src/stream/src/executor/dml.rs b/src/stream/src/executor/dml.rs index 3c4cf25f19869..8e0a186e9c81c 100644 --- a/src/stream/src/executor/dml.rs +++ b/src/stream/src/executor/dml.rs @@ -16,7 +16,7 @@ use std::collections::BTreeMap; use std::mem; use either::Either; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{ColumnDesc, Schema, TableId, TableVersionId}; @@ -106,16 +106,21 @@ impl DmlExecutor { // Note(bugen): Only register after the first barrier message is received, which means the // current executor is activated. This avoids the new reader overwriting the old one during // the preparation of schema change. - let batch_reader = self - .dml_manager - .register_reader(self.table_id, self.table_version_id, &self.column_descs) - .map_err(StreamExecutorError::connector_error)?; - let batch_reader = batch_reader.stream_reader().into_stream(); + let handle = self.dml_manager.register_reader( + self.table_id, + self.table_version_id, + &self.column_descs, + )?; + let reader = handle + .stream_reader() + .into_stream() + .map_err(StreamExecutorError::from) + .boxed(); // Merge the two streams using `StreamReaderWithPause` because when we receive a pause // barrier, we should stop receiving the data from DML. We poll data from the two streams in // a round robin way. - let mut stream = StreamReaderWithPause::::new(upstream, batch_reader); + let mut stream = StreamReaderWithPause::::new(upstream, reader); // If the first barrier requires us to pause on startup, pause the stream. if barrier.is_pause_on_startup() { diff --git a/src/stream/src/executor/error.rs b/src/stream/src/executor/error.rs index 0a00aa9f0b678..8b3e8bf5212b2 100644 --- a/src/stream/src/executor/error.rs +++ b/src/stream/src/executor/error.rs @@ -17,6 +17,7 @@ use risingwave_common::error::{BoxedError, NotImplemented}; use risingwave_common::util::value_encoding::error::ValueEncodingError; use risingwave_connector::error::ConnectorError; use risingwave_connector::sink::SinkError; +use risingwave_dml::error::DmlError; use risingwave_expr::ExprError; use risingwave_pb::PbFieldNotFound; use risingwave_rpc_client::error::RpcError; @@ -87,11 +88,11 @@ pub enum ErrorKind { BoxedError, ), - #[error("Dml error: {0}")] + #[error(transparent)] DmlError( - #[source] + #[from] #[backtrace] - BoxedError, + DmlError, ), #[error(transparent)] diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index 98b6c3c87c32f..324a12f8df6f5 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -18,8 +18,8 @@ use std::ops::Bound; use std::sync::Arc; use either::Either; -use futures::pin_mut; use futures::stream::{self, StreamExt}; +use futures::{pin_mut, TryStreamExt}; use futures_async_stream::try_stream; use risingwave_common::catalog::{ColumnId, Schema, TableId}; use risingwave_common::hash::VnodeBitmapExt; @@ -147,7 +147,8 @@ impl FsFetchExecutor { *splits_on_fetch += batch.len(); let batch_reader = Self::build_batched_stream_reader(column_ids, source_ctx, source_desc, Some(batch)) - .await?; + .await? + .map_err(StreamExecutorError::connector_error); stream.replace_data_stream(batch_reader); } diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index e0129f34a1aae..5e78532b9282b 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use anyhow::anyhow; use either::Either; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use risingwave_common::catalog::Schema; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; @@ -196,7 +196,8 @@ impl FsSourceExecutor { // Replace the source reader with a new one of the new state. let reader = self .build_stream_source_reader(source_desc, Some(target_state.clone())) - .await?; + .await? + .map_err(StreamExecutorError::connector_error); stream.replace_data_stream(reader); self.stream_source_core.stream_source_splits = target_state @@ -337,7 +338,8 @@ impl FsSourceExecutor { let source_chunk_reader = self .build_stream_source_reader(&source_desc, recover_state) .instrument_await("fs_source_start_reader") - .await?; + .await? + .map_err(StreamExecutorError::connector_error); // Merge the chunks from source and the barriers into a single stream. We prioritize // barriers over source data chunks here. diff --git a/src/stream/src/executor/source/list_executor.rs b/src/stream/src/executor/source/list_executor.rs index b1cc8feb6adc4..f2a6e47ae8641 100644 --- a/src/stream/src/executor/source/list_executor.rs +++ b/src/stream/src/executor/source/list_executor.rs @@ -17,14 +17,13 @@ use std::sync::Arc; use anyhow::anyhow; use either::Either; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use risingwave_common::array::Op; use risingwave_common::catalog::Schema; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; -use risingwave_connector::source::filesystem::FsPageItem; use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; -use risingwave_connector::source::{BoxTryStream, SourceCtrlOpts}; +use risingwave_connector::source::SourceCtrlOpts; use risingwave_connector::ConnectorParams; use risingwave_storage::StateStore; use thiserror_ext::AsReport; @@ -89,13 +88,12 @@ impl FsListExecutor { fn build_chunked_paginate_stream( &self, source_desc: &SourceDesc, - ) -> StreamExecutorResult> { - let stream: std::pin::Pin< - Box> + Send>, - > = source_desc + ) -> StreamExecutorResult>> { + let stream = source_desc .source .get_source_list() - .map_err(StreamExecutorError::connector_error)?; + .map_err(StreamExecutorError::connector_error)? + .map_err(StreamExecutorError::connector_error); // Group FsPageItem stream into chunks of size 1024. let chunked_stream = stream.chunks(CHUNK_SIZE).map(|chunk| { @@ -120,7 +118,7 @@ impl FsListExecutor { )) }); - Ok(chunked_stream.boxed()) + Ok(chunked_stream) } #[try_stream(ok = Message, error = StreamExecutorError)] diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index c15d989e98f9a..e3675ccb33555 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -17,7 +17,7 @@ use std::time::Duration; use anyhow::anyhow; use either::Either; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use risingwave_common::metrics::GLOBAL_ERROR_METRICS; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; @@ -285,7 +285,8 @@ impl SourceExecutor { // Replace the source reader with a new one of the new state. let reader = self .build_stream_source_reader(source_desc, Some(target_state.clone())) - .await?; + .await? + .map_err(StreamExecutorError::connector_error); stream.replace_data_stream(reader); @@ -420,7 +421,8 @@ impl SourceExecutor { let source_chunk_reader = self .build_stream_source_reader(&source_desc, recover_state) .instrument_await("source_build_reader") - .await?; + .await? + .map_err(StreamExecutorError::connector_error); // Merge the chunks from source and the barriers into a single stream. We prioritize // barriers over source data chunks here. diff --git a/src/stream/src/executor/stream_reader.rs b/src/stream/src/executor/stream_reader.rs index aee92ab430e33..263c0a98f8a71 100644 --- a/src/stream/src/executor/stream_reader.rs +++ b/src/stream/src/executor/stream_reader.rs @@ -18,10 +18,8 @@ use std::task::Poll; use either::Either; use futures::stream::{select_with_strategy, BoxStream, PollNext, SelectWithStrategy}; use futures::{Stream, StreamExt, TryStreamExt}; -use futures_async_stream::try_stream; -use risingwave_connector::source::BoxTryStream; -use crate::executor::error::{StreamExecutorError, StreamExecutorResult}; +use crate::executor::error::StreamExecutorResult; use crate::executor::Message; type ExecutorMessageStream = BoxStream<'static, StreamExecutorResult>; @@ -48,26 +46,14 @@ pub(super) struct StreamReaderWithPause { } impl StreamReaderWithPause { - /// Receive messages from the reader. Hang up on error. - #[try_stream(ok = M, error = StreamExecutorError)] - async fn data_stream(stream: BoxTryStream) { - // TODO: support stack trace for Stream - #[for_await] - for m in stream { - match m { - Ok(m) => yield m, - Err(err) => { - return Err(StreamExecutorError::connector_error(err)); - } - } - } - } - /// Construct a `StreamReaderWithPause` with one stream receiving barrier messages (and maybe /// other types of messages) and the other receiving data only (no barrier). - pub fn new(message_stream: ExecutorMessageStream, data_stream: BoxTryStream) -> Self { + pub fn new( + message_stream: ExecutorMessageStream, + data_stream: impl Stream> + Send + 'static, + ) -> Self { let message_stream_arm = message_stream.map_ok(Either::Left).boxed(); - let data_stream_arm = Self::data_stream(data_stream).map_ok(Either::Right).boxed(); + let data_stream_arm = data_stream.map_ok(Either::Right).boxed(); let inner = Self::new_inner(message_stream_arm, data_stream_arm); Self { inner, @@ -89,7 +75,10 @@ impl StreamReaderWithPause { } /// Replace the data stream with a new one for given `stream`. Used for split change. - pub fn replace_data_stream(&mut self, data_stream: BoxTryStream) { + pub fn replace_data_stream( + &mut self, + data_stream: impl Stream> + Send + 'static, + ) { // Take the barrier receiver arm. let barrier_receiver_arm = std::mem::replace( self.inner.get_mut().0, @@ -100,7 +89,7 @@ impl StreamReaderWithPause { // to ensure the internal state of the `SelectWithStrategy` is reset. (#6300) self.inner = Self::new_inner( barrier_receiver_arm, - Self::data_stream(data_stream).map_ok(Either::Right).boxed(), + data_stream.map_ok(Either::Right).boxed(), ); } @@ -148,7 +137,7 @@ mod tests { use tokio::sync::mpsc; use super::*; - use crate::executor::{barrier_to_message_stream, Barrier}; + use crate::executor::{barrier_to_message_stream, Barrier, StreamExecutorError}; const TEST_TRANSACTION_ID1: TxnId = 0; const TEST_TRANSACTION_ID2: TxnId = 1; @@ -161,7 +150,10 @@ mod tests { let table_dml_handle = TableDmlHandle::new(vec![], TEST_DML_CHANNEL_INIT_PERMITS); - let source_stream = table_dml_handle.stream_reader().into_data_stream_for_test(); + let source_stream = table_dml_handle + .stream_reader() + .into_data_stream_for_test() + .map_err(StreamExecutorError::from); let mut write_handle1 = table_dml_handle .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID1)