diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 6abe36d7429c9..119170640b652 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -266,7 +266,7 @@ macro_rules! impl_set_system_param { let v = if let Some(v) = value { v.parse().map_err(|_| format!("cannot parse parameter value"))? } else { - $default.ok_or(format!("{} does not have a default value", key))? + $default.ok_or_else(|| format!("{} does not have a default value", key))? }; OverrideValidateOnSet::$field(&v)?; params.$field = Some(v); diff --git a/src/connector/src/parser/unified/avro.rs b/src/connector/src/parser/unified/avro.rs index 1a373c1a99dea..ea9841dd981c0 100644 --- a/src/connector/src/parser/unified/avro.rs +++ b/src/connector/src/parser/unified/avro.rs @@ -127,9 +127,9 @@ impl<'a> AvroParseOptions<'a> { .find(|field| field.0 == field_name) .map(|field| &field.1) }; - let scale = match find_in_records("scale").ok_or(AccessError::Other(anyhow!( - "scale field not found in VariableScaleDecimal" - )))? { + let scale = match find_in_records("scale").ok_or_else(|| { + AccessError::Other(anyhow!("scale field not found in VariableScaleDecimal")) + })? { Value::Int(scale) => Ok(*scale), avro_value => Err(AccessError::Other(anyhow!( "scale field in VariableScaleDecimal is not int, got {:?}", @@ -137,9 +137,9 @@ impl<'a> AvroParseOptions<'a> { ))), }?; - let value: BigInt = match find_in_records("value").ok_or(AccessError::Other( - anyhow!("value field not found in VariableScaleDecimal"), - ))? { + let value: BigInt = match find_in_records("value").ok_or_else(|| { + AccessError::Other(anyhow!("value field not found in VariableScaleDecimal")) + })? { Value::Bytes(bytes) => Ok(BigInt::from_signed_bytes_be(bytes)), avro_value => Err(AccessError::Other(anyhow!( "value field in VariableScaleDecimal is not bytes, got {:?}", diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index a688149dab378..9b3ac805a95a7 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -263,10 +263,10 @@ impl ClickHouseSinkWriter { .r#type .split("DateTime64(") .last() - .ok_or(SinkError::ClickHouse("must have last".to_string()))? + .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))? .split(')') .next() - .ok_or(SinkError::ClickHouse("must have next".to_string()))? + .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? .parse::() .map_err(|e| SinkError::ClickHouse(format!("clickhouse sink error {}", e)))? } else { @@ -482,7 +482,7 @@ impl ClickHouseSinkWriter { Op::UpdateDelete => continue, Op::UpdateInsert => { let pk = Self::build_ck_fields(row.datum_at(pk_index), accuracy_time)? - .ok_or(SinkError::ClickHouse("pk can not be none".to_string()))?; + .ok_or_else(|| SinkError::ClickHouse("pk can not be none".to_string()))?; let fields_vec = self.build_update_fields(row, accuracy_time)?; self.client .update( diff --git a/src/connector/src/sink/coordinate.rs b/src/connector/src/sink/coordinate.rs index b8a4c2771e132..f81a2fe1bac3b 100644 --- a/src/connector/src/sink/coordinate.rs +++ b/src/connector/src/sink/coordinate.rs @@ -61,9 +61,9 @@ impl>> SinkWriter for Coordi async fn barrier(&mut self, is_checkpoint: bool) -> Result { let metadata = self.inner.barrier(is_checkpoint).await?; if is_checkpoint { - let metadata = metadata.ok_or(SinkError::Coordinator(anyhow!( - "should get metadata on checkpoint barrier" - )))?; + let metadata = metadata.ok_or_else(|| { + SinkError::Coordinator(anyhow!("should get metadata on checkpoint barrier")) + })?; // TODO: add metrics to measure time to commit self.coordinator_stream_handle .commit(self.epoch, metadata) diff --git a/src/connector/src/source/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs index 8d9f5a0c492e9..8086df6c04ed9 100644 --- a/src/connector/src/source/kafka/private_link.rs +++ b/src/connector/src/source/kafka/private_link.rs @@ -147,7 +147,7 @@ fn get_property_required( with_properties .get(property) .map(|s| s.to_lowercase()) - .ok_or(anyhow!("Required property \"{property}\" is not provided")) + .ok_or_else(|| anyhow!("Required property \"{property}\" is not provided")) } #[inline(always)] diff --git a/src/frontend/src/handler/create_connection.rs b/src/frontend/src/handler/create_connection.rs index 3b7dc31afd0fc..ef1caee50a55d 100644 --- a/src/frontend/src/handler/create_connection.rs +++ b/src/frontend/src/handler/create_connection.rs @@ -42,9 +42,11 @@ fn get_connection_property_required( with_properties .get(property) .map(|s| s.to_lowercase()) - .ok_or(RwError::from(ProtocolError(format!( - "Required property \"{property}\" is not provided" - )))) + .ok_or_else(|| { + RwError::from(ProtocolError(format!( + "Required property \"{property}\" is not provided" + ))) + }) } fn resolve_private_link_properties( diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 004ff086d17d3..ff876b1b6f23f 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -155,9 +155,9 @@ pub fn bind_sql_columns(column_defs: &[ColumnDef]) -> Result> .. } = column; - let data_type = data_type.clone().ok_or(ErrorCode::InvalidInputSyntax( - "data type is not specified".into(), - ))?; + let data_type = data_type + .clone() + .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?; if let Some(collation) = collation { return Err(ErrorCode::NotImplemented( format!("collation \"{}\"", collation), diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 844b117f6cffa..77263e10a752f 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -668,10 +668,12 @@ impl SessionImpl { let schema = catalog_reader.get_schema_by_name(db_name, schema.name().as_str())?; let connection = schema .get_connection_by_name(connection_name) - .ok_or(RwError::from(ErrorCode::ItemNotFound(format!( - "connection {} not found", - connection_name - ))))?; + .ok_or_else(|| { + RwError::from(ErrorCode::ItemNotFound(format!( + "connection {} not found", + connection_name + ))) + })?; Ok(connection.clone()) } diff --git a/src/meta/src/rpc/cloud_provider.rs b/src/meta/src/rpc/cloud_provider.rs index ebb67fcb632f1..2b307f955e873 100644 --- a/src/meta/src/rpc/cloud_provider.rs +++ b/src/meta/src/rpc/cloud_provider.rs @@ -55,11 +55,11 @@ impl AwsEc2Client { .send() .await .map_err(|e| { - MetaError::from(anyhow!( + anyhow!( "Failed to delete VPC endpoint. endpoint_id {vpc_endpoint_id}, error: {:?}, aws_request_id: {:?}", e.message(), e.meta().extra("aws_request_id") - )) + ) })?; if let Some(ret) = output.unsuccessful() { @@ -167,18 +167,19 @@ impl AwsEc2Client { .send() .await .map_err(|e| { - MetaError::from(anyhow!( + anyhow!( "Failed to check availability of VPC endpoint. endpoint_id: {vpc_endpoint_id}, error: {:?}, aws_request_id: {:?}", e.message(), e.meta().extra("aws_request_id") - )) + ) })?; match output.vpc_endpoints { Some(endpoints) => { - let endpoint = endpoints.into_iter().exactly_one().map_err(|_| { - MetaError::from(anyhow!("More than one VPC endpoint found with the same ID")) - })?; + let endpoint = endpoints + .into_iter() + .exactly_one() + .map_err(|_| anyhow!("More than one VPC endpoint found with the same ID"))?; if let Some(state) = endpoint.state { match state { State::Available => { @@ -210,19 +211,17 @@ impl AwsEc2Client { .send() .await .map_err(|e| { - MetaError::from(anyhow!( + anyhow!( "Failed to describe VPC endpoint service, error: {:?}, aws_request_id: {:?}", e.message(), e.meta().extra("aws_request_id") - )) + ) })?; match output.service_details { Some(details) => { let detail = details.into_iter().exactly_one().map_err(|_| { - MetaError::from(anyhow!( - "More than one VPC endpoint service found with the same name" - )) + anyhow!("More than one VPC endpoint service found with the same name") })?; if let Some(azs) = detail.availability_zones { service_azs.extend(azs.into_iter()); @@ -255,9 +254,9 @@ impl AwsEc2Client { .send() .await .map_err(|e| { - MetaError::from(anyhow!("Failed to describe subnets for vpc_id {vpc_id}. error: {:?}, aws_request_id: {:?}", + anyhow!("Failed to describe subnets for vpc_id {vpc_id}. error: {:?}, aws_request_id: {:?}", e.message(), - e.meta().extra("aws_request_id"))) + e.meta().extra("aws_request_id")) })?; let subnets = output @@ -315,12 +314,12 @@ impl AwsEc2Client { .send() .await .map_err(|e| { - MetaError::from(anyhow!( + anyhow!( "Failed to create vpc endpoint: vpc_id {vpc_id}, \ service_name {service_name}. error: {:?}, aws_request_id: {:?}", e.message(), e.meta().extra("aws_request_id") - )) + ) })?; let endpoint = output.vpc_endpoint().unwrap(); diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index cf695862f7fc0..8230e17227bdb 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -233,7 +233,7 @@ impl BidiStreamHandle { request_sender .send(first_request) .await - .map_err(|err| RpcError::Internal(anyhow!(err.to_string())))?; + .map_err(|err| anyhow!(err.to_string()))?; let mut response_stream = init_stream_fn(Request::new(ReceiverStream::new(request_receiver))) @@ -243,9 +243,7 @@ impl BidiStreamHandle { let first_response = response_stream .next() .await - .ok_or(RpcError::Internal(anyhow!( - "get empty response from start sink request" - )))??; + .ok_or_else(|| anyhow!("get empty response from start sink request"))??; Ok(( Self { @@ -261,12 +259,14 @@ impl BidiStreamHandle { .response_stream .next() .await - .ok_or(RpcError::Internal(anyhow!("end of response stream")))??) + .ok_or_else(|| anyhow!("end of response stream"))??) } pub async fn send_request(&mut self, request: REQ) -> Result<()> { - self.request_sender.send(request).await.map_err(|_| { - RpcError::Internal(anyhow!("unable to send request {}", type_name::())) - }) + Ok(self + .request_sender + .send(request) + .await + .map_err(|_| anyhow!("unable to send request {}", type_name::()))?) } } diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 2b02d5c570ab2..a4d993a126f64 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -618,9 +618,9 @@ impl SourceSchemaV2 { }; let consume_string_from_options = |row_options: &BTreeMap, key: &str| -> Result { - try_consume_string_from_options(row_options, key).ok_or(ParserError::ParserError( - format!("missing field {} in row format options", key), - )) + try_consume_string_from_options(row_options, key).ok_or_else(|| { + ParserError::ParserError(format!("missing field {} in row format options", key)) + }) }; let get_schema_location = |row_options: &BTreeMap| -> Result<(AstString, bool), ParserError> { diff --git a/src/stream/src/common/log_store/in_mem.rs b/src/stream/src/common/log_store/in_mem.rs index 90af47d0aca3d..dec5eb2280f5b 100644 --- a/src/stream/src/common/log_store/in_mem.rs +++ b/src/stream/src/common/log_store/in_mem.rs @@ -241,7 +241,7 @@ impl LogWriter for BoundedInMemLogStoreWriter { .truncated_epoch_rx .recv() .await - .ok_or(anyhow!("cannot get truncated epoch"))?; + .ok_or_else(|| anyhow!("cannot get truncated epoch"))?; assert_eq!(truncated_epoch, prev_epoch); } diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 24f0d546e76cc..dd70e908eff03 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -265,11 +265,11 @@ impl FsSourceExecutor { .instrument_await("source_recv_first_barrier") .await .ok_or_else(|| { - StreamExecutorError::from(anyhow!( + anyhow!( "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}", self.actor_ctx.id, self.stream_source_core.source_id - )) + ) })?; let source_desc_builder: SourceDescBuilder = diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 633722f750550..ef3af4cdee247 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -346,11 +346,11 @@ impl SourceExecutor { .instrument_await("source_recv_first_barrier") .await .ok_or_else(|| { - StreamExecutorError::from(anyhow!( + anyhow!( "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}", self.actor_ctx.id, self.stream_source_core.as_ref().unwrap().source_id - )) + ) })?; let mut core = self.stream_source_core.unwrap(); @@ -580,10 +580,10 @@ impl SourceExecutor { .instrument_await("source_recv_first_barrier") .await .ok_or_else(|| { - StreamExecutorError::from(anyhow!( + anyhow!( "failed to receive the first barrier, actor_id: {:?} with no stream source", self.actor_ctx.id - )) + ) })?; yield Message::Barrier(barrier); diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 24cabfd071593..ba20589ff4157 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -41,7 +41,7 @@ use tokio::sync::Mutex; use tokio::task::JoinHandle; use super::{unique_executor_id, unique_operator_id, CollectResult}; -use crate::error::{StreamError, StreamResult}; +use crate::error::StreamResult; use crate::executor::exchange::permit::Receiver; use crate::executor::monitor::StreamingMetrics; use crate::executor::subtask::SubtaskHandle; @@ -605,9 +605,10 @@ impl LocalStreamManagerCore { env: StreamEnvironment, ) -> StreamResult<()> { for &actor_id in actors { - let actor = self.actors.remove(&actor_id).ok_or_else(|| { - StreamError::from(anyhow!("No such actor with actor id:{}", actor_id)) - })?; + let actor = self + .actors + .remove(&actor_id) + .ok_or_else(|| anyhow!("No such actor with actor id:{}", actor_id))?; let mview_definition = &actor.mview_definition; let actor_context = ActorContext::create_with_metrics( actor_id,