refactor: use lease-based alive datanode checking
WenyXu committed Dec 29, 2023
1 parent e95e2b7 commit 23c4d6d
Showing 9 changed files with 84 additions and 145 deletions.
13 changes: 12 additions & 1 deletion src/meta-srv/src/cluster.rs
@@ -79,6 +79,17 @@ impl MetaPeerClient {
         to_stat_kv_map(kvs)
     }
 
+    // Get kv information from the leader's in_mem kv store.
+    pub async fn get(&self, key: Vec<u8>) -> Result<Option<KeyValue>> {
+        let mut kvs = self.range(key, vec![], false).await?;
+        Ok(if kvs.is_empty() {
+            None
+        } else {
+            debug_assert_eq!(kvs.len(), 1);
+            Some(kvs.remove(0))
+        })
+    }
+
     // Range kv information from the leader's in_mem kv store
     pub async fn range(
         &self,
@@ -228,7 +239,7 @@ impl MetaPeerClient {
 
     // Check if the meta node is a leader node.
     // Note: when self.election is None, we also consider the meta node is leader
-    fn is_leader(&self) -> bool {
+    pub(crate) fn is_leader(&self) -> bool {
         self.election
             .as_ref()
             .map(|election| election.is_leader())
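The new `MetaPeerClient::get` is a point-lookup convenience over the existing `range`: an empty `range_end` restricts the request to the exact key, so at most one `KeyValue` can come back (hence the `debug_assert_eq!`). A minimal, hypothetical call-site sketch — the helper name `lease_of` is illustrative, not part of the commit:

```rust
// Hypothetical sketch: fetch and decode a single datanode lease entry
// from the leader's in-memory kv store via the new `get` helper.
async fn lease_of(client: &MetaPeerClient, key: LeaseKey) -> Result<Option<LeaseValue>> {
    match client.get(key.try_into()?).await? {
        // The key encodes to the same bytes the heartbeat path wrote,
        // so a hit decodes back into a `LeaseValue`.
        Some(kv) => Ok(Some(kv.value.try_into()?)),
        None => Ok(None),
    }
}
```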
42 changes: 36 additions & 6 deletions src/meta-srv/src/lease.rs
@@ -14,27 +14,57 @@
 
 use std::collections::HashMap;
 
-use common_meta::util;
+use common_meta::peer::Peer;
+use common_meta::{util, ClusterId};
 use common_time::util as time_util;
 
 use crate::cluster::MetaPeerClientRef;
 use crate::error::Result;
 use crate::keys::{LeaseKey, LeaseValue, DN_LEASE_PREFIX};
 
+fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseKey, &LeaseValue) -> bool {
+    move |_: &LeaseKey, v: &LeaseValue| {
+        ((time_util::current_time_millis() - v.timestamp_millis) as u64) < lease_secs * 1000
+    }
+}
+
+pub async fn lookup_alive_datanode_peer(
+    cluster_id: ClusterId,
+    datanode_id: u64,
+    meta_peer_client: &MetaPeerClientRef,
+    lease_secs: u64,
+) -> Result<Option<Peer>> {
+    let lease_filter = build_lease_filter(lease_secs);
+    let lease_key = LeaseKey {
+        cluster_id,
+        node_id: datanode_id,
+    };
+    let Some(kv) = meta_peer_client.get(lease_key.clone().try_into()?).await? else {
+        return Ok(None);
+    };
+    let lease_value: LeaseValue = kv.value.try_into()?;
+    if lease_filter(&lease_key, &lease_value) {
+        Ok(Some(Peer {
+            id: lease_key.node_id,
+            addr: lease_value.node_addr,
+        }))
+    } else {
+        Ok(None)
+    }
+}
+
 pub async fn alive_datanodes(
-    cluster_id: u64,
+    cluster_id: ClusterId,
     meta_peer_client: &MetaPeerClientRef,
     lease_secs: u64,
 ) -> Result<HashMap<LeaseKey, LeaseValue>> {
-    let lease_filter = |_: &LeaseKey, v: &LeaseValue| {
-        ((time_util::current_time_millis() - v.timestamp_millis) as u64) < lease_secs * 1000
-    };
+    let lease_filter = build_lease_filter(lease_secs);
 
     filter_datanodes(cluster_id, meta_peer_client, lease_filter).await
 }
 
 pub async fn filter_datanodes<P>(
-    cluster_id: u64,
+    cluster_id: ClusterId,
     meta_peer_client: &MetaPeerClientRef,
     predicate: P,
 ) -> Result<HashMap<LeaseKey, LeaseValue>>
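Both `alive_datanodes` and the new `lookup_alive_datanode_peer` now share one liveness rule, the closure returned by `build_lease_filter`: a datanode counts as alive iff the time since its last heartbeat is strictly less than the lease window. A self-contained sketch of that predicate — plain stand-ins replace `LeaseValue` and the `common_time` clock so it compiles on its own:

```rust
// Stand-in for `LeaseValue`: only the heartbeat timestamp matters here.
struct Lease {
    timestamp_millis: i64, // last heartbeat, epoch milliseconds
}

// Same arithmetic as the closure from `build_lease_filter`, with the
// current time injected instead of read from `common_time::util`.
fn is_alive(lease: &Lease, now_millis: i64, lease_secs: u64) -> bool {
    ((now_millis - lease.timestamp_millis) as u64) < lease_secs * 1000
}

fn main() {
    let lease = Lease { timestamp_millis: 1_000 };
    assert!(is_alive(&lease, 5_000, 10)); // 4s elapsed < 10s lease: alive
    assert!(!is_alive(&lease, 12_000, 10)); // 11s elapsed >= 10s: expired
}
```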
1 change: 0 additions & 1 deletion src/meta-srv/src/lib.rs
@@ -32,7 +32,6 @@ pub mod metasrv;
 mod metrics;
 #[cfg(feature = "mock")]
 pub mod mocks;
-pub mod peer;
 pub mod procedure;
 pub mod pubsub;
 pub mod region;
6 changes: 0 additions & 6 deletions src/meta-srv/src/metasrv.rs
@@ -48,7 +48,6 @@ use crate::error::{
 use crate::failure_detector::PhiAccrualFailureDetectorOptions;
 use crate::handler::HeartbeatHandlerGroup;
 use crate::lock::DistLockRef;
-use crate::peer::NaivePeerRegistry;
 use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
 use crate::pubsub::{PublishRef, SubscribeManagerRef};
 use crate::selector::{Selector, SelectorType};
@@ -252,7 +251,6 @@ pub struct MetaSrv {
     memory_region_keeper: MemoryRegionKeeperRef,
     greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
     region_migration_manager: RegionMigrationManagerRef,
-    datanode_peer_registry: NaivePeerRegistry,
 
     plugins: Plugins,
 }
@@ -419,10 +417,6 @@ impl MetaSrv {
         &self.region_migration_manager
     }
 
-    pub fn datanode_peer_registry(&self) -> &NaivePeerRegistry {
-        &self.datanode_peer_registry
-    }
-
     pub fn publish(&self) -> Option<PublishRef> {
         self.plugins.get::<PublishRef>()
     }
2 changes: 0 additions & 2 deletions src/meta-srv/src/metasrv/builder.rs
@@ -56,7 +56,6 @@ use crate::lock::DistLockRef;
 use crate::metasrv::{
     ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ,
 };
-use crate::peer::NaivePeerRegistry;
 use crate::procedure::region_failover::RegionFailoverManager;
 use crate::procedure::region_migration::manager::RegionMigrationManager;
 use crate::procedure::region_migration::DefaultContextFactory;
@@ -338,7 +337,6 @@ impl MetaSrvBuilder {
             plugins: plugins.unwrap_or_else(Plugins::default),
             memory_region_keeper: opening_region_keeper,
             region_migration_manager,
-            datanode_peer_registry: NaivePeerRegistry::default(),
         })
     }
 }
100 changes: 0 additions & 100 deletions src/meta-srv/src/peer.rs

This file was deleted.

2 changes: 1 addition & 1 deletion src/meta-srv/src/service/admin.rs
@@ -95,7 +95,7 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
 
    let handler = region_migration::SubmitRegionMigrationTaskHandler {
        region_migration_manager: meta_srv.region_migration_manager().clone(),
-       peer_lookup: Arc::new(meta_srv.datanode_peer_registry().clone()),
+       meta_peer_client: meta_srv.meta_peer_client().clone(),
    };
    let router = router.route("/region-migration", handler);
 
48 changes: 34 additions & 14 deletions src/meta-srv/src/service/admin/region_migration.rs
@@ -15,25 +15,26 @@
 use std::collections::HashMap;
 use std::num::ParseIntError;
 use std::str::FromStr;
-use std::sync::Arc;
 
-use common_meta::ClusterId;
+use common_meta::peer::Peer;
+use common_meta::{distributed_time_constants, ClusterId};
 use serde::Serialize;
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::RegionId;
 use tonic::codegen::http;
 
 use super::HttpHandler;
+use crate::cluster::MetaPeerClientRef;
 use crate::error::{self, Error, Result};
-use crate::peer::PeerLookup;
+use crate::lease::lookup_alive_datanode_peer;
 use crate::procedure::region_migration::manager::{
     RegionMigrationManagerRef, RegionMigrationProcedureTask,
 };
 
 /// The handler of submitting migration task.
 pub struct SubmitRegionMigrationTaskHandler {
     pub region_migration_manager: RegionMigrationManagerRef,
-    pub peer_lookup: Arc<dyn PeerLookup>,
+    pub meta_peer_client: MetaPeerClientRef,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq)]
@@ -98,31 +99,50 @@ impl TryFrom<&HashMap<String, String>> for SubmitRegionMigrationTaskRequest {
 }
 
 impl SubmitRegionMigrationTaskHandler {
+    fn is_leader(&self) -> bool {
+        self.meta_peer_client.is_leader()
+    }
+
+    /// Checks the peer is available.
+    async fn lookup_peer(&self, cluster_id: ClusterId, peer_id: u64) -> Result<Option<Peer>> {
+        lookup_alive_datanode_peer(
+            cluster_id,
+            peer_id,
+            &self.meta_peer_client,
+            distributed_time_constants::DATANODE_LEASE_SECS,
+        )
+        .await
+    }
+
     /// Submits a region migration task, returns the procedure id.
     async fn handle_submit(
         &self,
         task: SubmitRegionMigrationTaskRequest,
     ) -> Result<SubmitRegionMigrationTaskResponse> {
+        ensure!(
+            self.is_leader(),
+            error::UnexpectedSnafu {
+                violated: "Trying to submit a region migration procedure to non-leader meta server"
+            }
+        );
+
         let SubmitRegionMigrationTaskRequest {
             cluster_id,
             region_id,
             from_peer_id,
             to_peer_id,
         } = task;
 
-        let from_peer = self.peer_lookup.peer(cluster_id, from_peer_id).context(
+        let from_peer = self.lookup_peer(cluster_id, from_peer_id).await?.context(
             error::PeerUnavailableSnafu {
                 peer_id: from_peer_id,
             },
         )?;
-
-        let to_peer =
-            self.peer_lookup
-                .peer(cluster_id, to_peer_id)
-                .context(error::PeerUnavailableSnafu {
-                    peer_id: to_peer_id,
-                })?;
-
+        let to_peer = self.lookup_peer(cluster_id, to_peer_id).await?.context(
+            error::PeerUnavailableSnafu {
+                peer_id: to_peer_id,
+            },
+        )?;
         let procedure_id = self
             .region_migration_manager
             .submit_procedure(RegionMigrationProcedureTask {
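The handler is now gated on leadership — only the leader's in-memory store receives heartbeats, so only it can judge liveness — and a submission fails fast unless both peers hold fresh leases. A hypothetical driver for the route registered above; the parameter names mirror the request struct's fields, though the exact query-string spelling and the `region_id` encoding are assumptions:

```rust
use std::collections::HashMap;

// Hypothetical sketch: build the query-parameter map the admin route
// receives and parse it via the handler's `TryFrom` implementation.
fn build_request() -> Result<SubmitRegionMigrationTaskRequest> {
    let params: HashMap<String, String> = [
        ("cluster_id", "0"),
        ("region_id", "4398046511104"), // RegionId as its u64 form (assumed)
        ("from_peer_id", "1"),
        ("to_peer_id", "2"),
    ]
    .into_iter()
    .map(|(k, v)| (k.to_string(), v.to_string()))
    .collect();

    SubmitRegionMigrationTaskRequest::try_from(&params)
}
```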
15 changes: 1 addition & 14 deletions src/meta-srv/src/service/heartbeat.rs
@@ -44,11 +44,9 @@ impl heartbeat_server::Heartbeat for MetaSrv {
         let mut in_stream = req.into_inner();
         let (tx, rx) = mpsc::channel(128);
         let handler_group = self.handler_group().clone();
-        let datanode_peer_registry = self.datanode_peer_registry().clone();
         let ctx = self.new_ctx();
         let _handle = common_runtime::spawn_bg(async move {
             let mut pusher_key = None;
-            let mut datanode_peer_ident = None;
             while let Some(msg) = in_stream.next().await {
                 let mut is_not_leader = false;
                 match msg {
@@ -67,15 +65,7 @@
 
                     if pusher_key.is_none() {
                         let node_id = get_node_id(header);
-                        let role = header.role();
-                        if let Some(peer) = req.peer.as_ref() {
-                            if matches!(role, Role::Datanode) {
-                                datanode_peer_registry
-                                    .register(header.cluster_id, peer.clone().into());
-                                datanode_peer_ident = Some((header.cluster_id, peer.id));
-                            }
-                        }
-                        let role = role as i32;
+                        let role = header.role() as i32;
                         let key = format!("{}-{}", role, node_id);
                         let pusher = Pusher::new(tx.clone(), header);
                         handler_group.register(&key, pusher).await;
@@ -125,9 +115,6 @@ impl heartbeat_server::Heartbeat for MetaSrv {
             if let Some(key) = pusher_key {
                 let _ = handler_group.deregister(&key).await;
             }
-            if let Some((cluster_id, peer_id)) = datanode_peer_ident {
-                datanode_peer_registry.deregister(cluster_id, peer_id);
-            }
         });
 
         let out_stream = ReceiverStream::new(rx);
