Skip to content

Commit

Permalink
cleanup misc RwError usage
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Feb 2, 2024
1 parent 5db1ab7 commit 1caa4b7
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 19 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ license = { workspace = true }
repository = { workspace = true }

[dependencies]
anyhow = "1"
async-trait = "0.1"
aws-config = { workspace = true }
aws-sdk-s3 = { workspace = true }
Expand Down
7 changes: 3 additions & 4 deletions src/bench/s3_bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use futures::stream::{self, StreamExt};
use futures::{future, Future, FutureExt};
use itertools::Itertools;
use rand::{Rng, SeedableRng};
use risingwave_common::error::RwError;
use tokio::join;
use tokio::sync::RwLock;
use tracing::debug;
Expand Down Expand Up @@ -233,7 +232,7 @@ async fn multi_part_upload(
let part_t = Instant::now();
let result = a.send().await.unwrap();
let part_ttl = part_t.elapsed();
Ok::<_, RwError>((result, part_ttl))
Ok::<_, anyhow::Error>((result, part_ttl))
})
.collect_vec();
let ttfb = t.elapsed();
Expand Down Expand Up @@ -318,7 +317,7 @@ async fn multi_part_get(
.into_iter()
.map(create_part_get)
.map(|resp| async move {
let result: Result<(usize, Duration), RwError> = Ok((
let result: anyhow::Result<(usize, Duration)> = Ok((
resp.await
.unwrap()
.body
Expand Down Expand Up @@ -381,7 +380,7 @@ async fn run_case(
cfg: Arc<Config>,
client: Arc<Client>,
objs: Arc<RwLock<ObjPool>>,
) -> Result<(), RwError> {
) -> anyhow::Result<()> {
let (name, analysis) = match case.clone() {
Case::Put {
name,
Expand Down
8 changes: 4 additions & 4 deletions src/bench/sink_bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use core::str::FromStr;
use std::collections::HashMap;

use anyhow::anyhow;
use clap::Parser;
use futures::prelude::future::Either;
use futures::prelude::stream::{BoxStream, PollNext};
Expand All @@ -27,7 +28,6 @@ use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::ColumnId;
use risingwave_common::error::anyhow_error;
use risingwave_connector::dispatch_sink;
use risingwave_connector::parser::{
EncodingProperties, ParserConfig, ProtocolProperties, SpecificParserConfig,
Expand Down Expand Up @@ -82,7 +82,7 @@ impl LogReader for MockRangeLogReader {
.take()
.unwrap()
.send(self.throughput_metric.take().unwrap())
.map_err(|_| anyhow_error!("Can't send throughput_metric"))?;
.map_err(|_| anyhow!("Can't send throughput_metric"))?;
futures::future::pending().await
},
item = self.upstreams.next() => {
Expand All @@ -108,7 +108,7 @@ impl LogReader for MockRangeLogReader {
},
))
}
_ => Err(anyhow_error!("Can't assert message type".to_string())),
_ => Err(anyhow!("Can't assert message type".to_string())),
}
}
}
Expand Down Expand Up @@ -390,7 +390,7 @@ fn mock_from_legacy_type(
SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
SINK_TYPE_UPSERT => SinkFormat::Upsert,
_ => {
return Err(SinkError::Config(risingwave_common::array::error::anyhow!(
return Err(SinkError::Config(anyhow!(
"sink type unsupported: {}",
r#type
)))
Expand Down
7 changes: 3 additions & 4 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{
ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId, INITIAL_TABLE_VERSION_ID,
};
use risingwave_common::error::{Result, RwError};
use risingwave_common::row::OwnedRow;
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::test_prelude::DataChunkTestExt;
Expand Down Expand Up @@ -297,7 +296,7 @@ async fn test_table_materialize() -> StreamResult<()> {
barrier_tx_clone
.send(Barrier::new_test_barrier(curr_epoch))
.unwrap();
Ok::<_, RwError>(())
anyhow::Ok(())
});

// Poll `Materialize`, should output the same insertion stream chunk.
Expand Down Expand Up @@ -379,7 +378,7 @@ async fn test_table_materialize() -> StreamResult<()> {
barrier_tx_clone
.send(Barrier::new_test_barrier(curr_epoch))
.unwrap();
Ok::<_, RwError>(())
anyhow::Ok(())
});

// Poll `Materialize`, should output the same deletion stream chunk.
Expand Down Expand Up @@ -429,7 +428,7 @@ async fn test_table_materialize() -> StreamResult<()> {
}

#[tokio::test]
async fn test_row_seq_scan() -> Result<()> {
async fn test_row_seq_scan() -> StreamResult<()> {
// In this test we test if the memtable can be correctly scanned for K-V pair insertions.
let memory_state_store = MemoryStateStore::new();

Expand Down
7 changes: 3 additions & 4 deletions src/storage/benches/bench_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::sync::Arc;
use criterion::{criterion_group, criterion_main, Criterion};
use itertools::Itertools;
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::error::Result;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, Datum, ScalarImpl};
use risingwave_common::util::row_serde::OrderedRowSerde;
Expand Down Expand Up @@ -104,7 +103,7 @@ fn column_aware_encode(c: &Case) -> Vec<Vec<u8>> {
array
}

fn memcmp_decode(c: &Case, bytes: &Vec<Vec<u8>>) -> Result<Vec<Vec<Datum>>> {
fn memcmp_decode(c: &Case, bytes: &Vec<Vec<u8>>) -> anyhow::Result<Vec<Vec<Datum>>> {
let serde = OrderedRowSerde::new(
c.schema.to_vec(),
vec![OrderType::descending(); c.schema.len()],
Expand Down Expand Up @@ -146,7 +145,7 @@ fn memcmp_decode(c: &Case, bytes: &Vec<Vec<u8>>) -> Result<Vec<Vec<Datum>>> {
Ok(res)
}

fn basic_decode(c: &Case, bytes: &Vec<Vec<u8>>) -> Result<Vec<Vec<Datum>>> {
fn basic_decode(c: &Case, bytes: &Vec<Vec<u8>>) -> anyhow::Result<Vec<Vec<Datum>>> {
let table_columns = c
.column_ids
.iter()
Expand Down Expand Up @@ -195,7 +194,7 @@ fn basic_decode(c: &Case, bytes: &Vec<Vec<u8>>) -> Result<Vec<Vec<Datum>>> {
Ok(res)
}

fn column_aware_decode(c: &Case, bytes: &Vec<Vec<u8>>) -> Result<Vec<Vec<Datum>>> {
fn column_aware_decode(c: &Case, bytes: &Vec<Vec<u8>>) -> anyhow::Result<Vec<Vec<Datum>>> {
let table_columns = c
.column_ids
.iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@

use std::time::Duration;

use anyhow::Result;
use risingwave_common::error::anyhow_error;
use anyhow::{anyhow, Result};
use risingwave_simulation::cluster::{Cluster, Configuration, Session};
use tokio::time::sleep;

Expand Down Expand Up @@ -49,7 +48,7 @@ async fn cancel_stream_jobs(session: &mut Session) -> Result<Vec<u32>> {
.split('\n')
.map(|s| {
s.parse::<u32>()
.map_err(|_e| anyhow_error!("failed to parse {}", s))
.map_err(|_e| anyhow!("failed to parse {}", s))
})
.collect::<Result<Vec<_>>>()?;
Ok(ids)
Expand Down

0 comments on commit 1caa4b7

Please sign in to comment.