From 7b407fcb67648aa750229d75c407ef3aa756cdda Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 18 Mar 2024 16:52:07 +0800 Subject: [PATCH] always read kv_backend maintenance state Signed-off-by: tison --- src/meta-srv/src/handler/failure_handler.rs | 6 +++--- src/meta-srv/src/metasrv.rs | 17 +---------------- src/meta-srv/src/metasrv/builder.rs | 1 + src/meta-srv/src/procedure/region_failover.rs | 11 +++++++++-- src/meta-srv/src/service/admin.rs | 1 - src/meta-srv/src/service/admin/maintenance.rs | 10 ++-------- src/meta-srv/src/test_util.rs | 1 + tests-integration/tests/region_failover.rs | 1 + 8 files changed, 18 insertions(+), 30 deletions(-) diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 2f4bf6d491cb..9748737fae30 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -170,7 +170,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_maintenance_mode() { let region_failover_manager = create_region_failover_manager(); - let in_memory = region_failover_manager.create_context().in_memory.clone(); + let kv_backend = region_failover_manager.create_context().kv_backend.clone(); let _handler = RegionFailureHandler::try_new( None, region_failover_manager.clone(), @@ -184,13 +184,13 @@ mod tests { value: vec![], prev_kv: false, }; - let _ = in_memory.put(kv_req.clone()).await.unwrap(); + let _ = kv_backend.put(kv_req.clone()).await.unwrap(); assert_matches!( region_failover_manager.is_maintenance_mode().await, Ok(true) ); - let _ = in_memory + let _ = kv_backend .delete(MAINTENANCE_KEY.as_bytes(), false) .await .unwrap(); diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index c0f40087eb9d..68b3579298db 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -22,11 +22,10 @@ use common_base::Plugins; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager; use common_meta::ddl::ProcedureExecutorRef; -use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY}; +use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef}; use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeperRef; -use common_meta::rpc::store::PutRequest; use common_meta::wal_options_allocator::WalOptionsAllocatorRef; use common_meta::{distributed_time_constants, ClusterId}; use common_procedure::options::ProcedureConfig; @@ -365,20 +364,6 @@ impl MetaSrv { .context(StartProcedureManagerSnafu)?; } - if self - .kv_backend - .exists(MAINTENANCE_KEY.as_bytes()) - .await - .context(KvBackendSnafu)? - { - let req = PutRequest { - key: Vec::from(MAINTENANCE_KEY), - value: vec![], - prev_kv: false, - }; - self.in_memory.put(req).await.context(KvBackendSnafu)?; - } - info!("MetaSrv started"); Ok(()) } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 7447fdc67b1a..81c83816eed1 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -260,6 +260,7 @@ impl MetaSrvBuilder { let region_failover_manager = Arc::new(RegionFailoverManager::new( distributed_time_constants::REGION_LEASE_SECS, in_memory.clone(), + kv_backend.clone(), mailbox.clone(), procedure_manager.clone(), (selector.clone(), selector_ctx.clone()), diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index c376ff632574..7d82ad36d520 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -27,7 +27,7 @@ use std::time::Duration; use async_trait::async_trait; use common_meta::key::datanode_table::DatanodeTableKey; use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY}; -use common_meta::kv_backend::ResettableKvBackendRef; +use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock}; use common_meta::table_name::TableName; use common_meta::{ClusterId, RegionIdent}; @@ -75,6 +75,7 @@ impl From for RegionFailoverKey { pub(crate) struct RegionFailoverManager { region_lease_secs: u64, in_memory: ResettableKvBackendRef, + kv_backend: KvBackendRef, mailbox: MailboxRef, procedure_manager: ProcedureManagerRef, selector: SelectorRef, @@ -96,9 +97,11 @@ impl Drop for FailoverProcedureGuard { } impl RegionFailoverManager { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( region_lease_secs: u64, in_memory: ResettableKvBackendRef, + kv_backend: KvBackendRef, mailbox: MailboxRef, procedure_manager: ProcedureManagerRef, (selector, selector_ctx): (SelectorRef, SelectorContext), @@ -108,6 +111,7 @@ impl RegionFailoverManager { Self { region_lease_secs, in_memory, + kv_backend, mailbox, procedure_manager, selector, @@ -122,6 +126,7 @@ impl RegionFailoverManager { RegionFailoverContext { region_lease_secs: self.region_lease_secs, in_memory: self.in_memory.clone(), + kv_backend: self.kv_backend.clone(), mailbox: self.mailbox.clone(), selector: self.selector.clone(), selector_ctx: self.selector_ctx.clone(), @@ -162,7 +167,7 @@ impl RegionFailoverManager { } pub(crate) async fn is_maintenance_mode(&self) -> Result { - self.in_memory + self.kv_backend .exists(MAINTENANCE_KEY.as_bytes()) .await .context(KvBackendSnafu) @@ -273,6 +278,7 @@ struct Node { pub struct RegionFailoverContext { pub region_lease_secs: u64, pub in_memory: ResettableKvBackendRef, + pub kv_backend: KvBackendRef, pub mailbox: MailboxRef, pub selector: SelectorRef, pub selector_ctx: SelectorContext, @@ -578,6 +584,7 @@ mod tests { context: RegionFailoverContext { region_lease_secs: 10, in_memory, + kv_backend, mailbox, selector, selector_ctx, diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index d285e1fabe9d..7bf0d04640de 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -100,7 +100,6 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { let handler = maintenance::MaintenanceHandler { kv_backend: meta_srv.kv_backend().clone(), - in_memory: meta_srv.in_memory().clone(), }; let router = router .route("/maintenance", handler.clone()) diff --git a/src/meta-srv/src/service/admin/maintenance.rs b/src/meta-srv/src/service/admin/maintenance.rs index 9e7237156cba..01e62aece6ef 100644 --- a/src/meta-srv/src/service/admin/maintenance.rs +++ b/src/meta-srv/src/service/admin/maintenance.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use common_meta::key::MAINTENANCE_KEY; -use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; +use common_meta::kv_backend::KvBackendRef; use common_meta::rpc::store::PutRequest; use snafu::{OptionExt, ResultExt}; use tonic::codegen::http; @@ -29,13 +29,12 @@ use crate::service::admin::HttpHandler; #[derive(Clone)] pub struct MaintenanceHandler { pub kv_backend: KvBackendRef, - pub in_memory: ResettableKvBackendRef, } impl MaintenanceHandler { async fn get_maintenance(&self) -> crate::Result> { let enabled = self - .in_memory + .kv_backend .exists(MAINTENANCE_KEY.as_bytes()) .await .context(KvBackendSnafu)?; @@ -72,17 +71,12 @@ impl MaintenanceHandler { .put(req.clone()) .await .context(KvBackendSnafu)?; - self.in_memory.put(req).await.context(KvBackendSnafu)?; "Maintenance mode enabled" } else { self.kv_backend .delete(MAINTENANCE_KEY.as_bytes(), false) .await .context(KvBackendSnafu)?; - self.in_memory - .delete(MAINTENANCE_KEY.as_bytes(), false) - .await - .context(KvBackendSnafu)?; "Maintenance mode disabled" }; diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index 4d021fae97fa..b6fa285311f6 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -86,6 +86,7 @@ pub(crate) fn create_region_failover_manager() -> Arc { Arc::new(RegionFailoverManager::new( 10, in_memory, + kv_backend.clone(), mailbox, procedure_manager, (selector, selector_ctx), diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index 74289149c066..e0d82f658704 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -352,6 +352,7 @@ async fn run_region_failover_procedure( RegionFailoverContext { region_lease_secs: 10, in_memory: meta_srv.in_memory().clone(), + kv_backend: meta_srv.kv_backend().clone(), mailbox: meta_srv.mailbox().clone(), selector, selector_ctx: SelectorContext {