diff --git a/src/meta-srv/src/cluster.rs b/src/meta-srv/src/cluster.rs index 83a5dd984cd0..7e460664c181 100644 --- a/src/meta-srv/src/cluster.rs +++ b/src/meta-srv/src/cluster.rs @@ -79,6 +79,17 @@ impl MetaPeerClient { to_stat_kv_map(kvs) } + // Get kv information from the leader's in_mem kv store. + pub async fn get(&self, key: Vec) -> Result> { + let mut kvs = self.range(key, vec![], false).await?; + Ok(if kvs.is_empty() { + None + } else { + debug_assert_eq!(kvs.len(), 1); + Some(kvs.remove(0)) + }) + } + // Range kv information from the leader's in_mem kv store pub async fn range( &self, @@ -228,7 +239,7 @@ impl MetaPeerClient { // Check if the meta node is a leader node. // Note: when self.election is None, we also consider the meta node is leader - fn is_leader(&self) -> bool { + pub(crate) fn is_leader(&self) -> bool { self.election .as_ref() .map(|election| election.is_leader()) diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs index 0f8c409406c9..2fa3224e35b4 100644 --- a/src/meta-srv/src/lease.rs +++ b/src/meta-srv/src/lease.rs @@ -14,27 +14,57 @@ use std::collections::HashMap; -use common_meta::util; +use common_meta::peer::Peer; +use common_meta::{util, ClusterId}; use common_time::util as time_util; use crate::cluster::MetaPeerClientRef; use crate::error::Result; use crate::keys::{LeaseKey, LeaseValue, DN_LEASE_PREFIX}; +fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseKey, &LeaseValue) -> bool { + move |_: &LeaseKey, v: &LeaseValue| { + ((time_util::current_time_millis() - v.timestamp_millis) as u64) < lease_secs * 1000 + } +} + +pub async fn lookup_alive_datanode_peer( + cluster_id: ClusterId, + datanode_id: u64, + meta_peer_client: &MetaPeerClientRef, + lease_secs: u64, +) -> Result> { + let lease_filter = build_lease_filter(lease_secs); + let lease_key = LeaseKey { + cluster_id, + node_id: datanode_id, + }; + let Some(kv) = meta_peer_client.get(lease_key.clone().try_into()?).await? else { + return Ok(None); + }; + let lease_value: LeaseValue = kv.value.try_into()?; + if lease_filter(&lease_key, &lease_value) { + Ok(Some(Peer { + id: lease_key.node_id, + addr: lease_value.node_addr, + })) + } else { + Ok(None) + } +} + pub async fn alive_datanodes( - cluster_id: u64, + cluster_id: ClusterId, meta_peer_client: &MetaPeerClientRef, lease_secs: u64, ) -> Result> { - let lease_filter = |_: &LeaseKey, v: &LeaseValue| { - ((time_util::current_time_millis() - v.timestamp_millis) as u64) < lease_secs * 1000 - }; + let lease_filter = build_lease_filter(lease_secs); filter_datanodes(cluster_id, meta_peer_client, lease_filter).await } pub async fn filter_datanodes

