Skip to content

Commit

Permalink
test: add some unit tests for record
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Jan 4, 2024
1 parent 5fa5018 commit 9768996
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 8 deletions.
8 changes: 4 additions & 4 deletions src/log-store/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod client_manager;
pub(crate) mod client_manager;
pub mod log_store;
mod util;
pub(crate) mod util;

use std::fmt::Display;

Expand All @@ -28,8 +28,8 @@ use crate::error::Error;
/// Kafka Namespace implementation.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct NamespaceImpl {
region_id: u64,
topic: Topic,
pub region_id: u64,
pub topic: Topic,
}

impl Namespace for NamespaceImpl {
Expand Down
182 changes: 178 additions & 4 deletions src/log-store/src/kafka/util/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::kafka::util::offset::Offset;
use crate::kafka::{EntryId, EntryImpl, NamespaceImpl};

/// The current version of Record.
const VERSION: u32 = 0;
pub(crate) const VERSION: u32 = 0;

/// The type of a record.
///
Expand Down Expand Up @@ -61,7 +61,7 @@ impl From<RecordType> for i32 {

/// The minimal storage unit in the Kafka log store.
/// Our own Record will be converted into a Kafka record during producing.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Record {
/// The version of the record. Used for backward compatibility.
version: u32,
Expand Down Expand Up @@ -312,9 +312,69 @@ pub fn maybe_emit_entry(
}

#[cfg(test)]
#[allow(unused)]
mod tests {
use rand::Rng;

use super::*;

// Implements some utility methods for testing.
impl Default for Record {
fn default() -> Self {
Self {
version: VERSION,
tp: RecordType::Full,
ns: NamespaceImpl {
region_id: 0,
topic: "greptimedb_wal_topic".to_string(),
},
data: Vec::new(),
entry_id: 0,
checksum: 0,
}
}
}

impl Record {
/// Overrides tp.
pub(crate) fn with_tp(&self, tp: RecordType) -> Self {
Self { tp, ..self.clone() }
}

/// Overrides data with the given data.
pub(crate) fn with_data(&self, data: &[u8]) -> Self {
Self {
data: data.to_vec(),
..self.clone()
}
}

/// Overrides data with random data.
pub(crate) fn with_random_data(&self) -> Self {
let data = rand::thread_rng().gen_range(0..u32::MAX).to_string();
Self {
data: data.as_bytes().to_vec(),
..self.clone()
}
}

/// Overrides entry id.
pub(crate) fn with_entry_id(&self, entry_id: EntryId) -> Self {
Self {
entry_id,
..self.clone()
}
}

/// Overrides checksum.
pub(crate) fn with_checksum(&self, checksum: u32) -> Self {
Self {
checksum,
..self.clone()
}
}
}

#[allow(unused)]
fn new_test_entry<D: AsRef<[u8]>>(data: D, entry_id: EntryId, ns: NamespaceImpl) -> EntryImpl {
EntryImpl {
Expand All @@ -326,15 +386,129 @@ mod tests {

/// Tests that the `check_records` could handle various record sequences.
#[test]
fn test_check_records() {}
fn test_check_records() {
// On empty records.
let got = check_records(&[]).unwrap_err();
let expected = IllegalSequenceSnafu {
error: "Empty sequence",
}
.build();
assert_eq!(got.to_string(), expected.to_string());

// On mismatched checksums.
let template = Record::default()
.with_data(b"123")
.with_checksum(crc32fast::hash(b"123"));
let got = check_records(&[template.with_data(b"234")]).unwrap_err();
assert_eq!(got.to_string(), ChecksumMismatchedSnafu.build().to_string());

// On illegal record sequences.
let missing_full_err = IllegalSequenceSnafu {
error: "Missing Full record",
}
.build()
.to_string();
let out_of_order_err = IllegalSequenceSnafu {
error: "Out of order",
}
.build()
.to_string();

let template = Record::default();
let got = check_records(&[template.with_tp(RecordType::First)]).unwrap_err();
assert_eq!(got.to_string(), missing_full_err);

let got = check_records(&[template.with_tp(RecordType::Last)]).unwrap_err();
assert_eq!(got.to_string(), missing_full_err);

let got = check_records(&[template.with_tp(RecordType::Middle(1))]).unwrap_err();
assert_eq!(got.to_string(), missing_full_err);

let got = check_records(&[
template.with_tp(RecordType::Full),
template.with_tp(RecordType::First),
])
.unwrap_err();
assert_eq!(got.to_string(), out_of_order_err);

let got = check_records(&[
template.with_tp(RecordType::First),
template.with_tp(RecordType::Middle(0)),
])
.unwrap_err();
assert_eq!(got.to_string(), out_of_order_err);

let got = check_records(&[
template.with_tp(RecordType::First),
template.with_tp(RecordType::Middle(1)),
])
.unwrap_err();
assert_eq!(got.to_string(), out_of_order_err);

let got = check_records(&[
template.with_tp(RecordType::First),
template.with_tp(RecordType::Middle(0)),
template.with_tp(RecordType::Last),
])
.unwrap_err();
assert_eq!(got.to_string(), out_of_order_err);

let got = check_records(&[
template.with_tp(RecordType::First),
template.with_tp(RecordType::Middle(2)),
template.with_tp(RecordType::Last),
])
.unwrap_err();
assert_eq!(got.to_string(), out_of_order_err);

let got = check_records(&[
template.with_tp(RecordType::First),
template.with_tp(RecordType::Middle(2)),
template.with_tp(RecordType::Middle(1)),
template.with_tp(RecordType::Last),
])
.unwrap_err();
assert_eq!(got.to_string(), out_of_order_err);

let got = check_records(&[
template.with_tp(RecordType::First),
template.with_tp(RecordType::Middle(1)),
template.with_tp(RecordType::Last),
]);
assert!(got.is_ok());

let got = check_records(&[
template.with_tp(RecordType::First),
template.with_tp(RecordType::Middle(1)),
template.with_tp(RecordType::Middle(2)),
template.with_tp(RecordType::Middle(3)),
template.with_tp(RecordType::Last),
]);
assert!(got.is_ok());
}

/// Tests that the `build_records` works as expected.
#[test]
fn test_build_records() {}

/// Tests that Record and KafkaRecord are able to be converted back and forth.
#[test]
fn test_record_conversion() {}
fn test_record_conversion() {
let record = Record {
version: VERSION,
tp: RecordType::Full,
data: b"12345".to_vec(),
entry_id: 1,
ns: NamespaceImpl {
region_id: 1,
topic: "greptimedb_wal_topic".to_string(),
},
checksum: crc32fast::hash(b"12345".as_slice()),
};
let kafka_record: KafkaRecord = record.clone().try_into().unwrap();
let got = Record::try_from(kafka_record).unwrap();
assert_eq!(record, got);
}

/// Tests that the reconstruction of an entry behaves as expected.
#[test]
Expand Down

0 comments on commit 9768996

Please sign in to comment.