Skip to content

Commit

Permalink
feat: implement region migration manager
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 27, 2023
1 parent 718447c commit 72d6fa3
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 2 deletions.
9 changes: 8 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ use crate::pubsub::Message;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Another migration procedure is running for region: {}", region_id,))]
MigrationRunning {
location: Location,
region_id: RegionId,
},

#[snafu(display("The region migration procedure aborted, reason: {}", reason))]
MigrationAbort { location: Location, reason: String },

Expand Down Expand Up @@ -675,7 +681,8 @@ impl ErrorExt for Error {
| Error::TableIdChanged { .. }
| Error::RegionOpeningRace { .. }
| Error::RegionRouteNotFound { .. }
| Error::MigrationAbort { .. } => StatusCode::Unexpected,
| Error::MigrationAbort { .. }
| Error::MigrationRunning { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidateTableCache { source, .. } => source.status_code(),
Error::RequestDatanode { source, .. } => source.status_code(),
Expand Down
4 changes: 4 additions & 0 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
// limitations under the License.

pub(crate) mod downgrade_leader_region;
// TODO(weny): remove this `#[allow(dead_code)]` once the `manager` module is in use.
#[allow(dead_code)]
pub(crate) mod manager;
pub(crate) mod migration_abort;
pub(crate) mod migration_end;
pub(crate) mod migration_start;
Expand Down Expand Up @@ -123,6 +126,7 @@ pub trait ContextFactory {
}

/// Default implementation.
#[derive(Clone)]
pub struct ContextFactoryImpl {
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
Expand Down
207 changes: 207 additions & 0 deletions src/meta-srv/src/procedure/region_migration/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::{Arc, RwLock};

use common_meta::peer::Peer;
use common_meta::ClusterId;
use common_procedure::{watcher, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::{error, info};
use snafu::ResultExt;
use store_api::storage::RegionId;

use crate::error::{self, Result};
use crate::procedure::region_migration::{
ContextFactoryImpl, PersistentContext, RegionMigrationProcedure,
};

/// Manager of region migration procedure.
///
/// Tracks which regions currently have a migration procedure in flight so that
/// at most one migration per region is submitted through this manager.
pub(crate) struct RegionMigrationManager {
    /// Used to register the procedure loader and to submit procedures.
    procedure_manager: ProcedureManagerRef,
    /// Tasks currently tracked as running, keyed by region id. Shared with every
    /// [RegionMigrationProcedureGuard] handed out by this manager.
    running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,

    /// Cloned into the procedure loader and into every newly created procedure.
    context_factory: ContextFactoryImpl,
}

/// The guard of running [RegionMigrationProcedureTask].
///
/// Dropping the guard removes the entry for `region_id` from `running_procedures`.
pub(crate) struct RegionMigrationProcedureGuard {
    /// Key of the entry to remove on drop.
    region_id: RegionId,
    /// The map of running tasks this guard removes its entry from.
    running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
}

impl Drop for RegionMigrationProcedureGuard {
    /// Removes this guard's region from the map of running procedures.
    ///
    /// A poisoned lock is recovered rather than unwrapped: panicking inside `drop`
    /// while the thread is already unwinding aborts the process, and skipping the
    /// removal would leave the region marked as "migrating" forever. The map is
    /// only touched by single `insert`/`remove` calls, so its content is still
    /// usable after a poisoning panic.
    fn drop(&mut self) {
        let mut procedures = self
            .running_procedures
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        procedures.remove(&self.region_id);
    }
}

/// The parameters of one region migration request.
///
/// Converted into a [PersistentContext] (field by field) when the procedure is created.
#[derive(Debug, Clone)]
pub(crate) struct RegionMigrationProcedureTask {
    /// The cluster the region belongs to.
    cluster_id: ClusterId,
    /// The region to migrate; also the key used to detect a concurrent migration.
    region_id: RegionId,
    /// Presumably the peer the region is migrated away from (verify against the procedure).
    from_peer: Peer,
    /// Presumably the peer the region is migrated to (verify against the procedure).
    to_peer: Peer,
}

impl Display for RegionMigrationProcedureTask {
    /// Renders the task as a single line listing its cluster, region and both peers.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure exhaustively so a newly added field cannot be silently
        // left out of the output.
        let Self {
            cluster_id,
            region_id,
            from_peer,
            to_peer,
        } = self;

        write!(
            f,
            "cluster: {}, region: {}, from_peer: {}, to_peer: {}",
            cluster_id, region_id, from_peer, to_peer
        )
    }
}

impl From<RegionMigrationProcedureTask> for PersistentContext {
fn from(
RegionMigrationProcedureTask {
cluster_id,
region_id,
from_peer,
to_peer,
}: RegionMigrationProcedureTask,
) -> Self {
PersistentContext {
cluster_id,
from_peer,
to_peer,
region_id,
}
}
}

impl RegionMigrationManager {
    /// Returns new [RegionMigrationManager] with no running procedures.
    pub(crate) fn new(
        procedure_manager: ProcedureManagerRef,
        context_factory: ContextFactoryImpl,
    ) -> Self {
        Self {
            procedure_manager,
            running_procedures: Arc::new(RwLock::new(HashMap::new())),
            context_factory,
        }
    }

    /// Registers the loader of [RegionMigrationProcedure] to the `ProcedureManager`.
    ///
    /// The loader rebuilds a procedure from its JSON form using a clone of this
    /// manager's context factory.
    ///
    /// # Errors
    ///
    /// Returns a `RegisterProcedureLoader` error if the procedure manager rejects
    /// the loader.
    pub(crate) fn try_start(&self) -> Result<()> {
        let context_factory = self.context_factory.clone();
        self.procedure_manager
            .register_loader(
                RegionMigrationProcedure::TYPE_NAME,
                Box::new(move |json| {
                    let context_factory = context_factory.clone();
                    RegionMigrationProcedure::from_json(json, context_factory)
                        .map(|p| Box::new(p) as _)
                }),
            )
            .context(error::RegisterProcedureLoaderSnafu {
                type_name: RegionMigrationProcedure::TYPE_NAME,
            })
    }

    /// Marks the task's region as having a running migration.
    ///
    /// Returns a guard that removes the mark when dropped, or `None` if the region
    /// is already marked (the existing entry is left untouched).
    fn insert_running_procedure(
        &self,
        task: &RegionMigrationProcedureTask,
    ) -> Option<RegionMigrationProcedureGuard> {
        let mut procedures = self.running_procedures.write().unwrap();

        // The check and the insert happen under one write lock, so two concurrent
        // callers cannot both obtain a guard for the same region.
        match procedures.entry(task.region_id) {
            Entry::Occupied(_) => None,
            Entry::Vacant(v) => {
                v.insert(task.clone());
                Some(RegionMigrationProcedureGuard {
                    region_id: task.region_id,
                    running_procedures: self.running_procedures.clone(),
                })
            }
        }
    }

    /// Submits a new region migration procedure.
    ///
    /// The procedure is submitted and awaited on a background task; this method
    /// returns as soon as that task is spawned. Failures to submit or to wait for
    /// the procedure are logged, not returned.
    ///
    /// # Errors
    ///
    /// Returns a `MigrationRunning` error if a migration for the same region is
    /// already tracked by this manager.
    pub(crate) fn submit_procedure(&self, task: RegionMigrationProcedureTask) -> Result<()> {
        let Some(guard) = self.insert_running_procedure(&task) else {
            return error::MigrationRunningSnafu {
                region_id: task.region_id,
            }
            .fail();
        };

        let procedure =
            RegionMigrationProcedure::new(task.clone().into(), self.context_factory.clone());
        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
        let procedure_id = procedure_with_id.id;
        info!("Starting region migration procedure {procedure_id} for {task}");

        let procedure_manager = self.procedure_manager.clone();

        common_runtime::spawn_bg(async move {
            // Bind the guard to a named variable. A `_` pattern binds nothing, so
            // `let _ = guard;` does not by itself make this task the owner of the
            // guard. With a real binding the region stays marked as running until
            // this task returns, on every exit path below.
            let _guard = guard;
            let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
                Ok(watcher) => watcher,
                Err(e) => {
                    error!(e; "Failed to submit region migration procedure {procedure_id} for {task}");
                    return;
                }
            };

            if let Err(e) = watcher::wait(watcher).await {
                error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");
                return;
            }

            info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
        });

        Ok(())
    }
}

#[cfg(test)]
mod test {
    use std::assert_matches::assert_matches;

    use super::*;
    use crate::procedure::region_migration::test_util::TestingEnv;

    /// Builds a sample migration task for `region_id`.
    fn new_task(region_id: RegionId) -> RegionMigrationProcedureTask {
        RegionMigrationProcedureTask {
            cluster_id: 1,
            region_id,
            from_peer: Peer::empty(2),
            to_peer: Peer::empty(1),
        }
    }

    /// Submitting a task for a region that is already tracked must be rejected.
    #[test]
    fn test_insert_running_procedure() {
        let env = TestingEnv::new();
        let context_factory = env.context_factory();
        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
        let region_id = RegionId::new(1024, 1);
        let task = new_task(region_id);
        // Inserts one
        manager
            .running_procedures
            .write()
            .unwrap()
            .insert(region_id, task.clone());

        let err = manager.submit_procedure(task).unwrap_err();
        assert_matches!(err, error::Error::MigrationRunning { .. });
    }

    /// The guard keeps the region tracked while alive and untracks it on drop.
    #[test]
    fn test_running_procedure_guard() {
        let env = TestingEnv::new();
        let context_factory = env.context_factory();
        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
        let region_id = RegionId::new(1024, 1);
        let task = new_task(region_id);

        let guard = manager.insert_running_procedure(&task);
        assert!(guard.is_some());
        assert!(manager
            .running_procedures
            .read()
            .unwrap()
            .contains_key(&region_id));
        // A second registration for the same region is refused while the guard lives.
        assert!(manager.insert_running_procedure(&task).is_none());

        drop(guard);
        assert!(!manager
            .running_procedures
            .read()
            .unwrap()
            .contains_key(&region_id));
        // Once the guard is gone the region can be registered again.
        assert!(manager.insert_running_procedure(&task).is_some());
    }
}
14 changes: 13 additions & 1 deletion src/meta-srv/src/procedure/region_migration/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef};
use common_meta::rpc::router::RegionRoute;
use common_meta::sequence::{Sequence, SequenceBuilder};
use common_meta::state_store::KvStateStore;
use common_meta::DatanodeId;
use common_procedure::{Context as ProcedureContext, ProcedureId, Status};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::{Context as ProcedureContext, ProcedureId, ProcedureManagerRef, Status};
use common_procedure_test::MockContextProvider;
use common_telemetry::debug;
use common_time::util::current_time_millis;
Expand Down Expand Up @@ -90,6 +92,7 @@ pub struct TestingEnv {
mailbox_ctx: MailboxContext,
opening_region_keeper: MemoryRegionKeeperRef,
server_addr: String,
procedure_manager: ProcedureManagerRef,
}

impl TestingEnv {
Expand All @@ -104,11 +107,15 @@ impl TestingEnv {
let mailbox_ctx = MailboxContext::new(mailbox_sequence);
let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());

let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));

Self {
table_metadata_manager,
opening_region_keeper,
mailbox_ctx,
server_addr: "localhost".to_string(),
procedure_manager,
}
}

Expand Down Expand Up @@ -146,6 +153,11 @@ impl TestingEnv {
}
}

/// Returns a reference to the [ProcedureManagerRef] held by this testing environment.
///
/// Callers that need ownership clone the returned reference.
pub fn procedure_manager(&self) -> &ProcedureManagerRef {
    &self.procedure_manager
}

// Creates a table metadata with the physical table route.
pub async fn create_physical_table_metadata(
&self,
Expand Down

0 comments on commit 72d6fa3

Please sign in to comment.