Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(remote_wal): add unit tests for kafka remote wal #2993

Merged
merged 34 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
81804eb
test: add unit tests
niebayes Dec 25, 2023
a7f639f
feat: introduce kafka runtime backed by testcontainers
niebayes Dec 27, 2023
1e3cafe
test: add test for kafka runtime
niebayes Dec 27, 2023
90fd40a
fix: format
niebayes Dec 27, 2023
796aeae
chore: make kafka image ready to be used
niebayes Dec 28, 2023
1851393
feat: add entry builder
niebayes Dec 28, 2023
ef031ee
tmp
niebayes Dec 29, 2023
f7954dc
test: add unit tests for client manager
niebayes Dec 30, 2023
0b503a5
test: add some unit tests for kafka log store
niebayes Dec 30, 2023
3d489ff
chore: resolve some todos
niebayes Dec 31, 2023
21fbddc
chore: resolve some todos
niebayes Dec 31, 2023
84541ed
test: add unit tests for kafka log store
niebayes Jan 1, 2024
a676ba8
chore: add deprecate develop branch warning
waynexia Dec 28, 2023
7ac84f9
chore: merge with branch main
niebayes Jan 1, 2024
68980c1
tmp: ready to move unit tests to an indie dir
niebayes Jan 1, 2024
d6f2b34
test: update unit tests for client manager
niebayes Jan 2, 2024
b06dc94
test: add unit tests for meta srv remote wal
niebayes Jan 2, 2024
070404b
fix: license
niebayes Jan 2, 2024
40eccc9
fix: test
niebayes Jan 3, 2024
815e03a
refactor: kafka image
niebayes Jan 3, 2024
1c504aa
doc: add doc example for kafka image
niebayes Jan 3, 2024
fd31088
chore: migrate kafka image to an indie PR
niebayes Jan 3, 2024
883c6d1
fix: conflicts
niebayes Jan 5, 2024
d38a16f
fix: CR
niebayes Jan 5, 2024
826d5f2
fix: conflicts
niebayes Jan 5, 2024
cf68a87
fix: CR
niebayes Jan 5, 2024
5de92f2
fix: test
niebayes Jan 6, 2024
ae057d0
fix: CR
niebayes Jan 8, 2024
b017551
fix: update Cargo.toml
niebayes Jan 8, 2024
48d967c
fix: CR
niebayes Jan 8, 2024
006b27a
fix: conflicts
niebayes Jan 8, 2024
c117dd8
feat: skip test if no endpoints env
niebayes Jan 8, 2024
8076038
fix: format
niebayes Jan 8, 2024
341ae67
test: rewrite parallel test with barrier
niebayes Jan 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/common/config/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ pub struct KafkaConfig {
pub broker_endpoints: Vec<String>,
/// The compression algorithm used to compress log entries.
#[serde(skip)]
#[serde(default)]
pub compression: RsKafkaCompression,
/// The max size of a single producer batch.
pub max_batch_size: ReadableSize,
Expand Down
2 changes: 2 additions & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ async-stream.workspace = true
async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
chrono.workspace = true
common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
Expand Down Expand Up @@ -49,5 +50,6 @@ tonic.workspace = true
[dev-dependencies]
chrono.workspace = true
common-procedure = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
datatypes.workspace = true
hyper = { version = "0.14", features = ["full"] }
1 change: 1 addition & 0 deletions src/common/meta/src/wal/kafka/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
/// Kafka wal topic.
/// Publishers publish log entries to the topic while subscribers pull log entries from the topic.
/// A topic is simply a string right now. But it may be more complex in the future.
// TODO(niebayes): remove the Topic alias.
pub type Topic = String;
75 changes: 55 additions & 20 deletions src/common/meta/src/wal/kafka/topic_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ const DEFAULT_PARTITION: i32 = 0;
/// Manages topic initialization and selection.
pub struct TopicManager {
config: KafkaConfig,
// TODO(niebayes): maybe add a guard to ensure all topics in the topic pool are created.
topic_pool: Vec<Topic>,
topic_selector: TopicSelectorRef,
pub(crate) topic_pool: Vec<Topic>,
pub(crate) topic_selector: TopicSelectorRef,
kv_backend: KvBackendRef,
}

Expand Down Expand Up @@ -168,7 +167,7 @@ impl TopicManager {
vec![Record {
key: None,
value: None,
timestamp: rskafka::chrono::Utc::now(),
timestamp: chrono::Utc::now(),
headers: Default::default(),
}],
Compression::NoCompression,
Expand Down Expand Up @@ -242,11 +241,14 @@ impl TopicManager {
mod tests {
use std::env;

use chrono::format::Fixed;
use common_telemetry::info;
use common_test_util::get_broker_endpoints;
use common_test_util::wal::kafka::topic_decorator::{Affix, TopicDecorator};
use common_test_util::wal::kafka::{create_topics, BROKER_ENDPOINTS_KEY};

use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::{self};

// Tests that topics can be successfully persisted into the kv backend and can be successfully restored from the kv backend.
#[tokio::test]
Expand All @@ -273,26 +275,59 @@ mod tests {
assert_eq!(topics, restored_topics);
}

/// Tests that the topic manager could allocate topics correctly.
#[tokio::test]
async fn test_topic_manager() {
let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
common_telemetry::init_default_ut_logging();
async fn test_alloc_topics() {
let broker_endpoints = get_broker_endpoints!(BROKER_ENDPOINTS_KEY);
// Constructs topics that should be created.
let decorator = TopicDecorator::default()
.with_prefix(Affix::Fixed("test_alloc_topics".to_string()))
.with_suffix(Affix::TimeNow);
let topics = (0..256)
.map(|i| decorator.decorate(&format!("topic_{i}")))
.collect::<Vec<_>>();

if endpoints.is_empty() {
info!("The endpoints is empty, skipping the test.");
return;
}
// TODO: supports topic prefix
let kv_backend = Arc::new(MemoryKvBackend::new());
// Creates a topic manager.
let config = KafkaConfig {
replication_factor: 1,
broker_endpoints: endpoints
.split(',')
.map(|s| s.to_string())
.collect::<Vec<_>>(),
replication_factor: broker_endpoints.len() as i16,
broker_endpoints,
..Default::default()
};
let manager = TopicManager::new(config, kv_backend);
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let mut manager = TopicManager::new(config.clone(), kv_backend);
// Replaces the default topic pool with the constructed topics.
manager.topic_pool = topics.clone();
// Replaces the default selector with a round-robin selector without shuffled.
manager.topic_selector = Arc::new(RoundRobinTopicSelector::default());
manager.start().await.unwrap();

// Selects exactly the number of `num_topics` topics one by one.
let got = (0..topics.len())
.map(|_| manager.select().unwrap())
.cloned()
.collect::<Vec<_>>();
assert_eq!(got, topics);

// Selects exactly the number of `num_topics` topics in a batching manner.
let got = manager
.select_batch(topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
assert_eq!(got, topics);

// Selects more than the number of `num_topics` topics.
let got = manager
.select_batch(2 * topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
let expected = vec![topics.clone(); 2]
.into_iter()
.flatten()
.collect::<Vec<_>>();
assert_eq!(got, expected);
}
}
8 changes: 8 additions & 0 deletions src/common/meta/src/wal/kafka/topic_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ impl TopicSelector for RoundRobinTopicSelector {
mod tests {
use super::*;

/// Tests that a selector behaves as expected when the given topic pool is empty.
#[test]
fn test_empty_topic_pool() {
let topic_pool = vec![];
let selector = RoundRobinTopicSelector::default();
assert!(selector.select(&topic_pool).is_err());
}

#[test]
fn test_round_robin_topic_selector() {
let topic_pool: Vec<_> = [0, 1, 2].into_iter().map(|v| v.to_string()).collect();
Expand Down
53 changes: 52 additions & 1 deletion src/common/meta/src/wal/options_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,16 @@ pub fn allocate_region_wal_options(

#[cfg(test)]
mod tests {
use common_test_util::get_broker_endpoints;
use common_test_util::wal::kafka::topic_decorator::{Affix, TopicDecorator};
use common_test_util::wal::kafka::BROKER_ENDPOINTS_KEY;

use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::wal::kafka::topic_selector::RoundRobinTopicSelector;
use crate::wal::kafka::KafkaConfig;

// Tests the wal options allocator could successfully allocate raft-engine wal options.
// Note: tests for allocator with kafka are integration tests.
#[tokio::test]
async fn test_allocator_with_raft_engine() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
Expand All @@ -128,4 +133,50 @@ mod tests {
.collect();
assert_eq!(got, expected);
}

// Tests that the wal options allocator could successfully allocate Kafka wal options.
#[tokio::test]
async fn test_allocator_with_kafka() {
let broker_endpoints = get_broker_endpoints!(BROKER_ENDPOINTS_KEY);
// Constructs topics that should be created.
let decorator = TopicDecorator::default()
.with_prefix(Affix::Fixed("test_allocator_with_kafka".to_string()))
.with_suffix(Affix::TimeNow);
let topics = (0..256)
.map(|i| decorator.decorate(&format!("topic_{i}")))
.collect::<Vec<_>>();

// Creates a topic manager.
let config = KafkaConfig {
replication_factor: broker_endpoints.len() as i16,
broker_endpoints,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let mut topic_manager = KafkaTopicManager::new(config.clone(), kv_backend);
// Replaces the default topic pool with the constructed topics.
topic_manager.topic_pool = topics.clone();
// Replaces the default selector with a round-robin selector without shuffled.
topic_manager.topic_selector = Arc::new(RoundRobinTopicSelector::default());

// Creates an options allocator.
let wal_config = WalConfig::Kafka(config.clone());
let allocator = WalOptionsAllocator::Kafka(topic_manager);
allocator.start().await.unwrap();

let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();

// Check the allocated wal options contain the expected topics.
let expected = (0..num_regions)
.map(|i| {
let options = WalOptions::Kafka(KafkaWalOptions {
topic: topics[i as usize].clone(),
});
(i, serde_json::to_string(&options).unwrap())
})
.collect::<HashMap<_, _>>();
assert_eq!(got, expected);
}
}
6 changes: 6 additions & 0 deletions src/common/test-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ edition.workspace = true
license.workspace = true

[dependencies]
chrono.workspace = true
common-config.workspace = true
futures.workspace = true
once_cell.workspace = true
rand.workspace = true
rskafka.workspace = true
niebayes marked this conversation as resolved.
Show resolved Hide resolved
store-api.workspace = true
tempfile.workspace = true
tokio.workspace = true
1 change: 1 addition & 0 deletions src/common/test-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::LazyLock;

pub mod ports;
pub mod temp_dir;
pub mod wal;
niebayes marked this conversation as resolved.
Show resolved Hide resolved

// Rust is working on an env possibly named `CARGO_WORKSPACE_DIR` to find the root path to the
// workspace, see https://github.com/rust-lang/cargo/issues/3946.
Expand Down
15 changes: 15 additions & 0 deletions src/common/test-util/src/wal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod kafka;
66 changes: 66 additions & 0 deletions src/common/test-util/src/wal/kafka.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod topic_decorator;

use common_config::wal::KafkaWalTopic as Topic;
use rskafka::client::ClientBuilder;

use crate::wal::kafka::topic_decorator::TopicDecorator;

pub const BROKER_ENDPOINTS_KEY: &str = "GT_KAFKA_ENDPOINTS";

/// Gets broker endpoints from environment variables with the given key.
/// Returns ["localhost:9092"] if no environment variables set for broker endpoints.
#[macro_export]
macro_rules! get_broker_endpoints {
($key:expr) => {{
let broker_endpoints = std::env::var($key)
.unwrap_or("localhost:9092".to_string())
.split(',')
.map(ToString::to_string)
.collect::<Vec<_>>();
assert!(!broker_endpoints.is_empty());
broker_endpoints
}};
}
niebayes marked this conversation as resolved.
Show resolved Hide resolved

/// Creates `num_topiocs` number of topics from the seed topic which are going to be decorated with the given TopicDecorator.
/// A default seed `topic` will be used if the provided seed is None.
pub async fn create_topics(
num_topics: usize,
decorator: TopicDecorator,
broker_endpoints: &[String],
seed: Option<&str>,
) -> Vec<Topic> {
assert!(!broker_endpoints.is_empty());

let client = ClientBuilder::new(broker_endpoints.to_vec())
.build()
.await
.unwrap();
let ctrl_client = client.controller_client().unwrap();

let seed = seed.unwrap_or("topic");
let (topics, tasks): (Vec<_>, Vec<_>) = (0..num_topics)
.map(|i| {
let topic = decorator.decorate(&format!("{seed}_{i}"));
let task = ctrl_client.create_topic(topic.clone(), 1, 1, 500);
(topic, task)
})
.unzip();
futures::future::try_join_all(tasks).await.unwrap();

topics
}
niebayes marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading