diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index 4773871ae134..0efe573acc83 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -174,8 +174,13 @@ impl TopicManager { #[cfg(test)] mod tests { + use std::env; + + use common_telemetry::info; + use super::*; use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::{self}; // Tests that topics can be successfully persisted into the kv backend and can be successfully restored from the kv backend. #[tokio::test] @@ -201,4 +206,27 @@ mod tests { assert_eq!(topics, restored_topics); } + + #[tokio::test] + async fn test_topic_manager() { + let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default(); + common_telemetry::init_default_ut_logging(); + + if endpoints.is_empty() { + info!("The endpoints is empty, skipping the test."); + return; + } + // TODO: supports topic prefix + let kv_backend = Arc::new(MemoryKvBackend::new()); + let config = KafkaConfig { + replication_factor: 1, + broker_endpoints: endpoints + .split(',') + .map(|s| s.to_string()) + .collect::>(), + ..Default::default() + }; + let manager = TopicManager::new(config, kv_backend); + manager.start().await.unwrap(); + } }