Skip to content

Commit

Permalink
test: add unit tests for client manager
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Dec 30, 2023
1 parent ef031ee commit f7954dc
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 67 deletions.
111 changes: 51 additions & 60 deletions src/log-store/src/kafka/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;

use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic};
use common_telemetry::debug;
use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
use rskafka::client::producer::aggregator::RecordAggregator;
use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
use rskafka::client::{Client as RsKafkaClient, ClientBuilder};
use rskafka::BackoffConfig;
use snafu::ResultExt;
use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
use tokio::sync::RwLock;

use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result};

Expand Down Expand Up @@ -69,7 +67,7 @@ pub(crate) struct ClientManager {
client_factory: RsKafkaClient,
/// A pool maintaining a collection of clients.
/// Key: a topic. Value: the associated client of the topic.
client_pool: TokioMutex<HashMap<Topic, Client>>,
client_pool: RwLock<HashMap<Topic, Client>>,
}

impl ClientManager {
Expand All @@ -90,28 +88,36 @@ impl ClientManager {
broker_endpoints: config.broker_endpoints.clone(),
})?;

debug!("Created a ClientManager");

Ok(Self {
config: config.clone(),
client_factory: client,
client_pool: TokioMutex::new(HashMap::new()),
client_pool: RwLock::new(HashMap::new()),
})
}

/// Gets the client associated with the topic. If the client does not exist, a new one will
/// be created and returned.
pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
let mut client_pool = self.client_pool.lock().await;
if let Entry::Vacant(entry) = client_pool.entry(topic.to_string()) {
entry.insert(self.try_create_client(topic).await?);
let client_pool = self.client_pool.read().await;
if let Some(client) = client_pool.get(topic) {
return Ok(client.clone());
}
// Manullay releases the read lock.
drop(client_pool);

// Acquires the write lock.
let mut client_pool = self.client_pool.write().await;
match client_pool.get(topic) {
Some(client) => Ok(client.clone()),
None => {
let client = self.try_create_client(topic).await?;
client_pool.insert(topic.clone(), client.clone());
Ok(client)
}
}
Ok(client_pool[topic].clone())
}

async fn try_create_client(&self, topic: &Topic) -> Result<Client> {
debug!("Try to create client for topic {}", topic);

// Sets to Retry to retry connecting if the kafka cluter replies with an UnknownTopic error.
// That's because the topic is believed to exist as the metasrv is expected to create required topics upon start.
// The reconnecting won't stop until succeed or a different error returns.
Expand All @@ -125,8 +131,6 @@ impl ClientManager {
})
.map(Arc::new)?;

debug!("Created a client for topic {}", topic);

Ok(Client::new(raw_client, &self.config))
}
}
Expand All @@ -136,74 +140,61 @@ mod tests {
use common_test_util::wal::kafka::BROKER_ENDPOINTS_KEY;

use super::*;
use crate::get_broker_endpoints_from_env;
use crate::test_util::kafka::create_topics;
use crate::test_util::kafka::topic_builder::{Affix, TopicBuilder};

/// Checks clients for the given topics are created.
async fn ensure_topics_exist(topics: &[Topic], client_manager: &ClientManager) {
let client_pool = client_manager.client_pool.lock().await;
let client_pool = client_manager.client_pool.read().await;
let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic));
assert_eq!(all_exist, true);
}

