diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 0a54e9fc..f799bc47 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -144,7 +144,7 @@ pub(crate) mod tests { executor::tokio::TokioExecutor, fs::FileProvider, record::Record, - tests::{get_test_record_batch, Test}, + tests::{get_test_record_batch, Test, TestRef}, timestamp::Timestamped, DbOption, }; @@ -175,13 +175,20 @@ pub(crate) mod tests { let key = Timestamped::new("hello".to_owned(), 1.into()); - dbg!(open_sstable::(&table_path) - .await - .get(key.borrow(), ProjectionMask::all()) - .await - .unwrap() - .unwrap() - .get()); + assert_eq!( + open_sstable::(&table_path) + .await + .get(key.borrow(), ProjectionMask::all()) + .await + .unwrap() + .unwrap() + .get(), + Some(TestRef { + vstring: "hello", + vu32: Some(12), + vbool: Some(true), + }) + ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] diff --git a/src/stream/merge.rs b/src/stream/merge.rs index 85b7f6e8..c861635c 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -139,7 +139,7 @@ mod tests { use super::MergeStream; use crate::{ - executor::tokio::TokioExecutor, fs::FileProvider, inmem::mutable::Mutable, + executor::tokio::TokioExecutor, fs::FileProvider, inmem::mutable::Mutable, stream::Entry, wal::log::LogType, DbOption, }; @@ -195,12 +195,42 @@ mod tests { .await .unwrap(); - dbg!(merge.next().await); - dbg!(merge.next().await); - dbg!(merge.next().await); - dbg!(merge.next().await); - dbg!(merge.next().await); - dbg!(merge.next().await); + if let Some(Ok(Entry::Mutable(entry))) = merge.next().await { + assert_eq!(entry.key().value, "a"); + assert_eq!(entry.key().ts, 1.into()); + assert_eq!(entry.value().as_deref(), Some("a")); + } else { + unreachable!() + } + if let Some(Ok(Entry::Mutable(entry))) = merge.next().await { + assert_eq!(entry.key().value, "b"); + assert_eq!(entry.key().ts, 3.into()); + assert!(entry.value().is_none()); + } else { + unreachable!() + } + if let Some(Ok(Entry::Mutable(entry))) = merge.next().await { + assert_eq!(entry.key().value, "c"); + assert_eq!(entry.key().ts, 4.into()); + assert_eq!(entry.value().as_deref(), Some("c")); + } else { + unreachable!() + } + if let Some(Ok(Entry::Mutable(entry))) = merge.next().await { + assert_eq!(entry.key().value, "d"); + assert_eq!(entry.key().ts, 5.into()); + assert_eq!(entry.value().as_deref(), Some("d")); + } else { + unreachable!() + } + if let Some(Ok(Entry::Mutable(entry))) = merge.next().await { + assert_eq!(entry.key().value, "e"); + assert_eq!(entry.key().ts, 4.into()); + assert_eq!(entry.value().as_deref(), Some("e")); + } else { + unreachable!() + } + assert!(merge.next().await.is_none()); } #[tokio::test] @@ -238,9 +268,25 @@ mod tests { .await .unwrap(); - dbg!(merge.next().await); - dbg!(merge.next().await); - dbg!(merge.next().await); + if let Some(Ok(Entry::Mutable(entry))) = merge.next().await { + assert_eq!(entry.key().value, "1"); + assert_eq!(entry.key().ts, 0.into()); + assert_eq!(entry.value().as_deref(), Some("1")); + }; + if let Some(Ok(Entry::Mutable(entry))) = merge.next().await { + assert_eq!(entry.key().value, "2"); + assert_eq!(entry.key().ts, 1.into()); + assert_eq!(entry.value().as_deref(), Some("2")); + } else { + unreachable!() + } + if let Some(Ok(Entry::Mutable(entry))) = merge.next().await { + assert_eq!(entry.key().value, "3"); + assert_eq!(entry.key().ts, 1.into()); + assert_eq!(entry.value().as_deref(), Some("3")); + } else { + unreachable!() + } let lower = "1".to_string(); let upper = "4".to_string(); @@ -250,8 +296,26 @@ mod tests { .await .unwrap(); - dbg!(merge.next().await); - dbg!(merge.next().await); - dbg!(merge.next().await); + if let Some(Ok(Entry::Mutable(entry))) = merge.next().await { + assert_eq!(entry.key().value, "1"); + assert_eq!(entry.key().ts, 0.into()); + assert_eq!(entry.value().as_deref(), Some("1")); + } else { + unreachable!() + } + if let Some(Ok(Entry::Mutable(entry))) = merge.next().await { + assert_eq!(entry.key().value, "2"); + assert_eq!(entry.key().ts, 1.into()); + assert_eq!(entry.value().as_deref(), Some("2")); + } else { + unreachable!() + } + if let Some(Ok(Entry::Mutable(entry))) = merge.next().await { + assert_eq!(entry.key().value, "3"); + assert_eq!(entry.key().ts, 1.into()); + assert_eq!(entry.value().as_deref(), Some("3")); + } else { + unreachable!() + }; } } diff --git a/src/stream/package.rs b/src/stream/package.rs index 2f413a92..eed93ed7 100644 --- a/src/stream/package.rs +++ b/src/stream/package.rs @@ -82,8 +82,9 @@ where #[cfg(test)] mod tests { - use std::collections::Bound; + use std::{collections::Bound, sync::Arc}; + use arrow::array::{BooleanArray, RecordBatch, StringArray, UInt32Array}; use futures_util::StreamExt; use tempfile::TempDir; @@ -94,6 +95,7 @@ mod tests { immutable::{tests::TestImmutableArrays, ArrowArrays}, mutable::Mutable, }, + record::Record, stream::{merge::MergeStream, package::PackageStream}, tests::Test, wal::log::LogType, @@ -188,9 +190,24 @@ mod tests { batch_size: 8192, inner: merge, builder: TestImmutableArrays::builder(8192), - projection_indices: Some(projection_indices), + projection_indices: Some(projection_indices.clone()), }; - dbg!(package.next().await); + let arrays = package.next().await.unwrap().unwrap(); + assert_eq!( + arrays.as_record_batch(), + &RecordBatch::try_new( + Arc::new(Test::arrow_schema().project(&projection_indices).unwrap(),), + vec![ + Arc::new(BooleanArray::from(vec![ + false, false, false, false, false, false + ])), + Arc::new(UInt32Array::from(vec![0, 1, 2, 3, 4, 5])), + Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e", "f"])), + Arc::new(UInt32Array::from(vec![0, 1, 2, 3, 4, 5])), + ], + ) + .unwrap() + ) } } diff --git a/src/transaction.rs b/src/transaction.rs index c2c227ea..e0a36951 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -248,7 +248,7 @@ mod tests { txn1.insert("foo".to_string()); let txn2 = db.transaction().await; - dbg!(txn2 + assert!(txn2 .get(&"foo".to_string(), Projection::All) .await .unwrap() @@ -260,11 +260,11 @@ mod tests { { let txn3 = db.transaction().await; - dbg!(txn3 + assert!(txn3 .get(&"foo".to_string(), Projection::All) .await .unwrap() - .is_none()); + .is_some()); txn3.commit().await.unwrap(); } }