Skip to content

Commit

Permalink
refactor(error): dedicated error type for dml crate
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jan 24, 2024
1 parent 30fcc93 commit 12531c8
Show file tree
Hide file tree
Showing 14 changed files with 109 additions and 75 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions src/batch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use anyhow::anyhow;
use risingwave_common::array::ArrayError;
use risingwave_common::error::{BoxedError, ErrorCode, RwError};
use risingwave_common::util::value_encoding::error::ValueEncodingError;
use risingwave_dml::error::DmlError;
use risingwave_expr::ExprError;
use risingwave_pb::PbFieldNotFound;
use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
Expand Down Expand Up @@ -101,6 +102,13 @@ pub enum BatchError {
BoxedError,
),

#[error(transparent)]
Dml(
#[from]
#[backtrace]
DmlError,
),

// Make the ref-counted type to be a variant for easier code structuring.
#[error(transparent)]
Shared(
Expand Down
3 changes: 2 additions & 1 deletion src/dml/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ ignored = ["workspace-hack"]
normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
itertools = "0.12"
Expand All @@ -24,6 +23,8 @@ risingwave_common = { workspace = true }
risingwave_connector = { workspace = true }
risingwave_pb = { workspace = true }
rw_futures_util = { workspace = true }
thiserror = "1"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal", "fs"] }
tracing = { version = "0.1" }

Expand Down
12 changes: 4 additions & 8 deletions src/dml/src/dml_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::{Arc, Weak};

use anyhow::Context;
use parking_lot::RwLock;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnDesc, TableId, TableVersionId};
use risingwave_common::error::Result;
use risingwave_common::transaction::transaction_id::{TxnId, TxnIdGenerator};
use risingwave_common::util::worker_util::WorkerNodeId;

use crate::error::{DmlError, Result};
use crate::{TableDmlHandle, TableDmlHandleRef};

pub type DmlManagerRef = Arc<DmlManager>;
Expand Down Expand Up @@ -112,9 +110,7 @@ impl DmlManager {
"dml handler registers with same version but different schema"
)
})
.with_context(|| {
format!("fail to register reader for table with key `{table_id:?}`")
})?,
.expect("the first dml executor is gone"), // this should never happen

// A new version of the table is activated, overwrite the old reader.
Ordering::Greater => new_handle!(o),
Expand All @@ -139,7 +135,7 @@ impl DmlManager {
// A new version of the table is activated, but the DML request is still on
// the old version.
Ordering::Less => {
bail!("schema changed for table `{table_id:?}`, please retry later")
return Err(DmlError::SchemaChanged);
}

// Write the chunk of correct version to the table.
Expand All @@ -155,7 +151,7 @@ impl DmlManager {
None => None,
}
}
.with_context(|| format!("no reader for dml in table `{table_id:?}`"))?;
.ok_or(DmlError::NoReader)?;

Ok(table_dml_handle)
}
Expand Down
28 changes: 28 additions & 0 deletions src/dml/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// The error type for DML operations.
#[derive(thiserror::Error, Debug)]
pub enum DmlError {
#[error("table schema has changed, please try again later")]
SchemaChanged,

#[error("no available table reader in streaming executors")]
NoReader,

#[error("table reader closed")]
ReaderClosed,
}

pub type Result<T> = std::result::Result<T, DmlError>;
2 changes: 2 additions & 0 deletions src/dml/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
#![feature(type_alias_impl_trait)]
#![feature(box_patterns)]
#![feature(stmt_expr_attributes)]
#![feature(error_generic_member_access)]

pub use table::*;

pub mod dml_manager;
pub mod error;
mod table;
mod txn_channel;
25 changes: 11 additions & 14 deletions src/dml/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,16 @@

use std::sync::Arc;

use anyhow::{anyhow, Context};
use futures_async_stream::try_stream;
use parking_lot::RwLock;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::error::{Result, RwError};
use risingwave_common::transaction::transaction_id::TxnId;
use risingwave_common::transaction::transaction_message::TxnMsg;
use risingwave_connector::source::StreamChunkWithState;
use tokio::sync::oneshot;

use crate::error::{DmlError, Result};
use crate::txn_channel::{txn_channel, Receiver, Sender};

