From 033a065359e7ee2e8801f299285cf03eecffe63d Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Mon, 18 Dec 2023 15:05:28 +0800 Subject: [PATCH] refactor: make sequence bounded with max value (#2937) * refactor: make sequence bounded with max value (cherry picked from commit 3a8eba6f863327a96b617cd86ee2d39fac30abb2) * fix: resolve PR comments --- src/cmd/src/standalone.rs | 11 +- src/common/meta/src/sequence.rs | 101 ++++++++++++------ src/frontend/src/instance/standalone.rs | 11 +- src/meta-srv/src/handler.rs | 4 +- .../src/handler/persist_stats_handler.rs | 4 +- .../src/handler/response_header_handler.rs | 4 +- src/meta-srv/src/metasrv.rs | 6 -- src/meta-srv/src/metasrv/builder.rs | 20 +++- src/meta-srv/src/procedure/region_failover.rs | 7 +- .../procedure/region_migration/test_util.rs | 5 +- src/meta-srv/src/procedure/utils.rs | 5 +- src/meta-srv/src/test_util.rs | 5 +- tests-integration/src/standalone.rs | 12 ++- 13 files changed, 128 insertions(+), 67 deletions(-) diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index e92d6a36f78b..3a499f6d6df7 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -17,6 +17,7 @@ use std::{fs, path}; use async_trait::async_trait; use clap::Parser; +use common_catalog::consts::MIN_USER_TABLE_ID; use common_config::{metadata_store_dir, KvBackendConfig, WalConfig}; use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::datanode_manager::DatanodeManagerRef; @@ -25,6 +26,7 @@ use common_meta::ddl_manager::DdlManager; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::region_keeper::MemoryRegionKeeper; +use common_meta::sequence::SequenceBuilder; use common_procedure::ProcedureManagerRef; use common_telemetry::info; use common_telemetry::logging::LoggingOptions; @@ -364,8 +366,13 @@ impl StartCommand { let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server())); - let table_meta_allocator = - Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())); + let table_id_sequence = Arc::new( + SequenceBuilder::new("table_id", kv_backend.clone()) + .initial(MIN_USER_TABLE_ID as u64) + .step(10) + .build(), + ); + let table_meta_allocator = Arc::new(StandaloneTableMetadataCreator::new(table_id_sequence)); let ddl_task_executor = Self::create_ddl_task_executor( kv_backend.clone(), diff --git a/src/common/meta/src/sequence.rs b/src/common/meta/src/sequence.rs index b87befcf5268..d297ab30bd3d 100644 --- a/src/common/meta/src/sequence.rs +++ b/src/common/meta/src/sequence.rs @@ -15,7 +15,7 @@ use std::ops::Range; use std::sync::Arc; -use snafu::{ensure, OptionExt}; +use snafu::ensure; use tokio::sync::Mutex; use crate::error::{self, Result}; @@ -26,31 +26,67 @@ pub type SequenceRef = Arc; pub(crate) const SEQ_PREFIX: &str = "__meta_seq"; -pub struct Sequence { - inner: Mutex, +pub struct SequenceBuilder { + name: String, + initial: u64, + step: u64, + generator: KvBackendRef, + max: u64, } -impl Sequence { - pub fn new(name: impl AsRef, initial: u64, step: u64, generator: KvBackendRef) -> Self { - let name = format!("{}-{}", SEQ_PREFIX, name.as_ref()); - let step = step.max(1); +impl SequenceBuilder { + pub fn new(name: impl AsRef, generator: KvBackendRef) -> Self { Self { + name: format!("{}-{}", SEQ_PREFIX, name.as_ref()), + initial: 0, + step: 1, + generator, + max: u64::MAX, + } + } + + pub fn initial(self, initial: u64) -> Self { + Self { initial, ..self } + } + + pub fn step(self, step: u64) -> Self { + Self { step, ..self } + } + + pub fn max(self, max: u64) -> Self { + Self { max, ..self } + } + + pub fn build(self) -> Sequence { + Sequence { inner: Mutex::new(Inner { - name, - generator, - initial, - next: initial, - step, + name: self.name, + generator: self.generator, + initial: self.initial, + next: self.initial, + step: self.step, range: None, force_quit: 1024, + max: self.max, }), } } +} + +pub struct Sequence { + inner: Mutex, +} +impl Sequence { pub async fn next(&self) -> Result { let mut inner = self.inner.lock().await; inner.next().await } + + pub async fn min_max(&self) -> Range { + let inner = self.inner.lock().await; + inner.initial..inner.max + } } struct Inner { @@ -67,6 +103,7 @@ struct Inner { range: Option>, // Used to avoid dead loops. force_quit: usize, + max: u64, } impl Inner { @@ -108,14 +145,17 @@ impl Inner { u64::to_le_bytes(start).to_vec() }; - let value = start - .checked_add(self.step) - .context(error::SequenceOutOfRangeSnafu { - name: &self.name, - start, - step: self.step, - })?; - let value = u64::to_le_bytes(value); + let step = self.step.min(self.max - start); + + ensure!( + step > 0, + error::NextSequenceSnafu { + err_msg: format!("next sequence exhausted, max: {}", self.max) + } + ); + + // No overflow: step <= self.max - start -> step + start <= self.max <= u64::MAX + let value = u64::to_le_bytes(start + step); let req = CompareAndPutRequest { key: key.to_vec(), @@ -143,7 +183,7 @@ impl Inner { return Ok(Range { start, - end: start + self.step, + end: start + step, }); } @@ -173,7 +213,9 @@ mod tests { async fn test_sequence() { let kv_backend = Arc::new(MemoryKvBackend::default()); let initial = 1024; - let seq = Sequence::new("test_seq", initial, 10, kv_backend); + let seq = SequenceBuilder::new("test_seq", kv_backend) + .initial(initial) + .build(); for i in initial..initial + 100 { assert_eq!(i, seq.next().await.unwrap()); @@ -182,9 +224,10 @@ mod tests { #[tokio::test] async fn test_sequence_out_of_rage() { - let kv_backend = Arc::new(MemoryKvBackend::default()); - let initial = u64::MAX - 10; - let seq = Sequence::new("test_seq", initial, 10, kv_backend); + let seq = SequenceBuilder::new("test_seq", Arc::new(MemoryKvBackend::default())) + .initial(u64::MAX - 10) + .step(10) + .build(); for _ in 0..10 { let _ = seq.next().await.unwrap(); @@ -192,10 +235,7 @@ mod tests { let res = seq.next().await; assert!(res.is_err()); - assert!(matches!( - res.unwrap_err(), - error::Error::SequenceOutOfRange { .. } - )) + assert!(matches!(res.unwrap_err(), Error::NextSequence { .. })) } #[tokio::test] @@ -248,8 +288,7 @@ mod tests { } } - let kv_backend = Arc::new(Noop {}); - let seq = Sequence::new("test_seq", 0, 10, kv_backend); + let seq = SequenceBuilder::new("test_seq", Arc::new(Noop)).build(); let next = seq.next().await; assert!(next.is_err()); diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 9376dce00920..00a510f27d8d 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -22,10 +22,9 @@ use common_error::ext::BoxedError; use common_meta::datanode_manager::{AffectedRows, Datanode, DatanodeManager, DatanodeRef}; use common_meta::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext}; use common_meta::error::{self as meta_error, Result as MetaResult}; -use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; -use common_meta::sequence::{Sequence, SequenceRef}; +use common_meta::sequence::SequenceRef; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::tracing; use common_telemetry::tracing_context::{FutureExt, TracingContext}; @@ -37,8 +36,6 @@ use table::metadata::RawTableInfo; use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result}; -const TABLE_ID_SEQ: &str = "table_id"; - pub struct StandaloneDatanodeManager(pub RegionServer); #[async_trait] @@ -112,10 +109,8 @@ pub struct StandaloneTableMetadataCreator { } impl StandaloneTableMetadataCreator { - pub fn new(kv_backend: KvBackendRef) -> Self { - Self { - table_id_sequence: Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_backend)), - } + pub fn new(table_id_sequence: SequenceRef) -> Self { + Self { table_id_sequence } } } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index e54740c34294..8be541ee68bc 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -434,7 +434,7 @@ mod tests { use api::v1::meta::{MailboxMessage, RequestHeader, Role, PROTOCOL_VERSION}; use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::sequence::Sequence; + use common_meta::sequence::SequenceBuilder; use tokio::sync::mpsc; use crate::handler::check_leader_handler::CheckLeaderHandler; @@ -487,7 +487,7 @@ mod tests { .await; let kv_backend = Arc::new(MemoryKvBackend::new()); - let seq = Sequence::new("test_seq", 0, 10, kv_backend); + let seq = SequenceBuilder::new("test_seq", kv_backend).build(); let mailbox = HeartbeatMailbox::create(handler_group.pushers(), seq); let msg = MailboxMessage { diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index 4d4ade748f72..c5c42a96bba1 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -147,7 +147,7 @@ mod tests { use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::sequence::Sequence; + use common_meta::sequence::SequenceBuilder; use super::*; use crate::cluster::MetaPeerClientBuilder; @@ -162,7 +162,7 @@ mod tests { let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader( kv_backend.clone(), )); - let seq = Sequence::new("test_seq", 0, 10, kv_backend.clone()); + let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build(); let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); let meta_peer_client = MetaPeerClientBuilder::default() .election(None) diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 3b588fb6f07b..143982ba8020 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -51,7 +51,7 @@ mod tests { use api::v1::meta::{HeartbeatResponse, RequestHeader}; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::sequence::Sequence; + use common_meta::sequence::SequenceBuilder; use common_telemetry::tracing_context::W3cTrace; use super::*; @@ -66,7 +66,7 @@ mod tests { let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader( kv_backend.clone(), )); - let seq = Sequence::new("test_seq", 0, 10, kv_backend.clone()); + let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build(); let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); let meta_peer_client = MetaPeerClientBuilder::default() .election(None) diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 789cf55dc0e1..d1b91bf03cb8 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -26,7 +26,6 @@ use common_meta::ddl::DdlTaskExecutorRef; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef}; use common_meta::region_keeper::MemoryRegionKeeperRef; -use common_meta::sequence::SequenceRef; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; use common_telemetry::logging::LoggingOptions; @@ -223,7 +222,6 @@ pub struct MetaSrv { in_memory: ResettableKvBackendRef, kv_backend: KvBackendRef, leader_cached_kv_backend: Arc, - table_id_sequence: SequenceRef, meta_peer_client: MetaPeerClientRef, selector: SelectorRef, handler_group: HeartbeatHandlerGroup, @@ -360,10 +358,6 @@ impl MetaSrv { &self.meta_peer_client } - pub fn table_id_sequence(&self) -> &SequenceRef { - &self.table_id_sequence - } - pub fn selector(&self) -> &SelectorRef { &self.selector } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 01841b434f5c..ee312b28f09a 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -18,6 +18,7 @@ use std::time::Duration; use client::client_manager::DatanodeClients; use common_base::Plugins; +use common_catalog::consts::MIN_USER_TABLE_ID; use common_grpc::channel_manager::ChannelConfig; use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::ddl::TableMetadataAllocatorRef; @@ -27,7 +28,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef}; -use common_meta::sequence::Sequence; +use common_meta::sequence::SequenceBuilder; use common_meta::state_store::KvStateStore; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; @@ -190,7 +191,7 @@ impl MetaSrvBuilder { let pushers = Pushers::default(); let mailbox = build_mailbox(&kv_backend, &pushers); let procedure_manager = build_procedure_manager(&options, &kv_backend); - let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_backend.clone())); + let table_metadata_manager = Arc::new(TableMetadataManager::new( leader_cached_kv_backend.clone() as _, )); @@ -204,10 +205,16 @@ impl MetaSrvBuilder { }; let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| { + let sequence = Arc::new( + SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) + .initial(MIN_USER_TABLE_ID as u64) + .step(10) + .build(), + ); Arc::new(MetaSrvTableMetadataAllocator::new( selector_ctx.clone(), selector.clone(), - table_id_sequence.clone(), + sequence, )) }); @@ -293,7 +300,6 @@ impl MetaSrvBuilder { kv_backend, leader_cached_kv_backend, meta_peer_client: meta_peer_client.clone(), - table_id_sequence, selector, handler_group, election, @@ -328,7 +334,11 @@ fn build_default_meta_peer_client( } fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef { - let mailbox_sequence = Sequence::new("heartbeat_mailbox", 1, 100, kv_backend.clone()); + let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone()) + .initial(1) + .step(100) + .build(); + HeartbeatMailbox::create(pushers.clone(), mailbox_sequence) } diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 2694f8c88a13..f96226f5e0a7 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -389,7 +389,7 @@ mod tests { use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::sequence::Sequence; + use common_meta::sequence::SequenceBuilder; use common_meta::DatanodeId; use common_procedure::{BoxedProcedure, ProcedureId}; use common_procedure_test::MockContextProvider; @@ -512,7 +512,10 @@ mod tests { } let mailbox_sequence = - Sequence::new("test_heartbeat_mailbox", 0, 100, kv_backend.clone()); + SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()) + .initial(0) + .step(100) + .build(); let mailbox = HeartbeatMailbox::create(pushers.clone(), mailbox_sequence); let selector = self.selector.unwrap_or_else(|| { diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 616104fe82f2..a4ff64403da2 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -26,7 +26,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef}; use common_meta::rpc::router::RegionRoute; -use common_meta::sequence::Sequence; +use common_meta::sequence::{Sequence, SequenceBuilder}; use common_meta::DatanodeId; use common_procedure::{Context as ProcedureContext, ProcedureId, Status}; use common_procedure_test::MockContextProvider; @@ -96,7 +96,8 @@ impl TestingEnv { let kv_backend = Arc::new(MemoryKvBackend::new()); let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); - let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 1, kv_backend.clone()); + let mailbox_sequence = + SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build(); let mailbox_ctx = MailboxContext::new(mailbox_sequence); let opening_region_keeper = Arc::new(MemoryRegionKeeper::default()); diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index f90fb46dd5b9..f0f0ba72bb56 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -117,7 +117,7 @@ pub mod test_data { use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::rpc::router::RegionRoute; - use common_meta::sequence::Sequence; + use common_meta::sequence::SequenceBuilder; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; @@ -194,7 +194,8 @@ pub mod test_data { pub(crate) fn new_ddl_context(datanode_manager: DatanodeManagerRef) -> DdlContext { let kv_backend = Arc::new(MemoryKvBackend::new()); - let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 100, kv_backend.clone()); + let mailbox_sequence = + SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build(); let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence); DdlContext { diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index 4428ec625822..b6993f7a3778 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -20,7 +20,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; -use common_meta::sequence::Sequence; +use common_meta::sequence::SequenceBuilder; use common_meta::state_store::KvStateStore; use common_procedure::local::{LocalManager, ManagerConfig}; use datatypes::data_type::ConcreteDataType; @@ -55,7 +55,8 @@ pub(crate) fn create_region_failover_manager() -> Arc { let kv_backend = Arc::new(MemoryKvBackend::new()); let pushers = Pushers::default(); - let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 100, kv_backend.clone()); + let mailbox_sequence = + SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build(); let mailbox = HeartbeatMailbox::create(pushers, mailbox_sequence); let state_store = Arc::new(KvStateStore::new(kv_backend.clone())); diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 352353233ddd..9536eaefe88c 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -16,11 +16,13 @@ use std::sync::Arc; use cmd::options::MixOptions; use common_base::Plugins; +use common_catalog::consts::MIN_USER_TABLE_ID; use common_config::KvBackendConfig; use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::ddl_manager::DdlManager; use common_meta::key::TableMetadataManager; use common_meta::region_keeper::MemoryRegionKeeper; +use common_meta::sequence::SequenceBuilder; use common_procedure::options::ProcedureConfig; use common_telemetry::logging::LoggingOptions; use datanode::config::DatanodeOptions; @@ -109,13 +111,21 @@ impl GreptimeDbStandaloneBuilder { let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server())); + let table_id_sequence = Arc::new( + SequenceBuilder::new("table_id", kv_backend.clone()) + .initial(MIN_USER_TABLE_ID as u64) + .step(10) + .build(), + ); + let table_meta_allocator = Arc::new(StandaloneTableMetadataCreator::new(table_id_sequence)); + let ddl_task_executor = Arc::new( DdlManager::try_new( procedure_manager.clone(), datanode_manager.clone(), Arc::new(DummyCacheInvalidator), table_metadata_manager, - Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())), + table_meta_allocator, Arc::new(MemoryRegionKeeper::default()), ) .unwrap(),