diff --git a/Cargo.toml b/Cargo.toml index 3f3f8118..c4820a2e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ [package] name = "fnck_sql" -version = "0.0.7" +version = "0.0.8" edition = "2021" authors = ["Kould ", "Xwg "] description = "SQL as a Function for Rust" diff --git a/README.md b/README.md index c94acde4..313cd2ae 100755 --- a/README.md +++ b/README.md @@ -73,13 +73,13 @@ run `cargo run -p tpcc --release` to run tpcc - Tips: TPC-C currently only supports single thread ```shell <90th Percentile RT (MaxRT)> - New-Order : 0.003 (0.006) + New-Order : 0.003 (0.012) Payment : 0.001 (0.003) -Order-Status : 0.062 (0.188) - Delivery : 0.022 (0.052) +Order-Status : 0.054 (0.188) + Delivery : 0.021 (0.049) Stock-Level : 0.004 (0.006) -6669 Tpmc +7345 Tpmc ``` #### 👉[check more](tpcc/README.md) diff --git a/src/binder/copy.rs b/src/binder/copy.rs index 398d1761..902147c3 100644 --- a/src/binder/copy.rs +++ b/src/binder/copy.rs @@ -80,8 +80,9 @@ impl Binder<'_, '_, T> { return Err(DatabaseError::UnsupportedStmt("'COPY SOURCE'".to_string())); } }; + let table_name = Arc::new(lower_case_name(&table_name)?); - if let Some(table) = self.context.table(Arc::new(table_name.to_string()))? { + if let Some(table) = self.context.table(table_name.clone())? { let schema_ref = table.schema_ref().clone(); let ext_source = ExtSource { path: match target { @@ -107,7 +108,7 @@ impl Binder<'_, '_, T> { Operator::CopyFromFile(CopyFromFileOperator { source: ext_source, schema_ref, - table: table_name.to_string(), + table: table_name, }), Childrens::None, )) diff --git a/src/catalog/table.rs b/src/catalog/table.rs index 73ab2ef2..c3a9e970 100644 --- a/src/catalog/table.rs +++ b/src/catalog/table.rs @@ -11,6 +11,7 @@ use std::{slice, vec}; use ulid::Generator; pub type TableName = Arc; +pub type PrimaryKeyIndices = Arc>; #[derive(Debug, Clone, PartialEq)] pub struct TableCatalog { @@ -22,6 +23,7 @@ pub struct TableCatalog { schema_ref: SchemaRef, primary_keys: Vec<(usize, ColumnRef)>, + primary_key_indices: PrimaryKeyIndices, primary_key_type: Option, } @@ -32,6 +34,10 @@ pub struct TableMeta { } impl TableCatalog { + pub(crate) fn name(&self) -> &TableName { + &self.name + } + pub(crate) fn get_unique_index(&self, col_id: &ColumnId) -> Option<&IndexMetaRef> { self.indexes .iter() @@ -79,6 +85,10 @@ impl TableCatalog { &self.primary_keys } + pub(crate) fn primary_keys_indices(&self) -> &PrimaryKeyIndices { + &self.primary_key_indices + } + pub(crate) fn types(&self) -> Vec { self.columns() .map(|column| column.datatype().clone()) @@ -186,6 +196,7 @@ impl TableCatalog { indexes: vec![], schema_ref: Arc::new(vec![]), primary_keys: vec![], + primary_key_indices: Default::default(), primary_key_type: None, }; let mut generator = Generator::new(); @@ -194,7 +205,11 @@ impl TableCatalog { .add_column(col_catalog, &mut generator) .unwrap(); } - table_catalog.primary_keys = Self::build_primary_keys(&table_catalog.schema_ref); + let (primary_keys, primary_key_indices) = + Self::build_primary_keys(&table_catalog.schema_ref); + + table_catalog.primary_keys = primary_keys; + table_catalog.primary_key_indices = primary_key_indices; Ok(table_catalog) } @@ -216,7 +231,7 @@ impl TableCatalog { columns.insert(column_id, i); } let schema_ref = Arc::new(column_refs.clone()); - let primary_keys = Self::build_primary_keys(&schema_ref); + let (primary_keys, primary_key_indices) = Self::build_primary_keys(&schema_ref); Ok(TableCatalog { name, @@ -225,12 +240,18 @@ impl TableCatalog { indexes, schema_ref, primary_keys, + primary_key_indices, primary_key_type: None, }) } - fn build_primary_keys(schema_ref: &Arc>) -> Vec<(usize, ColumnRef)> { - schema_ref + fn build_primary_keys( + schema_ref: &Arc>, + ) -> (Vec<(usize, ColumnRef)>, PrimaryKeyIndices) { + let mut primary_keys = Vec::new(); + let mut primary_key_indices = Vec::new(); + + for (_, (i, column)) in schema_ref .iter() .enumerate() .filter_map(|(i, column)| { @@ -240,8 +261,12 @@ impl TableCatalog { .map(|p_i| (p_i, (i, column.clone()))) }) .sorted_by_key(|(p_i, _)| *p_i) - .map(|(_, entry)| entry) - .collect_vec() + { + primary_key_indices.push(i); + primary_keys.push((i, column)); + } + + (primary_keys, Arc::new(primary_key_indices)) } } diff --git a/src/db.rs b/src/db.rs index 61a518a1..ce16769c 100644 --- a/src/db.rs +++ b/src/db.rs @@ -532,10 +532,10 @@ pub(crate) mod test { ); assert_eq!( iter.next().unwrap()?, - Tuple { - id: None, - values: vec![DataValue::Date32(Some(Local::now().num_days_from_ce()))], - } + Tuple::new( + None, + vec![DataValue::Date32(Some(Local::now().num_days_from_ce()))] + ) ); assert!(iter.next().is_none()); @@ -562,17 +562,11 @@ pub(crate) mod test { assert_eq!(iter.schema(), &Arc::new(vec![ColumnRef::from(column)])); assert_eq!( iter.next().unwrap()?, - Tuple { - id: None, - values: vec![DataValue::Int32(Some(3))], - } + Tuple::new(None, vec![DataValue::Int32(Some(3))]) ); assert_eq!( iter.next().unwrap()?, - Tuple { - id: None, - values: vec![DataValue::Int32(Some(4))], - } + Tuple::new(None, vec![DataValue::Int32(Some(4))]) ); Ok(()) } diff --git a/src/execution/ddl/add_column.rs b/src/execution/ddl/add_column.rs index 1df2cc47..e395b426 100644 --- a/src/execution/ddl/add_column.rs +++ b/src/execution/ddl/add_column.rs @@ -27,7 +27,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn { fn execute_mut( mut self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a mut T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -55,7 +55,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn { if let Some(value) = throw!(column.default_value()) { if let Some(unique_values) = &mut unique_values { - unique_values.push((tuple.id.clone().unwrap(), value.clone())); + unique_values.push((tuple.id().unwrap().clone(), value.clone())); } tuple.values.push(value); } else { @@ -66,21 +66,28 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn { drop(coroutine); for tuple in tuples { - throw!(transaction.append_tuple(table_name, tuple, &types, true)); + throw!(unsafe { &mut (*transaction) } + .append_tuple(table_name, tuple, &types, true)); } - let col_id = - throw!(transaction.add_column(cache.0, table_name, column, *if_not_exists)); + let col_id = throw!(unsafe { &mut (*transaction) }.add_column( + cache.0, + table_name, + column, + *if_not_exists + )); // Unique Index if let (Some(unique_values), Some(unique_meta)) = ( unique_values, - throw!(transaction.table(cache.0, table_name.clone())) + throw!(unsafe { &mut (*transaction) }.table(cache.0, table_name.clone())) .and_then(|table| table.get_unique_index(&col_id)) .cloned(), ) { for (tuple_id, value) in unique_values { let index = Index::new(unique_meta.id, &value, IndexType::Unique); - throw!(transaction.add_index(table_name, index, &tuple_id)); + throw!( + unsafe { &mut (*transaction) }.add_index(table_name, index, &tuple_id) + ); } } diff --git a/src/execution/ddl/create_index.rs b/src/execution/ddl/create_index.rs index 6fd888f2..d19c9c9c 100644 --- a/src/execution/ddl/create_index.rs +++ b/src/execution/ddl/create_index.rs @@ -30,7 +30,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateIndex { fn execute_mut( mut self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a mut T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -52,7 +52,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateIndex { }) .unzip(); let schema = self.input.output_schema().clone(); - let index_id = match transaction.add_index_meta( + let index_id = match unsafe { &mut (*transaction) }.add_index_meta( cache.0, &table_name, index_name, @@ -69,29 +69,29 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateIndex { } err => throw!(err), }; - let mut index_values = Vec::new(); let mut coroutine = build_read(self.input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let mut tuple: Tuple = throw!(tuple); - let tuple_id = if let Some(tuple_id) = tuple.id.take() { - tuple_id - } else { + let Some(value) = DataValue::values_to_tuple(throw!(Projection::projection( + &tuple, + &column_exprs, + &schema + ))) else { continue; }; - index_values.push(( - tuple_id, - throw!(Projection::projection(&tuple, &column_exprs, &schema)), - )); - } - drop(coroutine); - for (tuple_id, values) in index_values { - let Some(value) = DataValue::values_to_tuple(values) else { + let tuple_id = if let Some(tuple_id) = tuple.id().take() { + tuple_id + } else { continue; }; let index = Index::new(index_id, &value, ty); - throw!(transaction.add_index(table_name.as_str(), index, &tuple_id)); + throw!(unsafe { &mut (*transaction) }.add_index( + table_name.as_str(), + index, + tuple_id + )); } yield Ok(TupleBuilder::build_result("1".to_string())); }, diff --git a/src/execution/ddl/create_table.rs b/src/execution/ddl/create_table.rs index c2d2a7f4..c55035fe 100644 --- a/src/execution/ddl/create_table.rs +++ b/src/execution/ddl/create_table.rs @@ -18,7 +18,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateTable { fn execute_mut( self, (table_cache, _, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a mut T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -29,7 +29,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateTable { if_not_exists, } = self.op; - let _ = throw!(transaction.create_table( + let _ = throw!(unsafe { &mut (*transaction) }.create_table( table_cache, table_name.clone(), columns, diff --git a/src/execution/ddl/create_view.rs b/src/execution/ddl/create_view.rs index 5fe81863..d153f2df 100644 --- a/src/execution/ddl/create_view.rs +++ b/src/execution/ddl/create_view.rs @@ -18,7 +18,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateView { fn execute_mut( self, (_, view_cache, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a mut T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -26,7 +26,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateView { let CreateViewOperator { view, or_replace } = self.op; let result_tuple = TupleBuilder::build_result(format!("{}", view.name)); - throw!(transaction.create_view(view_cache, view, or_replace)); + throw!(unsafe { &mut (*transaction) }.create_view(view_cache, view, or_replace)); yield Ok(result_tuple); }, diff --git a/src/execution/ddl/drop_column.rs b/src/execution/ddl/drop_column.rs index 0183ca3d..2bb818bb 100644 --- a/src/execution/ddl/drop_column.rs +++ b/src/execution/ddl/drop_column.rs @@ -25,7 +25,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn { fn execute_mut( mut self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a mut T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -67,9 +67,19 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn { } drop(coroutine); for tuple in tuples { - throw!(transaction.append_tuple(&table_name, tuple, &types, true)); + throw!(unsafe { &mut (*transaction) }.append_tuple( + &table_name, + tuple, + &types, + true + )); } - throw!(transaction.drop_column(cache.0, cache.2, &table_name, &column_name)); + throw!(unsafe { &mut (*transaction) }.drop_column( + cache.0, + cache.2, + &table_name, + &column_name + )); yield Ok(TupleBuilder::build_result("1".to_string())); } else if if_exists { diff --git a/src/execution/ddl/drop_table.rs b/src/execution/ddl/drop_table.rs index e80b862f..8541ce16 100644 --- a/src/execution/ddl/drop_table.rs +++ b/src/execution/ddl/drop_table.rs @@ -18,7 +18,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropTable { fn execute_mut( self, (table_cache, _, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a mut T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -28,7 +28,11 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropTable { if_exists, } = self.op; - throw!(transaction.drop_table(table_cache, table_name.clone(), if_exists)); + throw!(unsafe { &mut (*transaction) }.drop_table( + table_cache, + table_name.clone(), + if_exists + )); yield Ok(TupleBuilder::build_result(format!("{}", table_name))); }, diff --git a/src/execution/ddl/drop_view.rs b/src/execution/ddl/drop_view.rs index 798d7284..13e78174 100644 --- a/src/execution/ddl/drop_view.rs +++ b/src/execution/ddl/drop_view.rs @@ -18,7 +18,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropView { fn execute_mut( self, (table_cache, view_cache, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a mut T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -28,7 +28,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropView { if_exists, } = self.op; - throw!(transaction.drop_view( + throw!(unsafe { &mut (*transaction) }.drop_view( view_cache, table_cache, view_name.clone(), diff --git a/src/execution/ddl/truncate.rs b/src/execution/ddl/truncate.rs index d1e00f84..57558e5a 100644 --- a/src/execution/ddl/truncate.rs +++ b/src/execution/ddl/truncate.rs @@ -18,14 +18,14 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Truncate { fn execute_mut( self, _: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a mut T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] move || { let TruncateOperator { table_name } = self.op; - throw!(transaction.drop_data(&table_name)); + throw!(unsafe { &mut (*transaction) }.drop_data(&table_name)); yield Ok(TupleBuilder::build_result(format!("{}", table_name))); }, diff --git a/src/execution/dml/analyze.rs b/src/execution/dml/analyze.rs index 62341061..8e4519ab 100644 --- a/src/execution/dml/analyze.rs +++ b/src/execution/dml/analyze.rs @@ -54,7 +54,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze { fn execute_mut( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a mut T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -67,9 +67,11 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze { let schema = input.output_schema().clone(); let mut builders = Vec::with_capacity(index_metas.len()); - let table = throw!(throw!(transaction.table(cache.0, table_name.clone())) - .cloned() - .ok_or(DatabaseError::TableNotFound)); + let table = throw!(throw!( + unsafe { &mut (*transaction) }.table(cache.0, table_name.clone()) + ) + .cloned() + .ok_or(DatabaseError::TableNotFound)); for index in table.indexes() { builders.push(( @@ -123,7 +125,12 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze { ty: Utf8Type::Variable(None), unit: CharLengthUnits::Characters, }); - throw!(transaction.save_table_meta(cache.2, &table_name, path_str, meta)); + throw!(unsafe { &mut (*transaction) }.save_table_meta( + cache.2, + &table_name, + path_str, + meta + )); throw!(fs::rename(&temp_path, &path).map_err(DatabaseError::IO)); active_index_paths.insert(index_file); @@ -138,7 +145,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze { } } - yield Ok(Tuple { id: None, values }); + yield Ok(Tuple::new(None, values)); }, ) } diff --git a/src/execution/dml/copy_from_file.rs b/src/execution/dml/copy_from_file.rs index 1d29200a..116b1dad 100644 --- a/src/execution/dml/copy_from_file.rs +++ b/src/execution/dml/copy_from_file.rs @@ -1,4 +1,5 @@ use crate::binder::copy::FileFormat; +use crate::catalog::PrimaryKeyIndices; use crate::errors::DatabaseError; use crate::execution::{Executor, WriteExecutor}; use crate::planner::operator::copy_from_file::CopyFromFileOperator; @@ -26,8 +27,8 @@ impl From for CopyFromFile { impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CopyFromFile { fn execute_mut( self, - _: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a mut T, + (table_cache, _, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -38,11 +39,20 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CopyFromFile { // # Cancellation // When this stream is dropped, the `rx` is dropped, the spawned task will fail to send to // `tx`, then the task will finish. - let table_name = self.op.table.clone(); - let handle = thread::spawn(|| self.read_file_blocking(tx)); + let table = throw!(throw!( + unsafe { &mut (*transaction) }.table(table_cache, self.op.table.clone()) + ) + .ok_or(DatabaseError::TableNotFound)); + let primary_keys_indices = table.primary_keys_indices().clone(); + let handle = thread::spawn(|| self.read_file_blocking(tx, primary_keys_indices)); let mut size = 0_usize; while let Ok(chunk) = rx.recv() { - throw!(transaction.append_tuple(&table_name, chunk, &types, false)); + throw!(unsafe { &mut (*transaction) }.append_tuple( + table.name(), + chunk, + &types, + false + )); size += 1; } throw!(handle.join().unwrap()); @@ -61,7 +71,11 @@ impl CopyFromFile { /// Read records from file using blocking IO. /// /// The read data chunks will be sent through `tx`. - fn read_file_blocking(mut self, tx: Sender) -> Result<(), DatabaseError> { + fn read_file_blocking( + mut self, + tx: Sender, + pk_indices: PrimaryKeyIndices, + ) -> Result<(), DatabaseError> { let file = File::open(self.op.source.path)?; let mut buf_reader = BufReader::new(file); let mut reader = match self.op.source.format { @@ -79,7 +93,7 @@ impl CopyFromFile { }; let column_count = self.op.schema_ref.len(); - let tuple_builder = TupleBuilder::new(&self.op.schema_ref); + let tuple_builder = TupleBuilder::new(&self.op.schema_ref, Some(&pk_indices)); for record in reader.records() { // read records and push raw str rows into data chunk builder @@ -178,7 +192,7 @@ mod tests { ]; let op = CopyFromFileOperator { - table: "test_copy".to_string(), + table: Arc::new("test_copy".to_string()), source: ExtSource { path: file.path().into(), format: FileFormat::Csv { diff --git a/src/execution/dml/copy_to_file.rs b/src/execution/dml/copy_to_file.rs index e5ce481e..7e824894 100644 --- a/src/execution/dml/copy_to_file.rs +++ b/src/execution/dml/copy_to_file.rs @@ -21,14 +21,14 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for CopyToFile { fn execute( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] move || { let mut writer = throw!(self.create_writer()); - let mut iter = throw!(transaction.read( + let mut iter = throw!(unsafe { &mut (*transaction) }.read( cache.0, Arc::new(self.op.table.clone()), (None, None), diff --git a/src/execution/dml/delete.rs b/src/execution/dml/delete.rs index fa511e0c..e1d76db9 100644 --- a/src/execution/dml/delete.rs +++ b/src/execution/dml/delete.rs @@ -31,7 +31,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Delete { fn execute_mut( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a mut T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -42,66 +42,66 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Delete { } = self; let schema = input.output_schema().clone(); - let table = throw!(throw!(transaction.table(cache.0, table_name.clone())) - .cloned() - .ok_or(DatabaseError::TableNotFound)); - let mut tuple_ids = Vec::new(); + let table = throw!(throw!( + unsafe { &mut (*transaction) }.table(cache.0, table_name.clone()) + ) + .ok_or(DatabaseError::TableNotFound)); let mut indexes: HashMap = HashMap::new(); let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { - let tuple: Tuple = throw!(tuple); + let mut tuple: Tuple = throw!(tuple); for index_meta in table.indexes() { - if let Some(Value { - exprs, value_rows, .. - }) = indexes.get_mut(&index_meta.id) - { - value_rows.push(throw!(Projection::projection(&tuple, exprs, &schema))); + if let Some(Value { exprs, values, .. }) = indexes.get_mut(&index_meta.id) { + let Some(data_value) = DataValue::values_to_tuple(throw!( + Projection::projection(&tuple, exprs, &schema) + )) else { + continue; + }; + values.push(data_value); } else { - let exprs = throw!(index_meta.column_exprs(&table)); - let values = throw!(Projection::projection(&tuple, &exprs, &schema)); + let mut values = Vec::with_capacity(table.indexes().len()); + let exprs = throw!(index_meta.column_exprs(table)); + let Some(data_value) = DataValue::values_to_tuple(throw!( + Projection::projection(&tuple, &exprs, &schema) + )) else { + continue; + }; + values.push(data_value); indexes.insert( index_meta.id, Value { exprs, - value_rows: vec![values], + values, index_ty: index_meta.ty, }, ); } } - if let Some(tuple_id) = tuple.id { - tuple_ids.push(tuple_id); - } - } - drop(coroutine); - for ( - index_id, - Value { - value_rows, - index_ty, - .. - }, - ) in indexes - { - for (i, values) in value_rows.into_iter().enumerate() { - let Some(value) = DataValue::values_to_tuple(values) else { - continue; - }; + if let Some(tuple_id) = tuple.id() { + for ( + index_id, + Value { + values, index_ty, .. + }, + ) in indexes.iter_mut() + { + for value in values { + throw!(unsafe { &mut (*transaction) }.del_index( + &table_name, + &Index::new(*index_id, value, *index_ty), + tuple_id, + )); + } + } - throw!(transaction.del_index( - &table_name, - &Index::new(index_id, &value, index_ty), - Some(&tuple_ids[i]), - )); + throw!(unsafe { &mut (*transaction) }.remove_tuple(&table_name, tuple_id)); } } - for tuple_id in tuple_ids { - throw!(transaction.remove_tuple(&table_name, &tuple_id)); - } + drop(coroutine); yield Ok(TupleBuilder::build_result("1".to_string())); }, ) @@ -110,6 +110,6 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Delete { struct Value { exprs: Vec, - value_rows: Vec>, + values: Vec, index_ty: IndexType, } diff --git a/src/execution/dml/insert.rs b/src/execution/dml/insert.rs index 1a9b5856..fe71244e 100644 --- a/src/execution/dml/insert.rs +++ b/src/execution/dml/insert.rs @@ -64,7 +64,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert { fn execute_mut( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a mut T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -76,7 +76,6 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert { is_mapping_by_name, } = self; - let mut tuples = Vec::new(); let schema = input.output_schema().clone(); let primary_keys = schema @@ -90,26 +89,26 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert { } if let Some(table_catalog) = - throw!(transaction.table(cache.0, table_name.clone())).cloned() + throw!(unsafe { &mut (*transaction) }.table(cache.0, table_name.clone())) + .cloned() { + let mut index_metas = Vec::new(); + for index_meta in table_catalog.indexes() { + let exprs = throw!(index_meta.column_exprs(&table_catalog)); + index_metas.push((index_meta, exprs)); + } + let types = table_catalog.types(); + let indices = table_catalog.primary_keys_indices(); let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let Tuple { values, .. } = throw!(tuple); - let mut tuple_id = Vec::with_capacity(primary_keys.len()); let mut tuple_map = HashMap::new(); for (i, value) in values.into_iter().enumerate() { tuple_map.insert(schema[i].key(is_mapping_by_name), value); } - - for primary_key in primary_keys.iter() { - tuple_id.push(throw!(tuple_map - .get(primary_key) - .cloned() - .ok_or(DatabaseError::NotNull))); - } let mut values = Vec::with_capacity(table_catalog.columns_len()); for col in table_catalog.columns() { @@ -127,36 +126,31 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert { } values.push(value) } - tuples.push(Tuple { - id: Some(if primary_keys.len() == 1 { - tuple_id.pop().unwrap() - } else { - DataValue::Tuple(Some((tuple_id, false))) - }), - values, - }); - } - drop(coroutine); - for index_meta in table_catalog.indexes() { - let exprs = throw!(index_meta.column_exprs(&table_catalog)); + let mut tuple = Tuple::new(Some(indices.clone()), values); - for tuple in tuples.iter() { - let values = throw!(Projection::projection(tuple, &exprs, &schema)); + for (index_meta, exprs) in index_metas.iter() { + let values = throw!(Projection::projection(&tuple, exprs, &schema)); let Some(value) = DataValue::values_to_tuple(values) else { continue; }; + let Some(tuple_id) = tuple.id() else { + unreachable!() + }; let index = Index::new(index_meta.id, &value, index_meta.ty); - - throw!(transaction.add_index( + throw!(unsafe { &mut (*transaction) }.add_index( &table_name, index, - tuple.id.as_ref().unwrap() + tuple_id )); } + throw!(unsafe { &mut (*transaction) }.append_tuple( + &table_name, + tuple, + &types, + is_overwrite + )); } - for tuple in tuples { - throw!(transaction.append_tuple(&table_name, tuple, &types, is_overwrite)); - } + drop(coroutine); } yield Ok(TupleBuilder::build_result("1".to_string())); }, diff --git a/src/execution/dml/update.rs b/src/execution/dml/update.rs index 2ae6a6c6..ba0ae31b 100644 --- a/src/execution/dml/update.rs +++ b/src/execution/dml/update.rs @@ -9,7 +9,7 @@ use crate::throw; use crate::types::index::Index; use crate::types::tuple::types; use crate::types::tuple::Tuple; -use crate::types::tuple_builder::{TupleBuilder, TupleIdBuilder}; +use crate::types::tuple_builder::TupleBuilder; use crate::types::value::DataValue; use std::collections::HashMap; use std::ops::Coroutine; @@ -44,7 +44,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update { fn execute_mut( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a mut T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -64,58 +64,49 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update { let types = types(&input_schema); if let Some(table_catalog) = - throw!(transaction.table(cache.0, table_name.clone())).cloned() + throw!(unsafe { &mut (*transaction) }.table(cache.0, table_name.clone())) + .cloned() { - let mut tuples = Vec::new(); + let mut index_metas = Vec::new(); + for index_meta in table_catalog.indexes() { + let exprs = throw!(index_meta.column_exprs(&table_catalog)); + index_metas.push((index_meta, exprs)); + } let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { - let tuple: Tuple = throw!(tuple); + let mut tuple: Tuple = throw!(tuple); - tuples.push(tuple); - } - drop(coroutine); - let mut index_metas = Vec::new(); - for index_meta in table_catalog.indexes() { - let exprs = throw!(index_meta.column_exprs(&table_catalog)); + let mut is_overwrite = true; - for tuple in tuples.iter() { + let old_pk = tuple.id().cloned().unwrap(); + for (index_meta, exprs) in index_metas.iter() { let values = - throw!(Projection::projection(tuple, &exprs, &input_schema)); + throw!(Projection::projection(&tuple, exprs, &input_schema)); let Some(value) = DataValue::values_to_tuple(values) else { continue; }; let index = Index::new(index_meta.id, &value, index_meta.ty); - throw!(transaction.del_index( + throw!(unsafe { &mut (*transaction) }.del_index( &table_name, &index, - Some(tuple.id.as_ref().unwrap()) + &old_pk )); } - index_metas.push((index_meta, exprs)); - } - let mut id_builder = TupleIdBuilder::new(&input_schema); - - for mut tuple in tuples { - let mut is_overwrite = true; - for (i, column) in input_schema.iter().enumerate() { if let Some(expr) = exprs_map.get(&column.id()) { - let value = throw!(expr.eval(&tuple, &input_schema)); - if column.desc().is_primary() { - id_builder.append(value.clone()); - } - tuple.values[i] = value; + tuple.values[i] = throw!(expr.eval(&tuple, &input_schema)); } } - if let Some(id) = id_builder.build() { - if &id != tuple.id.as_ref().unwrap() { - let old_key = tuple.id.replace(id).unwrap(); + tuple.clear_id(); + let new_pk = tuple.id().unwrap().clone(); - throw!(transaction.remove_tuple(&table_name, &old_key)); - is_overwrite = false; - } + if new_pk != old_pk { + throw!( + unsafe { &mut (*transaction) }.remove_tuple(&table_name, &old_pk) + ); + is_overwrite = false; } for (index_meta, exprs) in index_metas.iter() { let values = @@ -124,15 +115,21 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update { continue; }; let index = Index::new(index_meta.id, &value, index_meta.ty); - throw!(transaction.add_index( + throw!(unsafe { &mut (*transaction) }.add_index( &table_name, index, - tuple.id.as_ref().unwrap() + &new_pk )); } - throw!(transaction.append_tuple(&table_name, tuple, &types, is_overwrite)); + throw!(unsafe { &mut (*transaction) }.append_tuple( + &table_name, + tuple, + &types, + is_overwrite + )); } + drop(coroutine); } yield Ok(TupleBuilder::build_result("1".to_string())); }, diff --git a/src/execution/dql/aggregate/hash_agg.rs b/src/execution/dql/aggregate/hash_agg.rs index 656db0ba..39e2abe0 100644 --- a/src/execution/dql/aggregate/hash_agg.rs +++ b/src/execution/dql/aggregate/hash_agg.rs @@ -1,4 +1,3 @@ -use crate::catalog::ColumnRef; use crate::errors::DatabaseError; use crate::execution::dql::aggregate::{create_accumulators, Accumulator}; use crate::execution::{build_read, Executor, ReadExecutor}; @@ -7,10 +6,11 @@ use crate::planner::operator::aggregate::AggregateOperator; use crate::planner::LogicalPlan; use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::throw; -use crate::types::tuple::{SchemaRef, Tuple}; +use crate::types::tuple::Tuple; use crate::types::value::DataValue; -use ahash::HashMap; +use ahash::{HashMap, HashMapExt}; use itertools::Itertools; +use std::collections::hash_map::Entry; use std::ops::{Coroutine, CoroutineState}; use std::pin::Pin; @@ -43,7 +43,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashAggExecutor { fn execute( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -54,109 +54,56 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashAggExecutor { mut input, } = self; - let mut agg_status = - HashAggStatus::new(input.output_schema().clone(), agg_calls, groupby_exprs); + let schema_ref = input.output_schema().clone(); + let mut group_hash_accs: HashMap, Vec>> = + HashMap::new(); let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(result) = Pin::new(&mut coroutine).resume(()) { - throw!(agg_status.update(throw!(result))); + let tuple = throw!(result); + let mut values = Vec::with_capacity(agg_calls.len()); + + for expr in agg_calls.iter() { + if let ScalarExpression::AggCall { args, .. } = expr { + if args.len() > 1 { + throw!(Err(DatabaseError::UnsupportedStmt("currently aggregate functions only support a single Column as a parameter".to_string()))) + } + values.push(throw!(args[0].eval(&tuple, &schema_ref))); + } else { + unreachable!() + } + } + let group_keys: Vec = throw!(groupby_exprs + .iter() + .map(|expr| expr.eval(&tuple, &schema_ref)) + .try_collect()); + + let entry = match group_hash_accs.entry(group_keys) { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(entry) => { + entry.insert(throw!(create_accumulators(&agg_calls))) + } + }; + for (acc, value) in entry.iter_mut().zip_eq(values.iter()) { + throw!(acc.update_value(value)); + } } - for tuple in throw!(agg_status.as_tuples()) { - yield Ok(tuple); + for (group_keys, accs) in group_hash_accs { + // Tips: Accumulator First + let values: Vec = throw!(accs + .iter() + .map(|acc| acc.evaluate()) + .chain(group_keys.into_iter().map(Ok)) + .try_collect()); + yield Ok(Tuple::new(None, values)); } }, ) } } -pub(crate) struct HashAggStatus { - schema_ref: SchemaRef, - - agg_calls: Vec, - groupby_exprs: Vec, - - group_columns: Vec, - group_hash_accs: HashMap, Vec>>, -} - -impl HashAggStatus { - pub(crate) fn new( - schema_ref: SchemaRef, - agg_calls: Vec, - groupby_exprs: Vec, - ) -> Self { - HashAggStatus { - schema_ref, - agg_calls, - groupby_exprs, - group_columns: vec![], - group_hash_accs: Default::default(), - } - } - - pub(crate) fn update(&mut self, tuple: Tuple) -> Result<(), DatabaseError> { - // 1. build group and agg columns for hash_agg columns. - // Tips: AggCall First - if self.group_columns.is_empty() { - self.group_columns = self - .agg_calls - .iter() - .chain(self.groupby_exprs.iter()) - .map(|expr| expr.output_column()) - .collect_vec(); - } - - // 2.1 evaluate agg exprs and collect the result values for later accumulators. - let values: Vec = self - .agg_calls - .iter() - .map(|expr| { - if let ScalarExpression::AggCall { args, .. } = expr { - args[0].eval(&tuple, &self.schema_ref) - } else { - unreachable!() - } - }) - .try_collect()?; - - let group_keys: Vec = self - .groupby_exprs - .iter() - .map(|expr| expr.eval(&tuple, &self.schema_ref)) - .try_collect()?; - - for (acc, value) in self - .group_hash_accs - .entry(group_keys) - .or_insert_with(|| create_accumulators(&self.agg_calls).unwrap()) - .iter_mut() - .zip_eq(values.iter()) - { - acc.update_value(value)?; - } - - Ok(()) - } - - pub(crate) fn as_tuples(&mut self) -> Result, DatabaseError> { - self.group_hash_accs - .drain() - .map(|(group_keys, accs)| { - // Tips: Accumulator First - let values: Vec = accs - .iter() - .map(|acc| acc.evaluate()) - .chain(group_keys.into_iter().map(Ok)) - .try_collect()?; - - Ok::(Tuple { id: None, values }) - }) - .try_collect() - } -} - #[cfg(test)] mod test { use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef}; @@ -188,7 +135,7 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path()).unwrap(); - let transaction = storage.transaction()?; + let mut transaction = storage.transaction()?; let desc = ColumnDesc::new(LogicalType::Integer, None, false, None)?; let t1_schema = Arc::new(vec![ @@ -241,7 +188,7 @@ mod test { let tuples = try_collect( HashAggExecutor::from((operator, input)) - .execute((&table_cache, &view_cache, &meta_cache), &transaction), + .execute((&table_cache, &view_cache, &meta_cache), &mut transaction), )?; assert_eq!(tuples.len(), 2); diff --git a/src/execution/dql/aggregate/simple_agg.rs b/src/execution/dql/aggregate/simple_agg.rs index 7537681e..2fb13dcd 100644 --- a/src/execution/dql/aggregate/simple_agg.rs +++ b/src/execution/dql/aggregate/simple_agg.rs @@ -29,7 +29,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for SimpleAggExecutor { fn execute( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -62,7 +62,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for SimpleAggExecutor { let values: Vec = throw!(accs.into_iter().map(|acc| acc.evaluate()).try_collect()); - yield Ok(Tuple { id: None, values }); + yield Ok(Tuple::new(None, values)); }, ) } diff --git a/src/execution/dql/describe.rs b/src/execution/dql/describe.rs index c939a55c..132f4251 100644 --- a/src/execution/dql/describe.rs +++ b/src/execution/dql/describe.rs @@ -43,13 +43,15 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Describe { fn execute( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] move || { - let table = throw!(throw!(transaction.table(cache.0, self.table_name.clone())) - .ok_or(DatabaseError::TableNotFound)); + let table = throw!(throw!( + unsafe { &mut (*transaction) }.table(cache.0, self.table_name.clone()) + ) + .ok_or(DatabaseError::TableNotFound)); let key_fn = |column: &ColumnCatalog| { if column.desc().is_primary() { PRIMARY_KEY_TYPE.clone() @@ -96,7 +98,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Describe { unit: CharLengthUnits::Characters, }, ]; - yield Ok(Tuple { id: None, values }); + yield Ok(Tuple::new(None, values)); } }, ) diff --git a/src/execution/dql/dummy.rs b/src/execution/dql/dummy.rs index 3c4acd40..6ea7b669 100644 --- a/src/execution/dql/dummy.rs +++ b/src/execution/dql/dummy.rs @@ -8,15 +8,12 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Dummy { fn execute( self, _: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - _: &'a T, + _: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] move || { - yield Ok(Tuple { - id: None, - values: vec![], - }); + yield Ok(Tuple::new(None, Vec::new())); }, ) } diff --git a/src/execution/dql/explain.rs b/src/execution/dql/explain.rs index b7772449..2f1c303b 100644 --- a/src/execution/dql/explain.rs +++ b/src/execution/dql/explain.rs @@ -19,7 +19,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Explain { fn execute( self, _: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - _: &'a T, + _: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -30,7 +30,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Explain { unit: CharLengthUnits::Characters, }]; - yield Ok(Tuple { id: None, values }); + yield Ok(Tuple::new(None, values)); }, ) } diff --git a/src/execution/dql/filter.rs b/src/execution/dql/filter.rs index a4132142..57d3cc65 100644 --- a/src/execution/dql/filter.rs +++ b/src/execution/dql/filter.rs @@ -23,7 +23,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Filter { fn execute( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] diff --git a/src/execution/dql/function_scan.rs b/src/execution/dql/function_scan.rs index 410312e3..e7936c31 100644 --- a/src/execution/dql/function_scan.rs +++ b/src/execution/dql/function_scan.rs @@ -20,7 +20,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for FunctionScan { fn execute( self, _: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - _: &'a T, + _: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] diff --git a/src/execution/dql/index_scan.rs b/src/execution/dql/index_scan.rs index e1027280..bf3794ba 100644 --- a/src/execution/dql/index_scan.rs +++ b/src/execution/dql/index_scan.rs @@ -30,7 +30,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for IndexScan { fn execute( self, (table_cache, _, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -42,7 +42,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for IndexScan { .. } = self.op; - let mut iter = transaction + let mut iter = unsafe { &(*transaction) } .read_by_index( table_cache, table_name, diff --git a/src/execution/dql/join/hash_join.rs b/src/execution/dql/join/hash_join.rs index 1fb1590c..96ae3c64 100644 --- a/src/execution/dql/join/hash_join.rs +++ b/src/execution/dql/join/hash_join.rs @@ -7,15 +7,14 @@ use crate::planner::operator::join::{JoinCondition, JoinOperator, JoinType}; use crate::planner::LogicalPlan; use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache}; use crate::throw; -use crate::types::tuple::{Schema, SchemaRef, Tuple}; +use crate::types::tuple::{Schema, Tuple}; use crate::types::value::{DataValue, NULL_VALUE}; use crate::utils::bit_vector::BitVector; -use ahash::HashMap; +use ahash::{HashMap, HashMapExt}; use itertools::Itertools; use std::ops::Coroutine; use std::ops::CoroutineState; use std::pin::Pin; -use std::sync::Arc; pub struct HashJoin { on: JoinCondition, @@ -41,244 +40,18 @@ impl From<(JoinOperator, LogicalPlan, LogicalPlan)> for HashJoin { } } -impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashJoin { - fn execute( - self, - cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a T, - ) -> Executor<'a> { - Box::new( - #[coroutine] - move || { - let HashJoin { - on, - ty, - mut left_input, - mut right_input, - } = self; - let mut join_status = HashJoinStatus::new( - on, - ty, - left_input.output_schema(), - right_input.output_schema(), - ); - let join_status_ptr: *mut HashJoinStatus = &mut join_status; - - // build phase: - // 1.construct hashtable, one hash key may contains multiple rows indices. - // 2.merged all left tuples. - let mut coroutine = build_read(left_input, cache, transaction); - - while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { - let tuple: Tuple = throw!(tuple); - - throw!(unsafe { (*join_status_ptr).left_build(tuple) }); - } - - // probe phase - let mut coroutine = build_read(right_input, cache, transaction); - - while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { - let tuple: Tuple = throw!(tuple); - - unsafe { - let mut coroutine = (*join_status_ptr).right_probe(tuple); - - while let CoroutineState::Yielded(tuple) = - Pin::new(&mut coroutine).resume(()) - { - yield tuple; - } - } - } - - unsafe { - if let Some(mut coroutine) = (*join_status_ptr).build_drop() { - while let CoroutineState::Yielded(tuple) = - Pin::new(&mut coroutine).resume(()) - { - yield tuple; - } - }; - } - }, - ) - } -} - -pub(crate) struct HashJoinStatus { - ty: JoinType, - filter: Option, - build_map: HashMap, (Vec, bool, bool)>, - - full_schema_ref: SchemaRef, - left_schema_len: usize, - on_left_keys: Vec, - on_right_keys: Vec, -} - -impl HashJoinStatus { - pub(crate) fn new( - on: JoinCondition, - ty: JoinType, - left_schema: &SchemaRef, - right_schema: &SchemaRef, - ) -> Self { - if ty == JoinType::Cross { - unreachable!("Cross join should not be in HashJoinExecutor"); - } - let ((on_left_keys, on_right_keys), filter): ( - (Vec, Vec), - _, - ) = match on { - JoinCondition::On { on, filter } => (on.into_iter().unzip(), filter), - JoinCondition::None => unreachable!("HashJoin must has on condition"), - }; - if on_left_keys.is_empty() || on_right_keys.is_empty() { - todo!("`NestLoopJoin` should be used when there is no equivalent condition") - } - debug_assert!(!on_left_keys.is_empty()); - debug_assert!(!on_right_keys.is_empty()); - - let fn_process = |schema: &mut Vec, force_nullable| { - for column in schema.iter_mut() { - if let Some(new_column) = column.nullable_for_join(force_nullable) { - *column = new_column; - } - } - }; - let (left_force_nullable, right_force_nullable) = joins_nullable(&ty); - let left_schema_len = left_schema.len(); - - let mut join_schema = Vec::clone(left_schema); - fn_process(&mut join_schema, left_force_nullable); - let mut right_schema = Vec::clone(right_schema); - fn_process(&mut right_schema, right_force_nullable); - - join_schema.append(&mut right_schema); - - HashJoinStatus { - ty, - filter, - build_map: Default::default(), +impl HashJoin { + fn eval_keys( + on_keys: &[ScalarExpression], + tuple: &Tuple, + schema: &[ColumnRef], + ) -> Result, DatabaseError> { + let mut values = Vec::with_capacity(on_keys.len()); - full_schema_ref: Arc::new(join_schema), - left_schema_len, - on_left_keys, - on_right_keys, + for expr in on_keys { + values.push(expr.eval(tuple, schema)?); } - } - - pub(crate) fn left_build(&mut self, tuple: Tuple) -> Result<(), DatabaseError> { - let HashJoinStatus { - on_left_keys, - build_map, - full_schema_ref, - left_schema_len, - .. - } = self; - let values = Self::eval_keys(on_left_keys, &tuple, &full_schema_ref[0..*left_schema_len])?; - - build_map - .entry(values) - .or_insert_with(|| (Vec::new(), false, false)) - .0 - .push(tuple); - - Ok(()) - } - - pub(crate) fn right_probe(&mut self, tuple: Tuple) -> Executor { - Box::new( - #[coroutine] - move || { - let HashJoinStatus { - on_right_keys, - full_schema_ref, - build_map, - ty, - filter, - left_schema_len, - .. - } = self; - - let right_cols_len = tuple.values.len(); - let values = throw!(Self::eval_keys( - on_right_keys, - &tuple, - &full_schema_ref[*left_schema_len..] - )); - let has_null = values.iter().any(|value| value.is_null()); - - if let (false, Some((tuples, is_used, is_filtered))) = - (has_null, build_map.get_mut(&values)) - { - let mut bits_option = None; - *is_used = true; - - match ty { - JoinType::LeftSemi => { - if *is_filtered { - return; - } else { - bits_option = Some(BitVector::new(tuples.len())); - } - } - JoinType::LeftAnti => return, - _ => (), - } - for (i, Tuple { values, .. }) in tuples.iter().enumerate() { - let full_values = values - .iter() - .cloned() - .chain(tuple.values.clone()) - .collect_vec(); - let tuple = Tuple { - id: None, - values: full_values, - }; - if let Some(tuple) = throw!(Self::filter( - tuple, - full_schema_ref, - filter, - ty, - *left_schema_len - )) { - if let Some(bits) = bits_option.as_mut() { - bits.set_bit(i, true); - } else { - yield Ok(tuple); - } - } - } - if let Some(bits) = bits_option { - let mut cnt = 0; - tuples.retain(|_| { - let res = bits.get_bit(cnt); - cnt += 1; - res - }); - *is_filtered = true - } - } else if matches!(ty, JoinType::RightOuter | JoinType::Full) { - let empty_len = full_schema_ref.len() - right_cols_len; - let values = (0..empty_len) - .map(|_| NULL_VALUE.clone()) - .chain(tuple.values) - .collect_vec(); - let tuple = Tuple { id: None, values }; - if let Some(tuple) = throw!(Self::filter( - tuple, - full_schema_ref, - filter, - ty, - *left_schema_len - )) { - yield Ok(tuple); - } - } - }, - ) + Ok(values) } pub(crate) fn filter( @@ -314,106 +87,215 @@ impl HashJoinStatus { Ok(Some(tuple)) } +} - pub(crate) fn build_drop(&mut self) -> Option { - let HashJoinStatus { - full_schema_ref, - build_map, - ty, - filter, - left_schema_len, - .. - } = self; - - match ty { - JoinType::LeftOuter | JoinType::Full => { - Some(Self::right_null_tuple(build_map, full_schema_ref)) - } - JoinType::LeftSemi | JoinType::LeftAnti => Some(Self::one_side_tuple( - build_map, - full_schema_ref, - filter, - ty, - *left_schema_len, - )), - _ => None, - } - } - - fn right_null_tuple<'a>( - build_map: &'a mut HashMap, (Vec, bool, bool)>, - schema: &'a Schema, +impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashJoin { + fn execute( + self, + cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] move || { - for (_, (left_tuples, is_used, _)) in build_map.drain() { - if is_used { - continue; - } - for mut tuple in left_tuples { - while tuple.values.len() != schema.len() { - tuple.values.push(NULL_VALUE.clone()); + let HashJoin { + on, + ty, + mut left_input, + mut right_input, + } = self; + + if ty == JoinType::Cross { + unreachable!("Cross join should not be in HashJoinExecutor"); + } + let ((on_left_keys, on_right_keys), filter): ( + (Vec, Vec), + _, + ) = match on { + JoinCondition::On { on, filter } => (on.into_iter().unzip(), filter), + JoinCondition::None => unreachable!("HashJoin must has on condition"), + }; + if on_left_keys.is_empty() || on_right_keys.is_empty() { + throw!(Err(DatabaseError::UnsupportedStmt( + "`NestLoopJoin` should be used when there is no equivalent condition" + .to_string() + ))) + } + debug_assert!(!on_left_keys.is_empty()); + debug_assert!(!on_right_keys.is_empty()); + + let fn_process = |schema: &mut [ColumnRef], force_nullable| { + for column in schema.iter_mut() { + if let Some(new_column) = column.nullable_for_join(force_nullable) { + *column = new_column; } - yield Ok(tuple); } - } - }, - ) - } + }; + let (left_force_nullable, right_force_nullable) = joins_nullable(&ty); - fn one_side_tuple<'a>( - build_map: &'a mut HashMap, (Vec, bool, bool)>, - schema: &'a Schema, - filter: &'a Option, - join_ty: &'a JoinType, - left_schema_len: usize, - ) -> Executor<'a> { - Box::new( - #[coroutine] - move || { - let is_left_semi = matches!(join_ty, JoinType::LeftSemi); + let mut full_schema_ref = Vec::clone(left_input.output_schema()); + let left_schema_len = full_schema_ref.len(); - for (_, (left_tuples, mut is_used, is_filtered)) in build_map.drain() { - if is_left_semi { - is_used = !is_used; - } - if is_used { - continue; + fn_process(&mut full_schema_ref, left_force_nullable); + full_schema_ref.extend_from_slice(right_input.output_schema()); + fn_process( + &mut full_schema_ref[left_schema_len..], + right_force_nullable, + ); + + // build phase: + // 1.construct hashtable, one hash key may contains multiple rows indices. + // 2.merged all left tuples. + let mut coroutine = build_read(left_input, cache, transaction); + let mut build_map = HashMap::new(); + let build_map_ptr: *mut HashMap, (Vec, bool, bool)> = + &mut build_map; + + while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { + let tuple: Tuple = throw!(tuple); + let values = throw!(Self::eval_keys( + &on_left_keys, + &tuple, + &full_schema_ref[0..left_schema_len] + )); + + unsafe { + (*build_map_ptr) + .entry(values) + .or_insert_with(|| (Vec::new(), false, false)) + .0 + .push(tuple); } - if is_filtered { - for tuple in left_tuples { - yield Ok(tuple); + } + + // probe phase + let mut coroutine = build_read(right_input, cache, transaction); + + while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { + let tuple: Tuple = throw!(tuple); + + let right_cols_len = tuple.values.len(); + let values = throw!(Self::eval_keys( + &on_right_keys, + &tuple, + &full_schema_ref[left_schema_len..] + )); + let has_null = values.iter().any(|value| value.is_null()); + let build_value = unsafe { (*build_map_ptr).get_mut(&values) }; + drop(values); + + if let (false, Some((tuples, is_used, is_filtered))) = (has_null, build_value) { + let mut bits_option = None; + *is_used = true; + + match ty { + JoinType::LeftSemi => { + if *is_filtered { + continue; + } else { + bits_option = Some(BitVector::new(tuples.len())); + } + } + JoinType::LeftAnti => continue, + _ => (), } - continue; - } - for tuple in left_tuples { + for (i, Tuple { values, .. }) in tuples.iter().enumerate() { + let full_values = values + .iter() + .chain(tuple.values.iter()) + .cloned() + .collect_vec(); + let tuple = Tuple::new(None, full_values); + if let Some(tuple) = throw!(Self::filter( + tuple, + &full_schema_ref, + &filter, + &ty, + left_schema_len + )) { + if let Some(bits) = bits_option.as_mut() { + bits.set_bit(i, true); + } else { + yield Ok(tuple); + } + } + } + if let Some(bits) = bits_option { + let mut cnt = 0; + tuples.retain(|_| { + let res = bits.get_bit(cnt); + cnt += 1; + res + }); + *is_filtered = true + } + } else if matches!(ty, JoinType::RightOuter | JoinType::Full) { + let empty_len = full_schema_ref.len() - right_cols_len; + let values = (0..empty_len) + .map(|_| NULL_VALUE.clone()) + .chain(tuple.values) + .collect_vec(); + let tuple = Tuple::new(None, values); if let Some(tuple) = throw!(Self::filter( tuple, - schema, - filter, - join_ty, + &full_schema_ref, + &filter, + &ty, left_schema_len )) { yield Ok(tuple); } } } - }, - ) - } - fn eval_keys( - on_keys: &[ScalarExpression], - tuple: &Tuple, - schema: &[ColumnRef], - ) -> Result, DatabaseError> { - let mut values = Vec::with_capacity(on_keys.len()); + // left drop + match ty { + JoinType::LeftOuter | JoinType::Full => { + for (_, (left_tuples, is_used, _)) in build_map { + if is_used { + continue; + } + for mut tuple in left_tuples { + while tuple.values.len() != full_schema_ref.len() { + tuple.values.push(NULL_VALUE.clone()); + } + yield Ok(tuple); + } + } + } + JoinType::LeftSemi | JoinType::LeftAnti => { + let is_left_semi = matches!(ty, JoinType::LeftSemi); - for expr in on_keys { - values.push(expr.eval(tuple, schema)?); - } - Ok(values) + for (_, (left_tuples, mut is_used, is_filtered)) in build_map { + if is_left_semi { + is_used = !is_used; + } + if is_used { + continue; + } + if is_filtered { + for tuple in left_tuples { + yield Ok(tuple); + } + continue; + } + for tuple in left_tuples { + if let Some(tuple) = throw!(Self::filter( + tuple, + &full_schema_ref, + &filter, + &ty, + left_schema_len + )) { + yield Ok(tuple); + } + } + } + } + _ => (), + } + }, + ) } } @@ -526,7 +408,7 @@ mod test { fn test_inner_join() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; - let transaction = storage.transaction()?; + let mut transaction = storage.transaction()?; let meta_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let view_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); @@ -540,7 +422,7 @@ mod test { join_type: JoinType::Inner, }; let executor = HashJoin::from((op, left, right)) - .execute((&table_cache, &view_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &mut transaction); let tuples = try_collect(executor)?; assert_eq!(tuples.len(), 3); @@ -565,7 +447,7 @@ mod test { fn test_left_join() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; - let transaction = storage.transaction()?; + let mut transaction = storage.transaction()?; let meta_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let view_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); @@ -582,7 +464,7 @@ mod test { { let executor = HashJoin::from((op.clone(), left.clone(), right.clone())); let tuples = try_collect( - executor.execute((&table_cache, &view_cache, &meta_cache), &transaction), + executor.execute((&table_cache, &view_cache, &meta_cache), &mut transaction), )?; assert_eq!(tuples.len(), 4); @@ -609,7 +491,7 @@ mod test { let mut executor = HashJoin::from((op.clone(), left.clone(), right.clone())); executor.ty = JoinType::LeftSemi; let mut tuples = try_collect( - executor.execute((&table_cache, &view_cache, &meta_cache), &transaction), + executor.execute((&table_cache, &view_cache, &meta_cache), &mut transaction), )?; assert_eq!(tuples.len(), 2); @@ -633,7 +515,7 @@ mod test { let mut executor = HashJoin::from((op, left, right)); executor.ty = JoinType::LeftAnti; let tuples = try_collect( - executor.execute((&table_cache, &view_cache, &meta_cache), &transaction), + executor.execute((&table_cache, &view_cache, &meta_cache), &mut transaction), )?; assert_eq!(tuples.len(), 1); @@ -650,7 +532,7 @@ mod test { fn test_right_join() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; - let transaction = storage.transaction()?; + let mut transaction = storage.transaction()?; let meta_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let view_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); @@ -664,7 +546,7 @@ mod test { join_type: JoinType::RightOuter, }; let executor = HashJoin::from((op, left, right)) - .execute((&table_cache, &view_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &mut transaction); let tuples = try_collect(executor)?; assert_eq!(tuples.len(), 4); @@ -693,7 +575,7 @@ mod test { fn test_full_join() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; - let transaction = storage.transaction()?; + let mut transaction = storage.transaction()?; let meta_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let view_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); @@ -707,7 +589,7 @@ mod test { join_type: JoinType::Full, }; let executor = HashJoin::from((op, left, right)) - .execute((&table_cache, &view_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &mut transaction); let tuples = try_collect(executor)?; assert_eq!(tuples.len(), 5); diff --git a/src/execution/dql/join/nested_loop_join.rs b/src/execution/dql/join/nested_loop_join.rs index c0755363..c84b44e8 100644 --- a/src/execution/dql/join/nested_loop_join.rs +++ b/src/execution/dql/join/nested_loop_join.rs @@ -129,7 +129,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin { fn execute( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -239,10 +239,8 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin { | JoinType::Full if !has_matched => { - let right_tuple = Tuple { - id: None, - values: vec![NULL_VALUE.clone(); right_schema_len], - }; + let right_tuple = + Tuple::new(None, vec![NULL_VALUE.clone(); right_schema_len]); if matches!(ty, JoinType::RightOuter) { Self::emit_tuple(&right_tuple, &left_tuple, ty, false) } else { @@ -271,7 +269,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin { let mut values = vec![NULL_VALUE.clone(); right_schema_len]; values.append(&mut right_tuple.values); - yield Ok(Tuple { id: None, values }) + yield Ok(Tuple::new(None, values)) } idx += 1; } @@ -329,7 +327,7 @@ impl NestedLoopJoin { return None; } - Some(Tuple { id: None, values }) + Some(Tuple::new(None, values)) } /// Merge the two tuples. @@ -337,24 +335,24 @@ impl NestedLoopJoin { /// `right_tuple` must be from the `NestedLoopJoin.right_input` fn merge_tuple(left_tuple: &Tuple, right_tuple: &Tuple, ty: &JoinType) -> Tuple { match ty { - JoinType::RightOuter => Tuple { - id: None, - values: right_tuple + JoinType::RightOuter => Tuple::new( + None, + right_tuple .values .iter() + .chain(left_tuple.values.iter()) .cloned() - .chain(left_tuple.clone().values) .collect_vec(), - }, - _ => Tuple { - id: None, - values: left_tuple + ), + _ => Tuple::new( + None, + left_tuple .values .iter() + .chain(right_tuple.values.iter()) .cloned() - .chain(right_tuple.clone().values) .collect_vec(), - }, + ), } } @@ -540,7 +538,7 @@ mod test { fn test_nested_inner_join() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; - let transaction = storage.transaction()?; + let mut transaction = storage.transaction()?; let meta_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let view_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); @@ -553,7 +551,7 @@ mod test { join_type: JoinType::Inner, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &view_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &mut transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(1); @@ -569,7 +567,7 @@ mod test { fn test_nested_left_out_join() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; - let transaction = storage.transaction()?; + let mut transaction = storage.transaction()?; let meta_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let view_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); @@ -582,7 +580,7 @@ mod test { join_type: JoinType::LeftOuter, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &view_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &mut transaction); let tuples = try_collect(executor)?; assert_eq!( @@ -610,7 +608,7 @@ mod test { fn test_nested_cross_join_with_on() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; - let transaction = storage.transaction()?; + let mut transaction = storage.transaction()?; let meta_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let view_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); @@ -623,7 +621,7 @@ mod test { join_type: JoinType::Cross, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &view_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &mut transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(1); @@ -640,7 +638,7 @@ mod test { fn test_nested_cross_join_without_filter() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; - let transaction = storage.transaction()?; + let mut transaction = storage.transaction()?; let meta_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let view_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); @@ -653,7 +651,7 @@ mod test { join_type: JoinType::Cross, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &view_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &mut transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(3); @@ -673,7 +671,7 @@ mod test { fn test_nested_cross_join_without_on() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; - let transaction = storage.transaction()?; + let mut transaction = storage.transaction()?; let meta_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let view_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); @@ -686,7 +684,7 @@ mod test { join_type: JoinType::Cross, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &view_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &mut transaction); let tuples = try_collect(executor)?; assert_eq!(tuples.len(), 16); @@ -698,7 +696,7 @@ mod test { fn test_nested_left_semi_join() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; - let transaction = storage.transaction()?; + let mut transaction = storage.transaction()?; let meta_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let view_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); @@ -711,7 +709,7 @@ mod test { join_type: JoinType::LeftSemi, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &view_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &mut transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(1); @@ -726,7 +724,7 @@ mod test { fn test_nested_left_anti_join() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; - let transaction = storage.transaction()?; + let mut transaction = storage.transaction()?; let meta_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let view_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); @@ -739,7 +737,7 @@ mod test { join_type: JoinType::LeftAnti, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &view_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &mut transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(3); @@ -756,7 +754,7 @@ mod test { fn test_nested_right_out_join() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; - let transaction = storage.transaction()?; + let mut transaction = storage.transaction()?; let meta_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let view_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); @@ -769,7 +767,7 @@ mod test { join_type: JoinType::RightOuter, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &view_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &mut transaction); let tuples = try_collect(executor)?; let mut expected_set = HashSet::with_capacity(4); @@ -791,7 +789,7 @@ mod test { fn test_nested_full_join() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let storage = RocksStorage::new(temp_dir.path())?; - let transaction = storage.transaction()?; + let mut transaction = storage.transaction()?; let meta_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let view_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); let table_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); @@ -804,7 +802,7 @@ mod test { join_type: JoinType::Full, }; let executor = NestedLoopJoin::from((op, left, right)) - .execute((&table_cache, &view_cache, &meta_cache), &transaction); + .execute((&table_cache, &view_cache, &meta_cache), &mut transaction); let tuples = try_collect(executor)?; assert_eq!( diff --git a/src/execution/dql/limit.rs b/src/execution/dql/limit.rs index cbb0eb1c..a9050be1 100644 --- a/src/execution/dql/limit.rs +++ b/src/execution/dql/limit.rs @@ -26,7 +26,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Limit { fn execute( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] diff --git a/src/execution/dql/projection.rs b/src/execution/dql/projection.rs index aa840062..9d3bf75e 100644 --- a/src/execution/dql/projection.rs +++ b/src/execution/dql/projection.rs @@ -27,7 +27,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Projection { fn execute( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -37,10 +37,12 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Projection { let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { - let mut tuple = throw!(tuple); + let tuple = throw!(tuple); - tuple.values = throw!(Self::projection(&tuple, &exprs, &schema)); - yield Ok(tuple); + yield Ok(Tuple::new( + None, + throw!(Self::projection(&tuple, &exprs, &schema)), + )); } }, ) diff --git a/src/execution/dql/seq_scan.rs b/src/execution/dql/seq_scan.rs index 2a304dc6..dc56e89f 100644 --- a/src/execution/dql/seq_scan.rs +++ b/src/execution/dql/seq_scan.rs @@ -17,7 +17,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for SeqScan { fn execute( self, (table_cache, _, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -29,7 +29,12 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for SeqScan { .. } = self.op; - let mut iter = throw!(transaction.read(table_cache, table_name, limit, columns)); + let mut iter = throw!(unsafe { &mut (*transaction) }.read( + table_cache, + table_name, + limit, + columns + )); while let Some(tuple) = throw!(iter.next_tuple()) { yield Ok(tuple); diff --git a/src/execution/dql/show_table.rs b/src/execution/dql/show_table.rs index 37e20d5e..e77fcda0 100644 --- a/src/execution/dql/show_table.rs +++ b/src/execution/dql/show_table.rs @@ -12,12 +12,12 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for ShowTables { fn execute( self, _: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] move || { - let metas = throw!(transaction.table_metas()); + let metas = throw!(unsafe { &mut (*transaction) }.table_metas()); for TableMeta { table_name } in metas { let values = vec![DataValue::Utf8 { @@ -26,7 +26,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for ShowTables { unit: CharLengthUnits::Characters, }]; - yield Ok(Tuple { id: None, values }); + yield Ok(Tuple::new(None, values)); } }, ) diff --git a/src/execution/dql/sort.rs b/src/execution/dql/sort.rs index 5e0b820c..a882be10 100644 --- a/src/execution/dql/sort.rs +++ b/src/execution/dql/sort.rs @@ -228,7 +228,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Sort { fn execute( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -310,27 +310,9 @@ mod test { ColumnDesc::new(LogicalType::Integer, None, false, None).unwrap(), ))]); let tuples = NullableVec(vec![ - Some(( - 0_usize, - Tuple { - id: None, - values: vec![DataValue::Int32(None)], - }, - )), - Some(( - 1_usize, - Tuple { - id: None, - values: vec![DataValue::Int32(Some(0))], - }, - )), - Some(( - 2_usize, - Tuple { - id: None, - values: vec![DataValue::Int32(Some(1))], - }, - )), + Some((0_usize, Tuple::new(None, vec![DataValue::Int32(None)]))), + Some((1_usize, Tuple::new(None, vec![DataValue::Int32(Some(0))]))), + Some((2_usize, Tuple::new(None, vec![DataValue::Int32(Some(1))]))), ]); let fn_asc_and_nulls_last_eq = |mut iter: Box>| { @@ -487,45 +469,42 @@ mod test { let tuples = NullableVec(vec![ Some(( 0_usize, - Tuple { - id: None, - values: vec![DataValue::Int32(None), DataValue::Int32(None)], - }, + Tuple::new(None, vec![DataValue::Int32(None), DataValue::Int32(None)]), )), Some(( 1_usize, - Tuple { - id: None, - values: vec![DataValue::Int32(Some(0)), DataValue::Int32(None)], - }, + Tuple::new( + None, + vec![DataValue::Int32(Some(0)), DataValue::Int32(None)], + ), )), Some(( 2_usize, - Tuple { - id: None, - values: vec![DataValue::Int32(Some(1)), DataValue::Int32(None)], - }, + Tuple::new( + None, + vec![DataValue::Int32(Some(1)), DataValue::Int32(None)], + ), )), Some(( 3_usize, - Tuple { - id: None, - values: vec![DataValue::Int32(None), DataValue::Int32(Some(0))], - }, + Tuple::new( + None, + vec![DataValue::Int32(None), DataValue::Int32(Some(0))], + ), )), Some(( 4_usize, - Tuple { - id: None, - values: vec![DataValue::Int32(Some(0)), DataValue::Int32(Some(0))], - }, + Tuple::new( + None, + vec![DataValue::Int32(Some(0)), DataValue::Int32(Some(0))], + ), )), Some(( 5_usize, - Tuple { - id: None, - values: vec![DataValue::Int32(Some(1)), DataValue::Int32(Some(0))], - }, + Tuple::new( + None, + vec![DataValue::Int32(Some(1)), DataValue::Int32(Some(0))], + ), )), ]); let fn_asc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq = diff --git a/src/execution/dql/union.rs b/src/execution/dql/union.rs index 9cb3409e..95762985 100644 --- a/src/execution/dql/union.rs +++ b/src/execution/dql/union.rs @@ -23,7 +23,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Union { fn execute( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a T, + transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] diff --git a/src/execution/dql/values.rs b/src/execution/dql/values.rs index c7a3a8d3..bfaeeba0 100644 --- a/src/execution/dql/values.rs +++ b/src/execution/dql/values.rs @@ -20,7 +20,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Values { fn execute( self, _: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - _: &'a T, + _: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] @@ -36,7 +36,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Values { } } - yield Ok(Tuple { id: None, values }); + yield Ok(Tuple::new(None, values)); } }, ) diff --git a/src/execution/mod.rs b/src/execution/mod.rs index c63c9eb2..1f25ae89 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -50,7 +50,7 @@ pub trait ReadExecutor<'a, T: Transaction + 'a> { fn execute( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a T, + transaction: *mut T, ) -> Executor<'a>; } @@ -58,14 +58,14 @@ pub trait WriteExecutor<'a, T: Transaction + 'a> { fn execute_mut( self, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a mut T, + transaction: *mut T, ) -> Executor<'a>; } pub fn build_read<'a, T: Transaction + 'a>( plan: LogicalPlan, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a T, + transaction: *mut T, ) -> Executor<'a> { let LogicalPlan { operator, @@ -150,7 +150,7 @@ pub fn build_read<'a, T: Transaction + 'a>( pub fn build_write<'a, T: Transaction + 'a>( plan: LogicalPlan, cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), - transaction: &'a mut T, + transaction: *mut T, ) -> Executor<'a> { let LogicalPlan { operator, diff --git a/src/function/numbers.rs b/src/function/numbers.rs index ebccdf2b..adbda58f 100644 --- a/src/function/numbers.rs +++ b/src/function/numbers.rs @@ -5,8 +5,8 @@ use crate::errors::DatabaseError; use crate::expression::function::table::TableFunctionImpl; use crate::expression::function::FunctionSummary; use crate::expression::ScalarExpression; -use crate::types::tuple::SchemaRef; use crate::types::tuple::Tuple; +use crate::types::tuple::{SchemaRef, EMPTY_TUPLE}; use crate::types::value::DataValue; use crate::types::LogicalType; use serde::Deserialize; @@ -52,25 +52,17 @@ impl TableFunctionImpl for Numbers { &self, args: &[ScalarExpression], ) -> Result>>, DatabaseError> { - let tuple = Tuple { - id: None, - values: Vec::new(), - }; - - let mut value = args[0].eval(&tuple, &[])?; + let mut value = args[0].eval(&EMPTY_TUPLE, &[])?; if value.logical_type() != LogicalType::Integer { value = value.cast(&LogicalType::Integer)?; } let num = value.i32().ok_or(DatabaseError::NotNull)?; - Ok(Box::new((0..num).map(|i| { - Ok(Tuple { - id: None, - values: vec![DataValue::Int32(Some(i))], - }) - })) - as Box>>) + Ok( + Box::new((0..num).map(|i| Ok(Tuple::new(None, vec![DataValue::Int32(Some(i))])))) + as Box>>, + ) } fn output_schema(&self) -> &SchemaRef { diff --git a/src/macros/mod.rs b/src/macros/mod.rs index e6bb0b47..b3f7499e 100644 --- a/src/macros/mod.rs +++ b/src/macros/mod.rs @@ -130,13 +130,10 @@ macro_rules! scala_function { /// /// Ok(Box::new((0..num) /// .into_iter() -/// .map(|i| Ok(Tuple { -/// id: None, -/// values: vec![ +/// .map(|i| Ok(Tuple::new(None, vec![ /// DataValue::Int32(Some(i)), /// DataValue::Int32(Some(i)), -/// ] -/// }))) as Box>>) +/// ])))) as Box>>) /// })); /// /// let fnck_sql = DataBaseBuilder::path("./example") @@ -185,13 +182,9 @@ macro_rules! table_function { #[allow(unused_variables, clippy::redundant_closure_call)] fn eval(&self, args: &[::fnck_sql::expression::ScalarExpression]) -> Result>>, ::fnck_sql::errors::DatabaseError> { let mut _index = 0; - let tuple = ::fnck_sql::types::tuple::Tuple { - id: None, - values: Vec::new(), - }; $closure($({ - let mut value = args[_index].eval(&tuple, &[])?; + let mut value = args[_index].eval(&::fnck_sql::types::tuple::EMPTY_TUPLE, &[])?; _index += 1; if value.logical_type() != $arg_ty { diff --git a/src/planner/operator/copy_from_file.rs b/src/planner/operator/copy_from_file.rs index fa9455bc..20d65421 100644 --- a/src/planner/operator/copy_from_file.rs +++ b/src/planner/operator/copy_from_file.rs @@ -1,4 +1,5 @@ use crate::binder::copy::ExtSource; +use crate::catalog::TableName; use crate::types::tuple::SchemaRef; use fnck_sql_serde_macros::ReferenceSerialization; use itertools::Itertools; @@ -7,7 +8,7 @@ use std::fmt::Formatter; #[derive(Debug, PartialEq, Eq, Clone, Hash, ReferenceSerialization)] pub struct CopyFromFileOperator { - pub table: String, + pub table: TableName, pub source: ExtSource, pub schema_ref: SchemaRef, } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index a9b560a3..6563c22a 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -2,7 +2,9 @@ pub mod rocksdb; pub(crate) mod table_codec; use crate::catalog::view::View; -use crate::catalog::{ColumnCatalog, ColumnRef, TableCatalog, TableMeta, TableName}; +use crate::catalog::{ + ColumnCatalog, ColumnRef, PrimaryKeyIndices, TableCatalog, TableMeta, TableName, +}; use crate::errors::DatabaseError; use crate::expression::range_detacher::Range; use crate::optimizer::core::statistics_meta::{StatisticMetaLoader, StatisticsMeta}; @@ -10,7 +12,6 @@ use crate::serdes::ReferenceTables; use crate::storage::table_codec::TableCodec; use crate::types::index::{Index, IndexId, IndexMetaRef, IndexType}; use crate::types::tuple::{Tuple, TupleId}; -use crate::types::tuple_builder::TupleIdBuilder; use crate::types::value::DataValue; use crate::types::{ColumnId, LogicalType}; use crate::utils::lru::SharedLruCache; @@ -47,20 +48,20 @@ pub trait Transaction: Sized { /// The bounds is applied to the whole data batches, not per batch. /// /// The projections is column indices. - fn read( - &self, - table_cache: &TableCache, + fn read<'a>( + &'a self, + table_cache: &'a TableCache, table_name: TableName, bounds: Bounds, mut columns: Vec<(usize, ColumnRef)>, - ) -> Result, DatabaseError> { + ) -> Result, DatabaseError> { debug_assert!(columns.is_sorted_by_key(|(i, _)| i)); debug_assert!(columns.iter().map(|(i, _)| i).all_unique()); let table = self .table(table_cache, table_name.clone())? .ok_or(DatabaseError::TableNotFound)?; - let id_builder = TupleIdBuilder::new(table.schema_ref()); + let pk_indices = table.primary_keys_indices(); let table_types = table.types(); if columns.is_empty() { let (i, column) = &table.primary_keys()[0]; @@ -74,14 +75,14 @@ pub trait Transaction: Sized { } let (min, max) = TableCodec::tuple_bound(&table_name); - let iter = self.range(Bound::Included(&min), Bound::Included(&max))?; + let iter = self.range(Bound::Included(min), Bound::Included(max))?; Ok(TupleIter { offset: bounds.0.unwrap_or(0), limit: bounds.1, table_types, tuple_columns: Arc::new(tuple_columns), - id_builder, + pk_indices, projections, iter, }) @@ -102,7 +103,7 @@ pub trait Transaction: Sized { let table = self .table(table_cache, table_name.clone())? .ok_or(DatabaseError::TableNotFound)?; - let id_builder = TupleIdBuilder::new(table.schema_ref()); + let pk_indices = table.primary_keys_indices(); let table_types = table.types(); let table_name = table.name.as_str(); let offset = offset_option.unwrap_or(0); @@ -118,7 +119,7 @@ pub trait Transaction: Sized { Ok(IndexIter { offset, limit: limit_option, - id_builder, + pk_indices, params: IndexImplParams { tuple_schema_ref: Arc::new(tuple_columns), projections, @@ -182,12 +183,16 @@ pub trait Transaction: Sized { &mut self, table_name: &str, index: &Index, - tuple_id: Option<&TupleId>, + tuple_id: &TupleId, ) -> Result<(), DatabaseError> { if matches!(index.ty, IndexType::PrimaryKey { .. }) { return Ok(()); } - self.remove(&TableCodec::encode_index_key(table_name, index, tuple_id)?)?; + self.remove(&TableCodec::encode_index_key( + table_name, + index, + Some(tuple_id), + )?)?; Ok(()) } @@ -195,11 +200,11 @@ pub trait Transaction: Sized { fn append_tuple( &mut self, table_name: &str, - tuple: Tuple, + mut tuple: Tuple, types: &[LogicalType], is_overwrite: bool, ) -> Result<(), DatabaseError> { - let (key, value) = TableCodec::encode_tuple(table_name, &tuple, types)?; + let (key, value) = TableCodec::encode_tuple(table_name, &mut tuple, types)?; if !is_overwrite && self.get(&key)?.is_some() { return Err(DatabaseError::DuplicatePrimaryKey); @@ -282,7 +287,7 @@ pub trait Transaction: Sized { self.remove(&index_meta_key)?; let (index_min, index_max) = TableCodec::index_bound(table_name, &index_meta.id)?; - self._drop_data(&index_min, &index_max)?; + self._drop_data(index_min, index_max)?; self.remove_table_meta(meta_cache, table_name, index_meta.id)?; } @@ -386,10 +391,10 @@ pub trait Transaction: Sized { self.drop_data(table_name.as_str())?; let (column_min, column_max) = TableCodec::columns_bound(table_name.as_str()); - self._drop_data(&column_min, &column_max)?; + self._drop_data(column_min, column_max)?; let (index_meta_min, index_meta_max) = TableCodec::index_meta_bound(table_name.as_str()); - self._drop_data(&index_meta_min, &index_meta_max)?; + self._drop_data(index_meta_min, index_meta_max)?; self.remove(&TableCodec::encode_root_table_key(table_name.as_str()))?; table_cache.remove(&table_name); @@ -399,13 +404,13 @@ pub trait Transaction: Sized { fn drop_data(&mut self, table_name: &str) -> Result<(), DatabaseError> { let (tuple_min, tuple_max) = TableCodec::tuple_bound(table_name); - self._drop_data(&tuple_min, &tuple_max)?; + self._drop_data(tuple_min, tuple_max)?; let (index_min, index_max) = TableCodec::all_index_bound(table_name); - self._drop_data(&index_min, &index_max)?; + self._drop_data(index_min, index_max)?; let (statistics_min, statistics_max) = TableCodec::statistics_bound(table_name); - self._drop_data(&statistics_min, &statistics_max)?; + self._drop_data(statistics_min, statistics_max)?; Ok(()) } @@ -449,7 +454,7 @@ pub trait Transaction: Sized { fn table_metas(&self) -> Result, DatabaseError> { let mut metas = vec![]; let (min, max) = TableCodec::root_table_bound(); - let mut iter = self.range(Bound::Included(&min), Bound::Included(&max))?; + let mut iter = self.range(Bound::Included(min), Bound::Included(max))?; while let Some((_, value)) = iter.try_next().ok().flatten() { let meta = TableCodec::decode_root_table::(&value)?; @@ -517,8 +522,10 @@ pub trait Transaction: Sized { table_name: &TableName, ) -> Result, Vec)>, DatabaseError> { let (table_min, table_max) = TableCodec::table_bound(table_name); - let mut column_iter = - self.range(Bound::Included(&table_min), Bound::Included(&table_max))?; + let mut column_iter = self.range( + Bound::Included(table_min.clone()), + Bound::Included(table_max), + )?; let mut columns = Vec::new(); let mut index_metas = Vec::new(); @@ -541,7 +548,7 @@ pub trait Transaction: Sized { Ok((!columns.is_empty()).then_some((columns, index_metas))) } - fn _drop_data(&mut self, min: &[u8], max: &[u8]) -> Result<(), DatabaseError> { + fn _drop_data(&mut self, min: Vec, max: Vec) -> Result<(), DatabaseError> { let mut iter = self.range(Bound::Included(min), Bound::Included(max))?; let mut data_keys = vec![]; @@ -601,11 +608,11 @@ pub trait Transaction: Sized { fn remove(&mut self, key: &[u8]) -> Result<(), DatabaseError>; - fn range<'a>( - &'a self, - min: Bound<&[u8]>, - max: Bound<&[u8]>, - ) -> Result, DatabaseError>; + fn range( + &self, + min: Bound>, + max: Bound>, + ) -> Result, DatabaseError>; fn commit(self) -> Result<(), DatabaseError>; } @@ -614,14 +621,14 @@ trait IndexImpl { fn index_lookup( &self, bytes: &Bytes, - id_builder: &mut TupleIdBuilder, + pk_indices: &PrimaryKeyIndices, params: &IndexImplParams, ) -> Result; fn eq_to_res<'a>( &self, value: &DataValue, - id_builder: &mut TupleIdBuilder, + pk_indices: &PrimaryKeyIndices, params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError>; @@ -681,7 +688,7 @@ impl IndexImplParams<'_, T> { fn get_tuple_by_id( &self, - id_builder: &mut TupleIdBuilder, + pk_indices: &PrimaryKeyIndices, tuple_id: &TupleId, ) -> Result, DatabaseError> { let key = TableCodec::encode_tuple_key(self.table_name, tuple_id)?; @@ -689,7 +696,7 @@ impl IndexImplParams<'_, T> { Ok(self.tx.get(&key)?.map(|bytes| { TableCodec::decode_tuple( &self.table_types, - id_builder, + pk_indices, &self.projections, &self.tuple_schema_ref, &bytes, @@ -707,28 +714,28 @@ impl IndexImpl for IndexImplEnum { fn index_lookup( &self, bytes: &Bytes, - id_builder: &mut TupleIdBuilder, + pk_indices: &PrimaryKeyIndices, params: &IndexImplParams, ) -> Result { match self { - IndexImplEnum::PrimaryKey(inner) => inner.index_lookup(bytes, id_builder, params), - IndexImplEnum::Unique(inner) => inner.index_lookup(bytes, id_builder, params), - IndexImplEnum::Normal(inner) => inner.index_lookup(bytes, id_builder, params), - IndexImplEnum::Composite(inner) => inner.index_lookup(bytes, id_builder, params), + IndexImplEnum::PrimaryKey(inner) => inner.index_lookup(bytes, pk_indices, params), + IndexImplEnum::Unique(inner) => inner.index_lookup(bytes, pk_indices, params), + IndexImplEnum::Normal(inner) => inner.index_lookup(bytes, pk_indices, params), + IndexImplEnum::Composite(inner) => inner.index_lookup(bytes, pk_indices, params), } } fn eq_to_res<'a>( &self, value: &DataValue, - id_builder: &mut TupleIdBuilder, + pk_indices: &PrimaryKeyIndices, params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError> { match self { - IndexImplEnum::PrimaryKey(inner) => inner.eq_to_res(value, id_builder, params), - IndexImplEnum::Unique(inner) => inner.eq_to_res(value, id_builder, params), - IndexImplEnum::Normal(inner) => inner.eq_to_res(value, id_builder, params), - IndexImplEnum::Composite(inner) => inner.eq_to_res(value, id_builder, params), + IndexImplEnum::PrimaryKey(inner) => inner.eq_to_res(value, pk_indices, params), + IndexImplEnum::Unique(inner) => inner.eq_to_res(value, pk_indices, params), + IndexImplEnum::Normal(inner) => inner.eq_to_res(value, pk_indices, params), + IndexImplEnum::Composite(inner) => inner.eq_to_res(value, pk_indices, params), } } @@ -750,12 +757,12 @@ impl IndexImpl for PrimaryKeyIndexImpl { fn index_lookup( &self, bytes: &Bytes, - id_builder: &mut TupleIdBuilder, + pk_indices: &PrimaryKeyIndices, params: &IndexImplParams, ) -> Result { Ok(TableCodec::decode_tuple( ¶ms.table_types, - id_builder, + pk_indices, ¶ms.projections, ¶ms.tuple_schema_ref, bytes, @@ -765,7 +772,7 @@ impl IndexImpl for PrimaryKeyIndexImpl { fn eq_to_res<'a>( &self, value: &DataValue, - id_builder: &mut TupleIdBuilder, + pk_indices: &PrimaryKeyIndices, params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError> { let tuple = params @@ -774,7 +781,7 @@ impl IndexImpl for PrimaryKeyIndexImpl { .map(|bytes| { TableCodec::decode_tuple( ¶ms.table_types, - id_builder, + pk_indices, ¶ms.projections, ¶ms.tuple_schema_ref, &bytes, @@ -794,12 +801,12 @@ impl IndexImpl for PrimaryKeyIndexImpl { fn secondary_index_lookup( bytes: &Bytes, - id_builder: &mut TupleIdBuilder, + pk_indices: &PrimaryKeyIndices, params: &IndexImplParams, ) -> Result { let tuple_id = TableCodec::decode_index(bytes, ¶ms.index_meta.pk_ty)?; params - .get_tuple_by_id(id_builder, &tuple_id)? + .get_tuple_by_id(pk_indices, &tuple_id)? .ok_or(DatabaseError::TupleIdNotFound(tuple_id)) } @@ -807,16 +814,16 @@ impl IndexImpl for UniqueIndexImpl { fn index_lookup( &self, bytes: &Bytes, - id_builder: &mut TupleIdBuilder, + pk_indices: &PrimaryKeyIndices, params: &IndexImplParams, ) -> Result { - secondary_index_lookup(bytes, id_builder, params) + secondary_index_lookup(bytes, pk_indices, params) } fn eq_to_res<'a>( &self, value: &DataValue, - id_builder: &mut TupleIdBuilder, + pk_indices: &PrimaryKeyIndices, params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError> { let Some(bytes) = params.tx.get(&self.bound_key(params, value)?)? else { @@ -824,7 +831,7 @@ impl IndexImpl for UniqueIndexImpl { }; let tuple_id = TableCodec::decode_index(&bytes, ¶ms.index_meta.pk_ty)?; let tuple = params - .get_tuple_by_id(id_builder, &tuple_id)? + .get_tuple_by_id(pk_indices, &tuple_id)? .ok_or(DatabaseError::TupleIdNotFound(tuple_id))?; Ok(IndexResult::Tuple(Some(tuple))) } @@ -844,25 +851,24 @@ impl IndexImpl for NormalIndexImpl { fn index_lookup( &self, bytes: &Bytes, - id_builder: &mut TupleIdBuilder, + pk_indices: &PrimaryKeyIndices, params: &IndexImplParams, ) -> Result { - secondary_index_lookup(bytes, id_builder, params) + secondary_index_lookup(bytes, pk_indices, params) } fn eq_to_res<'a>( &self, value: &DataValue, - _: &mut TupleIdBuilder, + _: &PrimaryKeyIndices, params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError> { let min = self.bound_key(params, value)?; let max = self.bound_key(params, value)?; - let iter = params.tx.range( - Bound::Included(min.as_slice()), - Bound::Included(max.as_slice()), - )?; + let iter = params + .tx + .range(Bound::Included(min), Bound::Included(max))?; Ok(IndexResult::Scope(iter)) } @@ -881,25 +887,24 @@ impl IndexImpl for CompositeIndexImpl { fn index_lookup( &self, bytes: &Bytes, - id_builder: &mut TupleIdBuilder, + pk_indices: &PrimaryKeyIndices, params: &IndexImplParams, ) -> Result { - secondary_index_lookup(bytes, id_builder, params) + secondary_index_lookup(bytes, pk_indices, params) } fn eq_to_res<'a>( &self, value: &DataValue, - _: &mut TupleIdBuilder, + _: &PrimaryKeyIndices, params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError> { let min = self.bound_key(params, value)?; let max = self.bound_key(params, value)?; - let iter = params.tx.range( - Bound::Included(min.as_slice()), - Bound::Included(max.as_slice()), - )?; + let iter = params + .tx + .range(Bound::Included(min), Bound::Included(max))?; Ok(IndexResult::Scope(iter)) } @@ -919,7 +924,7 @@ pub struct TupleIter<'a, T: Transaction + 'a> { limit: Option, table_types: Vec, tuple_columns: Arc>, - id_builder: TupleIdBuilder, + pk_indices: &'a PrimaryKeyIndices, projections: Vec, iter: T::IterType<'a>, } @@ -941,7 +946,7 @@ impl<'a, T: Transaction + 'a> Iter for TupleIter<'a, T> { while let Some((_, value)) = self.iter.try_next()? { let tuple = TableCodec::decode_tuple( &self.table_types, - &mut self.id_builder, + self.pk_indices, &self.projections, &self.tuple_columns, &value, @@ -962,7 +967,7 @@ pub struct IndexIter<'a, T: Transaction> { offset: usize, limit: Option, - id_builder: TupleIdBuilder, + pk_indices: &'a PrimaryKeyIndices, params: IndexImplParams<'a, T>, inner: IndexImplEnum, // for buffering data @@ -1052,19 +1057,13 @@ impl Iter for IndexIter<'_, T> { let mut encode_max = bound_encode(max)?; check_bound(&mut encode_max, bound_max); - let iter = self.params.tx.range( - encode_min.as_ref().map(Vec::as_slice), - encode_max.as_ref().map(Vec::as_slice), - )?; + let iter = self.params.tx.range(encode_min, encode_max)?; self.state = IndexIterState::Range(iter); } Range::Eq(mut val) => { val = self.params.try_cast(val)?; - match self - .inner - .eq_to_res(&val, &mut self.id_builder, &self.params)? - { + match self.inner.eq_to_res(&val, self.pk_indices, &self.params)? { IndexResult::Tuple(tuple) => { if Self::offset_move(&mut self.offset) { continue; @@ -1088,7 +1087,7 @@ impl Iter for IndexIter<'_, T> { Self::limit_sub(&mut self.limit); let tuple = self.inner - .index_lookup(&bytes, &mut self.id_builder, &self.params)?; + .index_lookup(&bytes, self.pk_indices, &self.params)?; return Ok(Some(tuple)); } @@ -1161,30 +1160,30 @@ mod test { } fn build_tuples() -> Vec { vec![ - Tuple { - id: Some(DataValue::Int32(Some(0))), - values: vec![ + Tuple::new( + Some(Arc::new(vec![0])), + vec![ DataValue::Int32(Some(0)), DataValue::Boolean(Some(true)), DataValue::Int32(Some(0)), ], - }, - Tuple { - id: Some(DataValue::Int32(Some(1))), - values: vec![ + ), + Tuple::new( + Some(Arc::new(vec![0])), + vec![ DataValue::Int32(Some(1)), DataValue::Boolean(Some(true)), DataValue::Int32(Some(1)), ], - }, - Tuple { - id: Some(DataValue::Int32(Some(2))), - values: vec![ + ), + Tuple::new( + Some(Arc::new(vec![0])), + vec![ DataValue::Int32(Some(2)), DataValue::Boolean(Some(false)), DataValue::Int32(Some(0)), ], - }, + ), ] } @@ -1325,7 +1324,7 @@ mod test { assert_eq!(tuple_iter.next_tuple()?.unwrap(), tuples[2]); let (min, max) = TableCodec::tuple_bound("t1"); - let mut iter = transaction.range(Bound::Included(&min), Bound::Included(&max))?; + let mut iter = transaction.range(Bound::Included(min), Bound::Included(max))?; let (_, value) = iter.try_next()?.unwrap(); dbg!(value); @@ -1349,7 +1348,7 @@ mod test { assert_eq!(tuple_iter.next_tuple()?.unwrap(), tuples[2]); let (min, max) = TableCodec::tuple_bound("t1"); - let mut iter = transaction.range(Bound::Included(&min), Bound::Included(&max))?; + let mut iter = transaction.range(Bound::Included(min), Bound::Included(max))?; let (_, value) = iter.try_next()?.unwrap(); dbg!(value); @@ -1427,7 +1426,7 @@ mod test { )?; { let (min, max) = TableCodec::index_meta_bound("t1"); - let mut iter = transaction.range(Bound::Included(&min), Bound::Included(&max))?; + let mut iter = transaction.range(Bound::Included(min), Bound::Included(max))?; let (_, value) = iter.try_next()?.unwrap(); dbg!(value); @@ -1526,7 +1525,7 @@ mod test { assert_eq!(index_iter.next_tuple()?.unwrap(), tuples[1]); let (min, max) = TableCodec::index_bound("t1", &1)?; - let mut iter = transaction.range(Bound::Included(&min), Bound::Included(&max))?; + let mut iter = transaction.range(Bound::Included(min), Bound::Included(max))?; let (_, value) = iter.try_next()?.unwrap(); dbg!(value); @@ -1536,7 +1535,7 @@ mod test { dbg!(value); assert!(iter.try_next()?.is_none()); } - transaction.del_index("t1", &indexes[0].1, Some(&indexes[0].0))?; + transaction.del_index("t1", &indexes[0].1, &indexes[0].0)?; let mut index_iter = build_index_iter(&transaction, &table_cache, c3_column_id)?; @@ -1544,7 +1543,7 @@ mod test { assert_eq!(index_iter.next_tuple()?.unwrap(), tuples[1]); let (min, max) = TableCodec::index_bound("t1", &1)?; - let mut iter = transaction.range(Bound::Included(&min), Bound::Included(&max))?; + let mut iter = transaction.range(Bound::Included(min), Bound::Included(max))?; let (_, value) = iter.try_next()?.unwrap(); dbg!(value); diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index 38b15347..c9e0d0b2 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -68,11 +68,11 @@ impl<'txn> Transaction for RocksTransaction<'txn> { } // Tips: rocksdb has weak support for `Include` and `Exclude`, so precision will be lost - fn range<'a>( - &'a self, - min: Bound<&[u8]>, - max: Bound<&[u8]>, - ) -> Result, DatabaseError> { + fn range( + &self, + min: Bound>, + max: Bound>, + ) -> Result, DatabaseError> { fn bound_to_include(bound: Bound<&[u8]>) -> Option<&[u8]> { match bound { Bound::Included(bytes) | Bound::Excluded(bytes) => Some(bytes), @@ -80,14 +80,14 @@ impl<'txn> Transaction for RocksTransaction<'txn> { } } - let lower = bound_to_include(min) + let lower = bound_to_include(min.as_ref().map(Vec::as_slice)) .map(|bytes| IteratorMode::From(bytes, Direction::Forward)) .unwrap_or(IteratorMode::Start); let iter = self.tx.iterator(lower); Ok(RocksIter { - lower: min.map(|bytes| bytes.to_vec()), - upper: max.map(|bytes| bytes.to_vec()), + lower: min, + upper: max, iter, }) } @@ -144,7 +144,6 @@ mod test { }; use crate::types::index::{IndexMeta, IndexType}; use crate::types::tuple::Tuple; - use crate::types::tuple_builder::TupleIdBuilder; use crate::types::value::DataValue; use crate::types::LogicalType; use crate::utils::lru::SharedLruCache; @@ -193,19 +192,19 @@ mod test { transaction.append_tuple( &"test".to_string(), - Tuple { - id: Some(DataValue::Int32(Some(1))), - values: vec![DataValue::Int32(Some(1)), DataValue::Boolean(Some(true))], - }, + Tuple::new( + Some(Arc::new(vec![0])), + vec![DataValue::Int32(Some(1)), DataValue::Boolean(Some(true))], + ), &[LogicalType::Integer, LogicalType::Boolean], false, )?; transaction.append_tuple( &"test".to_string(), - Tuple { - id: Some(DataValue::Int32(Some(2))), - values: vec![DataValue::Int32(Some(2)), DataValue::Boolean(Some(false))], - }, + Tuple::new( + Some(Arc::new(vec![0])), + vec![DataValue::Int32(Some(2)), DataValue::Boolean(Some(true))], + ), &[LogicalType::Integer, LogicalType::Boolean], false, )?; @@ -218,7 +217,7 @@ mod test { )?; let option_1 = iter.next_tuple()?; - assert_eq!(option_1.unwrap().id, Some(DataValue::Int32(Some(2)))); + assert_eq!(option_1.unwrap().pk_indices, Some(Arc::new(vec![0]))); let option_2 = iter.next_tuple()?; assert_eq!(option_2, None); @@ -251,11 +250,11 @@ mod test { DataValue::Int32(Some(3)), DataValue::Int32(Some(4)), ]; - let id_builder = TupleIdBuilder::new(table.schema_ref()); + let pk_indices = Arc::new(vec![0]); let mut iter = IndexIter { offset: 0, limit: None, - id_builder, + pk_indices: &pk_indices, params: IndexImplParams { tuple_schema_ref: table.schema_ref().clone(), projections: vec![0], @@ -285,8 +284,8 @@ mod test { }; let mut result = Vec::new(); - while let Some(tuple) = iter.next_tuple()? { - result.push(tuple.id.unwrap()); + while let Some(mut tuple) = iter.next_tuple()? { + result.push(tuple.id().unwrap().clone()); } assert_eq!(result, tuple_ids); @@ -326,7 +325,7 @@ mod test { .unwrap(); while let Some(tuple) = iter.next_tuple()? { - assert_eq!(tuple.id, Some(DataValue::Int32(Some(1)))); + assert_eq!(tuple.pk_indices, Some(Arc::new(vec![0]))); assert_eq!( tuple.values, vec![DataValue::Int32(Some(1)), DataValue::Int32(Some(1))] diff --git a/src/storage/table_codec.rs b/src/storage/table_codec.rs index 63bf7ae8..1333d5ea 100644 --- a/src/storage/table_codec.rs +++ b/src/storage/table_codec.rs @@ -1,11 +1,10 @@ use crate::catalog::view::View; -use crate::catalog::{ColumnRef, ColumnRelation, TableMeta}; +use crate::catalog::{ColumnRef, ColumnRelation, PrimaryKeyIndices, TableMeta}; use crate::errors::DatabaseError; use crate::serdes::{ReferenceSerialization, ReferenceTables}; use crate::storage::{TableCache, Transaction}; use crate::types::index::{Index, IndexId, IndexMeta, IndexType}; use crate::types::tuple::{Schema, Tuple, TupleId}; -use crate::types::tuple_builder::TupleIdBuilder; use crate::types::value::DataValue; use crate::types::LogicalType; use bytes::Bytes; @@ -223,11 +222,11 @@ impl TableCodec { /// Value: Tuple pub fn encode_tuple( table_name: &str, - tuple: &Tuple, + tuple: &mut Tuple, types: &[LogicalType], ) -> Result<(Bytes, Bytes), DatabaseError> { - let tuple_id = tuple.id.clone().ok_or(DatabaseError::PrimaryKeyNotFound)?; - let key = Self::encode_tuple_key(table_name, &tuple_id)?; + let tuple_id = tuple.id().ok_or(DatabaseError::PrimaryKeyNotFound)?; + let key = Self::encode_tuple_key(table_name, tuple_id)?; Ok((Bytes::from(key), Bytes::from(tuple.serialize_to(types)?))) } @@ -248,12 +247,12 @@ impl TableCodec { pub fn decode_tuple( table_types: &[LogicalType], - id_builder: &mut TupleIdBuilder, + pk_indices: &PrimaryKeyIndices, projections: &[usize], schema: &Schema, bytes: &[u8], ) -> Tuple { - Tuple::deserialize_from(table_types, id_builder, projections, schema, bytes) + Tuple::deserialize_from(table_types, pk_indices, projections, schema, bytes) } /// Key: {TableName}{INDEX_META_TAG}{BOUND_MIN_TAG}{IndexID} @@ -479,7 +478,6 @@ mod tests { use crate::storage::Storage; use crate::types::index::{Index, IndexMeta, IndexType}; use crate::types::tuple::Tuple; - use crate::types::tuple_builder::TupleIdBuilder; use crate::types::value::DataValue; use crate::types::LogicalType; use bytes::Bytes; @@ -511,29 +509,24 @@ mod tests { fn test_table_codec_tuple() -> Result<(), DatabaseError> { let table_catalog = build_table_codec(); - let tuple = Tuple { - id: Some(DataValue::Int32(Some(0))), - values: vec![ + let mut tuple = Tuple::new( + Some(Arc::new(vec![0])), + vec![ DataValue::Int32(Some(0)), DataValue::Decimal(Some(Decimal::new(1, 0))), ], - }; + ); let (_, bytes) = TableCodec::encode_tuple( &table_catalog.name, - &tuple, + &mut tuple, &[LogicalType::Integer, LogicalType::Decimal(None, None)], )?; let schema = table_catalog.schema_ref(); - let mut id_builder = TupleIdBuilder::new(schema); + let pk_indices = table_catalog.primary_keys_indices(); + tuple.clear_id(); assert_eq!( - TableCodec::decode_tuple( - &table_catalog.types(), - &mut id_builder, - &[0, 1], - schema, - &bytes - ), + TableCodec::decode_tuple(&table_catalog.types(), pk_indices, &[0, 1], schema, &bytes), tuple ); diff --git a/src/types/tuple.rs b/src/types/tuple.rs index 73c07802..79d7ae61 100644 --- a/src/types/tuple.rs +++ b/src/types/tuple.rs @@ -1,7 +1,6 @@ -use crate::catalog::ColumnRef; +use crate::catalog::{ColumnRef, PrimaryKeyIndices}; use crate::db::ResultIter; use crate::errors::DatabaseError; -use crate::types::tuple_builder::TupleIdBuilder; use crate::types::value::DataValue; use crate::types::LogicalType; use comfy_table::{Cell, Table}; @@ -10,8 +9,9 @@ use std::sync::Arc; use std::sync::LazyLock; pub static EMPTY_TUPLE: LazyLock = LazyLock::new(|| Tuple { - id: None, + pk_indices: None, values: vec![], + id_buf: None, }); const BITS_MAX_INDEX: usize = 8; @@ -29,14 +29,42 @@ pub fn types(schema: &Schema) -> Vec { #[derive(Clone, Debug, PartialEq)] pub struct Tuple { - pub id: Option, + pub(crate) pk_indices: Option, pub values: Vec, + id_buf: Option>, } impl Tuple { + pub fn new(pk_indices: Option, values: Vec) -> Self { + Tuple { + pk_indices, + values, + id_buf: None, + } + } + + pub fn id(&mut self) -> Option<&TupleId> { + self.id_buf + .get_or_insert_with(|| { + self.pk_indices.as_ref().map(|pk_indices| { + if pk_indices.len() == 1 { + self.values[0].clone() + } else { + let mut values = Vec::with_capacity(pk_indices.len()); + + for i in pk_indices.iter() { + values.push(self.values[*i].clone()); + } + DataValue::Tuple(Some((values, false))) + } + }) + }) + .as_ref() + } + pub fn deserialize_from( table_types: &[LogicalType], - id_builder: &mut TupleIdBuilder, + pk_indices: &PrimaryKeyIndices, projections: &[usize], schema: &Schema, bytes: &[u8], @@ -62,13 +90,13 @@ impl Tuple { if is_none(bytes[i / BITS_MAX_INDEX], i % BITS_MAX_INDEX) { if projections[projection_i] == i { tuple_values.push(DataValue::none(logic_type)); - Self::values_push(schema, &tuple_values, id_builder, &mut projection_i); + projection_i += 1; } } else if let Some(len) = logic_type.raw_len() { /// fixed length (e.g.: int) if projections[projection_i] == i { tuple_values.push(DataValue::from_raw(&bytes[pos..pos + len], logic_type)); - Self::values_push(schema, &tuple_values, id_builder, &mut projection_i); + projection_i += 1; } pos += len; } else { @@ -77,29 +105,18 @@ impl Tuple { pos += 4; if projections[projection_i] == i { tuple_values.push(DataValue::from_raw(&bytes[pos..pos + len], logic_type)); - Self::values_push(schema, &tuple_values, id_builder, &mut projection_i); + projection_i += 1; } pos += len; } } Tuple { - id: id_builder.build(), + pk_indices: Some(pk_indices.clone()), values: tuple_values, + id_buf: None, } } - fn values_push( - tuple_columns: &Schema, - tuple_values: &[DataValue], - id_builder: &mut TupleIdBuilder, - projection_i: &mut usize, - ) { - if tuple_columns[*projection_i].desc().is_primary() { - id_builder.append(tuple_values[*projection_i].clone()); - } - *projection_i += 1; - } - /// e.g.: bits(u8)..|data_0(len for utf8_1)|utf8_0|data_1| /// Tips: all len is u32 pub fn serialize_to(&self, types: &[LogicalType]) -> Result, DatabaseError> { @@ -130,6 +147,10 @@ impl Tuple { Ok(bytes) } + + pub(crate) fn clear_id(&mut self) { + self.id_buf = None; + } } pub fn create_table(iter: I) -> Result { @@ -162,7 +183,6 @@ pub fn create_table(iter: I) -> Result { mod tests { use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef}; use crate::types::tuple::Tuple; - use crate::types::tuple_builder::TupleIdBuilder; use crate::types::value::{DataValue, Utf8Type}; use crate::types::LogicalType; use itertools::Itertools; @@ -280,9 +300,9 @@ mod tests { ]); let tuples = vec![ - Tuple { - id: Some(DataValue::Int32(Some(0))), - values: vec![ + Tuple::new( + Some(Arc::new(vec![0])), + vec![ DataValue::Int32(Some(0)), DataValue::UInt32(Some(1)), DataValue::Utf8 { @@ -316,10 +336,10 @@ mod tests { unit: CharLengthUnits::Octets, }, ], - }, - Tuple { - id: Some(DataValue::Int32(Some(1))), - values: vec![ + ), + Tuple::new( + Some(Arc::new(vec![0])), + vec![ DataValue::Int32(Some(1)), DataValue::UInt32(None), DataValue::Utf8 { @@ -353,25 +373,24 @@ mod tests { unit: CharLengthUnits::Octets, }, ], - }, + ), ]; let types = columns .iter() .map(|column| column.datatype().clone()) .collect_vec(); let columns = Arc::new(columns); - let mut id_builder = TupleIdBuilder::new(&columns); let tuple_0 = Tuple::deserialize_from( &types, - &mut id_builder, + &Arc::new(vec![0]), &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], &columns, &tuples[0].serialize_to(&types).unwrap(), ); let tuple_1 = Tuple::deserialize_from( &types, - &mut id_builder, + &Arc::new(vec![0]), &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], &columns, &tuples[1].serialize_to(&types).unwrap(), diff --git a/src/types/tuple_builder.rs b/src/types/tuple_builder.rs index 7c557ad0..9bf6b186 100644 --- a/src/types/tuple_builder.rs +++ b/src/types/tuple_builder.rs @@ -1,63 +1,17 @@ +use crate::catalog::PrimaryKeyIndices; use crate::errors::DatabaseError; -use crate::types::tuple::{Schema, Tuple, TupleId}; +use crate::types::tuple::{Schema, Tuple}; use crate::types::value::{DataValue, Utf8Type}; -use itertools::Itertools; use sqlparser::ast::CharLengthUnits; -pub struct TupleIdBuilder { - primary_indexes: Vec, - tmp_keys: Vec>, -} - pub struct TupleBuilder<'a> { schema: &'a Schema, -} - -impl TupleIdBuilder { - pub fn new(schema: &Schema) -> Self { - let primary_indexes = schema - .iter() - .filter_map(|column| column.desc().primary()) - .enumerate() - .sorted_by_key(|(_, p_i)| *p_i) - .map(|(i, _)| i) - .collect_vec(); - let tmp_keys = Vec::with_capacity(primary_indexes.len()); - Self { - primary_indexes, - tmp_keys, - } - } - - pub fn append(&mut self, value: DataValue) { - self.tmp_keys.push(Some(value)); - } - - pub fn build(&mut self) -> Option { - if self.tmp_keys.len() != self.primary_indexes.len() { - self.tmp_keys.clear(); - return None; - } - (!self.tmp_keys.is_empty()).then(|| { - if self.tmp_keys.len() == 1 { - self.tmp_keys.pop().unwrap().unwrap() - } else { - let mut primary_keys = Vec::with_capacity(self.primary_indexes.len()); - - for i in self.primary_indexes.iter() { - primary_keys.push(self.tmp_keys[*i].take().unwrap()); - } - self.tmp_keys.clear(); - - DataValue::Tuple(Some((primary_keys, false))) - } - }) - } + pk_indices: Option<&'a PrimaryKeyIndices>, } impl<'a> TupleBuilder<'a> { - pub fn new(schema: &'a Schema) -> Self { - TupleBuilder { schema } + pub fn new(schema: &'a Schema, pk_indices: Option<&'a PrimaryKeyIndices>) -> Self { + TupleBuilder { schema, pk_indices } } pub fn build_result(message: String) -> Tuple { @@ -67,7 +21,7 @@ impl<'a> TupleBuilder<'a> { unit: CharLengthUnits::Characters, }]; - Tuple { id: None, values } + Tuple::new(None, values) } pub fn build_with_row<'b>( @@ -75,28 +29,21 @@ impl<'a> TupleBuilder<'a> { row: impl IntoIterator, ) -> Result { let mut values = Vec::with_capacity(self.schema.len()); - let mut id_builder = TupleIdBuilder::new(self.schema); for (i, value) in row.into_iter().enumerate() { - let data_value = DataValue::Utf8 { - value: Some(value.to_string()), - ty: Utf8Type::Variable(None), - unit: CharLengthUnits::Characters, - } - .cast(self.schema[i].datatype())?; - - if self.schema[i].desc().is_primary() { - id_builder.append(data_value.clone()); - } - values.push(data_value); + values.push( + DataValue::Utf8 { + value: Some(value.to_string()), + ty: Utf8Type::Variable(None), + unit: CharLengthUnits::Characters, + } + .cast(self.schema[i].datatype())?, + ); } if values.len() != self.schema.len() { return Err(DatabaseError::MisMatch("types", "values")); } - Ok(Tuple { - id: id_builder.build(), - values, - }) + Ok(Tuple::new(self.pk_indices.cloned(), values)) } } diff --git a/tests/macros-test/src/main.rs b/tests/macros-test/src/main.rs index bd604326..33e246a7 100644 --- a/tests/macros-test/src/main.rs +++ b/tests/macros-test/src/main.rs @@ -45,7 +45,7 @@ mod test { }, ]; - (Tuple { id: None, values }, schema_ref) + (Tuple::new(None, values), schema_ref) } #[derive(Default, Debug, PartialEq)] @@ -80,24 +80,21 @@ mod test { assert_eq!(my_struct.c2, "LOL"); } - scala_function!(MyScalaFunction::sum(LogicalType::Integer, LogicalType::Integer) -> LogicalType::Integer => (|v1: DataValue, v2: DataValue| { + scala_function!(MyScalaFunction::SUM(LogicalType::Integer, LogicalType::Integer) -> LogicalType::Integer => (|v1: DataValue, v2: DataValue| { let plus_evaluator = EvaluatorFactory::binary_create(LogicalType::Integer, BinaryOperator::Plus)?; Ok(plus_evaluator.0.binary_eval(&v1, &v2)) })); - table_function!(MyTableFunction::test_numbers(LogicalType::Integer) -> [c1: LogicalType::Integer, c2: LogicalType::Integer] => (|v1: DataValue| { + table_function!(MyTableFunction::TEST_NUMBERS(LogicalType::Integer) -> [c1: LogicalType::Integer, c2: LogicalType::Integer] => (|v1: DataValue| { let num = v1.i32().unwrap(); Ok(Box::new((0..num) .into_iter() - .map(|i| Ok(Tuple { - id: None, - values: vec![ + .map(|i| Ok(Tuple::new(None, vec![ DataValue::Int32(Some(i)), DataValue::Int32(Some(i)), - ] - }))) as Box>>) + ])))) as Box>>) })); #[test] @@ -112,10 +109,7 @@ mod test { unit: CharLengthUnits::Characters, }), ], - &Tuple { - id: None, - values: vec![], - }, + &Tuple::new(None, vec![]), &vec![], )?; @@ -148,17 +142,17 @@ mod test { ); assert_eq!( numbers.next().unwrap().unwrap(), - Tuple { - id: None, - values: vec![DataValue::Int32(Some(0)), DataValue::Int32(Some(0)),] - } + Tuple::new( + None, + vec![DataValue::Int32(Some(0)), DataValue::Int32(Some(0)),] + ) ); assert_eq!( numbers.next().unwrap().unwrap(), - Tuple { - id: None, - values: vec![DataValue::Int32(Some(1)), DataValue::Int32(Some(1)),] - } + Tuple::new( + None, + vec![DataValue::Int32(Some(1)), DataValue::Int32(Some(1)),] + ) ); assert!(numbers.next().is_none()); diff --git a/tpcc/Cargo.toml b/tpcc/Cargo.toml index 09bff840..2a39d5bb 100644 --- a/tpcc/Cargo.toml +++ b/tpcc/Cargo.toml @@ -6,10 +6,9 @@ edition = "2021" [dependencies] clap = { version = "4", features = ["derive"] } chrono = { version = "0.4" } -fnck_sql = { version = "0.0.7", path = "..", package = "fnck_sql" } +fnck_sql = { version = "0.0.8", path = "..", package = "fnck_sql" } indicatif = { version = "0.17" } ordered-float = { version = "4" } rand = { version = "0.8" } rust_decimal = { version = "1" } -tempfile = { version = "3" } thiserror = { version = "1" } \ No newline at end of file diff --git a/tpcc/README.md b/tpcc/README.md index 158c80d1..58474070 100644 --- a/tpcc/README.md +++ b/tpcc/README.md @@ -6,11 +6,11 @@ run `cargo run -p tpcc --release` to run tpcc - YMTC PC411-1024GB-B - Tips: TPCC currently only supports single thread ```shell -|New-Order| sc: 80029 lt: 0 fl: 821 -|Payment| sc: 80005 lt: 0 fl: 0 -|Order-Status| sc: 8001 lt: 0 fl: 412 -|Delivery| sc: 8001 lt: 0 fl: 0 -|Stock-Level| sc: 8001 lt: 0 fl: 0 +|New-Order| sc: 88139 lt: 0 fl: 897 +|Payment| sc: 88120 lt: 0 fl: 0 +|Order-Status| sc: 8812 lt: 0 fl: 388 +|Delivery| sc: 8812 lt: 0 fl: 0 +|Stock-Level| sc: 8812 lt: 0 fl: 0 in 720 sec. (all must be [OK]) [transaction percentage] @@ -24,20 +24,133 @@ in 720 sec. Order-Status: 100.0 [OK] Delivery: 100.0 [OK] Stock-Level: 100.0 [OK] - New-Order Total: 80029 - Payment Total: 80005 - Order-Status Total: 8001 - Delivery Total: 8001 - Stock-Level Total: 8001 + New-Order Total: 88139 + Payment Total: 88120 + Order-Status Total: 8812 + Delivery Total: 8812 + Stock-Level Total: 8812 + + + + +1.New-Order + +0.001, 5110 +0.002, 63448 +0.003, 19415 +0.004, 78 +0.005, 3 +0.006, 1 +0.013, 2 + +2.Payment + +0.001, 81269 +0.002, 6794 +0.003, 12 +0.004, 1 + +3.Order-Status + +0.014, 34 +0.015, 143 +0.016, 207 +0.017, 225 +0.018, 221 +0.019, 196 +0.020, 162 +0.021, 170 +0.022, 166 +0.023, 206 +0.024, 190 +0.025, 134 +0.026, 151 +0.027, 287 +0.028, 274 +0.029, 273 +0.030, 206 +0.031, 169 +0.032, 170 +0.033, 149 +0.034, 136 +0.035, 181 +0.036, 244 +0.037, 295 +0.038, 294 +0.039, 232 +0.040, 201 +0.041, 181 +0.042, 173 +0.043, 165 +0.044, 154 +0.045, 175 +0.046, 267 +0.047, 286 +0.048, 233 +0.049, 190 +0.050, 153 +0.051, 183 +0.052, 199 +0.053, 155 +0.054, 190 +0.055, 237 +0.056, 190 +0.057, 151 +0.058, 82 +0.059, 50 +0.060, 14 +0.061, 5 +0.062, 4 +0.063, 2 +0.064, 2 +0.065, 1 +0.073, 1 +0.075, 1 +0.078, 1 +0.087, 2 +0.102, 2 +0.131, 3 +0.188, 1 + +4.Delivery + +0.012, 96 +0.013, 580 +0.014, 786 +0.015, 882 +0.016, 893 +0.017, 1087 +0.018, 1200 +0.019, 1038 +0.020, 842 +0.021, 576 +0.022, 416 +0.023, 247 +0.024, 94 +0.025, 13 +0.027, 1 +0.028, 2 +0.031, 1 +0.034, 1 +0.050, 1 + +5.Stock-Level + +0.001, 1299 +0.002, 2836 +0.003, 3192 +0.004, 1150 +0.005, 172 +0.006, 7 <90th Percentile RT (MaxRT)> - New-Order : 0.003 (0.006) + New-Order : 0.003 (0.012) Payment : 0.001 (0.003) -Order-Status : 0.062 (0.188) - Delivery : 0.022 (0.052) +Order-Status : 0.054 (0.188) + Delivery : 0.021 (0.049) Stock-Level : 0.004 (0.006) -6669 Tpmc +7345 Tpmc ```