diff --git a/tests/end_at_last_offsets_test.rs b/tests/end_at_last_offsets_test.rs deleted file mode 100644 index 80bd9cc..0000000 --- a/tests/end_at_last_offsets_test.rs +++ /dev/null @@ -1,100 +0,0 @@ -use deltalake_core::DeltaTable; -use kafka_delta_ingest::IngestOptions; -use serde::{Deserialize, Serialize}; -use serde_json::json; -use uuid::Uuid; - -#[allow(dead_code)] -mod helpers; - -#[derive(Debug, Serialize, Deserialize)] -struct Msg { - id: u32, - city: String, -} - -impl Msg { - fn new(id: u32) -> Self { - Self { - id, - city: "default".to_string(), - } - } -} - -#[tokio::test] -async fn end_at_initial_offsets() { - helpers::init_logger(); - let topic = format!("end_at_offset_{}", Uuid::new_v4()); - - let table = helpers::create_local_table( - json!({ - "id": "integer", - "city": "string", - }), - vec!["city"], - &topic, - ); - let table = table.as_str(); - - helpers::create_topic(&topic, 3).await; - - let producer = helpers::create_producer(); - // submit 15 messages in kafka - for i in 0..15 { - helpers::send_json( - &producer, - &topic, - &serde_json::to_value(Msg::new(i)).unwrap(), - ) - .await; - } - - let (kdi, _token, rt) = helpers::create_kdi( - &topic, - table, - IngestOptions { - app_id: topic.clone(), - allowed_latency: 5, - max_messages_per_batch: 20, - min_bytes_per_file: 20, - end_at_last_offsets: true, - ..Default::default() - }, - ); - - helpers::wait_until_version_created(table, 1); - - { - // check that there's 3 records in table - let table = deltalake_core::open_table(table).await.unwrap(); - assert_eq!(table.version(), 1); - assert_eq!(count_records(table), 15); - } - - // messages in kafka - for i in 16..31 { - helpers::send_json( - &producer, - &topic, - &serde_json::to_value(Msg::new(i)).unwrap(), - ) - .await; - } - - helpers::expect_termination_within(kdi, 10).await; - rt.shutdown_background(); - - // check that there's only 3 records - let table = deltalake_core::open_table(table).await.unwrap(); - assert_eq!(table.version(), 1); - assert_eq!(count_records(table), 15); -} - -fn count_records(table: DeltaTable) -> i64 { - let mut count = 0; - for x in table.get_stats() { - count += x.as_ref().unwrap().as_ref().unwrap().num_records; - } - count -} diff --git a/tests/starting_offset_tests.rs b/tests/offset_tests.rs similarity index 60% rename from tests/starting_offset_tests.rs rename to tests/offset_tests.rs index d245b10..17dc277 100644 --- a/tests/starting_offset_tests.rs +++ b/tests/offset_tests.rs @@ -1,22 +1,96 @@ -#[allow(dead_code)] -mod helpers; - +use deltalake_core::DeltaTable; use log::{debug, info}; use rdkafka::{producer::Producer, util::Timeout}; use serde::{Deserialize, Serialize}; use serde_json::json; - -// These tests are executed serially to allow for predictable rebalance waits. -// Rebalance times vary too much to produce predictable earliest/latest seek positions -// when the local kafka container is receiving concurrent requests from other tasks. use serial_test::serial; +use std::path::Path; +use uuid::Uuid; use kafka_delta_ingest::{AutoOffsetReset, IngestOptions}; +#[allow(dead_code)] +mod helpers; -#[derive(Clone, Serialize, Deserialize, Debug)] +#[derive(Debug, Serialize, Deserialize)] struct TestMsg { id: u64, - date: String, + color: String, +} + +impl TestMsg { + fn new(id: u64) -> Self { + Self { + id, + color: "default".to_string(), + } + } +} + +#[tokio::test] +async fn zero_offset_issue() { + let table = "./tests/data/zero_offset"; + helpers::init_logger(); + let topic = format!("zero_offset_issue_{}", Uuid::new_v4()); + + helpers::create_topic(&topic, 1).await; + + let (kdi, token, rt) = helpers::create_kdi( + &topic, + table, + IngestOptions { + app_id: "zero_offset".to_string(), + allowed_latency: 5, + max_messages_per_batch: 1, + min_bytes_per_file: 20, + ..Default::default() + }, + ); + + { + // check that there's only 1 record in table + let table = deltalake_core::open_table(table).await.unwrap(); + assert_eq!(table.version(), 1); + assert_eq!(count_records(table), 1); + } + + let producer = helpers::create_producer(); + + // submit 3 messages in kafka, but only 2nd and 3rd should go in as msg 0:0 already in delta + for i in 0..3 { + helpers::send_json( + &producer, + &topic, + &serde_json::to_value(TestMsg::new(i)).unwrap(), + ) + .await; + } + + let v2 = Path::new("./tests/data/zero_offset/_delta_log/00000000000000000002.json"); + let v3 = Path::new("./tests/data/zero_offset/_delta_log/00000000000000000003.json"); + + helpers::wait_until_file_created(v2); + helpers::wait_until_file_created(v3); + token.cancel(); + // if it succeeds then it means that we successfully seeked into offset 0, e.g. Offset::Beginning + kdi.await.unwrap(); + rt.shutdown_background(); + + // check that there's only 3 records + let table = deltalake_core::open_table(table).await.unwrap(); + assert_eq!(table.version(), 3); + assert_eq!(count_records(table), 3); + + //cleanup + std::fs::remove_file(v2).unwrap(); + std::fs::remove_file(v3).unwrap(); +} + +fn count_records(table: DeltaTable) -> i64 { + let mut count = 0; + for x in table.get_stats() { + count += x.as_ref().unwrap().as_ref().unwrap().num_records; + } + count } #[tokio::test] @@ -27,9 +101,9 @@ async fn test_start_from_explicit() { let table = helpers::create_local_table( json!({ "id": "integer", - "date": "string", + "color": "string", }), - vec!["date"], + vec!["color"], "starting_offsets_explicit", ); @@ -97,9 +171,9 @@ async fn test_start_from_earliest() { let table = helpers::create_local_table( json!({ "id": "integer", - "date": "string", + "color": "string", }), - vec!["date"], + vec!["color"], "starting_offsets_earliest", ); @@ -157,9 +231,9 @@ async fn test_start_from_latest() { let table = helpers::create_local_table( json! ({ "id": "integer", - "date": "string", + "color": "string", }), - vec!["date"], + vec!["color"], "starting_offsets_latest", ); @@ -234,6 +308,90 @@ async fn test_start_from_latest() { fn create_generator(starting_id: u64) -> impl Iterator { std::iter::successors(Some(starting_id), |n| Some(*n + 1)).map(|n| TestMsg { id: n, - date: "2021-09-25".to_string(), + color: "red".to_string(), }) } + +#[derive(Debug, Serialize, Deserialize)] +struct Msg { + id: u32, + city: String, +} + +impl Msg { + fn new(id: u32) -> Self { + Self { + id, + city: "default".to_string(), + } + } +} + +#[tokio::test] +async fn end_at_initial_offsets() { + helpers::init_logger(); + let topic = format!("end_at_offset_{}", Uuid::new_v4()); + + let table = helpers::create_local_table( + json!({ + "id": "integer", + "city": "string", + }), + vec!["city"], + &topic, + ); + let table = table.as_str(); + + helpers::create_topic(&topic, 3).await; + + let producer = helpers::create_producer(); + // submit 15 messages in kafka + for i in 0..15 { + helpers::send_json( + &producer, + &topic, + &serde_json::to_value(Msg::new(i)).unwrap(), + ) + .await; + } + + let (kdi, _token, rt) = helpers::create_kdi( + &topic, + table, + IngestOptions { + app_id: topic.clone(), + allowed_latency: 5, + max_messages_per_batch: 20, + min_bytes_per_file: 20, + end_at_last_offsets: true, + ..Default::default() + }, + ); + + helpers::wait_until_version_created(table, 1); + + { + // check that there's 3 records in table + let table = deltalake_core::open_table(table).await.unwrap(); + assert_eq!(table.version(), 1); + assert_eq!(count_records(table), 15); + } + + // messages in kafka + for i in 16..31 { + helpers::send_json( + &producer, + &topic, + &serde_json::to_value(Msg::new(i)).unwrap(), + ) + .await; + } + + helpers::expect_termination_within(kdi, 10).await; + rt.shutdown_background(); + + // check that there's only 3 records + let table = deltalake_core::open_table(table).await.unwrap(); + assert_eq!(table.version(), 1); + assert_eq!(count_records(table), 15); +} diff --git a/tests/zero_offset_issue_tests.rs b/tests/zero_offset_issue_tests.rs deleted file mode 100644 index 040627c..0000000 --- a/tests/zero_offset_issue_tests.rs +++ /dev/null @@ -1,90 +0,0 @@ -use deltalake_core::DeltaTable; -use kafka_delta_ingest::IngestOptions; -use serde::{Deserialize, Serialize}; -use std::path::Path; -use uuid::Uuid; - -#[allow(dead_code)] -mod helpers; - -#[derive(Debug, Serialize, Deserialize)] -struct TestMsg { - id: u32, - color: String, -} - -impl TestMsg { - fn new(id: u32) -> Self { - Self { - id, - color: "default".to_string(), - } - } -} - -#[tokio::test] -async fn zero_offset_issue() { - let table = "./tests/data/zero_offset"; - helpers::init_logger(); - let topic = format!("zero_offset_issue_{}", Uuid::new_v4()); - - helpers::create_topic(&topic, 1).await; - - let (kdi, token, rt) = helpers::create_kdi( - &topic, - table, - IngestOptions { - app_id: "zero_offset".to_string(), - allowed_latency: 5, - max_messages_per_batch: 1, - min_bytes_per_file: 20, - ..Default::default() - }, - ); - - { - // check that there's only 1 record in table - let table = deltalake_core::open_table(table).await.unwrap(); - assert_eq!(table.version(), 1); - assert_eq!(count_records(table), 1); - } - - let producer = helpers::create_producer(); - - // submit 3 messages in kafka, but only 2nd and 3rd should go in as msg 0:0 already in delta - for i in 0..3 { - helpers::send_json( - &producer, - &topic, - &serde_json::to_value(TestMsg::new(i)).unwrap(), - ) - .await; - } - - let v2 = Path::new("./tests/data/zero_offset/_delta_log/00000000000000000002.json"); - let v3 = Path::new("./tests/data/zero_offset/_delta_log/00000000000000000003.json"); - - helpers::wait_until_file_created(v2); - helpers::wait_until_file_created(v3); - token.cancel(); - // if it succeeds then it means that we successfully seeked into offset 0, e.g. Offset::Beginning - kdi.await.unwrap(); - rt.shutdown_background(); - - // check that there's only 3 records - let table = deltalake_core::open_table(table).await.unwrap(); - assert_eq!(table.version(), 3); - assert_eq!(count_records(table), 3); - - //cleanup - std::fs::remove_file(v2).unwrap(); - std::fs::remove_file(v3).unwrap(); -} - -fn count_records(table: DeltaTable) -> i64 { - let mut count = 0; - for x in table.get_stats() { - count += x.as_ref().unwrap().as_ref().unwrap().num_records; - } - count -}