Skip to content

Commit

Permalink
feat: add upgrade candidate region step
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 28, 2023
1 parent c5fc8ad commit 3ab250f
Show file tree
Hide file tree
Showing 5 changed files with 610 additions and 6 deletions.
8 changes: 4 additions & 4 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub struct UpgradeRegion {
///
/// `None` stands for no wait,
/// it's helpful to verify whether the leader region is ready.
pub wait_for_replay_mills: Option<u64>,
pub wait_for_replay_secs: Option<u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Display)]
Expand All @@ -123,11 +123,11 @@ pub enum Instruction {
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionReply {
/// Returns true if `last_entry_id` has been replayed to the latest.
ready: bool,
pub ready: bool,
/// Indicates whether the region exists.
exists: bool,
pub exists: bool,
/// Returns error if any.
error: Option<String>,
pub error: Option<String>,
}

impl Display for UpgradeRegionReply {
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub(crate) mod open_candidate_region;
#[cfg(test)]
pub(crate) mod test_util;
pub(crate) mod update_metadata;
pub(crate) mod upgrade_candidate_region;

use std::any::Any;
use std::fmt::Debug;
Expand Down
41 changes: 39 additions & 2 deletions src/meta-srv/src/procedure/region_migration/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,26 @@

use std::sync::Arc;

use api::v1::meta::{HeartbeatResponse, RequestHeader};
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader};
use common_meta::instruction::{InstructionReply, SimpleReply};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::sequence::Sequence;
use common_meta::DatanodeId;
use common_procedure::{Context as ProcedureContext, ProcedureId};
use common_procedure_test::MockContextProvider;
use tokio::sync::mpsc::Sender;
use common_time::util::current_time_millis;
use tokio::sync::mpsc::{Receiver, Sender};

use super::ContextFactoryImpl;
use crate::error::Result;
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::region::lease_keeper::{OpeningRegionKeeper, OpeningRegionKeeperRef};
use crate::service::mailbox::{Channel, MailboxRef};

pub type MockHeartbeatReceiver = Receiver<std::result::Result<HeartbeatResponse, tonic::Status>>;

/// The context of mailbox.
pub struct MailboxContext {
mailbox: MailboxRef,
Expand Down Expand Up @@ -120,3 +126,34 @@ impl TestingEnv {
}
}
}

/// Generates a [InstructionReply::CloseRegion] reply.
pub fn new_close_region_reply(id: u64) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply {
result: false,
error: None,
}))
.unwrap(),
)),
}
}

/// Sends a mock reply.
pub fn send_mock_reply(
mailbox: MailboxRef,
mut rx: MockHeartbeatReceiver,
msg: impl FnOnce(u64) -> Result<MailboxMessage> + Send + 'static,
) {
common_runtime::spawn_bg(async move {
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
mailbox.on_recv(reply_id, msg(reply_id)).await.unwrap();
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use crate::procedure::region_migration::{Context, State};
#[serde(tag = "UpdateMetadata")]
pub enum UpdateMetadata {
Downgrade,
Upgrade,
Rollback,
}

#[async_trait::async_trait]
Expand All @@ -38,6 +40,11 @@ impl State for UpdateMetadata {

Ok(Box::new(DowngradeLeaderRegion))
}
_ => {
// TODO(weny): Waits for https://github.com/GreptimeTeam/greptimedb/pull/2811
// and https://github.com/GreptimeTeam/greptimedb/pull/2812
todo!()
}
}
}

Expand Down
Loading

0 comments on commit 3ab250f

Please sign in to comment.