Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose region migration http endpoint #3032

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/meta-srv/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,17 @@ impl MetaPeerClient {
to_stat_kv_map(kvs)
}

/// Gets a single kv entry from the leader's in_mem kv store.
///
/// Returns `Ok(None)` when the key is absent. When present, the underlying
/// range scan is expected to yield exactly one entry (checked in debug builds).
pub async fn get(&self, key: Vec<u8>) -> Result<Option<KeyValue>> {
    let mut kvs = self.range(key, vec![], false).await?;
    if kvs.is_empty() {
        return Ok(None);
    }
    debug_assert_eq!(kvs.len(), 1);
    Ok(Some(kvs.remove(0)))
}

WenyXu marked this conversation as resolved.
Show resolved Hide resolved
// Range kv information from the leader's in_mem kv store
pub async fn range(
&self,
Expand Down Expand Up @@ -228,7 +239,7 @@ impl MetaPeerClient {

// Check if the meta node is a leader node.
// Note: when self.election is None, we also consider the meta node is leader
fn is_leader(&self) -> bool {
pub(crate) fn is_leader(&self) -> bool {
self.election
.as_ref()
.map(|election| election.is_leader())
Expand Down
42 changes: 36 additions & 6 deletions src/meta-srv/src/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,57 @@

use std::collections::HashMap;

use common_meta::util;
use common_meta::peer::Peer;
use common_meta::{util, ClusterId};
use common_time::util as time_util;

use crate::cluster::MetaPeerClientRef;
use crate::error::Result;
use crate::keys::{LeaseKey, LeaseValue, DN_LEASE_PREFIX};

/// Builds a predicate that reports whether a datanode lease is still alive.
///
/// A lease counts as alive when strictly less than `lease_secs` seconds have
/// elapsed since its last recorded heartbeat (`timestamp_millis`).
fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseKey, &LeaseValue) -> bool {
    move |_key: &LeaseKey, value: &LeaseValue| {
        let elapsed_millis = (time_util::current_time_millis() - value.timestamp_millis) as u64;
        elapsed_millis < lease_secs * 1000
    }
}

/// Looks up the datanode identified by `(cluster_id, datanode_id)` and returns
/// its `Peer` only if its lease is still alive.
///
/// Returns `Ok(None)` when no lease entry exists for the datanode, or when the
/// entry exists but the lease has expired (older than `lease_secs` seconds).
pub async fn lookup_alive_datanode_peer(
    cluster_id: ClusterId,
    datanode_id: u64,
    meta_peer_client: &MetaPeerClientRef,
    lease_secs: u64,
) -> Result<Option<Peer>> {
    let lease_key = LeaseKey {
        cluster_id,
        node_id: datanode_id,
    };
    // The key is cloned because we still need it for the filter check below.
    let kv = match meta_peer_client.get(lease_key.clone().try_into()?).await? {
        Some(kv) => kv,
        None => return Ok(None),
    };
    let lease_value: LeaseValue = kv.value.try_into()?;

    let is_alive = build_lease_filter(lease_secs);
    if !is_alive(&lease_key, &lease_value) {
        return Ok(None);
    }
    Ok(Some(Peer {
        id: lease_key.node_id,
        addr: lease_value.node_addr,
    }))
}

pub async fn alive_datanodes(
cluster_id: u64,
cluster_id: ClusterId,
meta_peer_client: &MetaPeerClientRef,
lease_secs: u64,
) -> Result<HashMap<LeaseKey, LeaseValue>> {
let lease_filter = |_: &LeaseKey, v: &LeaseValue| {
((time_util::current_time_millis() - v.timestamp_millis) as u64) < lease_secs * 1000
};
let lease_filter = build_lease_filter(lease_secs);

filter_datanodes(cluster_id, meta_peer_client, lease_filter).await
}

pub async fn filter_datanodes<P>(
cluster_id: u64,
cluster_id: ClusterId,
meta_peer_client: &MetaPeerClientRef,
predicate: P,
) -> Result<HashMap<LeaseKey, LeaseValue>>
Expand Down
1 change: 0 additions & 1 deletion src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ pub mod metasrv;
mod metrics;
#[cfg(feature = "mock")]
pub mod mocks;
pub mod peer;
pub mod procedure;
pub mod pubsub;
pub mod region;
Expand Down
6 changes: 0 additions & 6 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ use crate::error::{
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::HeartbeatHandlerGroup;
use crate::lock::DistLockRef;
use crate::peer::NaivePeerRegistry;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::selector::{Selector, SelectorType};
Expand Down Expand Up @@ -252,7 +251,6 @@ pub struct MetaSrv {
memory_region_keeper: MemoryRegionKeeperRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
region_migration_manager: RegionMigrationManagerRef,
datanode_peer_registry: NaivePeerRegistry,

plugins: Plugins,
}
Expand Down Expand Up @@ -419,10 +417,6 @@ impl MetaSrv {
&self.region_migration_manager
}

pub fn datanode_peer_registry(&self) -> &NaivePeerRegistry {
&self.datanode_peer_registry
}

pub fn publish(&self) -> Option<PublishRef> {
self.plugins.get::<PublishRef>()
}
Expand Down
2 changes: 0 additions & 2 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ use crate::lock::DistLockRef;
use crate::metasrv::{
ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ,
};
use crate::peer::NaivePeerRegistry;
use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
Expand Down Expand Up @@ -338,7 +337,6 @@ impl MetaSrvBuilder {
plugins: plugins.unwrap_or_else(Plugins::default),
memory_region_keeper: opening_region_keeper,
region_migration_manager,
datanode_peer_registry: NaivePeerRegistry::default(),
})
}
}
Expand Down
100 changes: 0 additions & 100 deletions src/meta-srv/src/peer.rs

This file was deleted.

2 changes: 1 addition & 1 deletion src/meta-srv/src/service/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {

let handler = region_migration::SubmitRegionMigrationTaskHandler {
region_migration_manager: meta_srv.region_migration_manager().clone(),
peer_lookup: Arc::new(meta_srv.datanode_peer_registry().clone()),
meta_peer_client: meta_srv.meta_peer_client().clone(),
};
let router = router.route("/region-migration", handler);

Expand Down
48 changes: 34 additions & 14 deletions src/meta-srv/src/service/admin/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,26 @@
use std::collections::HashMap;
use std::num::ParseIntError;
use std::str::FromStr;
use std::sync::Arc;

use common_meta::ClusterId;
use common_meta::peer::Peer;
use common_meta::{distributed_time_constants, ClusterId};
use serde::Serialize;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use tonic::codegen::http;

use super::HttpHandler;
use crate::cluster::MetaPeerClientRef;
use crate::error::{self, Error, Result};
use crate::peer::PeerLookup;
use crate::lease::lookup_alive_datanode_peer;
use crate::procedure::region_migration::manager::{
RegionMigrationManagerRef, RegionMigrationProcedureTask,
};

/// The handler of submitting migration task.
pub struct SubmitRegionMigrationTaskHandler {
pub region_migration_manager: RegionMigrationManagerRef,
pub peer_lookup: Arc<dyn PeerLookup>,
pub meta_peer_client: MetaPeerClientRef,
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -98,31 +99,50 @@ impl TryFrom<&HashMap<String, String>> for SubmitRegionMigrationTaskRequest {
}

impl SubmitRegionMigrationTaskHandler {
// Returns whether this meta server is currently the leader, by delegating to
// the underlying `MetaPeerClient`. Note: per `MetaPeerClient::is_leader`, a
// node with no election configured is also considered the leader.
fn is_leader(&self) -> bool {
self.meta_peer_client.is_leader()
}

/// Checks the peer is available: returns the peer only when its datanode
/// lease (bounded by `DATANODE_LEASE_SECS`) has not yet expired.
async fn lookup_peer(&self, cluster_id: ClusterId, peer_id: u64) -> Result<Option<Peer>> {
    let lease_secs = distributed_time_constants::DATANODE_LEASE_SECS;
    lookup_alive_datanode_peer(cluster_id, peer_id, &self.meta_peer_client, lease_secs).await
}

/// Submits a region migration task, returns the procedure id.
async fn handle_submit(
&self,
task: SubmitRegionMigrationTaskRequest,
) -> Result<SubmitRegionMigrationTaskResponse> {
ensure!(
self.is_leader(),
error::UnexpectedSnafu {
violated: "Trying to submit a region migration procedure to non-leader meta server"
}
);

let SubmitRegionMigrationTaskRequest {
cluster_id,
region_id,
from_peer_id,
to_peer_id,
} = task;

let from_peer = self.peer_lookup.peer(cluster_id, from_peer_id).context(
let from_peer = self.lookup_peer(cluster_id, from_peer_id).await?.context(
error::PeerUnavailableSnafu {
peer_id: from_peer_id,
},
)?;

let to_peer =
self.peer_lookup
.peer(cluster_id, to_peer_id)
.context(error::PeerUnavailableSnafu {
peer_id: to_peer_id,
})?;

let to_peer = self.lookup_peer(cluster_id, to_peer_id).await?.context(
error::PeerUnavailableSnafu {
peer_id: to_peer_id,
},
)?;
let procedure_id = self
.region_migration_manager
.submit_procedure(RegionMigrationProcedureTask {
Expand Down
15 changes: 1 addition & 14 deletions src/meta-srv/src/service/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,9 @@ impl heartbeat_server::Heartbeat for MetaSrv {
let mut in_stream = req.into_inner();
let (tx, rx) = mpsc::channel(128);
let handler_group = self.handler_group().clone();
let datanode_peer_registry = self.datanode_peer_registry().clone();
let ctx = self.new_ctx();
let _handle = common_runtime::spawn_bg(async move {
let mut pusher_key = None;
let mut datanode_peer_ident = None;
while let Some(msg) = in_stream.next().await {
let mut is_not_leader = false;
match msg {
Expand All @@ -67,15 +65,7 @@ impl heartbeat_server::Heartbeat for MetaSrv {

if pusher_key.is_none() {
let node_id = get_node_id(header);
let role = header.role();
if let Some(peer) = req.peer.as_ref() {
if matches!(role, Role::Datanode) {
datanode_peer_registry
.register(header.cluster_id, peer.clone().into());
datanode_peer_ident = Some((header.cluster_id, peer.id));
}
}
let role = role as i32;
let role = header.role() as i32;
let key = format!("{}-{}", role, node_id);
let pusher = Pusher::new(tx.clone(), header);
handler_group.register(&key, pusher).await;
Expand Down Expand Up @@ -125,9 +115,6 @@ impl heartbeat_server::Heartbeat for MetaSrv {
if let Some(key) = pusher_key {
let _ = handler_group.deregister(&key).await;
}
if let Some((cluster_id, peer_id)) = datanode_peer_ident {
datanode_peer_registry.deregister(cluster_id, peer_id);
}
});

let out_stream = ReceiverStream::new(rx);
Expand Down
Loading