diff --git a/Cargo.lock b/Cargo.lock
index 5170d5d3403a..92c636190f20 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1833,6 +1833,7 @@ dependencies = [
  "derive_builder 0.12.0",
  "etcd-client",
  "futures",
+ "futures-util",
  "humantime-serde",
  "hyper",
  "lazy_static",
@@ -1851,6 +1852,7 @@ dependencies = [
  "tokio",
  "toml 0.8.8",
  "tonic 0.10.2",
+ "uuid",
 ]
 
 [[package]]
@@ -4507,7 +4509,6 @@ dependencies = [
  "common-runtime",
  "common-telemetry",
  "common-test-util",
- "dashmap",
  "futures",
  "futures-util",
  "itertools 0.10.5",
@@ -4515,6 +4516,7 @@ dependencies = [
  "protobuf-build",
  "raft-engine",
  "rand",
+ "rand_distr",
  "rskafka",
  "serde",
  "serde_json",
diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs
index 884e3f3f0a99..e7179f96f106 100644
--- a/src/common/config/src/wal/kafka.rs
+++ b/src/common/config/src/wal/kafka.rs
@@ -40,7 +40,6 @@ pub struct KafkaConfig {
     pub broker_endpoints: Vec<String>,
     /// The compression algorithm used to compress log entries.
     #[serde(skip)]
-    #[serde(default)]
     pub compression: RsKafkaCompression,
     /// The max size of a single producer batch.
     pub max_batch_size: ReadableSize,
diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml
index 5a15581f41c6..609fe259d2e2 100644
--- a/src/common/meta/Cargo.toml
+++ b/src/common/meta/Cargo.toml
@@ -14,6 +14,7 @@ async-stream.workspace = true
 async-trait.workspace = true
 base64.workspace = true
 bytes.workspace = true
+chrono.workspace = true
 common-catalog.workspace = true
 common-config.workspace = true
 common-error.workspace = true
@@ -27,6 +28,7 @@ common-time.workspace = true
 datatypes.workspace = true
 derive_builder.workspace = true
 etcd-client.workspace = true
+futures-util.workspace = true
 futures.workspace = true
 humantime-serde.workspace = true
 lazy_static.workspace = true
@@ -51,3 +53,4 @@ chrono.workspace = true
 common-procedure = { workspace = true, features = ["testing"] }
 datatypes.workspace = true
 hyper = { version = "0.14", features = ["full"] }
+uuid.workspace = true
diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs
index 4132a58839db..45b4db5b0865 100644
--- a/src/common/meta/src/lib.rs
+++ b/src/common/meta/src/lib.rs
@@ -36,7 +36,6 @@ pub mod sequence;
 pub mod state_store;
 pub mod table_name;
 pub mod util;
-#[allow(unused)]
 pub mod wal;
 
 pub type ClusterId = u64;
diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs
index c7af1d64d306..2c018145ab42 100644
--- a/src/common/meta/src/wal.rs
+++ b/src/common/meta/src/wal.rs
@@ -19,12 +19,9 @@ use std::collections::HashMap;
 
 use common_config::wal::StandaloneWalConfig;
 use common_config::WAL_OPTIONS_KEY;
-use common_telemetry::warn;
 use serde::{Deserialize, Serialize};
-use serde_with::with_prefix;
 use store_api::storage::{RegionId, RegionNumber};
 
-use crate::error::Result;
 use crate::wal::kafka::KafkaConfig;
 pub use crate::wal::kafka::Topic as KafkaWalTopic;
 pub use crate::wal::options_allocator::{
@@ -43,7 +40,7 @@ pub enum WalConfig {
 impl From<StandaloneWalConfig> for WalConfig {
     fn from(value: StandaloneWalConfig) -> Self {
         match value {
-            StandaloneWalConfig::RaftEngine(config) => WalConfig::RaftEngine,
+            StandaloneWalConfig::RaftEngine(_) => WalConfig::RaftEngine,
             StandaloneWalConfig::Kafka(config) => WalConfig::Kafka(KafkaConfig {
                 broker_endpoints: config.base.broker_endpoints,
                 num_topics: config.num_topics,
diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs
index 703dfa7e3de0..18cde0fdaa52 100644
--- a/src/common/meta/src/wal/kafka.rs
+++ b/src/common/meta/src/wal/kafka.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#[cfg(any(test, feature = "testing"))]
+pub mod test_util;
 pub mod topic;
 pub mod topic_manager;
 pub mod topic_selector;
@@ -19,7 +21,6 @@ pub mod topic_selector;
 
 use std::time::Duration;
 
 use common_config::wal::kafka::{kafka_backoff, KafkaBackoffConfig, TopicSelectorType};
-use common_config::wal::StandaloneWalConfig;
 use serde::{Deserialize, Serialize};
 
 pub use crate::wal::kafka::topic::Topic;
diff --git a/src/common/meta/src/wal/kafka/test_util.rs b/src/common/meta/src/wal/kafka/test_util.rs
new file mode 100644
index 000000000000..9dab26e83738
--- /dev/null
+++ b/src/common/meta/src/wal/kafka/test_util.rs
@@ -0,0 +1,33 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_telemetry::warn;
+use futures_util::future::BoxFuture;
+
+pub async fn run_test_with_kafka_wal<F>(test: F)
+where
+    F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
+{
+    let Ok(endpoints) = std::env::var("GT_KAFKA_ENDPOINTS") else {
+        warn!("The endpoints are empty, skipping the test");
+        return;
+    };
+
+    let endpoints = endpoints
+        .split(',')
+        .map(|s| s.trim().to_string())
+        .collect::<Vec<_>>();
+
+    test(endpoints).await
+}
diff --git a/src/common/meta/src/wal/kafka/topic.rs b/src/common/meta/src/wal/kafka/topic.rs
index 67223637f1ae..34e15ad22450 100644
--- a/src/common/meta/src/wal/kafka/topic.rs
+++ b/src/common/meta/src/wal/kafka/topic.rs
@@ -15,4 +15,5 @@
 /// Kafka wal topic.
 /// Publishers publish log entries to the topic while subscribers pull log entries from the topic.
 /// A topic is simply a string right now. But it may be more complex in the future.
+// TODO(niebayes): remove the Topic alias.
 pub type Topic = String;
diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs
index 80aaa90d402f..f57d2044862b 100644
--- a/src/common/meta/src/wal/kafka/topic_manager.rs
+++ b/src/common/meta/src/wal/kafka/topic_manager.rs
@@ -14,10 +14,9 @@
 
 use std::collections::HashSet;
 use std::sync::Arc;
-use std::time::Duration;
 
 use common_config::wal::kafka::TopicSelectorType;
-use common_telemetry::{debug, error, info};
+use common_telemetry::{error, info};
 use rskafka::client::controller::ControllerClient;
 use rskafka::client::error::Error as RsKafkaError;
 use rskafka::client::error::ProtocolError::TopicAlreadyExists;
@@ -25,7 +24,7 @@ use rskafka::client::partition::{Compression, UnknownTopicHandling};
 use rskafka::client::{Client, ClientBuilder};
 use rskafka::record::Record;
 use rskafka::BackoffConfig;
-use snafu::{ensure, AsErrorSource, ResultExt};
+use snafu::{ensure, ResultExt};
 
 use crate::error::{
     BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
@@ -47,9 +46,8 @@ const DEFAULT_PARTITION: i32 = 0;
 
 /// Manages topic initialization and selection.
 pub struct TopicManager {
     config: KafkaConfig,
-    // TODO(niebayes): maybe add a guard to ensure all topics in the topic pool are created.
-    topic_pool: Vec<Topic>,
-    topic_selector: TopicSelectorRef,
+    pub(crate) topic_pool: Vec<Topic>,
+    pub(crate) topic_selector: TopicSelectorRef,
     kv_backend: KvBackendRef,
 }
 
@@ -168,7 +166,7 @@ impl TopicManager {
             vec![Record {
                 key: None,
                 value: None,
-                timestamp: rskafka::chrono::Utc::now(),
+                timestamp: chrono::Utc::now(),
                 headers: Default::default(),
             }],
             Compression::NoCompression,
@@ -240,13 +238,9 @@ impl TopicManager {
 
 #[cfg(test)]
 mod tests {
-    use std::env;
-
-    use common_telemetry::info;
-
     use super::*;
     use crate::kv_backend::memory::MemoryKvBackend;
-    use crate::kv_backend::{self};
+    use crate::wal::kafka::test_util::run_test_with_kafka_wal;
 
     // Tests that topics can be successfully persisted into the kv backend and can be successfully restored from the kv backend.
     #[tokio::test]
@@ -273,26 +267,60 @@ mod tests {
         assert_eq!(topics, restored_topics);
     }
 
+    /// Tests that the topic manager could allocate topics correctly.
     #[tokio::test]
-    async fn test_topic_manager() {
-        let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
-        common_telemetry::init_default_ut_logging();
-
-        if endpoints.is_empty() {
-            info!("The endpoints is empty, skipping the test.");
-            return;
-        }
-        // TODO: supports topic prefix
-        let kv_backend = Arc::new(MemoryKvBackend::new());
-        let config = KafkaConfig {
-            replication_factor: 1,
-            broker_endpoints: endpoints
-                .split(',')
-                .map(|s| s.to_string())
-                .collect::<Vec<_>>(),
-            ..Default::default()
-        };
-        let manager = TopicManager::new(config, kv_backend);
-        manager.start().await.unwrap();
+    async fn test_alloc_topics() {
+        run_test_with_kafka_wal(|broker_endpoints| {
+            Box::pin(async {
+                // Constructs topics that should be created.
+                let topics = (0..256)
+                    .map(|i| format!("test_alloc_topics_{}_{}", i, uuid::Uuid::new_v4()))
+                    .collect::<Vec<_>>();
+
+                // Creates a topic manager.
+                let config = KafkaConfig {
+                    replication_factor: broker_endpoints.len() as i16,
+                    broker_endpoints,
+                    ..Default::default()
+                };
+                let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
+                let mut manager = TopicManager::new(config.clone(), kv_backend);
+                // Replaces the default topic pool with the constructed topics.
+                manager.topic_pool = topics.clone();
+                // Replaces the default selector with an unshuffled round-robin selector.
+                manager.topic_selector = Arc::new(RoundRobinTopicSelector::default());
+                manager.start().await.unwrap();
+
+                // Selects exactly `num_topics` topics one by one.
+                let got = (0..topics.len())
+                    .map(|_| manager.select().unwrap())
+                    .cloned()
+                    .collect::<Vec<_>>();
+                assert_eq!(got, topics);
+
+                // Selects exactly `num_topics` topics in a batching manner.
+                let got = manager
+                    .select_batch(topics.len())
+                    .unwrap()
+                    .into_iter()
+                    .map(ToString::to_string)
+                    .collect::<Vec<_>>();
+                assert_eq!(got, topics);
+
+                // Selects more than `num_topics` topics.
+                let got = manager
+                    .select_batch(2 * topics.len())
+                    .unwrap()
+                    .into_iter()
+                    .map(ToString::to_string)
+                    .collect::<Vec<_>>();
+                let expected = vec![topics.clone(); 2]
+                    .into_iter()
+                    .flatten()
+                    .collect::<Vec<_>>();
+                assert_eq!(got, expected);
+            })
+        })
+        .await;
     }
 }
diff --git a/src/common/meta/src/wal/kafka/topic_selector.rs b/src/common/meta/src/wal/kafka/topic_selector.rs
index fe7517bfd0b5..432900ebacc3 100644
--- a/src/common/meta/src/wal/kafka/topic_selector.rs
+++ b/src/common/meta/src/wal/kafka/topic_selector.rs
@@ -16,7 +16,6 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 
 use rand::Rng;
-use serde::{Deserialize, Serialize};
 use snafu::ensure;
 
 use crate::error::{EmptyTopicPoolSnafu, Result};
@@ -60,6 +59,14 @@ impl TopicSelector for RoundRobinTopicSelector {
 mod tests {
     use super::*;
 
+    /// Tests that a selector behaves as expected when the given topic pool is empty.
+    #[test]
+    fn test_empty_topic_pool() {
+        let topic_pool = vec![];
+        let selector = RoundRobinTopicSelector::default();
+        assert!(selector.select(&topic_pool).is_err());
+    }
+
     #[test]
     fn test_round_robin_topic_selector() {
         let topic_pool: Vec<_> = [0, 1, 2].into_iter().map(|v| v.to_string()).collect();
diff --git a/src/common/meta/src/wal/options_allocator.rs b/src/common/meta/src/wal/options_allocator.rs
index 6c2702053b87..58724b64573b 100644
--- a/src/common/meta/src/wal/options_allocator.rs
+++ b/src/common/meta/src/wal/options_allocator.rs
@@ -107,14 +107,16 @@ pub fn allocate_region_wal_options(
 mod tests {
     use super::*;
     use crate::kv_backend::memory::MemoryKvBackend;
+    use crate::wal::kafka::test_util::run_test_with_kafka_wal;
+    use crate::wal::kafka::topic_selector::RoundRobinTopicSelector;
+    use crate::wal::kafka::KafkaConfig;
 
     // Tests the wal options allocator could successfully allocate raft-engine wal options.
-    // Note: tests for allocator with kafka are integration tests.
     #[tokio::test]
     async fn test_allocator_with_raft_engine() {
         let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
         let wal_config = WalConfig::RaftEngine;
-        let mut allocator = WalOptionsAllocator::new(wal_config, kv_backend);
+        let allocator = WalOptionsAllocator::new(wal_config, kv_backend);
         allocator.start().await.unwrap();
 
         let num_regions = 32;
@@ -128,4 +130,49 @@ mod tests {
             .collect();
         assert_eq!(got, expected);
     }
+
+    // Tests that the wal options allocator could successfully allocate Kafka wal options.
+    #[tokio::test]
+    async fn test_allocator_with_kafka() {
+        run_test_with_kafka_wal(|broker_endpoints| {
+            Box::pin(async {
+                let topics = (0..256)
+                    .map(|i| format!("test_allocator_with_kafka_{}_{}", i, uuid::Uuid::new_v4()))
+                    .collect::<Vec<_>>();
+
+                // Creates a topic manager.
+                let config = KafkaConfig {
+                    replication_factor: broker_endpoints.len() as i16,
+                    broker_endpoints,
+                    ..Default::default()
+                };
+                let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
+                let mut topic_manager = KafkaTopicManager::new(config.clone(), kv_backend);
+                // Replaces the default topic pool with the constructed topics.
+                topic_manager.topic_pool = topics.clone();
+                // Replaces the default selector with an unshuffled round-robin selector.
+                topic_manager.topic_selector = Arc::new(RoundRobinTopicSelector::default());
+
+                // Creates an options allocator.
+                let allocator = WalOptionsAllocator::Kafka(topic_manager);
+                allocator.start().await.unwrap();
+
+                let num_regions = 32;
+                let regions = (0..num_regions).collect::<Vec<_>>();
+                let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();
+
+                // Checks that the allocated wal options contain the expected topics.
+                let expected = (0..num_regions)
+                    .map(|i| {
+                        let options = WalOptions::Kafka(KafkaWalOptions {
+                            topic: topics[i as usize].clone(),
+                        });
+                        (i, serde_json::to_string(&options).unwrap())
+                    })
+                    .collect::<HashMap<_, _>>();
+                assert_eq!(got, expected);
+            })
+        })
+        .await;
+    }
 }
diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml
index f5923c772090..9f2c8b9bc728 100644
--- a/src/log-store/Cargo.toml
+++ b/src/log-store/Cargo.toml
@@ -22,10 +22,8 @@ common-macro.workspace = true
 common-meta.workspace = true
 common-runtime.workspace = true
 common-telemetry.workspace = true
-dashmap.workspace = true
 futures-util.workspace = true
 futures.workspace = true
-itertools.workspace = true
 protobuf = { version = "2", features = ["bytes"] }
 raft-engine.workspace = true
 rskafka.workspace = true
@@ -39,5 +37,7 @@ tokio.workspace = true
 
 [dev-dependencies]
 common-meta = { workspace = true, features = ["testing"] }
 common-test-util.workspace = true
+itertools.workspace = true
 rand.workspace = true
+rand_distr = "0.4"
 uuid.workspace = true
diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs
index ab8556572e8b..27a422d2f647 100644
--- a/src/log-store/src/kafka.rs
+++ b/src/log-store/src/kafka.rs
@@ -40,7 +40,7 @@ impl Namespace for NamespaceImpl {
 
 impl Display for NamespaceImpl {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{}/{}", self.topic, self.region_id)
+        write!(f, "[topic: {}, region: {}]", self.topic, self.region_id)
     }
 }
 
@@ -48,11 +48,11 @@ impl Display for NamespaceImpl {
 #[derive(Debug, PartialEq, Clone)]
 pub struct EntryImpl {
     /// Entry payload.
-    data: Vec<u8>,
+    pub data: Vec<u8>,
     /// The logical entry id.
-    id: EntryId,
+    pub id: EntryId,
     /// The namespace used to identify and isolate log entries from different regions.
-    ns: NamespaceImpl,
+    pub ns: NamespaceImpl,
 }
 
 impl Entry for EntryImpl {
@@ -76,7 +76,7 @@ impl Display for EntryImpl {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "Entry (ns: {}, id: {}, data_len: {})",
+            "Entry [ns: {}, id: {}, data_len: {}]",
             self.ns,
             self.id,
             self.data.len()
diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index 19ae75deffcb..d5402a42b1d1 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -98,14 +98,13 @@ impl ClientManager {
     /// Gets the client associated with the topic. If the client does not exist, a new one will
     /// be created and returned.
     pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
-        let client_pool = self.client_pool.read().await;
-        if let Some(client) = client_pool.get(topic) {
-            return Ok(client.clone());
+        {
+            let client_pool = self.client_pool.read().await;
+            if let Some(client) = client_pool.get(topic) {
+                return Ok(client.clone());
+            }
         }
-        // Manullay releases the read lock.
-        drop(client_pool);
-
-        // Acquires the write lock.
         let mut client_pool = self.client_pool.write().await;
         match client_pool.get(topic) {
             Some(client) => Ok(client.clone()),
@@ -134,3 +133,95 @@ impl ClientManager {
         Ok(Client::new(raw_client, &self.config))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use common_meta::wal::kafka::test_util::run_test_with_kafka_wal;
+    use tokio::sync::Barrier;
+
+    use super::*;
+    use crate::test_util::kafka::create_topics;
+
+    /// Prepares for a test by creating a collection of topics and a client manager.
+    async fn prepare(
+        test_name: &str,
+        num_topics: usize,
+        broker_endpoints: Vec<String>,
+    ) -> (ClientManager, Vec<Topic>) {
+        let topics = create_topics(
+            num_topics,
+            |i| format!("{test_name}_{}_{}", i, uuid::Uuid::new_v4()),
+            &broker_endpoints,
+        )
+        .await;
+
+        let config = KafkaConfig {
+            broker_endpoints,
+            ..Default::default()
+        };
+        let manager = ClientManager::try_new(&config).await.unwrap();
+
+        (manager, topics)
+    }
+
+    /// Sends `get_or_insert` requests sequentially to the client manager, and checks that it handles them correctly.
+    #[tokio::test]
+    async fn test_sequential() {
+        run_test_with_kafka_wal(|broker_endpoints| {
+            Box::pin(async {
+                let (manager, topics) = prepare("test_sequential", 128, broker_endpoints).await;
+                // Assigns multiple regions to a topic.
+                let region_topic = (0..512)
+                    .map(|region_id| (region_id, &topics[region_id % topics.len()]))
+                    .collect::<HashMap<_, _>>();
+
+                // Gets all clients sequentially.
+                for (_, topic) in region_topic {
+                    manager.get_or_insert(topic).await.unwrap();
+                }
+
+                // Ensures all clients exist.
+                let client_pool = manager.client_pool.read().await;
+                let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic));
+                assert!(all_exist);
+            })
+        })
+        .await;
+    }
+
+    /// Sends `get_or_insert` requests in parallel to the client manager, and checks that it handles them correctly.
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_parallel() {
+        run_test_with_kafka_wal(|broker_endpoints| {
+            Box::pin(async {
+                let (manager, topics) = prepare("test_parallel", 128, broker_endpoints).await;
+                // Assigns multiple regions to a topic.
+                let region_topic = (0..512)
+                    .map(|region_id| (region_id, topics[region_id % topics.len()].clone()))
+                    .collect::<HashMap<_, _>>();
+
+                // Gets all clients in parallel.
+                let manager = Arc::new(manager);
+                let barrier = Arc::new(Barrier::new(region_topic.len()));
+                let tasks = region_topic
+                    .into_values()
+                    .map(|topic| {
+                        let manager = manager.clone();
+                        let barrier = barrier.clone();
+                        tokio::spawn(async move {
+                            barrier.wait().await;
+                            assert!(manager.get_or_insert(&topic).await.is_ok());
+                        })
+                    })
+                    .collect::<Vec<_>>();
+                futures::future::try_join_all(tasks).await.unwrap();
+
+                // Ensures all clients exist.
+                let client_pool = manager.client_pool.read().await;
+                let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic));
+                assert!(all_exist);
+            })
+        })
+        .await;
+    }
+}
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index ddac8c6b752f..2e05683fe246 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -140,8 +140,13 @@ impl LogStore for KafkaLogStore {
             .await
             .context(GetOffsetSnafu { ns: ns.clone() })?
             - 1;
-        // Reads entries with offsets in the range [start_offset, end_offset).
-        let start_offset: i64 = Offset::try_from(entry_id)?.0;
+        // Reads entries with offsets in the range [start_offset, end_offset].
+        let start_offset = Offset::try_from(entry_id)?.0;
+
+        debug!(
+            "Start reading entries in range [{}, {}] for ns {}",
+            start_offset, end_offset, ns
+        );
 
         // Abort if there're no new entries.
         // FIXME(niebayes): how come this case happens?
@@ -274,3 +279,205 @@ fn check_termination(
         Ok(false)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use common_base::readable_size::ReadableSize;
+    use common_config::wal::KafkaWalTopic as Topic;
+    use rand::seq::IteratorRandom;
+
+    use super::*;
+    use crate::test_util::kafka::{
+        create_topics, entries_with_random_data, new_namespace, EntryBuilder,
+    };
+
+    // Stores test context for a region.
+    struct RegionContext {
+        ns: NamespaceImpl,
+        entry_builder: EntryBuilder,
+        expected: Vec<EntryImpl>,
+        flushed_entry_id: EntryId,
+    }
+
+    /// Prepares for a test by constructing a log store and creating a collection of topics.
+    async fn prepare(
+        test_name: &str,
+        num_topics: usize,
+        broker_endpoints: Vec<String>,
+    ) -> (KafkaLogStore, Vec<Topic>) {
+        let topics = create_topics(
+            num_topics,
+            |i| format!("{test_name}_{}_{}", i, uuid::Uuid::new_v4()),
+            &broker_endpoints,
+        )
+        .await;
+
+        let config = KafkaConfig {
+            broker_endpoints,
+            max_batch_size: ReadableSize::kb(32),
+            ..Default::default()
+        };
+        let logstore = KafkaLogStore::try_new(&config).await.unwrap();
+
+        // Appends a no-op record to each topic.
+        for topic in topics.iter() {
+            let last_entry_id = logstore
+                .append(EntryImpl {
+                    data: vec![],
+                    id: 0,
+                    ns: new_namespace(topic, 0),
+                })
+                .await
+                .unwrap()
+                .last_entry_id;
+            assert_eq!(last_entry_id, 0);
+        }
+
+        (logstore, topics)
+    }
+
+    /// Creates a vector containing indexes of all regions if `all` is true.
+    /// Otherwise, creates a subset of the indexes. The cardinality of the subset
+    /// is nearly a quarter of that of the universe set.
+    fn all_or_subset(all: bool, num_regions: usize) -> Vec<u64> {
+        assert!(num_regions > 0);
+        let amount = if all {
+            num_regions
+        } else {
+            (num_regions / 4).max(1)
+        };
+        (0..num_regions as u64).choose_multiple(&mut rand::thread_rng(), amount)
+    }
+
+    /// Builds entries for regions specified by `which`. Builds large entries if `large` is true.
+    /// Returns the aggregated entries.
+    fn build_entries(
+        region_contexts: &mut HashMap<u64, RegionContext>,
+        which: &[u64],
+        large: bool,
+    ) -> Vec<EntryImpl> {
+        let mut aggregated = Vec::with_capacity(which.len());
+        for region_id in which {
+            let ctx = region_contexts.get_mut(region_id).unwrap();
+            // Builds entries for the region.
+            ctx.expected = if !large {
+                entries_with_random_data(3, &ctx.entry_builder)
+            } else {
+                // Builds a large entry of size 256KB, which is way greater than the configured `max_batch_size` of 32KB.
+                let large_entry = ctx.entry_builder.with_data([b'1'; 256 * 1024]);
+                vec![large_entry]
+            };
+            // Aggregates entries of all regions.
+            aggregated.push(ctx.expected.clone());
+        }
+        aggregated.into_iter().flatten().collect()
+    }
+
+    /// Starts a test with:
+    /// * `test_name` - The name of the test.
+    /// * `num_topics` - Number of topics to be created in the preparation phase.
+    /// * `num_regions` - Number of regions involved in the test.
+    /// * `num_appends` - Number of append operations to be performed.
+    /// * `all` - All regions will be involved in an append operation if `all` is true. Otherwise,
+    /// an append operation will only randomly choose a subset of regions.
+    /// * `large` - Builds large entries for each region if `large` is true.
+    async fn test_with(
+        test_name: &str,
+        num_topics: usize,
+        num_regions: usize,
+        num_appends: usize,
+        all: bool,
+        large: bool,
+    ) {
+        let Ok(broker_endpoints) = std::env::var("GT_KAFKA_ENDPOINTS") else {
+            warn!("The endpoints are empty, skipping the test {test_name}");
+            return;
+        };
+        let broker_endpoints = broker_endpoints
+            .split(',')
+            .map(|s| s.trim().to_string())
+            .collect::<Vec<_>>();
+
+        let (logstore, topics) = prepare(test_name, num_topics, broker_endpoints).await;
+        let mut region_contexts = (0..num_regions)
+            .map(|i| {
+                let topic = &topics[i % topics.len()];
+                let ns = new_namespace(topic, i as u64);
+                let entry_builder = EntryBuilder::new(ns.clone());
+                (
+                    i as u64,
+                    RegionContext {
+                        ns,
+                        entry_builder,
+                        expected: Vec::new(),
+                        flushed_entry_id: 0,
+                    },
+                )
+            })
+            .collect();
+
+        for _ in 0..num_appends {
+            // Appends entries for a subset of regions.
+            let which = all_or_subset(all, num_regions);
+            let entries = build_entries(&mut region_contexts, &which, large);
+            let last_entry_ids = logstore.append_batch(entries).await.unwrap().last_entry_ids;
+
+            // Reads entries for regions and checks for each region that the gotten entries are identical with the expected ones.
+            for region_id in which {
+                let ctx = &region_contexts[&region_id];
+                let stream = logstore
+                    .read(&ctx.ns, ctx.flushed_entry_id + 1)
+                    .await
+                    .unwrap();
+                let got = stream
+                    .collect::<Vec<_>>()
+                    .await
+                    .into_iter()
+                    .flat_map(|x| x.unwrap())
+                    .collect::<Vec<_>>();
+                assert_eq!(ctx.expected, got);
+            }
+
+            // Simulates a flush for regions.
+            for (region_id, last_entry_id) in last_entry_ids {
+                let ctx = region_contexts.get_mut(&region_id).unwrap();
+                ctx.flushed_entry_id = last_entry_id;
+            }
+        }
+    }
+
+    /// Appends entries for one region and checks all entries can be read successfully.
+    #[tokio::test]
+    async fn test_one_region() {
+        test_with("test_one_region", 1, 1, 1, true, false).await;
+    }
+
+    /// Appends entries for multiple regions and checks entries for each region can be read successfully.
+    /// A topic is assigned only a single region.
+    #[tokio::test]
+    async fn test_multi_regions_disjoint() {
+        test_with("test_multi_regions_disjoint", 5, 5, 1, true, false).await;
+    }
+
+    /// Appends entries for multiple regions and checks entries for each region can be read successfully.
+    /// A topic is assigned multiple regions.
+    #[tokio::test]
+    async fn test_multi_regions_overlapped() {
+        test_with("test_multi_regions_overlapped", 5, 20, 1, true, false).await;
+    }
+
+    /// Appends entries for multiple regions and checks entries for each region can be read successfully.
+    /// A topic may be assigned multiple regions. The append operation repeats for several iterations.
+    /// Each append operation will only append entries for a subset of randomly chosen regions.
+    #[tokio::test]
+    async fn test_multi_appends() {
+        test_with("test_multi_appends", 5, 20, 3, false, false).await;
+    }
+
+    /// Appends large entries for multiple regions and checks entries for each region can be read successfully.
+    /// A topic may be assigned multiple regions.
+    #[tokio::test]
+    async fn test_append_large_entries() {
+        test_with("test_append_large_entries", 5, 20, 3, true, true).await;
+    }
+}
diff --git a/src/log-store/src/kafka/util/record.rs b/src/log-store/src/kafka/util/record.rs
index 71a2cd1db61e..9ff92064549f 100644
--- a/src/log-store/src/kafka/util/record.rs
+++ b/src/log-store/src/kafka/util/record.rs
@@ -149,6 +149,7 @@ impl RecordProducer {
     }
 
     /// Produces the buffered entries to the Kafka server. Those entries may span several Kafka records.
+    /// Returns the offset of the last successfully produced record.
     pub(crate) async fn produce(self, client_manager: &ClientManagerRef) -> Result<Offset> {
         ensure!(!self.entries.is_empty(), EmptyEntriesSnafu);
diff --git a/src/log-store/src/test_util.rs b/src/log-store/src/test_util.rs
index 973d6d3f9720..ce5ba3eb854f 100644
--- a/src/log-store/src/test_util.rs
+++ b/src/log-store/src/test_util.rs
@@ -12,4 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#[cfg(test)]
+pub mod kafka;
 pub mod log_store_util;
diff --git a/src/log-store/src/test_util/kafka.rs b/src/log-store/src/test_util/kafka.rs
new file mode 100644
index 000000000000..7107c6e5c3f1
--- /dev/null
+++ b/src/log-store/src/test_util/kafka.rs
@@ -0,0 +1,126 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::atomic::{AtomicU64 as AtomicEntryId, Ordering};
+use std::sync::Mutex;
+
+use common_meta::wal::KafkaWalTopic as Topic;
+use rand::distributions::Alphanumeric;
+use rand::rngs::ThreadRng;
+use rand::{thread_rng, Rng};
+use rskafka::client::ClientBuilder;
+use store_api::logstore::EntryId;
+
+use crate::kafka::{EntryImpl, NamespaceImpl};
+
+/// Creates `num_topics` topics, each named by the given decorator.
+pub async fn create_topics<F>(
+    num_topics: usize,
+    decorator: F,
+    broker_endpoints: &[String],
+) -> Vec<Topic>
+where
+    F: Fn(usize) -> String,
+{
+    assert!(!broker_endpoints.is_empty());
+    let client = ClientBuilder::new(broker_endpoints.to_vec())
+        .build()
+        .await
+        .unwrap();
+    let ctrl_client = client.controller_client().unwrap();
+    let (topics, tasks): (Vec<_>, Vec<_>) = (0..num_topics)
+        .map(|i| {
+            let topic = decorator(i);
+            let task = ctrl_client.create_topic(topic.clone(), 1, 1, 500);
+            (topic, task)
+        })
+        .unzip();
+    futures::future::try_join_all(tasks).await.unwrap();
+    topics
+}
+
+/// Creates a new Kafka namespace with the given topic and region id.
+pub fn new_namespace(topic: &str, region_id: u64) -> NamespaceImpl {
+    NamespaceImpl {
+        topic: topic.to_string(),
+        region_id,
+    }
+}
+
+/// A builder for building entries for a namespace.
+pub struct EntryBuilder {
+    /// The namespace of the entries.
+    ns: NamespaceImpl,
+    /// The next entry id to allocate. It starts from 0 by default.
+    next_entry_id: AtomicEntryId,
+    /// A generator for supporting random data generation.
+    /// Wrapped with Mutex<Option<ThreadRng>> to provide interior mutability.
+    rng: Mutex<Option<ThreadRng>>,
+}
+
+impl EntryBuilder {
+    /// Creates an EntryBuilder for the given namespace.
+    pub fn new(ns: NamespaceImpl) -> Self {
+        Self {
+            ns,
+            next_entry_id: AtomicEntryId::new(0),
+            rng: Mutex::new(Some(thread_rng())),
+        }
+    }
+
+    /// Sets the next entry id to the given entry id.
+    pub fn next_entry_id(self, entry_id: EntryId) -> Self {
+        Self {
+            next_entry_id: AtomicEntryId::new(entry_id),
+            ..self
+        }
+    }
+
+    /// Skips the next `step` entry ids and returns the next entry id after the stepping.
+    pub fn skip(&mut self, step: EntryId) -> EntryId {
+        let old = self.next_entry_id.fetch_add(step, Ordering::Relaxed);
+        old + step
+    }
+
+    /// Builds an entry with the given data.
+    pub fn with_data<D: AsRef<[u8]>>(&self, data: D) -> EntryImpl {
+        EntryImpl {
+            data: data.as_ref().to_vec(),
+            id: self.alloc_entry_id(),
+            ns: self.ns.clone(),
+        }
+    }
+
+    /// Builds an entry with random data.
+    pub fn with_random_data(&self) -> EntryImpl {
+        self.with_data(self.make_random_data())
+    }
+
+    fn alloc_entry_id(&self) -> EntryId {
+        self.next_entry_id.fetch_add(1, Ordering::Relaxed)
+    }
+
+    fn make_random_data(&self) -> Vec<u8> {
+        let mut guard = self.rng.lock().unwrap();
+        let rng = guard.as_mut().unwrap();
+        (0..42).map(|_| rng.sample(Alphanumeric)).collect()
+    }
+}
+
+/// Builds a batch of entries each with random data.
+pub fn entries_with_random_data(batch_size: usize, builder: &EntryBuilder) -> Vec<EntryImpl> {
+    (0..batch_size)
+        .map(|_| builder.with_random_data())
+        .collect()
+}
diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs
index 16809c26b1a1..903de87ee4f0 100644
--- a/src/store-api/src/logstore.rs
+++ b/src/store-api/src/logstore.rs
@@ -19,9 +19,11 @@ use std::collections::HashMap;
 
 use common_config::wal::WalOptions;
 use common_error::ext::ErrorExt;
 
-use crate::logstore::entry::{Entry, Id as EntryId};
+use crate::logstore::entry::Entry;
+pub use crate::logstore::entry::Id as EntryId;
 use crate::logstore::entry_stream::SendableEntryStream;
-use crate::logstore::namespace::{Id as NamespaceId, Namespace};
+pub use crate::logstore::namespace::Id as NamespaceId;
+use crate::logstore::namespace::Namespace;
 
 pub mod entry;
 pub mod entry_stream;
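Two usage notes on the patch. First, the tests introduced here are opt-in: `run_test_with_kafka_wal` skips the body unless `GT_KAFKA_ENDPOINTS` points at reachable brokers. A minimal sketch of how another test in the workspace would hook into the harness (the test name and body are hypothetical; the helper signature and environment variable are taken from the diff):

```rust
use common_meta::wal::kafka::test_util::run_test_with_kafka_wal;

// Runs only when brokers are reachable, e.g.:
//   GT_KAFKA_ENDPOINTS=localhost:9092 cargo test -p log-store
#[tokio::test]
async fn test_my_kafka_case() {
    run_test_with_kafka_wal(|broker_endpoints| {
        // The closure must return a BoxFuture<'static, ()>, hence the Box::pin
        // around an `async move` block that takes ownership of the endpoints.
        Box::pin(async move {
            // Any async assertions that need live brokers go here.
            assert!(!broker_endpoints.is_empty());
        })
    })
    .await;
}
```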
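Second, the `get_or_insert` rewrite in `client_manager.rs` replaces the manual `drop` of the read guard with a scoped block; the shape is the usual double-checked locking pattern over `tokio::sync::RwLock`. A standalone sketch under simplified types (the `String`-keyed pool and the value constructor are illustrative stand-ins, not the codebase's actual types):

```rust
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::RwLock;

/// Returns the cached value for `key`, creating it under the write lock if absent.
async fn get_or_insert(pool: &RwLock<HashMap<String, Arc<String>>>, key: &str) -> Arc<String> {
    // Fast path: the read guard lives only inside this block, so it is
    // released before the write lock below is requested.
    {
        let guard = pool.read().await;
        if let Some(value) = guard.get(key) {
            return value.clone();
        }
    }
    // Slow path: re-check under the write lock, because another task may
    // have inserted the value between the two lock acquisitions.
    let mut guard = pool.write().await;
    guard
        .entry(key.to_string())
        .or_insert_with(|| Arc::new(format!("value for {key}")))
        .clone()
}
```

The scoped block keeps the fast path cheap for the common case (client already cached) while the re-check under the write lock prevents two tasks from racing to build the same client.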