Skip to content

Commit

Permalink
feat: implement naive peer registry
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 28, 2023
1 parent 6d52d3c commit b6f7d23
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 17 deletions.
6 changes: 5 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ use crate::pubsub::Message;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("The target peer is unavailable temporally: {}", peer_id))]
PeerUnavailable { location: Location, peer_id: u64 },

#[snafu(display("Another migration procedure is running for region: {}", region_id))]
MigrationRunning {
location: Location,
Expand Down Expand Up @@ -650,7 +653,8 @@ impl ErrorExt for Error {
| Error::Join { .. }
| Error::WeightArray { .. }
| Error::NotSetWeightArray { .. }
| Error::Unsupported { .. } => StatusCode::Internal,
| Error::Unsupported { .. }
| Error::PeerUnavailable { .. } => StatusCode::Internal,
Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod metasrv;
mod metrics;
#[cfg(feature = "mock")]
pub mod mocks;
pub mod peer;
pub mod procedure;
pub mod pubsub;
pub mod region;
Expand Down
6 changes: 6 additions & 0 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use crate::error::{
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::HeartbeatHandlerGroup;
use crate::lock::DistLockRef;
use crate::peer::NaivePeerRegistry;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::selector::{Selector, SelectorType};
Expand Down Expand Up @@ -251,6 +252,7 @@ pub struct MetaSrv {
memory_region_keeper: MemoryRegionKeeperRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
region_migration_manager: RegionMigrationManagerRef,
datanode_peer_registry: NaivePeerRegistry,

plugins: Plugins,
}
Expand Down Expand Up @@ -417,6 +419,10 @@ impl MetaSrv {
&self.region_migration_manager
}

pub fn datanode_peer_registry(&self) -> &NaivePeerRegistry {
&self.datanode_peer_registry
}

pub fn publish(&self) -> Option<PublishRef> {
self.plugins.get::<PublishRef>()
}
Expand Down
2 changes: 2 additions & 0 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use crate::lock::DistLockRef;
use crate::metasrv::{
ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ,
};
use crate::peer::NaivePeerRegistry;
use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::ContextFactoryImpl;
Expand Down Expand Up @@ -337,6 +338,7 @@ impl MetaSrvBuilder {
plugins: plugins.unwrap_or_else(Plugins::default),
memory_region_keeper: opening_region_keeper,
region_migration_manager,
datanode_peer_registry: NaivePeerRegistry::default(),
})
}
}
Expand Down
81 changes: 81 additions & 0 deletions src/meta-srv/src/peer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use common_meta::peer::Peer;
use common_meta::ClusterId;
use common_telemetry::warn;

/// Used to look up specific peer by `peer_id`.
pub trait PeerLookup: Send + Sync {
/// Returns `None` stands for target [Peer] is unavailable temporally
fn peer(&self, cluster_id: ClusterId, peer_id: u64) -> Option<Peer>;
}

pub type PeerIdentifier = (ClusterId, u64);

#[derive(Debug, Default, Clone)]
pub struct NaivePeerRegistry(Arc<RwLock<HashMap<PeerIdentifier, Peer>>>);

impl NaivePeerRegistry {
/// Registers a [Peer].
pub fn register(&self, cluster_id: ClusterId, peer: Peer) {
let mut inner = self.0.write().unwrap();
if let Some(previous) = inner.insert((cluster_id, peer.id), peer) {
warn!(
"Registered a peer, it overwritten the previous peer: {:?}",
previous
)
}
}

/// Deregisters a [Peer].
pub fn deregister(&self, cluster_id: ClusterId, peer_id: u64) {
let mut inner = self.0.write().unwrap();
if inner.remove(&(cluster_id, peer_id)).is_none() {
warn!(
"Trying to deregister a non-exist peer, peer_id: {}",
peer_id
);
}
}
}

impl PeerLookup for NaivePeerRegistry {
fn peer(&self, cluster_id: ClusterId, peer_id: u64) -> Option<Peer> {
let inner = self.0.read().unwrap();
inner.get(&(cluster_id, peer_id)).cloned()
}
}

