Skip to content

Commit

Permalink
refactor(error): clean-up direct error formatting (part 3) (#14685)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Jan 19, 2024
1 parent 1a655d9 commit 151fd5d
Show file tree
Hide file tree
Showing 43 changed files with 218 additions and 168 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ risingwave_storage = { workspace = true }
risingwave_stream = { workspace = true }
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"fs",
"rt",
Expand Down
3 changes: 2 additions & 1 deletion src/bench/sink_bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use risingwave_pb::connector_service::SinkPayloadFormat;
use risingwave_stream::executor::test_utils::prelude::ColumnDesc;
use risingwave_stream::executor::{Barrier, Message, MessageStreamItem, StreamExecutorError};
use serde::{Deserialize, Deserializer};
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::time::{sleep, Instant};

Expand Down Expand Up @@ -298,7 +299,7 @@ where
}
let log_sinker = sink.new_log_sinker(sink_writer_param).await.unwrap();
if let Err(e) = log_sinker.consume_log_and_sink(&mut log_reader).await {
return Err(e.to_string());
return Err(e.to_report_string());
}
Err("Stream closed".to_string())
}
Expand Down
2 changes: 1 addition & 1 deletion src/expr/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl MultiExprError {
impl Display for MultiExprError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for (i, e) in self.0.iter().enumerate() {
writeln!(f, "{i}: {e}")?;
writeln!(f, "{i}: {}", e.as_report())?;
}
Ok(())
}
Expand Down
17 changes: 12 additions & 5 deletions src/expr/impl/src/scalar/external/iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum};
use risingwave_expr::expr::BoxedExpression;
use risingwave_expr::{build_function, ExprError, Result};
use thiserror_ext::AsReport;

