From 0b503a5851efaff5cc09f0f7b94b0f3a4d0ab4b4 Mon Sep 17 00:00:00 2001 From: niebayes Date: Sat, 30 Dec 2023 23:27:24 +0800 Subject: [PATCH] test: add some unit tests for kafka log store --- src/log-store/src/kafka/client_manager.rs | 2 +- src/log-store/src/kafka/log_store.rs | 132 +++++++++++++++++- src/log-store/src/test_util/kafka.rs | 4 +- .../src/test_util/kafka/entry_builder.rs | 16 ++- .../src/test_util/kafka/topic_builder.rs | 2 +- tests-integration/tests/wal.rs | 131 ----------------- 6 files changed, 149 insertions(+), 138 deletions(-) diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 72d0b839a755..ef4361b3d700 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -148,7 +148,7 @@ mod tests { async fn ensure_topics_exist(topics: &[Topic], client_manager: &ClientManager) { let client_pool = client_manager.client_pool.read().await; let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic)); - assert_eq!(all_exist, true); + assert!(all_exist); } async fn test_which(test_name: &str) { diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index d78f846f9277..75f34b95caf0 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; -use common_config::wal::{KafkaConfig, WalOptions}; +use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic, WalOptions}; use futures_util::StreamExt; use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder}; use rskafka::client::partition::OffsetAt; @@ -47,6 +47,26 @@ impl KafkaLogStore { config: config.clone(), }) } + + /// Gets the end offset of the last record in a Kafka topic. + /// Warning: this method is intended to be used only in testing. + // TODO(niebayes): use this to test that the initial offset is 1 for a Kafka log store in that + // a no-op record is successfully appended into each topic. + #[allow(unused)] + pub async fn get_offset(&self, topic: &Topic) -> EntryId { + let client = self + .client_manager + .get_or_insert(topic) + .await + .unwrap() + .raw_client; + client + .get_offset(OffsetAt::Latest) + .await + .map(TryInto::try_into) + .unwrap() + .unwrap() + } } #[async_trait::async_trait] @@ -267,3 +287,113 @@ impl LogStore for KafkaLogStore { Ok(()) } } + +#[cfg(test)] +mod tests { + use common_test_util::wal::kafka::BROKER_ENDPOINTS_KEY; + + use super::*; + use crate::get_broker_endpoints_from_env; + use crate::test_util::kafka::topic_builder::Affix; + use crate::test_util::kafka::{create_topics, EntryBuilder, TopicBuilder}; + + fn new_namespace(topic: &str, region_id: u64) -> NamespaceImpl { + NamespaceImpl { + region_id, + topic: topic.to_string(), + } + } + + // TODO(niebayes): change `expected` to &[EntryImpl]. + async fn check_entries( + ns: &NamespaceImpl, + start_offset: EntryId, + expected: Vec, + logstore: &KafkaLogStore, + ) { + let stream = logstore.read(ns, start_offset).await.unwrap(); + let got = stream + .collect::>() + .await + .into_iter() + .flat_map(|x| x.unwrap()) + .collect::>(); + assert_eq!(expected, got); + } + + /// Appends entries for one region and checks all entries can be read successfully. 
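+    /// The test appends one entry via `append` and then a batch of 10 entries via `append_batch`,
+    /// verifying each write by reading the entries back starting from the returned last entry id.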
+ #[tokio::test] + async fn test_one_region() { + let broker_endpoints = get_broker_endpoints_from_env!(BROKER_ENDPOINTS_KEY); + let topic_builder = TopicBuilder::default() + .with_prefix(Affix::Fixed("test_one_region".to_string())) + .with_suffix(Affix::TimeNow); + let topic = create_topics(1, topic_builder, &broker_endpoints).await[0].clone(); + + let config = KafkaConfig { + broker_endpoints, + ..Default::default() + }; + let logstore = KafkaLogStore::try_new(&config).await.unwrap(); + + let ns = new_namespace(&topic, 0); + let entry_builder = EntryBuilder::new(ns.clone()); + let entry = entry_builder.with_random_data(); + + let last_entry_id = logstore.append(entry.clone()).await.unwrap().last_entry_id; + check_entries(&ns, last_entry_id, vec![entry], &logstore).await; + + let entries = (0..10) + .map(|_| entry_builder.with_random_data()) + .collect::>(); + let last_entry_id = logstore + .append_batch(entries.clone()) + .await + .unwrap() + .last_entry_ids[&ns.region_id]; + check_entries(&ns, last_entry_id, entries, &logstore).await; + } + + /// Appends entries for multiple regions and checks entries for each region can be read successfully. + /// A topic is assigned only a single region. + #[tokio::test] + async fn test_multi_regions_disjoint() { + let broker_endpoints = get_broker_endpoints_from_env!(BROKER_ENDPOINTS_KEY); + let topic_builder = TopicBuilder::default() + .with_prefix(Affix::Fixed("test_multi_regions_disjoint".to_string())) + .with_suffix(Affix::TimeNow); + let topics = create_topics(10, topic_builder, &broker_endpoints).await; + + let config = KafkaConfig { + broker_endpoints, + ..Default::default() + }; + let logstore = KafkaLogStore::try_new(&config).await.unwrap(); + + let (region_namespaces, mut entry_builders): (Vec<_>, Vec<_>) = topics + .iter() + .enumerate() + .map(|(i, topic)| { + let ns = new_namespace(topic, i as u64); + let entry_builder = EntryBuilder::new(ns.clone()); + (ns, entry_builder) + }) + .unzip(); + let region_entries = entry_builders + .iter_mut() + .map(|builder| builder.with_random_data_batch(5)) + .collect::>(); + let entries = region_entries.iter().flatten().cloned().collect::>(); + let last_entry_ids = logstore.append_batch(entries).await.unwrap().last_entry_ids; + + for (i, ns) in region_namespaces.iter().enumerate() { + let expected = region_entries[i].clone(); + check_entries(ns, last_entry_ids[&ns.region_id], expected, &logstore).await; + } + } + + /// Appends entries for multiple regions and checks entries for each region can be read successfully. + /// A topic may be assigned multiple regions. + #[tokio::test] + async fn test_multi_regions_overlapped() {} +} diff --git a/src/log-store/src/test_util/kafka.rs b/src/log-store/src/test_util/kafka.rs index f2af612e1f58..d80eb838d1d0 100644 --- a/src/log-store/src/test_util/kafka.rs +++ b/src/log-store/src/test_util/kafka.rs @@ -26,7 +26,7 @@ pub use crate::test_util::kafka::topic_builder::TopicBuilder; macro_rules! get_broker_endpoints_from_env { ($key:expr) => {{ let broker_endpoints = std::env::var($key) - .unwrap() + .unwrap_or("localhost:9092".to_string()) .split(',') .map(ToString::to_string) .collect::>(); @@ -36,7 +36,7 @@ macro_rules! get_broker_endpoints_from_env { } /// Creates `num_topiocs` number of topics with the given TopicBuilder. -/// Requests for creating these topics on the Kafka cluster. +/// Requests for creating these topics on the Kafka cluster specified with the `broker_endpoints`. 
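+/// Returns the names of the created topics.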
pub async fn create_topics( num_topics: usize, mut builder: TopicBuilder, diff --git a/src/log-store/src/test_util/kafka/entry_builder.rs b/src/log-store/src/test_util/kafka/entry_builder.rs index 3e234c78d6e4..9bfb41027abe 100644 --- a/src/log-store/src/test_util/kafka/entry_builder.rs +++ b/src/log-store/src/test_util/kafka/entry_builder.rs @@ -27,7 +27,7 @@ const DEFAULT_DATA: &[u8; 10] = b"[greptime]"; /// A builder for building entries for a namespace. pub struct EntryBuilder { /// The namespace of the entries. - ns: NamespaceImpl, + pub ns: NamespaceImpl, /// The next entry id to allocate. It starts from 0 by default. next_entry_id: AtomicEntryId, /// A generator for supporting random data generation. @@ -78,6 +78,7 @@ impl EntryBuilder { } /// Builds an entry with the default data. + // TODO(niebayes): may be remove this method since it's not used. pub fn with_default_data(&self) -> EntryImpl { EntryImpl { data: DEFAULT_DATA.to_vec(), @@ -86,6 +87,17 @@ impl EntryBuilder { } } + /// Builds a batch of entries each with random data. + pub fn with_random_data_batch(&self, batch_size: usize) -> Vec { + (0..batch_size) + .map(|_| EntryImpl { + data: self.make_random_data(), + id: self.alloc_entry_id(), + ns: self.ns.clone(), + }) + .collect() + } + /// Builds an entry with random data. pub fn with_random_data(&self) -> EntryImpl { EntryImpl { @@ -105,7 +117,7 @@ impl EntryBuilder { let amount = rng.gen_range(0..self.data_pool.len()); self.data_pool .choose_multiple(&mut rng, amount) - .map(|x| *x) + .copied() .collect() } } diff --git a/src/log-store/src/test_util/kafka/topic_builder.rs b/src/log-store/src/test_util/kafka/topic_builder.rs index be51d8c10f49..037390914c8b 100644 --- a/src/log-store/src/test_util/kafka/topic_builder.rs +++ b/src/log-store/src/test_util/kafka/topic_builder.rs @@ -70,7 +70,7 @@ impl TopicBuilder { } /// Builds a topic by inserting a prefix and a suffix into the given topic. - pub fn build(&mut self, body: &Topic) -> Topic { + pub fn build(&mut self, body: &str) -> Topic { const ITERS: usize = 24; for _ in 0..ITERS { let topic = format!( diff --git a/tests-integration/tests/wal.rs b/tests-integration/tests/wal.rs index 94c6c3b12cb3..8bf09f55c550 100644 --- a/tests-integration/tests/wal.rs +++ b/tests-integration/tests/wal.rs @@ -140,134 +140,3 @@ // .await // .unwrap(); // } - -// async fn check_entries( -// ns: &NamespaceImpl, -// start_offset: EntryId, -// expected: Vec, -// logstore: &KafkaLogStore, -// ) { -// let stream = logstore.read(ns, start_offset).await.unwrap(); -// let got = stream -// .collect::>() -// .await -// .into_iter() -// .flat_map(|x| x.unwrap()) -// .collect::>(); -// assert_eq!(expected, got); -// // for entry in expected { -// // let got = stream.next().await.unwrap().unwrap(); -// // } -// } - -// // Tests that the Kafka log store is able to write and read log entries from Kafka. -// // #[tokio::test] -// // async fn test_kafka_log_store() { -// // println!("Start running test"); - -// // // Starts a Kafka container. 
-// // let docker = DockerCli::default(); -// // let container = docker.run(KafkaImage::default()); - -// // println!("Started the container"); - -// // let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY) -// // .unwrap_or(format!( -// // "localhost:{}", -// // container.get_host_port_ipv4(DEFAULT_EXPOSED_PORT) -// // )) -// // .split(',') -// // .map(ToString::to_string) -// // .collect::>(); -// // let config = DatanodeKafkaConfig { -// // broker_endpoints, -// // ..Default::default() -// // }; -// // let logstore = KafkaLogStore::try_new(&config).await.unwrap(); - -// // println!("Started the log store"); - -// // let client = ClientBuilder::new(config.broker_endpoints.clone()) -// // .build() -// // .await -// // .unwrap() -// // .controller_client() -// // .unwrap(); - -// // println!("Created a client"); - -// // // Appends one entry. -// // let topic = "__test_kafka_log_store_topic_append"; -// // create_topic(topic, config.broker_endpoints.len() as i16, &client).await; - -// // println!("Created a topic"); - -// // let ns = NamespaceImpl { -// // region_id: 0, -// // topic: topic.to_string(), -// // }; -// // let entry = new_test_entry(b"0", 0, ns.clone()); -// // let last_entry_id = logstore.append(entry.clone()).await.unwrap().last_entry_id; - -// // println!("Appended an entry"); - -// // check_entries(&ns, last_entry_id, vec![entry], &logstore).await; - -// // // Appends a batch of entries. -// // // Region 1, 2 are mapped to topic 1, -// // let topic = "__test_kafka_log_store_topic_append_batch_1"; -// // create_topic(topic, config.broker_endpoints.len() as i16, &client).await; - -// // println!("Created a topic"); - -// // let ns_1 = NamespaceImpl { -// // region_id: 1, -// // topic: topic.to_string(), -// // }; -// // let ns_2 = NamespaceImpl { -// // region_id: 2, -// // topic: topic.to_string(), -// // }; - -// // // Region 3 is mapped to topic 2. -// // let topic = "__test_kafka_log_store_topic_append_batch_2"; -// // create_topic(topic, config.broker_endpoints.len() as i16, &client).await; - -// // println!("Created a topic"); - -// // let ns_3 = NamespaceImpl { -// // region_id: 3, -// // topic: topic.to_string(), -// // }; - -// // // Constructs a batch of entries. -// // let entries_1 = vec![ -// // new_test_entry(b"1", 0, ns_1.clone()), -// // new_test_entry(b"1", 1, ns_1.clone()), -// // ]; -// // let entries_2 = vec![ -// // new_test_entry(b"2", 2, ns_2.clone()), -// // new_test_entry(b"2", 3, ns_2.clone()), -// // ]; -// // let entries_3 = vec![ -// // new_test_entry(b"3", 7, ns_3.clone()), -// // new_test_entry(b"3", 8, ns_3.clone()), -// // ]; -// // let entries = vec![entries_1.clone(), entries_2.clone(), entries_3.clone()] -// // .into_iter() -// // .flatten() -// // .collect::>(); - -// // let last_entry_ids = logstore -// // .append_batch(entries.clone()) -// // .await -// // .unwrap() -// // .last_entry_ids; - -// // // Reads entries for region 1. -// // check_entries(&ns_1, last_entry_ids[&1], entries_1, &logstore).await; -// // // Reads entries from region 2. -// // check_entries(&ns_2, last_entry_ids[&2], entries_2, &logstore).await; -// // // Reads entries from region 3. -// // check_entries(&ns_3, last_entry_ids[&3], entries_3, &logstore).await; -// // }
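
The `test_multi_regions_overlapped` test is left as an empty stub in this patch. One possible shape for it,
mirroring `test_multi_regions_disjoint` and reusing the helpers introduced above (`create_topics`,
`EntryBuilder`, `new_namespace`, `check_entries`): the region/topic counts and the `i % topics.len()`
mapping below are illustrative assumptions, and the sketch has not been verified against a running
Kafka cluster.

    /// Appends entries for multiple regions and checks entries for each region can be read successfully.
    /// A topic may be assigned multiple regions.
    #[tokio::test]
    async fn test_multi_regions_overlapped() {
        let broker_endpoints = get_broker_endpoints_from_env!(BROKER_ENDPOINTS_KEY);
        let topic_builder = TopicBuilder::default()
            .with_prefix(Affix::Fixed("test_multi_regions_overlapped".to_string()))
            .with_suffix(Affix::TimeNow);
        // Assumption: fewer topics than regions, so that some topics serve multiple regions.
        let topics = create_topics(5, topic_builder, &broker_endpoints).await;

        let config = KafkaConfig {
            broker_endpoints,
            ..Default::default()
        };
        let logstore = KafkaLogStore::try_new(&config).await.unwrap();

        // Maps region i to topic i % topics.len(), so each topic hosts two regions.
        let (region_namespaces, mut entry_builders): (Vec<_>, Vec<_>) = (0..10)
            .map(|i| {
                let ns = new_namespace(&topics[i % topics.len()], i as u64);
                let entry_builder = EntryBuilder::new(ns.clone());
                (ns, entry_builder)
            })
            .unzip();
        let region_entries = entry_builders
            .iter_mut()
            .map(|builder| builder.with_random_data_batch(5))
            .collect::<Vec<_>>();
        let entries = region_entries.iter().flatten().cloned().collect::<Vec<_>>();
        let last_entry_ids = logstore.append_batch(entries).await.unwrap().last_entry_ids;

        // Checks that the entries of each region can be read back from its shared topic.
        for (i, ns) in region_namespaces.iter().enumerate() {
            let expected = region_entries[i].clone();
            check_entries(ns, last_entry_ids[&ns.region_id], expected, &logstore).await;
        }
    }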