diff --git a/src/meta-srv/src/cluster.rs b/src/meta-srv/src/cluster.rs index 83a5dd984cd0..7e460664c181 100644 --- a/src/meta-srv/src/cluster.rs +++ b/src/meta-srv/src/cluster.rs @@ -79,6 +79,17 @@ impl MetaPeerClient { to_stat_kv_map(kvs) } + // Get kv information from the leader's in_mem kv store. + pub async fn get(&self, key: Vec) -> Result> { + let mut kvs = self.range(key, vec![], false).await?; + Ok(if kvs.is_empty() { + None + } else { + debug_assert_eq!(kvs.len(), 1); + Some(kvs.remove(0)) + }) + } + + // Range kv information from the leader's in_mem kv store pub async fn range( &self, @@ -228,7 +239,7 @@ impl MetaPeerClient { // Check if the meta node is a leader node. // Note: when self.election is None, we also consider the meta node is leader - fn is_leader(&self) -> bool { + pub(crate) fn is_leader(&self) -> bool { self.election .as_ref() .map(|election| election.is_leader()) diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 92d7249e33ca..530fba83aa2e 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -32,6 +32,9 @@ use crate::pubsub::Message; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display("The target peer is temporarily unavailable: {}", peer_id))] + PeerUnavailable { location: Location, peer_id: u64 }, + #[snafu(display("Another migration procedure is running for region: {}", region_id))] MigrationRunning { location: Location, @@ -650,7 +653,8 @@ impl ErrorExt for Error { | Error::Join { .. } | Error::WeightArray { .. } | Error::NotSetWeightArray { .. } - | Error::Unsupported { .. } => StatusCode::Internal, + | Error::Unsupported { .. } + | Error::PeerUnavailable { .. } => StatusCode::Internal, Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, Error::EmptyKey { .. } | Error::MissingRequiredParameter { ..
} diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 8be541ee68bc..4de3d5530dd8 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -229,7 +229,7 @@ impl HeartbeatHandlerGroup { let _ = self.pushers.insert(key.to_string(), pusher).await; } - pub async fn unregister(&self, key: impl AsRef) -> Option { + pub async fn deregister(&self, key: impl AsRef) -> Option { let key = key.as_ref(); METRIC_META_HEARTBEAT_CONNECTION_NUM.dec(); info!("Pusher unregister: {}", key); diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs index 0f8c409406c9..2fa3224e35b4 100644 --- a/src/meta-srv/src/lease.rs +++ b/src/meta-srv/src/lease.rs @@ -14,27 +14,57 @@ use std::collections::HashMap; -use common_meta::util; +use common_meta::peer::Peer; +use common_meta::{util, ClusterId}; use common_time::util as time_util; use crate::cluster::MetaPeerClientRef; use crate::error::Result; use crate::keys::{LeaseKey, LeaseValue, DN_LEASE_PREFIX}; +fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseKey, &LeaseValue) -> bool { + move |_: &LeaseKey, v: &LeaseValue| { + ((time_util::current_time_millis() - v.timestamp_millis) as u64) < lease_secs * 1000 + } +} + +pub async fn lookup_alive_datanode_peer( + cluster_id: ClusterId, + datanode_id: u64, + meta_peer_client: &MetaPeerClientRef, + lease_secs: u64, +) -> Result> { + let lease_filter = build_lease_filter(lease_secs); + let lease_key = LeaseKey { + cluster_id, + node_id: datanode_id, + }; + let Some(kv) = meta_peer_client.get(lease_key.clone().try_into()?).await? 
else { + return Ok(None); + }; + let lease_value: LeaseValue = kv.value.try_into()?; + if lease_filter(&lease_key, &lease_value) { + Ok(Some(Peer { + id: lease_key.node_id, + addr: lease_value.node_addr, + })) + } else { + Ok(None) + } +} + pub async fn alive_datanodes( - cluster_id: u64, + cluster_id: ClusterId, meta_peer_client: &MetaPeerClientRef, lease_secs: u64, ) -> Result> { - let lease_filter = |_: &LeaseKey, v: &LeaseValue| { - ((time_util::current_time_millis() - v.timestamp_millis) as u64) < lease_secs * 1000 - }; + let lease_filter = build_lease_filter(lease_secs); filter_datanodes(cluster_id, meta_peer_client, lease_filter).await } pub async fn filter_datanodes

( - cluster_id: u64, + cluster_id: ClusterId, meta_peer_client: &MetaPeerClientRef, predicate: P, ) -> Result> diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 4d6782a2b630..dba3c4485002 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -48,6 +48,7 @@ use crate::error::{ use crate::failure_detector::PhiAccrualFailureDetectorOptions; use crate::handler::HeartbeatHandlerGroup; use crate::lock::DistLockRef; +use crate::procedure::region_migration::manager::RegionMigrationManagerRef; use crate::pubsub::{PublishRef, SubscribeManagerRef}; use crate::selector::{Selector, SelectorType}; use crate::service::mailbox::MailboxRef; @@ -249,6 +250,7 @@ pub struct MetaSrv { table_metadata_manager: TableMetadataManagerRef, memory_region_keeper: MemoryRegionKeeperRef, greptimedb_telemetry_task: Arc, + region_migration_manager: RegionMigrationManagerRef, plugins: Plugins, } @@ -411,6 +413,10 @@ impl MetaSrv { &self.memory_region_keeper } + pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef { + &self.region_migration_manager + } + pub fn publish(&self) -> Option { self.plugins.get::() } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 105e9dab0017..28a34714d665 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -57,6 +57,8 @@ use crate::metasrv::{ ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ, }; use crate::procedure::region_failover::RegionFailoverManager; +use crate::procedure::region_migration::manager::RegionMigrationManager; +use crate::procedure::region_migration::DefaultContextFactory; use crate::pubsub::PublishRef; use crate::selector::lease_based::LeaseBasedSelector; use crate::service::mailbox::MailboxRef; @@ -236,6 +238,17 @@ impl MetaSrvBuilder { &opening_region_keeper, )?; + let region_migration_manager = Arc::new(RegionMigrationManager::new( + 
procedure_manager.clone(), + DefaultContextFactory::new( + table_metadata_manager.clone(), + opening_region_keeper.clone(), + mailbox.clone(), + options.server_addr.clone(), + ), + )); + region_migration_manager.try_start()?; + let handler_group = match handler_group { Some(handler_group) => handler_group, None => { @@ -323,6 +336,7 @@ impl MetaSrvBuilder { .await, plugins: plugins.unwrap_or_else(Plugins::default), memory_region_keeper: opening_region_keeper, + region_migration_manager, }) } } diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 5299972236e0..a1e92277d60b 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -127,7 +127,7 @@ pub trait ContextFactory { /// Default implementation. #[derive(Clone)] -pub struct ContextFactoryImpl { +pub struct DefaultContextFactory { volatile_ctx: VolatileContext, table_metadata_manager: TableMetadataManagerRef, opening_region_keeper: MemoryRegionKeeperRef, @@ -135,7 +135,25 @@ pub struct ContextFactoryImpl { server_addr: String, } -impl ContextFactory for ContextFactoryImpl { +impl DefaultContextFactory { + /// Returns a [DefaultContextFactory].
+ pub fn new( + table_metadata_manager: TableMetadataManagerRef, + opening_region_keeper: MemoryRegionKeeperRef, + mailbox: MailboxRef, + server_addr: String, + ) -> Self { + Self { + volatile_ctx: VolatileContext::default(), + table_metadata_manager, + opening_region_keeper, + mailbox, + server_addr, + } + } +} + +impl ContextFactory for DefaultContextFactory { fn new_context(self, persistent_ctx: PersistentContext) -> Context { Context { persistent_ctx, diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index cfb125c49ba2..03794ed85d11 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -21,21 +21,23 @@ use common_meta::key::table_route::TableRouteValue; use common_meta::peer::Peer; use common_meta::rpc::router::RegionRoute; use common_meta::ClusterId; -use common_procedure::{watcher, ProcedureManagerRef, ProcedureWithId}; +use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId}; use common_telemetry::{error, info}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::RegionId; use crate::error::{self, Result}; use crate::procedure::region_migration::{ - ContextFactoryImpl, PersistentContext, RegionMigrationProcedure, + DefaultContextFactory, PersistentContext, RegionMigrationProcedure, }; +pub type RegionMigrationManagerRef = Arc; + /// Manager of region migration procedure. -pub(crate) struct RegionMigrationManager { +pub struct RegionMigrationManager { procedure_manager: ProcedureManagerRef, running_procedures: Arc>>, - context_factory: ContextFactoryImpl, + context_factory: DefaultContextFactory, } /// The guard of running [RegionMigrationProcedureTask]. 
@@ -55,10 +57,10 @@ impl Drop for RegionMigrationProcedureGuard { #[derive(Debug, Clone)] pub(crate) struct RegionMigrationProcedureTask { - cluster_id: ClusterId, - region_id: RegionId, - from_peer: Peer, - to_peer: Peer, + pub(crate) cluster_id: ClusterId, + pub(crate) region_id: RegionId, + pub(crate) from_peer: Peer, + pub(crate) to_peer: Peer, } impl Display for RegionMigrationProcedureTask { @@ -93,7 +95,7 @@ impl RegionMigrationManager { /// Returns new [RegionMigrationManager] pub(crate) fn new( procedure_manager: ProcedureManagerRef, - context_factory: ContextFactoryImpl, + context_factory: DefaultContextFactory, ) -> Self { Self { procedure_manager, @@ -221,7 +223,10 @@ impl RegionMigrationManager { } /// Submits a new region migration procedure. - pub(crate) async fn submit_procedure(&self, task: RegionMigrationProcedureTask) -> Result<()> { + pub(crate) async fn submit_procedure( + &self, + task: RegionMigrationProcedureTask, + ) -> Result> { let Some(guard) = self.insert_running_procedure(&task) else { return error::MigrationRunningSnafu { region_id: task.region_id, @@ -243,7 +248,7 @@ impl RegionMigrationManager { if self.has_migrated(®ion_route, &task)? 
{ info!("Skipping region migration task: {task}"); - return Ok(()); + return Ok(None); } self.verify_region_leader_peer(®ion_route, &task)?; @@ -274,7 +279,7 @@ impl RegionMigrationManager { info!("Region migration procedure {procedure_id} for {task} is finished successfully!"); }); - Ok(()) + Ok(Some(procedure_id)) } } diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index f3a0183f233f..4431791ff70f 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -43,7 +43,7 @@ use tokio::sync::mpsc::{Receiver, Sender}; use super::migration_abort::RegionMigrationAbort; use super::upgrade_candidate_region::UpgradeCandidateRegion; -use super::{Context, ContextFactory, ContextFactoryImpl, State, VolatileContext}; +use super::{Context, ContextFactory, DefaultContextFactory, State, VolatileContext}; use crate::error::{self, Error, Result}; use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; @@ -120,8 +120,8 @@ impl TestingEnv { } /// Returns a context of region migration procedure. 
- pub fn context_factory(&self) -> ContextFactoryImpl { - ContextFactoryImpl { + pub fn context_factory(&self) -> DefaultContextFactory { + DefaultContextFactory { table_metadata_manager: self.table_metadata_manager.clone(), opening_region_keeper: self.opening_region_keeper.clone(), volatile_ctx: Default::default(), diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index 64965571b270..fa1443de32d9 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -93,6 +93,12 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { .route("/route", handler.clone()) .route("/route/help", handler); + let handler = region_migration::SubmitRegionMigrationTaskHandler { + region_migration_manager: meta_srv.region_migration_manager().clone(), + meta_peer_client: meta_srv.meta_peer_client().clone(), + }; + let router = router.route("/region-migration", handler); + let router = Router::nest("/admin", router); Admin::new(router) diff --git a/src/meta-srv/src/service/admin/region_migration.rs b/src/meta-srv/src/service/admin/region_migration.rs index 0cc1457f5019..544a6d40859a 100644 --- a/src/meta-srv/src/service/admin/region_migration.rs +++ b/src/meta-srv/src/service/admin/region_migration.rs @@ -17,22 +17,24 @@ use std::num::ParseIntError; use std::str::FromStr; use common_meta::peer::Peer; -use common_meta::ClusterId; +use common_meta::{distributed_time_constants, ClusterId}; use serde::Serialize; -use snafu::ResultExt; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::RegionId; use tonic::codegen::http; use super::HttpHandler; +use crate::cluster::MetaPeerClientRef; use crate::error::{self, Error, Result}; - -pub trait PeerLookup: Send + Sync { - fn peer(&self, peer_id: u64) -> Option; -} +use crate::lease::lookup_alive_datanode_peer; +use crate::procedure::region_migration::manager::{ + RegionMigrationManagerRef, RegionMigrationProcedureTask, +}; /// The handler of submitting migration task. 
pub struct SubmitRegionMigrationTaskHandler { - // TODO(weny): waits for https://github.com/GreptimeTeam/greptimedb/pull/3014 + pub region_migration_manager: RegionMigrationManagerRef, + pub meta_peer_client: MetaPeerClientRef, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -45,7 +47,8 @@ struct SubmitRegionMigrationTaskRequest { #[derive(Debug, Serialize)] struct SubmitRegionMigrationTaskResponse { - procedure_id: String, + /// `None` means the region has already been migrated. + procedure_id: Option, } fn parse_num_parameter_with_default( @@ -96,13 +99,63 @@ impl TryFrom<&HashMap> for SubmitRegionMigrationTaskRequest { } impl SubmitRegionMigrationTaskHandler { + fn is_leader(&self) -> bool { + self.meta_peer_client.is_leader() + } + + /// Looks up the peer, returning it only if its lease is still alive. + async fn lookup_peer(&self, cluster_id: ClusterId, peer_id: u64) -> Result> { + lookup_alive_datanode_peer( + cluster_id, + peer_id, + &self.meta_peer_client, + distributed_time_constants::DATANODE_LEASE_SECS, + ) + .await + } + /// Submits a region migration task, returns the procedure id.
async fn handle_submit( &self, - _task: SubmitRegionMigrationTaskRequest, + task: SubmitRegionMigrationTaskRequest, ) -> Result { - // TODO(weny): waits for https://github.com/GreptimeTeam/greptimedb/pull/3014 - todo!() + ensure!( + self.is_leader(), + error::UnexpectedSnafu { + violated: "Trying to submit a region migration procedure to non-leader meta server" + } + ); + + let SubmitRegionMigrationTaskRequest { + cluster_id, + region_id, + from_peer_id, + to_peer_id, + } = task; + + let from_peer = self.lookup_peer(cluster_id, from_peer_id).await?.context( + error::PeerUnavailableSnafu { + peer_id: from_peer_id, + }, + )?; + let to_peer = self.lookup_peer(cluster_id, to_peer_id).await?.context( + error::PeerUnavailableSnafu { + peer_id: to_peer_id, + }, + )?; + let procedure_id = self + .region_migration_manager + .submit_procedure(RegionMigrationProcedureTask { + cluster_id, + region_id, + from_peer, + to_peer, + }) + .await?; + + Ok(SubmitRegionMigrationTaskResponse { + procedure_id: procedure_id.map(|id| id.to_string()), + }) } } diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 4144bc30605d..598edf2ca765 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -113,7 +113,7 @@ impl heartbeat_server::Heartbeat for MetaSrv { ); if let Some(key) = pusher_key { - let _ = handler_group.unregister(&key).await; + let _ = handler_group.deregister(&key).await; } });