pub struct IcebergTransform {
child: BoxedExpression,
Expand Down Expand Up @@ -93,23 +94,29 @@ fn build(return_type: DataType, mut children: Vec<BoxedExpression>) -> Result<Bo
let input_type = IcelakeDataType::try_from(ArrowDataType::try_from(children[1].return_type())?)
.map_err(|err| ExprError::InvalidParam {
name: "input type in iceberg_transform",
reason: format!("Failed to convert input type to icelake type, got error: {err}",)
.into(),
reason: format!(
"Failed to convert input type to icelake type, got error: {}",
err.as_report()
)
.into(),
})?;
let expect_res_type = transform_type.result_type(&input_type).map_err(
|err| ExprError::InvalidParam {
name: "input type in iceberg_transform",
reason: format!(
"Failed to get result type for transform type {:?} and input type {:?}, got error: {}",
transform_type, input_type, err
transform_type, input_type, err.as_report()
)
.into()
})?;
let actual_res_type = IcelakeDataType::try_from(ArrowDataType::try_from(return_type.clone())?)
.map_err(|err| ExprError::InvalidParam {
name: "return type in iceberg_transform",
reason: format!("Failed to convert return type to icelake type, got error: {err}",)
.into(),
reason: format!(
"Failed to convert return type to icelake type, got error: {}",
err.as_report()
)
.into(),
})?;
ensure!(
expect_res_type == actual_res_type,
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/handler/alter_parallelism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_pb::meta::table_parallelism::{AutoParallelism, FixedParallelism,
use risingwave_pb::meta::{PbTableParallelism, TableParallelism};
use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value};
use risingwave_sqlparser::keywords::Keyword;
use thiserror_ext::AsReport;

use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
Expand Down Expand Up @@ -115,10 +116,10 @@ fn extract_table_parallelism(parallelism: SetVariableValue) -> Result<TableParal

SetVariableValue::Default => auto_parallelism,
SetVariableValue::Single(SetVariableValueSingle::Literal(Value::Number(v))) => {
let fixed_parallelism = v.parse().map_err(|e| {
let fixed_parallelism = v.parse::<u32>().map_err(|e| {
ErrorCode::InvalidInputSyntax(format!(
"target parallelism must be a valid number or auto: {}",
e
e.as_report()
))
})?;

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::{BTreeMap, HashMap};
use std::rc::Rc;
use std::sync::LazyLock;

use anyhow::Context;
use either::Either;
use itertools::Itertools;
use maplit::{convert_args, hashmap};
Expand Down Expand Up @@ -385,8 +386,7 @@ pub(crate) async fn bind_columns_from_source(
(Format::Plain, Encode::Csv) => {
let chars =
consume_string_from_options(&mut format_encode_options_to_consume, "delimiter")?.0;
let delimiter =
get_delimiter(chars.as_str()).map_err(|e| RwError::from(e.to_string()))?;
let delimiter = get_delimiter(chars.as_str()).context("failed to parse delimiter")?;
let has_header = try_consume_string_from_options(
&mut format_encode_options_to_consume,
"without_header",
Expand Down
1 change: 1 addition & 0 deletions src/meta/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ sea-orm = { version = "0.12.0", features = [
] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
"rt-multi-thread",
Expand Down
22 changes: 12 additions & 10 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use either::Either;
use etcd_client::ConnectOptions;
use futures::future::join_all;
Expand Down Expand Up @@ -68,6 +69,7 @@ use risingwave_pb::meta::SystemParams;
use risingwave_pb::user::user_service_server::UserServiceServer;
use risingwave_rpc_client::ComputeClientPool;
use sea_orm::{ConnectionTrait, DbBackend};
use thiserror_ext::AsReport;
use tokio::sync::oneshot::{channel as OneChannel, Receiver as OneReceiver};
use tokio::sync::watch;
use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender};
Expand Down Expand Up @@ -165,7 +167,7 @@ pub async fn rpc_serve(
let client =
EtcdClient::connect(endpoints.clone(), Some(options.clone()), auth_enabled)
.await
.map_err(|e| anyhow::anyhow!("failed to connect etcd {}", e))?;
.context("failed to connect etcd")?;
let meta_store = EtcdMetaStore::new(client).into_ref();

if election_client.is_none() {
Expand Down Expand Up @@ -234,7 +236,7 @@ pub fn rpc_serve_with_store(
.run_once(lease_interval_secs as i64, stop_rx.clone())
.await
{
tracing::error!("election error happened, {}", e.to_string());
tracing::error!(error = %e.as_report(), "election error happened");
}
});

Expand All @@ -252,8 +254,8 @@ pub fn rpc_serve_with_store(
tokio::select! {
_ = svc_shutdown_rx_clone.changed() => return,
res = is_leader_watcher.changed() => {
if let Err(err) = res {
tracing::error!("leader watcher recv failed {}", err.to_string());
if res.is_err() {
tracing::error!("leader watcher recv failed");
}
}
}
Expand Down Expand Up @@ -284,8 +286,8 @@ pub fn rpc_serve_with_store(
return;
}
res = is_leader_watcher.changed() => {
if let Err(err) = res {
tracing::error!("leader watcher recv failed {}", err.to_string());
if res.is_err() {
tracing::error!("leader watcher recv failed");
}
}
}
Expand Down Expand Up @@ -771,13 +773,13 @@ pub async fn start_service_as_election_leader(
match tokio::time::timeout(Duration::from_secs(1), join_all(handles)).await {
Ok(results) => {
for result in results {
if let Err(err) = result {
tracing::warn!("Failed to join shutdown: {:?}", err);
if result.is_err() {
tracing::warn!("Failed to join shutdown");
}
}
}
Err(e) => {
tracing::warn!("Join shutdown timeout: {:?}", e);
Err(_e) => {
tracing::warn!("Join shutdown timeout");
}
}
};
Expand Down
1 change: 1 addition & 0 deletions src/meta/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ sea-orm = { version = "0.12.0", features = [
"macros",
] }
sync-point = { path = "../../utils/sync-point" }
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
"rt-multi-thread",
Expand Down
14 changes: 9 additions & 5 deletions src/meta/service/src/cloud_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use risingwave_connector::source::kafka::private_link::insert_privatelink_broker
use risingwave_connector::source::{
ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator,
};
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::manager::{ConnectionId, MetadataManager};
use risingwave_pb::catalog::connection::Info::PrivateLinkService;
use risingwave_pb::cloud_service::cloud_service_server::CloudService;
use risingwave_pb::cloud_service::rw_cloud_validate_source_response::{Error, ErrorType};
use risingwave_pb::cloud_service::{
RwCloudValidateSourceRequest, RwCloudValidateSourceResponse, SourceType,
};
use thiserror_ext::AsReport;
use tonic::{Request, Response, Status};

use crate::rpc::cloud_provider::AwsEc2Client;
Expand Down Expand Up @@ -77,8 +78,11 @@ impl CloudService for CloudServiceImpl {
// if connection_id provided, check whether endpoint service is available and resolve
// broker rewrite map currently only support aws privatelink connection
if let Some(connection_id_str) = source_cfg.get("connection.id") {
let connection_id = connection_id_str.parse().map_err(|e| {
Status::invalid_argument(format!("connection.id is not an integer: {}", e))
let connection_id = connection_id_str.parse::<ConnectionId>().map_err(|e| {
Status::invalid_argument(format!(
"connection.id is not an integer: {}",
e.as_report()
))
})?;

let connection = match &self.metadata_manager {
Expand All @@ -97,7 +101,7 @@ impl CloudService for CloudServiceImpl {
if let Err(e) = connection {
return Ok(new_rwc_validate_fail_response(
ErrorType::PrivatelinkConnectionNotFound,
e.to_string(),
e.to_report_string(),
));
}
if let Some(PrivateLinkService(service)) = connection.unwrap().info {
Expand All @@ -115,7 +119,7 @@ impl CloudService for CloudServiceImpl {
Err(e) => {
return Ok(new_rwc_validate_fail_response(
ErrorType::PrivatelinkUnavailable,
e.to_string(),
e.to_report_string(),
));
}
Ok(false) => {
Expand Down
3 changes: 2 additions & 1 deletion src/meta/service/src/cluster_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use risingwave_pb::meta::{
ListAllNodesResponse, UpdateWorkerNodeSchedulabilityRequest,
UpdateWorkerNodeSchedulabilityResponse,
};
use thiserror_ext::AsReport;
use tonic::{Request, Response, Status};

use crate::MetaError;
Expand Down Expand Up @@ -64,7 +65,7 @@ impl ClusterService for ClusterServiceImpl {
return Ok(Response::new(AddWorkerNodeResponse {
status: Some(risingwave_pb::common::Status {
code: risingwave_pb::common::status::Code::UnknownWorker as i32,
message: format!("{}", e),
message: e.to_report_string(),
}),
node_id: None,
}));
Expand Down
3 changes: 2 additions & 1 deletion src/meta/service/src/heartbeat_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use itertools::Itertools;
use risingwave_meta::manager::MetadataManager;
use risingwave_pb::meta::heartbeat_service_server::HeartbeatService;
use risingwave_pb::meta::{HeartbeatRequest, HeartbeatResponse};
use thiserror_ext::AsReport;
use tonic::{Request, Response, Status};

#[derive(Clone)]
Expand Down Expand Up @@ -58,7 +59,7 @@ impl HeartbeatService for HeartbeatServiceImpl {
return Ok(Response::new(HeartbeatResponse {
status: Some(risingwave_pb::common::Status {
code: risingwave_pb::common::status::Code::UnknownWorker as i32,
message: format!("{}", e),
message: e.to_report_string(),
}),
}));
}
Expand Down
3 changes: 2 additions & 1 deletion src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
use risingwave_pb::hummock::*;
use thiserror_ext::AsReport;
use tonic::{Request, Response, Status, Streaming};

use crate::hummock::compaction::selector::ManualCompactionOption;
Expand Down Expand Up @@ -308,7 +309,7 @@ impl HummockManagerService for HummockServiceImpl {
tracing::info!("Full GC results {} SSTs to delete", number);
}
Err(e) => {
tracing::warn!("Full GC SST failed: {:#?}", e);
tracing::warn!(error = %e.as_report(), "Full GC SST failed");
}
}
});
Expand Down
11 changes: 6 additions & 5 deletions src/meta/src/backup_restore/backup_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_pb::backup_service::{BackupJobStatus, MetaBackupManifestId};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use thiserror_ext::AsReport;
use tokio::task::JoinHandle;

use crate::backup_restore::meta_snapshot_builder;
Expand Down Expand Up @@ -134,10 +135,10 @@ impl BackupManager {
if let Err(e) = self.set_store(new_config.clone()).await {
// Retry is driven by periodic system params notification.
tracing::warn!(
"failed to apply new backup config: url={}, dir={}, {:#?}",
new_config.0,
new_config.1,
e
url = &new_config.0,
dir = &new_config.1,
error = %e.as_report(),
"failed to apply new backup config",
);
}
}
Expand Down Expand Up @@ -269,7 +270,7 @@ impl BackupManager {
}
BackupJobResult::Failed(e) => {
self.metrics.job_latency_failure.observe(job_latency);
let message = format!("failed backup job {}: {}", job_id, e);
let message = format!("failed backup job {}: {}", job_id, e.as_report());
tracing::warn!(message);
self.latest_job_info
.store(Arc::new((job_id, BackupJobStatus::Failed, message)));
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/backup_restore/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use risingwave_hummock_sdk::version_checkpoint_path;
use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_pb::hummock::PbHummockVersionCheckpoint;
use thiserror_ext::AsReport;

use crate::backup_restore::restore_impl::v1::{LoaderV1, WriterModelV1ToMetaStoreV1};
use crate::backup_restore::restore_impl::v2::{LoaderV2, WriterModelV2ToMetaStoreV2};
Expand Down Expand Up @@ -193,7 +194,7 @@ pub async fn restore(opts: RestoreOpts) -> BackupResult<()> {
tracing::info!("command succeeded");
}
Err(e) => {
tracing::warn!("command failed: {}", e);
tracing::warn!(error = %e.as_report(), "command failed");
}
}
result
Expand Down
Loading

0 comments on commit 151fd5d

Please sign in to comment.