From f21799140a19fe8d44b78c5b97931bed9b2696f3 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Sat, 23 Dec 2023 01:46:37 +0000 Subject: [PATCH] test: add a naive test for topic manager --- .../meta/src/wal/kafka/topic_manager.rs | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index 4773871ae134..231e3ed9ef60 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -174,8 +174,13 @@ impl TopicManager { #[cfg(test)] mod tests { + use std::env; + + use common_telemetry::info; + use super::*; use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::{self}; // Tests that topics can be successfully persisted into the kv backend and can be successfully restored from the kv backend. #[tokio::test] @@ -201,4 +206,25 @@ mod tests { assert_eq!(topics, restored_topics); } + + #[tokio::test] + async fn test_topic_manager() { + let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default(); + common_telemetry::init_default_ut_logging(); + + if endpoints.is_empty() { + info!("The endpoints is empty, skipping the test."); + return; + } + // TODO: supports topic prefix + let kv_backend = Arc::new(MemoryKvBackend::new()); + let mut config = KafkaConfig::default(); + config.replication_factor = 1; + config.broker_endpoints = endpoints + .split(",") + .map(|s| s.to_string()) + .collect::>(); + let manager = TopicManager::new(config, kv_backend); + manager.start().await.unwrap(); + } }