Skip to content

Commit

Permalink
fix: missing datanode id on keep lease (GreptimeTeam#2415)
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun authored and paomian committed Oct 19, 2023
1 parent e96c51a commit 1901547
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 22 deletions.
11 changes: 2 additions & 9 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, RequestHeader, Role};
use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, Role};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::{
Expand Down Expand Up @@ -170,14 +170,9 @@ impl HeartbeatTask {
)
.await?;

let req_header = Some(RequestHeader {
member_id: node_id,
role: Role::Datanode as i32,
..Default::default()
});
let self_peer = Some(Peer {
id: node_id,
addr: addr.clone(),
..Default::default()
});
let epoch = self.region_alive_keeper.epoch();

Expand Down Expand Up @@ -222,7 +217,6 @@ impl HeartbeatTask {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let req = HeartbeatRequest {
header: req_header.clone(),
peer: self_peer.clone(),
mailbox_message: Some(message),
..Default::default()
Expand All @@ -243,7 +237,6 @@ impl HeartbeatTask {
let now = Instant::now();
let duration_since_epoch = (now - epoch).as_millis() as u64;
let req = HeartbeatRequest {
header: req_header.clone(),
peer: self_peer.clone(),
region_stats,
duration_since_epoch,
Expand Down
9 changes: 1 addition & 8 deletions src/frontend/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::sync::Arc;

use api::v1::meta::{HeartbeatRequest, RequestHeader, Role};
use api::v1::meta::HeartbeatRequest;
use common_meta::heartbeat::handler::{
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
};
Expand Down Expand Up @@ -109,11 +109,6 @@ impl HeartbeatTask {
) {
let report_interval = self.report_interval;

let req_header = Some(RequestHeader {
role: Role::Frontend as i32,
..Default::default()
});

common_runtime::spawn_bg(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);
Expand All @@ -125,7 +120,6 @@ impl HeartbeatTask {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let req = HeartbeatRequest {
header: req_header.clone(),
mailbox_message: Some(message),
..Default::default()
};
Expand All @@ -144,7 +138,6 @@ impl HeartbeatTask {
_ = &mut sleep => {
sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
let req = HeartbeatRequest {
header: req_header.clone(),
..Default::default()
};
Some(req)
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ impl Instance {
let channel_manager = ChannelManager::with_config(channel_config);
let ddl_channel_manager = ChannelManager::with_config(ddl_channel_config);

let mut meta_client = MetaClientBuilder::new(0, 0, Role::Frontend)
let cluster_id = 0; // TODO(jeremy): read from config
let mut meta_client = MetaClientBuilder::new(cluster_id, 0, Role::Frontend)
.enable_router()
.enable_store()
.enable_heartbeat()
Expand Down
5 changes: 4 additions & 1 deletion src/meta-srv/src/handler/keep_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ impl HeartbeatHandler for KeepLeaseHandler {
_acc: &mut HeartbeatAccumulator,
) -> Result<()> {
let HeartbeatRequest { header, peer, .. } = req;
let Some(header) = &header else {
return Ok(());
};
let Some(peer) = &peer else {
return Ok(());
};

let key = LeaseKey {
cluster_id: header.as_ref().map_or(0, |h| h.cluster_id),
cluster_id: header.cluster_id,
node_id: peer.id,
};
let value = LeaseValue {
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/handler/node_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl TryFrom<HeartbeatRequest> for Stat {
timestamp_millis: time_util::current_time_millis(),
cluster_id: header.cluster_id,
// datanode id
id: header.member_id,
id: peer.id,
// datanode address
addr: peer.addr,
rcus: region_stats.iter().map(|s| s.rcus).sum(),
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/selector/load_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ impl Selector for LoadBasedSelector {

Ok(tuples
.into_iter()
.map(|(stat_key, lease_val, _)| Peer {
id: stat_key.node_id,
.map(|(lease_key, lease_val, _)| Peer {
id: lease_key.node_id,
addr: lease_val.node_addr,
})
.collect())
Expand Down

0 comments on commit 1901547

Please sign in to comment.