pub type TableDmlHandleRef = Arc<TableDmlHandle>;
Expand Down Expand Up @@ -89,17 +88,15 @@ impl TableDmlHandle {
loop {
let guard = self.core.read();
if guard.changes_txs.is_empty() {
return Err(RwError::from(anyhow!(
"no available table reader in streaming source executors"
)));
return Err(DmlError::NoReader);
}
let len = guard.changes_txs.len();
// Use session id instead of txn_id to choose channel so that we can preserve transaction order in the same session.
// PS: only hold if there's no scaling on the table.
let sender = guard
.changes_txs
.get((session_id % len as u32) as usize)
.context("no available table reader in streaming source executors")?
.unwrap()
.clone();

drop(guard);
Expand Down Expand Up @@ -183,7 +180,8 @@ impl WriteHandle {
pub async fn write_chunk(&self, chunk: StreamChunk) -> Result<()> {
assert_eq!(self.txn_state, TxnState::Begin);
// Ignore the notifier.
self.write_txn_data_msg(TxnMsg::Data(self.txn_id, chunk))
let _notifier = self
.write_txn_data_msg(TxnMsg::Data(self.txn_id, chunk))
.await?;
Ok(())
}
Expand All @@ -192,9 +190,8 @@ impl WriteHandle {
assert_eq!(self.txn_state, TxnState::Begin);
self.txn_state = TxnState::Committed;
// Await the notifier.
self.write_txn_control_msg(TxnMsg::End(self.txn_id))?
.await
.context("failed to wait the end message")?;
let notifier = self.write_txn_control_msg(TxnMsg::End(self.txn_id))?;
notifier.await.map_err(|_| DmlError::ReaderClosed)?;
Ok(())
}

Expand All @@ -221,7 +218,7 @@ impl WriteHandle {

// It's possible that the source executor is scaled in or migrated, so the channel
// is closed. To guarantee the transactional atomicity, bail out.
Err(_) => Err(RwError::from("write txn_msg channel closed".to_string())),
Err(_) => Err(DmlError::ReaderClosed),
}
}

Expand All @@ -235,7 +232,7 @@ impl WriteHandle {

// It's possible that the source executor is scaled in or migrated, so the channel
// is closed. To guarantee the transactional atomicity, bail out.
Err(_) => Err(RwError::from("write txn_msg channel closed".to_string())),
Err(_) => Err(DmlError::ReaderClosed),
}
}
}
Expand All @@ -251,7 +248,7 @@ pub struct TableStreamReader {
}

