Skip to content

Commit

Permalink
fix: change more grpc sender/recv message size to ReadableSize
Browse files Browse the repository at this point in the history
fix: format

fix: cargo fmt

fix: change cmd test to use durations

fix: revert metaclient change

fix: convert default fields in meta client options

fix: human serde meta client durations
  • Loading branch information
masonyc committed Oct 12, 2023
1 parent 5ae2da5 commit 5c4f1d8
Show file tree
Hide file tree
Showing 15 changed files with 84 additions and 53 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions src/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,19 @@ impl Client {
}

fn max_grpc_recv_message_size(&self) -> usize {
self.inner.channel_manager.config().max_recv_message_size
self.inner
.channel_manager
.config()
.max_recv_message_size
.as_bytes() as usize
}

fn max_grpc_send_message_size(&self) -> usize {
self.inner.channel_manager.config().max_send_message_size
self.inner
.channel_manager
.config()
.max_send_message_size
.as_bytes() as usize
}

pub(crate) fn make_flight_client(&self) -> Result<FlightClient> {
Expand Down
22 changes: 11 additions & 11 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,9 @@ mod tests {
[meta_client]
metasrv_addrs = ["127.0.0.1:3002"]
timeout_millis = 3000
connect_timeout_millis = 5000
ddl_timeout_millis= 10000
timeout = "3s"
connect_timeout = "5s"
ddl_timeout = "10s"
tcp_nodelay = true
[wal]
Expand Down Expand Up @@ -251,17 +251,17 @@ mod tests {

let MetaClientOptions {
metasrv_addrs: metasrv_addr,
timeout_millis,
connect_timeout_millis,
timeout: timeout_millis,
connect_timeout: connect_timeout_millis,
tcp_nodelay,
ddl_timeout_millis,
ddl_timeout: ddl_timeout_millis,
..
} = options.meta_client.unwrap();

assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr);
assert_eq!(5000, connect_timeout_millis);
assert_eq!(10000, ddl_timeout_millis);
assert_eq!(3000, timeout_millis);
assert_eq!(5000, connect_timeout_millis.as_millis());
assert_eq!(10000, ddl_timeout_millis.as_millis());
assert_eq!(3000, timeout_millis.as_millis());
assert!(tcp_nodelay);
assert_eq!("/tmp/greptimedb/", options.storage.data_home);
assert!(matches!(
Expand Down Expand Up @@ -355,8 +355,8 @@ mod tests {
rpc_runtime_size = 8
[meta_client]
timeout_millis = 3000
connect_timeout_millis = 5000
timeout = "3s"
connect_timeout = "5s"
tcp_nodelay = true
[wal]
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,8 @@ mod tests {
addr = "127.0.0.1:4000"
[meta_client]
timeout_millis = 3000
connect_timeout_millis = 5000
timeout = "3s"
connect_timeout = "5s"
tcp_nodelay = true
[mysql]
Expand Down
10 changes: 8 additions & 2 deletions src/cmd/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ impl Options {
if let Some(config_file) = config_file {
layered_config = layered_config.add_source(File::new(config_file, FileFormat::Toml));
}
let oapts = layered_config
.clone()
.build()
.context(LoadLayeredConfigSnafu)?;

dbg!(oapts);

let opts = layered_config
.build()
Expand Down Expand Up @@ -144,8 +150,8 @@ mod tests {
mysql_runtime_size = 2
[meta_client]
timeout_millis = 3000
connect_timeout_millis = 5000
timeout = "3s"
connect_timeout = "5s"
tcp_nodelay = true
[wal]
Expand Down
9 changes: 5 additions & 4 deletions src/common/grpc/src/channel_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use common_telemetry::info;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
Expand All @@ -31,8 +32,8 @@ use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsCon
const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60;
pub const DEFAULT_GRPC_REQUEST_TIMEOUT_SECS: u64 = 10;
pub const DEFAULT_GRPC_CONNECT_TIMEOUT_SECS: u64 = 1;
pub const DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE: usize = 512 * 1024 * 1024;
pub const DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE: usize = 512 * 1024 * 1024;
pub const DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE: ReadableSize = ReadableSize::mb(512);
pub const DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE: ReadableSize = ReadableSize::mb(512);

lazy_static! {
static ref ID: AtomicU64 = AtomicU64::new(0);
Expand Down Expand Up @@ -250,9 +251,9 @@ pub struct ChannelConfig {
pub tcp_nodelay: bool,
pub client_tls: Option<ClientTlsOption>,
// Max gRPC receiving(decoding) message size
pub max_recv_message_size: usize,
pub max_recv_message_size: ReadableSize,
// Max gRPC sending(encoding) message size
pub max_send_message_size: usize,
pub max_send_message_size: ReadableSize,
}

impl Default for ChannelConfig {
Expand Down
4 changes: 2 additions & 2 deletions src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,9 @@ pub struct DatanodeOptions {
pub rpc_hostname: Option<String>,
pub rpc_runtime_size: usize,
// Max gRPC receiving(decoding) message size
pub rpc_max_recv_message_size: usize,
pub rpc_max_recv_message_size: ReadableSize,
// Max gRPC sending(encoding) message size
pub rpc_max_send_message_size: usize,
pub rpc_max_send_message_size: ReadableSize,
pub heartbeat: HeartbeatOptions,
pub http: HttpOptions,
pub meta_client: Option<MetaClientOptions>,
Expand Down
8 changes: 4 additions & 4 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,14 +332,14 @@ pub async fn new_metasrv_client(
let member_id = node_id;

let config = ChannelConfig::new()
.timeout(meta_config.timeout_millis)
.connect_timeout(meta_config.connect_timeout_millis)
.timeout(meta_config.timeout)
.connect_timeout(meta_config.connect_timeout)
.tcp_nodelay(meta_config.tcp_nodelay);
let channel_manager = ChannelManager::with_config(config.clone());
let heartbeat_channel_manager = ChannelManager::with_config(
config
.timeout(meta_config.heartbeat_timeout_millis)
.connect_timeout(meta_config.heartbeat_timeout_millis),
.timeout(meta_config.timeout)
.connect_timeout(meta_config.connect_timeout),
);

let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Datanode)
Expand Down
4 changes: 2 additions & 2 deletions src/datanode/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ impl Services {
let region_server_handler = Some(Arc::new(region_server.clone()) as _);
let runtime = region_server.runtime();
let grpc_config = GrpcServerConfig {
max_recv_message_size: opts.rpc_max_recv_message_size,
max_send_message_size: opts.rpc_max_send_message_size,
max_recv_message_size: opts.rpc_max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize,
};

Ok(Self {
Expand Down
7 changes: 4 additions & 3 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod script;
mod standalone;
use std::collections::HashMap;
use std::sync::Arc;

use api::v1::meta::Role;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
Expand Down Expand Up @@ -234,12 +235,12 @@ impl Instance {
);

let channel_config = ChannelConfig::new()
.timeout(meta_client_options.timeout_millis)
.connect_timeout(meta_client_options.connect_timeout_millis)
.timeout(meta_client_options.timeout)
.connect_timeout(meta_client_options.connect_timeout)
.tcp_nodelay(meta_client_options.tcp_nodelay);
let ddl_channel_config = channel_config
.clone()
.timeout(meta_client_options.ddl_timeout_millis);
.timeout(meta_client_options.ddl_timeout);
let channel_manager = ChannelManager::with_config(channel_config);
let ddl_channel_manager = ChannelManager::with_config(ddl_channel_config);

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/service_config/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ impl Default for GrpcOptions {
Self {
addr: "127.0.0.1:4001".to_string(),
runtime_size: 8,
max_recv_message_size: ReadableSize(DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE as u64),
max_send_message_size: ReadableSize(DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE as u64),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
}
}
}
1 change: 1 addition & 0 deletions src/meta-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ common-macro = { workspace = true }
common-meta = { workspace = true }
common-telemetry = { workspace = true }
etcd-client.workspace = true
humantime-serde.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand Down
28 changes: 16 additions & 12 deletions src/meta-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,35 @@ pub mod error;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct MetaClientOptions {
pub metasrv_addrs: Vec<String>,
pub timeout_millis: Duration,
#[serde(default = "default_heartbeat_timeout_millis")]
pub heartbeat_timeout_millis: Duration,
#[serde(default = "default_ddl_timeout_millis")]
pub ddl_timeout_millis: Duration,
pub connect_timeout_millis: Duration,
#[serde(with = "humantime_serde")]
pub timeout: Duration,
#[serde(default = "default_heartbeat_timeout")]
#[serde(with = "humantime_serde")]
pub heartbeat_timeout: Duration,
#[serde(default = "default_ddl_timeout")]
#[serde(with = "humantime_serde")]
pub ddl_timeout: Duration,
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
pub tcp_nodelay: bool,
}

fn default_heartbeat_timeout_millis() -> Duration {
fn default_heartbeat_timeout() -> Duration {
Duration::from_millis(500u64)
}

fn default_ddl_timeout_millis() -> Duration {
fn default_ddl_timeout() -> Duration {
Duration::from_millis(10_000u64)
}

impl Default for MetaClientOptions {
fn default() -> Self {
Self {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout_millis: Duration::from_millis(3_000u64),
heartbeat_timeout_millis: default_heartbeat_timeout_millis(),
ddl_timeout_millis: default_ddl_timeout_millis(),
connect_timeout_millis: Duration::from_millis(1_000u64),
timeout: Duration::from_millis(3_000u64),
heartbeat_timeout: default_heartbeat_timeout(),
ddl_timeout: default_ddl_timeout(),
connect_timeout: Duration::from_millis(1_000u64),
tcp_nodelay: true,
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/servers/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ pub struct GrpcServerConfig {
impl Default for GrpcServerConfig {
fn default() -> Self {
Self {
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
}
}
}
Expand Down
19 changes: 14 additions & 5 deletions src/servers/src/heartbeat_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_meta::distributed_time_constants;
use std::time::Duration;

use common_meta::distributed_time_constants;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
Expand All @@ -31,17 +32,25 @@ impl HeartbeatOptions {
pub fn frontend_default() -> Self {
Self {
// Frontend can send heartbeat with a longer interval.
interval_millis: Duration::from_millis(distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS),
retry_interval_millis:Duration::from_millis(distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS),
interval_millis: Duration::from_millis(
distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
),
retry_interval_millis: Duration::from_millis(
distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
),
}
}
}

impl Default for HeartbeatOptions {
fn default() -> Self {
Self {
interval_millis: Duration::from_millis(distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS),
retry_interval_millis:Duration::from_millis(distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS),
interval_millis: Duration::from_millis(
distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
),
retry_interval_millis: Duration::from_millis(
distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
),
}
}
}

0 comments on commit 5c4f1d8

Please sign in to comment.