diff --git a/Cargo.lock b/Cargo.lock index cb9240a..71f0b0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,6 +94,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "either" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" + [[package]] name = "encode_unicode" version = "0.3.6" @@ -150,6 +156,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "itertools" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.10" @@ -162,6 +177,7 @@ version = "0.1.0" dependencies = [ "anyhow", "dialoguer", + "itertools", "prettytable-rs", "signal-hook", "tempfile", diff --git a/Cargo.toml b/Cargo.toml index e555919..3465b3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,4 +12,5 @@ prettytable-rs = "0.10.0" signal-hook = "0.3.17" [dev-dependencies] +itertools = "0.12.0" tempfile = "3.9.0" diff --git a/src/page.rs b/src/page.rs index cbbccaa..1717516 100644 --- a/src/page.rs +++ b/src/page.rs @@ -2,6 +2,8 @@ use crate::common::PageID; use self::table_page::{TablePage, TABLE_PAGE_PAGE_TYPE}; +pub mod b_plus_tree_internal_page; +pub mod b_plus_tree_leaf_page; pub mod table_page; const PAGE_TYPE_OFFSET: usize = 0; diff --git a/src/page/b_plus_tree_internal_page.rs b/src/page/b_plus_tree_internal_page.rs new file mode 100644 index 0000000..a03558f --- /dev/null +++ b/src/page/b_plus_tree_internal_page.rs @@ -0,0 +1,67 @@ +use crate::common::{PageID, LSN, PAGE_SIZE}; + +use super::{PageType, PAGE_ID_OFFSET, PAGE_ID_SIZE, PAGE_TYPE_OFFSET, PAGE_TYPE_SIZE}; + +pub const B_PLUS_TREE_INTERNAL_PAGE_PAGE_TYPE: PageType = PageType(3); + +const LSN_OFFSET: usize = PAGE_ID_OFFSET + PAGE_ID_SIZE; +const LSN_SIZE: usize = 8; +const PARENT_PAGE_ID_OFFSET: usize = LSN_OFFSET + LSN_SIZE; +const PARENT_PAGE_ID_SIZE: usize = 4; +const LOWER_OFFSET_OFFSET: usize = PARENT_PAGE_ID_OFFSET + PARENT_PAGE_ID_SIZE; +const LOWER_OFFSET_SIZE: usize = 4; +const UPPER_OFFSET_OFFSET: usize = LOWER_OFFSET_OFFSET + LOWER_OFFSET_SIZE; +const UPPER_OFFSET_SIZE: usize = 4; +const HEADER_SIZE: usize = PAGE_TYPE_SIZE + + PAGE_ID_SIZE + + LSN_SIZE + + PARENT_PAGE_ID_SIZE + + LOWER_OFFSET_SIZE + + UPPER_OFFSET_SIZE; +// const LINE_POINTER_OFFSET_SIZE: usize = 4; +// const LINE_POINTER_SIZE_SIZE: usize = 4; +// const LINE_POINTER_SIZE: usize = LINE_POINTER_OFFSET_SIZE + LINE_POINTER_SIZE_SIZE; + +pub struct BPlusTreeInternalPage { + pub data: Box<[u8]>, +} + +impl BPlusTreeInternalPage { + pub fn new(page_id: PageID, parent_page_id: PageID) -> Self { + let mut data = vec![0u8; PAGE_SIZE]; + data[PAGE_TYPE_OFFSET..(PAGE_TYPE_OFFSET + PAGE_TYPE_SIZE)] + .copy_from_slice(&B_PLUS_TREE_INTERNAL_PAGE_PAGE_TYPE.0.to_le_bytes()); + data[PAGE_ID_OFFSET..(PAGE_ID_OFFSET + PAGE_ID_SIZE)] + .copy_from_slice(&page_id.0.to_le_bytes()); + data[PARENT_PAGE_ID_OFFSET..(PARENT_PAGE_ID_OFFSET + PARENT_PAGE_ID_SIZE)] + .copy_from_slice(&parent_page_id.0.to_le_bytes()); + data[LOWER_OFFSET_OFFSET..(LOWER_OFFSET_OFFSET + LOWER_OFFSET_SIZE)] + .copy_from_slice(&(HEADER_SIZE as u32).to_le_bytes()); + data[UPPER_OFFSET_OFFSET..(UPPER_OFFSET_OFFSET + UPPER_OFFSET_SIZE)] + .copy_from_slice(&(PAGE_SIZE as u32).to_le_bytes()); + BPlusTreeInternalPage { data: data.into() } + } + pub fn from_data(data: &[u8]) -> Self { + BPlusTreeInternalPage { data: data.into() } + } + + pub fn lsn(&self) -> LSN { + let mut buf = [0u8; 8]; + buf.copy_from_slice(&self.data[LSN_OFFSET..(LSN_OFFSET + LSN_SIZE)]); + LSN(u64::from_le_bytes(buf)) + } + pub fn set_lsn(&mut self, lsn: LSN) { + self.data[LSN_OFFSET..(LSN_OFFSET + LSN_SIZE)].copy_from_slice(&lsn.0.to_le_bytes()); + } + pub fn parent_page_id(&self) -> PageID { + let mut buf = [0u8; 4]; + buf.copy_from_slice( + &self.data[PARENT_PAGE_ID_OFFSET..(PARENT_PAGE_ID_OFFSET + PARENT_PAGE_ID_SIZE)], + ); + PageID(u32::from_le_bytes(buf)) + } + pub fn set_parent_page_id(&mut self, parent_page_id: PageID) { + self.data[PARENT_PAGE_ID_OFFSET..(PARENT_PAGE_ID_OFFSET + PARENT_PAGE_ID_SIZE)] + .copy_from_slice(&parent_page_id.0.to_le_bytes()); + } +} diff --git a/src/page/b_plus_tree_leaf_page.rs b/src/page/b_plus_tree_leaf_page.rs new file mode 100644 index 0000000..385e3b9 --- /dev/null +++ b/src/page/b_plus_tree_leaf_page.rs @@ -0,0 +1,390 @@ +use crate::{ + catalog::Schema, + common::{PageID, INVALID_PAGE_ID, LSN, PAGE_SIZE, RID}, + tuple::Tuple, + value::Value, +}; + +use super::{PageType, PAGE_ID_OFFSET, PAGE_ID_SIZE, PAGE_TYPE_OFFSET, PAGE_TYPE_SIZE}; + +pub const B_PLUS_TREE_LEAF_PAGE_PAGE_TYPE: PageType = PageType(2); + +const LSN_OFFSET: usize = PAGE_ID_OFFSET + PAGE_ID_SIZE; +const LSN_SIZE: usize = 8; +const PARENT_PAGE_ID_OFFSET: usize = LSN_OFFSET + LSN_SIZE; +const PARENT_PAGE_ID_SIZE: usize = 4; +const PREV_PAGE_ID_OFFSET: usize = PARENT_PAGE_ID_OFFSET + PARENT_PAGE_ID_SIZE; +const PREV_PAGE_ID_SIZE: usize = 4; +const NEXT_PAGE_ID_OFFSET: usize = PREV_PAGE_ID_OFFSET + PREV_PAGE_ID_SIZE; +const NEXT_PAGE_ID_SIZE: usize = 4; +const LOWER_OFFSET_OFFSET: usize = NEXT_PAGE_ID_OFFSET + NEXT_PAGE_ID_SIZE; +const LOWER_OFFSET_SIZE: usize = 4; +const UPPER_OFFSET_OFFSET: usize = LOWER_OFFSET_OFFSET + LOWER_OFFSET_SIZE; +const UPPER_OFFSET_SIZE: usize = 4; +const HEADER_SIZE: usize = PAGE_TYPE_SIZE + + PAGE_ID_SIZE + + LSN_SIZE + + PARENT_PAGE_ID_SIZE + + PREV_PAGE_ID_SIZE + + NEXT_PAGE_ID_SIZE + + LOWER_OFFSET_SIZE + + UPPER_OFFSET_SIZE; +const LINE_POINTER_OFFSET_SIZE: usize = 4; +const LINE_POINTER_SIZE_SIZE: usize = 4; +const LINE_POINTER_SIZE: usize = LINE_POINTER_OFFSET_SIZE + LINE_POINTER_SIZE_SIZE; + +// RID +const VALUE_SIZE: usize = 8; + +pub struct BPlusTreeLeafPage { + pub data: Box<[u8]>, +} + +impl BPlusTreeLeafPage { + pub fn new(page_id: PageID, parent_page_id: PageID, prev_page_id: Option) -> Self { + let mut data = vec![0u8; PAGE_SIZE]; + data[PAGE_TYPE_OFFSET..(PAGE_TYPE_OFFSET + PAGE_TYPE_SIZE)] + .copy_from_slice(&B_PLUS_TREE_LEAF_PAGE_PAGE_TYPE.0.to_le_bytes()); + data[PAGE_ID_OFFSET..(PAGE_ID_OFFSET + PAGE_ID_SIZE)] + .copy_from_slice(&page_id.0.to_le_bytes()); + data[PARENT_PAGE_ID_OFFSET..(PARENT_PAGE_ID_OFFSET + PARENT_PAGE_ID_SIZE)] + .copy_from_slice(&parent_page_id.0.to_le_bytes()); + if let Some(prev_page_id) = prev_page_id { + data[PREV_PAGE_ID_OFFSET..(PREV_PAGE_ID_OFFSET + PREV_PAGE_ID_SIZE)] + .copy_from_slice(&prev_page_id.0.to_le_bytes()); + } else { + data[PREV_PAGE_ID_OFFSET..(PREV_PAGE_ID_OFFSET + PREV_PAGE_ID_SIZE)] + .copy_from_slice(&INVALID_PAGE_ID.0.to_le_bytes()); + } + data[NEXT_PAGE_ID_OFFSET..(NEXT_PAGE_ID_OFFSET + NEXT_PAGE_ID_SIZE)] + .copy_from_slice(&INVALID_PAGE_ID.0.to_le_bytes()); + data[LOWER_OFFSET_OFFSET..(LOWER_OFFSET_OFFSET + LOWER_OFFSET_SIZE)] + .copy_from_slice(&(HEADER_SIZE as u32).to_le_bytes()); + data[UPPER_OFFSET_OFFSET..(UPPER_OFFSET_OFFSET + UPPER_OFFSET_SIZE)] + .copy_from_slice(&(PAGE_SIZE as u32).to_le_bytes()); + BPlusTreeLeafPage { data: data.into() } + } + pub fn from_data(data: &[u8]) -> Self { + BPlusTreeLeafPage { data: data.into() } + } + + pub fn lsn(&self) -> LSN { + let mut buf = [0u8; 8]; + buf.copy_from_slice(&self.data[LSN_OFFSET..(LSN_OFFSET + LSN_SIZE)]); + LSN(u64::from_le_bytes(buf)) + } + pub fn set_lsn(&mut self, lsn: LSN) { + self.data[LSN_OFFSET..(LSN_OFFSET + LSN_SIZE)].copy_from_slice(&lsn.0.to_le_bytes()); + } + pub fn parent_page_id(&self) -> PageID { + let mut buf = [0u8; 4]; + buf.copy_from_slice( + &self.data[PARENT_PAGE_ID_OFFSET..(PARENT_PAGE_ID_OFFSET + PARENT_PAGE_ID_SIZE)], + ); + PageID(u32::from_le_bytes(buf)) + } + pub fn set_parent_page_id(&mut self, parent_page_id: PageID) { + self.data[PARENT_PAGE_ID_OFFSET..(PARENT_PAGE_ID_OFFSET + PARENT_PAGE_ID_SIZE)] + .copy_from_slice(&parent_page_id.0.to_le_bytes()); + } + pub fn prev_page_id(&self) -> PageID { + let mut buf = [0u8; 4]; + buf.copy_from_slice( + &self.data[PREV_PAGE_ID_OFFSET..(PREV_PAGE_ID_OFFSET + PREV_PAGE_ID_SIZE)], + ); + PageID(u32::from_le_bytes(buf)) + } + pub fn set_prev_page_id(&mut self, prev_page_id: PageID) { + self.data[PREV_PAGE_ID_OFFSET..(PREV_PAGE_ID_OFFSET + PREV_PAGE_ID_SIZE)] + .copy_from_slice(&prev_page_id.0.to_le_bytes()); + } + pub fn next_page_id(&self) -> PageID { + let mut buf = [0u8; 4]; + buf.copy_from_slice( + &self.data[NEXT_PAGE_ID_OFFSET..(NEXT_PAGE_ID_OFFSET + NEXT_PAGE_ID_SIZE)], + ); + PageID(u32::from_le_bytes(buf)) + } + pub fn set_next_page_id(&mut self, next_page_id: PageID) { + self.data[NEXT_PAGE_ID_OFFSET..(NEXT_PAGE_ID_OFFSET + NEXT_PAGE_ID_SIZE)] + .copy_from_slice(&next_page_id.0.to_le_bytes()); + } + pub fn lower_offset(&self) -> u32 { + let mut buf = [0u8; 4]; + buf.copy_from_slice( + &self.data[LOWER_OFFSET_OFFSET..(LOWER_OFFSET_OFFSET + LOWER_OFFSET_SIZE)], + ); + u32::from_le_bytes(buf) + } + pub fn set_lower_offset(&mut self, lower_offset: u32) { + self.data[LOWER_OFFSET_OFFSET..(LOWER_OFFSET_OFFSET + LOWER_OFFSET_SIZE)] + .copy_from_slice(&lower_offset.to_le_bytes()); + } + pub fn upper_offset(&self) -> u32 { + let mut buf = [0u8; 4]; + buf.copy_from_slice( + &self.data[UPPER_OFFSET_OFFSET..(UPPER_OFFSET_OFFSET + UPPER_OFFSET_SIZE)], + ); + u32::from_le_bytes(buf) + } + pub fn set_upper_offset(&mut self, upper_offset: u32) { + self.data[UPPER_OFFSET_OFFSET..(UPPER_OFFSET_OFFSET + UPPER_OFFSET_SIZE)] + .copy_from_slice(&upper_offset.to_le_bytes()); + } + pub fn line_pointer_offset(&self, index: usize) -> u32 { + let offset = HEADER_SIZE + index * LINE_POINTER_SIZE; + let mut bytes = [0u8; 4]; + bytes.copy_from_slice(&self.data[offset..(offset + LINE_POINTER_OFFSET_SIZE)]); + u32::from_le_bytes(bytes) + } + pub fn line_pointer_size(&self, index: usize) -> u32 { + let offset = HEADER_SIZE + index * LINE_POINTER_SIZE + LINE_POINTER_OFFSET_SIZE; + let mut bytes = [0u8; 4]; + bytes.copy_from_slice(&self.data[offset..(offset + LINE_POINTER_SIZE_SIZE)]); + u32::from_le_bytes(bytes) + } + pub fn num_line_pointers(&self) -> u32 { + let lower_offset = self.lower_offset(); + (lower_offset - HEADER_SIZE as u32) / LINE_POINTER_SIZE as u32 + } + + pub fn key_at(&self, index: usize, schema: &Schema) -> Vec { + let key_offset = self.line_pointer_offset(index) as usize + VALUE_SIZE; + let key_size = self.line_pointer_size(index) as usize - VALUE_SIZE; + let bytes = &self.data[key_offset..(key_offset + key_size)]; + Tuple::new(None, bytes).values(schema) + } + pub fn value_at(&self, index: usize) -> RID { + let value_offset = self.line_pointer_offset(index) as usize; + let page_id = PageID(u32::from_le_bytes([ + self.data[value_offset], + self.data[value_offset + 1], + self.data[value_offset + 2], + self.data[value_offset + 3], + ])); + let tuple_index = u32::from_le_bytes([ + self.data[value_offset + 4], + self.data[value_offset + 5], + self.data[value_offset + 6], + self.data[value_offset + 7], + ]); + RID(page_id, tuple_index) + } + // lower_bound + pub fn key_index(&self, key: &[Value], schema: &Schema) -> usize { + let mut ng = -1; + let mut ok = self.num_line_pointers() as i32; + while (ok - ng).abs() > 1 { + let mid = (ok + ng) / 2; + let mid_key = self.key_at(mid as usize, schema); + let order = Value::compare_values(&mid_key, key).unwrap(); + if order.is_gt() || order.is_eq() { + ok = mid; + } else { + ng = mid; + } + } + ok as usize + } + // upper_bound + pub fn key_index_upper(&self, key: &[Value], schema: &Schema) -> usize { + let mut ng = -1; + let mut ok = self.num_line_pointers() as i32; + while (ok - ng).abs() > 1 { + let mid = (ok + ng) / 2; + let mid_key = self.key_at(mid as usize, schema); + let order = Value::compare_values(&mid_key, key).unwrap(); + if order.is_gt() { + ok = mid; + } else { + ng = mid; + } + } + ok as usize + } + pub fn lookup(&self, key: &[Value], schema: &Schema) -> Option> { + let index = self.key_index(key, schema); + if index >= self.num_line_pointers() as usize { + return None; + } + let mut rids = vec![]; + for i in index..self.num_line_pointers() as usize { + let key_at = self.key_at(i, schema); + if key + .iter() + .zip(key_at.iter()) + .any(|(k, k_at)| k.perform_not_equal(k_at).unwrap().is_true()) + { + break; + } + rids.push(self.value_at(i)); + } + if rids.is_empty() { + return None; + } + Some(rids) + } + pub fn insert(&mut self, key: &[Value], value: RID, schema: &Schema) { + let index = self.key_index_upper(key, schema); + + let key_bytes = Tuple::temp_tuple(key).data; + let value_bytes = [value.0 .0.to_le_bytes(), value.1.to_le_bytes()] + .concat() + .into_boxed_slice(); + let new_entry = [value_bytes, key_bytes].concat(); + + let mut entries = vec![]; + for i in 0..self.num_line_pointers() { + let offset = self.line_pointer_offset(i as usize) as usize; + let size = self.line_pointer_size(i as usize) as usize; + let entry_bytes = self.data[offset..(offset + size)].to_vec(); + entries.push(entry_bytes); + } + entries.insert(index, new_entry); + + let mut current_offset = PAGE_SIZE; + for (i, entry) in entries.iter().enumerate() { + let size = entry.len() as u32; + let offset_bytes = ((current_offset as u32) - size).to_le_bytes(); + let size_bytes = size.to_le_bytes(); + self.data[(HEADER_SIZE + i * LINE_POINTER_SIZE) + ..(HEADER_SIZE + i * LINE_POINTER_SIZE + LINE_POINTER_OFFSET_SIZE)] + .copy_from_slice(&offset_bytes); + self.data[(HEADER_SIZE + i * LINE_POINTER_SIZE + LINE_POINTER_OFFSET_SIZE) + ..(HEADER_SIZE + i * LINE_POINTER_SIZE + LINE_POINTER_SIZE)] + .copy_from_slice(&size_bytes); + self.data[current_offset - size as usize..current_offset].copy_from_slice(entry); + current_offset -= size as usize; + } + self.set_upper_offset(current_offset as u32); + self.set_lower_offset(HEADER_SIZE as u32 + entries.len() as u32 * LINE_POINTER_SIZE as u32); + } +} + +#[cfg(test)] +mod tests { + use crate::{ + catalog::{Column, DataType}, + value::{integer::IntegerValue, varchar::VarcharValue}, + }; + + use super::*; + use anyhow::Result; + use itertools::Itertools; + + #[test] + fn test_insert_and_lookup() -> Result<()> { + let schema = Schema { + columns: vec![ + Column { + name: "id".to_string(), + data_type: DataType::Integer, + }, + Column { + name: "name".to_string(), + data_type: DataType::Varchar, + }, + ], + }; + let values_list = vec![ + ( + vec![ + Value::Integer(IntegerValue(1)), + Value::Varchar(VarcharValue("bar".to_string())), + ], + 0, + ), + ( + vec![ + Value::Integer(IntegerValue(1)), + Value::Varchar(VarcharValue("bar".to_string())), + ], + 1, + ), + ( + vec![ + Value::Integer(IntegerValue(1)), + Value::Varchar(VarcharValue("foo".to_string())), + ], + 2, + ), + ( + vec![ + Value::Integer(IntegerValue(2)), + Value::Varchar(VarcharValue("foo".to_string())), + ], + 3, + ), + ( + vec![ + Value::Integer(IntegerValue(3)), + Value::Varchar(VarcharValue("foo".to_string())), + ], + 4, + ), + ]; + + // tests all permutations of the values + for indexes in (0..5).permutations(5) { + let mut page = BPlusTreeLeafPage::new(PageID(0), INVALID_PAGE_ID, None); + for i in indexes { + page.insert(&values_list[i].0, RID(PageID(1), values_list[i].1), &schema); + } + + let values = page.lookup( + &[ + Value::Integer(IntegerValue(1)), + Value::Varchar(VarcharValue("bar".to_string())), + ], + &schema, + ); + let mut expected = vec![RID(PageID(1), 0), RID(PageID(1), 1)]; + let mut actual = values.unwrap(); + expected.sort(); + actual.sort(); + assert_eq!(expected, actual); + + let values = page.lookup( + &[ + Value::Integer(IntegerValue(1)), + Value::Varchar(VarcharValue("foo".to_string())), + ], + &schema, + ); + assert_eq!(values, Some(vec![RID(PageID(1), 2),])); + let values = page.lookup( + &[ + Value::Integer(IntegerValue(2)), + Value::Varchar(VarcharValue("foo".to_string())), + ], + &schema, + ); + assert_eq!(values, Some(vec![RID(PageID(1), 3),])); + let values = page.lookup( + &[ + Value::Integer(IntegerValue(3)), + Value::Varchar(VarcharValue("foo".to_string())), + ], + &schema, + ); + assert_eq!(values, Some(vec![RID(PageID(1), 4),])); + let values = page.lookup( + &[ + Value::Integer(IntegerValue(4)), + Value::Varchar(VarcharValue("foo".to_string())), + ], + &schema, + ); + assert_eq!(values, None); + let values = page.lookup( + &[ + Value::Integer(IntegerValue(1)), + Value::Varchar(VarcharValue("baz".to_string())), + ], + &schema, + ); + assert_eq!(values, None); + } + Ok(()) + } +} diff --git a/src/value.rs b/src/value.rs index 7c2b0ad..8d25e58 100644 --- a/src/value.rs +++ b/src/value.rs @@ -1,4 +1,7 @@ -use std::fmt::{Display, Formatter}; +use std::{ + cmp::Ordering, + fmt::{Display, Formatter}, +}; use anyhow::{anyhow, Result}; @@ -137,56 +140,76 @@ impl Value { if self.is_null_value() || other.is_null_value() { return Ok(Value::Null); } - match self.convert_to(&DataType::Integer)? { - Value::Integer(value) => match other.convert_to(&DataType::Integer)? { - Value::Integer(other_value) => { - Ok(Value::Boolean(value.perform_less_than(&other_value))) - } + match (self, other) { + (Value::Varchar(value), Value::Varchar(other_value)) => { + Ok(Value::Boolean(value.perform_less_than(other_value))) + } + _ => match self.convert_to(&DataType::Integer)? { + Value::Integer(value) => match other.convert_to(&DataType::Integer)? { + Value::Integer(other_value) => { + Ok(Value::Boolean(value.perform_less_than(&other_value))) + } + _ => unreachable!(), + }, _ => unreachable!(), }, - _ => unreachable!(), } } pub fn perform_less_than_or_equal(&self, other: &Value) -> Result { if self.is_null_value() || other.is_null_value() { return Ok(Value::Null); } - match self.convert_to(&DataType::Integer)? { - Value::Integer(value) => match other.convert_to(&DataType::Integer)? { - Value::Integer(other_value) => Ok(Value::Boolean( - value.perform_less_than_or_equal(&other_value), - )), + match (self, other) { + (Value::Varchar(value), Value::Varchar(other_value)) => Ok(Value::Boolean( + value.perform_less_than_or_equal(other_value), + )), + _ => match self.convert_to(&DataType::Integer)? { + Value::Integer(value) => match other.convert_to(&DataType::Integer)? { + Value::Integer(other_value) => Ok(Value::Boolean( + value.perform_less_than_or_equal(&other_value), + )), + _ => unreachable!(), + }, _ => unreachable!(), }, - _ => unreachable!(), } } pub fn perform_greater_than(&self, other: &Value) -> Result { if self.is_null_value() || other.is_null_value() { return Ok(Value::Null); } - match self.convert_to(&DataType::Integer)? { - Value::Integer(value) => match other.convert_to(&DataType::Integer)? { - Value::Integer(other_value) => { - Ok(Value::Boolean(value.perform_greater_than(&other_value))) - } + match (self, other) { + (Value::Varchar(value), Value::Varchar(other_value)) => { + Ok(Value::Boolean(value.perform_greater_than(other_value))) + } + _ => match self.convert_to(&DataType::Integer)? { + Value::Integer(value) => match other.convert_to(&DataType::Integer)? { + Value::Integer(other_value) => { + Ok(Value::Boolean(value.perform_greater_than(&other_value))) + } + _ => unreachable!(), + }, _ => unreachable!(), }, - _ => unreachable!(), } } pub fn perform_greater_than_or_equal(&self, other: &Value) -> Result { if self.is_null_value() || other.is_null_value() { return Ok(Value::Null); } - match self.convert_to(&DataType::Integer)? { - Value::Integer(value) => match other.convert_to(&DataType::Integer)? { - Value::Integer(other_value) => Ok(Value::Boolean( - value.perform_greater_than_or_equal(&other_value), - )), + match (self, other) { + (Value::Varchar(value), Value::Varchar(other_value)) => Ok(Value::Boolean( + value.perform_greater_than_or_equal(other_value), + )), + _ => match self.convert_to(&DataType::Integer)? { + Value::Integer(value) => match other.convert_to(&DataType::Integer)? { + Value::Integer(other_value) => Ok(Value::Boolean( + value.perform_greater_than_or_equal(&other_value), + )), + _ => unreachable!(), + }, _ => unreachable!(), }, - _ => unreachable!(), } } @@ -287,6 +310,40 @@ impl Value { pub fn is_null_value(&self) -> bool { matches!(self, Value::Null) } + pub fn is_true(&self) -> bool { + match self { + Value::Boolean(value) => value.0, + _ => unreachable!(), + } + } + pub fn is_false(&self) -> bool { + match self { + Value::Boolean(value) => !value.0, + _ => unreachable!(), + } + } + pub fn compare_values(a: &[Value], b: &[Value]) -> Result { + for (a_item, b_item) in a.iter().zip(b.iter()) { + if a_item.is_null_value() || b_item.is_null_value() { + if a_item.is_null_value() && b_item.is_null_value() { + continue; + } + if a_item.is_null_value() { + return Ok(Ordering::Less); + } + if b_item.is_null_value() { + return Ok(Ordering::Greater); + } + } + if a_item.perform_greater_than(b_item)?.is_true() { + return Ok(Ordering::Greater); + } + if a_item.perform_less_than(b_item)?.is_true() { + return Ok(Ordering::Less); + } + } + Ok(Ordering::Equal) + } } #[cfg(test)] diff --git a/src/value/varchar.rs b/src/value/varchar.rs index 4cabc26..915700b 100644 --- a/src/value/varchar.rs +++ b/src/value/varchar.rs @@ -49,4 +49,16 @@ impl VarcharValue { pub fn perform_not_equal(&self, other: &VarcharValue) -> BooleanValue { BooleanValue(self.0 != other.0) } + pub fn perform_less_than(&self, other: &VarcharValue) -> BooleanValue { + BooleanValue(self.0 < other.0) + } + pub fn perform_less_than_or_equal(&self, other: &VarcharValue) -> BooleanValue { + BooleanValue(self.0 <= other.0) + } + pub fn perform_greater_than(&self, other: &VarcharValue) -> BooleanValue { + BooleanValue(self.0 > other.0) + } + pub fn perform_greater_than_or_equal(&self, other: &VarcharValue) -> BooleanValue { + BooleanValue(self.0 >= other.0) + } }