Skip to content

Commit

Permalink
chore: make kafka image ready to be used
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Dec 28, 2023
1 parent 90fd40a commit 796aeae
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 102 deletions.
10 changes: 0 additions & 10 deletions tests-integration/src/wal_util/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,3 @@

pub mod config;
mod image;
pub mod runtime;

#[macro_export]
macro_rules! start_kafka {
() => {
let _ = $crate::wal_util::kafka::runtime::Runtime::default()
.start()
.await;
};
}
30 changes: 23 additions & 7 deletions tests-integration/src/wal_util/kafka/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,26 @@ use std::collections::HashMap;

use testcontainers::core::WaitFor;

/// Through which port the Zookeeper node listens for external traffics, i.e. traffics from the Kafka node.
/// Through which port the Zookeeper node listens for external traffics, e.g. traffics from the Kafka node.
pub const ZOOKEEPER_PORT: u16 = 2181;
/// Through which port the Kafka node listens for internal traffics, i.e. traffics between Kafka nodes in the same Kafka cluster.
pub const KAFAK_LISTENER_PORT: u16 = 19092;
pub const KAFKA_LISTENER_PORT: u16 = 19092;
/// Through which port the Kafka node listens for external traffics, e.g. traffics from Kafka clients.
pub const KAFKA_ADVERTISED_LISTENER_PORT: u16 = 9092;

/// Configurations for a Kafka runtime.
/// Since the runtime corresponds to a cluster with a single Kafka node and a single Zookeeper node, the ports are all singletons.
#[derive(Debug, Clone)]
pub struct Config {
/// The name of the Kafka image hosted in the docker hub.
pub image_name: String,
/// The tag of the kafka image hosted in the docker hub.
/// Warning: please use a tag with long-term support. Do not use `latest` or any other tags that
/// the underlying image may suddenly change.
pub image_tag: String,
/// Through which port clients could connect with the runtime.
/// The runtime is running in a docker container and has its own network. In order to be used by the host machine,
/// the runtime must expose an internal port. For e.g. assume the runtime has an internal port 9092,
/// and the `exposed_port` is set to 9092, then the host machine can get a mapped external port with
/// `container.get_host_port_ipv4(exposed_port)`. With the mapped port, the host machine could connect with the runtime.
pub exposed_port: u16,
/// The runtime is regarded ready to be used if all ready conditions are met.
/// Warning: be sure to update the conditions when necessary if the image is altered.
Expand All @@ -42,6 +45,15 @@ pub struct Config {
pub env_vars: HashMap<String, String>,
}

impl Config {
pub fn with_exposed_port(port: u16) -> Self {
Self {
exposed_port: port,
..Default::default()
}
}
}

impl Default for Config {
fn default() -> Self {
Self {
Expand All @@ -54,7 +66,7 @@ impl Default for Config {
)],
env_vars: build_env_vars(
ZOOKEEPER_PORT,
KAFAK_LISTENER_PORT,
KAFKA_LISTENER_PORT,
KAFKA_ADVERTISED_LISTENER_PORT,
),
}
Expand All @@ -73,11 +85,15 @@ fn build_env_vars(
),
(
"KAFKA_LISTENERS".to_string(),
format!("PLAINTEXT://0.0.0.0:{kafka_advertised_listener_port},PLAINTEXT://0.0.0.0:{kafka_listener_port}"),
format!("PLAINTEXT://0.0.0.0:{kafka_advertised_listener_port},BROKER://0.0.0.0:{kafka_listener_port}"),
),
(
"KAFKA_ADVERTISED_LISTENERS".to_string(),
format!("PLAINTEXT://localhost:{kafka_advertised_listener_port},PLAINTEXT://localhost:{kafka_listener_port}",),
format!("PLAINTEXT://localhost:{kafka_advertised_listener_port},BROKER://localhost:{kafka_listener_port}",),
),
(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_string(),
"BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT".to_string(),
),
(
"KAFKA_INTER_BROKER_LISTENER_NAME".to_string(),
Expand Down
87 changes: 84 additions & 3 deletions tests-integration/src/wal_util/kafka/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

use testcontainers::core::{ContainerState, ExecCommand, WaitFor};

use crate::wal_util::kafka::config::{Config, ZOOKEEPER_PORT};
use crate::wal_util::kafka::config::{
Config, KAFKA_ADVERTISED_LISTENER_PORT, KAFKA_LISTENER_PORT, ZOOKEEPER_PORT,
};

#[derive(Debug, Clone, Default)]
pub struct ImageArgs;
Expand Down Expand Up @@ -48,6 +50,13 @@ pub struct Image {
config: Config,
}

impl Image {
#[allow(unused)]
pub fn new(config: Config) -> Self {
Self { config }
}
}

impl testcontainers::Image for Image {
type Args = ImageArgs;

Expand All @@ -71,7 +80,79 @@ impl testcontainers::Image for Image {
vec![self.config.exposed_port]
}

fn exec_after_start(&self, _cs: ContainerState) -> Vec<ExecCommand> {
vec![]
fn exec_after_start(&self, cs: ContainerState) -> Vec<ExecCommand> {
let mut commands = vec![];
let cmd = format!(
"kafka-configs --alter --bootstrap-server 0.0.0.0:{} --entity-type brokers --entity-name 1 --add-config advertised.listeners=[PLAINTEXT://127.0.0.1:{},BROKER://localhost:9092]",
KAFKA_LISTENER_PORT,
cs.host_port_ipv4(KAFKA_ADVERTISED_LISTENER_PORT),
);
let ready_conditions = vec![WaitFor::message_on_stdout(
"Checking need to trigger auto leader balancing",
)];
commands.push(ExecCommand {
cmd,
ready_conditions,
});
commands
}
}

#[cfg(test)]
mod tests {
use chrono::TimeZone;
use rskafka::chrono::Utc;
use rskafka::client::partition::UnknownTopicHandling;
use rskafka::client::ClientBuilder;
use rskafka::record::Record;
use testcontainers::clients::Cli as DockerCli;

use crate::wal_util::kafka::config::{Config, KAFKA_ADVERTISED_LISTENER_PORT};
use crate::wal_util::kafka::image::Image;

#[tokio::test]
async fn test_image() {
// Starts a Kafka container.
let port = KAFKA_ADVERTISED_LISTENER_PORT;
let config = Config::with_exposed_port(port);
let docker = DockerCli::default();
let container = docker.run(Image::new(config));

// Creates a Kafka client.
let bootstrap_brokers = vec![format!("127.0.0.1:{}", container.get_host_port_ipv4(port))];
let client = ClientBuilder::new(bootstrap_brokers).build().await.unwrap();

// Creates a topic.
let topic = "test_topic";
client
.controller_client()
.unwrap()
.create_topic(topic, 1, 1, 500)
.await
.unwrap();

// Produces a record.
let partition_client = client
.partition_client(topic, 0, UnknownTopicHandling::Error)
.await
.unwrap();
let produced = vec![Record {
key: Some(b"111".to_vec()),
value: Some(b"222".to_vec()),
timestamp: Utc.timestamp_millis_opt(42).unwrap(),
headers: Default::default(),
}];
let offset = partition_client
.produce(produced.clone(), Default::default())
.await
.unwrap()[0];

// Consumes the record.
let consumed = partition_client
.fetch_records(offset, 1..4096, 500)
.await
.unwrap()
.0;
assert_eq!(produced[0], consumed[0].record);
}
}
82 changes: 0 additions & 82 deletions tests-integration/src/wal_util/kafka/runtime.rs

This file was deleted.

0 comments on commit 796aeae

Please sign in to comment.