diff --git a/tests-integration/src/wal_util/kafka.rs b/tests-integration/src/wal_util/kafka.rs
index abde462d720d..d4d0f8773d91 100644
--- a/tests-integration/src/wal_util/kafka.rs
+++ b/tests-integration/src/wal_util/kafka.rs
@@ -14,13 +14,3 @@
 
 pub mod config;
 mod image;
-pub mod runtime;
-
-#[macro_export]
-macro_rules! start_kafka {
-    () => {
-        let _ = $crate::wal_util::kafka::runtime::Runtime::default()
-            .start()
-            .await;
-    };
-}
diff --git a/tests-integration/src/wal_util/kafka/config.rs b/tests-integration/src/wal_util/kafka/config.rs
index 4da810dfa93b..ca5136aec1ae 100644
--- a/tests-integration/src/wal_util/kafka/config.rs
+++ b/tests-integration/src/wal_util/kafka/config.rs
@@ -16,15 +16,15 @@ use std::collections::HashMap;
 
 use testcontainers::core::WaitFor;
 
-/// Through which port the Zookeeper node listens for external traffics, i.e. traffics from the Kafka node.
+/// Through which port the Zookeeper node listens for external traffics, e.g. traffics from the Kafka node.
 pub const ZOOKEEPER_PORT: u16 = 2181;
 /// Through which port the Kafka node listens for internal traffics, i.e. traffics between Kafka nodes in the same Kafka cluster.
-pub const KAFAK_LISTENER_PORT: u16 = 19092;
+pub const KAFKA_LISTENER_PORT: u16 = 19092;
 /// Through which port the Kafka node listens for external traffics, e.g. traffics from Kafka clients.
 pub const KAFKA_ADVERTISED_LISTENER_PORT: u16 = 9092;
 
 /// Configurations for a Kafka runtime.
-/// Since the runtime corresponds to a cluster with a single Kafka node and a single Zookeeper node, the ports are all singletons.
+#[derive(Debug, Clone)]
 pub struct Config {
     /// The name of the Kafka image hosted in the docker hub.
     pub image_name: String,
@@ -32,7 +32,10 @@ pub struct Config {
     /// Warning: please use a tag with long-term support. Do not use `latest` or any other tags that
     /// the underlying image may suddenly change.
     pub image_tag: String,
-    /// Through which port clients could connect with the runtime.
+    /// The runtime is running in a docker container and has its own network. In order to be used by the host machine,
+    /// the runtime must expose an internal port. For example, assume the runtime has an internal port 9092,
+    /// and the `exposed_port` is set to 9092, then the host machine can get a mapped external port with
+    /// `container.get_host_port_ipv4(exposed_port)`. With the mapped port, the host machine could connect with the runtime.
     pub exposed_port: u16,
     /// The runtime is regarded ready to be used if all ready conditions are met.
     /// Warning: be sure to update the conditions when necessary if the image is altered.
@@ -42,6 +45,15 @@ pub struct Config {
     pub env_vars: HashMap<String, String>,
 }
 
+impl Config {
+    pub fn with_exposed_port(port: u16) -> Self {
+        Self {
+            exposed_port: port,
+            ..Default::default()
+        }
+    }
+}
+
 impl Default for Config {
     fn default() -> Self {
         Self {
@@ -54,7 +66,7 @@ impl Default for Config {
             )],
             env_vars: build_env_vars(
                 ZOOKEEPER_PORT,
-                KAFAK_LISTENER_PORT,
+                KAFKA_LISTENER_PORT,
                 KAFKA_ADVERTISED_LISTENER_PORT,
             ),
         }
@@ -73,11 +85,15 @@ fn build_env_vars(
         ),
         (
             "KAFKA_LISTENERS".to_string(),
-            format!("PLAINTEXT://0.0.0.0:{kafka_advertised_listener_port},PLAINTEXT://0.0.0.0:{kafka_listener_port}"),
+            format!("PLAINTEXT://0.0.0.0:{kafka_advertised_listener_port},BROKER://0.0.0.0:{kafka_listener_port}"),
         ),
         (
             "KAFKA_ADVERTISED_LISTENERS".to_string(),
-            format!("PLAINTEXT://localhost:{kafka_advertised_listener_port},PLAINTEXT://localhost:{kafka_listener_port}",),
+            format!("PLAINTEXT://localhost:{kafka_advertised_listener_port},BROKER://localhost:{kafka_listener_port}",),
+        ),
+        (
+            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_string(),
+            "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT".to_string(),
         ),
         (
             "KAFKA_INTER_BROKER_LISTENER_NAME".to_string(),
diff --git a/tests-integration/src/wal_util/kafka/image.rs b/tests-integration/src/wal_util/kafka/image.rs
index 90cb164efa4b..78f513dd8406 100644
--- a/tests-integration/src/wal_util/kafka/image.rs
+++ b/tests-integration/src/wal_util/kafka/image.rs
@@ -14,7 +14,9 @@
 
 use testcontainers::core::{ContainerState, ExecCommand, WaitFor};
 
-use crate::wal_util::kafka::config::{Config, ZOOKEEPER_PORT};
+use crate::wal_util::kafka::config::{
+    Config, KAFKA_ADVERTISED_LISTENER_PORT, KAFKA_LISTENER_PORT, ZOOKEEPER_PORT,
+};
 
 #[derive(Debug, Clone, Default)]
 pub struct ImageArgs;
@@ -48,6 +50,13 @@ pub struct Image {
     config: Config,
 }
 
+impl Image {
+    #[allow(unused)]
+    pub fn new(config: Config) -> Self {
+        Self { config }
+    }
+}
+
 impl testcontainers::Image for Image {
     type Args = ImageArgs;
 
@@ -71,7 +80,79 @@ impl testcontainers::Image for Image {
         vec![self.config.exposed_port]
     }
 
-    fn exec_after_start(&self, _cs: ContainerState) -> Vec<ExecCommand> {
-        vec![]
+    fn exec_after_start(&self, cs: ContainerState) -> Vec<ExecCommand> {
+        let mut commands = vec![];
+        let cmd = format!(
+            "kafka-configs --alter --bootstrap-server 0.0.0.0:{} --entity-type brokers --entity-name 1 --add-config advertised.listeners=[PLAINTEXT://127.0.0.1:{},BROKER://localhost:9092]",
+            KAFKA_LISTENER_PORT,
+            cs.host_port_ipv4(KAFKA_ADVERTISED_LISTENER_PORT),
+        );
+        let ready_conditions = vec![WaitFor::message_on_stdout(
+            "Checking need to trigger auto leader balancing",
+        )];
+        commands.push(ExecCommand {
+            cmd,
+            ready_conditions,
+        });
+        commands
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use chrono::TimeZone;
+    use rskafka::chrono::Utc;
+    use rskafka::client::partition::UnknownTopicHandling;
+    use rskafka::client::ClientBuilder;
+    use rskafka::record::Record;
+    use testcontainers::clients::Cli as DockerCli;
+
+    use crate::wal_util::kafka::config::{Config, KAFKA_ADVERTISED_LISTENER_PORT};
+    use crate::wal_util::kafka::image::Image;
+
+    #[tokio::test]
+    async fn test_image() {
+        // Starts a Kafka container.
+        let port = KAFKA_ADVERTISED_LISTENER_PORT;
+        let config = Config::with_exposed_port(port);
+        let docker = DockerCli::default();
+        let container = docker.run(Image::new(config));
+
+        // Creates a Kafka client.
+        let bootstrap_brokers = vec![format!("127.0.0.1:{}", container.get_host_port_ipv4(port))];
+        let client = ClientBuilder::new(bootstrap_brokers).build().await.unwrap();
+
+        // Creates a topic.
+        let topic = "test_topic";
+        client
+            .controller_client()
+            .unwrap()
+            .create_topic(topic, 1, 1, 500)
+            .await
+            .unwrap();
+
+        // Produces a record.
+        let partition_client = client
+            .partition_client(topic, 0, UnknownTopicHandling::Error)
+            .await
+            .unwrap();
+        let produced = vec![Record {
+            key: Some(b"111".to_vec()),
+            value: Some(b"222".to_vec()),
+            timestamp: Utc.timestamp_millis_opt(42).unwrap(),
+            headers: Default::default(),
+        }];
+        let offset = partition_client
+            .produce(produced.clone(), Default::default())
+            .await
+            .unwrap()[0];
+
+        // Consumes the record.
+        let consumed = partition_client
+            .fetch_records(offset, 1..4096, 500)
+            .await
+            .unwrap()
+            .0;
+        assert_eq!(produced[0], consumed[0].record);
     }
 }
diff --git a/tests-integration/src/wal_util/kafka/runtime.rs b/tests-integration/src/wal_util/kafka/runtime.rs
deleted file mode 100644
index f593d6cf733f..000000000000
--- a/tests-integration/src/wal_util/kafka/runtime.rs
+++ /dev/null
@@ -1,82 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use testcontainers::clients::Cli as DockerCli;
-use testcontainers::Container;
-
-use crate::wal_util::kafka::image::Image;
-
-/// A runtime running a cluster consisting of a single Kafka node and a single ZooKeeper node.
-#[derive(Default)]
-pub struct Runtime {
-    docker: DockerCli,
-}
-
-impl Runtime {
-    /// Starts the runtime. The runtime terminates when the returned container is dropped.
-    pub async fn start(&self) -> Container<'_, Image> {
-        self.docker.run(Image::default())
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use rskafka::chrono::Utc;
-    use rskafka::client::partition::UnknownTopicHandling;
-    use rskafka::client::ClientBuilder;
-    use rskafka::record::Record;
-
-    use crate::start_kafka;
-
-    #[tokio::test]
-    async fn test_runtime() {
-        start_kafka!();
-
-        let bootstrap_brokers = vec![9092.to_string()];
-        let client = ClientBuilder::new(bootstrap_brokers).build().await.unwrap();
-
-        // Creates a topic.
-        let topic = "test_topic";
-        client
-            .controller_client()
-            .unwrap()
-            .create_topic(topic, 1, 1, 500)
-            .await
-            .unwrap();
-
-        // Produces a record.
-        let partition_client = client
-            .partition_client(topic, 0, UnknownTopicHandling::Error)
-            .await
-            .unwrap();
-        let produced = vec![Record {
-            key: Some(b"111".to_vec()),
-            value: Some(b"222".to_vec()),
-            timestamp: Utc::now(),
-            headers: Default::default(),
-        }];
-        let offset = partition_client
-            .produce(produced.clone(), Default::default())
-            .await
-            .unwrap()[0];
-
-        // Consumes the record.
-        let consumed = partition_client
-            .fetch_records(offset, 1..4096, 500)
-            .await
-            .unwrap()
-            .0;
-        assert_eq!(produced[0], consumed[0].record);
-    }
-}
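Usage note: with `runtime.rs` and the `start_kafka!` macro removed, callers inside `tests-integration` are expected to spin up the container directly from `Config` and `Image`. Below is a minimal sketch of that pattern, using only APIs that appear in the diff above; the test name `test_with_kafka_container` is hypothetical and not part of this change.

```rust
use rskafka::client::ClientBuilder;
use testcontainers::clients::Cli as DockerCli;

use crate::wal_util::kafka::config::{Config, KAFKA_ADVERTISED_LISTENER_PORT};
use crate::wal_util::kafka::image::Image;

#[tokio::test]
async fn test_with_kafka_container() {
    // Expose the advertised listener port; the container is torn down when dropped.
    let config = Config::with_exposed_port(KAFKA_ADVERTISED_LISTENER_PORT);
    let docker = DockerCli::default();
    let container = docker.run(Image::new(config));

    // The host-side port is assigned by Docker, so resolve it before building the client.
    let broker = format!(
        "127.0.0.1:{}",
        container.get_host_port_ipv4(KAFKA_ADVERTISED_LISTENER_PORT)
    );
    let client = ClientBuilder::new(vec![broker]).build().await.unwrap();

    // From here on, create topics and produce/consume records as in `test_image` above.
    let _ = client;
}
```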