diff --git a/tests-integration/src/wal_util/kafka.rs b/tests-integration/src/wal_util/kafka.rs index ee71c6410b1c..2e14f90fc499 100644 --- a/tests-integration/src/wal_util/kafka.rs +++ b/tests-integration/src/wal_util/kafka.rs @@ -15,3 +15,10 @@ pub mod config; mod image; pub mod runtime; + +#[macro_export] +macro_rules! start_kafka { + () => { + let _ = $crate::wal_util::kafka::runtime::Runtime::default().start().await; + }; +} diff --git a/tests-integration/src/wal_util/kafka/runtime.rs b/tests-integration/src/wal_util/kafka/runtime.rs index 24ff66de8c7e..f593d6cf733f 100644 --- a/tests-integration/src/wal_util/kafka/runtime.rs +++ b/tests-integration/src/wal_util/kafka/runtime.rs @@ -30,9 +30,53 @@ impl Runtime { } } -#[macro_export] -macro_rules! start_kafka { - () => { - let _ = Runtime::default().start(); - }; +#[cfg(test)] +mod tests { + use rskafka::chrono::Utc; + use rskafka::client::partition::UnknownTopicHandling; + use rskafka::client::ClientBuilder; + use rskafka::record::Record; + + use crate::start_kafka; + + #[tokio::test] + async fn test_runtime() { + start_kafka!(); + + let bootstrap_brokers = vec![9092.to_string()]; + let client = ClientBuilder::new(bootstrap_brokers).build().await.unwrap(); + + // Creates a topic. + let topic = "test_topic"; + client + .controller_client() + .unwrap() + .create_topic(topic, 1, 1, 500) + .await + .unwrap(); + + // Produces a record. + let partition_client = client + .partition_client(topic, 0, UnknownTopicHandling::Error) + .await + .unwrap(); + let produced = vec![Record { + key: Some(b"111".to_vec()), + value: Some(b"222".to_vec()), + timestamp: Utc::now(), + headers: Default::default(), + }]; + let offset = partition_client + .produce(produced.clone(), Default::default()) + .await + .unwrap()[0]; + + // Consumes the record. + let consumed = partition_client + .fetch_records(offset, 1..4096, 500) + .await + .unwrap() + .0; + assert_eq!(produced[0], consumed[0].record); + } }