Skip to content

Commit

Permalink
fix: check env before running kafka test
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jan 6, 2024
1 parent 7830363 commit 38b90a3
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 18 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/log-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ tokio.workspace = true
common-meta = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
rand.workspace = true
uuid.workspace = true
2 changes: 2 additions & 0 deletions src/log-store/src/kafka/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@

pub mod offset;
pub mod record;
#[cfg(test)]
mod test_util;
39 changes: 21 additions & 18 deletions src/log-store/src/kafka/util/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,11 @@ mod tests {

use common_base::readable_size::ReadableSize;
use common_config::wal::KafkaConfig;
use rand::Rng;
use uuid::Uuid;

use super::*;
use crate::kafka::client_manager::ClientManager;

use crate::kafka::util::test_util::run_test_with_kafka_wal;
// Implements some utility methods for testing.
impl Default for Record {
fn default() -> Self {
Expand Down Expand Up @@ -544,21 +544,24 @@ mod tests {

#[tokio::test]
async fn test_produce_large_entry() {
let topic = format!("greptimedb_wal_topic_{}", rand::thread_rng().gen::<usize>());
let ns = NamespaceImpl {
region_id: 1,
topic,
};
let entry = new_test_entry([b'1'; 2000000], 0, ns.clone());
let producer = RecordProducer::new(ns.clone()).with_entries(vec![entry]);

// TODO(niebayes): get broker endpoints from env vars.
let config = KafkaConfig {
broker_endpoints: vec!["localhost:9092".to_string()],
max_batch_size: ReadableSize::mb(1),
..Default::default()
};
let manager = Arc::new(ClientManager::try_new(&config).await.unwrap());
producer.produce(&manager).await.unwrap();
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
let topic = format!("greptimedb_wal_topic_{}", Uuid::new_v4());
let ns = NamespaceImpl {
region_id: 1,
topic,
};
let entry = new_test_entry([b'1'; 2000000], 0, ns.clone());
let producer = RecordProducer::new(ns.clone()).with_entries(vec![entry]);
let config = KafkaConfig {
broker_endpoints,
max_batch_size: ReadableSize::mb(1),
..Default::default()
};
let manager = Arc::new(ClientManager::try_new(&config).await.unwrap());
producer.produce(&manager).await.unwrap();
})
})
.await
}
}
36 changes: 36 additions & 0 deletions src/log-store/src/kafka/util/test_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::env;

use common_telemetry::warn;
use futures_util::future::BoxFuture;

pub async fn run_test_with_kafka_wal<F>(test: F)
where
F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
{
let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
if endpoints.is_empty() {
warn!("The endpoints is empty, skipping the test");
return;
}

let endpoints = endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<_>>();

test(endpoints).await
}

0 comments on commit 38b90a3

Please sign in to comment.