From 1caa4b77330958609eaed062335d514909edcc21 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 2 Feb 2024 15:07:55 +0800 Subject: [PATCH] cleanup misc `RwError` usage Signed-off-by: Bugen Zhao --- Cargo.lock | 1 + src/bench/Cargo.toml | 1 + src/bench/s3_bench/main.rs | 7 +++---- src/bench/sink_bench/main.rs | 8 ++++---- src/compute/tests/integration_tests.rs | 7 +++---- src/storage/benches/bench_row.rs | 7 +++---- .../tests/integration_tests/recovery/background_ddl.rs | 5 ++--- 7 files changed, 17 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ac02d4713c73e..4569d6be594e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8620,6 +8620,7 @@ dependencies = [ name = "risingwave_bench" version = "1.7.0-alpha" dependencies = [ + "anyhow", "async-trait", "aws-config", "aws-sdk-s3", diff --git a/src/bench/Cargo.toml b/src/bench/Cargo.toml index 86e88b53a6149..63e8d89844c54 100644 --- a/src/bench/Cargo.toml +++ b/src/bench/Cargo.toml @@ -8,6 +8,7 @@ license = { workspace = true } repository = { workspace = true } [dependencies] +anyhow = "1" async-trait = "0.1" aws-config = { workspace = true } aws-sdk-s3 = { workspace = true } diff --git a/src/bench/s3_bench/main.rs b/src/bench/s3_bench/main.rs index 8eec3e6cfbeea..792c9c4743dbf 100644 --- a/src/bench/s3_bench/main.rs +++ b/src/bench/s3_bench/main.rs @@ -28,7 +28,6 @@ use futures::stream::{self, StreamExt}; use futures::{future, Future, FutureExt}; use itertools::Itertools; use rand::{Rng, SeedableRng}; -use risingwave_common::error::RwError; use tokio::join; use tokio::sync::RwLock; use tracing::debug; @@ -233,7 +232,7 @@ async fn multi_part_upload( let part_t = Instant::now(); let result = a.send().await.unwrap(); let part_ttl = part_t.elapsed(); - Ok::<_, RwError>((result, part_ttl)) + Ok::<_, anyhow::Error>((result, part_ttl)) }) .collect_vec(); let ttfb = t.elapsed(); @@ -318,7 +317,7 @@ async fn multi_part_get( .into_iter() .map(create_part_get) .map(|resp| async move { - let result: Result<(usize, Duration), RwError> = Ok(( + let result: anyhow::Result<(usize, Duration)> = Ok(( resp.await .unwrap() .body @@ -381,7 +380,7 @@ async fn run_case( cfg: Arc, client: Arc, objs: Arc>, -) -> Result<(), RwError> { +) -> anyhow::Result<()> { let (name, analysis) = match case.clone() { Case::Put { name, diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs index 067460db830db..66572ca67cdf2 100644 --- a/src/bench/sink_bench/main.rs +++ b/src/bench/sink_bench/main.rs @@ -19,6 +19,7 @@ use core::str::FromStr; use std::collections::HashMap; +use anyhow::anyhow; use clap::Parser; use futures::prelude::future::Either; use futures::prelude::stream::{BoxStream, PollNext}; @@ -27,7 +28,6 @@ use futures::{FutureExt, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::ColumnId; -use risingwave_common::error::anyhow_error; use risingwave_connector::dispatch_sink; use risingwave_connector::parser::{ EncodingProperties, ParserConfig, ProtocolProperties, SpecificParserConfig, @@ -82,7 +82,7 @@ impl LogReader for MockRangeLogReader { .take() .unwrap() .send(self.throughput_metric.take().unwrap()) - .map_err(|_| anyhow_error!("Can't send throughput_metric"))?; + .map_err(|_| anyhow!("Can't send throughput_metric"))?; futures::future::pending().await }, item = self.upstreams.next() => { @@ -108,7 +108,7 @@ impl LogReader for MockRangeLogReader { }, )) } - _ => Err(anyhow_error!("Can't assert message type".to_string())), + _ => Err(anyhow!("Can't assert message type".to_string())), } } } @@ -390,7 +390,7 @@ fn mock_from_legacy_type( SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly, SINK_TYPE_UPSERT => SinkFormat::Upsert, _ => { - return Err(SinkError::Config(risingwave_common::array::error::anyhow!( + return Err(SinkError::Config(anyhow!( "sink type unsupported: {}", r#type ))) diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index d9e9e608abc92..21fbe90b514e0 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -32,7 +32,6 @@ use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{ ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId, INITIAL_TABLE_VERSION_ID, }; -use risingwave_common::error::{Result, RwError}; use risingwave_common::row::OwnedRow; use risingwave_common::system_param::local_manager::LocalSystemParamsManager; use risingwave_common::test_prelude::DataChunkTestExt; @@ -297,7 +296,7 @@ async fn test_table_materialize() -> StreamResult<()> { barrier_tx_clone .send(Barrier::new_test_barrier(curr_epoch)) .unwrap(); - Ok::<_, RwError>(()) + anyhow::Ok(()) }); // Poll `Materialize`, should output the same insertion stream chunk. @@ -379,7 +378,7 @@ async fn test_table_materialize() -> StreamResult<()> { barrier_tx_clone .send(Barrier::new_test_barrier(curr_epoch)) .unwrap(); - Ok::<_, RwError>(()) + anyhow::Ok(()) }); // Poll `Materialize`, should output the same deletion stream chunk. @@ -429,7 +428,7 @@ async fn test_table_materialize() -> StreamResult<()> { } #[tokio::test] -async fn test_row_seq_scan() -> Result<()> { +async fn test_row_seq_scan() -> StreamResult<()> { // In this test we test if the memtable can be correctly scanned for K-V pair insertions. let memory_state_store = MemoryStateStore::new(); diff --git a/src/storage/benches/bench_row.rs b/src/storage/benches/bench_row.rs index 6a612e87241cd..49fa52ad5adde 100644 --- a/src/storage/benches/bench_row.rs +++ b/src/storage/benches/bench_row.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use criterion::{criterion_group, criterion_main, Criterion}; use itertools::Itertools; use risingwave_common::catalog::{ColumnDesc, ColumnId}; -use risingwave_common::error::Result; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, Datum, ScalarImpl}; use risingwave_common::util::row_serde::OrderedRowSerde; @@ -104,7 +103,7 @@ fn column_aware_encode(c: &Case) -> Vec> { array } -fn memcmp_decode(c: &Case, bytes: &Vec>) -> Result>> { +fn memcmp_decode(c: &Case, bytes: &Vec>) -> anyhow::Result>> { let serde = OrderedRowSerde::new( c.schema.to_vec(), vec![OrderType::descending(); c.schema.len()], @@ -146,7 +145,7 @@ fn memcmp_decode(c: &Case, bytes: &Vec>) -> Result>> { Ok(res) } -fn basic_decode(c: &Case, bytes: &Vec>) -> Result>> { +fn basic_decode(c: &Case, bytes: &Vec>) -> anyhow::Result>> { let table_columns = c .column_ids .iter() @@ -195,7 +194,7 @@ fn basic_decode(c: &Case, bytes: &Vec>) -> Result>> { Ok(res) } -fn column_aware_decode(c: &Case, bytes: &Vec>) -> Result>> { +fn column_aware_decode(c: &Case, bytes: &Vec>) -> anyhow::Result>> { let table_columns = c .column_ids .iter() diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs index e7792d5930e03..91aea55a2dc31 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs @@ -14,8 +14,7 @@ use std::time::Duration; -use anyhow::Result; -use risingwave_common::error::anyhow_error; +use anyhow::{anyhow, Result}; use risingwave_simulation::cluster::{Cluster, Configuration, Session}; use tokio::time::sleep; @@ -49,7 +48,7 @@ async fn cancel_stream_jobs(session: &mut Session) -> Result> { .split('\n') .map(|s| { s.parse::() - .map_err(|_e| anyhow_error!("failed to parse {}", s)) + .map_err(|_e| anyhow!("failed to parse {}", s)) }) .collect::>>()?; Ok(ids)