Skip to content

Commit

Permalink
chore: remove ValueRef to reduce Arc usage
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 6, 2024
1 parent fe88322 commit 36aa42f
Show file tree
Hide file tree
Showing 52 changed files with 762 additions and 927 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ implement_from_tuple!(
```
- User-Defined Function: `features = ["macros"]`
```rust
scala_function!(TestFunction::test(LogicalType::Integer, LogicalType::Integer) -> LogicalType::Integer => |v1: ValueRef, v2: ValueRef| {
scala_function!(TestFunction::test(LogicalType::Integer, LogicalType::Integer) -> LogicalType::Integer => |v1: DataValue, v2: DataValue| {
let plus_binary_evaluator = EvaluatorFactory::binary_create(LogicalType::Integer, BinaryOperator::Plus)?;
let value = plus_binary_evaluator.binary_eval(&v1, &v2);

Expand All @@ -130,7 +130,7 @@ let fnck_sql = DataBaseBuilder::path("./data")
```
- User-Defined Table Function: `features = ["macros"]`
```rust
table_function!(MyTableFunction::test_numbers(LogicalType::Integer) -> [c1: LogicalType::Integer, c2: LogicalType::Integer] => (|v1: ValueRef| {
table_function!(MyTableFunction::test_numbers(LogicalType::Integer) -> [c1: LogicalType::Integer, c2: LogicalType::Integer] => (|v1: DataValue| {
let num = v1.i32().unwrap();

Ok(Box::new((0..num)
Expand Down
8 changes: 4 additions & 4 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
}
Expr::CompoundIdentifier(idents) => self.bind_column_ref_from_identifiers(idents, None),
Expr::BinaryOp { left, right, op } => self.bind_binary_op_internal(left, right, op),
Expr::Value(v) => Ok(ScalarExpression::Constant(Arc::new(v.into()))),
Expr::Value(v) => Ok(ScalarExpression::Constant(v.into())),
Expr::Function(func) => self.bind_function(func),
Expr::Nested(expr) => self.bind_expr(expr),
Expr::UnaryOp { expr, op } => self.bind_unary_op_internal(expr, op),
Expand Down Expand Up @@ -77,7 +77,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
}
.cast(&logical_type)?;

Ok(ScalarExpression::Constant(Arc::new(value)))
Ok(ScalarExpression::Constant(value))
}
Expr::Between {
expr,
Expand Down Expand Up @@ -672,10 +672,10 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
}

fn wildcard_expr() -> ScalarExpression {
ScalarExpression::Constant(Arc::new(DataValue::Utf8 {
ScalarExpression::Constant(DataValue::Utf8 {
value: Some("*".to_string()),
ty: Utf8Type::Variable(None),
unit: CharLengthUnits::Characters,
}))
})
}
}
6 changes: 3 additions & 3 deletions src/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::tuple::SchemaRef;
use crate::types::value::{DataValue, ValueRef};
use crate::types::value::DataValue;
use sqlparser::ast::{Expr, Ident, ObjectName};
use std::slice;
use std::sync::Arc;
Expand Down Expand Up @@ -78,7 +78,7 @@ impl<T: Transaction> Binder<'_, '_, T> {
value.check_len(ty)?;

if value.logical_type() != *ty {
value = Arc::new(DataValue::clone(&value).cast(ty)?);
value = value.cast(ty)?;
}
row.push(value);
}
Expand Down Expand Up @@ -108,7 +108,7 @@ impl<T: Transaction> Binder<'_, '_, T> {

pub(crate) fn bind_values(
&mut self,
rows: Vec<Vec<ValueRef>>,
rows: Vec<Vec<DataValue>>,
schema_ref: SchemaRef,
) -> LogicalPlan {
LogicalPlan::new(
Expand Down
4 changes: 2 additions & 2 deletions src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
if let Some(expr) = limit_expr {
let expr = self.bind_expr(expr)?;
match expr {
ScalarExpression::Constant(dv) => match dv.as_ref() {
ScalarExpression::Constant(dv) => match &dv {
DataValue::Int32(Some(v)) if *v >= 0 => limit = Some(*v as usize),
DataValue::Int64(Some(v)) if *v >= 0 => limit = Some(*v as usize),
_ => return Err(DatabaseError::InvalidType),
Expand All @@ -713,7 +713,7 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
if let Some(expr) = offset_expr {
let expr = self.bind_expr(&expr.value)?;
match expr {
ScalarExpression::Constant(dv) => match dv.as_ref() {
ScalarExpression::Constant(dv) => match &dv {
DataValue::Int32(Some(v)) if *v > 0 => offset = Some(*v as usize),
DataValue::Int64(Some(v)) if *v > 0 => offset = Some(*v as usize),
_ => return Err(DatabaseError::InvalidType),
Expand Down
6 changes: 3 additions & 3 deletions src/binder/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::planner::operator::update::UpdateOperator;
use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::value::DataValue;
use sqlparser::ast::{Assignment, Expr, TableFactor, TableWithJoins};
use std::slice;
use std::sync::Arc;
Expand Down Expand Up @@ -47,10 +46,11 @@ impl<T: Transaction> Binder<'_, '_, T> {
// Check if the value length is too long
value.check_len(ty)?;

let mut value = value.clone();
if value.logical_type() != *ty {
row.push(Arc::new(DataValue::clone(value).cast(ty)?));
value = value.cast(ty)?;
}
row.push(value.clone());
row.push(value);
}
ScalarExpression::Empty => {
let default_value = column
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::catalog::TableName;
use crate::errors::DatabaseError;
use crate::expression::ScalarExpression;
use crate::types::tuple::EMPTY_TUPLE;
use crate::types::value::ValueRef;
use crate::types::value::DataValue;
use crate::types::{ColumnId, LogicalType};
use fnck_sql_serde_macros::ReferenceSerialization;
use sqlparser::ast::CharLengthUnits;
Expand Down Expand Up @@ -166,7 +166,7 @@ impl ColumnCatalog {
&self.desc.column_datatype
}

pub(crate) fn default_value(&self) -> Result<Option<ValueRef>, DatabaseError> {
pub(crate) fn default_value(&self) -> Result<Option<DataValue>, DatabaseError> {
self.desc
.default
.as_ref()
Expand Down
28 changes: 7 additions & 21 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,7 @@ pub(crate) mod test {
tuples,
vec![Tuple {
id: None,
values: vec![Arc::new(DataValue::Date32(Some(
Local::now().num_days_from_ce()
)))],
values: vec![DataValue::Date32(Some(Local::now().num_days_from_ce()))],
}]
);
Ok(())
Expand Down Expand Up @@ -428,11 +426,11 @@ pub(crate) mod test {
vec![
Tuple {
id: None,
values: vec![Arc::new(DataValue::Int32(Some(3)))],
values: vec![DataValue::Int32(Some(3))],
},
Tuple {
id: None,
values: vec![Arc::new(DataValue::Int32(Some(4)))],
values: vec![DataValue::Int32(Some(4))],
},
]
);
Expand Down Expand Up @@ -463,32 +461,20 @@ pub(crate) mod test {

debug_assert_eq!(
tuples_1[0].values,
vec![
Arc::new(DataValue::Int32(Some(0))),
Arc::new(DataValue::Int32(Some(0)))
]
vec![DataValue::Int32(Some(0)), DataValue::Int32(Some(0))]
);
debug_assert_eq!(
tuples_1[1].values,
vec![
Arc::new(DataValue::Int32(Some(1))),
Arc::new(DataValue::Int32(Some(1)))
]
vec![DataValue::Int32(Some(1)), DataValue::Int32(Some(1))]
);

debug_assert_eq!(
tuples_2[0].values,
vec![
Arc::new(DataValue::Int32(Some(0))),
Arc::new(DataValue::Int32(Some(0)))
]
vec![DataValue::Int32(Some(0)), DataValue::Int32(Some(0))]
);
debug_assert_eq!(
tuples_2[1].values,
vec![
Arc::new(DataValue::Int32(Some(3))),
Arc::new(DataValue::Int32(Some(3)))
]
vec![DataValue::Int32(Some(3)), DataValue::Int32(Some(3))]
);

tx_1.commit()?;
Expand Down
3 changes: 1 addition & 2 deletions src/execution/ddl/add_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use std::ops::Coroutine;
use std::ops::CoroutineState;
use std::pin::Pin;
use std::slice;
use std::sync::Arc;

pub struct AddColumn {
op: AddColumnOperator,
Expand Down Expand Up @@ -61,7 +60,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn {
}
tuple.values.push(value);
} else {
tuple.values.push(Arc::new(DataValue::Null));
tuple.values.push(DataValue::Null);
}
tuples.push(tuple);
}
Expand Down
4 changes: 2 additions & 2 deletions src/execution/dml/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze {
let meta = StatisticsMeta::new(histogram, sketch);

throw!(meta.to_file(&temp_path));
values.push(Arc::new(DataValue::Utf8 {
values.push(DataValue::Utf8 {
value: Some(path_str.clone()),
ty: Utf8Type::Variable(None),
unit: CharLengthUnits::Characters,
}));
});
throw!(transaction.save_table_meta(cache.2, &table_name, path_str, meta));
throw!(fs::rename(&temp_path, &path).map_err(DatabaseError::IO));

Expand Down
4 changes: 2 additions & 2 deletions src/execution/dml/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::throw;
use crate::types::index::{Index, IndexId, IndexType};
use crate::types::tuple::Tuple;
use crate::types::tuple_builder::TupleBuilder;
use crate::types::value::ValueRef;
use crate::types::value::DataValue;
use std::collections::HashMap;
use std::ops::Coroutine;
use std::ops::CoroutineState;
Expand Down Expand Up @@ -106,6 +106,6 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Delete {

struct Value {
exprs: Vec<ScalarExpression>,
value_rows: Vec<Vec<ValueRef>>,
value_rows: Vec<Vec<DataValue>>,
index_ty: IndexType,
}
5 changes: 2 additions & 3 deletions src/execution/dml/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::HashMap;
use std::ops::Coroutine;
use std::ops::CoroutineState;
use std::pin::Pin;
use std::sync::Arc;

pub struct Insert {
table_name: TableName,
Expand Down Expand Up @@ -119,7 +118,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert {
if value.is_none() {
value = throw!(col.default_value());
}
value.unwrap_or_else(|| Arc::new(DataValue::none(col.datatype())))
value.unwrap_or_else(|| DataValue::none(col.datatype()))
};
if value.is_null() && !col.nullable() {
yield Err(DatabaseError::NotNull);
Expand All @@ -131,7 +130,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert {
id: Some(if primary_keys.len() == 1 {
tuple_id.pop().unwrap()
} else {
Arc::new(DataValue::Tuple(Some(tuple_id)))
DataValue::Tuple(Some(tuple_id))
}),
values,
});
Expand Down
3 changes: 1 addition & 2 deletions src/execution/dml/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use std::collections::HashMap;
use std::ops::Coroutine;
use std::ops::CoroutineState;
use std::pin::Pin;
use std::sync::Arc;

pub struct Update {
table_name: TableName,
Expand Down Expand Up @@ -108,7 +107,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update {
let id = if primary_keys.len() == 1 {
primary_keys.pop().unwrap()
} else {
Arc::new(DataValue::Tuple(Some(primary_keys)))
DataValue::Tuple(Some(primary_keys))
};
if &id != tuple.id.as_ref().unwrap() {
let old_key = tuple.id.replace(id).unwrap();
Expand Down
13 changes: 6 additions & 7 deletions src/execution/dql/aggregate/avg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ use crate::execution::dql::aggregate::sum::SumAccumulator;
use crate::execution::dql::aggregate::Accumulator;
use crate::expression::BinaryOperator;
use crate::types::evaluator::EvaluatorFactory;
use crate::types::value::{DataValue, ValueRef};
use crate::types::value::DataValue;
use crate::types::LogicalType;
use std::sync::Arc;

pub struct AvgAccumulator {
inner: SumAccumulator,
Expand All @@ -22,7 +21,7 @@ impl AvgAccumulator {
}

impl Accumulator for AvgAccumulator {
fn update_value(&mut self, value: &ValueRef) -> Result<(), DatabaseError> {
fn update_value(&mut self, value: &DataValue) -> Result<(), DatabaseError> {
if !value.is_null() {
self.inner.update_value(value)?;
self.count += 1;
Expand All @@ -31,12 +30,12 @@ impl Accumulator for AvgAccumulator {
Ok(())
}

fn evaluate(&self) -> Result<ValueRef, DatabaseError> {
fn evaluate(&self) -> Result<DataValue, DatabaseError> {
let mut value = self.inner.evaluate()?;
let value_ty = value.logical_type();

if self.count == 0 {
return Ok(Arc::new(DataValue::init(&value_ty)));
return Ok(DataValue::init(&value_ty));
}
let quantity = if value_ty.is_signed_numeric() {
DataValue::Int64(Some(self.count as i64))
Expand All @@ -46,9 +45,9 @@ impl Accumulator for AvgAccumulator {
let quantity_ty = quantity.logical_type();

if value_ty != quantity_ty {
value = Arc::new(DataValue::clone(&value).cast(&quantity_ty)?)
value = DataValue::clone(&value).cast(&quantity_ty)?
}
let evaluator = EvaluatorFactory::binary_create(quantity_ty, BinaryOperator::Divide)?;
Ok(Arc::new(evaluator.0.binary_eval(&value, &quantity)))
Ok(evaluator.0.binary_eval(&value, &quantity))
}
}
19 changes: 8 additions & 11 deletions src/execution/dql/aggregate/count.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::errors::DatabaseError;
use crate::execution::dql::aggregate::Accumulator;
use crate::types::value::{DataValue, ValueRef};
use crate::types::value::DataValue;
use ahash::RandomState;
use std::collections::HashSet;
use std::sync::Arc;

pub struct CountAccumulator {
result: i32,
Expand All @@ -16,21 +15,21 @@ impl CountAccumulator {
}

impl Accumulator for CountAccumulator {
fn update_value(&mut self, value: &ValueRef) -> Result<(), DatabaseError> {
fn update_value(&mut self, value: &DataValue) -> Result<(), DatabaseError> {
if !value.is_null() {
self.result += 1;
}

Ok(())
}

fn evaluate(&self) -> Result<ValueRef, DatabaseError> {
Ok(Arc::new(DataValue::Int32(Some(self.result))))
fn evaluate(&self) -> Result<DataValue, DatabaseError> {
Ok(DataValue::Int32(Some(self.result)))
}
}

pub struct DistinctCountAccumulator {
distinct_values: HashSet<ValueRef, RandomState>,
distinct_values: HashSet<DataValue, RandomState>,
}

impl DistinctCountAccumulator {
Expand All @@ -42,17 +41,15 @@ impl DistinctCountAccumulator {
}

impl Accumulator for DistinctCountAccumulator {
fn update_value(&mut self, value: &ValueRef) -> Result<(), DatabaseError> {
fn update_value(&mut self, value: &DataValue) -> Result<(), DatabaseError> {
if !value.is_null() {
self.distinct_values.insert(value.clone());
}

Ok(())
}

fn evaluate(&self) -> Result<ValueRef, DatabaseError> {
Ok(Arc::new(DataValue::Int32(Some(
self.distinct_values.len() as i32
))))
fn evaluate(&self) -> Result<DataValue, DatabaseError> {
Ok(DataValue::Int32(Some(self.distinct_values.len() as i32)))
}
}
Loading

0 comments on commit 36aa42f

Please sign in to comment.