diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 47cc7590041e..d95c495db14b 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -175,7 +175,9 @@ jobs: - name: Setup etcd server working-directory: tests-integration/fixtures/etcd run: docker compose -f docker-compose-standalone.yml up -d --wait - #TODO(niebaye) Add a step to setup kafka clusters. Maybe add a docker file for starting kafka clusters. + - name: Setup kafka server + working-directory: tests-integration/fixtures/kafka + run: docker compose -f docker-compose-standalone.yml up -d --wait - name: Run nextest cases run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard env: @@ -187,6 +189,7 @@ jobs: GT_S3_ACCESS_KEY: ${{ secrets.S3_ACCESS_KEY }} GT_S3_REGION: ${{ secrets.S3_REGION }} GT_ETCD_ENDPOINTS: http://127.0.0.1:2379 + GT_KAFKA_ENDPOINTS: 127.0.0.1:9092 UNITTEST_LOG_DIR: "__unittest_logs" - name: Codecov upload uses: codecov/codecov-action@v2 diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index 4773871ae134..0efe573acc83 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -174,8 +174,13 @@ impl TopicManager { #[cfg(test)] mod tests { + use std::env; + + use common_telemetry::info; + use super::*; use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::{self}; // Tests that topics can be successfully persisted into the kv backend and can be successfully restored from the kv backend. #[tokio::test] @@ -201,4 +206,27 @@ mod tests { assert_eq!(topics, restored_topics); } + + #[tokio::test] + async fn test_topic_manager() { + let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default(); + common_telemetry::init_default_ut_logging(); + + if endpoints.is_empty() { + info!("The endpoints is empty, skipping the test."); + return; + } + // TODO: supports topic prefix + let kv_backend = Arc::new(MemoryKvBackend::new()); + let config = KafkaConfig { + replication_factor: 1, + broker_endpoints: endpoints + .split(',') + .map(|s| s.to_string()) + .collect::>(), + ..Default::default() + }; + let manager = TopicManager::new(config, kv_backend); + manager.start().await.unwrap(); + } } diff --git a/tests-integration/fixtures/kafka/docker-compose-standalone.yml b/tests-integration/fixtures/kafka/docker-compose-standalone.yml new file mode 100644 index 000000000000..9c257418a5d8 --- /dev/null +++ b/tests-integration/fixtures/kafka/docker-compose-standalone.yml @@ -0,0 +1,21 @@ +version: '3.8' +services: + kafka: + image: bitnami/kafka:3.6.0 + container_name: kafka + ports: + - 9092:9092 + environment: + # KRaft settings + KAFKA_KRAFT_CLUSTER_ID: Kmp-xkTnSf-WWXhWmiorDg + KAFKA_ENABLE_KRAFT: "yes" + KAFKA_CFG_NODE_ID: "1" + KAFKA_CFG_PROCESS_ROLES: broker,controller + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:2181 + # Listeners + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092 + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:2181 + ALLOW_PLAINTEXT_LISTENER: "yes" + KAFKA_BROKER_ID: "1"