From 5c4f1d8d2881a9090284ea646fdc411b0641898d Mon Sep 17 00:00:00 2001 From: masonyc Date: Fri, 13 Oct 2023 00:26:51 +1300 Subject: [PATCH] fix: change more grpc sender/recv message size to ReadableSize fix: format fix: cargo fmt fix: change cmd test to use durations fix: revert metaclient change fix: convert default fields in meta client options fix: human serde meta client durations --- Cargo.lock | 1 + src/client/src/client.rs | 12 +++++++++-- src/cmd/src/datanode.rs | 22 +++++++++---------- src/cmd/src/frontend.rs | 4 ++-- src/cmd/src/options.rs | 10 +++++++-- src/common/grpc/src/channel_manager.rs | 9 ++++---- src/datanode/src/config.rs | 4 ++-- src/datanode/src/heartbeat.rs | 8 +++---- src/datanode/src/server.rs | 4 ++-- src/frontend/src/instance.rs | 7 ++++--- src/frontend/src/service_config/grpc.rs | 4 ++-- src/meta-client/Cargo.toml | 1 + src/meta-client/src/lib.rs | 28 ++++++++++++++----------- src/servers/src/grpc.rs | 4 ++-- src/servers/src/heartbeat_options.rs | 19 ++++++++++++----- 15 files changed, 84 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2ef70d9bb96..f5359f92e153 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5334,6 +5334,7 @@ dependencies = [ "datatypes", "etcd-client", "futures", + "humantime-serde", "meta-srv", "rand", "serde", diff --git a/src/client/src/client.rs b/src/client/src/client.rs index c5457e58741f..137565bdb690 100644 --- a/src/client/src/client.rs +++ b/src/client/src/client.rs @@ -139,11 +139,19 @@ impl Client { } fn max_grpc_recv_message_size(&self) -> usize { - self.inner.channel_manager.config().max_recv_message_size + self.inner + .channel_manager + .config() + .max_recv_message_size + .as_bytes() as usize } fn max_grpc_send_message_size(&self) -> usize { - self.inner.channel_manager.config().max_send_message_size + self.inner + .channel_manager + .config() + .max_send_message_size + .as_bytes() as usize } pub(crate) fn make_flight_client(&self) -> Result { diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index e2ff160eefb5..b21aa38f5faa 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -198,9 +198,9 @@ mod tests { [meta_client] metasrv_addrs = ["127.0.0.1:3002"] - timeout_millis = 3000 - connect_timeout_millis = 5000 - ddl_timeout_millis= 10000 + timeout = "3s" + connect_timeout = "5s" + ddl_timeout = "10s" tcp_nodelay = true [wal] @@ -251,17 +251,17 @@ mod tests { let MetaClientOptions { metasrv_addrs: metasrv_addr, - timeout_millis, - connect_timeout_millis, + timeout: timeout_millis, + connect_timeout: connect_timeout_millis, tcp_nodelay, - ddl_timeout_millis, + ddl_timeout: ddl_timeout_millis, .. } = options.meta_client.unwrap(); assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr); - assert_eq!(5000, connect_timeout_millis); - assert_eq!(10000, ddl_timeout_millis); - assert_eq!(3000, timeout_millis); + assert_eq!(5000, connect_timeout_millis.as_millis()); + assert_eq!(10000, ddl_timeout_millis.as_millis()); + assert_eq!(3000, timeout_millis.as_millis()); assert!(tcp_nodelay); assert_eq!("/tmp/greptimedb/", options.storage.data_home); assert!(matches!( @@ -355,8 +355,8 @@ mod tests { rpc_runtime_size = 8 [meta_client] - timeout_millis = 3000 - connect_timeout_millis = 5000 + timeout = "3s" + connect_timeout = "5s" tcp_nodelay = true [wal] diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 61f0bab6a3d8..f83fed2e3ae0 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -350,8 +350,8 @@ mod tests { addr = "127.0.0.1:4000" [meta_client] - timeout_millis = 3000 - connect_timeout_millis = 5000 + timeout = "3s" + connect_timeout = "5s" tcp_nodelay = true [mysql] diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index a95a10b30b30..eeed8ac56c8e 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -110,6 +110,12 @@ impl Options { if let Some(config_file) = config_file { layered_config = layered_config.add_source(File::new(config_file, FileFormat::Toml)); } + let oapts = layered_config + .clone() + .build() + .context(LoadLayeredConfigSnafu)?; + + dbg!(oapts); let opts = layered_config .build() @@ -144,8 +150,8 @@ mod tests { mysql_runtime_size = 2 [meta_client] - timeout_millis = 3000 - connect_timeout_millis = 5000 + timeout = "3s" + connect_timeout = "5s" tcp_nodelay = true [wal] diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs index d1e372b677c9..98451f13eee7 100644 --- a/src/common/grpc/src/channel_manager.rs +++ b/src/common/grpc/src/channel_manager.rs @@ -16,6 +16,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; +use common_base::readable_size::ReadableSize; use common_telemetry::info; use dashmap::mapref::entry::Entry; use dashmap::DashMap; @@ -31,8 +32,8 @@ use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsCon const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60; pub const DEFAULT_GRPC_REQUEST_TIMEOUT_SECS: u64 = 10; pub const DEFAULT_GRPC_CONNECT_TIMEOUT_SECS: u64 = 1; -pub const DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE: usize = 512 * 1024 * 1024; -pub const DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE: usize = 512 * 1024 * 1024; +pub const DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE: ReadableSize = ReadableSize::mb(512); +pub const DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE: ReadableSize = ReadableSize::mb(512); lazy_static! { static ref ID: AtomicU64 = AtomicU64::new(0); @@ -250,9 +251,9 @@ pub struct ChannelConfig { pub tcp_nodelay: bool, pub client_tls: Option, // Max gRPC receiving(decoding) message size - pub max_recv_message_size: usize, + pub max_recv_message_size: ReadableSize, // Max gRPC sending(encoding) message size - pub max_send_message_size: usize, + pub max_send_message_size: ReadableSize, } impl Default for ChannelConfig { diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 74c86c5ced53..0b1da56c89a9 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -328,9 +328,9 @@ pub struct DatanodeOptions { pub rpc_hostname: Option, pub rpc_runtime_size: usize, // Max gRPC receiving(decoding) message size - pub rpc_max_recv_message_size: usize, + pub rpc_max_recv_message_size: ReadableSize, // Max gRPC sending(encoding) message size - pub rpc_max_send_message_size: usize, + pub rpc_max_send_message_size: ReadableSize, pub heartbeat: HeartbeatOptions, pub http: HttpOptions, pub meta_client: Option, diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index d85aab1f1300..fe9060a54e4d 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -332,14 +332,14 @@ pub async fn new_metasrv_client( let member_id = node_id; let config = ChannelConfig::new() - .timeout(meta_config.timeout_millis) - .connect_timeout(meta_config.connect_timeout_millis) + .timeout(meta_config.timeout) + .connect_timeout(meta_config.connect_timeout) .tcp_nodelay(meta_config.tcp_nodelay); let channel_manager = ChannelManager::with_config(config.clone()); let heartbeat_channel_manager = ChannelManager::with_config( config - .timeout(meta_config.heartbeat_timeout_millis) - .connect_timeout(meta_config.heartbeat_timeout_millis), + .timeout(meta_config.timeout) + .connect_timeout(meta_config.connect_timeout), ); let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Datanode) diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index 1847dc4c992a..ad198af81350 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -40,8 +40,8 @@ impl Services { let region_server_handler = Some(Arc::new(region_server.clone()) as _); let runtime = region_server.runtime(); let grpc_config = GrpcServerConfig { - max_recv_message_size: opts.rpc_max_recv_message_size, - max_send_message_size: opts.rpc_max_send_message_size, + max_recv_message_size: opts.rpc_max_recv_message_size.as_bytes() as usize, + max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize, }; Ok(Self { diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 27ec02dc4543..e371f3f64e7a 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -22,6 +22,7 @@ mod script; mod standalone; use std::collections::HashMap; use std::sync::Arc; + use api::v1::meta::Role; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; @@ -234,12 +235,12 @@ impl Instance { ); let channel_config = ChannelConfig::new() - .timeout(meta_client_options.timeout_millis) - .connect_timeout(meta_client_options.connect_timeout_millis) + .timeout(meta_client_options.timeout) + .connect_timeout(meta_client_options.connect_timeout) .tcp_nodelay(meta_client_options.tcp_nodelay); let ddl_channel_config = channel_config .clone() - .timeout(meta_client_options.ddl_timeout_millis); + .timeout(meta_client_options.ddl_timeout); let channel_manager = ChannelManager::with_config(channel_config); let ddl_channel_manager = ChannelManager::with_config(ddl_channel_config); diff --git a/src/frontend/src/service_config/grpc.rs b/src/frontend/src/service_config/grpc.rs index e38d6e2a74d1..899a22d3ebf3 100644 --- a/src/frontend/src/service_config/grpc.rs +++ b/src/frontend/src/service_config/grpc.rs @@ -33,8 +33,8 @@ impl Default for GrpcOptions { Self { addr: "127.0.0.1:4001".to_string(), runtime_size: 8, - max_recv_message_size: ReadableSize(DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE as u64), - max_send_message_size: ReadableSize(DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE as u64), + max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, + max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, } } } diff --git a/src/meta-client/Cargo.toml b/src/meta-client/Cargo.toml index 7f8a9035d8de..0a38e1bf69df 100644 --- a/src/meta-client/Cargo.toml +++ b/src/meta-client/Cargo.toml @@ -14,6 +14,7 @@ common-macro = { workspace = true } common-meta = { workspace = true } common-telemetry = { workspace = true } etcd-client.workspace = true +humantime-serde.workspace = true rand.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/src/meta-client/src/lib.rs b/src/meta-client/src/lib.rs index 9788f92abeed..511d83ca760d 100644 --- a/src/meta-client/src/lib.rs +++ b/src/meta-client/src/lib.rs @@ -23,20 +23,24 @@ pub mod error; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct MetaClientOptions { pub metasrv_addrs: Vec, - pub timeout_millis: Duration, - #[serde(default = "default_heartbeat_timeout_millis")] - pub heartbeat_timeout_millis: Duration, - #[serde(default = "default_ddl_timeout_millis")] - pub ddl_timeout_millis: Duration, - pub connect_timeout_millis: Duration, + #[serde(with = "humantime_serde")] + pub timeout: Duration, + #[serde(default = "default_heartbeat_timeout")] + #[serde(with = "humantime_serde")] + pub heartbeat_timeout: Duration, + #[serde(default = "default_ddl_timeout")] + #[serde(with = "humantime_serde")] + pub ddl_timeout: Duration, + #[serde(with = "humantime_serde")] + pub connect_timeout: Duration, pub tcp_nodelay: bool, } -fn default_heartbeat_timeout_millis() -> Duration { +fn default_heartbeat_timeout() -> Duration { Duration::from_millis(500u64) } -fn default_ddl_timeout_millis() -> Duration { +fn default_ddl_timeout() -> Duration { Duration::from_millis(10_000u64) } @@ -44,10 +48,10 @@ impl Default for MetaClientOptions { fn default() -> Self { Self { metasrv_addrs: vec!["127.0.0.1:3002".to_string()], - timeout_millis: Duration::from_millis(3_000u64), - heartbeat_timeout_millis: default_heartbeat_timeout_millis(), - ddl_timeout_millis: default_ddl_timeout_millis(), - connect_timeout_millis: Duration::from_millis(1_000u64), + timeout: Duration::from_millis(3_000u64), + heartbeat_timeout: default_heartbeat_timeout(), + ddl_timeout: default_ddl_timeout(), + connect_timeout: Duration::from_millis(1_000u64), tcp_nodelay: true, } } diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index 69ea1943a6fa..f87fd7737dca 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -93,8 +93,8 @@ pub struct GrpcServerConfig { impl Default for GrpcServerConfig { fn default() -> Self { Self { - max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, - max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, + max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize, + max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize, } } } diff --git a/src/servers/src/heartbeat_options.rs b/src/servers/src/heartbeat_options.rs index b7139755680f..d5c7c55a9f71 100644 --- a/src/servers/src/heartbeat_options.rs +++ b/src/servers/src/heartbeat_options.rs @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_meta::distributed_time_constants; use std::time::Duration; + +use common_meta::distributed_time_constants; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -31,8 +32,12 @@ impl HeartbeatOptions { pub fn frontend_default() -> Self { Self { // Frontend can send heartbeat with a longer interval. - interval_millis: Duration::from_millis(distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS), - retry_interval_millis:Duration::from_millis(distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS), + interval_millis: Duration::from_millis( + distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS, + ), + retry_interval_millis: Duration::from_millis( + distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS, + ), } } } @@ -40,8 +45,12 @@ impl HeartbeatOptions { impl Default for HeartbeatOptions { fn default() -> Self { Self { - interval_millis: Duration::from_millis(distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS), - retry_interval_millis:Duration::from_millis(distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS), + interval_millis: Duration::from_millis( + distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS, + ), + retry_interval_millis: Duration::from_millis( + distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS, + ), } } }