Skip to content

Commit

Permalink
perf: reconstruction Sort operator reconstruction using radix sort + …
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Dec 23, 2023
1 parent 6f0c550 commit 4ca1f32
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 109 deletions.
154 changes: 98 additions & 56 deletions src/execution/executor/dql/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,54 @@ use crate::execution::ExecutorError;
use crate::planner::operator::sort::{SortField, SortOperator};
use crate::storage::Transaction;
use crate::types::tuple::Tuple;
use crate::types::value::ValueRef;
use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::mem;
use crate::types::errors::TypeError;

/// Number of distinct values a single key byte can take.
const BUCKET_SIZE: usize = u8::MAX as usize + 1;

/// Sorts `tuple_groups` in place by the byte-wise lexicographic order of each
/// entry's key (the `Vec<u8>` half of the pair) using an LSD radix sort.
///
/// The sort is stable: entries with identical keys keep their input order.
/// A key that is a strict prefix of another key always sorts before it, even
/// when the longer key continues with `0x00` bytes.
///
/// An empty input is left untouched.
fn radix_sort<T>(tuple_groups: &mut Vec<(T, Vec<u8>)>) {
    let max_len = match tuple_groups.iter().map(|(_, key)| key.len()).max() {
        Some(len) => len,
        // No entries, nothing to sort.
        None => return,
    };

    // Slot 0 is reserved for "this key has no byte at the current position";
    // byte value `v` goes to slot `v + 1`. Keeping the two apart is what makes
    // `[1]` sort before `[1, 0]` instead of comparing as a tie.
    let mut buckets: Vec<Vec<(T, Vec<u8>)>> = (0..=BUCKET_SIZE).map(|_| Vec::new()).collect();

    // Work on an owned vector so entries can be moved between it and the buckets.
    let mut entries = mem::take(tuple_groups);

    // Least significant (right-most) position first; each pass is stable, so
    // after the final pass the entries are ordered by the whole key.
    for pos in (0..max_len).rev() {
        for entry in entries.drain(..) {
            let slot = entry.1.get(pos).map_or(0, |&byte| usize::from(byte) + 1);
            buckets[slot].push(entry);
        }

        // `append` empties each bucket while keeping its allocation for the next pass.
        for bucket in buckets.iter_mut() {
            entries.append(bucket);
        }
    }

    *tuple_groups = entries;
}

