diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs
index 92d7249e33ca..530fba83aa2e 100644
--- a/src/meta-srv/src/error.rs
+++ b/src/meta-srv/src/error.rs
@@ -32,6 +32,9 @@ use crate::pubsub::Message;
 #[snafu(visibility(pub))]
 #[stack_trace_debug]
 pub enum Error {
+    #[snafu(display("The target peer is unavailable temporarily: {}", peer_id))]
+    PeerUnavailable { location: Location, peer_id: u64 },
+
     #[snafu(display("Another migration procedure is running for region: {}", region_id))]
     MigrationRunning {
         location: Location,
@@ -650,7 +653,8 @@ impl ErrorExt for Error {
             | Error::Join { .. }
             | Error::WeightArray { .. }
             | Error::NotSetWeightArray { .. }
-            | Error::Unsupported { .. } => StatusCode::Internal,
+            | Error::Unsupported { .. }
+            | Error::PeerUnavailable { .. } => StatusCode::Internal,
             Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
             Error::EmptyKey { .. }
             | Error::MissingRequiredParameter { .. }
diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs
index 6515ade81ebe..cee4737cff86 100644
--- a/src/meta-srv/src/lib.rs
+++ b/src/meta-srv/src/lib.rs
@@ -32,6 +32,7 @@ pub mod metasrv;
 mod metrics;
 #[cfg(feature = "mock")]
 pub mod mocks;
+pub mod peer;
 pub mod procedure;
 pub mod pubsub;
 pub mod region;
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index dba3c4485002..12686998c7e9 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -48,6 +48,7 @@ use crate::error::{
 use crate::failure_detector::PhiAccrualFailureDetectorOptions;
 use crate::handler::HeartbeatHandlerGroup;
 use crate::lock::DistLockRef;
+use crate::peer::NaivePeerRegistry;
 use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
 use crate::pubsub::{PublishRef, SubscribeManagerRef};
 use crate::selector::{Selector, SelectorType};
@@ -251,6 +252,7 @@ pub struct MetaSrv {
     memory_region_keeper: MemoryRegionKeeperRef,
     greptimedb_telemetry_task: Arc,
     region_migration_manager: RegionMigrationManagerRef,
+    datanode_peer_registry: NaivePeerRegistry,
     plugins: Plugins,
 }
 
@@ -417,6 +419,10 @@ impl MetaSrv {
         &self.region_migration_manager
     }
 
+    pub fn datanode_peer_registry(&self) -> &NaivePeerRegistry {
+        &self.datanode_peer_registry
+    }
+
     pub fn publish(&self) -> Option {
         self.plugins.get::()
     }
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index f84c8a487d2f..86c10db10dd8 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -56,6 +56,7 @@ use crate::lock::DistLockRef;
 use crate::metasrv::{
     ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ,
 };
+use crate::peer::NaivePeerRegistry;
 use crate::procedure::region_failover::RegionFailoverManager;
 use crate::procedure::region_migration::manager::RegionMigrationManager;
 use crate::procedure::region_migration::ContextFactoryImpl;
@@ -337,6 +338,7 @@ impl MetaSrvBuilder {
             plugins: plugins.unwrap_or_else(Plugins::default),
             memory_region_keeper: opening_region_keeper,
             region_migration_manager,
+            datanode_peer_registry: NaivePeerRegistry::default(),
         })
     }
 }
diff --git a/src/meta-srv/src/peer.rs b/src/meta-srv/src/peer.rs
new file mode 100644
index 000000000000..c64823bd6f44
--- /dev/null
+++ b/src/meta-srv/src/peer.rs
@@ -0,0 +1,81 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+use common_meta::peer::Peer;
+use common_meta::ClusterId;
+use common_telemetry::warn;
+
+/// Looks up a specific [Peer] by its `cluster_id` and `peer_id`.
+pub trait PeerLookup: Send + Sync {
+    /// Returns `None` when the target [Peer] is temporarily unavailable.
+    fn peer(&self, cluster_id: ClusterId, peer_id: u64) -> Option<Peer>;
+}
+
+pub type PeerIdentifier = (ClusterId, u64);
+
+#[derive(Debug, Default, Clone)]
+pub struct NaivePeerRegistry(Arc<RwLock<HashMap<PeerIdentifier, Peer>>>);
+
+impl NaivePeerRegistry {
+    /// Registers a [Peer], overwriting (with a warning) any peer already stored under its id.
+    pub fn register(&self, cluster_id: ClusterId, peer: Peer) {
+        let mut inner = self.0.write().unwrap();
+        if let Some(previous) = inner.insert((cluster_id, peer.id), peer) {
+            warn!(
+                "Registered a peer, it overwritten the previous peer: {:?}",
+                previous
+            )
+        }
+    }
+
+    /// Deregisters a [Peer]; logs a warning if no peer was registered under that id.
+    pub fn deregister(&self, cluster_id: ClusterId, peer_id: u64) {
+        let mut inner = self.0.write().unwrap();
+        if inner.remove(&(cluster_id, peer_id)).is_none() {
+            warn!(
+                "Trying to deregister a non-exist peer, peer_id: {}",
+                peer_id
+            );
+        }
+    }
+}
+
+impl PeerLookup for NaivePeerRegistry {
+    fn peer(&self, cluster_id: ClusterId, peer_id: u64) -> Option<Peer> {
+        let inner = self.0.read().unwrap();
+        inner.get(&(cluster_id, peer_id)).cloned()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use common_meta::peer::Peer;
+
+    use super::{NaivePeerRegistry, PeerLookup};
+
+    #[test]
+    fn test_naive_peer_registry() {
+        let lookup = NaivePeerRegistry::default();
+        lookup.register(0, Peer::empty(1024));
+
+        assert!(lookup.peer(0, 1024).is_some());
+        assert!(lookup.peer(0, 1025).is_none());
+
+        lookup.deregister(0, 1024);
+        assert!(lookup.peer(0, 1024).is_none());
+    }
+}
diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs
index 9124af91b907..5f4329fabe35 100644
--- a/src/meta-srv/src/procedure/region_migration/manager.rs
+++ 
b/src/meta-srv/src/procedure/region_migration/manager.rs
@@ -21,7 +21,7 @@ use common_meta::key::table_route::TableRouteValue;
 use common_meta::peer::Peer;
 use common_meta::rpc::router::RegionRoute;
 use common_meta::ClusterId;
-use common_procedure::{watcher, ProcedureManagerRef, ProcedureWithId};
+use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
 use common_telemetry::{error, info};
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::RegionId;
@@ -57,10 +57,10 @@ impl Drop for RegionMigrationProcedureGuard {
 
 #[derive(Debug, Clone)]
 pub(crate) struct RegionMigrationProcedureTask {
-    cluster_id: ClusterId,
-    region_id: RegionId,
-    from_peer: Peer,
-    to_peer: Peer,
+    pub(crate) cluster_id: ClusterId,
+    pub(crate) region_id: RegionId,
+    pub(crate) from_peer: Peer,
+    pub(crate) to_peer: Peer,
 }
 
 impl Display for RegionMigrationProcedureTask {
@@ -223,7 +223,13 @@ impl RegionMigrationManager {
     }
 
     /// Submits a new region migration procedure.
-    pub(crate) async fn submit_procedure(&self, task: RegionMigrationProcedureTask) -> Result<()> {
+    ///
+    /// Returns `Ok(None)` if the region has already been migrated (nothing is submitted),
+    /// otherwise the id of the submitted procedure.
+    pub(crate) async fn submit_procedure(
+        &self,
+        task: RegionMigrationProcedureTask,
+    ) -> Result<Option<ProcedureId>> {
         let Some(guard) = self.insert_running_procedure(&task) else {
             return error::MigrationRunningSnafu {
                 region_id: task.region_id,
@@ -245,7 +251,7 @@ impl RegionMigrationManager {
 
         if self.has_migrated(&region_route, &task)? {
             info!("Skipping region migration task: {task}");
-            return Ok(());
+            return Ok(None);
         }
 
         self.verify_region_leader_peer(&region_route, &task)?;
@@ -276,7 +282,7 @@ impl RegionMigrationManager {
             info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
         });
 
-        Ok(())
+        Ok(Some(procedure_id))
     }
 }
 
diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs
index d8ccf5fdc32f..c1cc33f0cda5 100644
--- a/src/meta-srv/src/service/admin.rs
+++ b/src/meta-srv/src/service/admin.rs
@@ -95,6 +95,7 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
 
     let handler = region_migration::SubmitRegionMigrationTaskHandler {
         region_migration_manager: meta_srv.region_migration_manager().clone(),
+        peer_lookup: Arc::new(meta_srv.datanode_peer_registry().clone()),
     };
     let router = router.route("/region-migration", handler);
 
diff --git a/src/meta-srv/src/service/admin/region_migration.rs b/src/meta-srv/src/service/admin/region_migration.rs
index 49863be48d52..79a4caf2d2ec 100644
--- a/src/meta-srv/src/service/admin/region_migration.rs
+++ b/src/meta-srv/src/service/admin/region_migration.rs
@@ -15,21 +15,25 @@
 use std::collections::HashMap;
 use std::num::ParseIntError;
 use std::str::FromStr;
+use std::sync::Arc;
 
-use common_meta::peer::Peer;
 use common_meta::ClusterId;
 use serde::Serialize;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
 use store_api::storage::RegionId;
 use tonic::codegen::http;
 
 use super::HttpHandler;
 use crate::error::{self, Error, Result};
-use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
+use crate::peer::PeerLookup;
+use crate::procedure::region_migration::manager::{
+    RegionMigrationManagerRef, RegionMigrationProcedureTask,
+};
 
 /// The handler of submitting migration task.
pub struct SubmitRegionMigrationTaskHandler { pub region_migration_manager: RegionMigrationManagerRef, + pub peer_lookup: Arc, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -42,7 +46,8 @@ struct SubmitRegionMigrationTaskRequest { #[derive(Debug, Serialize)] struct SubmitRegionMigrationTaskResponse { - procedure_id: String, + /// The `None` stands region has been migrated. + procedure_id: Option, } fn parse_num_parameter_with_default( @@ -96,10 +101,41 @@ impl SubmitRegionMigrationTaskHandler { /// Submits a region migration task, returns the procedure id. async fn handle_submit( &self, - _task: SubmitRegionMigrationTaskRequest, + task: SubmitRegionMigrationTaskRequest, ) -> Result { - // TODO(weny): waits for https://github.com/GreptimeTeam/greptimedb/pull/3014 - todo!() + let SubmitRegionMigrationTaskRequest { + cluster_id, + region_id, + from_peer_id, + to_peer_id, + } = task; + + let from_peer = self.peer_lookup.peer(cluster_id, from_peer_id).context( + error::PeerUnavailableSnafu { + peer_id: from_peer_id, + }, + )?; + + let to_peer = + self.peer_lookup + .peer(cluster_id, to_peer_id) + .context(error::PeerUnavailableSnafu { + peer_id: to_peer_id, + })?; + + let procedure_id = self + .region_migration_manager + .submit_procedure(RegionMigrationProcedureTask { + cluster_id, + region_id, + from_peer, + to_peer, + }) + .await?; + + Ok(SubmitRegionMigrationTaskResponse { + procedure_id: procedure_id.map(|id| id.to_string()), + }) } } diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 4144bc30605d..446f4540b102 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -44,9 +44,11 @@ impl heartbeat_server::Heartbeat for MetaSrv { let mut in_stream = req.into_inner(); let (tx, rx) = mpsc::channel(128); let handler_group = self.handler_group().clone(); + let datanode_peer_registry = self.datanode_peer_registry().clone(); let ctx = self.new_ctx(); let _handle = 
common_runtime::spawn_bg(async move { let mut pusher_key = None; + let mut datanode_peer_ident = None; while let Some(msg) = in_stream.next().await { let mut is_not_leader = false; match msg { @@ -65,7 +67,15 @@ impl heartbeat_server::Heartbeat for MetaSrv { if pusher_key.is_none() { let node_id = get_node_id(header); - let role = header.role() as i32; + let role = header.role(); + if let Some(peer) = req.peer.as_ref() { + if matches!(role, Role::Datanode) { + datanode_peer_registry + .register(header.cluster_id, peer.clone().into()); + datanode_peer_ident = Some((header.cluster_id, peer.id)); + } + } + let role = role as i32; let key = format!("{}-{}", role, node_id); let pusher = Pusher::new(tx.clone(), header); handler_group.register(&key, pusher).await; @@ -115,6 +125,9 @@ impl heartbeat_server::Heartbeat for MetaSrv { if let Some(key) = pusher_key { let _ = handler_group.unregister(&key).await; } + if let Some((cluster_id, peer_id)) = datanode_peer_ident { + datanode_peer_registry.deregister(cluster_id, peer_id); + } }); let out_stream = ReceiverStream::new(rx);