/// Sends `get_or_insert` requests sequentially to the client manager, and checks if it could handle them correctly.
#[tokio::test]
async fn test_sequential() {
let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
.unwrap()
.split(',')
.map(ToString::to_string)
.collect::<Vec<_>>();
if broker_endpoints.is_empty() {
return;
}

let client = ClientBuilder::new(broker_endpoints.clone())
.build()
.await
.unwrap();
let ctrl_client = client.controller_client().unwrap();

async fn test_which(test_name: &str) {
// Creates a collection of topics in Kafka.
let broker_endpoints = get_broker_endpoints_from_env!(BROKER_ENDPOINTS_KEY);
let topic_builder = TopicBuilder::default()
.with_prefix(Affix::Fixed("test_sequential".to_string()))
.with_prefix(Affix::Fixed(test_name.to_string()))
.with_suffix(Affix::TimeNow);

let topics = create_topics(256, topic_builder, &broker_endpoints).await;

let config = KafkaConfig {
broker_endpoints,
..Default::default()
};
let manager = ClientManager::try_new(&config).await.unwrap();

// Constructs a collection of mock topics.
let num_topics = 256;
let topics = (0..num_topics)
.map(|i| format!("topic_{i}"))
.collect::<Vec<_>>();

// Gets all clients sequentially.
for topic in topics.iter() {
manager.get_or_insert(topic).await.unwrap();
match test_name {
"test_sequential" => {
// Gets all clients sequentially.
for topic in topics.iter() {
manager.get_or_insert(topic).await.unwrap();
}
}
"test_parallel" => {
// Gets all clients in parallel.
let tasks = topics
.iter()
.map(|topic| manager.get_or_insert(topic))
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.unwrap();
}
_ => unreachable!(),
}

ensure_topics_exist(&topics, &manager).await;
}

/// Sends `get_or_insert` requests sequentially to the client manager, and checks if it could handle them correctly.
#[tokio::test]
async fn test_sequential() {
test_which("test_sequential").await;
}

/// Sends `get_or_insert` requests in parallel to the client manager, and checks if it could handle them correctly.
#[tokio::test]
async fn test_parallel() {
let manager = ClientManager::try_new(&config).await.unwrap();

// Constructs a collection of mock topics.
let num_topics = 256;
let topics = (0..num_topics)
.map(|i| format!("topic_{i}"))
.collect::<Vec<_>>();

// Gets all clients in parallel.
let tasks = topics
.iter()
.map(|topic| manager.get_or_insert(topic))
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.unwrap();
ensure_topics_exist(&topics, &manager).await;
test_which("test_parallel").await;
}
}
1 change: 0 additions & 1 deletion src/log-store/src/kafka/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::HashMap;
use std::sync::Arc;

use common_config::wal::{KafkaConfig, WalOptions};
use common_telemetry::{debug, warn};
use futures_util::StreamExt;
use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
use rskafka::client::partition::OffsetAt;
Expand Down
42 changes: 39 additions & 3 deletions src/log-store/src/test_util/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,48 @@ pub mod entry_builder;
pub mod topic_builder;

use common_config::wal::KafkaWalTopic as Topic;
use rskafka::client::ClientBuilder;

pub use crate::test_util::kafka::entry_builder::EntryBuilder;
pub use crate::test_util::kafka::topic_builder::TopicBuilder;

/// Gets broker endpoints from environment variables with the given key.
#[macro_export]
macro_rules! get_broker_endpoints_from_env {
($key:expr) => {{
let broker_endpoints = std::env::var($key)
.unwrap()
.split(',')
.map(ToString::to_string)
.collect::<Vec<_>>();
assert!(!broker_endpoints.is_empty());
broker_endpoints
}};
}

/// Creates `num_topiocs` number of topics with the given TopicBuilder.
/// Requests for creating these topics on the Kafka cluster if the `broker_endpoints` is not empty.
pub fn create_topics(num_topics: usize, builder: TopicBuilder, broker_endpoints: Vec<String>) -> Vec<Topic> {

/// Requests for creating these topics on the Kafka cluster.
pub async fn create_topics(
num_topics: usize,
mut builder: TopicBuilder,
broker_endpoints: &[String],
) -> Vec<Topic> {
assert!(!broker_endpoints.is_empty());

let client = ClientBuilder::new(broker_endpoints.to_vec())
.build()
.await
.unwrap();
let ctrl_client = client.controller_client().unwrap();

let (topics, tasks): (Vec<_>, Vec<_>) = (0..num_topics)
.map(|i| {
let topic = builder.build(&format!("topic_{i}"));
let task = ctrl_client.create_topic(topic.clone(), 1, 1, 500);
(topic, task)
})
.unzip();
futures::future::try_join_all(tasks).await.unwrap();

topics
}
6 changes: 3 additions & 3 deletions src/log-store/src/test_util/kafka/topic_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl ToString for Affix {
fn to_string(&self) -> String {
match self {
Affix::Fixed(s) => s.to_string(),
Affix::TimeNow => chrono::Local::now().to_string(),
Affix::TimeNow => chrono::Local::now().timestamp_millis().to_string(),
Affix::Nothing => String::default(),
}
}
Expand Down Expand Up @@ -70,13 +70,13 @@ impl TopicBuilder {
}

/// Builds a topic by inserting a prefix and a suffix into the given topic.
pub fn build(&mut self, topic: &Topic) -> Topic {
pub fn build(&mut self, body: &Topic) -> Topic {
const ITERS: usize = 24;
for _ in 0..ITERS {
let topic = format!(
"{}_{}_{}",
self.prefix.to_string(),
topic,
body,
self.suffix.to_string()
);
if !self.created.contains(&topic) {
Expand Down

0 comments on commit f7954dc

Please sign in to comment.