Skip to content

Commit

Permalink
Consolidate offset tests into one file
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler committed Jan 6, 2024
1 parent 40da522 commit e47e1cd
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 206 deletions.
100 changes: 0 additions & 100 deletions tests/end_at_last_offsets_test.rs

This file was deleted.

190 changes: 174 additions & 16 deletions tests/starting_offset_tests.rs → tests/offset_tests.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,96 @@
#[allow(dead_code)]
mod helpers;

use deltalake_core::DeltaTable;
use log::{debug, info};
use rdkafka::{producer::Producer, util::Timeout};
use serde::{Deserialize, Serialize};
use serde_json::json;

// These tests are executed serially to allow for predictable rebalance waits.
// Rebalance times vary too much to produce predictable earliest/latest seek positions
// when the local kafka container is receiving concurrent requests from other tasks.
use serial_test::serial;
use std::path::Path;
use uuid::Uuid;

use kafka_delta_ingest::{AutoOffsetReset, IngestOptions};
#[allow(dead_code)]
mod helpers;

/// JSON payload used by the tests in this file: values are converted with
/// `serde_json::to_value` and handed to `helpers::send_json`.
// NOTE(review): two `#[derive]` attributes and both a `date` and a `color` field
// appear here, while `TestMsg::new` only initializes `id` and `color`. This looks
// like removed and added diff lines interleaved — confirm which attribute and
// which field belong to the current version before relying on this definition.
#[derive(Clone, Serialize, Deserialize, Debug)]
#[derive(Debug, Serialize, Deserialize)]
struct TestMsg {
id: u64,
date: String,
color: String,
}

impl TestMsg {
    /// Builds a message carrying the given `id` and the fixed color `"default"`.
    fn new(id: u64) -> Self {
        let color = String::from("default");
        Self { id, color }
    }
}

/// Checks ingestion against the fixture table at `./tests/data/zero_offset`,
/// which starts at version 1 holding a single record.
///
/// Three messages are produced to a fresh single-partition topic; the test then
/// waits for Delta log versions 2 and 3 to appear, stops the ingest task and
/// expects the table to end at version 3 with 3 records in total.
///
/// The commit files created by this run are removed *before* the final
/// assertions are evaluated, so that a failing assertion does not leave the
/// fixture directory at version 3 and break the version-1 precondition of the
/// next run.
#[tokio::test]
async fn zero_offset_issue() {
    let table = "./tests/data/zero_offset";
    helpers::init_logger();
    let topic = format!("zero_offset_issue_{}", Uuid::new_v4());

    helpers::create_topic(&topic, 1).await;

    let (kdi, token, rt) = helpers::create_kdi(
        &topic,
        table,
        IngestOptions {
            app_id: "zero_offset".to_string(),
            allowed_latency: 5,
            max_messages_per_batch: 1,
            min_bytes_per_file: 20,
            ..Default::default()
        },
    );

    {
        // Precondition: the fixture table holds exactly one record at version 1.
        let table = deltalake_core::open_table(table).await.unwrap();
        assert_eq!(table.version(), 1);
        assert_eq!(count_records(table), 1);
    }

    let producer = helpers::create_producer();

    // Submit 3 messages to kafka, but only the 2nd and 3rd should go in as msg 0:0 is already in delta.
    for i in 0..3 {
        helpers::send_json(
            &producer,
            &topic,
            &serde_json::to_value(TestMsg::new(i)).unwrap(),
        )
        .await;
    }

    let v2 = Path::new("./tests/data/zero_offset/_delta_log/00000000000000000002.json");
    let v3 = Path::new("./tests/data/zero_offset/_delta_log/00000000000000000003.json");

    helpers::wait_until_file_created(v2);
    helpers::wait_until_file_created(v3);
    token.cancel();
    // If this succeeds it means that we successfully seeked to offset 0, e.g. Offset::Beginning.
    kdi.await.unwrap();
    rt.shutdown_background();

    // Read the final state first and keep only plain values around...
    let table = deltalake_core::open_table(table).await.unwrap();
    let final_version = table.version();
    let final_records = count_records(table);

    // ...then restore the fixture directory before any assertion can panic.
    std::fs::remove_file(v2).unwrap();
    std::fs::remove_file(v3).unwrap();

    // The table must now hold exactly 3 records at version 3.
    assert_eq!(final_version, 3);
    assert_eq!(final_records, 3);
}

