diff --git a/Cargo.lock b/Cargo.lock index 355b828073e9..4161fd5e1353 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -972,16 +972,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "bollard-stubs" -version = "1.42.0-rc.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed59b5c00048f48d7af971b71f800fdf23e858844a6f9e4d32ca72e9399e7864" -dependencies = [ - "serde", - "serde_with 1.14.0", -] - [[package]] name = "borsh" version = "1.3.0" @@ -1640,7 +1630,7 @@ dependencies = [ "rskafka", "serde", "serde_json", - "serde_with 3.4.0", + "serde_with", "toml 0.8.8", ] @@ -1853,7 +1843,7 @@ dependencies = [ "rskafka", "serde", "serde_json", - "serde_with 3.4.0", + "serde_with", "snafu", "store-api", "strum 0.25.0", @@ -1993,7 +1983,6 @@ dependencies = [ "rskafka", "store-api", "tempfile", - "testcontainers", "tokio", ] @@ -2352,16 +2341,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "darling" -version = "0.13.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c" -dependencies = [ - "darling_core 0.13.4", - "darling_macro 0.13.4", -] - [[package]] name = "darling" version = "0.14.4" @@ -2382,20 +2361,6 @@ dependencies = [ "darling_macro 0.20.3", ] -[[package]] -name = "darling_core" -version = "0.13.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610" -dependencies = [ - "fnv", - "ident_case", - "proc-macro2", - "quote", - "strsim 0.10.0", - "syn 1.0.109", -] - [[package]] name = "darling_core" version = "0.14.4" @@ -2424,17 +2389,6 @@ dependencies = [ "syn 2.0.43", ] -[[package]] -name = "darling_macro" -version = "0.13.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" -dependencies = [ - "darling_core 0.13.4", - "quote", - "syn 1.0.109", -] - [[package]] name = "darling_macro" version = "0.14.4" @@ -5044,7 +4998,7 @@ dependencies = [ "regex", "serde", "serde_json", - "serde_with 3.4.0", + "serde_with", "smallvec", "snafu", "store-api", @@ -8458,16 +8412,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_with" -version = "1.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff" -dependencies = [ - "serde", - "serde_with_macros 1.5.2", -] - [[package]] name = "serde_with" version = "3.4.0" @@ -8481,22 +8425,10 @@ dependencies = [ "indexmap 2.1.0", "serde", "serde_json", - "serde_with_macros 3.4.0", + "serde_with_macros", "time", ] -[[package]] -name = "serde_with_macros" -version = "1.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082" -dependencies = [ - "darling 0.13.4", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "serde_with_macros" version = "3.4.0" @@ -9538,23 +9470,6 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" -[[package]] -name = "testcontainers" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f83d2931d7f521af5bae989f716c3fa43a6af9af7ec7a5e21b59ae40878cec00" -dependencies = [ - "bollard-stubs", - "futures", - "hex", - "hmac", - "log", - "rand", - "serde", - "serde_json", - "sha2", -] - [[package]] name = "tests-integration" version = "0.5.0" @@ -9618,7 +9533,6 @@ dependencies = [ "substrait 0.5.0", "table", "tempfile", - "testcontainers", "time", "tokio", "tokio-postgres", diff --git a/src/common/test-util/Cargo.toml b/src/common/test-util/Cargo.toml index 0884c626c2cf..be5b5a5275d8 100644 --- a/src/common/test-util/Cargo.toml +++ b/src/common/test-util/Cargo.toml @@ -13,5 +13,4 @@ rand.workspace = true rskafka.workspace = true store-api.workspace = true tempfile.workspace = true -testcontainers = "0.15.0" tokio.workspace = true diff --git a/src/common/test-util/src/wal/kafka.rs b/src/common/test-util/src/wal/kafka.rs index db9d49b4be29..728c6978757f 100644 --- a/src/common/test-util/src/wal/kafka.rs +++ b/src/common/test-util/src/wal/kafka.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod image; pub mod topic_decorator; use common_config::wal::KafkaWalTopic as Topic; diff --git a/src/common/test-util/src/wal/kafka/image.rs b/src/common/test-util/src/wal/kafka/image.rs deleted file mode 100644 index ff801ea3dfec..000000000000 --- a/src/common/test-util/src/wal/kafka/image.rs +++ /dev/null @@ -1,219 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; - -use testcontainers::core::{ContainerState, ExecCommand, WaitFor}; - -const IMAGE_NAME: &str = "confluentinc/cp-kafka"; -const IMAGE_TAG: &str = "7.4.3"; -/// Through which port the Zookeeper node listens for external traffics, e.g. traffics from the Kafka node. -const ZOOKEEPER_PORT: u16 = 2181; -/// Through which port the Kafka node listens for internal traffics, i.e. traffics between Kafka nodes in the same Kafka cluster. -const KAFKA_LISTENER_PORT: u16 = 19092; -/// Through which port the Kafka node listens for external traffics, e.g. traffics from Kafka clients. -const KAFKA_ADVERTISED_LISTENER_PORT: u16 = 9092; - -#[derive(Debug, Clone, Default)] -pub struct ImageArgs; - -impl testcontainers::ImageArgs for ImageArgs { - fn into_iterator(self) -> Box> { - Box::new( - vec![ - "/bin/bash".to_string(), - "-c".to_string(), - format!( - r#" - echo 'clientPort={}' > zookeeper.properties; - echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties; - echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties; - zookeeper-server-start zookeeper.properties & - . /etc/confluent/docker/bash-config && - /etc/confluent/docker/configure && - /etc/confluent/docker/launch - "#, - ZOOKEEPER_PORT, - ), - ] - .into_iter(), - ) - } -} - -/// # Example -/// ```rust -/// // Starts a Kafka container. -/// let port = KAFKA_ADVERTISED_LISTENER_PORT; -/// let docker = DockerCli::default(); -/// let container = docker.run(Image::default()); -/// // Gets the broker endpoints of the containerized Kafka node. -/// let broker_endpoints = vec![format!("127.0.0.1:{}", container.get_host_port_ipv4(port))]; -/// // Do something with the broker endpoints, for e.g. building a Kafka client. -/// let client = ClientBuilder::new(broker_endpoints).build().await.unwrap(); -/// ``` -pub struct Image { - env_vars: HashMap, -} - -impl Default for Image { - fn default() -> Self { - Self { - env_vars: build_env_vars(), - } - } -} - -impl testcontainers::Image for Image { - type Args = ImageArgs; - - /// The name of the Kafka image hosted in the docker hub. - fn name(&self) -> String { - IMAGE_NAME.to_string() - } - - /// The tag of the kafka image hosted in the docker hub. - /// Warning: please use a tag with long-term support. Do not use `latest` or any other tags that - /// the underlying image may suddenly change. - fn tag(&self) -> String { - IMAGE_TAG.to_string() - } - - /// The runtime is regarded ready to be used if all ready conditions are met. - /// Warning: be sure to update the conditions when necessary if the image is altered. - fn ready_conditions(&self) -> Vec { - vec![WaitFor::message_on_stdout( - "started (kafka.server.KafkaServer)", - )] - } - - /// The environment variables required to run the runtime. - /// Warning: be sure to update the environment variables when necessary if the image is altered. - fn env_vars(&self) -> Box + '_> { - Box::new(self.env_vars.iter()) - } - - /// The runtime is running in a docker container and has its own network. In order to be used by the host machine, - /// the runtime must expose an internal port. For e.g. assume the runtime has an internal port 9092, - /// and the `exposed_port` is set to 9092, then the host machine can get a mapped external port with - /// `container.get_host_port_ipv4(exposed_port)`. With the mapped port, the host machine could connect with the runtime. - fn expose_ports(&self) -> Vec { - vec![KAFKA_ADVERTISED_LISTENER_PORT] - } - - /// Specifies a collection of commands to be executed when the container is started. - fn exec_after_start(&self, cs: ContainerState) -> Vec { - let mut commands = vec![]; - let cmd = format!( - "kafka-configs --alter --bootstrap-server 0.0.0.0:{} --entity-type brokers --entity-name 1 --add-config advertised.listeners=[PLAINTEXT://127.0.0.1:{},BROKER://localhost:9092]", - KAFKA_LISTENER_PORT, - cs.host_port_ipv4(KAFKA_ADVERTISED_LISTENER_PORT), - ); - let ready_conditions = vec![WaitFor::message_on_stdout( - "Checking need to trigger auto leader balancing", - )]; - commands.push(ExecCommand { - cmd, - ready_conditions, - }); - commands - } -} - -fn build_env_vars() -> HashMap { - [ - ( - "KAFKA_ZOOKEEPER_CONNECT".to_string(), - format!("localhost:{ZOOKEEPER_PORT}"), - ), - ( - "KAFKA_LISTENERS".to_string(), - format!("PLAINTEXT://0.0.0.0:{KAFKA_ADVERTISED_LISTENER_PORT},BROKER://0.0.0.0:{KAFKA_LISTENER_PORT}"), - ), - ( - "KAFKA_ADVERTISED_LISTENERS".to_string(), - format!("PLAINTEXT://localhost:{KAFKA_ADVERTISED_LISTENER_PORT},BROKER://localhost:{KAFKA_LISTENER_PORT}",), - ), - ( - "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_string(), - "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT".to_string(), - ), - ( - "KAFKA_INTER_BROKER_LISTENER_NAME".to_string(), - "BROKER".to_string(), - ), - ("KAFKA_BROKER_ID".to_string(), "1".to_string()), - ( - "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_string(), - "1".to_string(), - ), - ] - .into() -} - -#[cfg(test)] -mod tests { - use rskafka::chrono::{TimeZone, Utc}; - use rskafka::client::partition::UnknownTopicHandling; - use rskafka::client::ClientBuilder; - use rskafka::record::Record; - use testcontainers::clients::Cli as DockerCli; - - use super::*; - - #[tokio::test] - async fn test_image() { - // Starts a Kafka container. - let port = KAFKA_ADVERTISED_LISTENER_PORT; - let docker = DockerCli::default(); - let container = docker.run(Image::default()); - - // Creates a Kafka client. - let broker_endpoints = vec![format!("127.0.0.1:{}", container.get_host_port_ipv4(port))]; - let client = ClientBuilder::new(broker_endpoints).build().await.unwrap(); - - // Creates a topic. - let topic = "test_topic"; - client - .controller_client() - .unwrap() - .create_topic(topic, 1, 1, 500) - .await - .unwrap(); - - // Produces a record. - let partition_client = client - .partition_client(topic, 0, UnknownTopicHandling::Error) - .await - .unwrap(); - let produced = vec![Record { - key: Some(b"111".to_vec()), - value: Some(b"222".to_vec()), - timestamp: Utc.timestamp_millis_opt(42).unwrap(), - headers: Default::default(), - }]; - let offset = partition_client - .produce(produced.clone(), Default::default()) - .await - .unwrap()[0]; - - // Consumes the record. - let consumed = partition_client - .fetch_records(offset, 1..4096, 500) - .await - .unwrap() - .0; - assert_eq!(produced[0], consumed[0].record); - } -} diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 377c428c2a55..8d311d3f7ea2 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -64,7 +64,6 @@ sqlx = { version = "0.6", features = [ substrait.workspace = true table.workspace = true tempfile.workspace = true -testcontainers = "0.15.0" time = "0.3" tokio.workspace = true tonic.workspace = true