Skip to content

Commit

Permalink
feat: add upgrade candidate region step
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 30, 2023
1 parent ae81535 commit 86c3fbf
Show file tree
Hide file tree
Showing 5 changed files with 595 additions and 7 deletions.
49 changes: 49 additions & 0 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl OpenRegion {
/// The instruction of downgrading leader region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DowngradeRegion {
/// The [RegionId].
pub region_id: RegionId,
}

Expand All @@ -120,20 +121,67 @@ impl Display for DowngradeRegion {
}
}

/// Upgrades a follower region to leader region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpgradeRegion {
/// The [RegionId].
pub region_id: RegionId,
/// The `last_entry_id` of old leader region.
pub last_entry_id: Option<u64>,
/// The second of waiting for a wal replay.
///
/// `None` stands for no wait,
/// it's helpful to verify whether the leader region is ready.
pub wait_for_replay_secs: Option<u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Display)]
pub enum Instruction {
/// Opens a region.
///
/// - Returns true if a specified region exists.
OpenRegion(OpenRegion),
/// Closes a region.
///
/// - Returns true if a specified region does not exist.
CloseRegion(RegionIdent),
/// Upgrades a region.
UpgradeRegion(UpgradeRegion),
/// Downgrades a region.
DowngradeRegion(DowngradeRegion),
/// Invalidates a specified table cache.
InvalidateTableIdCache(TableId),
/// Invalidates a specified table name index cache.
InvalidateTableNameCache(TableName),
}

/// The reply of [UpgradeRegion].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionReply {
/// Returns true if `last_entry_id` has been replayed to the latest.
pub ready: bool,
/// Indicates whether the region exists.
pub exists: bool,
/// Returns error if any.
pub error: Option<String>,
}

impl Display for UpgradeRegionReply {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"(ready={}, exists={}, error={:?})",
self.ready, self.exists, self.error
)
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
OpenRegion(SimpleReply),
CloseRegion(SimpleReply),
UpgradeRegion(UpgradeRegionReply),
InvalidateTableCache(SimpleReply),
DowngradeRegion(DowngradeRegionReply),
}
Expand All @@ -143,6 +191,7 @@ impl Display for InstructionReply {
match self {
Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
Self::InvalidateTableCache(reply) => {
write!(f, "InstructionReply::Invalidate({})", reply)
}
Expand Down
9 changes: 9 additions & 0 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ impl RegionHeartbeatResponseHandler {
let close_region_req = RegionRequest::Close(RegionCloseRequest {});
Ok((region_id, close_region_req))
}
Instruction::UpgradeRegion(_) => {
todo!()
}
Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => {
InvalidHeartbeatResponseSnafu.fail()
}
Expand All @@ -86,6 +89,9 @@ impl RegionHeartbeatResponseHandler {
result: false,
error: None,
}),
Instruction::UpgradeRegion(_) => {
todo!()
}
Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => {
InstructionReply::InvalidateTableCache(SimpleReply {
result: false,
Expand Down Expand Up @@ -118,6 +124,9 @@ impl RegionHeartbeatResponseHandler {
reply.error = error;
}
},
InstructionReply::UpgradeRegion(_) => {
todo!()
}
InstructionReply::InvalidateTableCache(reply) => {
reply.result = success;
reply.error = error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl State for DowngradeLeaderRegion {
tokio::time::sleep_until(*deadline).await;
}

Ok(Box::new(UpgradeCandidateRegion))
Ok(Box::<UpgradeCandidateRegion>::default())
}

fn as_any(&self) -> &dyn Any {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ use crate::procedure::region_migration::{Context, State};
pub enum UpdateMetadata {
/// Downgrades the leader region.
Downgrade,
/// Upgrade the candidate region.
/// Upgrades the candidate region.
Upgrade,
/// Rolls back the downgraded region.
Rollback,
}

#[async_trait::async_trait]
Expand All @@ -49,6 +51,9 @@ impl State for UpdateMetadata {
// TODO(weny): invalidate fe cache.
Ok(Box::new(RegionMigrationEnd))
}
_ => {
todo!()
}
}
}

Expand Down
Loading

0 comments on commit 86c3fbf

Please sign in to comment.