Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: reduce the overhead incurred when deserializing Tuple #126

Merged
merged 2 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ ahash = "0.8.3"
lazy_static = "1.4.0"
comfy-table = "7.0.1"
bytes = "1.5.0"
kip_db = "0.1.2-alpha.24"
kip_db = "0.1.2-alpha.25"
rust_decimal = "1"
csv = "1"
regex = "1.10.2"
Expand Down
19 changes: 18 additions & 1 deletion src/catalog/table.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;

use crate::catalog::{ColumnCatalog, ColumnRef};
use crate::errors::DatabaseError;
use crate::types::index::{IndexMeta, IndexMetaRef};
use crate::types::ColumnId;
use crate::types::{ColumnId, LogicalType};

pub type TableName = Arc<String>;

Expand Down Expand Up @@ -58,6 +59,22 @@ impl TableCatalog {
self.columns.values().map(Arc::clone).collect()
}

/// Returns the positional index and column reference of the table's primary-key
/// column.
///
/// The index is the column's position in iteration order over `self.columns`
/// (a `BTreeMap`, so ordered by `ColumnId`), matching the tuple value layout.
///
/// # Errors
/// Returns `DatabaseError::PrimaryKeyNotFound` when no column is flagged as
/// primary (e.g. a malformed or partially-built catalog).
pub(crate) fn primary_key(&self) -> Result<(usize, &ColumnRef), DatabaseError> {
    // `values()` replaces the manual `.iter().map(|(_, column)| column)`,
    // consistent with how `all_columns` iterates the same map.
    self.columns
        .values()
        .enumerate()
        .find(|(_, column)| column.desc.is_primary)
        .ok_or(DatabaseError::PrimaryKeyNotFound)
}

/// Collects the logical type of every column, in column-id (tuple) order.
///
/// Used by tuple deserialization, which needs the per-column types without
/// cloning whole `ColumnRef`s. `LogicalType` is `Copy`-cheap here: the value
/// is dereferenced out of each column's datatype.
pub(crate) fn types(&self) -> Vec<LogicalType> {
    // `values()` + stdlib `collect()` instead of the manual key-discarding
    // map and itertools' `collect_vec`; the annotated return type drives
    // the `collect` inference.
    self.columns
        .values()
        .map(|column| *column.datatype())
        .collect()
}

