fix: keep region failover state not changed upon failure #2261

Merged
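What the fix changes: previously, `RegionFailoverProcedure::execute` took the state out of an `Option<Box<dyn State>>` before running the transition. If the transition failed, the early return left the node with `state == None`, so a later call found no state and reported the procedure as done, silently dropping the failover. The fix stores the state directly as `Box<dyn State>` and only overwrites it after a successful transition, so a failure leaves it unchanged and a retry re-runs the same step. A minimal, self-contained sketch of the two patterns (a hypothetical `Step` enum standing in for `Box<dyn State>`, not the project's actual code):

```rust
// Hypothetical miniature of the pattern; `Step` stands in for `Box<dyn State>`.
#[derive(Debug, PartialEq)]
enum Step {
    Start,
    Done,
}

fn transition(_from: &Step, fail: bool) -> Result<Step, String> {
    if fail {
        Err("retry later".to_string())
    } else {
        Ok(Step::Done)
    }
}

// Before the fix: the state lives in an Option and is take()n up front. If
// the transition fails, `?` returns early and the Option is left as None,
// so the procedure forgets which step it was on.
fn execute_take(state: &mut Option<Step>, fail: bool) -> Result<(), String> {
    if let Some(current) = state.take() {
        let next = transition(&current, fail)?; // early return leaves `*state == None`
        *state = Some(next);
    }
    Ok(())
}

// After the fix: the state is overwritten only when the transition succeeds,
// so a failure leaves it unchanged and a later retry re-runs the same step.
fn execute_in_place(state: &mut Step, fail: bool) -> Result<(), String> {
    *state = transition(state, fail)?;
    Ok(())
}

fn main() {
    let mut before = Some(Step::Start);
    let _ = execute_take(&mut before, true);
    assert_eq!(before, None); // state lost on failure

    let mut after = Step::Start;
    let _ = execute_in_place(&mut after, true);
    assert_eq!(after, Step::Start); // state preserved on failure
}
```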
121 changes: 94 additions & 27 deletions src/meta-srv/src/procedure/region_failover.rs
@@ -214,7 +214,7 @@ impl RegionFailoverManager
#[derive(Serialize, Deserialize, Debug)]
struct Node {
failed_region: RegionIdent,
state: Option<Box<dyn State>>,
state: Box<dyn State>,
}

/// The "Context" of region failover procedure state machine.
@@ -233,7 +233,7 @@ pub struct RegionFailoverContext
#[typetag::serde(tag = "region_failover_state")]
trait State: Sync + Send + Debug {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>>;
@@ -304,7 +304,7 @@ impl RegionFailoverProcedure
let state = RegionFailoverStart::new();
let node = Node {
failed_region,
state: Some(Box::new(state)),
state: Box::new(state),
};
Self { node, context }
}
@@ -322,25 +322,18 @@ impl Procedure for RegionFailoverProcedure
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
if let Some(state) = self.node.state.take() {
let next_state = state
.next(&self.context, &self.node.failed_region)
.await
.map_err(|e| {
if matches!(e, Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;
self.node.state = Some(next_state);
}
Ok(self
.node
.state
.as_ref()
.map(|s| s.status())
.unwrap_or(Status::Done))
let state = &mut self.node.state;
*state = state
.next(&self.context, &self.node.failed_region)
.await
.map_err(|e| {
if matches!(e, Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;
Ok(state.status())
}

fn dump(&self) -> ProcedureResult<String> {
@@ -362,6 +355,7 @@ impl Procedure for RegionFailoverProcedure
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Mutex;

use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage, Peer, RequestHeader};
@@ -370,7 +364,8 @@ mod tests {
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::TableMetadataManager;
use common_meta::DatanodeId;
use common_procedure::BoxedProcedure;
use common_procedure::{BoxedProcedure, ProcedureId};
use common_procedure_test::MockContextProvider;
use rand::prelude::SliceRandom;
use tokio::sync::mpsc::Receiver;

@@ -452,6 +447,11 @@
Self { selector: None }
}

fn with_selector(mut self, selector: SelectorRef) -> Self {
self.selector = Some(selector);
self
}

pub async fn build(self) -> TestingEnv {
let in_memory = Arc::new(MemStore::new());
let kv_store: KvStoreRef = Arc::new(MemStore::new());
@@ -531,8 +531,6 @@

#[tokio::test]
async fn test_region_failover_procedure() {
common_telemetry::init_default_ut_logging();

let mut env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;

@@ -662,7 +660,7 @@
let state = RegionFailoverStart::new();
let node = Node {
failed_region,
state: Some(Box::new(state)),
state: Box::new(state),
};
let procedure = RegionFailoverProcedure {
node,
@@ -677,7 +675,76 @@
let n: Node = serde_json::from_str(&s).unwrap();
assert_eq!(
format!("{n:?}"),
r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_ident: TableIdent { catalog: "greptime", schema: "public", table: "my_table", table_id: 1, engine: "mito" }, region_number: 1 }, state: Some(RegionFailoverStart { failover_candidate: None }) }"#
r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_ident: TableIdent { catalog: "greptime", schema: "public", table: "my_table", table_id: 1, engine: "mito" }, region_number: 1 }, state: RegionFailoverStart { failover_candidate: None } }"#
);
}

#[tokio::test]
async fn test_state_not_changed_upon_failure() {
struct MySelector {
peers: Arc<Mutex<Vec<Option<Peer>>>>,
}

#[async_trait]
impl Selector for MySelector {
type Context = SelectorContext;
type Output = Vec<Peer>;

async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> Result<Self::Output> {
let mut peers = self.peers.lock().unwrap();
Ok(if let Some(Some(peer)) = peers.pop() {
vec![peer]
} else {
vec![]
})
}
}

// Returns a valid peer the second time "select" is called.
let selector = MySelector {
peers: Arc::new(Mutex::new(vec![
Some(Peer {
id: 42,
addr: "".to_string(),
}),
None,
])),
};

let env = TestingEnvBuilder::new()
.with_selector(Arc::new(selector))
.build()
.await;
let failed_region = env.failed_region(1).await;

let state = RegionFailoverStart::new();
let node = Node {
failed_region,
state: Box::new(state),
};
let mut procedure = RegionFailoverProcedure {
node,
context: env.context,
};

let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};

let result = procedure.execute(&ctx).await;
assert!(result.is_err());
assert!(result.unwrap_err().is_retry_later());
assert_eq!(
r#"{"region_failover_state":"RegionFailoverStart","failover_candidate":null}"#,
serde_json::to_string(&procedure.node.state).unwrap()
);

let result = procedure.execute(&ctx).await;
assert!(matches!(result, Ok(Status::Executing { persist: true })));
assert_eq!(
r#"{"region_failover_state":"DeactivateRegion","candidate":{"id":42,"addr":""},"region_lease_expiry_seconds":40}"#,
serde_json::to_string(&procedure.node.state).unwrap()
);
}
}
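The JSON assertions in the tests above work because the `State` trait objects are serialized through `typetag` with an internal tag (`region_failover_state`), so a `Box<dyn State>` serializes to the concrete implementor's name plus its fields. A minimal, self-contained sketch of that mechanism, using hypothetical `Shape`/`Circle` types (and assuming the `typetag`, `serde`, and `serde_json` crates):

```rust
use serde::{Deserialize, Serialize};

// Tagging the trait makes a `Box<dyn Shape>` serialize with the concrete
// type's name under the "kind" key — the same mechanism that produces
// `{"region_failover_state":"RegionFailoverStart",...}` in the tests above.
#[typetag::serde(tag = "kind")]
trait Shape {
    fn area(&self) -> f64;
}

#[derive(Serialize, Deserialize)]
struct Circle {
    radius: f64,
}

#[typetag::serde]
impl Shape for Circle {
    fn area(&self) -> f64 {
        std::f64::consts::PI * self.radius * self.radius
    }
}

fn main() {
    let shape: Box<dyn Shape> = Box::new(Circle { radius: 1.0 });
    let json = serde_json::to_string(&shape).unwrap();
    assert_eq!(json, r#"{"kind":"Circle","radius":1.0}"#);

    // Deserialization dispatches back to the implementor named by the tag.
    let restored: Box<dyn Shape> = serde_json::from_str(&json).unwrap();
    assert!((restored.area() - std::f64::consts::PI).abs() < 1e-9);
}
```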
6 changes: 3 additions & 3 deletions src/meta-srv/src/procedure/region_failover/activate_region.rs
@@ -85,7 +85,7 @@ impl ActivateRegion
}

async fn handle_response(
self,
&self,
mailbox_receiver: MailboxReceiver,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
@@ -102,7 +102,7 @@ impl ActivateRegion
.fail();
};
if result {
Ok(Box::new(UpdateRegionMetadata::new(self.candidate)))
Ok(Box::new(UpdateRegionMetadata::new(self.candidate.clone())))
} else {
// It could be that the region simply cannot be opened by the candidate, so retrying
// would be in vain. Then why not just end the failover procedure? Because we
@@ -131,7 +131,7 @@
#[typetag::serde]
impl State for ActivateRegion {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
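A side effect of changing `State::next` and the `handle_response` helpers from consuming `self` to borrowing it: fields such as `candidate` can no longer be moved out when building the next state, which is why the hunks above and below add `.clone()` calls. A minimal sketch of the borrow issue, using hypothetical `ActivateStep`/`UpdateStep` types rather than the project's actual ones:

```rust
#[derive(Debug, Clone)]
struct Peer {
    id: u64,
}

#[derive(Debug)]
struct ActivateStep {
    candidate: Peer,
}

#[derive(Debug)]
struct UpdateStep {
    candidate: Peer,
}

impl ActivateStep {
    // When the receiver is consumed, the candidate can simply move into the
    // next step.
    fn next_by_value(self) -> UpdateStep {
        UpdateStep { candidate: self.candidate }
    }

    // With a borrowed receiver, moving `self.candidate` out is rejected
    // ("cannot move out of `self.candidate` which is behind a reference"),
    // so the field has to be cloned instead.
    fn next_by_ref(&self) -> UpdateStep {
        UpdateStep { candidate: self.candidate.clone() }
    }
}

fn main() {
    let step = ActivateStep { candidate: Peer { id: 42 } };
    let next = step.next_by_ref();
    println!("{next:?}");
    // `step` is still usable here because next_by_ref only borrowed it.
    let moved = step.next_by_value();
    println!("{moved:?}");
}
```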
10 changes: 5 additions & 5 deletions src/meta-srv/src/procedure/region_failover/deactivate_region.rs
@@ -76,7 +76,7 @@ impl DeactivateRegion
}

async fn handle_response(
self,
&self,
ctx: &RegionFailoverContext,
mailbox_receiver: MailboxReceiver,
failed_region: &RegionIdent,
@@ -98,7 +98,7 @@ impl DeactivateRegion
.deregister_inactive_region(failed_region)
.await?;

Ok(Box::new(ActivateRegion::new(self.candidate)))
Ok(Box::new(ActivateRegion::new(self.candidate.clone())))
} else {
// Only under rare circumstances would a Datanode fail to close a Region.
// So simply retry.
@@ -114,7 +114,7 @@
// resides might be unreachable. So we wait for the region lease to expire. The
// region would be closed by its own [RegionAliveKeeper].
self.wait_for_region_lease_expiry().await;
Ok(Box::new(ActivateRegion::new(self.candidate)))
Ok(Box::new(ActivateRegion::new(self.candidate.clone())))
}
Err(e) => Err(e),
}
@@ -132,7 +132,7 @@
#[typetag::serde]
impl State for DeactivateRegion {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
@@ -144,7 +144,7 @@
Err(Error::PusherNotFound { .. }) => {
// See the mailbox received timeout situation comments above.
self.wait_for_region_lease_expiry().await;
return Ok(Box::new(ActivateRegion::new(self.candidate)));
return Ok(Box::new(ActivateRegion::new(self.candidate.clone())));
}
Err(e) => return Err(e),
};
8 changes: 2 additions & 6 deletions src/meta-srv/src/procedure/region_failover/failover_end.rs
@@ -26,12 +26,8 @@ pub(super) struct RegionFailoverEnd
#[async_trait]
#[typetag::serde]
impl State for RegionFailoverEnd {
async fn next(
mut self: Box<Self>,
_: &RegionFailoverContext,
_: &RegionIdent,
) -> Result<Box<dyn State>> {
Ok(self)
async fn next(&mut self, _: &RegionFailoverContext, _: &RegionIdent) -> Result<Box<dyn State>> {
Ok(Box::new(RegionFailoverEnd))
}

fn status(&self) -> Status {
@@ -91,7 +91,7 @@ impl RegionFailoverStart
#[typetag::serde]
impl State for RegionFailoverStart {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
@@ -58,7 +58,7 @@ impl InvalidateCache
#[typetag::serde]
impl State for InvalidateCache {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
@@ -108,12 +108,11 @@ mod tests {
let _ = heartbeat_receivers.insert(frontend_id, rx);
}

let state = InvalidateCache;
let table_ident: TableIdent = failed_region.clone().into();

// lexicographical order
// frontend-4,5,6,7
let next_state = Box::new(state)
let next_state = InvalidateCache
.next(&context, &failed_region)
.await
.unwrap();
9 changes: 3 additions & 6 deletions src/meta-srv/src/procedure/region_failover/update_metadata.rs
@@ -131,7 +131,7 @@ fn pretty_log_table_route_change(
#[typetag::serde]
impl State for UpdateRegionMetadata {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
@@ -165,12 +165,9 @@ mod tests {
let env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;

let state = UpdateRegionMetadata::new(Peer::new(2, ""));
let mut state = UpdateRegionMetadata::new(Peer::new(2, ""));

let next_state = Box::new(state)
.next(&env.context, &failed_region)
.await
.unwrap();
let next_state = state.next(&env.context, &failed_region).await.unwrap();
assert_eq!(format!("{next_state:?}"), "InvalidateCache");
}
