From 7903cdd1ef3b7eeb4adcc4efb72198540d68bdea Mon Sep 17 00:00:00 2001 From: Kould Date: Sun, 1 Sep 2024 00:09:09 +0800 Subject: [PATCH] feat: support FastSort on sort.rs --- src/db.rs | 1 + src/execution/dql/limit.rs | 2 - src/execution/dql/sort.rs | 824 +++++++++++++++++- src/optimizer/core/histogram.rs | 24 +- src/optimizer/core/memo.rs | 3 +- .../rule/implementation/dql/function_scan.rs | 27 + src/optimizer/rule/implementation/dql/mod.rs | 3 +- src/optimizer/rule/implementation/dql/sort.rs | 2 +- .../dql/{scan.rs => table_scan.rs} | 6 +- src/optimizer/rule/implementation/mod.rs | 10 +- src/planner/operator/mod.rs | 7 +- 11 files changed, 845 insertions(+), 64 deletions(-) create mode 100644 src/optimizer/rule/implementation/dql/function_scan.rs rename src/optimizer/rule/implementation/dql/{scan.rs => table_scan.rs} (97%) diff --git a/src/db.rs b/src/db.rs index 30361fd6..7de6990a 100644 --- a/src/db.rs +++ b/src/db.rs @@ -238,6 +238,7 @@ impl Database { ImplementationRuleImpl::Projection, ImplementationRuleImpl::SeqScan, ImplementationRuleImpl::IndexScan, + ImplementationRuleImpl::FunctionScan, ImplementationRuleImpl::Sort, ImplementationRuleImpl::Values, // DML diff --git a/src/execution/dql/limit.rs b/src/execution/dql/limit.rs index 4d4a459c..46a8d69a 100644 --- a/src/execution/dql/limit.rs +++ b/src/execution/dql/limit.rs @@ -61,5 +61,3 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Limit { ) } } - -impl Limit {} diff --git a/src/execution/dql/sort.rs b/src/execution/dql/sort.rs index ef606758..504fd9ad 100644 --- a/src/execution/dql/sort.rs +++ b/src/execution/dql/sort.rs @@ -6,11 +6,63 @@ use crate::storage::{StatisticsMetaCache, TableCache, Transaction}; use crate::throw; use crate::types::tuple::{Schema, Tuple}; use itertools::Itertools; +use std::cmp::Ordering; use std::mem; use std::ops::Coroutine; use std::ops::CoroutineState; use std::pin::Pin; +#[derive(Clone)] +pub(crate) struct NullableVec(Vec>); + +impl NullableVec { + pub(crate) fn with_capacity(capacity: usize) -> NullableVec { + NullableVec(Vec::with_capacity(capacity)) + } + + pub(crate) fn put(&mut self, item: T) { + self.0.push(Some(item)); + } + + pub(crate) fn take(&mut self, offset: usize) -> T { + self.0[offset].take().unwrap() + } + + pub(crate) fn get(&self, offset: usize) -> &T { + self.0[offset].as_ref().unwrap() + } + + pub(crate) fn len(&self) -> usize { + self.0.len() + } +} + +impl Default for NullableVec { + fn default() -> Self { + NullableVec(Vec::default()) + } +} + +struct RemappingIterator { + pos: usize, + tuples: NullableVec<(usize, Tuple)>, + indices: Vec, +} + +impl Iterator for RemappingIterator { + type Item = Tuple; + + fn next(&mut self) -> Option { + if self.pos > self.tuples.len() - 1 { + return None; + } + let (_, tuple) = self.tuples.take(self.indices[self.pos]); + self.pos += 1; + + Some(tuple) + } +} + const BUCKET_SIZE: usize = u8::MAX as usize + 1; // LSD Radix Sort @@ -36,38 +88,123 @@ pub(crate) fn radix_sort(mut tuples: Vec<(T, Vec)>) -> Vec { Vec::new() } -pub(crate) fn sort( - schema: &Schema, - sort_fields: &[SortField], - tuples: Vec, -) -> Result, DatabaseError> { - let tuples_with_keys: Vec<(Tuple, Vec)> = tuples - .into_iter() - .map(|tuple| { - let mut full_key = Vec::new(); - - for SortField { - expr, - nulls_first, - asc, - } in sort_fields - { - let mut key = Vec::new(); - - expr.eval(&tuple, schema)?.memcomparable_encode(&mut key)?; - if !asc { - for byte in key.iter_mut() { - *byte ^= 0xFF; +pub enum SortBy { + Radix, + Fast, +} + +impl SortBy { + pub(crate) fn sorted_tuples( + &self, + schema: &Schema, + sort_fields: &[SortField], + mut tuples: NullableVec<(usize, Tuple)>, + ) -> Result>, DatabaseError> { + match self { + SortBy::Radix => { + let mut sort_keys = Vec::with_capacity(tuples.len()); + + for (i, tuple) in tuples.0.iter().enumerate() { + assert!(tuple.is_some()); + + let mut full_key = Vec::new(); + + for SortField { + expr, + nulls_first, + asc, + } in sort_fields + { + let mut key = Vec::new(); + let tuple = tuple.as_ref().map(|(_, tuple)| tuple).unwrap(); + + expr.eval(tuple, schema)?.memcomparable_encode(&mut key)?; + if !asc { + for byte in key.iter_mut() { + *byte ^= 0xFF; + } + } + key.push(if *nulls_first { u8::MIN } else { u8::MAX }); + full_key.extend(key); } + sort_keys.push((i, full_key)) } - key.push(if *nulls_first { u8::MIN } else { u8::MAX }); - full_key.extend(key); + let indices = radix_sort(sort_keys); + + Ok(Box::new(RemappingIterator { + pos: 0, + tuples, + indices, + })) } - Ok::<(Tuple, Vec), DatabaseError>((tuple, full_key)) - }) - .try_collect()?; + SortBy::Fast => { + let fn_nulls_first = |nulls_first: bool| { + if nulls_first { + Ordering::Greater + } else { + Ordering::Less + } + }; + // Extract the results of calculating SortFields to avoid double calculation + // of data during comparison + let mut eval_results = vec![Vec::with_capacity(sort_fields.len()); tuples.len()]; + + for (x, SortField { expr, .. }) in sort_fields.iter().enumerate() { + for tuple in tuples.0.iter() { + assert!(tuple.is_some()); + + let (_, tuple) = tuple.as_ref().unwrap(); + eval_results[x].push(expr.eval(tuple, schema)?); + } + } + + tuples.0.sort_by(|tuple_1, tuple_2| { + assert!(tuple_1.is_some()); + assert!(tuple_2.is_some()); - Ok(radix_sort(tuples_with_keys)) + let (i_1, _) = tuple_1.as_ref().unwrap(); + let (i_2, _) = tuple_2.as_ref().unwrap(); + let mut ordering = Ordering::Equal; + + for ( + x, + SortField { + asc, nulls_first, .. + }, + ) in sort_fields.iter().enumerate() + { + let value_1 = &eval_results[x][*i_1]; + let value_2 = &eval_results[x][*i_2]; + + ordering = match (value_1.is_null(), value_2.is_null()) { + (false, true) => fn_nulls_first(*nulls_first), + (true, false) => fn_nulls_first(*nulls_first).reverse(), + _ => { + let mut ordering = + value_1.partial_cmp(value_2).unwrap_or(Ordering::Equal); + if !*asc { + ordering = ordering.reverse(); + } + ordering + } + }; + if ordering != Ordering::Equal { + break; + } + } + + ordering + }); + + Ok(Box::new( + tuples + .0 + .into_iter() + .map(|tuple| tuple.map(|(_, tuple)| tuple).unwrap()), + )) + } + } + } } pub struct Sort { @@ -102,32 +239,633 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Sort { } = self; let schema = input.output_schema().clone(); - let mut tuples: Vec = vec![]; + let mut tuples = NullableVec::default(); + let mut offset = 0; let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { - tuples.push(throw!(tuple)); + tuples.put((offset, throw!(tuple))); + offset += 1; } - let mut tuples = throw!(sort(&schema, &sort_fields, tuples)); - let limited_tuples = tuples.drain(..limit.unwrap_or(tuples.len())).collect_vec(); - for tuple in limited_tuples { - yield Ok(tuple); + let sort_by = if tuples.len() > 256 { + SortBy::Radix + } else { + SortBy::Fast + }; + let mut limit = limit.unwrap_or(tuples.len()); + + for tuple in throw!(sort_by.sorted_tuples(&schema, &sort_fields, tuples)) { + if limit != 0 { + yield Ok(tuple); + limit -= 1; + } } }, ) } } -#[test] -fn test_sort() { - let tupels = vec![ - (0, "abc".as_bytes().to_vec()), - (1, "abz".as_bytes().to_vec()), - (2, "abe".as_bytes().to_vec()), - (3, "abcd".as_bytes().to_vec()), - ]; +#[cfg(test)] +mod test { + use crate::catalog::{ColumnCatalog, ColumnDesc}; + use crate::errors::DatabaseError; + use crate::execution::dql::sort::{radix_sort, NullableVec, SortBy}; + use crate::expression::ScalarExpression; + use crate::planner::operator::sort::SortField; + use crate::types::tuple::Tuple; + use crate::types::value::DataValue; + use crate::types::LogicalType; + use std::sync::Arc; + + #[test] + fn test_radix_sort() { + let indices = vec![ + (0, "abc".as_bytes().to_vec()), + (1, "abz".as_bytes().to_vec()), + (2, "abe".as_bytes().to_vec()), + (3, "abcd".as_bytes().to_vec()), + ]; - assert_eq!(radix_sort(tupels), vec![0, 3, 2, 1]) + assert_eq!(radix_sort(indices), vec![0, 3, 2, 1]) + } + + #[test] + fn test_single_value_desc_and_null_first() -> Result<(), DatabaseError> { + let fn_sort_fields = |asc: bool, nulls_first: bool| { + vec![SortField { + expr: ScalarExpression::Reference { + expr: Box::new(ScalarExpression::Empty), + pos: 0, + }, + asc, + nulls_first, + }] + }; + let schema = Arc::new(vec![Arc::new(ColumnCatalog::new( + "c1".to_string(), + true, + ColumnDesc::new(LogicalType::Integer, false, false, None), + ))]); + let tuples = NullableVec(vec![ + Some(( + 0_usize, + Tuple { + id: None, + values: vec![Arc::new(DataValue::Int32(None))], + }, + )), + Some(( + 1_usize, + Tuple { + id: None, + values: vec![Arc::new(DataValue::Int32(Some(0)))], + }, + )), + Some(( + 2_usize, + Tuple { + id: None, + values: vec![Arc::new(DataValue::Int32(Some(1)))], + }, + )), + ]); + + let fn_asc_and_nulls_last_eq = |mut iter: Box>| { + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![Arc::new(DataValue::Int32(Some(0)))]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![Arc::new(DataValue::Int32(Some(1)))]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![Arc::new(DataValue::Int32(None))]) + } else { + unreachable!() + } + }; + let fn_desc_and_nulls_last_eq = |mut iter: Box>| { + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![Arc::new(DataValue::Int32(Some(1)))]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![Arc::new(DataValue::Int32(Some(0)))]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![Arc::new(DataValue::Int32(None))]) + } else { + unreachable!() + } + }; + let fn_asc_and_nulls_first_eq = |mut iter: Box>| { + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![Arc::new(DataValue::Int32(None))]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![Arc::new(DataValue::Int32(Some(0)))]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![Arc::new(DataValue::Int32(Some(1)))]) + } else { + unreachable!() + } + }; + let fn_desc_and_nulls_first_eq = |mut iter: Box>| { + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![Arc::new(DataValue::Int32(None))]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![Arc::new(DataValue::Int32(Some(1)))]) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!(tuple.values, vec![Arc::new(DataValue::Int32(Some(0)))]) + } else { + unreachable!() + } + }; + + // RadixSort + fn_asc_and_nulls_first_eq(SortBy::Radix.sorted_tuples( + &schema, + &fn_sort_fields(true, true), + tuples.clone(), + )?); + fn_asc_and_nulls_last_eq(SortBy::Radix.sorted_tuples( + &schema, + &fn_sort_fields(true, false), + tuples.clone(), + )?); + fn_desc_and_nulls_first_eq(SortBy::Radix.sorted_tuples( + &schema, + &fn_sort_fields(false, true), + tuples.clone(), + )?); + fn_desc_and_nulls_last_eq(SortBy::Radix.sorted_tuples( + &schema, + &fn_sort_fields(false, false), + tuples.clone(), + )?); + + // FastSort + fn_asc_and_nulls_first_eq(SortBy::Fast.sorted_tuples( + &schema, + &fn_sort_fields(true, true), + tuples.clone(), + )?); + fn_asc_and_nulls_last_eq(SortBy::Fast.sorted_tuples( + &schema, + &fn_sort_fields(true, false), + tuples.clone(), + )?); + fn_desc_and_nulls_first_eq(SortBy::Fast.sorted_tuples( + &schema, + &fn_sort_fields(false, true), + tuples.clone(), + )?); + fn_desc_and_nulls_last_eq(SortBy::Fast.sorted_tuples( + &schema, + &&fn_sort_fields(false, false), + tuples.clone(), + )?); + + Ok(()) + } + + #[test] + fn test_mixed_value_desc_and_null_first() -> Result<(), DatabaseError> { + let fn_sort_fields = + |asc_1: bool, nulls_first_1: bool, asc_2: bool, nulls_first_2: bool| { + vec![ + SortField { + expr: ScalarExpression::Reference { + expr: Box::new(ScalarExpression::Empty), + pos: 0, + }, + asc: asc_1, + nulls_first: nulls_first_1, + }, + SortField { + expr: ScalarExpression::Reference { + expr: Box::new(ScalarExpression::Empty), + pos: 0, + }, + asc: asc_2, + nulls_first: nulls_first_2, + }, + ] + }; + let schema = Arc::new(vec![ + Arc::new(ColumnCatalog::new( + "c1".to_string(), + true, + ColumnDesc::new(LogicalType::Integer, false, false, None), + )), + Arc::new(ColumnCatalog::new( + "c2".to_string(), + true, + ColumnDesc::new(LogicalType::Integer, false, false, None), + )), + ]); + let tuples = NullableVec(vec![ + Some(( + 0_usize, + Tuple { + id: None, + values: vec![ + Arc::new(DataValue::Int32(None)), + Arc::new(DataValue::Int32(None)), + ], + }, + )), + Some(( + 1_usize, + Tuple { + id: None, + values: vec![ + Arc::new(DataValue::Int32(Some(0))), + Arc::new(DataValue::Int32(None)), + ], + }, + )), + Some(( + 2_usize, + Tuple { + id: None, + values: vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Int32(None)), + ], + }, + )), + Some(( + 3_usize, + Tuple { + id: None, + values: vec![ + Arc::new(DataValue::Int32(None)), + Arc::new(DataValue::Int32(Some(0))), + ], + }, + )), + Some(( + 4_usize, + Tuple { + id: None, + values: vec![ + Arc::new(DataValue::Int32(Some(0))), + Arc::new(DataValue::Int32(Some(0))), + ], + }, + )), + Some(( + 5_usize, + Tuple { + id: None, + values: vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Int32(Some(0))), + ], + }, + )), + ]); + let fn_asc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq = + |mut iter: Box>| { + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(None)), + Arc::new(DataValue::Int32(None)) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(None)), + Arc::new(DataValue::Int32(Some(0))) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(Some(0))), + Arc::new(DataValue::Int32(None)) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(Some(0))), + Arc::new(DataValue::Int32(Some(0))) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Int32(None)) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Int32(Some(0))) + ] + ) + } else { + unreachable!() + } + }; + let fn_asc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq = + |mut iter: Box>| { + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(Some(0))), + Arc::new(DataValue::Int32(None)) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(Some(0))), + Arc::new(DataValue::Int32(Some(0))) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Int32(None)) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Int32(Some(0))) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(None)), + Arc::new(DataValue::Int32(None)) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(None)), + Arc::new(DataValue::Int32(Some(0))) + ] + ) + } else { + unreachable!() + } + }; + let fn_desc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq = + |mut iter: Box>| { + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(None)), + Arc::new(DataValue::Int32(None)) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(None)), + Arc::new(DataValue::Int32(Some(0))) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Int32(None)) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Int32(Some(0))) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(Some(0))), + Arc::new(DataValue::Int32(None)) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(Some(0))), + Arc::new(DataValue::Int32(Some(0))) + ] + ) + } else { + unreachable!() + } + }; + let fn_desc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq = + |mut iter: Box>| { + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Int32(None)) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Int32(Some(0))) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(Some(0))), + Arc::new(DataValue::Int32(None)) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(Some(0))), + Arc::new(DataValue::Int32(Some(0))) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(None)), + Arc::new(DataValue::Int32(None)) + ] + ) + } else { + unreachable!() + } + if let Some(tuple) = iter.next() { + assert_eq!( + tuple.values, + vec![ + Arc::new(DataValue::Int32(None)), + Arc::new(DataValue::Int32(Some(0))) + ] + ) + } else { + unreachable!() + } + }; + + // RadixSort + fn_asc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(SortBy::Radix.sorted_tuples( + &schema, + &fn_sort_fields(true, true, true, true), + tuples.clone(), + )?); + fn_asc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(SortBy::Radix.sorted_tuples( + &schema, + &fn_sort_fields(true, false, true, true), + tuples.clone(), + )?); + fn_desc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(SortBy::Radix.sorted_tuples( + &schema, + &fn_sort_fields(false, true, true, true), + tuples.clone(), + )?); + fn_desc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(SortBy::Radix.sorted_tuples( + &schema, + &fn_sort_fields(false, false, true, true), + tuples.clone(), + )?); + + // FastSort + fn_asc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(SortBy::Fast.sorted_tuples( + &schema, + &fn_sort_fields(true, true, true, true), + tuples.clone(), + )?); + fn_asc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(SortBy::Fast.sorted_tuples( + &schema, + &fn_sort_fields(true, false, true, true), + tuples.clone(), + )?); + fn_desc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(SortBy::Fast.sorted_tuples( + &schema, + &fn_sort_fields(false, true, true, true), + tuples.clone(), + )?); + fn_desc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(SortBy::Fast.sorted_tuples( + &schema, + &fn_sort_fields(false, false, true, true), + tuples.clone(), + )?); + + Ok(()) + } } diff --git a/src/optimizer/core/histogram.rs b/src/optimizer/core/histogram.rs index 4d3e7620..1ae135af 100644 --- a/src/optimizer/core/histogram.rs +++ b/src/optimizer/core/histogram.rs @@ -1,5 +1,5 @@ use crate::errors::DatabaseError; -use crate::execution::dql::sort::radix_sort; +use crate::execution::dql::sort::{radix_sort, NullableVec}; use crate::expression::range_detacher::Range; use crate::expression::BinaryOperator; use crate::optimizer::core::cm_sketch::CountMinSketch; @@ -17,7 +17,8 @@ pub struct HistogramBuilder { index_id: IndexId, null_count: usize, - values: Vec<((usize, ValueRef), Vec)>, + values: NullableVec<(usize, ValueRef)>, + sort_keys: Vec<(usize, Vec)>, value_index: usize, } @@ -51,7 +52,8 @@ impl HistogramBuilder { Ok(Self { index_id: index_meta.id, null_count: 0, - values: capacity.map(Vec::with_capacity).unwrap_or_default(), + values: capacity.map(NullableVec::with_capacity).unwrap_or_default(), + sort_keys: capacity.map(Vec::with_capacity).unwrap_or_default(), value_index: 0, }) } @@ -63,7 +65,8 @@ impl HistogramBuilder { let mut bytes = Vec::new(); value.memcomparable_encode(&mut bytes)?; - self.values.push(((self.value_index, value.clone()), bytes)); + self.values.put((self.value_index, value.clone())); + self.sort_keys.push((self.value_index, bytes)) } self.value_index += 1; @@ -86,7 +89,8 @@ impl HistogramBuilder { let HistogramBuilder { index_id, null_count, - values, + mut values, + sort_keys, .. } = self; let mut buckets = Vec::with_capacity(number_of_buckets); @@ -96,20 +100,24 @@ impl HistogramBuilder { } else { (values_len + number_of_buckets) / number_of_buckets }; - let sorted_values = radix_sort(values); + let sorted_indices = radix_sort(sort_keys); for i in 0..number_of_buckets { let mut bucket = Bucket::empty(); let j = (i + 1) * bucket_len; - bucket.upper = sorted_values[cmp::min(j, values_len) - 1].1.clone(); + bucket.upper = values + .get(sorted_indices[cmp::min(j, values_len) - 1]) + .1 + .clone(); buckets.push(bucket); } let mut corr_xy_sum = 0.0; let mut number_of_distinct_value = 0; let mut last_value: Option = None; - for (i, (ordinal, value)) in sorted_values.into_iter().enumerate() { + for (i, index) in sorted_indices.into_iter().enumerate() { + let (ordinal, value) = values.take(index); sketch.increment(value.as_ref()); if let None | Some(true) = last_value.as_ref().map(|last_value| last_value != &value) { diff --git a/src/optimizer/core/memo.rs b/src/optimizer/core/memo.rs index 4d2be66e..04ac32ad 100644 --- a/src/optimizer/core/memo.rs +++ b/src/optimizer/core/memo.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; pub struct Expression { pub(crate) op: PhysicalOption, pub(crate) cost: Option, + // TODO: output rows } #[derive(Debug, Clone)] @@ -46,7 +47,7 @@ impl Memo { return Err(DatabaseError::EmptyPlan); } - for node_id in graph.nodes_iter(HepMatchOrder::TopDown, None) { + for node_id in graph.nodes_iter(HepMatchOrder::BottomUp, None) { for rule in implementations { if HepMatcher::new(rule.pattern(), node_id, graph).match_opt_expr() { let op = graph.operator(node_id); diff --git a/src/optimizer/rule/implementation/dql/function_scan.rs b/src/optimizer/rule/implementation/dql/function_scan.rs new file mode 100644 index 00000000..b676124d --- /dev/null +++ b/src/optimizer/rule/implementation/dql/function_scan.rs @@ -0,0 +1,27 @@ +use crate::errors::DatabaseError; +use crate::optimizer::core::memo::{Expression, GroupExpression}; +use crate::optimizer::core::pattern::{Pattern, PatternChildrenPredicate}; +use crate::optimizer::core::rule::{ImplementationRule, MatchPattern}; +use crate::optimizer::core::statistics_meta::StatisticMetaLoader; +use crate::planner::operator::{Operator, PhysicalOption}; +use crate::single_mapping; +use crate::storage::Transaction; +use lazy_static::lazy_static; + +lazy_static! { + static ref FUNCTION_SCAN_PATTERN: Pattern = { + Pattern { + predicate: |op| matches!(op, Operator::FunctionScan(_)), + children: PatternChildrenPredicate::None, + } + }; +} + +#[derive(Clone)] +pub struct FunctionScanImplementation; + +single_mapping!( + FunctionScanImplementation, + FUNCTION_SCAN_PATTERN, + PhysicalOption::FunctionScan +); diff --git a/src/optimizer/rule/implementation/dql/mod.rs b/src/optimizer/rule/implementation/dql/mod.rs index 62e009ee..23a286e2 100644 --- a/src/optimizer/rule/implementation/dql/mod.rs +++ b/src/optimizer/rule/implementation/dql/mod.rs @@ -1,9 +1,10 @@ pub(crate) mod aggregate; pub(crate) mod dummy; pub(crate) mod filter; +pub(crate) mod function_scan; pub(crate) mod join; pub(crate) mod limit; pub(crate) mod projection; -pub(crate) mod scan; pub(crate) mod sort; +pub(crate) mod table_scan; pub(crate) mod values; diff --git a/src/optimizer/rule/implementation/dql/sort.rs b/src/optimizer/rule/implementation/dql/sort.rs index 31cf912f..771bbea1 100644 --- a/src/optimizer/rule/implementation/dql/sort.rs +++ b/src/optimizer/rule/implementation/dql/sort.rs @@ -20,4 +20,4 @@ lazy_static! { #[derive(Clone)] pub struct SortImplementation; -single_mapping!(SortImplementation, SORT_PATTERN, PhysicalOption::RadixSort); +single_mapping!(SortImplementation, SORT_PATTERN, PhysicalOption::Sort); diff --git a/src/optimizer/rule/implementation/dql/scan.rs b/src/optimizer/rule/implementation/dql/table_scan.rs similarity index 97% rename from src/optimizer/rule/implementation/dql/scan.rs rename to src/optimizer/rule/implementation/dql/table_scan.rs index f9147a8e..a90dcbfd 100644 --- a/src/optimizer/rule/implementation/dql/scan.rs +++ b/src/optimizer/rule/implementation/dql/table_scan.rs @@ -9,7 +9,7 @@ use crate::types::index::IndexType; use lazy_static::lazy_static; lazy_static! { - static ref SCAN_PATTERN: Pattern = { + static ref TABLE_SCAN_PATTERN: Pattern = { Pattern { predicate: |op| matches!(op, Operator::TableScan(_)), children: PatternChildrenPredicate::None, @@ -22,7 +22,7 @@ pub struct SeqScanImplementation; impl MatchPattern for SeqScanImplementation { fn pattern(&self) -> &Pattern { - &SCAN_PATTERN + &TABLE_SCAN_PATTERN } } @@ -63,7 +63,7 @@ pub struct IndexScanImplementation; impl MatchPattern for IndexScanImplementation { fn pattern(&self) -> &Pattern { - &SCAN_PATTERN + &TABLE_SCAN_PATTERN } } diff --git a/src/optimizer/rule/implementation/mod.rs b/src/optimizer/rule/implementation/mod.rs index 9f35b245..c4f14e1a 100644 --- a/src/optimizer/rule/implementation/mod.rs +++ b/src/optimizer/rule/implementation/mod.rs @@ -24,13 +24,14 @@ use crate::optimizer::rule::implementation::dql::aggregate::{ }; use crate::optimizer::rule::implementation::dql::dummy::DummyImplementation; use crate::optimizer::rule::implementation::dql::filter::FilterImplementation; +use crate::optimizer::rule::implementation::dql::function_scan::FunctionScanImplementation; use crate::optimizer::rule::implementation::dql::join::JoinImplementation; use crate::optimizer::rule::implementation::dql::limit::LimitImplementation; use crate::optimizer::rule::implementation::dql::projection::ProjectionImplementation; -use crate::optimizer::rule::implementation::dql::scan::{ +use crate::optimizer::rule::implementation::dql::sort::SortImplementation; +use crate::optimizer::rule::implementation::dql::table_scan::{ IndexScanImplementation, SeqScanImplementation, }; -use crate::optimizer::rule::implementation::dql::sort::SortImplementation; use crate::optimizer::rule::implementation::dql::values::ValuesImplementation; use crate::planner::operator::Operator; use crate::storage::Transaction; @@ -46,6 +47,7 @@ pub enum ImplementationRuleImpl { Limit, Projection, SeqScan, + FunctionScan, IndexScan, Sort, Values, @@ -76,6 +78,7 @@ impl MatchPattern for ImplementationRuleImpl { ImplementationRuleImpl::Projection => ProjectionImplementation.pattern(), ImplementationRuleImpl::SeqScan => SeqScanImplementation.pattern(), ImplementationRuleImpl::IndexScan => IndexScanImplementation.pattern(), + ImplementationRuleImpl::FunctionScan => FunctionScanImplementation.pattern(), ImplementationRuleImpl::Sort => SortImplementation.pattern(), ImplementationRuleImpl::Values => ValuesImplementation.pattern(), ImplementationRuleImpl::CopyFromFile => CopyFromFileImplementation.pattern(), @@ -128,6 +131,9 @@ impl ImplementationRule for ImplementationRuleImpl { ImplementationRuleImpl::IndexScan => { IndexScanImplementation.to_expression(operator, loader, group_expr)? } + ImplementationRuleImpl::FunctionScan => { + FunctionScanImplementation.to_expression(operator, loader, group_expr)? + } ImplementationRuleImpl::Sort => { SortImplementation.to_expression(operator, loader, group_expr)? } diff --git a/src/planner/operator/mod.rs b/src/planner/operator/mod.rs index 840dfeb5..daaba13d 100644 --- a/src/planner/operator/mod.rs +++ b/src/planner/operator/mod.rs @@ -93,9 +93,9 @@ pub enum PhysicalOption { NestLoopJoin, Project, SeqScan, + FunctionScan, IndexScan(IndexInfo), - RadixSort, - // NormalSort, + Sort, Limit, Values, Insert, @@ -291,8 +291,9 @@ impl fmt::Display for PhysicalOption { PhysicalOption::NestLoopJoin => write!(f, "NestLoopJoin"), PhysicalOption::Project => write!(f, "Project"), PhysicalOption::SeqScan => write!(f, "SeqScan"), + PhysicalOption::FunctionScan => write!(f, "FunctionScan"), PhysicalOption::IndexScan(index) => write!(f, "IndexScan By {}", index), - PhysicalOption::RadixSort => write!(f, "RadixSort"), + PhysicalOption::Sort => write!(f, "Sort"), PhysicalOption::Limit => write!(f, "Limit"), PhysicalOption::Values => write!(f, "Values"), PhysicalOption::Insert => write!(f, "Insert"),