Skip to content

Commit

Permalink
refactor: make sequence bounded with max value (#2937)
Browse files Browse the repository at this point in the history
* refactor: make sequence bounded with max value

(cherry picked from commit 3a8eba6)

* fix: resolve PR comments
  • Loading branch information
MichaelScofield authored Dec 18, 2023
1 parent 262a79a commit 033a065
Show file tree
Hide file tree
Showing 13 changed files with 128 additions and 67 deletions.
11 changes: 9 additions & 2 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::{fs, path};

use async_trait::async_trait;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::{metadata_store_dir, KvBackendConfig, WalConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
Expand All @@ -25,6 +26,7 @@ use common_meta::ddl_manager::DdlManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
Expand Down Expand Up @@ -364,8 +366,13 @@ impl StartCommand {

let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));

let table_meta_allocator =
Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone()));
let table_id_sequence = Arc::new(
SequenceBuilder::new("table_id", kv_backend.clone())
.initial(MIN_USER_TABLE_ID as u64)
.step(10)
.build(),
);
let table_meta_allocator = Arc::new(StandaloneTableMetadataCreator::new(table_id_sequence));

let ddl_task_executor = Self::create_ddl_task_executor(
kv_backend.clone(),
Expand Down
101 changes: 70 additions & 31 deletions src/common/meta/src/sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::ops::Range;
use std::sync::Arc;

use snafu::{ensure, OptionExt};
use snafu::ensure;
use tokio::sync::Mutex;

use crate::error::{self, Result};
Expand All @@ -26,31 +26,67 @@ pub type SequenceRef = Arc<Sequence>;

pub(crate) const SEQ_PREFIX: &str = "__meta_seq";

pub struct Sequence {
inner: Mutex<Inner>,
pub struct SequenceBuilder {
name: String,
initial: u64,
step: u64,
generator: KvBackendRef,
max: u64,
}

impl Sequence {
pub fn new(name: impl AsRef<str>, initial: u64, step: u64, generator: KvBackendRef) -> Self {
let name = format!("{}-{}", SEQ_PREFIX, name.as_ref());
let step = step.max(1);
impl SequenceBuilder {
pub fn new(name: impl AsRef<str>, generator: KvBackendRef) -> Self {
Self {
name: format!("{}-{}", SEQ_PREFIX, name.as_ref()),
initial: 0,
step: 1,
generator,
max: u64::MAX,
}
}

pub fn initial(self, initial: u64) -> Self {
Self { initial, ..self }
}

pub fn step(self, step: u64) -> Self {
Self { step, ..self }
}

pub fn max(self, max: u64) -> Self {
Self { max, ..self }
}

pub fn build(self) -> Sequence {
Sequence {
inner: Mutex::new(Inner {
name,
generator,
initial,
next: initial,
step,
name: self.name,
generator: self.generator,
initial: self.initial,
next: self.initial,
step: self.step,
range: None,
force_quit: 1024,
max: self.max,
}),
}
}
}

pub struct Sequence {
inner: Mutex<Inner>,
}

impl Sequence {
pub async fn next(&self) -> Result<u64> {
let mut inner = self.inner.lock().await;
inner.next().await
}

pub async fn min_max(&self) -> Range<u64> {
let inner = self.inner.lock().await;
inner.initial..inner.max
}
}

struct Inner {
Expand All @@ -67,6 +103,7 @@ struct Inner {
range: Option<Range<u64>>,
// Used to avoid dead loops.
force_quit: usize,
max: u64,
}

impl Inner {
Expand Down Expand Up @@ -108,14 +145,17 @@ impl Inner {
u64::to_le_bytes(start).to_vec()
};

let value = start
.checked_add(self.step)
.context(error::SequenceOutOfRangeSnafu {
name: &self.name,
start,
step: self.step,
})?;
let value = u64::to_le_bytes(value);
let step = self.step.min(self.max - start);

ensure!(
step > 0,
error::NextSequenceSnafu {
err_msg: format!("next sequence exhausted, max: {}", self.max)
}
);

// No overflow: step <= self.max - start -> step + start <= self.max <= u64::MAX
let value = u64::to_le_bytes(start + step);

let req = CompareAndPutRequest {
key: key.to_vec(),
Expand Down Expand Up @@ -143,7 +183,7 @@ impl Inner {

return Ok(Range {
start,
end: start + self.step,
end: start + step,
});
}

Expand Down Expand Up @@ -173,7 +213,9 @@ mod tests {
async fn test_sequence() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let initial = 1024;
let seq = Sequence::new("test_seq", initial, 10, kv_backend);
let seq = SequenceBuilder::new("test_seq", kv_backend)
.initial(initial)
.build();

for i in initial..initial + 100 {
assert_eq!(i, seq.next().await.unwrap());
Expand All @@ -182,20 +224,18 @@ mod tests {

#[tokio::test]
async fn test_sequence_out_of_rage() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let initial = u64::MAX - 10;
let seq = Sequence::new("test_seq", initial, 10, kv_backend);
let seq = SequenceBuilder::new("test_seq", Arc::new(MemoryKvBackend::default()))
.initial(u64::MAX - 10)
.step(10)
.build();

for _ in 0..10 {
let _ = seq.next().await.unwrap();
}

let res = seq.next().await;
assert!(res.is_err());
assert!(matches!(
res.unwrap_err(),
error::Error::SequenceOutOfRange { .. }
))
assert!(matches!(res.unwrap_err(), Error::NextSequence { .. }))
}

#[tokio::test]
Expand Down Expand Up @@ -248,8 +288,7 @@ mod tests {
}
}

let kv_backend = Arc::new(Noop {});
let seq = Sequence::new("test_seq", 0, 10, kv_backend);
let seq = SequenceBuilder::new("test_seq", Arc::new(Noop)).build();

let next = seq.next().await;
assert!(next.is_err());
Expand Down
11 changes: 3 additions & 8 deletions src/frontend/src/instance/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ use common_error::ext::BoxedError;
use common_meta::datanode_manager::{AffectedRows, Datanode, DatanodeManager, DatanodeRef};
use common_meta::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::{Sequence, SequenceRef};
use common_meta::sequence::SequenceRef;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
Expand All @@ -37,8 +36,6 @@ use table::metadata::RawTableInfo;

use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result};

const TABLE_ID_SEQ: &str = "table_id";

pub struct StandaloneDatanodeManager(pub RegionServer);

#[async_trait]
Expand Down Expand Up @@ -112,10 +109,8 @@ pub struct StandaloneTableMetadataCreator {
}

impl StandaloneTableMetadataCreator {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self {
table_id_sequence: Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_backend)),
}
pub fn new(table_id_sequence: SequenceRef) -> Self {
Self { table_id_sequence }
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ mod tests {

use api::v1::meta::{MailboxMessage, RequestHeader, Role, PROTOCOL_VERSION};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::sequence::Sequence;
use common_meta::sequence::SequenceBuilder;
use tokio::sync::mpsc;

use crate::handler::check_leader_handler::CheckLeaderHandler;
Expand Down Expand Up @@ -487,7 +487,7 @@ mod tests {
.await;

let kv_backend = Arc::new(MemoryKvBackend::new());
let seq = Sequence::new("test_seq", 0, 10, kv_backend);
let seq = SequenceBuilder::new("test_seq", kv_backend).build();
let mailbox = HeartbeatMailbox::create(handler_group.pushers(), seq);

let msg = MailboxMessage {
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/handler/persist_stats_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ mod tests {

use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::sequence::Sequence;
use common_meta::sequence::SequenceBuilder;

use super::*;
use crate::cluster::MetaPeerClientBuilder;
Expand All @@ -162,7 +162,7 @@ mod tests {
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
kv_backend.clone(),
));
let seq = Sequence::new("test_seq", 0, 10, kv_backend.clone());
let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/handler/response_header_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ mod tests {
use api::v1::meta::{HeartbeatResponse, RequestHeader};
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::sequence::Sequence;
use common_meta::sequence::SequenceBuilder;
use common_telemetry::tracing_context::W3cTrace;

use super::*;
Expand All @@ -66,7 +66,7 @@ mod tests {
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
kv_backend.clone(),
));
let seq = Sequence::new("test_seq", 0, 10, kv_backend.clone());
let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
Expand Down
6 changes: 0 additions & 6 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::sequence::SequenceRef;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::LoggingOptions;
Expand Down Expand Up @@ -223,7 +222,6 @@ pub struct MetaSrv {
in_memory: ResettableKvBackendRef,
kv_backend: KvBackendRef,
leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
table_id_sequence: SequenceRef,
meta_peer_client: MetaPeerClientRef,
selector: SelectorRef,
handler_group: HeartbeatHandlerGroup,
Expand Down Expand Up @@ -360,10 +358,6 @@ impl MetaSrv {
&self.meta_peer_client
}

pub fn table_id_sequence(&self) -> &SequenceRef {
&self.table_id_sequence
}

pub fn selector(&self) -> &SelectorRef {
&self.selector
}
Expand Down
20 changes: 15 additions & 5 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::time::Duration;

use client::client_manager::DatanodeClients;
use common_base::Plugins;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::TableMetadataAllocatorRef;
Expand All @@ -27,7 +28,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef};
use common_meta::sequence::Sequence;
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
Expand Down Expand Up @@ -190,7 +191,7 @@ impl MetaSrvBuilder {
let pushers = Pushers::default();
let mailbox = build_mailbox(&kv_backend, &pushers);
let procedure_manager = build_procedure_manager(&options, &kv_backend);
let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_backend.clone()));

let table_metadata_manager = Arc::new(TableMetadataManager::new(
leader_cached_kv_backend.clone() as _,
));
Expand All @@ -204,10 +205,16 @@ impl MetaSrvBuilder {
};

let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
let sequence = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_TABLE_ID as u64)
.step(10)
.build(),
);
Arc::new(MetaSrvTableMetadataAllocator::new(
selector_ctx.clone(),
selector.clone(),
table_id_sequence.clone(),
sequence,
))
});

Expand Down Expand Up @@ -293,7 +300,6 @@ impl MetaSrvBuilder {
kv_backend,
leader_cached_kv_backend,
meta_peer_client: meta_peer_client.clone(),
table_id_sequence,
selector,
handler_group,
election,
Expand Down Expand Up @@ -328,7 +334,11 @@ fn build_default_meta_peer_client(
}

fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
let mailbox_sequence = Sequence::new("heartbeat_mailbox", 1, 100, kv_backend.clone());
let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone())
.initial(1)
.step(100)
.build();

HeartbeatMailbox::create(pushers.clone(), mailbox_sequence)
}

Expand Down
Loading

0 comments on commit 033a065

Please sign in to comment.