Skip to content

Commit

Permalink
test: add test for kafka runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Dec 27, 2023
1 parent a7f639f commit 1e3cafe
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 5 deletions.
7 changes: 7 additions & 0 deletions tests-integration/src/wal_util/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,10 @@
pub mod config;
mod image;
pub mod runtime;

#[macro_export]
macro_rules! start_kafka {
() => {
let _ = $crate::wal_util::kafka::runtime::Runtime::default().start().await;
};
}
54 changes: 49 additions & 5 deletions tests-integration/src/wal_util/kafka/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,53 @@ impl Runtime {
}
}

#[macro_export]
macro_rules! start_kafka {
() => {
let _ = Runtime::default().start();
};
#[cfg(test)]
mod tests {
use rskafka::chrono::Utc;
use rskafka::client::partition::UnknownTopicHandling;
use rskafka::client::ClientBuilder;
use rskafka::record::Record;

use crate::start_kafka;

#[tokio::test]
async fn test_runtime() {
start_kafka!();

let bootstrap_brokers = vec![9092.to_string()];
let client = ClientBuilder::new(bootstrap_brokers).build().await.unwrap();

// Creates a topic.
let topic = "test_topic";
client
.controller_client()
.unwrap()
.create_topic(topic, 1, 1, 500)
.await
.unwrap();

// Produces a record.
let partition_client = client
.partition_client(topic, 0, UnknownTopicHandling::Error)
.await
.unwrap();
let produced = vec![Record {
key: Some(b"111".to_vec()),
value: Some(b"222".to_vec()),
timestamp: Utc::now(),
headers: Default::default(),
}];
let offset = partition_client
.produce(produced.clone(), Default::default())
.await
.unwrap()[0];

// Consumes the record.
let consumed = partition_client
.fetch_records(offset, 1..4096, 500)
.await
.unwrap()
.0;
assert_eq!(produced[0], consumed[0].record);
}
}

0 comments on commit 1e3cafe

Please sign in to comment.