diff --git a/Cargo.lock b/Cargo.lock index 8b9c75c08773..4d37d4f9ef74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4522,6 +4522,7 @@ dependencies = [ "store-api", "tokio", "tokio-util", + "uuid", ] [[package]] diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index dd85e2933c22..f5923c772090 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -40,3 +40,4 @@ tokio.workspace = true common-meta = { workspace = true, features = ["testing"] } common-test-util.workspace = true rand.workspace = true +uuid.workspace = true diff --git a/src/log-store/src/kafka/util.rs b/src/log-store/src/kafka/util.rs index 61059b16451f..86243e38d61f 100644 --- a/src/log-store/src/kafka/util.rs +++ b/src/log-store/src/kafka/util.rs @@ -14,3 +14,5 @@ pub mod offset; pub mod record; +#[cfg(test)] +mod test_util; diff --git a/src/log-store/src/kafka/util/record.rs b/src/log-store/src/kafka/util/record.rs index 7d45165514a7..21dac1603397 100644 --- a/src/log-store/src/kafka/util/record.rs +++ b/src/log-store/src/kafka/util/record.rs @@ -295,11 +295,11 @@ mod tests { use common_base::readable_size::ReadableSize; use common_config::wal::KafkaConfig; - use rand::Rng; + use uuid::Uuid; use super::*; use crate::kafka::client_manager::ClientManager; - + use crate::kafka::util::test_util::run_test_with_kafka_wal; // Implements some utility methods for testing. impl Default for Record { fn default() -> Self { @@ -544,21 +544,24 @@ mod tests { #[tokio::test] async fn test_produce_large_entry() { - let topic = format!("greptimedb_wal_topic_{}", rand::thread_rng().gen::()); - let ns = NamespaceImpl { - region_id: 1, - topic, - }; - let entry = new_test_entry([b'1'; 2000000], 0, ns.clone()); - let producer = RecordProducer::new(ns.clone()).with_entries(vec![entry]); - - // TODO(niebayes): get broker endpoints from env vars. - let config = KafkaConfig { - broker_endpoints: vec!["localhost:9092".to_string()], - max_batch_size: ReadableSize::mb(1), - ..Default::default() - }; - let manager = Arc::new(ClientManager::try_new(&config).await.unwrap()); - producer.produce(&manager).await.unwrap(); + run_test_with_kafka_wal(|broker_endpoints| { + Box::pin(async { + let topic = format!("greptimedb_wal_topic_{}", Uuid::new_v4()); + let ns = NamespaceImpl { + region_id: 1, + topic, + }; + let entry = new_test_entry([b'1'; 2000000], 0, ns.clone()); + let producer = RecordProducer::new(ns.clone()).with_entries(vec![entry]); + let config = KafkaConfig { + broker_endpoints, + max_batch_size: ReadableSize::mb(1), + ..Default::default() + }; + let manager = Arc::new(ClientManager::try_new(&config).await.unwrap()); + producer.produce(&manager).await.unwrap(); + }) + }) + .await } } diff --git a/src/log-store/src/kafka/util/test_util.rs b/src/log-store/src/kafka/util/test_util.rs new file mode 100644 index 000000000000..015e5a3cba6e --- /dev/null +++ b/src/log-store/src/kafka/util/test_util.rs @@ -0,0 +1,36 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::env; + +use common_telemetry::warn; +use futures_util::future::BoxFuture; + +pub async fn run_test_with_kafka_wal(test: F) +where + F: FnOnce(Vec) -> BoxFuture<'static, ()>, +{ + let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default(); + if endpoints.is_empty() { + warn!("The endpoints is empty, skipping the test"); + return; + } + + let endpoints = endpoints + .split(',') + .map(|s| s.trim().to_string()) + .collect::>(); + + test(endpoints).await +}