diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 2e5d82ac37f1..4144bc30605d 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -55,10 +55,12 @@ impl heartbeat_server::Heartbeat for MetaSrv { Some(header) => header, None => { let err = error::MissingRequestHeaderSnafu {}.build(); - tx.send(Err(err.into())).await.expect("working rx"); + error!("Exit on malformed request: MissingRequestHeader"); + let _ = tx.send(Err(err.into())).await; break; } }; + debug!("Receiving heartbeat request: {:?}", req); if pusher_key.is_none() { @@ -78,7 +80,10 @@ impl heartbeat_server::Heartbeat for MetaSrv { is_not_leader = res.as_ref().map_or(false, |r| r.is_not_leader()); debug!("Sending heartbeat response: {:?}", res); - tx.send(res).await.expect("working rx"); + if tx.send(res).await.is_err() { + info!("ReceiverStream was dropped; shutting down"); + break; + } } Err(err) => { if let Some(io_err) = error::match_for_io_error(&err) { @@ -89,9 +94,9 @@ impl heartbeat_server::Heartbeat for MetaSrv { } } - match tx.send(Err(err)).await { - Ok(_) => (), - Err(_err) => break, // response was dropped + if tx.send(Err(err)).await.is_err() { + info!("ReceiverStream was dropped; shutting down"); + break; } } } @@ -101,10 +106,12 @@ impl heartbeat_server::Heartbeat for MetaSrv { break; } } + info!( - "Heartbeat stream broken: {:?}", + "Heartbeat stream closed: {:?}", pusher_key.as_ref().unwrap_or(&"unknown".to_string()) ); + if let Some(key) = pusher_key { let _ = handler_group.unregister(&key).await; }