Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add DataType::Char and add LEN to Describe #174

Merged
merged 8 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ let fnck_sql = DataBaseBuilder::path("./data")
- UInteger
- Bigint
- UBigint
- Char
- Varchar
- DDL
- Begin (Server only)
Expand Down Expand Up @@ -177,6 +178,7 @@ let fnck_sql = DataBaseBuilder::path("./data")
- UBigint
- Float
- Double
- Char
- Varchar
- Date
- DateTime
Expand Down
11 changes: 9 additions & 2 deletions src/execution/volcano/ddl/add_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,21 @@ impl<T: Transaction> WriteExecutor<T> for AddColumn {

impl AddColumn {
#[try_stream(boxed, ok = Tuple, error = DatabaseError)]
async fn _execute<T: Transaction>(self, transaction: &mut T) {
async fn _execute<T: Transaction>(mut self, transaction: &mut T) {
let AddColumnOperator {
table_name,
column,
if_not_exists,
} = &self.op;
let mut unique_values = column.desc().is_unique.then(Vec::new);
let mut tuples = Vec::new();
let schema = self.input.output_schema();
let mut types = Vec::with_capacity(schema.len() + 1);

for column_ref in schema.iter() {
types.push(*column_ref.datatype());
}
types.push(*column.datatype());

#[for_await]
for tuple in build_read(self.input, transaction) {
Expand All @@ -54,7 +61,7 @@ impl AddColumn {
tuples.push(tuple);
}
for tuple in tuples {
transaction.append(table_name, tuple, true)?;
transaction.append(table_name, tuple, &types, true)?;
}
let col_id = transaction.add_column(table_name, column, *if_not_exists)?;

Expand Down
9 changes: 8 additions & 1 deletion src/execution/volcano/ddl/drop_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,14 @@ impl DropColumn {
))?;
}
let mut tuples = Vec::new();
let mut types = Vec::with_capacity(tuple_columns.len() - 1);

for (i, column_ref) in tuple_columns.iter().enumerate() {
if i == column_index {
continue;
}
types.push(*column_ref.datatype());
}
#[for_await]
for tuple in build_read(self.input, transaction) {
let mut tuple: Tuple = tuple?;
Expand All @@ -55,7 +62,7 @@ impl DropColumn {
tuples.push(tuple);
}
for tuple in tuples {
transaction.append(&table_name, tuple, true)?;
transaction.append(&table_name, tuple, &types, true)?;
}
transaction.drop_column(&table_name, &column_name)?;

Expand Down
5 changes: 3 additions & 2 deletions src/execution/volcano/dml/copy_from_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::errors::DatabaseError;
use crate::execution::volcano::{BoxedExecutor, WriteExecutor};
use crate::planner::operator::copy_from_file::CopyFromFileOperator;
use crate::storage::Transaction;
use crate::types::tuple::Tuple;
use crate::types::tuple::{types, Tuple};
use crate::types::tuple_builder::TupleBuilder;
use futures_async_stream::try_stream;
use std::fs::File;
Expand All @@ -30,6 +30,7 @@ impl<T: Transaction> WriteExecutor<T> for CopyFromFile {
impl CopyFromFile {
#[try_stream(boxed, ok = Tuple, error = DatabaseError)]
pub async fn _execute<T: Transaction>(self, transaction: &mut T) {
let types = types(&self.op.schema_ref);
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let (tx1, mut rx1) = tokio::sync::mpsc::channel(1);
// # Cancellation
Expand All @@ -39,7 +40,7 @@ impl CopyFromFile {
let handle = tokio::task::spawn_blocking(|| self.read_file_blocking(tx));
let mut size = 0_usize;
while let Some(chunk) = rx.recv().await {
transaction.append(&table_name, chunk, false)?;
transaction.append(&table_name, chunk, &types, false)?;
size += 1;
}
handle.await??;
Expand Down
3 changes: 2 additions & 1 deletion src/execution/volcano/dml/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl Insert {
.ok_or_else(|| DatabaseError::NotNull)?;

if let Some(table_catalog) = transaction.table(table_name.clone()).cloned() {
let types = table_catalog.types();
#[for_await]
for tuple in build_read(input, transaction) {
let Tuple { values, .. } = tuple?;
Expand Down Expand Up @@ -99,7 +100,7 @@ impl Insert {
}
}
for tuple in tuples {
transaction.append(&table_name, tuple, is_overwrite)?;
transaction.append(&table_name, tuple, &types, is_overwrite)?;
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/execution/volcano/dml/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::planner::operator::update::UpdateOperator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::index::Index;
use crate::types::tuple::types;
use crate::types::tuple::Tuple;
use futures_async_stream::try_stream;
use std::collections::HashMap;
Expand Down Expand Up @@ -44,6 +45,7 @@ impl Update {
} = self;
let values_schema = values.output_schema().clone();
let input_schema = input.output_schema().clone();
let types = types(&input_schema);

if let Some(table_catalog) = transaction.table(table_name.clone()).cloned() {
let mut value_map = HashMap::new();
Expand Down Expand Up @@ -94,7 +96,7 @@ impl Update {
transaction.add_index(&table_name, index, tuple.id.as_ref().unwrap())?;
}

transaction.append(&table_name, tuple, is_overwrite)?;
transaction.append(&table_name, tuple, &types, is_overwrite)?;
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions src/execution/volcano/dql/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,21 @@ impl Describe {
};

for column in table.columns() {
let datatype = column.datatype();
let values = vec![
Arc::new(DataValue::Utf8(Some(column.name().to_string()))),
Arc::new(DataValue::Utf8(Some(column.datatype().to_string()))),
Arc::new(DataValue::Utf8(Some(datatype.to_string()))),
Arc::new(DataValue::Utf8(Some(
datatype
.raw_len()
.map(|len| len.to_string())
.unwrap_or_else(|| "DYNAMIC".to_string()),
))),
Arc::new(DataValue::Utf8(Some(column.nullable.to_string()))),
key_fn(column),
column
.default_value()
.unwrap_or_else(|| Arc::new(DataValue::none(column.datatype()))),
.unwrap_or_else(|| Arc::new(DataValue::none(datatype))),
];
yield Tuple { id: None, values };
}
Expand Down
2 changes: 1 addition & 1 deletion src/expression/value_compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ impl DataValue {
_ => return Err(DatabaseError::UnsupportedBinaryOperator(unified_type, *op)),
}
}
LogicalType::Varchar(_) => {
LogicalType::Varchar(_) | LogicalType::Char(_) => {
let left_value = unpack_utf8(self.clone().cast(&unified_type)?);
let right_value = unpack_utf8(right.clone().cast(&unified_type)?);

Expand Down
2 changes: 1 addition & 1 deletion src/optimizer/core/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl Histogram {
) -> Result<bool, DatabaseError> {
let float_value = |value: &DataValue, prefix_len: usize| {
let value = match value.logical_type() {
LogicalType::Varchar(_) => match value {
LogicalType::Varchar(_) | LogicalType::Char(_) => match value {
DataValue::Utf8(value) => value.as_ref().map(|string| {
if prefix_len > string.len() {
return 0.0;
Expand Down
1 change: 1 addition & 0 deletions src/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ impl LogicalPlan {
Operator::Describe(_) => Arc::new(vec![
Arc::new(ColumnCatalog::new_dummy("FIELD".to_string())),
Arc::new(ColumnCatalog::new_dummy("TYPE".to_string())),
Arc::new(ColumnCatalog::new_dummy("LEN".to_string())),
Arc::new(ColumnCatalog::new_dummy("NULL".to_string())),
Arc::new(ColumnCatalog::new_dummy("Key".to_string())),
Arc::new(ColumnCatalog::new_dummy("DEFAULT".to_string())),
Expand Down
5 changes: 4 additions & 1 deletion src/storage/kip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,10 @@ impl Transaction for KipTransaction {
&mut self,
table_name: &str,
tuple: Tuple,
types: &[LogicalType],
is_overwrite: bool,
) -> Result<(), DatabaseError> {
let (key, value) = TableCodec::encode_tuple(table_name, &tuple)?;
let (key, value) = TableCodec::encode_tuple(table_name, &tuple, types)?;

if !is_overwrite && self.tx.get(&key)?.is_some() {
return Err(DatabaseError::DuplicatePrimaryKey);
Expand Down Expand Up @@ -611,6 +612,7 @@ mod test {
Arc::new(DataValue::Boolean(Some(true))),
],
},
&[LogicalType::Integer, LogicalType::Boolean],
false,
)?;
transaction.append(
Expand All @@ -622,6 +624,7 @@ mod test {
Arc::new(DataValue::Boolean(Some(false))),
],
},
&[LogicalType::Integer, LogicalType::Boolean],
false,
)?;

Expand Down
3 changes: 2 additions & 1 deletion src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::storage::table_codec::TableCodec;
use crate::types::index::{Index, IndexId, IndexMetaRef, IndexType};
use crate::types::tuple::{Tuple, TupleId};
use crate::types::value::{DataValue, ValueRef};
use crate::types::ColumnId;
use crate::types::{ColumnId, LogicalType};
use kip_db::kernel::lsm::iterator::Iter as DBIter;
use kip_db::kernel::lsm::mvcc;
use std::collections::{Bound, VecDeque};
Expand Down Expand Up @@ -75,6 +75,7 @@ pub trait Transaction: Sync + Send + 'static {
&mut self,
table_name: &str,
tuple: Tuple,
types: &[LogicalType],
is_overwrite: bool,
) -> Result<(), DatabaseError>;

Expand Down
18 changes: 13 additions & 5 deletions src/storage/table_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,15 @@ impl TableCodec {

/// Key: {TableName}{TUPLE_TAG}{BOUND_MIN_TAG}{RowID}(Sorted)
/// Value: Tuple
pub fn encode_tuple(table_name: &str, tuple: &Tuple) -> Result<(Bytes, Bytes), DatabaseError> {
pub fn encode_tuple(
table_name: &str,
tuple: &Tuple,
types: &[LogicalType],
) -> Result<(Bytes, Bytes), DatabaseError> {
let tuple_id = tuple.id.clone().ok_or(DatabaseError::PrimaryKeyNotFound)?;
let key = Self::encode_tuple_key(table_name, &tuple_id)?;

Ok((Bytes::from(key), Bytes::from(tuple.serialize_to())))
Ok((Bytes::from(key), Bytes::from(tuple.serialize_to(types))))
}

pub fn encode_tuple_key(
Expand Down Expand Up @@ -223,7 +227,7 @@ impl TableCodec {
) -> Result<(Bytes, Bytes), DatabaseError> {
let key = TableCodec::encode_index_key(name, index, Some(tuple_id))?;

Ok((Bytes::from(key), Bytes::from(tuple_id.to_raw())))
Ok((Bytes::from(key), Bytes::from(tuple_id.to_raw(None))))
}

fn _encode_index_key(name: &str, index: &Index) -> Result<Vec<u8>, DatabaseError> {
Expand Down Expand Up @@ -263,7 +267,7 @@ impl TableCodec {

if let Some(tuple_id) = tuple_id {
if matches!(index.ty, IndexType::Normal | IndexType::Composite) {
key_prefix.append(&mut tuple_id.to_raw());
key_prefix.append(&mut tuple_id.to_raw(None));
}
}
Ok(key_prefix)
Expand Down Expand Up @@ -381,7 +385,11 @@ mod tests {
Arc::new(DataValue::Decimal(Some(Decimal::new(1, 0)))),
],
};
let (_, bytes) = TableCodec::encode_tuple(&table_catalog.name, &tuple)?;
let (_, bytes) = TableCodec::encode_tuple(
&table_catalog.name,
&tuple,
&[LogicalType::Integer, LogicalType::Decimal(None, None)],
)?;
let schema = table_catalog.schema_ref();

assert_eq!(
Expand Down
49 changes: 42 additions & 7 deletions src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ use chrono::{NaiveDate, NaiveDateTime};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use std::any::TypeId;
use std::cmp;

use crate::errors::DatabaseError;
use sqlparser::ast::ExactNumberInfo;
use sqlparser::ast::{CharLengthUnits, CharacterLength, ExactNumberInfo};
use strum_macros::AsRefStr;

pub type ColumnId = u32;
Expand All @@ -33,6 +34,7 @@ pub enum LogicalType {
UBigint,
Float,
Double,
Char(u32),
Varchar(Option<u32>),
Date,
DateTime,
Expand Down Expand Up @@ -80,7 +82,6 @@ impl LogicalType {

pub fn raw_len(&self) -> Option<usize> {
match self {
LogicalType::Invalid => Some(0),
LogicalType::SqlNull => Some(0),
LogicalType::Boolean => Some(1),
LogicalType::Tinyint => Some(1),
Expand All @@ -95,10 +96,11 @@ impl LogicalType {
LogicalType::Double => Some(8),
/// Note: The non-fixed length type's raw_len is None e.g. Varchar
LogicalType::Varchar(_) => None,
LogicalType::Char(len) => Some(*len as usize),
LogicalType::Decimal(_, _) => Some(16),
LogicalType::Date => Some(4),
LogicalType::DateTime => Some(8),
LogicalType::Tuple => unreachable!(),
LogicalType::Invalid | LogicalType::Tuple => unreachable!(),
}
}

Expand Down Expand Up @@ -287,9 +289,16 @@ impl LogicalType {
LogicalType::UBigint => matches!(to, LogicalType::Float | LogicalType::Double),
LogicalType::Float => matches!(to, LogicalType::Double),
LogicalType::Double => false,
LogicalType::Char(_) => false,
LogicalType::Varchar(_) => false,
LogicalType::Date => matches!(to, LogicalType::DateTime | LogicalType::Varchar(_)),
LogicalType::DateTime => matches!(to, LogicalType::Date | LogicalType::Varchar(_)),
LogicalType::Date => matches!(
to,
LogicalType::DateTime | LogicalType::Varchar(_) | LogicalType::Char(_)
),
LogicalType::DateTime => matches!(
to,
LogicalType::Date | LogicalType::Varchar(_) | LogicalType::Char(_)
),
LogicalType::Decimal(_, _) | LogicalType::Tuple => false,
}
}
Expand All @@ -301,8 +310,34 @@ impl TryFrom<sqlparser::ast::DataType> for LogicalType {

fn try_from(value: sqlparser::ast::DataType) -> Result<Self, Self::Error> {
match value {
sqlparser::ast::DataType::Char(len) | sqlparser::ast::DataType::Varchar(len) => {
Ok(LogicalType::Varchar(len.map(|len| len.length as u32)))
sqlparser::ast::DataType::Char(char_len)
| sqlparser::ast::DataType::Character(char_len) => {
let mut len = 1;
if let Some(CharacterLength { length, unit }) = char_len {
if matches!(unit, Some(CharLengthUnits::Octets)) {
return Err(DatabaseError::UnsupportedStmt(format!(
"char unit: {:?}",
unit
)));
}
len = cmp::max(len, length)
}
Ok(LogicalType::Char(len as u32))
}
sqlparser::ast::DataType::CharVarying(varchar_len)
| sqlparser::ast::DataType::CharacterVarying(varchar_len)
| sqlparser::ast::DataType::Varchar(varchar_len) => {
let mut len = None;
if let Some(CharacterLength { length, unit }) = varchar_len {
if matches!(unit, Some(CharLengthUnits::Octets)) {
return Err(DatabaseError::UnsupportedStmt(format!(
"char unit: {:?}",
unit
)));
}
len = Some(length as u32)
}
Ok(LogicalType::Varchar(len))
}
sqlparser::ast::DataType::Float(_) => Ok(LogicalType::Float),
sqlparser::ast::DataType::Double | sqlparser::ast::DataType::DoublePrecision => {
Expand Down
Loading
Loading