From bd68babbeaa8560a5c728f2cf665e290a028c4fe Mon Sep 17 00:00:00 2001
From: Kould
Date: Wed, 27 Nov 2024 15:31:05 +0800
Subject: [PATCH] chore: fix Schema

---
 examples/datafusion.rs                    |  25 +-
 examples/declare.rs                       |   9 +-
 src/compaction/mod.rs                     | 209 ++++++++++------
 src/inmem/immutable.rs                    |   1 +
 src/inmem/mutable.rs                      |  12 +-
 src/lib.rs                                | 289 ++++++++++++----
 src/ondisk/sstable.rs                     |  10 +-
 src/option.rs                             |   8 +-
 src/record/mod.rs                         |  25 +-
 src/record/runtime/record.rs              |   6 +-
 src/snapshot.rs                           |  23 +-
 src/stream/level.rs                       |  22 +-
 src/stream/mem_projection.rs              |  20 +-
 src/stream/merge.rs                       |  41 ++-
 src/stream/mod.rs                         |  21 +-
 src/stream/package.rs                     |  37 ++-
 src/stream/record_batch.rs                |   8 +-
 src/transaction.rs                        |  91 ++++---
 src/version/cleaner.rs                    |   6 +-
 src/version/set.rs                        |  17 +-
 tests/data_integrity.rs                   |   9 +-
 tests/macros_correctness.rs               |  59 +++--
 tonbo_macros/src/record.rs                | 134 ++++++----
 tonbo_macros/src/utils/ident_generator.rs |   6 +
 24 files changed, 691 insertions(+), 397 deletions(-)

diff --git a/examples/datafusion.rs b/examples/datafusion.rs
index 2b7dd167..d7a9a940 100644
--- a/examples/datafusion.rs
+++ b/examples/datafusion.rs
@@ -28,7 +28,10 @@ use futures_core::Stream;
 use futures_util::StreamExt;
 use tokio::fs;
 use tonbo::{
-    executor::tokio::TokioExecutor, inmem::immutable::ArrowArrays, record::Record, DbOption, DB,
+    executor::tokio::TokioExecutor,
+    inmem::immutable::ArrowArrays,
+    record::{Record, Schema},
+    DbOption, DB,
 };
 use tonbo_macros::Record;
 
@@ -49,7 +52,10 @@ struct MusicExec {
     db: Arc<DB<Music, TokioExecutor>>,
     projection: Option<Vec<usize>>,
     limit: Option<usize>,
-    range: (Bound<<Music as Record>::Key>, Bound<<Music as Record>::Key>),
+    range: (
+        Bound<<MusicSchema as Schema>::Key>,
+        Bound<<MusicSchema as Schema>::Key>,
+    ),
 }
 
 struct MusicStream {
@@ -63,7 +69,7 @@ impl TableProvider for MusicProvider {
     }
 
     fn schema(&self) -> SchemaRef {
-        Music::arrow_schema().clone()
+        MusicSchema {}.arrow_schema().clone()
     }
 
     fn table_type(&self) -> TableType {
@@ -96,7 +102,7 @@ impl TableProvider for MusicProvider {
 
 impl MusicExec {
     fn new(db: Arc<DB<Music, TokioExecutor>>, projection: Option<&Vec<usize>>) -> Self {
-        let schema = Music::arrow_schema();
+        let schema = MusicSchema {}.arrow_schema();
         let schema = if let Some(projection) = &projection {
             Arc::new(schema.project(projection).unwrap())
         } else {
@@ -127,7 +133,7 @@ impl Stream for MusicStream {
 
 impl RecordBatchStream for MusicStream {
     fn schema(&self) -> SchemaRef {
-        Music::arrow_schema().clone()
+        MusicSchema {}.arrow_schema().clone()
     }
 }
 
@@ -215,9 +221,14 @@ async fn main() -> Result<()> {
     // make sure the path exists
     let _ = fs::create_dir_all("./db_path/music").await;
 
-    let options = DbOption::from(Path::from_filesystem_path("./db_path/music").unwrap());
+    let options = DbOption::from((
+        Path::from_filesystem_path("./db_path/music").unwrap(),
+        &MusicSchema,
+    ));
 
-    let db = DB::new(options, TokioExecutor::default()).await.unwrap();
+    let db = DB::new(options, TokioExecutor::default(), MusicSchema)
+        .await
+        .unwrap();
     for (id, name, like) in [
         (0, "welcome".to_string(), 0),
         (1, "tonbo".to_string(), 999),
diff --git a/examples/declare.rs b/examples/declare.rs
index 6e6edcd3..310adcfc 100644
--- a/examples/declare.rs
+++ b/examples/declare.rs
@@ -22,9 +22,14 @@ async fn main() {
     // make sure the path exists
     let _ = fs::create_dir_all("./db_path/users").await;
 
-    let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap());
+    let options = DbOption::from((
+        Path::from_filesystem_path("./db_path/users").unwrap(),
+        &UserSchema,
+    ));
     // pluggable async runtime and I/O
-    let db = DB::new(options, TokioExecutor::default()).await.unwrap();
+    let db =
DB::new(options, TokioExecutor::default(), UserSchema) + .await + .unwrap(); // insert with owned value db.insert(User { diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 51b41713..cd720469 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -15,7 +15,7 @@ use crate::{ mutable::Mutable, }, ondisk::sstable::SsTable, - record::{KeyRef, Record, RecordInstance}, + record::{KeyRef, Record, Schema as RecordSchema}, scope::Scope, stream::{level::LevelStream, merge::MergeStream, ScanStream}, transaction::CommitError, @@ -39,6 +39,7 @@ where pub(crate) schema: Arc>>, pub(crate) version_set: VersionSet, pub(crate) manager: Arc, + pub(crate) record_schema: Arc, } impl Compactor @@ -47,6 +48,7 @@ where { pub(crate) fn new( schema: Arc>>, + record_schema: Arc, option: Arc>, version_set: VersionSet, manager: Arc, @@ -56,6 +58,7 @@ where schema, version_set, manager, + record_schema, } } @@ -74,9 +77,15 @@ where let trigger_clone = guard.trigger.clone(); let mutable = mem::replace( &mut guard.mutable, - Mutable::new(&self.option, trigger_clone, self.manager.base_fs()).await?, + Mutable::new( + &self.option, + trigger_clone, + self.manager.base_fs(), + self.record_schema.clone(), + ) + .await?, ); - let (file_id, immutable) = mutable.into_immutable(&guard.record_instance).await?; + let (file_id, immutable) = mutable.into_immutable().await?; guard.immutables.push((file_id, immutable)); if guard.immutables.len() > self.option.immutable_chunk_max_num { @@ -91,7 +100,7 @@ where &self.option, recover_wal_ids, excess, - &guard.record_instance, + &guard.record_schema, &self.manager, ) .await? @@ -108,7 +117,7 @@ where &scope.max, &mut version_edits, &mut delete_gens, - &guard.record_instance, + &guard.record_schema, &self.manager, parquet_lru, ) @@ -133,10 +142,13 @@ where pub(crate) async fn minor_compaction( option: &DbOption, recover_wal_ids: Option>, - batches: &[(Option, Immutable)], - instance: &RecordInstance, + batches: &[( + Option, + Immutable<::Columns>, + )], + schema: &R::Schema, manager: &StoreManager, - ) -> Result>, CompactionError> { + ) -> Result::Key>>, CompactionError> { if !batches.is_empty() { let level_0_path = option.level_fs_path(0).unwrap_or(&option.base_path); let level_0_fs = manager.get_fs(level_0_path); @@ -156,7 +168,7 @@ where ) .await?, ), - instance.arrow_schema::().clone(), + schema.arrow_schema().clone(), Some(option.write_parquet_properties.clone()), )?; @@ -192,11 +204,11 @@ where pub(crate) async fn major_compaction( version: &Version, option: &DbOption, - mut min: &R::Key, - mut max: &R::Key, - version_edits: &mut Vec>, + mut min: &::Key, + mut max: &::Key, + version_edits: &mut Vec::Key>>, delete_gens: &mut Vec<(FileId, usize)>, - instance: &RecordInstance, + instance: &R::Schema, manager: &StoreManager, parquet_lru: ParquetLru, ) -> Result<(), CompactionError> { @@ -308,11 +320,18 @@ where fn next_level_scopes<'a>( version: &'a Version, - min: &mut &'a ::Key, - max: &mut &'a ::Key, + min: &mut &'a ::Key, + max: &mut &'a ::Key, level: usize, - meet_scopes_l: &[&'a Scope<::Key>], - ) -> Result<(Vec<&'a Scope<::Key>>, usize, usize), CompactionError> { + meet_scopes_l: &[&'a Scope<::Key>], + ) -> Result< + ( + Vec<&'a Scope<::Key>>, + usize, + usize, + ), + CompactionError, + > { let mut meet_scopes_ll = Vec::new(); let mut start_ll = 0; let mut end_ll = 0; @@ -348,10 +367,14 @@ where fn this_level_scopes<'a>( version: &'a Version, - min: &::Key, - max: &::Key, + min: &::Key, + max: &::Key, level: usize, - ) -> (Vec<&'a Scope<::Key>>, usize, 
usize) { + ) -> ( + Vec<&'a Scope<::Key>>, + usize, + usize, + ) { let mut meet_scopes_l = Vec::new(); let mut start_l = Version::::scope_search(min, &version.level_slice[level]); let mut end_l = start_l; @@ -386,16 +409,17 @@ where async fn build_tables<'scan>( option: &DbOption, - version_edits: &mut Vec::Key>>, + version_edits: &mut Vec::Key>>, level: usize, streams: Vec>, - instance: &RecordInstance, + schema: &R::Schema, fs: &Arc, ) -> Result<(), CompactionError> { let mut stream = MergeStream::::from_vec(streams, u32::MAX.into()).await?; // Kould: is the capacity parameter necessary? - let mut builder = R::Columns::builder(&instance.arrow_schema::(), 8192); + let mut builder = + ::Columns::builder(schema.arrow_schema().clone(), 8192); let mut min = None; let mut max = None; @@ -417,7 +441,7 @@ where &mut builder, &mut min, &mut max, - instance, + schema, fs, ) .await?; @@ -431,7 +455,7 @@ where &mut builder, &mut min, &mut max, - instance, + schema, fs, ) .await?; @@ -440,8 +464,14 @@ where } fn full_scope<'a>( - meet_scopes: &[&'a Scope<::Key>], - ) -> Result<(&'a ::Key, &'a ::Key), CompactionError> { + meet_scopes: &[&'a Scope<::Key>], + ) -> Result< + ( + &'a ::Key, + &'a ::Key, + ), + CompactionError, + > { let lower = &meet_scopes.first().ok_or(CompactionError::EmptyLevel)?.min; let upper = &meet_scopes.last().ok_or(CompactionError::EmptyLevel)?.max; Ok((lower, upper)) @@ -450,12 +480,12 @@ where #[allow(clippy::too_many_arguments)] async fn build_table( option: &DbOption, - version_edits: &mut Vec>, + version_edits: &mut Vec::Key>>, level: usize, - builder: &mut ::Builder, - min: &mut Option, - max: &mut Option, - instance: &RecordInstance, + builder: &mut <::Columns as ArrowArrays>::Builder, + min: &mut Option<::Key>, + max: &mut Option<::Key>, + schema: &R::Schema, fs: &Arc, ) -> Result<(), CompactionError> { debug_assert!(min.is_some()); @@ -471,7 +501,7 @@ where ) .await?, ), - instance.arrow_schema::().clone(), + schema.arrow_schema().clone(), Some(option.write_parquet_properties.clone()), )?; writer.write(columns.as_record_batch()).await?; @@ -526,8 +556,11 @@ pub(crate) mod tests { compaction::Compactor, executor::tokio::TokioExecutor, fs::{manager::StoreManager, FileId, FileType}, - inmem::{immutable::Immutable, mutable::Mutable}, - record::{Datatype, DynRecord, Record, RecordInstance, Value, ValueDesc}, + inmem::{ + immutable::{tests::TestSchema, Immutable}, + mutable::Mutable, + }, + record::{Datatype, DynRecord, DynSchema, Record, Schema, Value, ValueDesc}, scope::Scope, tests::Test, timestamp::Timestamp, @@ -540,34 +573,34 @@ pub(crate) mod tests { async fn build_immutable( option: &DbOption, records: Vec<(LogType, R, Timestamp)>, - instance: &RecordInstance, + schema: &Arc, fs: &Arc, - ) -> Result, DbError> + ) -> Result::Columns>, DbError> where R: Record + Send, { let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); - let mutable: Mutable = Mutable::new(option, trigger, fs).await?; + let mutable: Mutable = Mutable::new(option, trigger, fs, schema.clone()).await?; for (log_ty, record, ts) in records { let _ = mutable.insert(log_ty, record, ts).await?; } - Ok(Immutable::from((mutable.data, instance))) + Ok(Immutable::new(mutable.data, schema.arrow_schema().clone())) } pub(crate) async fn build_parquet_table( option: &DbOption, gen: FileId, records: Vec<(LogType, R, Timestamp)>, - instance: &RecordInstance, + schema: &Arc, level: usize, fs: &Arc, ) -> Result<(), DbError> where R: Record + Send, { - let immutable = build_immutable::(option, records, 
instance, fs).await?; + let immutable = build_immutable::(option, records, schema, fs).await?; let mut writer = AsyncArrowWriter::try_new( AsyncWriter::new( fs.open_options( @@ -576,7 +609,7 @@ pub(crate) mod tests { ) .await?, ), - R::arrow_schema().clone(), + schema.arrow_schema().clone(), None, )?; writer.write(immutable.as_record_batch()).await?; @@ -590,13 +623,16 @@ pub(crate) mod tests { let temp_dir = tempfile::tempdir().unwrap(); let temp_dir_l0 = tempfile::tempdir().unwrap(); - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .level_path( - 0, - Path::from_filesystem_path(temp_dir_l0.path()).unwrap(), - FsOptions::Local, - ) - .unwrap(); + let option = DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + )) + .level_path( + 0, + Path::from_filesystem_path(temp_dir_l0.path()).unwrap(), + FsOptions::Local, + ) + .unwrap(); let manager = StoreManager::new(option.base_fs.clone(), option.level_paths.clone()).unwrap(); manager @@ -636,7 +672,7 @@ pub(crate) mod tests { 0.into(), ), ], - &RecordInstance::Normal, + &Arc::new(TestSchema), manager.base_fs(), ) .await @@ -673,7 +709,7 @@ pub(crate) mod tests { 0.into(), ), ], - &RecordInstance::Normal, + &Arc::new(TestSchema), manager.base_fs(), ) .await @@ -686,7 +722,7 @@ pub(crate) mod tests { (Some(FileId::new()), batch_1), (Some(FileId::new()), batch_2), ], - &RecordInstance::Normal, + &TestSchema, &manager, ) .await @@ -711,11 +747,10 @@ pub(crate) mod tests { .await .unwrap(); - let empty_record = DynRecord::empty_record( + let instance = Arc::new(DynSchema::new( vec![ValueDesc::new("id".to_owned(), Datatype::Int32, false)], 0, - ); - let instance = RecordInstance::Runtime(empty_record); + )); let mut batch1_data = vec![]; let mut batch2_data = vec![]; @@ -772,19 +807,22 @@ pub(crate) mod tests { let temp_dir_l0 = TempDir::new().unwrap(); let temp_dir_l1 = TempDir::new().unwrap(); - let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .level_path( - 0, - Path::from_filesystem_path(temp_dir_l0.path()).unwrap(), - FsOptions::Local, - ) - .unwrap() - .level_path( - 1, - Path::from_filesystem_path(temp_dir_l1.path()).unwrap(), - FsOptions::Local, - ) - .unwrap(); + let mut option = DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + )) + .level_path( + 0, + Path::from_filesystem_path(temp_dir_l0.path()).unwrap(), + FsOptions::Local, + ) + .unwrap() + .level_path( + 1, + Path::from_filesystem_path(temp_dir_l1.path()).unwrap(), + FsOptions::Local, + ) + .unwrap(); option.major_threshold_with_sst_size = 2; let option = Arc::new(option); let manager = @@ -802,7 +840,7 @@ pub(crate) mod tests { .unwrap(); let ((table_gen_1, table_gen_2, table_gen_3, table_gen_4, _), version) = - build_version(&option, &manager).await; + build_version(&option, &manager, &Arc::new(TestSchema)).await; let min = 2.to_string(); let max = 5.to_string(); @@ -815,7 +853,7 @@ pub(crate) mod tests { &max, &mut version_edits, &mut vec![], - &RecordInstance::Normal, + &TestSchema, &manager, Arc::new(NoCache::default()), ) @@ -853,6 +891,7 @@ pub(crate) mod tests { pub(crate) async fn build_version( option: &Arc>, manager: &StoreManager, + schema: &Arc, ) -> ((FileId, FileId, FileId, FileId, FileId), Version) { let level_0_fs = option .level_fs_path(0) @@ -898,7 +937,7 @@ pub(crate) mod tests { 0.into(), ), ], - &RecordInstance::Normal, + schema, 0, level_0_fs, ) @@ -936,7 +975,7 @@ pub(crate) mod tests { 0.into(), ), ], - 
&RecordInstance::Normal, + schema, 0, level_0_fs, ) @@ -979,7 +1018,7 @@ pub(crate) mod tests { 0.into(), ), ], - &RecordInstance::Normal, + schema, 1, level_1_fs, ) @@ -1017,7 +1056,7 @@ pub(crate) mod tests { 0.into(), ), ], - &RecordInstance::Normal, + schema, 1, level_1_fs, ) @@ -1055,7 +1094,7 @@ pub(crate) mod tests { 0.into(), ), ], - &RecordInstance::Normal, + schema, 1, level_1_fs, ) @@ -1112,7 +1151,10 @@ pub(crate) mod tests { pub(crate) async fn major_panic() { let temp_dir = TempDir::new().unwrap(); - let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let mut option = DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + )); option.major_threshold_with_sst_size = 1; option.level_sst_magnification = 1; let manager = @@ -1162,7 +1204,7 @@ pub(crate) mod tests { &option, table_gen0, records0, - &RecordInstance::Normal, + &Arc::new(TestSchema), 0, level_0_fs, ) @@ -1172,7 +1214,7 @@ pub(crate) mod tests { &option, table_gen1, records1, - &RecordInstance::Normal, + &Arc::new(TestSchema), 1, level_1_fs, ) @@ -1207,7 +1249,7 @@ pub(crate) mod tests { &max, &mut version_edits, &mut vec![], - &RecordInstance::Normal, + &TestSchema, &manager, Arc::new(NoCache::default()), ) @@ -1220,7 +1262,10 @@ pub(crate) mod tests { async fn test_flush_major_level_sort() { let temp_dir = TempDir::new().unwrap(); - let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let mut option = DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + )); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 0; option.major_threshold_with_sst_size = 2; @@ -1230,7 +1275,9 @@ pub(crate) mod tests { option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(5); - let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap(); + let db: DB = DB::new(option, TokioExecutor::new(), TestSchema) + .await + .unwrap(); for i in 5..9 { let item = Test { diff --git a/src/inmem/immutable.rs b/src/inmem/immutable.rs index c9af2021..077f9a05 100644 --- a/src/inmem/immutable.rs +++ b/src/inmem/immutable.rs @@ -236,6 +236,7 @@ pub(crate) mod tests { timestamp::timestamped::Timestamped, }; + #[derive(Debug)] pub struct TestSchema; impl Schema for TestSchema { diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index 49a78177..0ddfac83 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -234,11 +234,14 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + )); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); - let mem_table = Mutable::::new(&option, trigger, &fs, TestSchema) + let mem_table = Mutable::::new(&option, trigger, &fs, Arc::new(TestSchema {})) .await .unwrap(); @@ -284,7 +287,10 @@ mod tests { async fn range() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &StringSchema, + )); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); diff --git 
a/src/lib.rs b/src/lib.rs index 1ac0f054..bff78dcf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,9 +54,14 @@ //! // make sure the path exists //! let _ = fs::create_dir_all("./db_path/users").await; //! -//! let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap()); +//! let options = DbOption::from(( +//! Path::from_filesystem_path("./db_path/users").unwrap(), +//! &UserSchema, +//! )); //! // pluggable async runtime and I/O -//! let db = DB::new(options, TokioExecutor::default()).await.unwrap(); +//! let db = DB::new(options, TokioExecutor::default(), UserSchema) +//! .await +//! .unwrap(); //! // insert with owned value //! db.insert(User { //! name: "Alice".into(), @@ -149,7 +154,7 @@ use parquet::{ errors::ParquetError, }; use parquet_lru::{DynLruCache, NoCache}; -use record::{DynRecord, Record, ValueDesc}; +use record::{DynRecord, Record}; use thiserror::Error; use timestamp::{Timestamp, TimestampedRef}; use tokio::sync::oneshot; @@ -162,6 +167,7 @@ use crate::{ compaction::{CompactTask, CompactionError, Compactor}, executor::Executor, fs::{manager::StoreManager, parse_file_id, FileType}, + record::{DynSchema, Schema as RecordSchema}, serdes::Decode, snapshot::Snapshot, stream::{ @@ -195,22 +201,18 @@ where pub async fn with_schema( option: DbOption, executor: E, - column_descs: Vec, - primary_index: usize, + schema: DynSchema, ) -> Result> { let option = Arc::new(option); - let instance = - RecordInstance::Runtime(DynRecord::empty_record(column_descs, primary_index)); - - Self::build(option, executor, instance, Arc::new(NoCache::default())).await + Self::build(option, executor, schema, Arc::new(NoCache::default())).await } } impl DB where R: Record + Send + Sync, - R::Columns: Send + Sync, + ::Columns: Send + Sync, E: Executor + Send + Sync + 'static, { /// Open [`DB`] with a [`DbOption`]. This will create a new directory at the @@ -218,11 +220,15 @@ where /// according to the configuration of [`DbOption`]. /// /// For more configurable options, please refer to [`DbOption`]. 
- pub async fn new(option: DbOption, executor: E) -> Result> { + pub async fn new( + option: DbOption, + executor: E, + schema: R::Schema, + ) -> Result> { Self::build( Arc::new(option), executor, - RecordInstance::Normal, + schema, Arc::new(NoCache::default()), ) .await @@ -232,15 +238,16 @@ where impl DB where R: Record + Send + Sync, - R::Columns: Send + Sync, + ::Columns: Send + Sync, E: Executor + Send + Sync + 'static, { async fn build( option: Arc>, executor: E, - instance: RecordInstance, + schema: R::Schema, lru_cache: ParquetLru, ) -> Result> { + let record_schema = Arc::new(schema); let manager = Arc::new(StoreManager::new( option.base_fs.clone(), option.level_paths.clone(), @@ -263,10 +270,18 @@ where let version_set = VersionSet::new(clean_sender, option.clone(), manager.clone()).await?; let schema = Arc::new(RwLock::new( - Schema::new(option.clone(), task_tx, &version_set, instance, &manager).await?, + Schema::new( + option.clone(), + task_tx, + &version_set, + record_schema.clone(), + &manager, + ) + .await?, )); let mut compactor = Compactor::::new( schema.clone(), + record_schema, option.clone(), version_set.clone(), manager.clone(), @@ -344,7 +359,10 @@ where } /// delete the record with the primary key as the `key` - pub async fn remove(&self, key: R::Key) -> Result> { + pub async fn remove( + &self, + key: ::Key, + ) -> Result> { Ok(self .schema .read() @@ -368,7 +386,7 @@ where /// get the record with `key` as the primary key and process it using closure `f` pub async fn get( &self, - key: &R::Key, + key: &::Key, mut f: impl FnMut(TransactionEntry<'_, R>) -> Option, ) -> Result, CommitError> { Ok(self @@ -396,7 +414,10 @@ where /// scan records with primary keys in the `range` and process them using closure `f` pub async fn scan<'scan, T: 'scan>( &'scan self, - range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>), + range: ( + Bound<&'scan ::Key>, + Bound<&'scan ::Key>, + ), mut f: impl FnMut(TransactionEntry<'_, R>) -> T + 'scan, ) -> impl Stream>> + 'scan { stream! 
{ @@ -469,11 +490,14 @@ where R: Record, { pub mutable: Mutable, - pub immutables: Vec<(Option, Immutable)>, + pub immutables: Vec<( + Option, + Immutable<::Columns>, + )>, compaction_tx: Sender, recover_wal_ids: Option>, trigger: Arc + Send + Sync>>, - record_instance: RecordInstance, + record_schema: Arc, } impl Schema @@ -484,17 +508,23 @@ where option: Arc>, compaction_tx: Sender, version_set: &VersionSet, - record_instance: RecordInstance, + record_schema: Arc, manager: &StoreManager, ) -> Result> { let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); let mut schema = Schema { - mutable: Mutable::new(&option, trigger.clone(), manager.base_fs()).await?, + mutable: Mutable::new( + &option, + trigger.clone(), + manager.base_fs(), + record_schema.clone(), + ) + .await?, immutables: Default::default(), compaction_tx, recover_wal_ids: None, trigger, - record_instance, + record_schema, }; let base_fs = manager.base_fs(); @@ -574,7 +604,7 @@ where async fn remove( &self, log_ty: LogType, - key: R::Key, + key: ::Key, ts: Timestamp, ) -> Result> { self.mutable.remove(log_ty, key, ts).await @@ -582,7 +612,7 @@ where async fn recover_append( &self, - key: R::Key, + key: ::Key, ts: Timestamp, value: Option, ) -> Result> { @@ -593,12 +623,12 @@ where &'get self, version: &'get Version, manager: &StoreManager, - key: &'get R::Key, + key: &'get ::Key, ts: Timestamp, projection: Projection, parquet_lru: ParquetLru, ) -> Result>, DbError> { - let primary_key_index = self.record_instance.primary_key_index::(); + let primary_key_index = self.record_schema.primary_key_index(); let projection = match projection { Projection::All => ProjectionMask::all(), @@ -610,7 +640,7 @@ where fixed_projection.dedup(); ProjectionMask::roots( - &arrow_to_parquet_schema(&self.record_instance.arrow_schema::()).unwrap(), + &arrow_to_parquet_schema(self.record_schema.arrow_schema()).unwrap(), fixed_projection, ) } @@ -640,7 +670,7 @@ where .map(|entry| Entry::RecordBatch(entry))) } - fn check_conflict(&self, key: &R::Key, ts: Timestamp) -> bool { + fn check_conflict(&self, key: &::Key, ts: Timestamp) -> bool { self.mutable.check_conflict(key, ts) || self .immutables @@ -653,6 +683,10 @@ where self.mutable.flush_wal().await?; Ok(()) } + + pub(crate) fn record_schema(&self) -> &Arc { + &self.record_schema + } } /// scan configuration intermediate structure @@ -663,8 +697,8 @@ where { schema: &'scan Schema, manager: &'scan StoreManager, - lower: Bound<&'range R::Key>, - upper: Bound<&'range R::Key>, + lower: Bound<&'range ::Key>, + upper: Bound<&'range ::Key>, ts: Timestamp, version: &'scan Version, @@ -685,7 +719,10 @@ where fn new( schema: &'scan Schema, manager: &'scan StoreManager, - (lower, upper): (Bound<&'range R::Key>, Bound<&'range R::Key>), + (lower, upper): ( + Bound<&'range ::Key>, + Bound<&'range ::Key>, + ), ts: Timestamp, version: &'scan Version, fn_pre_stream: Box< @@ -722,13 +759,13 @@ where for p in &mut projection { *p += 2; } - let primary_key_index = self.schema.record_instance.primary_key_index::(); + let primary_key_index = self.schema.record_schema.primary_key_index(); let mut fixed_projection = vec![0, 1, primary_key_index]; fixed_projection.append(&mut projection); fixed_projection.dedup(); let mask = ProjectionMask::roots( - &arrow_to_parquet_schema(&self.schema.record_instance.arrow_schema::()).unwrap(), + &arrow_to_parquet_schema(self.schema.record_schema.arrow_schema()).unwrap(), fixed_projection.clone(), ); @@ -795,7 +832,10 @@ where pub async fn package( self, batch_size: usize, - 
) -> Result> + 'scan, DbError> { + ) -> Result< + impl Stream::Columns, ParquetError>> + 'scan, + DbError, + > { let mut streams = Vec::new(); let is_projection = self.projection_indices.is_some(); @@ -842,7 +882,7 @@ where batch_size, merge_stream, self.projection_indices, - &self.schema.record_instance, + self.schema.record_schema.arrow_schema().clone(), )) } } @@ -908,12 +948,15 @@ pub(crate) mod tests { compaction::{CompactTask, CompactionError, Compactor}, executor::{tokio::TokioExecutor, Executor}, fs::{manager::StoreManager, FileId}, - inmem::{immutable::tests::TestImmutableArrays, mutable::Mutable}, + inmem::{ + immutable::tests::{TestImmutableArrays, TestSchema}, + mutable::Mutable, + }, record::{ internal::InternalRecordRef, runtime::test::{test_dyn_item_schema, test_dyn_items}, - Datatype, DynRecord, RecordDecodeError, RecordEncodeError, RecordInstance, RecordRef, - Value, + Datatype, DynRecord, DynSchema, Key, RecordDecodeError, RecordEncodeError, RecordRef, + Schema as RecordSchema, Value, }, serdes::{Decode, Encode}, trigger::{TriggerFactory, TriggerType}, @@ -967,9 +1010,7 @@ pub(crate) mod tests { } impl Record for Test { - type Columns = TestImmutableArrays; - - type Key = String; + type Schema = TestSchema; type Ref<'r> = TestRef<'r> @@ -980,20 +1021,6 @@ pub(crate) mod tests { &self.vstring } - fn primary_key_index() -> usize { - 2 - } - - fn primary_key_path() -> (ColumnPath, Vec) { - ( - ColumnPath::new(vec!["_ts".to_string(), "vstring".to_string()]), - vec![ - SortingColumn::new(1, true, true), - SortingColumn::new(2, false, true), - ], - ) - } - fn as_record_ref(&self) -> Self::Ref<'_> { TestRef { vstring: &self.vstring, @@ -1002,20 +1029,6 @@ pub(crate) mod tests { } } - fn arrow_schema() -> &'static Arc { - static SCHEMA: Lazy> = Lazy::new(|| { - Arc::new(Schema::new(vec![ - Field::new("_null", DataType::Boolean, false), - Field::new("_ts", DataType::UInt32, false), - Field::new("vstring", DataType::Utf8, false), - Field::new("vu32", DataType::UInt32, false), - Field::new("vbool", DataType::Boolean, true), - ])) - }); - - &SCHEMA - } - fn size(&self) -> usize { let string_size = self.vstring.len(); let u32_size = mem::size_of::(); @@ -1071,7 +1084,7 @@ pub(crate) mod tests { impl<'r> RecordRef<'r> for TestRef<'r> { type Record = Test; - fn key(self) -> <::Key as crate::record::Key>::Ref<'r> { + fn key(self) -> <<::Schema as RecordSchema>::Key as Key>::Ref<'r> { self.vstring } @@ -1140,7 +1153,9 @@ pub(crate) mod tests { option: DbOption, executor: E, ) -> RecordBatch { - let db: DB = DB::new(option.clone(), executor).await.unwrap(); + let db: DB = DB::new(option.clone(), executor, TestSchema {}) + .await + .unwrap(); let base_fs = db.manager.base_fs(); db.write( @@ -1169,12 +1184,17 @@ pub(crate) mod tests { let trigger = schema.trigger.clone(); let mutable = mem::replace( &mut schema.mutable, - Mutable::new(&option, trigger, base_fs).await.unwrap(), + Mutable::new(&option, trigger, base_fs, Arc::new(TestSchema {})) + .await + .unwrap(), ); - Immutable::<::Columns>::from((mutable.data, &RecordInstance::Normal)) - .as_record_batch() - .clone() + Immutable::<::Columns>::new( + mutable.data, + TestSchema {}.arrow_schema().clone(), + ) + .as_record_batch() + .clone() } pub(crate) async fn build_schema( @@ -1183,7 +1203,7 @@ pub(crate) mod tests { ) -> Result<(crate::Schema, Receiver), fusio::Error> { let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); - let mutable = Mutable::new(&option, trigger.clone(), fs).await?; + let mutable = 
Mutable::new(&option, trigger.clone(), fs, Arc::new(TestSchema {})).await?; mutable .insert( @@ -1225,7 +1245,8 @@ pub(crate) mod tests { let immutables = { let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); - let mutable: Mutable = Mutable::new(&option, trigger.clone(), fs).await?; + let mutable: Mutable = + Mutable::new(&option, trigger.clone(), fs, Arc::new(TestSchema)).await?; mutable .insert( @@ -1266,7 +1287,7 @@ pub(crate) mod tests { vec![( Some(FileId::new()), - Immutable::from((mutable.data, &RecordInstance::Normal)), + Immutable::new(mutable.data, TestSchema {}.arrow_schema().clone()), )] }; @@ -1279,7 +1300,7 @@ pub(crate) mod tests { compaction_tx, recover_wal_ids: None, trigger, - record_instance: RecordInstance::Normal, + record_schema: Arc::new(TestSchema {}), }, compaction_rx, )) @@ -1290,12 +1311,13 @@ pub(crate) mod tests { compaction_rx: Receiver, executor: E, schema: crate::Schema, + record_schema: Arc, version: Version, manager: Arc, ) -> Result, DbError> where R: Record + Send + Sync, - R::Columns: Send + Sync, + ::Columns: Send + Sync, E: Executor + Send + Sync + 'static, { { @@ -1312,6 +1334,7 @@ pub(crate) mod tests { build_version_set(version, clean_sender, option.clone(), manager.clone()).await?; let mut compactor = Compactor::::new( schema.clone(), + record_schema, option.clone(), version_set.clone(), manager.clone(), @@ -1572,7 +1595,7 @@ pub(crate) mod tests { let path = Path::from_filesystem_path(temp_dir.path()).unwrap(); let path_l0 = Path::from_filesystem_path(temp_dir_l0.path()).unwrap(); - let mut option = DbOption::from(path) + let mut option = DbOption::from((path, &TestSchema)) .level_path(0, path_l0, FsOptions::Local) .unwrap(); option.immutable_chunk_num = 1; @@ -1583,7 +1606,9 @@ pub(crate) mod tests { option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(/* max_mutable_len */ 5); - let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap(); + let db: DB = DB::new(option, TokioExecutor::new(), TestSchema) + .await + .unwrap(); for (i, item) in test_items().into_iter().enumerate() { db.write(item, 0.into()).await.unwrap(); @@ -1610,7 +1635,10 @@ pub(crate) mod tests { async fn test_flush() { let temp_dir = TempDir::new().unwrap(); - let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let mut option = DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + )); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; option.major_threshold_with_sst_size = 3; @@ -1619,7 +1647,9 @@ pub(crate) mod tests { option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(/* max_mutable_len */ 50); - let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap(); + let db: DB = DB::new(option, TokioExecutor::new(), TestSchema) + .await + .unwrap(); for item in &test_items()[0..10] { db.write(item.clone(), 0.into()).await.unwrap(); @@ -1642,21 +1672,24 @@ pub(crate) mod tests { let temp_dir = TempDir::new().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = Arc::new(DbOption::from( + let option = Arc::new(DbOption::from(( Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + &TestSchema, + ))); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let (task_tx, _task_rx) = bounded(1); let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); let schema: crate::Schema = crate::Schema { - mutable: Mutable::new(&option, trigger.clone(), &fs).await.unwrap(), + mutable: 
Mutable::new(&option, trigger.clone(), &fs, Arc::new(TestSchema)) + .await + .unwrap(), immutables: Default::default(), compaction_tx: task_tx.clone(), recover_wal_ids: None, trigger, - record_instance: RecordInstance::Normal, + record_schema: Arc::new(TestSchema), }; for (i, item) in test_items().into_iter().enumerate() { @@ -1668,9 +1701,10 @@ pub(crate) mod tests { schema.flush_wal().await.unwrap(); drop(schema); - let db: DB = DB::new(option.as_ref().to_owned(), TokioExecutor::new()) - .await - .unwrap(); + let db: DB = + DB::new(option.as_ref().to_owned(), TokioExecutor::new(), TestSchema) + .await + .unwrap(); let mut sort_items = BTreeMap::new(); for item in test_items() { @@ -1700,11 +1734,11 @@ pub(crate) mod tests { let temp_dir = TempDir::new().unwrap(); let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); - let (desc, primary_key_index) = test_dyn_item_schema(); + let dyn_schema = Arc::new(test_dyn_item_schema()); let option = Arc::new(DbOption::with_path( Path::from_filesystem_path(temp_dir.path()).unwrap(), "id".to_owned(), - primary_key_index, + dyn_schema.primary_key_index(), )); manager .base_fs() @@ -1716,14 +1750,19 @@ pub(crate) mod tests { let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); let schema: crate::Schema = crate::Schema { - mutable: Mutable::new(&option, trigger.clone(), manager.base_fs()) - .await - .unwrap(), + mutable: Mutable::new( + &option, + trigger.clone(), + manager.base_fs(), + dyn_schema.clone(), + ) + .await + .unwrap(), immutables: Default::default(), compaction_tx: task_tx.clone(), recover_wal_ids: None, trigger, - record_instance: RecordInstance::Normal, + record_schema: dyn_schema.clone(), }; for item in test_dyn_items().into_iter() { @@ -1738,10 +1777,11 @@ pub(crate) mod tests { let option = DbOption::with_path( Path::from_filesystem_path(temp_dir.path()).unwrap(), "id".to_owned(), - primary_key_index, + dyn_schema.primary_key_index(), ); + let dyn_schema = test_dyn_item_schema(); let db: DB = - DB::with_schema(option, TokioExecutor::new(), desc, primary_key_index) + DB::with_schema(option, TokioExecutor::new(), dyn_schema) .await .unwrap(); @@ -1774,13 +1814,18 @@ pub(crate) mod tests { async fn test_get_removed() { let temp_dir = TempDir::new().unwrap(); - let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let mut option = DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + )); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; option.major_threshold_with_sst_size = 3; option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(5); - let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap(); + let db: DB = DB::new(option, TokioExecutor::new(), TestSchema) + .await + .unwrap(); for (idx, item) in test_items().into_iter().enumerate() { if idx % 2 == 0 { @@ -1808,11 +1853,11 @@ pub(crate) mod tests { async fn test_read_write_dyn() { let temp_dir = TempDir::new().unwrap(); - let (cols_desc, primary_key_index) = test_dyn_item_schema(); + let dyn_schema = test_dyn_item_schema(); let mut option = DbOption::with_path( Path::from_filesystem_path(temp_dir.path()).unwrap(), "id".to_string(), - primary_key_index, + dyn_schema.primary_key_index(), ); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; @@ -1823,7 +1868,7 @@ pub(crate) mod tests { option.trigger_type = TriggerType::Length(5); let db: DB = - DB::with_schema(option, TokioExecutor::new(), cols_desc, primary_key_index) 
+ DB::with_schema(option, TokioExecutor::new(), dyn_schema) .await .unwrap(); @@ -2016,11 +2061,11 @@ pub(crate) mod tests { async fn test_dyn_multiple_db() { let temp_dir1 = TempDir::with_prefix("db1").unwrap(); - let (cols_desc, primary_key_index) = test_dyn_item_schema(); + let dyn_schema = test_dyn_item_schema(); let mut option = DbOption::with_path( Path::from_filesystem_path(temp_dir1.path()).unwrap(), "id".to_string(), - primary_key_index, + dyn_schema.primary_key_index(), ); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; @@ -2032,7 +2077,7 @@ pub(crate) mod tests { let mut option2 = DbOption::with_path( Path::from_filesystem_path(temp_dir2.path()).unwrap(), "id".to_string(), - primary_key_index, + dyn_schema.primary_key_index(), ); option2.immutable_chunk_num = 1; option2.immutable_chunk_max_num = 1; @@ -2044,7 +2089,7 @@ pub(crate) mod tests { let mut option3 = DbOption::with_path( Path::from_filesystem_path(temp_dir3.path()).unwrap(), "id".to_string(), - primary_key_index, + dyn_schema.primary_key_index(), ); option3.immutable_chunk_num = 1; option3.immutable_chunk_max_num = 1; @@ -2052,24 +2097,16 @@ pub(crate) mod tests { option3.major_default_oldest_table_num = 1; option3.trigger_type = TriggerType::Length(5); - let db1: DB = DB::with_schema( - option, - TokioExecutor::new(), - cols_desc.clone(), - primary_key_index, - ) - .await - .unwrap(); - let db2: DB = DB::with_schema( - option2, - TokioExecutor::new(), - cols_desc.clone(), - primary_key_index, - ) - .await - .unwrap(); + let db1: DB = + DB::with_schema(option, TokioExecutor::new(), test_dyn_item_schema()) + .await + .unwrap(); + let db2: DB = + DB::with_schema(option2, TokioExecutor::new(), test_dyn_item_schema()) + .await + .unwrap(); let db3: DB = - DB::with_schema(option3, TokioExecutor::new(), cols_desc, primary_key_index) + DB::with_schema(option3, TokioExecutor::new(), test_dyn_item_schema()) .await .unwrap(); diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 32b32f71..c3c94cf9 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -193,7 +193,10 @@ pub(crate) mod tests { let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); let base_fs = manager.base_fs(); let record_batch = get_test_record_batch::( - DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()), + DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + )), TokioExecutor::new(), ) .await; @@ -268,7 +271,10 @@ pub(crate) mod tests { let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); let base_fs = manager.base_fs(); let record_batch = get_test_record_batch::( - DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()), + DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + )), TokioExecutor::new(), ) .await; diff --git a/src/option.rs b/src/option.rs index 78cb85e9..ef7fde43 100644 --- a/src/option.rs +++ b/src/option.rs @@ -14,7 +14,7 @@ use parquet::{ use crate::{ fs::{FileId, FileType}, - record::Record, + record::{Record, Schema}, trigger::TriggerType, version::{Version, MAX_LEVEL}, DbError, @@ -96,13 +96,13 @@ where } } -impl From for DbOption +impl From<(Path, &R::Schema)> for DbOption where R: Record, { /// build the default configured [`DbOption`] based on the passed path - fn from(base_path: Path) -> Self { - let (column_paths, sorting_columns) = R::primary_key_path(); + fn from((base_path, schema): (Path, &R::Schema)) -> Self { + let (column_paths, sorting_columns) = 
schema.primary_key_path(); DbOption { immutable_chunk_num: 3, immutable_chunk_max_num: 5, diff --git a/src/record/mod.rs b/src/record/mod.rs index 127180ee..497d1dd4 100644 --- a/src/record/mod.rs +++ b/src/record/mod.rs @@ -4,10 +4,13 @@ pub mod runtime; #[cfg(test)] pub(crate) mod test; -use std::{error::Error, fmt::Debug, io, sync::Arc}; +use std::{collections::HashMap, error::Error, fmt::Debug, io, sync::Arc}; use array::DynRecordImmutableArrays; -use arrow::{array::RecordBatch, datatypes::Schema as ArrowSchema}; +use arrow::{ + array::RecordBatch, + datatypes::{DataType, Field, Schema as ArrowSchema}, +}; use internal::InternalRecordRef; pub use key::{Key, KeyRef}; use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath}; @@ -48,7 +51,7 @@ use crate::{ // } // } -pub trait Schema { +pub trait Schema: Debug + Send + Sync { type Record: Record; type Columns: ArrowArrays; @@ -71,11 +74,17 @@ pub struct DynSchema { impl DynSchema { pub fn new(schema: Vec, primary_index: usize) -> Self { - let arrow_schema = Arc::new(ArrowSchema::new( - schema - .iter() - .map(|desc| desc.arrow_field()) - .collect::>(), + let mut metadata = HashMap::new(); + metadata.insert("primary_key_index".to_string(), primary_index.to_string()); + let arrow_schema = Arc::new(ArrowSchema::new_with_metadata( + [ + Field::new("_null", DataType::Boolean, false), + Field::new("_ts", DataType::UInt32, false), + ] + .into_iter() + .chain(schema.iter().map(|desc| desc.arrow_field())) + .collect::>(), + metadata, )); Self { schema, diff --git a/src/record/runtime/record.rs b/src/record/runtime/record.rs index 4902e4a7..6b921619 100644 --- a/src/record/runtime/record.rs +++ b/src/record/runtime/record.rs @@ -238,10 +238,10 @@ pub(crate) mod test { use std::sync::Arc; use super::DynRecord; - use crate::record::{Datatype, Value, ValueDesc}; + use crate::record::{Datatype, DynSchema, Value, ValueDesc}; #[allow(unused)] - pub(crate) fn test_dyn_item_schema() -> (Vec, usize) { + pub(crate) fn test_dyn_item_schema() -> DynSchema { let descs = vec![ ValueDesc::new("id".to_string(), Datatype::Int64, false), ValueDesc::new("age".to_string(), Datatype::Int8, true), @@ -252,7 +252,7 @@ pub(crate) mod test { ValueDesc::new("enabled".to_string(), Datatype::Boolean, false), ValueDesc::new("bytes".to_string(), Datatype::Bytes, true), ]; - (descs, 0) + DynSchema::new(descs, 0) } #[allow(unused)] diff --git a/src/snapshot.rs b/src/snapshot.rs index c548d010..1782dd8c 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -5,7 +5,7 @@ use parquet::arrow::ProjectionMask; use crate::{ fs::manager::StoreManager, - record::Record, + record::{Record, Schema as RecordSchema}, stream, stream::ScanStream, timestamp::Timestamp, @@ -30,7 +30,7 @@ where { pub async fn get<'get>( &'get self, - key: &'get R::Key, + key: &'get ::Key, projection: Projection, ) -> Result>, DbError> { Ok(self @@ -55,7 +55,10 @@ where pub fn scan<'scan, 'range>( &'scan self, - range: (Bound<&'range R::Key>, Bound<&'range R::Key>), + range: ( + Bound<&'range ::Key>, + Bound<&'range ::Key>, + ), ) -> Scan<'scan, 'range, R> { Scan::new( &self.share, @@ -97,7 +100,10 @@ where pub(crate) fn _scan<'scan, 'range>( &'scan self, - range: (Bound<&'range R::Key>, Bound<&'range R::Key>), + range: ( + Bound<&'range ::Key>, + Bound<&'range ::Key>, + ), fn_pre_stream: Box< dyn FnOnce(Option) -> Option> + Send + 'scan, >, @@ -127,6 +133,7 @@ mod tests { compaction::tests::build_version, executor::tokio::TokioExecutor, fs::manager::StoreManager, + 
inmem::immutable::tests::TestSchema, tests::{build_db, build_schema}, version::TransactionTs, DbOption, @@ -136,9 +143,10 @@ mod tests { async fn snapshot_scan() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from( + let option = Arc::new(DbOption::from(( Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + &TestSchema, + ))); manager .base_fs() @@ -151,7 +159,7 @@ mod tests { .await .unwrap(); - let (_, version) = build_version(&option, &manager).await; + let (_, version) = build_version(&option, &manager, &Arc::new(TestSchema)).await; let (schema, compaction_rx) = build_schema(option.clone(), manager.base_fs()) .await .unwrap(); @@ -160,6 +168,7 @@ mod tests { compaction_rx, TokioExecutor::new(), schema, + Arc::new(TestSchema), version, manager, ) diff --git a/src/stream/level.rs b/src/stream/level.rs index 61b2e71e..1fb580de 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -229,17 +229,23 @@ mod tests { use tempfile::TempDir; use crate::{ - compaction::tests::build_version, fs::manager::StoreManager, record::Record, - stream::level::LevelStream, tests::Test, DbOption, + compaction::tests::build_version, + fs::manager::StoreManager, + inmem::immutable::tests::TestSchema, + record::{Record, Schema}, + stream::level::LevelStream, + tests::Test, + DbOption, }; #[tokio::test] async fn projection_scan() { let temp_dir = TempDir::new().unwrap(); let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); - let option = Arc::new(DbOption::from( + let option = Arc::new(DbOption::from(( Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + &TestSchema {}, + ))); manager .base_fs() @@ -252,7 +258,7 @@ mod tests { .await .unwrap(); - let (_, version) = build_version(&option, &manager).await; + let (_, version) = build_version(&option, &manager, &Arc::new(TestSchema)).await; { let mut level_stream_1 = LevelStream::new( @@ -264,7 +270,7 @@ mod tests { 1_u32.into(), None, ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), [0, 1, 2, 3], ), manager.base_fs().clone(), @@ -301,7 +307,7 @@ mod tests { 1_u32.into(), None, ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), [0, 1, 2, 4], ), manager.base_fs().clone(), @@ -338,7 +344,7 @@ mod tests { 1_u32.into(), None, ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), [0, 1, 2], ), manager.base_fs().clone(), diff --git a/src/stream/mem_projection.rs b/src/stream/mem_projection.rs index 0397e022..334e3569 100644 --- a/src/stream/mem_projection.rs +++ b/src/stream/mem_projection.rs @@ -63,21 +63,31 @@ mod tests { use parquet::arrow::{arrow_to_parquet_schema, ProjectionMask}; use crate::{ - inmem::mutable::Mutable, record::Record, stream::mem_projection::MemProjectionStream, - tests::Test, trigger::TriggerFactory, wal::log::LogType, DbOption, + inmem::{immutable::tests::TestSchema, mutable::Mutable}, + record::{Record, Schema}, + stream::mem_projection::MemProjectionStream, + tests::Test, + trigger::TriggerFactory, + wal::log::LogType, + DbOption, }; #[tokio::test] async fn merge_mutable() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = 
DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + )); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); - let mutable = Mutable::::new(&option, trigger, &fs).await.unwrap(); + let mutable = Mutable::::new(&option, trigger, &fs, Arc::new(TestSchema {})) + .await + .unwrap(); mutable .insert( @@ -117,7 +127,7 @@ mod tests { .unwrap(); let mask = ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + &arrow_to_parquet_schema(TestSchema.arrow_schema()).unwrap(), vec![0, 1, 2, 4], ); diff --git a/src/stream/merge.rs b/src/stream/merge.rs index 5750fb3a..42ea60dd 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -163,7 +163,11 @@ mod tests { use super::MergeStream; use crate::{ - inmem::mutable::Mutable, stream::Entry, trigger::TriggerFactory, wal::log::LogType, + inmem::{immutable::tests::TestSchema, mutable::Mutable}, + record::test::StringSchema, + stream::Entry, + trigger::TriggerFactory, + wal::log::LogType, DbOption, }; @@ -171,13 +175,18 @@ mod tests { async fn merge_mutable() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &StringSchema, + )); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); - let m1 = Mutable::::new(&option, trigger, &fs).await.unwrap(); + let m1 = Mutable::::new(&option, trigger, &fs, Arc::new(StringSchema)) + .await + .unwrap(); m1.remove(LogType::Full, "b".into(), 3.into()) .await @@ -191,7 +200,9 @@ mod tests { let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); - let m2 = Mutable::::new(&option, trigger, &fs).await.unwrap(); + let m2 = Mutable::::new(&option, trigger, &fs, Arc::new(StringSchema)) + .await + .unwrap(); m2.insert(LogType::Full, "a".into(), 1.into()) .await .unwrap(); @@ -204,7 +215,9 @@ mod tests { let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); - let m3 = Mutable::::new(&option, trigger, &fs).await.unwrap(); + let m3 = Mutable::::new(&option, trigger, &fs, Arc::new(StringSchema)) + .await + .unwrap(); m3.insert(LogType::Full, "e".into(), 4.into()) .await .unwrap(); @@ -265,13 +278,18 @@ mod tests { async fn merge_mutable_remove_duplicates() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &StringSchema, + )); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); - let m1 = Mutable::::new(&option, trigger, &fs).await.unwrap(); + let m1 = Mutable::::new(&option, trigger, &fs, Arc::new(StringSchema)) + .await + .unwrap(); m1.insert(LogType::Full, "1".into(), 0_u32.into()) .await .unwrap(); @@ -351,13 +369,18 @@ mod tests { async fn merge_mutable_limit() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &StringSchema, 
+ )); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); - let m1 = Mutable::::new(&option, trigger, &fs).await.unwrap(); + let m1 = Mutable::::new(&option, trigger, &fs, Arc::new(StringSchema)) + .await + .unwrap(); m1.insert(LogType::Full, "1".into(), 0_u32.into()) .await .unwrap(); diff --git a/src/stream/mod.rs b/src/stream/mod.rs index fa0b5afe..375f7919 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -21,7 +21,7 @@ use record_batch::RecordBatchEntry; use crate::{ inmem::{immutable::ImmutableScan, mutable::MutableScan}, ondisk::scan::SsTableScan, - record::{Key, Record, RecordRef}, + record::{Key, Record, RecordRef, Schema}, stream::{level::LevelStream, mem_projection::MemProjectionStream}, timestamp::Timestamped, transaction::TransactionScan, @@ -31,8 +31,15 @@ pub enum Entry<'entry, R> where R: Record, { - Transaction((Timestamped<::Ref<'entry>>, &'entry Option)), - Mutable(crossbeam_skiplist::map::Entry<'entry, Timestamped, Option>), + Transaction( + ( + Timestamped<<::Key as Key>::Ref<'entry>>, + &'entry Option, + ), + ), + Mutable( + crossbeam_skiplist::map::Entry<'entry, Timestamped<::Key>, Option>, + ), Projection((Box>, Arc)), RecordBatch(RecordBatchEntry), } @@ -41,14 +48,14 @@ impl Entry<'_, R> where R: Record, { - pub(crate) fn key(&self) -> Timestamped<::Ref<'_>> { + pub(crate) fn key(&self) -> Timestamped<<::Key as Key>::Ref<'_>> { match self { Entry::Transaction((key, _)) => { // Safety: shorter lifetime must be safe unsafe { transmute::< - Timestamped<<::Key as Key>::Ref<'_>>, - Timestamped<<::Key as Key>::Ref<'_>>, + Timestamped<<::Key as Key>::Ref<'_>>, + Timestamped<<::Key as Key>::Ref<'_>>, >(key.clone()) } } @@ -77,7 +84,7 @@ where impl fmt::Debug for Entry<'_, R> where R: Record + Debug, - R::Key: Debug, + ::Key: Debug, { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { diff --git a/src/stream/package.rs b/src/stream/package.rs index 7e23b1b0..2dec0e0e 100644 --- a/src/stream/package.rs +++ b/src/stream/package.rs @@ -1,14 +1,16 @@ use std::{ pin::Pin, + sync::Arc, task::{Context, Poll}, }; +use arrow::datatypes::Schema as ArrowSchema; use futures_core::Stream; use pin_project_lite::pin_project; use crate::{ inmem::immutable::{ArrowArrays, Builder}, - record::{Record, RecordInstance}, + record::{Record, Schema}, stream::merge::MergeStream, }; @@ -20,7 +22,7 @@ pin_project! 
{ row_count: usize, batch_size: usize, inner: MergeStream<'package, R>, - builder: ::Builder, + builder: <::Columns as ArrowArrays>::Builder, projection_indices: Option>, } } @@ -33,13 +35,13 @@ where batch_size: usize, merge: MergeStream<'package, R>, projection_indices: Option>, - instance: &RecordInstance, + schema: Arc, ) -> Self { Self { row_count: 0, batch_size, inner: merge, - builder: R::Columns::builder(&instance.arrow_schema::(), batch_size), + builder: ::Columns::builder(schema, batch_size), projection_indices, } } @@ -49,7 +51,7 @@ impl<'package, R> Stream for PackageStream<'package, R> where R: Record, { - type Item = Result; + type Item = Result<::Columns, parquet::errors::ParquetError>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut project = self.project(); @@ -88,10 +90,13 @@ mod tests { use crate::{ inmem::{ - immutable::{tests::TestImmutableArrays, ArrowArrays}, + immutable::{ + tests::{TestImmutableArrays, TestSchema}, + ArrowArrays, + }, mutable::Mutable, }, - record::Record, + record::{Record, Schema}, stream::{merge::MergeStream, package::PackageStream}, tests::Test, trigger::TriggerFactory, @@ -103,13 +108,18 @@ mod tests { async fn iter() { let temp_dir = TempDir::new().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + )); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); - let m1 = Mutable::::new(&option, trigger, &fs).await.unwrap(); + let m1 = Mutable::::new(&option, trigger, &fs, Arc::new(TestSchema {})) + .await + .unwrap(); m1.insert( LogType::Full, Test { @@ -191,7 +201,7 @@ mod tests { row_count: 0, batch_size: 8192, inner: merge, - builder: TestImmutableArrays::builder(Test::arrow_schema(), 8192), + builder: TestImmutableArrays::builder(TestSchema {}.arrow_schema().clone(), 8192), projection_indices: Some(projection_indices.clone()), }; @@ -199,7 +209,12 @@ mod tests { assert_eq!( arrays.as_record_batch(), &RecordBatch::try_new( - Arc::new(Test::arrow_schema().project(&projection_indices).unwrap(),), + Arc::new( + TestSchema {} + .arrow_schema() + .project(&projection_indices) + .unwrap(), + ), vec![ Arc::new(BooleanArray::from(vec![ false, false, false, false, false, false diff --git a/src/stream/record_batch.rs b/src/stream/record_batch.rs index c974f8ee..edbeec84 100644 --- a/src/stream/record_batch.rs +++ b/src/stream/record_batch.rs @@ -9,7 +9,7 @@ use arrow::{array::RecordBatch, datatypes::Schema}; use parquet::arrow::ProjectionMask; use crate::{ - record::{internal::InternalRecordRef, Key, Record, RecordRef}, + record::{internal::InternalRecordRef, Key, Record, RecordRef, Schema as RecordSchema}, timestamp::Timestamped, }; @@ -35,11 +35,13 @@ where } } - pub(crate) fn internal_key(&self) -> Timestamped<::Ref<'_>> { + pub(crate) fn internal_key( + &self, + ) -> Timestamped<<::Key as Key>::Ref<'_>> { self.record_ref.value() } - pub fn key(&self) -> ::Ref<'_> { + pub fn key(&self) -> <::Key as Key>::Ref<'_> { self.record_ref.value().value().clone() } diff --git a/src/transaction.rs b/src/transaction.rs index aa2df175..997c6443 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -14,7 +14,7 @@ use thiserror::Error; use crate::{ compaction::CompactTask, - record::{Key, KeyRef}, + record::{Key, KeyRef, Schema as RecordSchema}, snapshot::Snapshot, stream, 
stream::mem_projection::MemProjectionStream, @@ -24,7 +24,7 @@ use crate::{ }; pub(crate) struct TransactionScan<'scan, R: Record> { - inner: Range<'scan, R::Key, Option>, + inner: Range<'scan, ::Key, Option>, ts: Timestamp, } @@ -32,7 +32,10 @@ impl<'scan, R> Iterator for TransactionScan<'scan, R> where R: Record, { - type Item = (Timestamped<::Ref<'scan>>, &'scan Option); + type Item = ( + Timestamped<<::Key as Key>::Ref<'scan>>, + &'scan Option, + ); fn next(&mut self) -> Option { self.inner @@ -46,16 +49,19 @@ pub struct Transaction<'txn, R> where R: Record, { - local: BTreeMap>, + local: BTreeMap<::Key, Option>, snapshot: Snapshot<'txn, R>, - lock_map: LockMap, + lock_map: LockMap<::Key>, } impl<'txn, R> Transaction<'txn, R> where R: Record + Send, { - pub(crate) fn new(snapshot: Snapshot<'txn, R>, lock_map: LockMap) -> Self { + pub(crate) fn new( + snapshot: Snapshot<'txn, R>, + lock_map: LockMap<::Key>, + ) -> Self { Self { local: BTreeMap::new(), snapshot, @@ -67,7 +73,7 @@ where /// [`Projection`] pub async fn get<'get>( &'get self, - key: &'get R::Key, + key: &'get ::Key, projection: Projection, ) -> Result>, DbError> { Ok(match self.local.get(key).and_then(|v| v.as_ref()) { @@ -83,7 +89,10 @@ where /// scan records with primary keys in the `range` pub fn scan<'scan, 'range>( &'scan self, - range: (Bound<&'range R::Key>, Bound<&'range R::Key>), + range: ( + Bound<&'range ::Key>, + Bound<&'range ::Key>, + ), ) -> Scan<'scan, 'range, R> { let ts = self.snapshot.ts(); let inner = self.local.range(range); @@ -105,11 +114,11 @@ where } /// delete the record with the primary key as the `key` on this transaction - pub fn remove(&mut self, key: R::Key) { + pub fn remove(&mut self, key: ::Key) { self.entry(key, None) } - fn entry(&mut self, key: R::Key, value: Option) { + fn entry(&mut self, key: ::Key, value: Option) { match self.local.entry(key) { Entry::Vacant(v) => { v.insert(value); @@ -179,7 +188,7 @@ where async fn append( schema: &Schema, log_ty: LogType, - key: ::Key, + key: ::Key, record: Option, new_ts: Timestamp, ) -> Result> { @@ -225,7 +234,7 @@ where #[error("transaction database error {:?}", .0)] Database(#[from] DbError), #[error("transaction write conflict: {:?}", .0)] - WriteConflict(R::Key), + WriteConflict(::Key), #[error("Failed to send compact task")] SendCompactTaskError(#[from] SendError), #[error("Channel is closed")] @@ -245,8 +254,10 @@ mod tests { compaction::tests::build_version, executor::tokio::TokioExecutor, fs::manager::StoreManager, + inmem::immutable::tests::TestSchema, record::{ - runtime::{Datatype, DynRecord, Value}, + runtime::{test::test_dyn_item_schema, Datatype, DynRecord, Value}, + test::StringSchema, ValueDesc, }, tests::{build_db, build_schema, Test}, @@ -260,8 +271,12 @@ mod tests { let temp_dir = TempDir::new().unwrap(); let db = DB::::new( - DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()), + DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &StringSchema, + )), TokioExecutor::new(), + StringSchema, ) .await .unwrap(); @@ -295,9 +310,10 @@ mod tests { async fn transaction_get() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from( + let option = Arc::new(DbOption::from(( Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + &TestSchema, + ))); manager .base_fs() @@ -310,7 +326,7 @@ mod tests { .await .unwrap(); - let (_, version) = build_version(&option, &manager).await; + let (_, 
version) = build_version(&option, &manager, &Arc::new(TestSchema)).await; let (schema, compaction_rx) = build_schema(option.clone(), manager.base_fs()) .await .unwrap(); @@ -319,6 +335,7 @@ mod tests { compaction_rx, TokioExecutor::new(), schema, + Arc::new(TestSchema), version, manager, ) .await .unwrap(); @@ -385,9 +402,12 @@ mod tests { #[tokio::test] async fn write_conflicts() { let temp_dir = TempDir::new().unwrap(); - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &StringSchema, + )); - let db = DB::<String, TokioExecutor>::new(option, TokioExecutor::new()) + let db = DB::<String, TokioExecutor>::new(option, TokioExecutor::new(), StringSchema) .await .unwrap(); @@ -418,9 +438,12 @@ mod tests { #[tokio::test] async fn transaction_projection() { let temp_dir = TempDir::new().unwrap(); - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + )); - let db = DB::<Test, TokioExecutor>::new(option, TokioExecutor::new()) + let db = DB::<Test, TokioExecutor>::new(option, TokioExecutor::new(), TestSchema) .await .unwrap(); @@ -456,9 +479,10 @@ mod tests { async fn transaction_scan() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from( + let option = Arc::new(DbOption::from(( Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + &TestSchema, + ))); manager .base_fs() .create_dir_all(&option.version_log_dir_path()) .await .unwrap(); manager .base_fs() .create_dir_all(&option.wal_dir_path()) .await .unwrap(); - let (_, version) = build_version(&option, &manager).await; + let (_, version) = build_version(&option, &manager, &Arc::new(TestSchema)).await; let (schema, compaction_rx) = build_schema(option.clone(), manager.base_fs()) .await .unwrap(); @@ -480,6 +504,7 @@ mod tests { compaction_rx, TokioExecutor::new(), schema, + Arc::new(TestSchema), version, manager, ) .await .unwrap(); @@ -551,9 +576,10 @@ mod tests { async fn test_transaction_scan_bound() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from( + let option = Arc::new(DbOption::from(( Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + &TestSchema, + ))); manager .base_fs() .create_dir_all(&option.version_log_dir_path()) .await .unwrap(); manager .base_fs() .create_dir_all(&option.wal_dir_path()) .await .unwrap(); - let (_, version) = build_version(&option, &manager).await; + let (_, version) = build_version(&option, &manager, &Arc::new(TestSchema)).await; let (schema, compaction_rx) = build_schema(option.clone(), manager.base_fs()) .await .unwrap(); @@ -575,6 +601,7 @@ mod tests { compaction_rx, TokioExecutor::new(), schema, + Arc::new(TestSchema), version, manager, ) .await .unwrap(); @@ -727,9 +754,10 @@ mod tests { async fn test_transaction_scan_limit() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from( + let option = Arc::new(DbOption::from(( Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + &TestSchema, + ))); manager .base_fs() .create_dir_all(&option.version_log_dir_path()) .await .unwrap(); manager .base_fs() .create_dir_all(&option.wal_dir_path()) .await .unwrap(); - let (_, version) = build_version(&option, &manager).await; + let (_, version) = build_version(&option, &manager, &Arc::new(TestSchema)).await; let (schema, compaction_rx) = build_schema(option.clone(), manager.base_fs()) .await .unwrap(); @@ -751,6 +779,7 @@ mod tests { compaction_rx, TokioExecutor::new(), schema, + Arc::new(TestSchema), version, manager, ) .await .unwrap(); @@ -800,7 +829,7 @@
mod tests { "age".to_string(), 0, ); - let db = DB::with_schema(option, TokioExecutor::default(), descs, 0) + let db = DB::with_schema(option, TokioExecutor::default(), test_dyn_item_schema()) .await .unwrap(); diff --git a/src/version/cleaner.rs b/src/version/cleaner.rs index 19a00a7f..a0579d8a 100644 --- a/src/version/cleaner.rs +++ b/src/version/cleaner.rs @@ -107,6 +107,7 @@ pub(crate) mod tests { use crate::{ executor::{tokio::TokioExecutor, Executor}, fs::{manager::StoreManager, FileId, FileType}, + inmem::immutable::tests::TestSchema, tests::Test, version::cleaner::{CleanTag, Cleaner}, DbOption, @@ -116,9 +117,10 @@ pub(crate) mod tests { async fn test_cleaner() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from( + let option = Arc::new(DbOption::from(( Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + &TestSchema, + ))); let gen_0 = FileId::new(); let gen_1 = FileId::new(); diff --git a/src/version/set.rs b/src/version/set.rs index e4050d47..050ebfac 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -297,7 +297,7 @@ pub(crate) mod tests { use crate::{ fs::{manager::StoreManager, FileId, FileType}, - record::Record, + record::{test::StringSchema, Record}, scope::Scope, version::{ cleaner::CleanTag, @@ -344,9 +344,10 @@ pub(crate) mod tests { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); let (sender, _) = bounded(1); - let option = Arc::new(DbOption::from( + let option = Arc::new(DbOption::from(( Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + &StringSchema, + ))); manager .base_fs() .create_dir_all(&option.version_log_dir_path()) .await .unwrap(); @@ -381,7 +382,10 @@ pub(crate) mod tests { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); let (sender, _) = bounded(1); - let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let mut option = DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &StringSchema, + )); option.version_log_snapshot_threshold = 4; let option = Arc::new(option); @@ -508,9 +512,10 @@ pub(crate) mod tests { async fn version_level_sort() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from( + let option = Arc::new(DbOption::from(( Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + &StringSchema, + ))); let (sender, _) = bounded(1); manager diff --git a/tests/data_integrity.rs b/tests/data_integrity.rs index b4a1b13a..87d8b291 100644 --- a/tests/data_integrity.rs +++ b/tests/data_integrity.rs @@ -70,9 +70,14 @@ mod tests { let mut write_hasher = crc32fast::Hasher::new(); let temp_dir = TempDir::new().unwrap(); - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from(( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &CustomerSchema, + )); - let db: DB<Customer, TokioExecutor> = DB::new(option, TokioExecutor::new()).await.unwrap(); + let db: DB<Customer, TokioExecutor> = DB::new(option, TokioExecutor::new(), CustomerSchema) + .await + .unwrap(); for _ in 0..WRITE_TIMES { let customer = gen_record(&mut rng, &mut primary_key_count); diff --git a/tests/macros_correctness.rs b/tests/macros_correctness.rs index 73360ed3..42b408f7 100644 --- a/tests/macros_correctness.rs +++ b/tests/macros_correctness.rs @@ -1,3 +1,4 @@ 
+use tonbo::record::Schema; use tonbo_macros::Record; #[derive(Record, Debug, PartialEq)] @@ -21,12 +22,12 @@ mod tests { use tokio::io::AsyncSeekExt; use tonbo::{ inmem::immutable::{ArrowArrays, Builder}, - record::{Record, RecordRef}, + record::{Record, RecordRef, Schema}, serdes::{Decode, Encode}, timestamp::timestamped::Timestamped, }; - use crate::{User, UserImmutableArrays, UserRef}; + use crate::{User, UserImmutableArrays, UserRef, UserSchema}; #[tokio::test] async fn test_record_info() { @@ -38,9 +39,9 @@ mod tests { assert_eq!(user.key(), "cat"); assert_eq!(user.size(), 20); - assert_eq!(User::primary_key_index(), 4); + assert_eq!(UserSchema {}.primary_key_index(), 4); assert_eq!( - User::primary_key_path(), + UserSchema {}.primary_key_path(), ( ColumnPath::new(vec!["_ts".to_string(), "name".to_string()]), vec![ @@ -62,7 +63,7 @@ mod tests { let mut user_ref = user.as_record_ref(); user_ref.projection(&ProjectionMask::roots( - &arrow_to_parquet_schema(User::arrow_schema()).unwrap(), + &arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(), vec![2, 3], )); @@ -74,7 +75,7 @@ mod tests { let mut user_ref = user.as_record_ref(); user_ref.projection(&ProjectionMask::roots( - &arrow_to_parquet_schema(User::arrow_schema()).unwrap(), + &arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(), vec![], )); @@ -86,7 +87,7 @@ mod tests { let mut user_ref = user.as_record_ref(); user_ref.projection(&ProjectionMask::roots( - &arrow_to_parquet_schema(User::arrow_schema()).unwrap(), + &arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(), vec![2], )); @@ -98,7 +99,7 @@ mod tests { let mut user_ref = user.as_record_ref(); user_ref.projection(&ProjectionMask::roots( - &arrow_to_parquet_schema(User::arrow_schema()).unwrap(), + &arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(), vec![3], )); @@ -112,7 +113,12 @@ mod tests { async fn test_record_from_record_batch() { { let record_batch = RecordBatch::try_new( - Arc::new(User::arrow_schema().project(&[0, 1, 2, 3, 4]).unwrap()), + Arc::new( + UserSchema {} + .arrow_schema() + .project(&[0, 1, 2, 3, 4]) + .unwrap(), + ), vec![ Arc::new(BooleanArray::from(vec![false])), Arc::new(UInt32Array::from(vec![9])), @@ -124,11 +130,15 @@ mod tests { .unwrap(); let project_mask = ProjectionMask::roots( - &arrow_to_parquet_schema(User::arrow_schema()).unwrap(), + &arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(), vec![0, 1, 2, 3, 4], ); - let record_ref = - UserRef::from_record_batch(&record_batch, 0, &project_mask, User::arrow_schema()); + let record_ref = UserRef::from_record_batch( + &record_batch, + 0, + &project_mask, + UserSchema {}.arrow_schema(), + ); assert_eq!( record_ref.value(), Timestamped { @@ -146,7 +156,7 @@ mod tests { } { let record_batch = RecordBatch::try_new( - Arc::new(User::arrow_schema().project(&[0, 1, 3, 4]).unwrap()), + Arc::new(UserSchema {}.arrow_schema().project(&[0, 1, 3, 4]).unwrap()), vec![ Arc::new(BooleanArray::from(vec![false])), Arc::new(UInt32Array::from(vec![9])), @@ -157,11 +167,15 @@ mod tests { .unwrap(); let project_mask = ProjectionMask::roots( - &arrow_to_parquet_schema(User::arrow_schema()).unwrap(), + &arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(), vec![0, 1, 3, 4], ); - let record_ref = - UserRef::from_record_batch(&record_batch, 0, &project_mask, User::arrow_schema()); + let record_ref = UserRef::from_record_batch( + &record_batch, + 0, + &project_mask, + UserSchema {}.arrow_schema(), + ); assert_eq!( record_ref.value(), Timestamped { @@ -200,7 
+214,7 @@ mod tests { #[tokio::test] async fn test_record_arrays() { - let mut builder = UserImmutableArrays::builder(User::arrow_schema(), 10); + let mut builder = UserImmutableArrays::builder(UserSchema {}.arrow_schema().clone(), 10); let cat = User { email: Some("cat@example.com".to_string()), @@ -242,7 +256,12 @@ mod tests { assert_eq!( arrays.as_record_batch(), &RecordBatch::try_new( - Arc::new(User::arrow_schema().project(&[0, 1, 2, 3, 4]).unwrap(),), + Arc::new( + UserSchema {} + .arrow_schema() + .project(&[0, 1, 2, 3, 4]) + .unwrap(), + ), vec![ Arc::new(BooleanArray::from(vec![false, false, true])), Arc::new(UInt32Array::from(vec![0, 1, 2])), @@ -261,7 +280,7 @@ mod tests { #[tokio::test] async fn test_record_arrays_projection() { - let mut builder = UserImmutableArrays::builder(User::arrow_schema(), 10); + let mut builder = UserImmutableArrays::builder(UserSchema {}.arrow_schema().clone(), 10); let cat = User { email: Some("cat@example.com".to_string()), @@ -303,7 +322,7 @@ mod tests { assert_eq!( arrays.as_record_batch(), &RecordBatch::try_new( - Arc::new(User::arrow_schema().project(&[0, 1, 3, 4]).unwrap(),), + Arc::new(UserSchema {}.arrow_schema().project(&[0, 1, 3, 4]).unwrap(),), vec![ Arc::new(BooleanArray::from(vec![false, false, true])), Arc::new(UInt32Array::from(vec![0, 1, 2])), diff --git a/tonbo_macros/src/record.rs b/tonbo_macros/src/record.rs index 233d6adc..66937650 100644 --- a/tonbo_macros/src/record.rs +++ b/tonbo_macros/src/record.rs @@ -107,16 +107,16 @@ pub(crate) fn handle(ast: DeriveInput) -> Result<TokenStream, Error> { let builder_append_primary_key = &primary_key_definitions.builder_append_value; - let record_codegen = trait_record_codegen( - &data_struct.fields, - struct_name, - primary_key_definitions.clone(), - ); + let record_codegen = + trait_record_codegen(&data_struct.fields, struct_name, &primary_key_definitions); let decode_codegen = trait_decode_codegen(struct_name, &data_struct.fields); let struct_ref_codegen = struct_ref_codegen(struct_name, &data_struct.fields); + let struct_schema_codegen = + struct_schema_codegen(struct_name, &data_struct.fields, &primary_key_definitions); + let decode_ref_codegen = trait_decode_ref_codegen(&struct_name, primary_key_ident, &data_struct.fields); @@ -138,6 +138,8 @@ pub(crate) fn handle(ast: DeriveInput) -> Result<TokenStream, Error> { #struct_ref_codegen + #struct_schema_codegen + #decode_ref_codegen #encode_codegen @@ -156,12 +158,11 @@ pub(crate) fn handle(ast: DeriveInput) -> Result<TokenStream, Error> { fn trait_record_codegen( fields: &[RecordStructFieldOpt], struct_name: &Ident, - primary_key: PrimaryKey, + primary_key: &PrimaryKey, ) -> TokenStream { let mut size_fields: Vec<TokenStream> = Vec::new(); let mut to_ref_init_fields: Vec<TokenStream> = Vec::new(); - let mut schema_fields: Vec<TokenStream> = Vec::new(); for field in fields.iter() { let field_name = field.ident.as_ref().unwrap(); @@ -170,13 +171,8 @@ fn trait_record_codegen( let is_string = matches!(data_type, DataType::String); let is_bytes = matches!(data_type, DataType::Bytes); - let mapped_type = data_type.to_mapped_type(); let size_field = data_type.to_size_field(field_name, is_nullable); - schema_fields.push(quote! { - ::tonbo::arrow::datatypes::Field::new(stringify!(#field_name), #mapped_type, #is_nullable), - }); - size_fields.push(quote! 
{ + #size_field }); @@ -205,60 +201,32 @@ fn trait_record_codegen( } } - let struct_arrays_name = struct_name.to_immutable_array_ident(); let struct_ref_name = struct_name.to_ref_ident(); + let struct_schema_name = struct_name.to_schema_ident(); let PrimaryKey { - name: primary_key_name, - base_ty: primary_key_ty, fn_key: fn_primary_key, - builder_append_value: _builder_append_primary_key, - index: primary_key_index, + .. } = primary_key; quote! { impl ::tonbo::record::Record for #struct_name { - type Columns = #struct_arrays_name; - - type Key = #primary_key_ty; + type Schema = #struct_schema_name; type Ref<'r> = #struct_ref_name<'r> where Self: 'r; - fn key(&self) -> <<Self as ::tonbo::record::Record>::Key as ::tonbo::record::Key>::Ref<'_> { + fn key(&self) -> <<Self::Schema as ::tonbo::record::Schema>::Key as ::tonbo::record::Key>::Ref<'_> { #fn_primary_key } - fn primary_key_index() -> usize { - #primary_key_index - } - - fn primary_key_path() -> (::tonbo::parquet::schema::types::ColumnPath, Vec<::tonbo::parquet::format::SortingColumn>) { - ( - ::tonbo::parquet::schema::types::ColumnPath::new(vec!["_ts".to_string(), stringify!(#primary_key_name).to_string()]), - vec![::tonbo::parquet::format::SortingColumn::new(1_i32, true, true), ::tonbo::parquet::format::SortingColumn::new(#primary_key_index as i32, false, true)] - ) - } - fn as_record_ref(&self) -> Self::Ref<'_> { #struct_ref_name { #(#to_ref_init_fields)* } } - fn arrow_schema() -> &'static ::std::sync::Arc<::tonbo::arrow::datatypes::Schema> { - static SCHEMA: ::tonbo::once_cell::sync::Lazy<::std::sync::Arc<::tonbo::arrow::datatypes::Schema>> = ::tonbo::once_cell::sync::Lazy::new(|| { - ::std::sync::Arc::new(::tonbo::arrow::datatypes::Schema::new(vec![ - ::tonbo::arrow::datatypes::Field::new("_null", ::tonbo::arrow::datatypes::DataType::Boolean, false), - ::tonbo::arrow::datatypes::Field::new("_ts", ::tonbo::arrow::datatypes::DataType::UInt32, false), - #(#schema_fields)* - ])) - }); - - &SCHEMA - } - fn size(&self) -> usize { 0 #(#size_fields)* } @@ -361,6 +329,71 @@ fn struct_ref_codegen(struct_name: &Ident, fields: &[RecordStructFieldOpt]) -> T } } +fn struct_schema_codegen( + struct_name: &Ident, + fields: &[RecordStructFieldOpt], + primary_key: &PrimaryKey, +) -> TokenStream { + let struct_schema_name = struct_name.to_schema_ident(); + let struct_arrays_name = struct_name.to_immutable_array_ident(); + let mut schema_fields: Vec<TokenStream> = Vec::new(); + + let PrimaryKey { + name: primary_key_name, + base_ty: primary_key_ty, + builder_append_value: _builder_append_primary_key, + index: primary_key_index, + .. + } = primary_key; + + for field in fields.iter() { + let field_name = field.ident.as_ref().unwrap(); + + let (data_type, is_nullable) = field.to_data_type().expect("unreachable code"); + let mapped_type = data_type.to_mapped_type(); + + schema_fields.push(quote! { + ::tonbo::arrow::datatypes::Field::new(stringify!(#field_name), #mapped_type, #is_nullable), + }); + } + + quote! 
{ + #[derive(Debug, PartialEq, Eq, Clone, Copy)] + pub struct #struct_schema_name; + + impl ::tonbo::record::Schema for #struct_schema_name { + type Record = #struct_name; + + type Columns = #struct_arrays_name; + + type Key = #primary_key_ty; + + fn primary_key_index(&self) -> usize { + #primary_key_index + } + + fn primary_key_path(&self) -> (::tonbo::parquet::schema::types::ColumnPath, Vec<::tonbo::parquet::format::SortingColumn>) { + ( + ::tonbo::parquet::schema::types::ColumnPath::new(vec!["_ts".to_string(), stringify!(#primary_key_name).to_string()]), + vec![::tonbo::parquet::format::SortingColumn::new(1_i32, true, true), ::tonbo::parquet::format::SortingColumn::new(#primary_key_index as i32, false, true)] + ) + } + + fn arrow_schema(&self) -> &'static ::std::sync::Arc<::tonbo::arrow::datatypes::Schema> { + static SCHEMA: ::tonbo::once_cell::sync::Lazy<::std::sync::Arc<::tonbo::arrow::datatypes::Schema>> = ::tonbo::once_cell::sync::Lazy::new(|| { + ::std::sync::Arc::new(::tonbo::arrow::datatypes::Schema::new(vec![ + ::tonbo::arrow::datatypes::Field::new("_null", ::tonbo::arrow::datatypes::DataType::Boolean, false), + ::tonbo::arrow::datatypes::Field::new("_ts", ::tonbo::arrow::datatypes::DataType::UInt32, false), + #(#schema_fields)* + ])) + }); + + &SCHEMA + } + } + } +} + fn trait_decode_ref_codegen( struct_name: &&Ident, primary_key_name: &Ident, @@ -437,7 +470,7 @@ fn trait_decode_ref_codegen( impl<'r> ::tonbo::record::RecordRef<'r> for #struct_ref_name<'r> { type Record = #struct_name; - fn key(self) -> <<Self::Record as ::tonbo::record::Record>::Key as ::tonbo::record::Key>::Ref<'r> { + fn key(self) -> <<<<#struct_ref_name<'r> as ::tonbo::record::RecordRef<'r>>::Record as ::tonbo::record::Record>::Schema as ::tonbo::record::Schema>::Key as ::tonbo::record::Key>::Ref<'r> { self.#primary_key_name } @@ -599,7 +632,7 @@ fn trait_arrow_array_codegen( type Builder = #struct_builder_name; - fn builder(schema: &::std::sync::Arc<::tonbo::arrow::datatypes::Schema>, capacity: usize) -> Self::Builder { + fn builder(schema: ::std::sync::Arc<::tonbo::arrow::datatypes::Schema>, capacity: usize) -> Self::Builder { #struct_builder_name { #(#builder_init_fields)* @@ -642,6 +675,7 @@ fn struct_builder_codegen( fields: &[RecordStructFieldOpt], ) -> TokenStream { + let struct_schema_name = struct_name.to_schema_ident(); let struct_builder_name = struct_name.to_builder_ident(); let mut field_names: Vec<Ident> = Vec::new(); @@ -723,7 +757,7 @@ fn struct_builder_codegen( } impl ::tonbo::inmem::immutable::Builder<#struct_arrays_name> for #struct_builder_name { - fn push(&mut self, key: ::tonbo::timestamp::timestamped::Timestamped<<<#struct_name as ::tonbo::record::Record>::Key as ::tonbo::record::Key>::Ref<'_>>, row: Option<#struct_ref_name>) { + fn push(&mut self, key: ::tonbo::timestamp::timestamped::Timestamped<<<<#struct_name as ::tonbo::record::Record>::Schema as ::tonbo::record::Schema>::Key as ::tonbo::record::Key>::Ref<'_>>, row: Option<#struct_ref_name>) { #builder_append_primary_key match row { Some(row) => { @@ -750,10 +784,10 @@ fn struct_builder_codegen( let _null = ::std::sync::Arc::new(::tonbo::arrow::array::BooleanArray::new(self._null.finish(), None)); let _ts = ::std::sync::Arc::new(self._ts.finish()); + let schema = #struct_schema_name {}; + let mut record_batch = ::tonbo::arrow::record_batch::RecordBatch::try_new( - ::std::sync::Arc::clone( - <<#struct_arrays_name as ::tonbo::inmem::immutable::ArrowArrays>::Record as ::tonbo::record::Record>::arrow_schema(), - ), + 
::std::sync::Arc::clone(::tonbo::record::Schema::arrow_schema(&schema)), vec![ ::std::sync::Arc::clone(&_null) as ::std::sync::Arc<dyn ::tonbo::arrow::array::Array>, ::std::sync::Arc::clone(&_ts) as ::std::sync::Arc<dyn ::tonbo::arrow::array::Array>, diff --git a/tonbo_macros/src/utils/ident_generator.rs b/tonbo_macros/src/utils/ident_generator.rs index 8c9743fc..6b24b186 100644 --- a/tonbo_macros/src/utils/ident_generator.rs +++ b/tonbo_macros/src/utils/ident_generator.rs @@ -3,6 +3,8 @@ use syn::Ident; pub(crate) trait IdentGenerator { fn to_ref_ident(&self) -> Ident; + fn to_schema_ident(&self) -> Ident; + fn to_builder_ident(&self) -> Ident; fn to_array_ident(&self) -> Ident; @@ -15,6 +17,10 @@ impl IdentGenerator for proc_macro2::Ident { Ident::new(&format!("{}Ref", self), self.span()) } + fn to_schema_ident(&self) -> Ident { + Ident::new(&format!("{}Schema", self), self.span()) + } + fn to_builder_ident(&self) -> Ident { Ident::new(&format!("{}Builder", self), self.span()) }
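
For reference, the call-site shape this patch migrates to, as a minimal sketch: it assumes the `User` record and the derive-generated `UserSchema` unit struct from examples/declare.rs, and import paths are abbreviated and illustrative.

    use tonbo::{executor::tokio::TokioExecutor, DbOption, DB};

    // Sketch only: `User`/`UserSchema` come from `#[derive(Record)]` as above.
    async fn open_user_db() -> DB<User, TokioExecutor> {
        // DbOption::from now takes (path, &schema): the arrow schema and the
        // primary-key sorting metadata come from the Schema value instead of
        // associated functions on the Record type.
        let options = DbOption::from((
            Path::from_filesystem_path("./db_path/users").unwrap(),
            &UserSchema,
        ));
        // DB::new now receives the schema instance as its third argument.
        DB::new(options, TokioExecutor::default(), UserSchema)
            .await
            .unwrap()
    }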