Skip to content

Commit

Permalink
perf: optimized DataValue::Utf8 convert to encoding of primary/uniq…
Browse files Browse the repository at this point in the history
…ue key
  • Loading branch information
KKould committed Sep 25, 2023
1 parent 1f48edc commit cdf3694
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 19 deletions.
4 changes: 2 additions & 2 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,9 @@ mod test {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let kipsql = Database::with_kipdb(temp_dir.path()).await?;

let _ = kipsql.run("create table t1 (a int primary key, b int unique null, k int)").await?;
let _ = kipsql.run("create table t1 (a int primary key, b int unique null, k int, z varchar unique null)").await?;
let _ = kipsql.run("create table t2 (c int primary key, d int unsigned null, e datetime)").await?;
let _ = kipsql.run("insert into t1 (a, b, k) values (-99, 1, 1), (-1, 2, 2), (5, 3, 2)").await?;
let _ = kipsql.run("insert into t1 (a, b, k, z) values (-99, 1, 1, 'k'), (-1, 2, 2, 'i'), (5, 3, 2, 'p')").await?;
let _ = kipsql.run("insert into t2 (d, c, e) values (2, 1, '2021-05-20 21:00:00'), (3, 4, '2023-09-10 00:00:00')").await?;
let _ = kipsql.run("create table t3 (a int primary key, b decimal(4,2))").await?;
let _ = kipsql.run("insert into t3 (a, b) values (1, 1111), (2, 2.01), (3, 3.00)").await?;
Expand Down
33 changes: 18 additions & 15 deletions src/execution/executor/dml/insert.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use std::collections::HashMap;
use std::sync::Arc;
use futures_async_stream::try_stream;
use itertools::Itertools;
use crate::catalog::TableName;
use crate::execution::executor::{BoxedExecutor, Executor};
use crate::execution::ExecutorError;
use crate::planner::operator::insert::InsertOperator;
use crate::storage::{Storage, Transaction};
use crate::types::ColumnId;
use crate::types::index::Index;
use crate::types::tuple::Tuple;
use crate::types::value::{DataValue, ValueRef};
use crate::types::value::DataValue;

pub struct Insert {
table_name: TableName,
Expand Down Expand Up @@ -47,25 +45,30 @@ impl Insert {
#[for_await]
for tuple in input {
let Tuple { columns, values, .. } = tuple?;
let primary_idx = primary_key_index.get_or_insert_with(|| {
let mut tuple_map = HashMap::new();
for (i, value) in values.into_iter().enumerate() {
let col = &columns[i];
let cast_val = DataValue::clone(&value).cast(&col.datatype())?;

if let Some(col_id) = col.id {
tuple_map.insert(col_id, Arc::new(cast_val));
}
}
let primary_col_id = primary_key_index.get_or_insert_with(|| {
columns.iter()
.find_position(|col| col.desc.is_primary)
.map(|(i, _)| i)
.find(|col| col.desc.is_primary)
.map(|col| col.id.unwrap())
.unwrap()
});
let id = values[*primary_idx].clone();
let mut tuple_map: HashMap<ColumnId, ValueRef> = values
.into_iter()
.enumerate()
.filter_map(|(i, value)| columns[i].id.map(|id| (id, value)))
.collect();
let all_columns = table_catalog.all_columns_with_id();
let tuple_id = tuple_map.get(primary_col_id)
.cloned()
.unwrap();
let mut tuple = Tuple {
id: Some(id.clone()),
id: Some(tuple_id.clone()),
columns: Vec::with_capacity(all_columns.len()),
values: Vec::with_capacity(all_columns.len()),
};

for (col_id, col) in all_columns {
let value = tuple_map.remove(col_id)
.unwrap_or_else(|| Arc::new(DataValue::none(col.datatype())));
Expand All @@ -74,7 +77,7 @@ impl Insert {
unique_values
.entry(col.id)
.or_insert_with(|| vec![])
.push((id.clone(), value.clone()))
.push((tuple_id.clone(), value.clone()))
}
if value.is_null() && !col.nullable {
return Err(ExecutorError::InternalError(format!("Non-null fields do not allow null values to be passed in: {:?}", col)));
Expand Down
55 changes: 53 additions & 2 deletions src/types/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ lazy_static! {
pub const DATE_FMT: &str = "%Y-%m-%d";
pub const DATE_TIME_FMT: &str = "%Y-%m-%d %H:%M:%S";

const ENCODE_GROUP_SIZE: usize = 8;
const ENCODE_MARKER: u8 = 0xFF;

pub type ValueRef = Arc<DataValue>;

#[derive(Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -383,6 +386,54 @@ impl DataValue {
}
}

// EncodeBytes guarantees the encoded value is in ascending order for comparison,
// encoding with the following rule:
//
// [group1][marker1]...[groupN][markerN]
// group is 8 bytes slice which is padding with 0.
// marker is `0xFF - padding 0 count`
//
// For example:
//
// [] -> [0, 0, 0, 0, 0, 0, 0, 0, 247]
// [1, 2, 3] -> [1, 2, 3, 0, 0, 0, 0, 0, 250]
// [1, 2, 3, 0] -> [1, 2, 3, 0, 0, 0, 0, 0, 251]
// [1, 2, 3, 4, 5, 6, 7, 8] -> [1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247]
//
// Refer: https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format#memcomparable-format
fn encode_bytes(b: &mut Vec<u8>, data: &[u8]) {
let d_len = data.len();
let realloc_size = (d_len / ENCODE_GROUP_SIZE + 1) * (ENCODE_GROUP_SIZE + 1);
Self::realloc_bytes(b, realloc_size);

let mut idx = 0;
while idx <= d_len {
let remain = d_len - idx;
let pad_count: usize;

if remain >= ENCODE_GROUP_SIZE {
b.extend_from_slice(&data[idx..idx + ENCODE_GROUP_SIZE]);
pad_count = 0;
} else {
pad_count = ENCODE_GROUP_SIZE - remain;
b.extend_from_slice(&data[idx..]);
b.extend_from_slice(&vec![0; pad_count]);
}

b.push(ENCODE_MARKER - pad_count as u8);
idx += ENCODE_GROUP_SIZE;
}
}

fn realloc_bytes(b: &mut Vec<u8>, size: usize) {
let len = b.len();

if size > len {
b.reserve(size - len);
b.resize(size, 0);
}
}

pub fn to_primary_key(&self, b: &mut Vec<u8>) -> Result<(), TypeError> {
match self {
DataValue::Int8(Some(v)) => encode_u!(b, *v as u8 ^ 0x80_u8),
Expand All @@ -393,7 +444,7 @@ impl DataValue {
DataValue::UInt16(Some(v)) => encode_u!(b, v),
DataValue::UInt32(Some(v)) => encode_u!(b, v),
DataValue::UInt64(Some(v)) => encode_u!(b, v),
DataValue::Utf8(Some(v)) => b.copy_from_slice(&mut v.as_bytes()),
DataValue::Utf8(Some(v)) => Self::encode_bytes(b, v.as_bytes()),
value => {
return if value.is_null() {
Err(TypeError::NotNull)
Expand All @@ -420,7 +471,7 @@ impl DataValue {
DataValue::UInt16(Some(v)) => encode_u!(b, v),
DataValue::UInt32(Some(v)) => encode_u!(b, v),
DataValue::UInt64(Some(v)) => encode_u!(b, v),
DataValue::Utf8(Some(v)) => b.copy_from_slice(&mut v.as_bytes()),
DataValue::Utf8(Some(v)) => Self::encode_bytes(b, v.as_bytes()),
DataValue::Boolean(Some(v)) => b.push(if *v { b'1' } else { b'0' }),
DataValue::Float32(Some(f)) => {
let mut u = f.to_bits();
Expand Down

0 comments on commit cdf3694

Please sign in to comment.