test: add some unit tests for kafka log store
niebayes committed Dec 30, 2023
1 parent f7954dc commit 0b503a5
Showing 6 changed files with 149 additions and 138 deletions.
2 changes: 1 addition & 1 deletion src/log-store/src/kafka/client_manager.rs
@@ -148,7 +148,7 @@ mod tests {
async fn ensure_topics_exist(topics: &[Topic], client_manager: &ClientManager) {
let client_pool = client_manager.client_pool.read().await;
let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic));
assert_eq!(all_exist, true);
assert!(all_exist);
}

async fn test_which(test_name: &str) {
132 changes: 131 additions & 1 deletion src/log-store/src/kafka/log_store.rs
@@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use common_config::wal::{KafkaConfig, WalOptions};
use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic, WalOptions};
use futures_util::StreamExt;
use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
use rskafka::client::partition::OffsetAt;
@@ -47,6 +47,26 @@ impl KafkaLogStore {
config: config.clone(),
})
}

/// Gets the end offset of the last record in a Kafka topic.
/// Warning: this method is intended to be used only in testing.
// TODO(niebayes): use this to test that the initial offset is 1 for a Kafka log store, i.e. that
// a no-op record was successfully appended into each topic.
#[allow(unused)]
pub async fn get_offset(&self, topic: &Topic) -> EntryId {
let client = self
.client_manager
.get_or_insert(topic)
.await
.unwrap()
.raw_client;
client
.get_offset(OffsetAt::Latest)
.await
.map(TryInto::try_into)
.unwrap()
.unwrap()
}
}

#[async_trait::async_trait]
@@ -267,3 +287,113 @@ impl LogStore for KafkaLogStore {
Ok(())
}
}

#[cfg(test)]
mod tests {
use common_test_util::wal::kafka::BROKER_ENDPOINTS_KEY;

use super::*;
use crate::get_broker_endpoints_from_env;
use crate::test_util::kafka::topic_builder::Affix;
use crate::test_util::kafka::{create_topics, EntryBuilder, TopicBuilder};

fn new_namespace(topic: &str, region_id: u64) -> NamespaceImpl {
NamespaceImpl {
region_id,
topic: topic.to_string(),
}
}

// TODO(niebayes): change `expected` to &[EntryImpl].
async fn check_entries(
ns: &NamespaceImpl,
start_offset: EntryId,
expected: Vec<EntryImpl>,
logstore: &KafkaLogStore,
) {
let stream = logstore.read(ns, start_offset).await.unwrap();
let got = stream
.collect::<Vec<_>>()
.await
.into_iter()
.flat_map(|x| x.unwrap())
.collect::<Vec<_>>();
assert_eq!(expected, got);
}

/// Appends entries for one region and checks all entries can be read successfully.
#[tokio::test]
async fn test_one_region() {
let broker_endpoints = get_broker_endpoints_from_env!(BROKER_ENDPOINTS_KEY);
let topic_builder = TopicBuilder::default()
.with_prefix(Affix::Fixed("test_one_region".to_string()))
.with_suffix(Affix::TimeNow);
let topic = create_topics(1, topic_builder, &broker_endpoints).await[0].clone();

let config = KafkaConfig {
broker_endpoints,
..Default::default()
};
let logstore = KafkaLogStore::try_new(&config).await.unwrap();

let ns = new_namespace(&topic, 0);
let entry_builder = EntryBuilder::new(ns.clone());
let entry = entry_builder.with_random_data();

let last_entry_id = logstore.append(entry.clone()).await.unwrap().last_entry_id;
check_entries(&ns, last_entry_id, vec![entry], &logstore).await;

let entries = (0..10)
.map(|_| entry_builder.with_random_data())
.collect::<Vec<_>>();
let last_entry_id = logstore
.append_batch(entries.clone())
.await
.unwrap()
.last_entry_ids[&ns.region_id];
check_entries(&ns, last_entry_id, entries, &logstore).await;
}
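
// A hypothetical sketch (not in this commit) of the test hinted at by the TODO on
// `get_offset`: it reuses the helpers above and only asserts that appending entries
// advances the topic's end offset, since the exact mapping of entries (and of the
// no-op record) to Kafka records is not pinned down here.
#[tokio::test]
async fn test_get_offset_advances() {
    let broker_endpoints = get_broker_endpoints_from_env!(BROKER_ENDPOINTS_KEY);
    let topic_builder = TopicBuilder::default()
        .with_prefix(Affix::Fixed("test_get_offset".to_string()))
        .with_suffix(Affix::TimeNow);
    let topic = create_topics(1, topic_builder, &broker_endpoints).await[0].clone();

    let config = KafkaConfig {
        broker_endpoints,
        ..Default::default()
    };
    let logstore = KafkaLogStore::try_new(&config).await.unwrap();

    let ns = new_namespace(&topic, 0);
    let entry_builder = EntryBuilder::new(ns.clone());

    let offset_before = logstore.get_offset(&topic).await;
    logstore
        .append_batch(entry_builder.with_random_data_batch(5))
        .await
        .unwrap();
    let offset_after = logstore.get_offset(&topic).await;
    // Appending must move the end offset forward; the exact delta depends on how
    // entries are grouped into Kafka records.
    assert!(offset_after > offset_before);
}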

/// Appends entries for multiple regions and checks entries for each region can be read successfully.
/// A topic is assigned only a single region.
#[tokio::test]
async fn test_multi_regions_disjoint() {
let broker_endpoints = get_broker_endpoints_from_env!(BROKER_ENDPOINTS_KEY);
let topic_builder = TopicBuilder::default()
.with_prefix(Affix::Fixed("test_multi_regions_disjoint".to_string()))
.with_suffix(Affix::TimeNow);
let topics = create_topics(10, topic_builder, &broker_endpoints).await;

let config = KafkaConfig {
broker_endpoints,
..Default::default()
};
let logstore = KafkaLogStore::try_new(&config).await.unwrap();

let (region_namespaces, mut entry_builders): (Vec<_>, Vec<_>) = topics
.iter()
.enumerate()
.map(|(i, topic)| {
let ns = new_namespace(topic, i as u64);
let entry_builder = EntryBuilder::new(ns.clone());
(ns, entry_builder)
})
.unzip();
let region_entries = entry_builders
.iter_mut()
.map(|builder| builder.with_random_data_batch(5))
.collect::<Vec<_>>();
let entries = region_entries.iter().flatten().cloned().collect::<Vec<_>>();
let last_entry_ids = logstore.append_batch(entries).await.unwrap().last_entry_ids;

for (i, ns) in region_namespaces.iter().enumerate() {
let expected = region_entries[i].clone();
check_entries(ns, last_entry_ids[&ns.region_id], expected, &logstore).await;
}
}

/// Appends entries for multiple regions and checks entries for each region can be read successfully.
/// A topic may be assigned multiple regions.
#[tokio::test]
async fn test_multi_regions_overlapped() {}
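
// A hypothetical sketch (not in this commit) of what the overlapped test could do: mirror
// `test_multi_regions_disjoint`, but map ten regions onto three topics (here via modulo, an
// assumption made for illustration) so that each topic carries entries from several regions.
// It relies on `read` filtering entries by region, which is what sharing topics requires.
#[tokio::test]
async fn test_multi_regions_overlapped_sketch() {
    let broker_endpoints = get_broker_endpoints_from_env!(BROKER_ENDPOINTS_KEY);
    let topic_builder = TopicBuilder::default()
        .with_prefix(Affix::Fixed("test_multi_regions_overlapped".to_string()))
        .with_suffix(Affix::TimeNow);
    let topics = create_topics(3, topic_builder, &broker_endpoints).await;

    let config = KafkaConfig {
        broker_endpoints,
        ..Default::default()
    };
    let logstore = KafkaLogStore::try_new(&config).await.unwrap();

    // Ten regions share the three topics.
    let (region_namespaces, mut entry_builders): (Vec<_>, Vec<_>) = (0..10)
        .map(|i| {
            let ns = new_namespace(&topics[i % topics.len()], i as u64);
            let entry_builder = EntryBuilder::new(ns.clone());
            (ns, entry_builder)
        })
        .unzip();
    let region_entries = entry_builders
        .iter_mut()
        .map(|builder| builder.with_random_data_batch(5))
        .collect::<Vec<_>>();
    let entries = region_entries.iter().flatten().cloned().collect::<Vec<_>>();
    let last_entry_ids = logstore.append_batch(entries).await.unwrap().last_entry_ids;

    for (i, ns) in region_namespaces.iter().enumerate() {
        check_entries(ns, last_entry_ids[&ns.region_id], region_entries[i].clone(), &logstore).await;
    }
}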
}
4 changes: 2 additions & 2 deletions src/log-store/src/test_util/kafka.rs
@@ -26,7 +26,7 @@ pub use crate::test_util::kafka::topic_builder::TopicBuilder;
macro_rules! get_broker_endpoints_from_env {
($key:expr) => {{
let broker_endpoints = std::env::var($key)
.unwrap()
.unwrap_or("localhost:9092".to_string())
.split(',')
.map(ToString::to_string)
.collect::<Vec<_>>();
@@ -36,7 +36,7 @@ macro_rules! get_broker_endpoints_from_env {
}

/// Creates `num_topics` topics with the given TopicBuilder.
/// Requests for creating these topics on the Kafka cluster.
/// Requests for creating these topics on the Kafka cluster specified by `broker_endpoints`.
pub async fn create_topics(
num_topics: usize,
mut builder: TopicBuilder,
16 changes: 14 additions & 2 deletions src/log-store/src/test_util/kafka/entry_builder.rs
@@ -27,7 +27,7 @@ const DEFAULT_DATA: &[u8; 10] = b"[greptime]";
/// A builder for building entries for a namespace.
pub struct EntryBuilder {
/// The namespace of the entries.
ns: NamespaceImpl,
pub ns: NamespaceImpl,
/// The next entry id to allocate. It starts from 0 by default.
next_entry_id: AtomicEntryId,
/// A generator for supporting random data generation.
@@ -78,6 +78,7 @@ impl EntryBuilder {
}

/// Builds an entry with the default data.
// TODO(niebayes): maybe remove this method since it's not used.
pub fn with_default_data(&self) -> EntryImpl {
EntryImpl {
data: DEFAULT_DATA.to_vec(),
@@ -86,6 +87,17 @@ impl EntryBuilder {
}
}

/// Builds a batch of entries each with random data.
pub fn with_random_data_batch(&self, batch_size: usize) -> Vec<EntryImpl> {
(0..batch_size)
.map(|_| EntryImpl {
data: self.make_random_data(),
id: self.alloc_entry_id(),
ns: self.ns.clone(),
})
.collect()
}

/// Builds an entry with random data.
pub fn with_random_data(&self) -> EntryImpl {
EntryImpl {
@@ -105,7 +117,7 @@ impl EntryBuilder {
let amount = rng.gen_range(0..self.data_pool.len());
self.data_pool
.choose_multiple(&mut rng, amount)
.map(|x| *x)
.copied()
.collect()
}
}
2 changes: 1 addition & 1 deletion src/log-store/src/test_util/kafka/topic_builder.rs
@@ -70,7 +70,7 @@ impl TopicBuilder {
}

/// Builds a topic by inserting a prefix and a suffix into the given topic.
pub fn build(&mut self, body: &Topic) -> Topic {
pub fn build(&mut self, body: &str) -> Topic {
const ITERS: usize = 24;
for _ in 0..ITERS {
let topic = format!(
131 changes: 0 additions & 131 deletions tests-integration/tests/wal.rs
@@ -140,134 +140,3 @@
// .await
// .unwrap();
// }

// async fn check_entries(
// ns: &NamespaceImpl,
// start_offset: EntryId,
// expected: Vec<EntryImpl>,
// logstore: &KafkaLogStore,
// ) {
// let stream = logstore.read(ns, start_offset).await.unwrap();
// let got = stream
// .collect::<Vec<_>>()
// .await
// .into_iter()
// .flat_map(|x| x.unwrap())
// .collect::<Vec<_>>();
// assert_eq!(expected, got);
// // for entry in expected {
// // let got = stream.next().await.unwrap().unwrap();
// // }
// }

// // Tests that the Kafka log store is able to write and read log entries from Kafka.
// // #[tokio::test]
// // async fn test_kafka_log_store() {
// // println!("Start running test");

// // // Starts a Kafka container.
// // let docker = DockerCli::default();
// // let container = docker.run(KafkaImage::default());

// // println!("Started the container");

// // let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
// // .unwrap_or(format!(
// // "localhost:{}",
// // container.get_host_port_ipv4(DEFAULT_EXPOSED_PORT)
// // ))
// // .split(',')
// // .map(ToString::to_string)
// // .collect::<Vec<_>>();
// // let config = DatanodeKafkaConfig {
// // broker_endpoints,
// // ..Default::default()
// // };
// // let logstore = KafkaLogStore::try_new(&config).await.unwrap();

// // println!("Started the log store");

// // let client = ClientBuilder::new(config.broker_endpoints.clone())
// // .build()
// // .await
// // .unwrap()
// // .controller_client()
// // .unwrap();

// // println!("Created a client");

// // // Appends one entry.
// // let topic = "__test_kafka_log_store_topic_append";
// // create_topic(topic, config.broker_endpoints.len() as i16, &client).await;

// // println!("Created a topic");

// // let ns = NamespaceImpl {
// // region_id: 0,
// // topic: topic.to_string(),
// // };
// // let entry = new_test_entry(b"0", 0, ns.clone());
// // let last_entry_id = logstore.append(entry.clone()).await.unwrap().last_entry_id;

// // println!("Appended an entry");

// // check_entries(&ns, last_entry_id, vec![entry], &logstore).await;

// // // Appends a batch of entries.
// // // Region 1, 2 are mapped to topic 1,
// // let topic = "__test_kafka_log_store_topic_append_batch_1";
// // create_topic(topic, config.broker_endpoints.len() as i16, &client).await;

// // println!("Created a topic");

// // let ns_1 = NamespaceImpl {
// // region_id: 1,
// // topic: topic.to_string(),
// // };
// // let ns_2 = NamespaceImpl {
// // region_id: 2,
// // topic: topic.to_string(),
// // };

// // // Region 3 is mapped to topic 2.
// // let topic = "__test_kafka_log_store_topic_append_batch_2";
// // create_topic(topic, config.broker_endpoints.len() as i16, &client).await;

// // println!("Created a topic");

// // let ns_3 = NamespaceImpl {
// // region_id: 3,
// // topic: topic.to_string(),
// // };

// // // Constructs a batch of entries.
// // let entries_1 = vec![
// // new_test_entry(b"1", 0, ns_1.clone()),
// // new_test_entry(b"1", 1, ns_1.clone()),
// // ];
// // let entries_2 = vec![
// // new_test_entry(b"2", 2, ns_2.clone()),
// // new_test_entry(b"2", 3, ns_2.clone()),
// // ];
// // let entries_3 = vec![
// // new_test_entry(b"3", 7, ns_3.clone()),
// // new_test_entry(b"3", 8, ns_3.clone()),
// // ];
// // let entries = vec![entries_1.clone(), entries_2.clone(), entries_3.clone()]
// // .into_iter()
// // .flatten()
// // .collect::<Vec<_>>();

// // let last_entry_ids = logstore
// // .append_batch(entries.clone())
// // .await
// // .unwrap()
// // .last_entry_ids;

// // // Reads entries for region 1.
// // check_entries(&ns_1, last_entry_ids[&1], entries_1, &logstore).await;
// // // Reads entries from region 2.
// // check_entries(&ns_2, last_entry_ids[&2], entries_2, &logstore).await;
// // // Reads entries from region 3.
// // check_entries(&ns_3, last_entry_ids[&3], entries_3, &logstore).await;
// // }
