diff --git a/src/catalog/table.rs b/src/catalog/table.rs index 7a0457ab..752c8103 100644 --- a/src/catalog/table.rs +++ b/src/catalog/table.rs @@ -23,9 +23,9 @@ pub struct TableCatalog { schema_ref: SchemaRef, } +//TODO: can add some like Table description and other information as attributes #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct TableMeta { - pub(crate) colum_meta_paths: Vec, pub(crate) table_name: TableName, } @@ -174,10 +174,7 @@ impl TableCatalog { impl TableMeta { pub(crate) fn empty(table_name: TableName) -> Self { - TableMeta { - colum_meta_paths: vec![], - table_name, - } + TableMeta { table_name } } } diff --git a/src/execution/volcano/dml/analyze.rs b/src/execution/volcano/dml/analyze.rs index 1561fdd2..ef721164 100644 --- a/src/execution/volcano/dml/analyze.rs +++ b/src/execution/volcano/dml/analyze.rs @@ -1,4 +1,4 @@ -use crate::catalog::{TableMeta, TableName}; +use crate::catalog::TableName; use crate::errors::DatabaseError; use crate::execution::volcano::dql::projection::Projection; use crate::execution::volcano::{build_read, BoxedExecutor, WriteExecutor}; @@ -88,6 +88,7 @@ impl Analyze { } } } + let mut values = Vec::with_capacity(builders.len()); let ts = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("It's the end of the world!") @@ -99,23 +100,15 @@ impl Analyze { .join(ts.to_string()); fs::create_dir_all(&dir_path)?; - let mut meta = TableMeta::empty(table_name.clone()); - for (index_id, _, builder) in builders { - let path = dir_path.join(index_id.to_string()); + let path: String = dir_path.join(index_id.to_string()).to_string_lossy().into(); let (histogram, sketch) = builder.build(DEFAULT_NUM_OF_BUCKETS)?; + let meta = StatisticsMeta::new(histogram, sketch); - StatisticsMeta::new(histogram, sketch).to_file(&path)?; - meta.colum_meta_paths.push(path.to_string_lossy().into()); + meta.to_file(&path)?; + values.push(Arc::new(DataValue::Utf8(Some(path.clone())))); + transaction.save_table_meta(&table_name, path, meta)?; } - transaction.save_table_meta(&meta)?; - - let values = meta - .colum_meta_paths - .into_iter() - .map(|path| Arc::new(DataValue::Utf8(Some(path)))) - .collect_vec(); - yield Tuple { id: None, values }; } } diff --git a/src/execution/volcano/dql/show_table.rs b/src/execution/volcano/dql/show_table.rs index 49ff0b6a..0f6d9c37 100644 --- a/src/execution/volcano/dql/show_table.rs +++ b/src/execution/volcano/dql/show_table.rs @@ -20,15 +20,8 @@ impl ShowTables { pub async fn _execute(self, transaction: &T) { let metas = transaction.table_metas()?; - for TableMeta { - table_name, - colum_meta_paths: histogram_paths, - } in metas - { - let values = vec![ - Arc::new(DataValue::Utf8(Some(table_name.to_string()))), - Arc::new(DataValue::UInt32(Some(histogram_paths.len() as u32))), - ]; + for TableMeta { table_name } in metas { + let values = vec![Arc::new(DataValue::Utf8(Some(table_name.to_string())))]; yield Tuple { id: None, values }; } diff --git a/src/optimizer/core/statistics_meta.rs b/src/optimizer/core/statistics_meta.rs index 5ef85f1c..fdfe6e7d 100644 --- a/src/optimizer/core/statistics_meta.rs +++ b/src/optimizer/core/statistics_meta.rs @@ -3,10 +3,10 @@ use crate::errors::DatabaseError; use crate::expression::range_detacher::Range; use crate::optimizer::core::cm_sketch::CountMinSketch; use crate::optimizer::core::histogram::Histogram; +use crate::storage::kip::StatisticsMetaCache; use crate::storage::Transaction; use crate::types::index::IndexId; use crate::types::value::DataValue; -use kip_db::kernel::utils::lru_cache::ShardingLruCache; use serde::{Deserialize, Serialize}; use std::fs::OpenOptions; use std::io::Write; @@ -14,34 +14,34 @@ use std::path::Path; use std::slice; pub struct StatisticMetaLoader<'a, T: Transaction> { - cache: &'a ShardingLruCache>, + cache: &'a StatisticsMetaCache, tx: &'a T, } impl<'a, T: Transaction> StatisticMetaLoader<'a, T> { - pub fn new( - tx: &'a T, - cache: &'a ShardingLruCache>, - ) -> StatisticMetaLoader<'a, T> { + pub fn new(tx: &'a T, cache: &'a StatisticsMetaCache) -> StatisticMetaLoader<'a, T> { StatisticMetaLoader { cache, tx } } - pub fn load(&self, table_name: TableName) -> Result<&Vec, DatabaseError> { - let option = self.cache.get(&table_name); + pub fn load( + &self, + table_name: &TableName, + index_id: IndexId, + ) -> Result, DatabaseError> { + let key = (table_name.clone(), index_id); + let option = self.cache.get(&key); - if let Some(statistics_metas) = option { - Ok(statistics_metas) - } else { - let paths = self.tx.statistics_meta_paths(&table_name)?; - let mut statistics_metas = Vec::with_capacity(paths.len()); - - for path in paths { - statistics_metas.push(StatisticsMeta::from_file(path)?); - } + if let Some(statistics_meta) = option { + return Ok(Some(statistics_meta)); + } + if let Some(path) = self.tx.table_meta_path(table_name.as_str(), index_id)? { + let statistics_meta = StatisticsMeta::from_file(path)?; - Ok(self - .cache - .get_or_insert(table_name, |_| Ok(statistics_metas))?) + Ok(Some( + self.cache.get_or_insert(key, |_| Ok(statistics_meta))?, + )) + } else { + Ok(None) } } } diff --git a/src/optimizer/rule/implementation/dql/scan.rs b/src/optimizer/rule/implementation/dql/scan.rs index d4b088d4..6fcd0c26 100644 --- a/src/optimizer/rule/implementation/dql/scan.rs +++ b/src/optimizer/rule/implementation/dql/scan.rs @@ -2,10 +2,10 @@ use crate::errors::DatabaseError; use crate::optimizer::core::memo::{Expression, GroupExpression}; use crate::optimizer::core::pattern::{Pattern, PatternChildrenPredicate}; use crate::optimizer::core::rule::{ImplementationRule, MatchPattern}; -use crate::optimizer::core::statistics_meta::{StatisticMetaLoader, StatisticsMeta}; +use crate::optimizer::core::statistics_meta::StatisticMetaLoader; use crate::planner::operator::{Operator, PhysicalOption}; use crate::storage::Transaction; -use crate::types::index::{IndexId, IndexType}; +use crate::types::index::IndexType; use lazy_static::lazy_static; lazy_static! { @@ -34,14 +34,18 @@ impl ImplementationRule for SeqScanImplementation { group_expr: &mut GroupExpression, ) -> Result<(), DatabaseError> { if let Operator::Scan(scan_op) = op { - let statistics_metas = loader.load(scan_op.table_name.clone())?; - let mut cost = None; + let cost = scan_op + .index_infos + .iter() + .find(|index_info| { + let column_ids = &index_info.meta.column_ids; - if let Some(statistics_meta) = - find_statistics_meta(statistics_metas, &scan_op.primary_key) - { - cost = Some(statistics_meta.histogram().values_len()); - } + column_ids.len() == 1 && column_ids[0] == scan_op.primary_key + }) + .map(|index_info| loader.load(&scan_op.table_name, index_info.meta.id)) + .transpose()? + .flatten() + .map(|statistics_meta| statistics_meta.histogram().values_len()); group_expr.append_expr(Expression { op: PhysicalOption::SeqScan, @@ -71,7 +75,6 @@ impl ImplementationRule for IndexScanImplementation { group_expr: &mut GroupExpression, ) -> Result<(), DatabaseError> { if let Operator::Scan(scan_op) = op { - let statistics_metas = loader.load(scan_op.table_name.clone())?; for index_info in scan_op.index_infos.iter() { if index_info.range.is_none() { continue; @@ -79,9 +82,8 @@ impl ImplementationRule for IndexScanImplementation { let mut cost = None; if let Some(range) = &index_info.range { - // FIXME: Only UniqueIndex if let Some(statistics_meta) = - find_statistics_meta(statistics_metas, &index_info.meta.id) + loader.load(&scan_op.table_name, index_info.meta.id)? { let mut row_count = statistics_meta.collect_count(range)?; @@ -105,14 +107,3 @@ impl ImplementationRule for IndexScanImplementation { } } } - -fn find_statistics_meta<'a>( - statistics_metas: &'a [StatisticsMeta], - index_id: &IndexId, -) -> Option<&'a StatisticsMeta> { - assert!(statistics_metas.is_sorted_by_key(StatisticsMeta::index_id)); - statistics_metas - .binary_search_by(|statistics_meta| statistics_meta.index_id().cmp(index_id)) - .ok() - .map(|i| &statistics_metas[i]) -} diff --git a/src/planner/mod.rs b/src/planner/mod.rs index 2622c26d..e90bf804 100644 --- a/src/planner/mod.rs +++ b/src/planner/mod.rs @@ -95,10 +95,9 @@ impl LogicalPlan { .. }) => schema_ref.clone(), Operator::Dummy => Arc::new(vec![]), - Operator::Show => Arc::new(vec![ - Arc::new(ColumnCatalog::new_dummy("TABLE".to_string())), - Arc::new(ColumnCatalog::new_dummy("STATISTICS_METAS_LEN".to_string())), - ]), + Operator::Show => Arc::new(vec![Arc::new(ColumnCatalog::new_dummy( + "TABLE".to_string(), + ))]), Operator::Explain => { Arc::new(vec![Arc::new(ColumnCatalog::new_dummy("PLAN".to_string()))]) } diff --git a/src/storage/kip.rs b/src/storage/kip.rs index 24a9065f..38a7c213 100644 --- a/src/storage/kip.rs +++ b/src/storage/kip.rs @@ -22,7 +22,7 @@ use std::sync::Arc; #[derive(Clone)] pub struct KipStorage { pub inner: Arc, - pub(crate) meta_cache: Arc>>, + pub(crate) meta_cache: Arc, } impl KipStorage { @@ -53,10 +53,12 @@ impl Storage for KipStorage { } } +pub(crate) type StatisticsMetaCache = ShardingLruCache<(TableName, IndexId), StatisticsMeta>; + pub struct KipTransaction { tx: mvcc::Transaction, table_cache: ShardingLruCache, - meta_cache: Arc>>, + meta_cache: Arc, } impl Transaction for KipTransaction { @@ -388,26 +390,33 @@ impl Transaction for KipTransaction { Ok(metas) } - fn save_table_meta(&mut self, table_meta: &TableMeta) -> Result<(), DatabaseError> { + fn save_table_meta( + &mut self, + table_name: &TableName, + path: String, + statistics_meta: StatisticsMeta, + ) -> Result<(), DatabaseError> { // TODO: clean old meta file - let _ = self.meta_cache.remove(&table_meta.table_name); - let (key, value) = TableCodec::encode_root_table(table_meta)?; + let index_id = statistics_meta.index_id(); + let _ = self + .meta_cache + .put((table_name.clone(), index_id), statistics_meta); + let (key, value) = TableCodec::encode_statistics_path(table_name.as_str(), index_id, path); self.tx.set(key, value); Ok(()) } - fn statistics_meta_paths(&self, table_name: &str) -> Result, DatabaseError> { - if let Some(bytes) = self - .tx - .get(&TableCodec::encode_root_table_key(table_name))? - { - let meta = TableCodec::decode_root_table(&bytes)?; - - return Ok(meta.colum_meta_paths); - } - - Ok(vec![]) + fn table_meta_path( + &self, + table_name: &str, + index_id: IndexId, + ) -> Result, DatabaseError> { + let key = TableCodec::encode_statistics_path_key(table_name, index_id); + self.tx + .get(&key)? + .map(|bytes| TableCodec::decode_statistics_path(&bytes)) + .transpose() } fn meta_loader(&self) -> StatisticMetaLoader diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 4977c88a..3ba5d891 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -4,7 +4,7 @@ mod table_codec; use crate::catalog::{ColumnCatalog, ColumnRef, TableCatalog, TableMeta, TableName}; use crate::errors::DatabaseError; use crate::expression::range_detacher::Range; -use crate::optimizer::core::statistics_meta::StatisticMetaLoader; +use crate::optimizer::core::statistics_meta::{StatisticMetaLoader, StatisticsMeta}; use crate::storage::table_codec::TableCodec; use crate::types::index::{Index, IndexId, IndexMetaRef, IndexType}; use crate::types::tuple::{Tuple, TupleId}; @@ -100,8 +100,17 @@ pub trait Transaction: Sync + Send + 'static { fn drop_data(&mut self, table_name: &str) -> Result<(), DatabaseError>; fn table(&self, table_name: TableName) -> Option<&TableCatalog>; fn table_metas(&self) -> Result, DatabaseError>; - fn save_table_meta(&mut self, table_meta: &TableMeta) -> Result<(), DatabaseError>; - fn statistics_meta_paths(&self, table_name: &str) -> Result, DatabaseError>; + fn save_table_meta( + &mut self, + table_name: &TableName, + path: String, + statistics_meta: StatisticsMeta, + ) -> Result<(), DatabaseError>; + fn table_meta_path( + &self, + table_name: &str, + index_id: IndexId, + ) -> Result, DatabaseError>; fn meta_loader(&self) -> StatisticMetaLoader where Self: Sized; diff --git a/src/storage/table_codec.rs b/src/storage/table_codec.rs index ab12dd2d..977ecc29 100644 --- a/src/storage/table_codec.rs +++ b/src/storage/table_codec.rs @@ -5,6 +5,7 @@ use crate::types::tuple::{Schema, Tuple, TupleId}; use crate::types::value::DataValue; use crate::types::LogicalType; use bytes::Bytes; +use integer_encoding::FixedInt; use lazy_static::lazy_static; use std::sync::Arc; @@ -23,6 +24,7 @@ enum CodecType { Column, IndexMeta, Index, + Statistics, Tuple, Root, } @@ -44,6 +46,9 @@ impl TableCodec { CodecType::Index => { table_bytes.push(b'3'); } + CodecType::Statistics => { + table_bytes.push(b'4'); + } CodecType::Tuple => { table_bytes.push(b'8'); } @@ -52,7 +57,7 @@ impl TableCodec { bytes.push(BOUND_MIN_TAG); bytes.append(&mut table_bytes); - table_bytes = bytes + return bytes; } } @@ -291,8 +296,32 @@ impl TableCodec { Ok(bincode::deserialize::(bytes)?) } + /// Key: {TableName}{STATISTICS_TAG}{BOUND_MIN_TAG}{INDEX_ID} + /// Value: StatisticsMeta Path + pub fn encode_statistics_path( + table_name: &str, + index_id: IndexId, + path: String, + ) -> (Bytes, Bytes) { + let key = Self::encode_statistics_path_key(table_name, index_id); + + (Bytes::from(key), Bytes::from(path)) + } + + pub fn encode_statistics_path_key(table_name: &str, index_id: IndexId) -> Vec { + let mut key_prefix = Self::key_prefix(CodecType::Statistics, table_name); + + key_prefix.push(BOUND_MIN_TAG); + key_prefix.extend_from_slice(index_id.encode_fixed_light()); + key_prefix + } + + pub fn decode_statistics_path(bytes: &[u8]) -> Result { + Ok(String::from_utf8(bytes.to_vec())?) + } + /// Key: Root{BOUND_MIN_TAG}{TableName} - /// Value: TableName + /// Value: TableMeta pub fn encode_root_table(meta: &TableMeta) -> Result<(Bytes, Bytes), DatabaseError> { let key = Self::encode_root_table_key(&meta.table_name); @@ -367,7 +396,6 @@ mod tests { fn test_root_catalog() { let table_catalog = build_table_codec(); let (_, bytes) = TableCodec::encode_root_table(&TableMeta { - colum_meta_paths: vec![], table_name: table_catalog.name.clone(), }) .unwrap(); @@ -375,7 +403,15 @@ mod tests { let table_meta = TableCodec::decode_root_table(&bytes).unwrap(); assert_eq!(table_meta.table_name.as_str(), table_catalog.name.as_str()); - assert!(table_meta.colum_meta_paths.is_empty()); + } + + #[test] + fn test_table_codec_statistics_meta_path() { + let path = String::from("./lol"); + let (_, bytes) = TableCodec::encode_statistics_path("t1", 0, path.clone()); + let decode_path = TableCodec::decode_statistics_path(&bytes).unwrap(); + + assert_eq!(path, decode_path); } #[test]