pub struct Sort {
sort_fields: Vec<SortField>,
Expand Down Expand Up @@ -40,70 +82,70 @@ impl Sort {
limit,
input,
} = self;
let mut tuples: Vec<(usize, Tuple)> = vec![];
let mut tuples: Vec<Tuple> = vec![];

#[for_await]
for (i, tuple) in input.enumerate() {
tuples.push((i, tuple?));
for tuple in input {
tuples.push(tuple?);
}
let sort_values: Vec<Vec<ValueRef>> = tuples
.iter()
.map(|(_, tuple)| {
sort_fields
.iter()
.map(|SortField { expr, .. }| expr.eval(tuple))
.try_collect()
})
.try_collect()?;
let mut tuples_with_keys: Vec<(Tuple, Vec<u8>)> = tuples
.into_iter()
.map(|tuple| {
let mut full_key = Vec::new();

tuples.sort_by(|(i_1, _), (i_2, _)| {
let mut ordering = Ordering::Equal;

for (
sort_index,
SortField {
asc, nulls_first, ..
},
) in sort_fields.iter().enumerate()
{
let value_1 = &sort_values[*i_1][sort_index];
let value_2 = &sort_values[*i_2][sort_index];

ordering = value_1.partial_cmp(&value_2).unwrap_or_else(|| {
match (value_1.is_null(), value_2.is_null()) {
(false, true) => {
if *nulls_first {
Ordering::Less
} else {
Ordering::Greater
}
}
(true, false) => {
if *nulls_first {
Ordering::Greater
} else {
Ordering::Less
}
for SortField { expr, nulls_first, asc } in &sort_fields {
let mut key = Vec::new();

expr
.eval(&tuple)?
.memcomparable_encode(&mut key)?;
key.push(if *nulls_first {
u8::MAX
} else {
u8::MIN
});

if !asc {
for byte in key.iter_mut() {
*byte ^= 0xFF;
}
_ => Ordering::Equal,
}
});
if !*asc {
ordering = ordering.reverse();
full_key.extend(key);
}
if ordering != Ordering::Equal {
break;
}
}
Ok::<(Tuple, Vec<u8>), TypeError>((tuple, full_key))
})
.try_collect()?;

ordering
});
drop(sort_values);
radix_sort(&mut tuples_with_keys);

let len = limit.unwrap_or(tuples.len());
let len = limit.unwrap_or(tuples_with_keys.len());

for tuple in tuples.drain(..len).map(|(_, tuple)| tuple) {
for tuple in tuples_with_keys
.drain(..len)
.map(|(tuple, _)| tuple) {
yield tuple;
}
}
}

#[test]
fn test_sort() {
    // Builds an `(id, key)` entry whose key is the UTF-8 bytes of `key`.
    let entry = |id: i32, key: &str| (id, key.as_bytes().to_vec());

    let mut actual = vec![
        entry(0, "abc"),
        entry(1, "abz"),
        entry(2, "abe"),
        entry(3, "abcd"),
    ];
    radix_sort(&mut actual);

    // Keys must come out in ascending byte order, with the shorter "abc"
    // ahead of "abcd", which shares its prefix.
    let expected = vec![
        entry(0, "abc"),
        entry(3, "abcd"),
        entry(2, "abe"),
        entry(1, "abz"),
    ];
    assert_eq!(actual, expected);
}
2 changes: 1 addition & 1 deletion src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub enum ExecutorError {
#[from]
TypeError,
),
#[error("storage_ap error: {0}")]
#[error("storage error: {0}")]
StorageError(
#[source]
#[from]
Expand Down
21 changes: 19 additions & 2 deletions src/storage/table_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ use crate::types::index::{Index, IndexId, IndexMeta};
use crate::types::tuple::{Tuple, TupleId};
use bytes::Bytes;
use lazy_static::lazy_static;
use crate::types::LogicalType;

const BOUND_MIN_TAG: u8 = 0;
const BOUND_MAX_TAG: u8 = 1;

lazy_static! {
static ref ROOT_BYTES: Vec<u8> = b"Root".to_vec();
}
Expand Down Expand Up @@ -136,7 +138,21 @@ impl TableCodec {
let mut key_prefix = Self::key_prefix(CodecType::Tuple, table_name);
key_prefix.push(BOUND_MIN_TAG);

tuple_id.to_primary_key(&mut key_prefix)?;
if !matches!(
tuple_id.logical_type(),
LogicalType::Tinyint
| LogicalType::Smallint
| LogicalType::Integer
| LogicalType::Bigint
| LogicalType::UTinyint
| LogicalType::USmallint
| LogicalType::UInteger
| LogicalType::UBigint
| LogicalType::Varchar(_)
) {
return Err(TypeError::InvalidType);
}
tuple_id.memcomparable_encode(&mut key_prefix)?;

Ok(key_prefix)
}
Expand Down Expand Up @@ -195,7 +211,8 @@ impl TableCodec {
key_prefix.push(BOUND_MIN_TAG);

for col_v in &index.column_values {
col_v.to_index_key(&mut key_prefix)?;
col_v.memcomparable_encode(&mut key_prefix)?;
key_prefix.push(BOUND_MIN_TAG);
}

Ok(key_prefix)
Expand Down
72 changes: 24 additions & 48 deletions src/types/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,30 +500,7 @@ impl DataValue {
}
}

/// Appends the primary-key encoding of this value to `b`.
///
/// Accepted values are non-null signed/unsigned integers (8 to 64 bit) and
/// non-null `Utf8` strings. Signed integers are reinterpreted as unsigned with
/// the sign bit flipped before being passed to `encode_u!` (presumably so the
/// encoded bytes order the same way as the numbers do; confirm against the
/// macro definition). Strings are written through `Self::encode_bytes`.
///
/// # Errors
///
/// * `TypeError::PrimaryKeyNotFound` when the value is not one of the accepted
///   cases and `is_null()` reports true.
/// * `TypeError::InvalidType` for every other unsupported value.
pub fn to_primary_key(&self, b: &mut Vec<u8>) -> Result<(), TypeError> {
    match self {
        DataValue::Int8(Some(n)) => encode_u!(b, *n as u8 ^ 0x80_u8),
        DataValue::Int16(Some(n)) => encode_u!(b, *n as u16 ^ 0x8000_u16),
        DataValue::Int32(Some(n)) => encode_u!(b, *n as u32 ^ 0x80000000_u32),
        DataValue::Int64(Some(n)) => encode_u!(b, *n as u64 ^ 0x8000000000000000_u64),
        DataValue::UInt8(Some(n)) => encode_u!(b, n),
        DataValue::UInt16(Some(n)) => encode_u!(b, n),
        DataValue::UInt32(Some(n)) => encode_u!(b, n),
        DataValue::UInt64(Some(n)) => encode_u!(b, n),
        DataValue::Utf8(Some(text)) => Self::encode_bytes(b, text.as_bytes()),
        unsupported => {
            // A null cannot serve as a primary key; anything else is simply
            // a type this encoding does not cover.
            let error = if unsupported.is_null() {
                TypeError::PrimaryKeyNotFound
            } else {
                TypeError::InvalidType
            };
            return Err(error);
        }
    }

    Ok(())
}

pub fn to_index_key(&self, b: &mut Vec<u8>) -> Result<(), TypeError> {
pub fn memcomparable_encode(&self, b: &mut Vec<u8>) -> Result<(), TypeError> {
match self {
DataValue::Int8(Some(v)) => encode_u!(b, *v as u8 ^ 0x80_u8),
DataValue::Int16(Some(v)) => encode_u!(b, *v as u16 ^ 0x8000_u16),
Expand Down Expand Up @@ -561,12 +538,11 @@ impl DataValue {

encode_u!(b, u);
}
DataValue::Null => (),
DataValue::Decimal(Some(_v)) => todo!(),
value => {
return if value.is_null() {
todo!()
} else {
Err(TypeError::InvalidType)
if !value.is_null() {
return Err(TypeError::InvalidType);
}
}
}
Expand Down Expand Up @@ -1059,14 +1035,14 @@ mod test {
use crate::types::value::DataValue;

#[test]
fn test_to_primary_key() -> Result<(), TypeError> {
fn test_mem_comparable_int() -> Result<(), TypeError> {
let mut key_i8_1 = Vec::new();
let mut key_i8_2 = Vec::new();
let mut key_i8_3 = Vec::new();

DataValue::Int8(Some(i8::MIN)).to_primary_key(&mut key_i8_1)?;
DataValue::Int8(Some(-1_i8)).to_primary_key(&mut key_i8_2)?;
DataValue::Int8(Some(i8::MAX)).to_primary_key(&mut key_i8_3)?;
DataValue::Int8(Some(i8::MIN)).memcomparable_encode(&mut key_i8_1)?;
DataValue::Int8(Some(-1_i8)).memcomparable_encode(&mut key_i8_2)?;
DataValue::Int8(Some(i8::MAX)).memcomparable_encode(&mut key_i8_3)?;

println!("{:?} < {:?}", key_i8_1, key_i8_2);
println!("{:?} < {:?}", key_i8_2, key_i8_3);
Expand All @@ -1077,9 +1053,9 @@ mod test {
let mut key_i16_2 = Vec::new();
let mut key_i16_3 = Vec::new();

DataValue::Int16(Some(i16::MIN)).to_primary_key(&mut key_i16_1)?;
DataValue::Int16(Some(-1_i16)).to_primary_key(&mut key_i16_2)?;
DataValue::Int16(Some(i16::MAX)).to_primary_key(&mut key_i16_3)?;
DataValue::Int16(Some(i16::MIN)).memcomparable_encode(&mut key_i16_1)?;
DataValue::Int16(Some(-1_i16)).memcomparable_encode(&mut key_i16_2)?;
DataValue::Int16(Some(i16::MAX)).memcomparable_encode(&mut key_i16_3)?;

println!("{:?} < {:?}", key_i16_1, key_i16_2);
println!("{:?} < {:?}", key_i16_2, key_i16_3);
Expand All @@ -1090,9 +1066,9 @@ mod test {
let mut key_i32_2 = Vec::new();
let mut key_i32_3 = Vec::new();

DataValue::Int32(Some(i32::MIN)).to_primary_key(&mut key_i32_1)?;
DataValue::Int32(Some(-1_i32)).to_primary_key(&mut key_i32_2)?;
DataValue::Int32(Some(i32::MAX)).to_primary_key(&mut key_i32_3)?;
DataValue::Int32(Some(i32::MIN)).memcomparable_encode(&mut key_i32_1)?;
DataValue::Int32(Some(-1_i32)).memcomparable_encode(&mut key_i32_2)?;
DataValue::Int32(Some(i32::MAX)).memcomparable_encode(&mut key_i32_3)?;

println!("{:?} < {:?}", key_i32_1, key_i32_2);
println!("{:?} < {:?}", key_i32_2, key_i32_3);
Expand All @@ -1103,9 +1079,9 @@ mod test {
let mut key_i64_2 = Vec::new();
let mut key_i64_3 = Vec::new();

DataValue::Int64(Some(i64::MIN)).to_primary_key(&mut key_i64_1)?;
DataValue::Int64(Some(-1_i64)).to_primary_key(&mut key_i64_2)?;
DataValue::Int64(Some(i64::MAX)).to_primary_key(&mut key_i64_3)?;
DataValue::Int64(Some(i64::MIN)).memcomparable_encode(&mut key_i64_1)?;
DataValue::Int64(Some(-1_i64)).memcomparable_encode(&mut key_i64_2)?;
DataValue::Int64(Some(i64::MAX)).memcomparable_encode(&mut key_i64_3)?;

println!("{:?} < {:?}", key_i64_1, key_i64_2);
println!("{:?} < {:?}", key_i64_2, key_i64_3);
Expand All @@ -1116,14 +1092,14 @@ mod test {
}

#[test]
fn test_to_index_key_f() -> Result<(), TypeError> {
fn test_mem_comparable_float() -> Result<(), TypeError> {
let mut key_f32_1 = Vec::new();
let mut key_f32_2 = Vec::new();
let mut key_f32_3 = Vec::new();

DataValue::Float32(Some(f32::MIN)).to_index_key(&mut key_f32_1)?;
DataValue::Float32(Some(-1_f32)).to_index_key(&mut key_f32_2)?;
DataValue::Float32(Some(f32::MAX)).to_index_key(&mut key_f32_3)?;
DataValue::Float32(Some(f32::MIN)).memcomparable_encode(&mut key_f32_1)?;
DataValue::Float32(Some(-1_f32)).memcomparable_encode(&mut key_f32_2)?;
DataValue::Float32(Some(f32::MAX)).memcomparable_encode(&mut key_f32_3)?;

println!("{:?} < {:?}", key_f32_1, key_f32_2);
println!("{:?} < {:?}", key_f32_2, key_f32_3);
Expand All @@ -1134,9 +1110,9 @@ mod test {
let mut key_f64_2 = Vec::new();
let mut key_f64_3 = Vec::new();

DataValue::Float64(Some(f64::MIN)).to_index_key(&mut key_f64_1)?;
DataValue::Float64(Some(-1_f64)).to_index_key(&mut key_f64_2)?;
DataValue::Float64(Some(f64::MAX)).to_index_key(&mut key_f64_3)?;
DataValue::Float64(Some(f64::MIN)).memcomparable_encode(&mut key_f64_1)?;
DataValue::Float64(Some(-1_f64)).memcomparable_encode(&mut key_f64_2)?;
DataValue::Float64(Some(f64::MAX)).memcomparable_encode(&mut key_f64_3)?;

println!("{:?} < {:?}", key_f64_1, key_f64_2);
println!("{:?} < {:?}", key_f64_2, key_f64_3);
Expand Down
Loading

0 comments on commit 4ca1f32

Please sign in to comment.