fix: fix incorrect usage of ok_or with anyhow #11589

Merged: 3 commits, Aug 10, 2023
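Every hunk below applies the same pattern: ok_or takes its error value as an argument, so a format! allocation or an anyhow! construction (which may capture a backtrace) runs on every call even when the value is present, while ok_or_else takes a closure that only runs on the None path. A minimal, self-contained sketch of the difference, with illustrative names that are not from this repository:

use std::collections::HashMap;

// Illustrative only: this helper is not part of the RisingWave codebase.
fn required(props: &HashMap<String, String>, key: &str) -> Result<String, String> {
    // Eager version (what the PR removes): the format! argument is evaluated
    // on every call, allocating a String even when the key exists.
    //     props.get(key).cloned().ok_or(format!("missing property {}", key))
    //
    // Lazy version (what the PR switches to): the closure runs only when the
    // lookup returns None, so the allocation happens only on the error path.
    props
        .get(key)
        .cloned()
        .ok_or_else(|| format!("missing property {}", key))
}

fn main() {
    let mut props = HashMap::new();
    props.insert("topic".to_string(), "t1".to_string());
    assert_eq!(required(&props, "topic").unwrap(), "t1");
    assert!(required(&props, "broker").is_err());
}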
2 changes: 1 addition & 1 deletion src/common/src/system_param/mod.rs
@@ -266,7 +266,7 @@ macro_rules! impl_set_system_param {
let v = if let Some(v) = value {
v.parse().map_err(|_| format!("cannot parse parameter value"))?
} else {
- $default.ok_or(format!("{} does not have a default value", key))?
+ $default.ok_or_else(|| format!("{} does not have a default value", key))?
};
OverrideValidateOnSet::$field(&v)?;
params.$field = Some(v);
12 changes: 6 additions & 6 deletions src/connector/src/parser/unified/avro.rs
@@ -127,19 +127,19 @@ impl<'a> AvroParseOptions<'a> {
.find(|field| field.0 == field_name)
.map(|field| &field.1)
};
- let scale = match find_in_records("scale").ok_or(AccessError::Other(anyhow!(
- "scale field not found in VariableScaleDecimal"
- )))? {
+ let scale = match find_in_records("scale").ok_or_else(|| {
+ AccessError::Other(anyhow!("scale field not found in VariableScaleDecimal"))
+ })? {
Value::Int(scale) => Ok(*scale),
avro_value => Err(AccessError::Other(anyhow!(
"scale field in VariableScaleDecimal is not int, got {:?}",
avro_value
))),
}?;

- let value: BigInt = match find_in_records("value").ok_or(AccessError::Other(
- anyhow!("value field not found in VariableScaleDecimal"),
- ))? {
+ let value: BigInt = match find_in_records("value").ok_or_else(|| {
+ AccessError::Other(anyhow!("value field not found in VariableScaleDecimal"))
+ })? {
Value::Bytes(bytes) => Ok(BigInt::from_signed_bytes_be(bytes)),
avro_value => Err(AccessError::Other(anyhow!(
"value field in VariableScaleDecimal is not bytes, got {:?}",
6 changes: 3 additions & 3 deletions src/connector/src/sink/clickhouse.rs
@@ -263,10 +263,10 @@ impl ClickHouseSinkWriter {
.r#type
.split("DateTime64(")
.last()
- .ok_or(SinkError::ClickHouse("must have last".to_string()))?
+ .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(')')
.next()
- .ok_or(SinkError::ClickHouse("must have next".to_string()))?
+ .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.parse::<u8>()
.map_err(|e| SinkError::ClickHouse(format!("clickhouse sink error {}", e)))?
} else {
@@ -482,7 +482,7 @@ impl ClickHouseSinkWriter {
Op::UpdateDelete => continue,
Op::UpdateInsert => {
let pk = Self::build_ck_fields(row.datum_at(pk_index), accuracy_time)?
- .ok_or(SinkError::ClickHouse("pk can not be none".to_string()))?;
+ .ok_or_else(|| SinkError::ClickHouse("pk can not be none".to_string()))?;
let fields_vec = self.build_update_fields(row, accuracy_time)?;
self.client
.update(
6 changes: 3 additions & 3 deletions src/connector/src/sink/coordinate.rs
@@ -61,9 +61,9 @@ impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> SinkWriter for Coordi
async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata> {
let metadata = self.inner.barrier(is_checkpoint).await?;
if is_checkpoint {
- let metadata = metadata.ok_or(SinkError::Coordinator(anyhow!(
- "should get metadata on checkpoint barrier"
- )))?;
+ let metadata = metadata.ok_or_else(|| {
+ SinkError::Coordinator(anyhow!("should get metadata on checkpoint barrier"))
+ })?;
// TODO: add metrics to measure time to commit
self.coordinator_stream_handle
.commit(self.epoch, metadata)
2 changes: 1 addition & 1 deletion src/connector/src/source/kafka/private_link.rs
@@ -147,7 +147,7 @@ fn get_property_required(
with_properties
.get(property)
.map(|s| s.to_lowercase())
- .ok_or(anyhow!("Required property \"{property}\" is not provided"))
+ .ok_or_else(|| anyhow!("Required property \"{property}\" is not provided"))
}

#[inline(always)]
8 changes: 5 additions & 3 deletions src/frontend/src/handler/create_connection.rs
@@ -42,9 +42,11 @@ fn get_connection_property_required(
with_properties
.get(property)
.map(|s| s.to_lowercase())
- .ok_or(RwError::from(ProtocolError(format!(
- "Required property \"{property}\" is not provided"
- ))))
+ .ok_or_else(|| {
+ RwError::from(ProtocolError(format!(
+ "Required property \"{property}\" is not provided"
+ )))
+ })
}

fn resolve_private_link_properties(
6 changes: 3 additions & 3 deletions src/frontend/src/handler/create_table.rs
@@ -155,9 +155,9 @@ pub fn bind_sql_columns(column_defs: &[ColumnDef]) -> Result<Vec<ColumnCatalog>>
..
} = column;

- let data_type = data_type.clone().ok_or(ErrorCode::InvalidInputSyntax(
- "data type is not specified".into(),
- ))?;
+ let data_type = data_type
+ .clone()
+ .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
if let Some(collation) = collation {
return Err(ErrorCode::NotImplemented(
format!("collation \"{}\"", collation),
10 changes: 6 additions & 4 deletions src/frontend/src/session.rs
@@ -668,10 +668,12 @@ impl SessionImpl {
let schema = catalog_reader.get_schema_by_name(db_name, schema.name().as_str())?;
let connection = schema
.get_connection_by_name(connection_name)
- .ok_or(RwError::from(ErrorCode::ItemNotFound(format!(
- "connection {} not found",
- connection_name
- ))))?;
+ .ok_or_else(|| {
+ RwError::from(ErrorCode::ItemNotFound(format!(
+ "connection {} not found",
+ connection_name
+ )))
+ })?;
Ok(connection.clone())
}

31 changes: 15 additions & 16 deletions src/meta/src/rpc/cloud_provider.rs
@@ -55,11 +55,11 @@ impl AwsEc2Client {
.send()
.await
.map_err(|e| {
- MetaError::from(anyhow!(
+ anyhow!(
"Failed to delete VPC endpoint. endpoint_id {vpc_endpoint_id}, error: {:?}, aws_request_id: {:?}",
e.message(),
e.meta().extra("aws_request_id")
- ))
+ )
})?;

if let Some(ret) = output.unsuccessful() {
@@ -167,18 +167,19 @@ impl AwsEc2Client {
.send()
.await
.map_err(|e| {
- MetaError::from(anyhow!(
+ anyhow!(
"Failed to check availability of VPC endpoint. endpoint_id: {vpc_endpoint_id}, error: {:?}, aws_request_id: {:?}",
e.message(),
e.meta().extra("aws_request_id")
- ))
+ )
})?;

match output.vpc_endpoints {
Some(endpoints) => {
- let endpoint = endpoints.into_iter().exactly_one().map_err(|_| {
- MetaError::from(anyhow!("More than one VPC endpoint found with the same ID"))
- })?;
+ let endpoint = endpoints
+ .into_iter()
+ .exactly_one()
+ .map_err(|_| anyhow!("More than one VPC endpoint found with the same ID"))?;
if let Some(state) = endpoint.state {
match state {
State::Available => {
@@ -210,19 +211,17 @@ impl AwsEc2Client {
.send()
.await
.map_err(|e| {
- MetaError::from(anyhow!(
+ anyhow!(
"Failed to describe VPC endpoint service, error: {:?}, aws_request_id: {:?}",
e.message(),
e.meta().extra("aws_request_id")
- ))
+ )
})?;

match output.service_details {
Some(details) => {
let detail = details.into_iter().exactly_one().map_err(|_| {
- MetaError::from(anyhow!(
- "More than one VPC endpoint service found with the same name"
- ))
+ anyhow!("More than one VPC endpoint service found with the same name")
})?;
if let Some(azs) = detail.availability_zones {
service_azs.extend(azs.into_iter());
@@ -255,9 +254,9 @@ impl AwsEc2Client {
.send()
.await
.map_err(|e| {
- MetaError::from(anyhow!("Failed to describe subnets for vpc_id {vpc_id}. error: {:?}, aws_request_id: {:?}",
+ anyhow!("Failed to describe subnets for vpc_id {vpc_id}. error: {:?}, aws_request_id: {:?}",
e.message(),
- e.meta().extra("aws_request_id")))
+ e.meta().extra("aws_request_id"))
})?;

let subnets = output
@@ -315,12 +314,12 @@ impl AwsEc2Client {
.send()
.await
.map_err(|e| {
- MetaError::from(anyhow!(
+ anyhow!(
"Failed to create vpc endpoint: vpc_id {vpc_id}, \
service_name {service_name}. error: {:?}, aws_request_id: {:?}",
e.message(),
e.meta().extra("aws_request_id")
- ))
+ )
})?;

let endpoint = output.vpc_endpoint().unwrap();
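The cloud_provider.rs hunks additionally drop the explicit MetaError::from(...) wrapping and let each closure return a bare anyhow::Error, relying on the ? operator to perform the conversion. A hedged sketch of why that compiles, assuming the real MetaError (like the stand-in below) provides a From<anyhow::Error> impl; the types and function here are illustrative only and require the anyhow crate:

use anyhow::anyhow;

// Stand-in for an error type such as MetaError; assumes the real type has a
// comparable From<anyhow::Error> impl.
#[derive(Debug)]
struct MyError(anyhow::Error);

impl From<anyhow::Error> for MyError {
    fn from(e: anyhow::Error) -> Self {
        MyError(e)
    }
}

fn delete_endpoint(succeed: bool) -> Result<(), MyError> {
    let raw: Result<u32, &str> = if succeed { Ok(200) } else { Err("timeout") };
    // The closure returns anyhow::Error; the trailing ? converts it into
    // MyError through the From impl above, which is why wrapping the anyhow!
    // call in MetaError::from(...) inside the closure was redundant.
    let _status = raw.map_err(|e| anyhow!("Failed to delete VPC endpoint: {:?}", e))?;
    Ok(())
}

fn main() {
    assert!(delete_endpoint(true).is_ok());
    assert!(delete_endpoint(false).is_err());
}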
16 changes: 8 additions & 8 deletions src/rpc_client/src/lib.rs
@@ -233,7 +233,7 @@ impl<REQ: 'static, RSP: 'static> BidiStreamHandle<REQ, RSP> {
request_sender
.send(first_request)
.await
- .map_err(|err| RpcError::Internal(anyhow!(err.to_string())))?;
+ .map_err(|err| anyhow!(err.to_string()))?;

let mut response_stream =
init_stream_fn(Request::new(ReceiverStream::new(request_receiver)))
@@ -243,9 +243,7 @@ impl<REQ: 'static, RSP: 'static> BidiStreamHandle<REQ, RSP> {
let first_response = response_stream
.next()
.await
- .ok_or(RpcError::Internal(anyhow!(
- "get empty response from start sink request"
- )))??;
+ .ok_or_else(|| anyhow!("get empty response from start sink request"))??;

Ok((
Self {
@@ -261,12 +259,14 @@ impl<REQ: 'static, RSP: 'static> BidiStreamHandle<REQ, RSP> {
.response_stream
.next()
.await
- .ok_or(RpcError::Internal(anyhow!("end of response stream")))??)
+ .ok_or_else(|| anyhow!("end of response stream"))??)
}

pub async fn send_request(&mut self, request: REQ) -> Result<()> {
- self.request_sender.send(request).await.map_err(|_| {
- RpcError::Internal(anyhow!("unable to send request {}", type_name::<REQ>()))
- })
+ Ok(self
+ .request_sender
+ .send(request)
+ .await
+ .map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()))?)
}
}
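The rpc_client changes lean on the same mechanism: the closures now yield plain anyhow::Error values, and the ? operator (together with the added Ok(...?) wrapping in send_request) converts them into the crate's RpcError, presumably through an existing From<anyhow::Error> impl; the same reasoning covers the StreamExecutorError::from and StreamError::from wrappers removed in the stream executor and stream manager hunks further down.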
6 changes: 3 additions & 3 deletions src/sqlparser/src/ast/statement.rs
@@ -618,9 +618,9 @@ impl SourceSchemaV2 {
};
let consume_string_from_options =
|row_options: &BTreeMap<String, String>, key: &str| -> Result<AstString, ParserError> {
- try_consume_string_from_options(row_options, key).ok_or(ParserError::ParserError(
- format!("missing field {} in row format options", key),
- ))
+ try_consume_string_from_options(row_options, key).ok_or_else(|| {
+ ParserError::ParserError(format!("missing field {} in row format options", key))
+ })
};
let get_schema_location =
|row_options: &BTreeMap<String, String>| -> Result<(AstString, bool), ParserError> {
2 changes: 1 addition & 1 deletion src/stream/src/common/log_store/in_mem.rs
@@ -241,7 +241,7 @@ impl LogWriter for BoundedInMemLogStoreWriter {
.truncated_epoch_rx
.recv()
.await
- .ok_or(anyhow!("cannot get truncated epoch"))?;
+ .ok_or_else(|| anyhow!("cannot get truncated epoch"))?;
assert_eq!(truncated_epoch, prev_epoch);
}

4 changes: 2 additions & 2 deletions src/stream/src/executor/source/fs_source_executor.rs
@@ -265,11 +265,11 @@ impl<S: StateStore> FsSourceExecutor<S> {
.instrument_await("source_recv_first_barrier")
.await
.ok_or_else(|| {
- StreamExecutorError::from(anyhow!(
+ anyhow!(
"failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
self.actor_ctx.id,
self.stream_source_core.source_id
- ))
+ )
})?;

let source_desc_builder: SourceDescBuilder =
8 changes: 4 additions & 4 deletions src/stream/src/executor/source/source_executor.rs
@@ -346,11 +346,11 @@ impl<S: StateStore> SourceExecutor<S> {
.instrument_await("source_recv_first_barrier")
.await
.ok_or_else(|| {
- StreamExecutorError::from(anyhow!(
+ anyhow!(
"failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
self.actor_ctx.id,
self.stream_source_core.as_ref().unwrap().source_id
- ))
+ )
})?;

let mut core = self.stream_source_core.unwrap();
@@ -580,10 +580,10 @@ impl<S: StateStore> SourceExecutor<S> {
.instrument_await("source_recv_first_barrier")
.await
.ok_or_else(|| {
- StreamExecutorError::from(anyhow!(
+ anyhow!(
"failed to receive the first barrier, actor_id: {:?} with no stream source",
self.actor_ctx.id
- ))
+ )
})?;
yield Message::Barrier(barrier);

9 changes: 5 additions & 4 deletions src/stream/src/task/stream_manager.rs
@@ -41,7 +41,7 @@ use tokio::sync::Mutex;
use tokio::task::JoinHandle;

use super::{unique_executor_id, unique_operator_id, CollectResult};
- use crate::error::{StreamError, StreamResult};
+ use crate::error::StreamResult;
use crate::executor::exchange::permit::Receiver;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::subtask::SubtaskHandle;
@@ -605,9 +605,10 @@ impl LocalStreamManagerCore {
env: StreamEnvironment,
) -> StreamResult<()> {
for &actor_id in actors {
- let actor = self.actors.remove(&actor_id).ok_or_else(|| {
- StreamError::from(anyhow!("No such actor with actor id:{}", actor_id))
- })?;
+ let actor = self
+ .actors
+ .remove(&actor_id)
+ .ok_or_else(|| anyhow!("No such actor with actor id:{}", actor_id))?;
let mview_definition = &actor.mview_definition;
let actor_context = ActorContext::create_with_metrics(
actor_id,