Skip to content

Commit

Permalink
feat: close downgraded leader region actively
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 17, 2024
1 parent 971eaa1 commit 09f46ac
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 10 deletions.
13 changes: 13 additions & 0 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) mod close_downgraded_region;
pub(crate) mod downgrade_leader_region;
pub(crate) mod manager;
pub(crate) mod migration_abort;
Expand Down Expand Up @@ -775,6 +776,12 @@ mod tests {
Step::next(
"Should be the region migration end",
None,
Assertion::simple(assert_close_downgraded_region, assert_no_persist),
),
// CloseDowngradedRegion
Step::next(
"Should be the close downgraded region",
None,
Assertion::simple(assert_region_migration_end, assert_done),
),
// RegionMigrationEnd
Expand Down Expand Up @@ -1145,6 +1152,12 @@ mod tests {
Step::next(
"Should be the region migration end",
None,
Assertion::simple(assert_close_downgraded_region, assert_no_persist),
),
// CloseDowngradedRegion
Step::next(
"Should be the close downgraded region",
None,
Assertion::simple(assert_region_migration_end, assert_done),
),
// RegionMigrationEnd
Expand Down
138 changes: 138 additions & 0 deletions src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::MAILBOX_RTT_SECS;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
use common_procedure::Status;
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;

use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(MAILBOX_RTT_SECS);

#[derive(Debug, Serialize, Deserialize)]
pub struct CloseDowngradedRegion;

#[async_trait::async_trait]
#[typetag::serde]
impl State for CloseDowngradedRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
if let Err(err) = self.close_downgraded_leader_region(ctx).await {
let downgrade_leader_datanode = &ctx.persistent_ctx.from_peer;
let region_id = ctx.region_id();
warn!(err; "Failed to close downgraded leader region: {region_id} on datanode {:?}", downgrade_leader_datanode);
}

Ok((Box::new(RegionMigrationEnd), Status::done()))
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl CloseDowngradedRegion {
/// Builds close region instruction.
///
/// Abort(non-retry):
/// - Datanode Table is not found.
async fn build_close_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
let pc = &ctx.persistent_ctx;
let downgrade_leader_datanode_id = pc.from_peer.id;
let cluster_id = pc.cluster_id;
let table_id = pc.region_id.table_id();
let region_number = pc.region_id.region_number();
let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;

let RegionInfo { engine, .. } = datanode_table_value.region_info.clone();

Ok(Instruction::CloseRegion(RegionIdent {
cluster_id,
datanode_id: downgrade_leader_datanode_id,
table_id,
region_number,
engine,
}))
}

/// Closes the downgraded leader region.
async fn close_downgraded_leader_region(&self, ctx: &mut Context) -> Result<()> {
let close_instruction = self.build_close_region_instruction(ctx).await?;
let region_id = ctx.region_id();
let pc = &ctx.persistent_ctx;
let downgrade_leader_datanode = &pc.from_peer;
let msg = MailboxMessage::json_message(
&format!("Close downgraded region: {}", region_id),
&format!("Meta@{}", ctx.server_addr()),
&format!(
"Datanode-{}@{}",
downgrade_leader_datanode.id, downgrade_leader_datanode.addr
),
common_time::util::current_time_millis(),
&close_instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: close_instruction.to_string(),
})?;

let ch = Channel::Datanode(downgrade_leader_datanode.id);
let receiver = ctx
.mailbox
.send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT)
.await?;

match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received close downgraded leade region reply: {:?}, region: {}",
reply, region_id
);
let InstructionReply::CloseRegion(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect close region reply",
}
.fail();
};

if result {
Ok(())
} else {
error::UnexpectedSnafu {
violated: format!(
"Failed to close downgraded leader region: {region_id} on datanode {:?}, error: {error:?}",
downgrade_leader_datanode,
),
}
.fail()
}
}

Err(e) => Err(e),
}
}
}
20 changes: 15 additions & 5 deletions src/meta-srv/src/procedure/region_migration/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,21 @@ use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
use tokio::sync::mpsc::{Receiver, Sender};

use super::manager::RegionMigrationProcedureTracker;
use super::migration_abort::RegionMigrationAbort;
use super::upgrade_candidate_region::UpgradeCandidateRegion;
use super::{Context, ContextFactory, DefaultContextFactory, State, VolatileContext};
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::error::{self, Error, Result};
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::metasrv::MetasrvInfo;
use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::manager::RegionMigrationProcedureTracker;
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::PersistentContext;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{
Context, ContextFactory, DefaultContextFactory, PersistentContext, State, VolatileContext,
};
use crate::service::mailbox::{Channel, MailboxRef};

pub type MockHeartbeatReceiver = Receiver<std::result::Result<HeartbeatResponse, tonic::Status>>;
Expand Down Expand Up @@ -569,6 +571,14 @@ pub(crate) fn assert_region_migration_end(next: &dyn State) {
let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
}

/// Asserts the [State] should be [CloseDowngradedRegion].
pub(crate) fn assert_close_downgraded_region(next: &dyn State) {
let _ = next
.as_any()
.downcast_ref::<CloseDowngradedRegion>()
.unwrap();
}

/// Asserts the [State] should be [RegionMigrationAbort].
pub(crate) fn assert_region_migration_abort(next: &dyn State) {
let _ = next
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ use common_telemetry::warn;
use serde::{Deserialize, Serialize};

use crate::error::Result;
use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::{Context, State};

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -58,7 +58,7 @@ impl State for UpdateMetadata {
if let Err(err) = ctx.invalidate_table_cache().await {
warn!("Failed to broadcast the invalidate table cache message during the upgrade candidate, error: {err:?}");
};
Ok((Box::new(RegionMigrationEnd), Status::done()))
Ok((Box::new(CloseDowngradedRegion), Status::executing(false)))
}
UpdateMetadata::Rollback => {
self.rollback_downgraded_region(ctx).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ mod tests {
use store_api::storage::RegionId;

use crate::error::Error;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
Expand Down Expand Up @@ -443,7 +443,7 @@ mod tests {
}

#[tokio::test]
async fn test_next_migration_end_state() {
async fn test_next_close_downgraded_region_state() {
let mut state = Box::new(UpdateMetadata::Upgrade);
let env = TestingEnv::new();
let persistent_context = new_persistent_context();
Expand Down Expand Up @@ -471,7 +471,10 @@ mod tests {

let (next, _) = state.next(&mut ctx).await.unwrap();

let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
let _ = next
.as_any()
.downcast_ref::<CloseDowngradedRegion>()
.unwrap();

let table_route = table_metadata_manager
.table_route_manager()
Expand Down

0 comments on commit 09f46ac

Please sign in to comment.