diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs
index 77197113e5a1..08e4f9ad675d 100644
--- a/src/meta-srv/src/procedure/region_failover.rs
+++ b/src/meta-srv/src/procedure/region_failover.rs
@@ -214,7 +214,7 @@ impl RegionFailoverManager {
 #[derive(Serialize, Deserialize, Debug)]
 struct Node {
     failed_region: RegionIdent,
-    state: Option<Box<dyn State>>,
+    state: Box<dyn State>,
 }
 
 /// The "Context" of region failover procedure state machine.
@@ -233,7 +233,7 @@ pub struct RegionFailoverContext {
 #[typetag::serde(tag = "region_failover_state")]
 trait State: Sync + Send + Debug {
     async fn next(
-        mut self: Box<Self>,
+        &mut self,
         ctx: &RegionFailoverContext,
         failed_region: &RegionIdent,
     ) -> Result<Box<dyn State>>;
@@ -304,7 +304,7 @@ impl RegionFailoverProcedure {
         let state = RegionFailoverStart::new();
         let node = Node {
             failed_region,
-            state: Some(Box::new(state)),
+            state: Box::new(state),
         };
         Self { node, context }
     }
@@ -322,25 +322,18 @@ impl Procedure for RegionFailoverProcedure {
     }
 
     async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
-        if let Some(state) = self.node.state.take() {
-            let next_state = state
-                .next(&self.context, &self.node.failed_region)
-                .await
-                .map_err(|e| {
-                    if matches!(e, Error::RetryLater { .. }) {
-                        ProcedureError::retry_later(e)
-                    } else {
-                        ProcedureError::external(e)
-                    }
-                })?;
-            self.node.state = Some(next_state);
-        }
-        Ok(self
-            .node
-            .state
-            .as_ref()
-            .map(|s| s.status())
-            .unwrap_or(Status::Done))
+        let state = &mut self.node.state;
+        *state = state
+            .next(&self.context, &self.node.failed_region)
+            .await
+            .map_err(|e| {
+                if matches!(e, Error::RetryLater { .. }) {
+                    ProcedureError::retry_later(e)
+                } else {
+                    ProcedureError::external(e)
+                }
+            })?;
+        Ok(state.status())
     }
 
     fn dump(&self) -> ProcedureResult<String> {
@@ -362,6 +355,7 @@ impl Procedure for RegionFailoverProcedure {
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
+    use std::sync::Mutex;
 
     use api::v1::meta::mailbox_message::Payload;
     use api::v1::meta::{HeartbeatResponse, MailboxMessage, Peer, RequestHeader};
@@ -370,7 +364,8 @@ mod tests {
     use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
     use common_meta::key::TableMetadataManager;
     use common_meta::DatanodeId;
-    use common_procedure::BoxedProcedure;
+    use common_procedure::{BoxedProcedure, ProcedureId};
+    use common_procedure_test::MockContextProvider;
     use rand::prelude::SliceRandom;
     use tokio::sync::mpsc::Receiver;
 
@@ -452,6 +447,11 @@ mod tests {
             Self { selector: None }
         }
 
+        fn with_selector(mut self, selector: SelectorRef) -> Self {
+            self.selector = Some(selector);
+            self
+        }
+
        pub async fn build(self) -> TestingEnv {
             let in_memory = Arc::new(MemStore::new());
             let kv_store: KvStoreRef = Arc::new(MemStore::new());
@@ -531,8 +531,6 @@ mod tests {
 
     #[tokio::test]
     async fn test_region_failover_procedure() {
-        common_telemetry::init_default_ut_logging();
-
         let mut env = TestingEnvBuilder::new().build().await;
         let failed_region = env.failed_region(1).await;
 
@@ -662,7 +660,7 @@ mod tests {
         let state = RegionFailoverStart::new();
         let node = Node {
             failed_region,
-            state: Some(Box::new(state)),
+            state: Box::new(state),
         };
         let procedure = RegionFailoverProcedure {
             node,
@@ -677,7 +675,76 @@ mod tests {
         let n: Node = serde_json::from_str(&s).unwrap();
         assert_eq!(
             format!("{n:?}"),
-            r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_ident: TableIdent { catalog: "greptime", schema: "public", table: "my_table", table_id: 1, engine: "mito" }, region_number: 1 }, state: Some(RegionFailoverStart { failover_candidate: None }) }"#
+            r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_ident: TableIdent { catalog: "greptime", schema: "public", table: "my_table", table_id: 1, engine: "mito" }, region_number: 1 }, state: RegionFailoverStart { failover_candidate: None } }"#
+        );
+    }
+
+    #[tokio::test]
+    async fn test_state_not_changed_upon_failure() {
+        struct MySelector {
+            peers: Arc<Mutex<Vec<Option<Peer>>>>,
+        }
+
+        #[async_trait]
+        impl Selector for MySelector {
+            type Context = SelectorContext;
+            type Output = Vec<Peer>;
+
+            async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> Result<Self::Output> {
+                let mut peers = self.peers.lock().unwrap();
+                Ok(if let Some(Some(peer)) = peers.pop() {
+                    vec![peer]
+                } else {
+                    vec![]
+                })
+            }
+        }
+
+        // Returns a valid peer the second time called "select".
+        let selector = MySelector {
+            peers: Arc::new(Mutex::new(vec![
+                Some(Peer {
+                    id: 42,
+                    addr: "".to_string(),
+                }),
+                None,
+            ])),
+        };
+
+        let env = TestingEnvBuilder::new()
+            .with_selector(Arc::new(selector))
+            .build()
+            .await;
+        let failed_region = env.failed_region(1).await;
+
+        let state = RegionFailoverStart::new();
+        let node = Node {
+            failed_region,
+            state: Box::new(state),
+        };
+        let mut procedure = RegionFailoverProcedure {
+            node,
+            context: env.context,
+        };
+
+        let ctx = ProcedureContext {
+            procedure_id: ProcedureId::random(),
+            provider: Arc::new(MockContextProvider::default()),
+        };
+
+        let result = procedure.execute(&ctx).await;
+        assert!(result.is_err());
+        assert!(result.unwrap_err().is_retry_later());
+        assert_eq!(
+            r#"{"region_failover_state":"RegionFailoverStart","failover_candidate":null}"#,
+            serde_json::to_string(&procedure.node.state).unwrap()
+        );
+
+        let result = procedure.execute(&ctx).await;
+        assert!(matches!(result, Ok(Status::Executing { persist: true })));
+        assert_eq!(
+            r#"{"region_failover_state":"DeactivateRegion","candidate":{"id":42,"addr":""},"region_lease_expiry_seconds":40}"#,
+            serde_json::to_string(&procedure.node.state).unwrap()
         );
     }
 }
diff --git a/src/meta-srv/src/procedure/region_failover/activate_region.rs b/src/meta-srv/src/procedure/region_failover/activate_region.rs
index b7b4dc7bc064..f250b452e4db 100644
--- a/src/meta-srv/src/procedure/region_failover/activate_region.rs
+++ b/src/meta-srv/src/procedure/region_failover/activate_region.rs
@@ -85,7 +85,7 @@ impl ActivateRegion {
     }
 
     async fn handle_response(
-        self,
+        &self,
         mailbox_receiver: MailboxReceiver,
         failed_region: &RegionIdent,
     ) -> Result<Box<dyn State>> {
@@ -102,7 +102,7 @@ impl ActivateRegion {
             .fail();
         };
         if result {
-            Ok(Box::new(UpdateRegionMetadata::new(self.candidate)))
+            Ok(Box::new(UpdateRegionMetadata::new(self.candidate.clone())))
         } else {
             // The region could be just indeed cannot be opened by the candidate, retry
             // would be in vain. Then why not just end the failover procedure? Because we
@@ -131,7 +131,7 @@ impl ActivateRegion {
 #[typetag::serde]
 impl State for ActivateRegion {
     async fn next(
-        mut self: Box<Self>,
+        &mut self,
         ctx: &RegionFailoverContext,
         failed_region: &RegionIdent,
     ) -> Result<Box<dyn State>> {
diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs
index 9f09de0da409..9d8ec5839641 100644
--- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs
+++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs
@@ -76,7 +76,7 @@ impl DeactivateRegion {
     }
 
     async fn handle_response(
-        self,
+        &self,
         ctx: &RegionFailoverContext,
         mailbox_receiver: MailboxReceiver,
         failed_region: &RegionIdent,
@@ -98,7 +98,7 @@ impl DeactivateRegion {
                 .deregister_inactive_region(failed_region)
                 .await?;
 
-            Ok(Box::new(ActivateRegion::new(self.candidate)))
+            Ok(Box::new(ActivateRegion::new(self.candidate.clone())))
         } else {
             // Under rare circumstances would a Datanode fail to close a Region.
             // So simply retry.
@@ -114,7 +114,7 @@ impl DeactivateRegion {
                 // resides might be unreachable. So we wait for the region lease to expire. The
                 // region would be closed by its own [RegionAliveKeeper].
                 self.wait_for_region_lease_expiry().await;
-                Ok(Box::new(ActivateRegion::new(self.candidate)))
+                Ok(Box::new(ActivateRegion::new(self.candidate.clone())))
             }
             Err(e) => Err(e),
         }
@@ -132,7 +132,7 @@ impl DeactivateRegion {
 #[typetag::serde]
 impl State for DeactivateRegion {
     async fn next(
-        mut self: Box<Self>,
+        &mut self,
         ctx: &RegionFailoverContext,
         failed_region: &RegionIdent,
     ) -> Result<Box<dyn State>> {
@@ -144,7 +144,7 @@ impl State for DeactivateRegion {
             Err(Error::PusherNotFound { .. }) => {
                 // See the mailbox received timeout situation comments above.
                 self.wait_for_region_lease_expiry().await;
-                return Ok(Box::new(ActivateRegion::new(self.candidate)));
+                return Ok(Box::new(ActivateRegion::new(self.candidate.clone())));
             }
             Err(e) => return Err(e),
         };
diff --git a/src/meta-srv/src/procedure/region_failover/failover_end.rs b/src/meta-srv/src/procedure/region_failover/failover_end.rs
index 221d6b0c7872..fa299ee46422 100644
--- a/src/meta-srv/src/procedure/region_failover/failover_end.rs
+++ b/src/meta-srv/src/procedure/region_failover/failover_end.rs
@@ -26,12 +26,8 @@ pub(super) struct RegionFailoverEnd;
 #[async_trait]
 #[typetag::serde]
 impl State for RegionFailoverEnd {
-    async fn next(
-        mut self: Box<Self>,
-        _: &RegionFailoverContext,
-        _: &RegionIdent,
-    ) -> Result<Box<dyn State>> {
-        Ok(self)
+    async fn next(&mut self, _: &RegionFailoverContext, _: &RegionIdent) -> Result<Box<dyn State>> {
+        Ok(Box::new(RegionFailoverEnd))
     }
 
     fn status(&self) -> Status {
diff --git a/src/meta-srv/src/procedure/region_failover/failover_start.rs b/src/meta-srv/src/procedure/region_failover/failover_start.rs
index 20f33c26b243..52d55bbe9272 100644
--- a/src/meta-srv/src/procedure/region_failover/failover_start.rs
+++ b/src/meta-srv/src/procedure/region_failover/failover_start.rs
@@ -91,7 +91,7 @@ impl RegionFailoverStart {
 #[typetag::serde]
 impl State for RegionFailoverStart {
     async fn next(
-        mut self: Box<Self>,
+        &mut self,
         ctx: &RegionFailoverContext,
         failed_region: &RegionIdent,
     ) -> Result<Box<dyn State>> {
diff --git a/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs b/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs
index 0d46e206bf25..52f47219a43a 100644
--- a/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs
+++ b/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs
@@ -58,7 +58,7 @@ impl InvalidateCache {
 #[typetag::serde]
 impl State for InvalidateCache {
     async fn next(
-        mut self: Box<Self>,
+        &mut self,
         ctx: &RegionFailoverContext,
         failed_region: &RegionIdent,
     ) -> Result<Box<dyn State>> {
@@ -108,12 +108,11 @@ mod tests {
             let _ = heartbeat_receivers.insert(frontend_id, rx);
         }
 
-        let state = InvalidateCache;
         let table_ident: TableIdent = failed_region.clone().into();
 
         // lexicographical order
         // frontend-4,5,6,7
-        let next_state = Box::new(state)
+        let next_state = InvalidateCache
             .next(&context, &failed_region)
             .await
             .unwrap();
diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs
index 6797f388ddb2..28922b6a7b4d 100644
--- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs
+++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs
@@ -131,7 +131,7 @@ fn pretty_log_table_route_change(
 #[typetag::serde]
 impl State for UpdateRegionMetadata {
     async fn next(
-        mut self: Box<Self>,
+        &mut self,
         ctx: &RegionFailoverContext,
         failed_region: &RegionIdent,
     ) -> Result<Box<dyn State>> {
@@ -165,12 +165,9 @@ mod tests {
         let env = TestingEnvBuilder::new().build().await;
         let failed_region = env.failed_region(1).await;
 
-        let state = UpdateRegionMetadata::new(Peer::new(2, ""));
+        let mut state = UpdateRegionMetadata::new(Peer::new(2, ""));
 
-        let next_state = Box::new(state)
-            .next(&env.context, &failed_region)
-            .await
-            .unwrap();
+        let next_state = state.next(&env.context, &failed_region).await.unwrap();
 
         assert_eq!(format!("{next_state:?}"), "InvalidateCache");
     }
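
Note: the heart of this change is that `State::next` now takes `&mut self` and `Node::state` is a plain `Box<dyn State>`, which `execute` overwrites only after `next` returns `Ok`. A retryable error therefore leaves the previous state (and anything it mutated through `&mut self`) in place for the next attempt, whereas the old `Option::take` flow could lose the state when `next` failed. Below is a minimal, self-contained sketch of that pattern under simplified assumptions: `DemoStart`, `DemoEnd`, `DemoError`, and `Procedure` are illustrative stand-ins, `next` is synchronous here, and none of the real async/typetag/serde machinery is modeled.

```rust
// Sketch only: a state machine that replaces its boxed state in place,
// keeping the old state when a transition fails with a retryable error.
use std::fmt::Debug;

#[derive(Debug)]
enum DemoError {
    RetryLater,
}

trait State: Debug {
    fn next(&mut self) -> Result<Box<dyn State>, DemoError>;
    fn is_done(&self) -> bool {
        false
    }
}

#[derive(Debug)]
struct DemoStart {
    // Fail the first call to exercise the retry path.
    attempts: usize,
}

#[derive(Debug)]
struct DemoEnd;

impl State for DemoStart {
    fn next(&mut self) -> Result<Box<dyn State>, DemoError> {
        self.attempts += 1;
        if self.attempts == 1 {
            // E.g. no failover candidate available yet.
            return Err(DemoError::RetryLater);
        }
        Ok(Box::new(DemoEnd))
    }
}

impl State for DemoEnd {
    fn next(&mut self) -> Result<Box<dyn State>, DemoError> {
        Ok(Box::new(DemoEnd))
    }
    fn is_done(&self) -> bool {
        true
    }
}

struct Procedure {
    state: Box<dyn State>,
}

impl Procedure {
    fn execute(&mut self) -> Result<(), DemoError> {
        // Replace the state only when `next` returns Ok; on Err the old
        // state (including fields mutated through `&mut self`) survives.
        let state = &mut self.state;
        *state = state.next()?;
        Ok(())
    }
}

fn main() {
    let mut procedure = Procedure {
        state: Box::new(DemoStart { attempts: 0 }),
    };

    // First attempt fails with a retryable error; the state is still DemoStart.
    assert!(procedure.execute().is_err());
    println!("after failure: {:?}", procedure.state);

    // Second attempt succeeds and advances to DemoEnd.
    procedure.execute().unwrap();
    assert!(procedure.state.is_done());
    println!("after retry:   {:?}", procedure.state);
}
```

The `test_state_not_changed_upon_failure` test added in the diff exercises exactly this behavior against the real procedure: the first `execute` fails with a retry-later error and the serialized state is still `RegionFailoverStart`; the second `execute` advances it to `DeactivateRegion`.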