#[cfg(test)]
mod tests {
use common_meta::peer::Peer;

use super::{NaivePeerRegistry, PeerLookup};

#[test]
fn test_naive_peer_registry() {
let lookup = NaivePeerRegistry::default();
lookup.register(0, Peer::empty(1024));

assert!(lookup.peer(0, 1024).is_some());
assert!(lookup.peer(0, 1025).is_none());

lookup.deregister(0, 1024);
assert!(lookup.peer(0, 1024).is_none());
}
}
19 changes: 11 additions & 8 deletions src/meta-srv/src/procedure/region_migration/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use common_meta::key::table_route::TableRouteValue;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_meta::ClusterId;
use common_procedure::{watcher, ProcedureManagerRef, ProcedureWithId};
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::{error, info};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
Expand Down Expand Up @@ -57,10 +57,10 @@ impl Drop for RegionMigrationProcedureGuard {

#[derive(Debug, Clone)]
pub(crate) struct RegionMigrationProcedureTask {
cluster_id: ClusterId,
region_id: RegionId,
from_peer: Peer,
to_peer: Peer,
pub(crate) cluster_id: ClusterId,
pub(crate) region_id: RegionId,
pub(crate) from_peer: Peer,
pub(crate) to_peer: Peer,
}

impl Display for RegionMigrationProcedureTask {
Expand Down Expand Up @@ -223,7 +223,10 @@ impl RegionMigrationManager {
}

/// Submits a new region migration procedure.
pub(crate) async fn submit_procedure(&self, task: RegionMigrationProcedureTask) -> Result<()> {
pub(crate) async fn submit_procedure(
&self,
task: RegionMigrationProcedureTask,
) -> Result<Option<ProcedureId>> {
let Some(guard) = self.insert_running_procedure(&task) else {
return error::MigrationRunningSnafu {
region_id: task.region_id,
Expand All @@ -245,7 +248,7 @@ impl RegionMigrationManager {

if self.has_migrated(&region_route, &task)? {
info!("Skipping region migration task: {task}");
return Ok(());
return Ok(None);
}

self.verify_region_leader_peer(&region_route, &task)?;
Expand Down Expand Up @@ -276,7 +279,7 @@ impl RegionMigrationManager {
info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
});

Ok(())
Ok(Some(procedure_id))
}
}

Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/service/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {

let handler = region_migration::SubmitRegionMigrationTaskHandler {
region_migration_manager: meta_srv.region_migration_manager().clone(),
peer_lookup: Arc::new(meta_srv.datanode_peer_registry().clone()),
};
let router = router.route("/region-migration", handler);

Expand Down
50 changes: 43 additions & 7 deletions src/meta-srv/src/service/admin/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,25 @@
use std::collections::HashMap;
use std::num::ParseIntError;
use std::str::FromStr;
use std::sync::Arc;

use common_meta::peer::Peer;
use common_meta::ClusterId;
use serde::Serialize;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tonic::codegen::http;

use super::HttpHandler;
use crate::error::{self, Error, Result};
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::peer::PeerLookup;
use crate::procedure::region_migration::manager::{
RegionMigrationManagerRef, RegionMigrationProcedureTask,
};

/// The handler of submitting migration task.
pub struct SubmitRegionMigrationTaskHandler {
pub region_migration_manager: RegionMigrationManagerRef,
pub peer_lookup: Arc<dyn PeerLookup>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand All @@ -42,7 +46,8 @@ struct SubmitRegionMigrationTaskRequest {

#[derive(Debug, Serialize)]
struct SubmitRegionMigrationTaskResponse {
procedure_id: String,
/// The `None` stands region has been migrated.
procedure_id: Option<String>,
}

fn parse_num_parameter_with_default<T, F>(
Expand Down Expand Up @@ -96,10 +101,41 @@ impl SubmitRegionMigrationTaskHandler {
/// Submits a region migration task, returns the procedure id.
async fn handle_submit(
&self,
_task: SubmitRegionMigrationTaskRequest,
task: SubmitRegionMigrationTaskRequest,
) -> Result<SubmitRegionMigrationTaskResponse> {
// TODO(weny): waits for https://github.com/GreptimeTeam/greptimedb/pull/3014
todo!()
let SubmitRegionMigrationTaskRequest {
cluster_id,
region_id,
from_peer_id,
to_peer_id,
} = task;

let from_peer = self.peer_lookup.peer(cluster_id, from_peer_id).context(
error::PeerUnavailableSnafu {
peer_id: from_peer_id,
},
)?;

let to_peer =
self.peer_lookup
.peer(cluster_id, to_peer_id)
.context(error::PeerUnavailableSnafu {
peer_id: to_peer_id,
})?;

let procedure_id = self
.region_migration_manager
.submit_procedure(RegionMigrationProcedureTask {
cluster_id,
region_id,
from_peer,
to_peer,
})
.await?;

Ok(SubmitRegionMigrationTaskResponse {
procedure_id: procedure_id.map(|id| id.to_string()),
})
}
}

Expand Down
15 changes: 14 additions & 1 deletion src/meta-srv/src/service/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ impl heartbeat_server::Heartbeat for MetaSrv {
let mut in_stream = req.into_inner();
let (tx, rx) = mpsc::channel(128);
let handler_group = self.handler_group().clone();
let datanode_peer_registry = self.datanode_peer_registry().clone();
let ctx = self.new_ctx();
let _handle = common_runtime::spawn_bg(async move {
let mut pusher_key = None;
let mut datanode_peer_ident = None;
while let Some(msg) = in_stream.next().await {
let mut is_not_leader = false;
match msg {
Expand All @@ -65,7 +67,15 @@ impl heartbeat_server::Heartbeat for MetaSrv {

if pusher_key.is_none() {
let node_id = get_node_id(header);
let role = header.role() as i32;
let role = header.role();
if let Some(peer) = req.peer.as_ref() {
if matches!(role, Role::Datanode) {
datanode_peer_registry
.register(header.cluster_id, peer.clone().into());
datanode_peer_ident = Some((header.cluster_id, peer.id));
}
}
let role = role as i32;
let key = format!("{}-{}", role, node_id);
let pusher = Pusher::new(tx.clone(), header);
handler_group.register(&key, pusher).await;
Expand Down Expand Up @@ -115,6 +125,9 @@ impl heartbeat_server::Heartbeat for MetaSrv {
if let Some(key) = pusher_key {
let _ = handler_group.unregister(&key).await;
}
if let Some((cluster_id, peer_id)) = datanode_peer_ident {
datanode_peer_registry.deregister(cluster_id, peer_id);
}
});

let out_stream = ReceiverStream::new(rx);
Expand Down

0 comments on commit b6f7d23

Please sign in to comment.