Skip to content

Commit

Permalink
fix: re-create heartbeat stream ASAP (#2499)
Browse files Browse the repository at this point in the history
* chore: set default connect_timeout_millis to 1000

* fix: re-create heartbeat stream ASAP

* chore: apply suggestions
  • Loading branch information
WenyXu authored Sep 27, 2023
1 parent fbe2f2d commit 9282e59
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 7 deletions.
2 changes: 1 addition & 1 deletion config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ heartbeat_timeout_millis = 500
# Operation timeout in milliseconds, 3000 by default.
timeout_millis = 3000
# Connect server timeout in milliseconds, 5000 by default.
connect_timeout_millis = 5000
connect_timeout_millis = 1000
# `TCP_NODELAY` option for accepted connections, true by default.
tcp_nodelay = true

Expand Down
2 changes: 1 addition & 1 deletion config/frontend.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ metasrv_addrs = ["127.0.0.1:3002"]
timeout_millis = 3000
# DDL timeouts options.
ddl_timeout_millis = 10000
connect_timeout_millis = 5000
connect_timeout_millis = 1000
tcp_nodelay = true

# Log options, see `standalone.example.toml`
Expand Down
2 changes: 1 addition & 1 deletion src/common/grpc/src/channel_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsCon

const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60;
pub const DEFAULT_GRPC_REQUEST_TIMEOUT_SECS: u64 = 10;
pub const DEFAULT_GRPC_CONNECT_TIMEOUT_SECS: u64 = 10;
pub const DEFAULT_GRPC_CONNECT_TIMEOUT_SECS: u64 = 1;
pub const DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE: usize = 512 * 1024 * 1024;
pub const DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE: usize = 512 * 1024 * 1024;

Expand Down
30 changes: 27 additions & 3 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::time::Duration;

use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, Role};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
Expand Down Expand Up @@ -97,6 +98,7 @@ impl HeartbeatTask {
handler_executor: HeartbeatResponseHandlerExecutorRef,
mailbox: MailboxRef,
mut notify: Option<Arc<Notify>>,
quit_signal: Arc<Notify>,
) -> Result<HeartbeatSender> {
let client_id = meta_client.id();

Expand All @@ -123,7 +125,8 @@ impl HeartbeatTask {
info!("Heartbeat task shutdown");
}
}
info!("Heartbeat handling loop exit.")
quit_signal.notify_one();
info!("Heartbeat handling loop exit.");
});
Ok(tx)
}
Expand Down Expand Up @@ -167,12 +170,15 @@ impl HeartbeatTask {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);
let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));

let quit_signal = Arc::new(tokio::sync::Notify::new());

let mut tx = Self::create_streams(
&meta_client,
running.clone(),
handler_executor.clone(),
mailbox.clone(),
notify,
quit_signal.clone(),
)
.await?;

Expand All @@ -187,7 +193,6 @@ impl HeartbeatTask {
common_runtime::spawn_bg(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);

loop {
if !running.load(Ordering::Relaxed) {
info!("shutdown heartbeat task");
Expand Down Expand Up @@ -228,6 +233,11 @@ impl HeartbeatTask {
sleep.as_mut().reset(now + Duration::from_millis(interval));
Some(req)
}
// If the heartbeat stream is broken, send a dummy heartbeat request to re-create the heartbeat stream.
_ = quit_signal.notified() => {
let req = HeartbeatRequest::default();
Some(req)
}
};
if let Some(req) = req {
debug!("Sending heartbeat request: {:?}", req);
Expand All @@ -239,6 +249,7 @@ impl HeartbeatTask {
handler_executor.clone(),
mailbox.clone(),
None,
quit_signal.clone(),
)
.await
{
Expand All @@ -249,6 +260,13 @@ impl HeartbeatTask {
sleep.as_mut().reset(Instant::now());
}
Err(e) => {
// Before the META_LEASE_SECS expires,
// any retries are meaningless, it always reads the old meta leader address.
// Triggers to retry after META_KEEP_ALIVE_INTERVAL_SECS.
sleep.as_mut().reset(
Instant::now()
+ Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS),
);
error!(e;"Failed to reconnect to metasrv!");
}
}
Expand Down Expand Up @@ -317,13 +335,19 @@ pub async fn new_metasrv_client(
.timeout(Duration::from_millis(meta_config.timeout_millis))
.connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis))
.tcp_nodelay(meta_config.tcp_nodelay);
let channel_manager = ChannelManager::with_config(config);
let channel_manager = ChannelManager::with_config(config.clone());
let heartbeat_channel_manager = ChannelManager::with_config(
config
.timeout(Duration::from_millis(meta_config.heartbeat_timeout_millis))
.connect_timeout(Duration::from_millis(meta_config.heartbeat_timeout_millis)),
);

let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Datanode)
.enable_heartbeat()
.enable_router()
.enable_store()
.channel_manager(channel_manager)
.heartbeat_channel_manager(heartbeat_channel_manager)
.build();
meta_client
.start(&meta_config.metasrv_addrs)
Expand Down
2 changes: 1 addition & 1 deletion src/meta-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Default for MetaClientOptions {
timeout_millis: 3_000u64,
heartbeat_timeout_millis: default_heartbeat_timeout_millis(),
ddl_timeout_millis: default_ddl_timeout_millis(),
connect_timeout_millis: 5_000u64,
connect_timeout_millis: 1_000u64,
tcp_nodelay: true,
}
}
Expand Down

0 comments on commit 9282e59

Please sign in to comment.