/// Add a column to the table catalog.
pub(crate) fn add_column(&mut self, mut col: ColumnCatalog) -> Result<ColumnId, DatabaseError> {
if self.column_idxs.contains_key(col.name()) {
Expand Down
9 changes: 8 additions & 1 deletion src/execution/volcano/ddl/add_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,19 @@ impl AddColumn {
if_not_exists,
} = &self.op;
let mut unique_values = column.desc().is_unique.then(|| Vec::new());
let mut tuple_columns = None;
let mut tuples = Vec::new();

#[for_await]
for tuple in build_read(self.input, transaction) {
let mut tuple: Tuple = tuple?;

tuple.columns.push(Arc::new(column.clone()));
let tuples_columns = tuple_columns.get_or_insert_with(|| {
let mut columns = Vec::clone(&tuple.columns);

columns.push(Arc::new(column.clone()));
Arc::new(columns)
});
if let Some(value) = column.default_value() {
if let Some(unique_values) = &mut unique_values {
unique_values.push((tuple.id.clone().unwrap(), value.clone()));
Expand All @@ -51,6 +57,7 @@ impl AddColumn {
} else {
tuple.values.push(Arc::new(DataValue::Null));
}
tuple.columns = tuples_columns.clone();
tuples.push(tuple);
}
for tuple in tuples {
Expand Down
17 changes: 11 additions & 6 deletions src/execution/volcano/ddl/drop_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::storage::Transaction;
use crate::types::tuple::Tuple;
use crate::types::tuple_builder::TupleBuilder;
use futures_async_stream::try_stream;
use std::sync::Arc;

pub struct DropColumn {
op: DropColumnOperator,
Expand All @@ -32,14 +33,14 @@ impl DropColumn {
column_name,
if_exists,
} = &self.op;
let mut option_column_i = None;
let mut tuple_columns = None;
let mut tuples = Vec::new();

#[for_await]
for tuple in build_read(self.input, transaction) {
let mut tuple: Tuple = tuple?;

if option_column_i.is_none() {
if tuple_columns.is_none() {
if let Some((column_index, is_primary)) = tuple
.columns
.iter()
Expand All @@ -52,16 +53,20 @@ impl DropColumn {
"drop of primary key column is not allowed.".to_owned(),
))?;
}
option_column_i = Some(column_index);
let mut columns = Vec::clone(&tuple.columns);
let _ = columns.remove(column_index);

tuple_columns = Some((column_index, Arc::new(columns)));
}
}
if option_column_i.is_none() && *if_exists {
if tuple_columns.is_none() && *if_exists {
return Ok(());
}
let column_i = option_column_i
let (column_i, columns) = tuple_columns
.clone()
.ok_or_else(|| DatabaseError::InvalidColumn("not found column".to_string()))?;

let _ = tuple.columns.remove(column_i);
tuple.columns = columns;
let _ = tuple.values.remove(column_i);

tuples.push(tuple);
Expand Down
2 changes: 1 addition & 1 deletion src/execution/volcano/dml/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl Analyze {

yield Tuple {
id: None,
columns,
columns: Arc::new(columns),
values,
};
}
Expand Down
33 changes: 19 additions & 14 deletions src/execution/volcano/dml/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::index::Index;
use crate::types::tuple::Tuple;
use crate::types::tuple_builder::TupleBuilder;
use crate::types::value::DataValue;
use futures_async_stream::try_stream;
use itertools::Itertools;
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -51,9 +53,11 @@ impl Insert {
} = self;
let mut primary_key_index = None;
let mut unique_values = HashMap::new();
let mut tuples = Vec::new();
let mut tuple_values = Vec::new();

if let Some(table_catalog) = transaction.table(table_name.clone()).cloned() {
let all_columns = table_catalog.all_columns_with_id();

#[for_await]
for tuple in build_read(input, transaction) {
let Tuple {
Expand All @@ -74,14 +78,10 @@ impl Insert {
.map(|col| col.id().unwrap())
.unwrap()
});
let all_columns = table_catalog.all_columns_with_id();
let tuple_id = tuple_map.get(primary_col_id).cloned().unwrap();
let mut tuple = Tuple {
id: Some(tuple_id.clone()),
columns: Vec::with_capacity(all_columns.len()),
values: Vec::with_capacity(all_columns.len()),
};
for (col_id, col) in all_columns {
let mut values = Vec::with_capacity(all_columns.len());

for (col_id, col) in &all_columns {
let value = tuple_map
.remove(col_id)
.or_else(|| col.default_value())
Expand All @@ -96,14 +96,19 @@ impl Insert {
if value.is_null() && !col.nullable {
return Err(DatabaseError::NotNull);
}

tuple.columns.push(col.clone());
tuple.values.push(value)
values.push(value)
}

tuples.push(tuple);
tuple_values.push((tuple_id, values));
}
for tuple in tuples {
let tuple_columns = all_columns
.into_iter()
.map(|(_, column)| column.clone())
.collect_vec();
let tuple_builder = TupleBuilder::new(tuple_columns);

for (tuple_id, values) in tuple_values {
let tuple = tuple_builder.build(Some(tuple_id), values)?;

transaction.append(&table_name, tuple, is_overwrite)?;
}
// Unique Index
Expand Down
6 changes: 5 additions & 1 deletion src/execution/volcano/dql/aggregate/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use crate::types::value::ValueRef;
use ahash::HashMap;
use futures_async_stream::try_stream;
use itertools::Itertools;
use std::mem;
use std::sync::Arc;

pub struct HashAggExecutor {
agg_calls: Vec<ScalarExpression>,
Expand Down Expand Up @@ -108,6 +110,8 @@ impl HashAggStatus {
}

pub(crate) fn to_tuples(&mut self) -> Result<Vec<Tuple>, DatabaseError> {
let group_columns = Arc::new(mem::replace(&mut self.group_columns, vec![]));

Ok(self
.group_hash_accs
.drain()
Expand All @@ -121,7 +125,7 @@ impl HashAggStatus {

Ok::<Tuple, DatabaseError>(Tuple {
id: None,
columns: self.group_columns.clone(),
columns: group_columns.clone(),
values,
})
})
Expand Down
3 changes: 2 additions & 1 deletion src/execution/volcano/dql/aggregate/simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::types::tuple::Tuple;
use crate::types::value::ValueRef;
use futures_async_stream::try_stream;
use itertools::Itertools;
use std::sync::Arc;

pub struct SimpleAggExecutor {
agg_calls: Vec<ScalarExpression>,
Expand Down Expand Up @@ -65,7 +66,7 @@ impl SimpleAggExecutor {

yield Tuple {
id: None,
columns,
columns: Arc::new(columns),
values,
};
}
Expand Down
6 changes: 2 additions & 4 deletions src/execution/volcano/dql/explain.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use crate::catalog::ColumnCatalog;
use crate::catalog::ColumnRef;
use crate::errors::DatabaseError;
use crate::execution::volcano::{BoxedExecutor, ReadExecutor};
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::tuple::Tuple;
use crate::types::value::DataValue;
use crate::types::value::ValueRef;
use futures_async_stream::try_stream;
use std::sync::Arc;

Expand All @@ -29,8 +27,8 @@ impl<T: Transaction> ReadExecutor<T> for Explain {
impl Explain {
#[try_stream(boxed, ok = Tuple, error = DatabaseError)]
pub async fn _execute(self) {
let columns: Vec<ColumnRef> = vec![Arc::new(ColumnCatalog::new_dummy("PLAN".to_string()))];
let values: Vec<ValueRef> = vec![Arc::new(DataValue::Utf8(Some(self.plan.explain(0))))];
let columns = Arc::new(vec![Arc::new(ColumnCatalog::new_dummy("PLAN".to_string()))]);
let values = vec![Arc::new(DataValue::Utf8(Some(self.plan.explain(0))))];

yield Tuple {
id: None,
Expand Down
2 changes: 1 addition & 1 deletion src/execution/volcano/dql/index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl IndexScan {
pub async fn _execute<T: Transaction>(self, transaction: &T) {
let ScanOperator {
table_name,
projection_columns: columns,
columns,
limit,
..
} = self.op;
Expand Down
31 changes: 21 additions & 10 deletions src/execution/volcano/dql/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl HashJoinStatus {
Self::columns_filling(&tuple, join_columns, *right_force_nullable);
let _ = mem::replace(right_init_flag, true);
}
let join_columns = Arc::new(join_columns.clone());

let mut join_tuples = if let Some(tuples) = build_map.get(&hash) {
let _ = used_set.insert(hash);
Expand Down Expand Up @@ -234,6 +235,8 @@ impl HashJoinStatus {

matches!(ty, JoinType::Left | JoinType::Full)
.then(|| {
let mut buf = None;

build_map
.drain()
.filter(|(hash, _)| !used_set.contains(hash))
Expand All @@ -245,16 +248,24 @@ impl HashJoinStatus {
} in tuples.iter_mut()
{
let _ = mem::replace(id, None);
let (mut right_values, mut right_columns): (
Vec<ValueRef>,
Vec<ColumnRef>,
) = join_columns[columns.len()..]
.iter()
.map(|col| (Arc::new(DataValue::none(col.datatype())), col.clone()))
.unzip();

values.append(&mut right_values);
columns.append(&mut right_columns);
let (right_values, full_columns) = buf.get_or_insert_with(|| {
let (right_values, mut right_columns): (
Vec<ValueRef>,
Vec<ColumnRef>,
) = join_columns[columns.len()..]
.iter()
.map(|col| {
(Arc::new(DataValue::none(col.datatype())), col.clone())
})
.unzip();
let mut full_columns = Vec::clone(columns);
full_columns.append(&mut right_columns);

(right_values, Arc::new(full_columns))
});

values.append(&mut right_values.clone());
*columns = full_columns.clone();
}
tuples
})
Expand Down
16 changes: 11 additions & 5 deletions src/execution/volcano/dql/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::tuple::Tuple;
use futures_async_stream::try_stream;
use std::sync::Arc;

pub struct Projection {
exprs: Vec<ScalarExpression>,
Expand All @@ -28,20 +29,25 @@ impl Projection {
#[try_stream(boxed, ok = Tuple, error = DatabaseError)]
pub async fn _execute<T: Transaction>(self, transaction: &T) {
let Projection { exprs, input } = self;
let mut columns = None;

#[for_await]
for tuple in build_read(input, transaction) {
let mut tuple = tuple?;

let mut columns = Vec::with_capacity(exprs.len());
let mut values = Vec::with_capacity(exprs.len());
let columns = columns.get_or_insert_with(|| {
let mut columns = Vec::with_capacity(exprs.len());

for expr in exprs.iter() {
columns.push(expr.output_column());
}
Arc::new(columns)
});

for expr in exprs.iter() {
values.push(expr.eval(&tuple)?);
columns.push(expr.output_column());
}

tuple.columns = columns;
tuple.columns = columns.clone();
tuple.values = values;

yield tuple;
Expand Down
2 changes: 1 addition & 1 deletion src/execution/volcano/dql/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl SeqScan {
pub async fn _execute<T: Transaction>(self, transaction: &T) {
let ScanOperator {
table_name,
projection_columns: columns,
columns,
limit,
..
} = self.op;
Expand Down
Loading
Loading