Skip to content

Commit

Permalink
fix: keep region failover state not changed upon failure (GreptimeTea…
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelScofield authored and paomian committed Oct 19, 2023
1 parent 4653660 commit 35dc787
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 51 deletions.
121 changes: 94 additions & 27 deletions src/meta-srv/src/procedure/region_failover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl RegionFailoverManager {
#[derive(Serialize, Deserialize, Debug)]
struct Node {
failed_region: RegionIdent,
state: Option<Box<dyn State>>,
state: Box<dyn State>,
}

/// The "Context" of region failover procedure state machine.
Expand All @@ -233,7 +233,7 @@ pub struct RegionFailoverContext {
#[typetag::serde(tag = "region_failover_state")]
trait State: Sync + Send + Debug {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>>;
Expand Down Expand Up @@ -304,7 +304,7 @@ impl RegionFailoverProcedure {
let state = RegionFailoverStart::new();
let node = Node {
failed_region,
state: Some(Box::new(state)),
state: Box::new(state),
};
Self { node, context }
}
Expand All @@ -322,25 +322,18 @@ impl Procedure for RegionFailoverProcedure {
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
if let Some(state) = self.node.state.take() {
let next_state = state
.next(&self.context, &self.node.failed_region)
.await
.map_err(|e| {
if matches!(e, Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;
self.node.state = Some(next_state);
}
Ok(self
.node
.state
.as_ref()
.map(|s| s.status())
.unwrap_or(Status::Done))
let state = &mut self.node.state;
*state = state
.next(&self.context, &self.node.failed_region)
.await
.map_err(|e| {
if matches!(e, Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;
Ok(state.status())
}

fn dump(&self) -> ProcedureResult<String> {
Expand All @@ -362,6 +355,7 @@ impl Procedure for RegionFailoverProcedure {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Mutex;

use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage, Peer, RequestHeader};
Expand All @@ -370,7 +364,8 @@ mod tests {
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::TableMetadataManager;
use common_meta::DatanodeId;
use common_procedure::BoxedProcedure;
use common_procedure::{BoxedProcedure, ProcedureId};
use common_procedure_test::MockContextProvider;
use rand::prelude::SliceRandom;
use tokio::sync::mpsc::Receiver;

Expand Down Expand Up @@ -452,6 +447,11 @@ mod tests {
Self { selector: None }
}

/// Builder-style setter: installs a custom `SelectorRef` for the testing
/// environment, replacing the default `None` set in the builder's constructor.
/// Consumes and returns `self` so calls can be chained before `.build()`.
fn with_selector(mut self, selector: SelectorRef) -> Self {
self.selector = Some(selector);
self
}

pub async fn build(self) -> TestingEnv {
let in_memory = Arc::new(MemStore::new());
let kv_store: KvStoreRef = Arc::new(MemStore::new());
Expand Down Expand Up @@ -531,8 +531,6 @@ mod tests {

#[tokio::test]
async fn test_region_failover_procedure() {
common_telemetry::init_default_ut_logging();

let mut env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;

Expand Down Expand Up @@ -662,7 +660,7 @@ mod tests {
let state = RegionFailoverStart::new();
let node = Node {
failed_region,
state: Some(Box::new(state)),
state: Box::new(state),
};
let procedure = RegionFailoverProcedure {
node,
Expand All @@ -677,7 +675,76 @@ mod tests {
let n: Node = serde_json::from_str(&s).unwrap();
assert_eq!(
format!("{n:?}"),
r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_ident: TableIdent { catalog: "greptime", schema: "public", table: "my_table", table_id: 1, engine: "mito" }, region_number: 1 }, state: Some(RegionFailoverStart { failover_candidate: None }) }"#
r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_ident: TableIdent { catalog: "greptime", schema: "public", table: "my_table", table_id: 1, engine: "mito" }, region_number: 1 }, state: RegionFailoverStart { failover_candidate: None } }"#
);
}

#[tokio::test]
async fn test_state_not_changed_upon_failure() {
// Test-local selector whose candidate peers are popped from a shared,
// mutable stack: each `select` call consumes one entry from the back.
struct MySelector {
// Stack of optional peers; `None` entries simulate "no candidate found".
peers: Arc<Mutex<Vec<Option<Peer>>>>,
}

#[async_trait]
impl Selector for MySelector {
type Context = SelectorContext;
type Output = Vec<Peer>;

// Pops the next scripted result: a `Some(peer)` entry yields that single
// peer, anything else (a `None` entry or an exhausted stack) yields an
// empty candidate list.
async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> Result<Self::Output> {
let mut peers = self.peers.lock().unwrap();
Ok(if let Some(Some(peer)) = peers.pop() {
vec![peer]
} else {
vec![]
})
}
}

// Returns a valid peer the second time called "select".
// (Entries are popped from the back, so the first call sees `None`.)
let selector = MySelector {
peers: Arc::new(Mutex::new(vec![
Some(Peer {
id: 42,
addr: "".to_string(),
}),
None,
])),
};

let env = TestingEnvBuilder::new()
.with_selector(Arc::new(selector))
.build()
.await;
let failed_region = env.failed_region(1).await;

// Assemble a procedure starting from the initial failover state.
let state = RegionFailoverStart::new();
let node = Node {
failed_region,
state: Box::new(state),
};
let mut procedure = RegionFailoverProcedure {
node,
context: env.context,
};

let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};

// First execution: the selector returns no candidate, so the procedure
// fails with a retry-later error — and, crucially, the state must remain
// `RegionFailoverStart` (the behavior this commit fixes: state is no
// longer consumed/lost on failure).
let result = procedure.execute(&ctx).await;
assert!(result.is_err());
assert!(result.unwrap_err().is_retry_later());
assert_eq!(
r#"{"region_failover_state":"RegionFailoverStart","failover_candidate":null}"#,
serde_json::to_string(&procedure.node.state).unwrap()
);

// Second execution: the selector now yields peer 42, so the retried start
// state succeeds and transitions to `DeactivateRegion` with that candidate.
let result = procedure.execute(&ctx).await;
assert!(matches!(result, Ok(Status::Executing { persist: true })));
assert_eq!(
r#"{"region_failover_state":"DeactivateRegion","candidate":{"id":42,"addr":""},"region_lease_expiry_seconds":40}"#,
serde_json::to_string(&procedure.node.state).unwrap()
);
}
}
6 changes: 3 additions & 3 deletions src/meta-srv/src/procedure/region_failover/activate_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl ActivateRegion {
}

async fn handle_response(
self,
&self,
mailbox_receiver: MailboxReceiver,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
Expand All @@ -102,7 +102,7 @@ impl ActivateRegion {
.fail();
};
if result {
Ok(Box::new(UpdateRegionMetadata::new(self.candidate)))
Ok(Box::new(UpdateRegionMetadata::new(self.candidate.clone())))
} else {
// The region could be just indeed cannot be opened by the candidate, retry
// would be in vain. Then why not just end the failover procedure? Because we
Expand Down Expand Up @@ -131,7 +131,7 @@ impl ActivateRegion {
#[typetag::serde]
impl State for ActivateRegion {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
Expand Down
10 changes: 5 additions & 5 deletions src/meta-srv/src/procedure/region_failover/deactivate_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl DeactivateRegion {
}

async fn handle_response(
self,
&self,
ctx: &RegionFailoverContext,
mailbox_receiver: MailboxReceiver,
failed_region: &RegionIdent,
Expand All @@ -98,7 +98,7 @@ impl DeactivateRegion {
.deregister_inactive_region(failed_region)
.await?;

Ok(Box::new(ActivateRegion::new(self.candidate)))
Ok(Box::new(ActivateRegion::new(self.candidate.clone())))
} else {
// Under rare circumstances would a Datanode fail to close a Region.
// So simply retry.
Expand All @@ -114,7 +114,7 @@ impl DeactivateRegion {
// resides might be unreachable. So we wait for the region lease to expire. The
// region would be closed by its own [RegionAliveKeeper].
self.wait_for_region_lease_expiry().await;
Ok(Box::new(ActivateRegion::new(self.candidate)))
Ok(Box::new(ActivateRegion::new(self.candidate.clone())))
}
Err(e) => Err(e),
}
Expand All @@ -132,7 +132,7 @@ impl DeactivateRegion {
#[typetag::serde]
impl State for DeactivateRegion {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
Expand All @@ -144,7 +144,7 @@ impl State for DeactivateRegion {
Err(Error::PusherNotFound { .. }) => {
// See the mailbox received timeout situation comments above.
self.wait_for_region_lease_expiry().await;
return Ok(Box::new(ActivateRegion::new(self.candidate)));
return Ok(Box::new(ActivateRegion::new(self.candidate.clone())));
}
Err(e) => return Err(e),
};
Expand Down
8 changes: 2 additions & 6 deletions src/meta-srv/src/procedure/region_failover/failover_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,8 @@ pub(super) struct RegionFailoverEnd;
#[async_trait]
#[typetag::serde]
impl State for RegionFailoverEnd {
async fn next(
mut self: Box<Self>,
_: &RegionFailoverContext,
_: &RegionIdent,
) -> Result<Box<dyn State>> {
Ok(self)
async fn next(&mut self, _: &RegionFailoverContext, _: &RegionIdent) -> Result<Box<dyn State>> {
Ok(Box::new(RegionFailoverEnd))
}

fn status(&self) -> Status {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl RegionFailoverStart {
#[typetag::serde]
impl State for RegionFailoverStart {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl InvalidateCache {
#[typetag::serde]
impl State for InvalidateCache {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
Expand Down Expand Up @@ -108,12 +108,11 @@ mod tests {
let _ = heartbeat_receivers.insert(frontend_id, rx);
}

let state = InvalidateCache;
let table_ident: TableIdent = failed_region.clone().into();

// lexicographical order
// frontend-4,5,6,7
let next_state = Box::new(state)
let next_state = InvalidateCache
.next(&context, &failed_region)
.await
.unwrap();
Expand Down
9 changes: 3 additions & 6 deletions src/meta-srv/src/procedure/region_failover/update_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ fn pretty_log_table_route_change(
#[typetag::serde]
impl State for UpdateRegionMetadata {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
Expand Down Expand Up @@ -165,12 +165,9 @@ mod tests {
let env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;

let state = UpdateRegionMetadata::new(Peer::new(2, ""));
let mut state = UpdateRegionMetadata::new(Peer::new(2, ""));

let next_state = Box::new(state)
.next(&env.context, &failed_region)
.await
.unwrap();
let next_state = state.next(&env.context, &failed_region).await.unwrap();
assert_eq!(format!("{next_state:?}"), "InvalidateCache");
}

Expand Down

0 comments on commit 35dc787

Please sign in to comment.