( - cluster_id: u64, + cluster_id: ClusterId, meta_peer_client: &MetaPeerClientRef, predicate: P, ) -> Result> diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index cee4737cff86..6515ade81ebe 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -32,7 +32,6 @@ pub mod metasrv; mod metrics; #[cfg(feature = "mock")] pub mod mocks; -pub mod peer; pub mod procedure; pub mod pubsub; pub mod region; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 12686998c7e9..dba3c4485002 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -48,7 +48,6 @@ use crate::error::{ use crate::failure_detector::PhiAccrualFailureDetectorOptions; use crate::handler::HeartbeatHandlerGroup; use crate::lock::DistLockRef; -use crate::peer::NaivePeerRegistry; use crate::procedure::region_migration::manager::RegionMigrationManagerRef; use crate::pubsub::{PublishRef, SubscribeManagerRef}; use crate::selector::{Selector, SelectorType}; @@ -252,7 +251,6 @@ pub struct MetaSrv { memory_region_keeper: MemoryRegionKeeperRef, greptimedb_telemetry_task: Arc, region_migration_manager: RegionMigrationManagerRef, - datanode_peer_registry: NaivePeerRegistry, plugins: Plugins, } @@ -419,10 +417,6 @@ impl MetaSrv { &self.region_migration_manager } - pub fn datanode_peer_registry(&self) -> &NaivePeerRegistry { - &self.datanode_peer_registry - } - pub fn publish(&self) -> Option { self.plugins.get::() } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 9fc044730b5f..28a34714d665 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -56,7 +56,6 @@ use crate::lock::DistLockRef; use crate::metasrv::{ ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ, }; -use crate::peer::NaivePeerRegistry; use crate::procedure::region_failover::RegionFailoverManager; use crate::procedure::region_migration::manager::RegionMigrationManager; use crate::procedure::region_migration::DefaultContextFactory; @@ -338,7 +337,6 @@ impl MetaSrvBuilder { plugins: plugins.unwrap_or_else(Plugins::default), memory_region_keeper: opening_region_keeper, region_migration_manager, - datanode_peer_registry: NaivePeerRegistry::default(), }) } } diff --git a/src/meta-srv/src/peer.rs b/src/meta-srv/src/peer.rs deleted file mode 100644 index 30bc6d6d52a6..000000000000 --- a/src/meta-srv/src/peer.rs +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::sync::{Arc, RwLock}; - -use common_meta::peer::Peer; -use common_meta::ClusterId; -use common_telemetry::warn; - -/// Used to look up specific peer by `peer_id`. -pub trait PeerLookup: Send + Sync { - /// Returns `None` stands for target [Peer] is unavailable temporally - fn peer(&self, cluster_id: ClusterId, peer_id: u64) -> Option; -} - -pub type PeerIdentifier = (ClusterId, u64); - -#[derive(Debug, Default, Clone)] -pub struct NaivePeerRegistry(Arc>>); - -impl NaivePeerRegistry { - /// Registers a [Peer]. - pub fn register(&self, cluster_id: ClusterId, peer: Peer) { - let mut inner = self.0.write().unwrap(); - match inner.entry((cluster_id, peer.id)) { - Entry::Occupied(mut entry) => { - let previous = entry.get(); - if previous != &peer { - warn!( - "Registered a new peer: {}, it overwritten the previous peer: {:?}", - peer, previous - ) - } - entry.insert(peer); - } - Entry::Vacant(entry) => { - entry.insert(peer); - } - }; - } - - /// Deregisters a [Peer]. - pub fn deregister(&self, cluster_id: ClusterId, peer_id: u64) { - let mut inner = self.0.write().unwrap(); - if inner.remove(&(cluster_id, peer_id)).is_none() { - warn!( - "Trying to deregister a non-exist peer, peer_id: {}", - peer_id - ); - } - } -} - -impl PeerLookup for NaivePeerRegistry { - fn peer(&self, cluster_id: ClusterId, peer_id: u64) -> Option { - let inner = self.0.read().unwrap(); - inner.get(&(cluster_id, peer_id)).cloned() - } -} - -#[cfg(test)] -mod tests { - use common_meta::peer::Peer; - - use super::{NaivePeerRegistry, PeerLookup}; - - #[test] - fn test_naive_peer_registry() { - common_telemetry::init_default_ut_logging(); - - let lookup = NaivePeerRegistry::default(); - lookup.register(0, Peer::empty(1024)); - lookup.register( - 0, - Peer { - id: 1024, - addr: "test".to_string(), - }, - ); - - assert!(lookup.peer(0, 1024).is_some()); - assert!(lookup.peer(0, 1025).is_none()); - - lookup.deregister(0, 1024); - assert!(lookup.peer(0, 1024).is_none()); - } -} diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index c1cc33f0cda5..fa1443de32d9 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -95,7 +95,7 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { let handler = region_migration::SubmitRegionMigrationTaskHandler { region_migration_manager: meta_srv.region_migration_manager().clone(), - peer_lookup: Arc::new(meta_srv.datanode_peer_registry().clone()), + meta_peer_client: meta_srv.meta_peer_client().clone(), }; let router = router.route("/region-migration", handler); diff --git a/src/meta-srv/src/service/admin/region_migration.rs b/src/meta-srv/src/service/admin/region_migration.rs index 79a4caf2d2ec..544a6d40859a 100644 --- a/src/meta-srv/src/service/admin/region_migration.rs +++ b/src/meta-srv/src/service/admin/region_migration.rs @@ -15,17 +15,18 @@ use std::collections::HashMap; use std::num::ParseIntError; use std::str::FromStr; -use std::sync::Arc; -use common_meta::ClusterId; +use common_meta::peer::Peer; +use common_meta::{distributed_time_constants, ClusterId}; use serde::Serialize; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::RegionId; use tonic::codegen::http; use super::HttpHandler; +use crate::cluster::MetaPeerClientRef; use crate::error::{self, Error, Result}; -use crate::peer::PeerLookup; +use crate::lease::lookup_alive_datanode_peer; use crate::procedure::region_migration::manager::{ RegionMigrationManagerRef, RegionMigrationProcedureTask, }; @@ -33,7 +34,7 @@ use crate::procedure::region_migration::manager::{ /// The handler of submitting migration task. pub struct SubmitRegionMigrationTaskHandler { pub region_migration_manager: RegionMigrationManagerRef, - pub peer_lookup: Arc, + pub meta_peer_client: MetaPeerClientRef, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -98,11 +99,33 @@ impl TryFrom<&HashMap> for SubmitRegionMigrationTaskRequest { } impl SubmitRegionMigrationTaskHandler { + fn is_leader(&self) -> bool { + self.meta_peer_client.is_leader() + } + + /// Checks the peer is available. + async fn lookup_peer(&self, cluster_id: ClusterId, peer_id: u64) -> Result> { + lookup_alive_datanode_peer( + cluster_id, + peer_id, + &self.meta_peer_client, + distributed_time_constants::DATANODE_LEASE_SECS, + ) + .await + } + /// Submits a region migration task, returns the procedure id. async fn handle_submit( &self, task: SubmitRegionMigrationTaskRequest, ) -> Result { + ensure!( + self.is_leader(), + error::UnexpectedSnafu { + violated: "Trying to submit a region migration procedure to non-leader meta server" + } + ); + let SubmitRegionMigrationTaskRequest { cluster_id, region_id, @@ -110,19 +133,16 @@ impl SubmitRegionMigrationTaskHandler { to_peer_id, } = task; - let from_peer = self.peer_lookup.peer(cluster_id, from_peer_id).context( + let from_peer = self.lookup_peer(cluster_id, from_peer_id).await?.context( error::PeerUnavailableSnafu { peer_id: from_peer_id, }, )?; - - let to_peer = - self.peer_lookup - .peer(cluster_id, to_peer_id) - .context(error::PeerUnavailableSnafu { - peer_id: to_peer_id, - })?; - + let to_peer = self.lookup_peer(cluster_id, to_peer_id).await?.context( + error::PeerUnavailableSnafu { + peer_id: to_peer_id, + }, + )?; let procedure_id = self .region_migration_manager .submit_procedure(RegionMigrationProcedureTask { diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 174ca44efca9..598edf2ca765 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -44,11 +44,9 @@ impl heartbeat_server::Heartbeat for MetaSrv { let mut in_stream = req.into_inner(); let (tx, rx) = mpsc::channel(128); let handler_group = self.handler_group().clone(); - let datanode_peer_registry = self.datanode_peer_registry().clone(); let ctx = self.new_ctx(); let _handle = common_runtime::spawn_bg(async move { let mut pusher_key = None; - let mut datanode_peer_ident = None; while let Some(msg) = in_stream.next().await { let mut is_not_leader = false; match msg { @@ -67,15 +65,7 @@ impl heartbeat_server::Heartbeat for MetaSrv { if pusher_key.is_none() { let node_id = get_node_id(header); - let role = header.role(); - if let Some(peer) = req.peer.as_ref() { - if matches!(role, Role::Datanode) { - datanode_peer_registry - .register(header.cluster_id, peer.clone().into()); - datanode_peer_ident = Some((header.cluster_id, peer.id)); - } - } - let role = role as i32; + let role = header.role() as i32; let key = format!("{}-{}", role, node_id); let pusher = Pusher::new(tx.clone(), header); handler_group.register(&key, pusher).await; @@ -125,9 +115,6 @@ impl heartbeat_server::Heartbeat for MetaSrv { if let Some(key) = pusher_key { let _ = handler_group.deregister(&key).await; } - if let Some((cluster_id, peer_id)) = datanode_peer_ident { - datanode_peer_registry.deregister(cluster_id, peer_id); - } }); let out_stream = ReceiverStream::new(rx);