diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs index 897bc95ec9992..b1d299caaf6e4 100644 --- a/src/common/src/array/stream_chunk.rs +++ b/src/common/src/array/stream_chunk.rs @@ -58,13 +58,13 @@ impl Op { } pub fn from_protobuf(prost: &i32) -> ArrayResult { - let op = match PbOp::from_i32(*prost) { - Some(PbOp::Insert) => Op::Insert, - Some(PbOp::Delete) => Op::Delete, - Some(PbOp::UpdateInsert) => Op::UpdateInsert, - Some(PbOp::UpdateDelete) => Op::UpdateDelete, - Some(PbOp::Unspecified) => unreachable!(), - None => bail!("No such op type"), + let op = match PbOp::try_from(*prost) { + Ok(PbOp::Insert) => Op::Insert, + Ok(PbOp::Delete) => Op::Delete, + Ok(PbOp::UpdateInsert) => Op::UpdateInsert, + Ok(PbOp::UpdateDelete) => Op::UpdateDelete, + Ok(PbOp::Unspecified) => unreachable!(), + Err(_) => bail!("No such op type"), }; Ok(op) } diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 8c7914fe16354..eb81212836fd3 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -856,7 +856,7 @@ impl SpecificParserConfig { Some(info.proto_message_name.clone()) }, key_record_name: info.key_message_name.clone(), - name_strategy: PbSchemaRegistryNameStrategy::from_i32(info.name_strategy) + name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy) .unwrap(), use_schema_registry: info.use_schema_registry, row_schema_location: info.row_schema_location.clone(), @@ -887,7 +887,7 @@ impl SpecificParserConfig { message_name: info.proto_message_name.clone(), use_schema_registry: info.use_schema_registry, row_schema_location: info.row_schema_location.clone(), - name_strategy: PbSchemaRegistryNameStrategy::from_i32(info.name_strategy) + name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy) .unwrap(), key_message_name: info.key_message_name.clone(), ..Default::default() @@ -912,7 +912,7 @@ impl SpecificParserConfig { } else { Some(info.proto_message_name.clone()) }, - name_strategy: PbSchemaRegistryNameStrategy::from_i32(info.name_strategy) + name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy) .unwrap(), key_record_name: info.key_message_name.clone(), row_schema_location: info.row_schema_location.clone(), diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 1b5a1133f1a9b..31b276994da4a 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -162,7 +162,7 @@ impl SinkParam { .map(|i| *i as usize) .collect(), sink_type: SinkType::from_proto( - PbSinkType::from_i32(pb_param.sink_type).expect("should be able to convert"), + PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"), ), format_desc, db_name: pb_param.db_name, diff --git a/src/expr/core/src/window_function/kind.rs b/src/expr/core/src/window_function/kind.rs index 4c705dd9b19c6..e2790645a6917 100644 --- a/src/expr/core/src/window_function/kind.rs +++ b/src/expr/core/src/window_function/kind.rs @@ -42,18 +42,18 @@ impl WindowFuncKind { use risingwave_pb::expr::window_function::{PbGeneralType, PbType}; let kind = match window_function_type { - PbType::General(typ) => match PbGeneralType::from_i32(*typ) { - Some(PbGeneralType::Unspecified) => bail!("Unspecified window function type"), - Some(PbGeneralType::RowNumber) => Self::RowNumber, - Some(PbGeneralType::Rank) => Self::Rank, - Some(PbGeneralType::DenseRank) => Self::DenseRank, - Some(PbGeneralType::Lag) => Self::Lag, - Some(PbGeneralType::Lead) => Self::Lead, - None => bail!("no such window function type"), + PbType::General(typ) => match PbGeneralType::try_from(*typ) { + Ok(PbGeneralType::Unspecified) => bail!("Unspecified window function type"), + Ok(PbGeneralType::RowNumber) => Self::RowNumber, + Ok(PbGeneralType::Rank) => Self::Rank, + Ok(PbGeneralType::DenseRank) => Self::DenseRank, + Ok(PbGeneralType::Lag) => Self::Lag, + Ok(PbGeneralType::Lead) => Self::Lead, + Err(_) => bail!("no such window function type"), }, - PbType::Aggregate(agg_type) => match PbAggType::from_i32(*agg_type) { - Some(agg_type) => Self::Aggregate(AggKind::from_protobuf(agg_type)?), - None => bail!("no such aggregate function type"), + PbType::Aggregate(agg_type) => match PbAggType::try_from(*agg_type) { + Ok(agg_type) => Self::Aggregate(AggKind::from_protobuf(agg_type)?), + Err(_) => bail!("no such aggregate function type"), }, }; Ok(kind) diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index cf5752d3f0bdd..4cd271f0495b9 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -249,7 +249,7 @@ fn get_acl_items( .unwrap() .iter() .for_each(|(action, option)| { - let str = match Action::from_i32(*action).unwrap() { + let str = match Action::try_from(*action).unwrap() { Action::Select => "r", Action::Insert => "a", Action::Update => "w", diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs index c114a076a4dbb..1f31948139135 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs @@ -51,9 +51,9 @@ impl SysCatalogReaderImpl { for i in 0..32 { let bit = 1 << i; if mask & bit != 0 { - match FragmentTypeFlag::from_i32(bit as i32) { - None => continue, - Some(flag) => result.push(flag), + match FragmentTypeFlag::try_from(bit as i32) { + Err(_) => continue, + Ok(flag) => result.push(flag), }; } } diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index fde1bc7244368..7c3030370d56f 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -406,7 +406,7 @@ impl StageRunner { match status_res_inner { Ok(status) => { use risingwave_pb::task_service::task_info_response::TaskStatus as TaskStatusPb; - match TaskStatusPb::from_i32(status.task_status).unwrap() { + match TaskStatusPb::try_from(status.task_status).unwrap() { TaskStatusPb::Running => { running_task_cnt += 1; // The task running count should always less or equal than the diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index c099f691ed0ba..2481b5a1efcdb 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -86,6 +86,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ tokio-retry = "0.3" tokio-stream = { version = "0.1", features = ["net"] } tonic = { workspace = true } +tonic_0_9 = { package = "tonic", version = "0.9" } tower = { version = "0.4", features = ["util", "load-shed"] } tracing = "0.1" url = "2" diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index 2ebc9d924f50b..e00b87b0ed9a4 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -105,8 +105,8 @@ pub(super) mod handlers { let mut result = srv .cluster_manager .list_worker_node( - WorkerType::from_i32(ty) - .ok_or_else(|| anyhow!("invalid worker type")) + WorkerType::try_from(ty) + .map_err(|_| anyhow!("invalid worker type")) .map_err(err)?, None, ) diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 26a27674140c6..e8f8a3b1315ca 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -2700,7 +2700,7 @@ impl HummockManager { sorted_output_ssts, table_stats_change }) => { - if let Err(e) = hummock_manager.report_compact_task(task_id, TaskStatus::from_i32(task_status).unwrap(), sorted_output_ssts, Some(table_stats_change)) + if let Err(e) = hummock_manager.report_compact_task(task_id, TaskStatus::try_from(task_status).unwrap(), sorted_output_ssts, Some(table_stats_change)) .await { tracing::error!("report compact_tack fail {e:?}"); } diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 5bcb6f4faa4d7..678c701ca2891 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -292,7 +292,7 @@ impl HummockMetaClient for MockHummockMetaClient { if let Err(e) = hummock_manager_compact .report_compact_task( task_id, - TaskStatus::from_i32(task_status).unwrap(), + TaskStatus::try_from(task_status).unwrap(), sorted_output_ssts, Some(table_stats_change), ) diff --git a/src/meta/src/rpc/service/user_service.rs b/src/meta/src/rpc/service/user_service.rs index e1b7cc27092d5..8c982521b112a 100644 --- a/src/meta/src/rpc/service/user_service.rs +++ b/src/meta/src/rpc/service/user_service.rs @@ -151,7 +151,7 @@ impl UserService for UserServiceImpl { let update_fields = req .update_fields .iter() - .map(|i| UpdateField::from_i32(*i).unwrap()) + .map(|i| UpdateField::try_from(*i).unwrap()) .collect_vec(); let user = req.get_user()?.clone(); let version = self diff --git a/src/meta/src/storage/etcd_retry_client.rs b/src/meta/src/storage/etcd_retry_client.rs index 4ea2b8d3fa54f..ccc61a48dd53f 100644 --- a/src/meta/src/storage/etcd_retry_client.rs +++ b/src/meta/src/storage/etcd_retry_client.rs @@ -50,9 +50,9 @@ impl EtcdRetryClient { fn should_retry(err: &Error) -> bool { match err { Error::GRpcStatus(status) => { - status.code() == tonic::Code::Unavailable - || status.code() == tonic::Code::Unknown - || (status.code() == tonic::Code::Unauthenticated + status.code() == tonic_0_9::Code::Unavailable + || status.code() == tonic_0_9::Code::Unknown + || (status.code() == tonic_0_9::Code::Unauthenticated && status.message().contains("invalid auth token")) } _ => false, diff --git a/src/meta/src/storage/wrapped_etcd_client.rs b/src/meta/src/storage/wrapped_etcd_client.rs index 218749dca9421..84b327862f378 100644 --- a/src/meta/src/storage/wrapped_etcd_client.rs +++ b/src/meta/src/storage/wrapped_etcd_client.rs @@ -82,7 +82,7 @@ impl EtcdRefreshClient { fn should_refresh(err: &etcd_client::Error) -> bool { match err { etcd_client::Error::GRpcStatus(status) => { - status.code() == tonic::Code::Unauthenticated + status.code() == tonic_0_9::Code::Unauthenticated && status.message().contains("invalid auth token") } _ => false,