impl TableStreamReader {
#[try_stream(boxed, ok = StreamChunkWithState, error = RwError)]
#[try_stream(boxed, ok = StreamChunkWithState, error = DmlError)]
pub async fn into_data_stream_for_test(mut self) {
while let Some((txn_msg, notifier)) = self.rx.recv().await {
// Notify about that we've taken the chunk.
Expand All @@ -267,7 +264,7 @@ impl TableStreamReader {
}
}

#[try_stream(boxed, ok = TxnMsg, error = RwError)]
#[try_stream(boxed, ok = TxnMsg, error = DmlError)]
pub async fn into_stream(mut self) {
while let Some((txn_msg, notifier)) = self.rx.recv().await {
// Notify about that we've taken the chunk.
Expand Down
19 changes: 12 additions & 7 deletions src/stream/src/executor/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::BTreeMap;
use std::mem;

use either::Either;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{ColumnDesc, Schema, TableId, TableVersionId};
Expand Down Expand Up @@ -106,16 +106,21 @@ impl DmlExecutor {
// Note(bugen): Only register after the first barrier message is received, which means the
// current executor is activated. This avoids the new reader overwriting the old one during
// the preparation of schema change.
let batch_reader = self
.dml_manager
.register_reader(self.table_id, self.table_version_id, &self.column_descs)
.map_err(StreamExecutorError::connector_error)?;
let batch_reader = batch_reader.stream_reader().into_stream();
let handle = self.dml_manager.register_reader(
self.table_id,
self.table_version_id,
&self.column_descs,
)?;
let reader = handle
.stream_reader()
.into_stream()
.map_err(StreamExecutorError::from)
.boxed();

// Merge the two streams using `StreamReaderWithPause` because when we receive a pause
// barrier, we should stop receiving the data from DML. We poll data from the two streams in
// a round robin way.
let mut stream = StreamReaderWithPause::<false, TxnMsg>::new(upstream, batch_reader);
let mut stream = StreamReaderWithPause::<false, TxnMsg>::new(upstream, reader);

// If the first barrier requires us to pause on startup, pause the stream.
if barrier.is_pause_on_startup() {
Expand Down
7 changes: 4 additions & 3 deletions src/stream/src/executor/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use risingwave_common::error::{BoxedError, NotImplemented};
use risingwave_common::util::value_encoding::error::ValueEncodingError;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::SinkError;
use risingwave_dml::error::DmlError;
use risingwave_expr::ExprError;
use risingwave_pb::PbFieldNotFound;
use risingwave_rpc_client::error::RpcError;
Expand Down Expand Up @@ -87,11 +88,11 @@ pub enum ErrorKind {
BoxedError,
),

#[error("Dml error: {0}")]
#[error(transparent)]
DmlError(
#[source]
#[from]
#[backtrace]
BoxedError,
DmlError,
),

#[error(transparent)]
Expand Down
5 changes: 3 additions & 2 deletions src/stream/src/executor/source/fetch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use std::ops::Bound;
use std::sync::Arc;

use either::Either;
use futures::pin_mut;
use futures::stream::{self, StreamExt};
use futures::{pin_mut, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::{ColumnId, Schema, TableId};
use risingwave_common::hash::VnodeBitmapExt;
Expand Down Expand Up @@ -147,7 +147,8 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
*splits_on_fetch += batch.len();
let batch_reader =
Self::build_batched_stream_reader(column_ids, source_ctx, source_desc, Some(batch))
.await?;
.await?
.map_err(StreamExecutorError::connector_error);
stream.replace_data_stream(batch_reader);
}

Expand Down
8 changes: 5 additions & 3 deletions src/stream/src/executor/source/fs_source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;

use anyhow::anyhow;
use either::Either;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::Schema;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
Expand Down Expand Up @@ -196,7 +196,8 @@ impl<S: StateStore> FsSourceExecutor<S> {
// Replace the source reader with a new one of the new state.
let reader = self
.build_stream_source_reader(source_desc, Some(target_state.clone()))
.await?;
.await?
.map_err(StreamExecutorError::connector_error);
stream.replace_data_stream(reader);

self.stream_source_core.stream_source_splits = target_state
Expand Down Expand Up @@ -337,7 +338,8 @@ impl<S: StateStore> FsSourceExecutor<S> {
let source_chunk_reader = self
.build_stream_source_reader(&source_desc, recover_state)
.instrument_await("fs_source_start_reader")
.await?;
.await?
.map_err(StreamExecutorError::connector_error);

// Merge the chunks from source and the barriers into a single stream. We prioritize
// barriers over source data chunks here.
Expand Down
16 changes: 7 additions & 9 deletions src/stream/src/executor/source/list_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ use std::sync::Arc;

use anyhow::anyhow;
use either::Either;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::Op;
use risingwave_common::catalog::Schema;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_connector::source::filesystem::FsPageItem;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::{BoxTryStream, SourceCtrlOpts};
use risingwave_connector::source::SourceCtrlOpts;
use risingwave_connector::ConnectorParams;
use risingwave_storage::StateStore;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -89,13 +88,12 @@ impl<S: StateStore> FsListExecutor<S> {
fn build_chunked_paginate_stream(
&self,
source_desc: &SourceDesc,
) -> StreamExecutorResult<BoxTryStream<StreamChunk>> {
let stream: std::pin::Pin<
Box<dyn Stream<Item = Result<FsPageItem, risingwave_common::error::RwError>> + Send>,
> = source_desc
) -> StreamExecutorResult<impl Stream<Item = StreamExecutorResult<StreamChunk>>> {
let stream = source_desc
.source
.get_source_list()
.map_err(StreamExecutorError::connector_error)?;
.map_err(StreamExecutorError::connector_error)?
.map_err(StreamExecutorError::connector_error);

// Group FsPageItem stream into chunks of size 1024.
let chunked_stream = stream.chunks(CHUNK_SIZE).map(|chunk| {
Expand All @@ -120,7 +118,7 @@ impl<S: StateStore> FsListExecutor<S> {
))
});

Ok(chunked_stream.boxed())
Ok(chunked_stream)
}

#[try_stream(ok = Message, error = StreamExecutorError)]
Expand Down
Loading

0 comments on commit 12531c8

Please sign in to comment.