/// Adds up the `num_records` statistic of every entry yielded by
/// `table.get_stats()`.
///
/// Both layers around each entry are unwrapped, so this panics if an entry is
/// an error or carries no statistics.
fn count_records(table: DeltaTable) -> i64 {
    table
        .get_stats()
        .map(|stats| stats.as_ref().unwrap().as_ref().unwrap().num_records)
        .sum()
}

#[tokio::test]
Expand All @@ -27,9 +101,9 @@ async fn test_start_from_explicit() {
let table = helpers::create_local_table(
json!({
"id": "integer",
"date": "string",
"color": "string",
}),
vec!["date"],
vec!["color"],
"starting_offsets_explicit",
);

Expand Down Expand Up @@ -97,9 +171,9 @@ async fn test_start_from_earliest() {
let table = helpers::create_local_table(
json!({
"id": "integer",
"date": "string",
"color": "string",
}),
vec!["date"],
vec!["color"],
"starting_offsets_earliest",
);

Expand Down Expand Up @@ -157,9 +231,9 @@ async fn test_start_from_latest() {
let table = helpers::create_local_table(
json! ({
"id": "integer",
"date": "string",
"color": "string",
}),
vec!["date"],
vec!["color"],
"starting_offsets_latest",
);

Expand Down Expand Up @@ -234,6 +308,90 @@ async fn test_start_from_latest() {
/// Returns an unbounded iterator of `TestMsg` values whose ids start at
/// `starting_id` and increase by one per item; every other field is set to the
/// same fixed string for each message.
// NOTE(review): both `date` and `color` are initialized here, whereas
// `TestMsg::new` only sets `id` and `color`. This looks like removed and added
// diff lines interleaved — confirm which field belongs to the current version.
fn create_generator(starting_id: u64) -> impl Iterator<Item = TestMsg> {
std::iter::successors(Some(starting_id), |n| Some(*n + 1)).map(|n| TestMsg {
id: n,
date: "2021-09-25".to_string(),
color: "red".to_string(),
})
}

/// JSON payload produced by `end_at_initial_offsets`; its fields match the
/// `id` / `city` columns that test declares for its table.
#[derive(Serialize, Deserialize, Debug)]
struct Msg {
    id: u32,
    city: String,
}

impl Msg {
    /// Builds a message carrying the given `id` and the fixed city `"default"`.
    fn new(id: u32) -> Self {
        let city = String::from("default");
        Self { id, city }
    }
}

/// Runs an ingest with `end_at_last_offsets: true` against a topic that already
/// holds 15 messages.
///
/// After version 1 of the table has been created with those 15 records, a
/// second batch of messages is produced. The ingest task is still expected to
/// terminate (within the limit handed to `helpers::expect_termination_within`)
/// and the table must remain at version 1 with 15 records.
#[tokio::test]
async fn end_at_initial_offsets() {
    helpers::init_logger();
    let topic = format!("end_at_offset_{}", Uuid::new_v4());

    let schema = json!({
        "id": "integer",
        "city": "string",
    });
    let table_path = helpers::create_local_table(schema, vec!["city"], &topic);
    let table = table_path.as_str();

    helpers::create_topic(&topic, 3).await;

    let producer = helpers::create_producer();
    // First batch: 15 messages, sent before `helpers::create_kdi` is called.
    for id in 0..15 {
        let payload = serde_json::to_value(Msg::new(id)).unwrap();
        helpers::send_json(&producer, &topic, &payload).await;
    }

    let options = IngestOptions {
        app_id: topic.clone(),
        allowed_latency: 5,
        max_messages_per_batch: 20,
        min_bytes_per_file: 20,
        end_at_last_offsets: true,
        ..Default::default()
    };
    let (kdi, _token, rt) = helpers::create_kdi(&topic, table, options);

    helpers::wait_until_version_created(table, 1);

    {
        // All 15 messages of the first batch must be in the table at version 1.
        let snapshot = deltalake_core::open_table(table).await.unwrap();
        assert_eq!(snapshot.version(), 1);
        assert_eq!(count_records(snapshot), 15);
    }

    // Second batch: 15 more messages, sent after version 1 has been observed.
    for id in 16..31 {
        let payload = serde_json::to_value(Msg::new(id)).unwrap();
        helpers::send_json(&producer, &topic, &payload).await;
    }

    helpers::expect_termination_within(kdi, 10).await;
    rt.shutdown_background();

    // The table must be unchanged: still version 1 with the original 15 records.
    let snapshot = deltalake_core::open_table(table).await.unwrap();
    assert_eq!(snapshot.version(), 1);
    assert_eq!(count_records(snapshot), 15);
}
Loading

0 comments on commit e47e1cd

Please sign in to comment.