From 6cd3fea40e5fe7a2c9e1428765f1e88f47365150 Mon Sep 17 00:00:00 2001 From: Kould Date: Wed, 25 Sep 2024 22:07:30 +0800 Subject: [PATCH 1/3] fix: expired statistics meta file clean --- src/execution/ddl/drop_column.rs | 2 +- src/execution/dml/analyze.rs | 130 ++++++++++++++++++++++++++++--- src/storage/mod.rs | 23 +++++- src/storage/table_codec.rs | 11 +++ 4 files changed, 152 insertions(+), 14 deletions(-) diff --git a/src/execution/ddl/drop_column.rs b/src/execution/ddl/drop_column.rs index c6e47ae7..216493f3 100644 --- a/src/execution/ddl/drop_column.rs +++ b/src/execution/ddl/drop_column.rs @@ -69,7 +69,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn { for tuple in tuples { throw!(transaction.append(&table_name, tuple, &types, true)); } - throw!(transaction.drop_column(cache.0, &table_name, &column_name)); + throw!(transaction.drop_column(cache.0, cache.1, &table_name, &column_name)); yield Ok(TupleBuilder::build_result("1".to_string())); } else if if_exists { diff --git a/src/execution/dml/analyze.rs b/src/execution/dml/analyze.rs index c8a4e2b6..8b84253a 100644 --- a/src/execution/dml/analyze.rs +++ b/src/execution/dml/analyze.rs @@ -18,8 +18,8 @@ use std::ops::Coroutine; use std::ops::CoroutineState; use std::pin::Pin; use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; use std::{fmt, fs}; +use std::collections::HashSet; const DEFAULT_NUM_OF_BUCKETS: usize = 100; const DEFAULT_STATISTICS_META_PATH: &str = "fnck_sql_statistics_metas"; @@ -95,30 +95,51 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze { } drop(coroutine); let mut values = Vec::with_capacity(builders.len()); - let ts = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("It's the end of the world!") - .as_secs(); let dir_path = dirs::config_dir() .expect("Your system does not have a Config directory!") .join(DEFAULT_STATISTICS_META_PATH) - .join(table_name.as_str()) - .join(ts.to_string()); + .join(table_name.as_str()); + // For DEBUG + // println!("Statistics Path: {:#?}", dir_path); throw!(fs::create_dir_all(&dir_path).map_err(DatabaseError::IO)); + let mut active_index_paths = HashSet::new(); + for (index_id, _, builder) in builders { - let path: String = dir_path.join(index_id.to_string()).to_string_lossy().into(); + let path = dir_path.join(index_id.to_string()); + let temp_path = path.with_extension("tmp"); + let path_str: String = path + .to_string_lossy() + .into(); let (histogram, sketch) = throw!(builder.build(DEFAULT_NUM_OF_BUCKETS)); let meta = StatisticsMeta::new(histogram, sketch); - throw!(meta.to_file(&path)); + throw!(meta.to_file(&temp_path)); values.push(Arc::new(DataValue::Utf8 { - value: Some(path.clone()), + value: Some(path_str.clone()), ty: Utf8Type::Variable(None), unit: CharLengthUnits::Characters, })); - throw!(transaction.save_table_meta(cache.1, &table_name, path, meta)); + throw!(transaction.save_table_meta(cache.1, &table_name, path_str, meta)); + throw!(fs::rename(&temp_path, &path).map_err(DatabaseError::IO)); + + if let Some(file_name) = path.file_name() { + active_index_paths.insert(file_name.to_os_string()); + } + } + + // clean expired index + for entry in throw!(fs::read_dir(dir_path).map_err(DatabaseError::IO)) { + let entry = throw!(entry.map_err(DatabaseError::IO)); + let path = entry.path(); + + if let Some(file_name) = path.file_name() { + if !active_index_paths.remove(&file_name.to_os_string()) { + throw!(fs::remove_file(&path).map_err(DatabaseError::IO)); + } + } } + yield Ok(Tuple { id: None, values }); }, ) @@ -134,3 +155,90 @@ impl fmt::Display for AnalyzeOperator { Ok(()) } } + +#[cfg(test)] +mod test { + use std::ffi::OsStr; + use std::fs; + use tempfile::TempDir; + use crate::db::DataBaseBuilder; + use crate::errors::DatabaseError; + use crate::execution::dml::analyze::{DEFAULT_NUM_OF_BUCKETS, DEFAULT_STATISTICS_META_PATH}; + use crate::optimizer::core::statistics_meta::StatisticsMeta; + + #[test] + fn test_statistics_meta() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build()?; + + let _ = fnck_sql.run("create table t1 (a int primary key, b int)")?; + let _ = fnck_sql.run("create index b_index on t1 (b)")?; + let _ = fnck_sql.run("create index p_index on t1 (a, b)")?; + + for i in 0..DEFAULT_NUM_OF_BUCKETS + 1 { + let _ = fnck_sql.run(format!("insert into t1 values({i}, {})", i % 20))?; + } + let _ = fnck_sql.run("analyze table t1")?; + + let dir_path = dirs::config_dir() + .expect("Your system does not have a Config directory!") + .join(DEFAULT_STATISTICS_META_PATH) + .join("t1"); + + let mut paths = fs::read_dir(&dir_path)?; + + let statistics_meta_pk_index = StatisticsMeta::from_file(paths.next().unwrap()?.path())?; + + assert_eq!(statistics_meta_pk_index.index_id(), 0); + assert_eq!(statistics_meta_pk_index.histogram().values_len(), 101); + + let statistics_meta_b_index = StatisticsMeta::from_file(paths.next().unwrap()?.path())?; + + assert_eq!(statistics_meta_b_index.index_id(), 1); + assert_eq!(statistics_meta_b_index.histogram().values_len(), 101); + + let statistics_meta_p_index = StatisticsMeta::from_file(paths.next().unwrap()?.path())?; + + assert_eq!(statistics_meta_p_index.index_id(), 2); + assert_eq!(statistics_meta_p_index.histogram().values_len(), 101); + + Ok(()) + } + + #[test] + fn test_clean_expired_index() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build()?; + + let _ = fnck_sql.run("create table t1 (a int primary key, b int)")?; + let _ = fnck_sql.run("create index b_index on t1 (b)")?; + let _ = fnck_sql.run("create index p_index on t1 (a, b)")?; + + for i in 0..DEFAULT_NUM_OF_BUCKETS + 1 { + let _ = fnck_sql.run(format!("insert into t1 values({i}, {i})"))?; + } + let _ = fnck_sql.run("analyze table t1")?; + + let dir_path = dirs::config_dir() + .expect("Your system does not have a Config directory!") + .join(DEFAULT_STATISTICS_META_PATH) + .join("t1"); + + let mut paths = fs::read_dir(&dir_path)?; + + assert_eq!(paths.next().unwrap()?.file_name(), OsStr::new("0")); + assert_eq!(paths.next().unwrap()?.file_name(), OsStr::new("1")); + assert_eq!(paths.next().unwrap()?.file_name(), OsStr::new("2")); + assert!(paths.next().is_none()); + + let _ = fnck_sql.run("alter table t1 drop column b")?; + let _ = fnck_sql.run("analyze table t1")?; + + let mut paths = fs::read_dir(&dir_path)?; + + assert_eq!(paths.next().unwrap()?.file_name(), OsStr::new("0")); + assert!(paths.next().is_none()); + + Ok(()) + } +} \ No newline at end of file diff --git a/src/storage/mod.rs b/src/storage/mod.rs index c3ba7fba..097ab10e 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -252,6 +252,7 @@ pub trait Transaction: Sized { fn drop_column( &mut self, table_cache: &TableCache, + meta_cache: &StatisticsMetaCache, table_name: &TableName, column_name: &str, ) -> Result<(), DatabaseError> { @@ -270,6 +271,8 @@ pub trait Transaction: Sized { let (index_min, index_max) = TableCodec::index_bound(table_name, &index_meta.id); self._drop_data(&index_min, &index_max)?; + + self.remove_table_meta(meta_cache, table_name, index_meta.id)?; } table_cache.remove(table_name); @@ -332,6 +335,9 @@ pub trait Transaction: Sized { let (index_meta_min, index_meta_max) = TableCodec::index_meta_bound(table_name.as_str()); self._drop_data(&index_meta_min, &index_meta_max)?; + let (statistics_min, statistics_max) = TableCodec::statistics_bound(table_name.as_str()); + self._drop_data(&statistics_min, &statistics_max)?; + self.remove(&TableCodec::encode_root_table_key(table_name.as_str()))?; table_cache.remove(&table_name); @@ -383,11 +389,10 @@ pub trait Transaction: Sized { path: String, statistics_meta: StatisticsMeta, ) -> Result<(), DatabaseError> { - // TODO: clean old meta file let index_id = statistics_meta.index_id(); meta_cache.put((table_name.clone(), index_id), statistics_meta); - let (key, value) = TableCodec::encode_statistics_path(table_name.as_str(), index_id, path); + let (key, value) = TableCodec::encode_statistics_path(table_name.as_str(), index_id, path); self.set(key, value)?; Ok(()) @@ -404,6 +409,20 @@ pub trait Transaction: Sized { .transpose() } + fn remove_table_meta( + &mut self, + meta_cache: &StatisticsMetaCache, + table_name: &TableName, + index_id: IndexId, + ) -> Result<(), DatabaseError> { + let key = TableCodec::encode_statistics_path_key(table_name, index_id); + self.remove(&key)?; + + meta_cache.remove(&(table_name.clone(), index_id)); + + Ok(()) + } + fn meta_loader<'a>(&'a self, meta_cache: &'a StatisticsMetaCache) -> StatisticMetaLoader where Self: Sized, diff --git a/src/storage/table_codec.rs b/src/storage/table_codec.rs index 8ab429e7..1554abdd 100644 --- a/src/storage/table_codec.rs +++ b/src/storage/table_codec.rs @@ -161,6 +161,17 @@ impl TableCodec { (op(BOUND_MIN_TAG), op(BOUND_MAX_TAG)) } + pub fn statistics_bound(table_name: &str) -> (Vec, Vec) { + let op = |bound_id| { + let mut key_prefix = Self::key_prefix(CodecType::Statistics, table_name); + + key_prefix.push(bound_id); + key_prefix + }; + + (op(BOUND_MIN_TAG), op(BOUND_MAX_TAG)) + } + /// Key: {TableName}{TUPLE_TAG}{BOUND_MIN_TAG}{RowID}(Sorted) /// Value: Tuple pub fn encode_tuple( From 37c5656ef22e578b64b2ec31f862ba9a50ef9eb3 Mon Sep 17 00:00:00 2001 From: Kould Date: Wed, 25 Sep 2024 22:10:11 +0800 Subject: [PATCH 2/3] chore: codefmt --- src/execution/dml/analyze.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/execution/dml/analyze.rs b/src/execution/dml/analyze.rs index 8b84253a..051666da 100644 --- a/src/execution/dml/analyze.rs +++ b/src/execution/dml/analyze.rs @@ -13,13 +13,13 @@ use crate::types::tuple::Tuple; use crate::types::value::{DataValue, Utf8Type}; use itertools::Itertools; use sqlparser::ast::CharLengthUnits; +use std::collections::HashSet; use std::fmt::Formatter; use std::ops::Coroutine; use std::ops::CoroutineState; use std::pin::Pin; use std::sync::Arc; use std::{fmt, fs}; -use std::collections::HashSet; const DEFAULT_NUM_OF_BUCKETS: usize = 100; const DEFAULT_STATISTICS_META_PATH: &str = "fnck_sql_statistics_metas"; @@ -108,9 +108,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze { for (index_id, _, builder) in builders { let path = dir_path.join(index_id.to_string()); let temp_path = path.with_extension("tmp"); - let path_str: String = path - .to_string_lossy() - .into(); + let path_str: String = path.to_string_lossy().into(); let (histogram, sketch) = throw!(builder.build(DEFAULT_NUM_OF_BUCKETS)); let meta = StatisticsMeta::new(histogram, sketch); @@ -158,13 +156,13 @@ impl fmt::Display for AnalyzeOperator { #[cfg(test)] mod test { - use std::ffi::OsStr; - use std::fs; - use tempfile::TempDir; use crate::db::DataBaseBuilder; use crate::errors::DatabaseError; use crate::execution::dml::analyze::{DEFAULT_NUM_OF_BUCKETS, DEFAULT_STATISTICS_META_PATH}; use crate::optimizer::core::statistics_meta::StatisticsMeta; + use std::ffi::OsStr; + use std::fs; + use tempfile::TempDir; #[test] fn test_statistics_meta() -> Result<(), DatabaseError> { @@ -241,4 +239,4 @@ mod test { Ok(()) } -} \ No newline at end of file +} From d92608edb258f86a64a4f0c97d2ac89a32a76030 Mon Sep 17 00:00:00 2001 From: Kould Date: Wed, 25 Sep 2024 23:18:42 +0800 Subject: [PATCH 3/3] chore: sort paths to ensure normal unit test under different systems --- src/execution/dml/analyze.rs | 61 ++++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 23 deletions(-) diff --git a/src/execution/dml/analyze.rs b/src/execution/dml/analyze.rs index 051666da..c2b1c7b4 100644 --- a/src/execution/dml/analyze.rs +++ b/src/execution/dml/analyze.rs @@ -14,7 +14,9 @@ use crate::types::value::{DataValue, Utf8Type}; use itertools::Itertools; use sqlparser::ast::CharLengthUnits; use std::collections::HashSet; +use std::ffi::OsStr; use std::fmt::Formatter; +use std::fs::DirEntry; use std::ops::Coroutine; use std::ops::CoroutineState; use std::pin::Pin; @@ -106,9 +108,11 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze { let mut active_index_paths = HashSet::new(); for (index_id, _, builder) in builders { - let path = dir_path.join(index_id.to_string()); + let index_file = OsStr::new(&index_id.to_string()).to_os_string(); + let path = dir_path.join(&index_file); let temp_path = path.with_extension("tmp"); let path_str: String = path.to_string_lossy().into(); + let (histogram, sketch) = throw!(builder.build(DEFAULT_NUM_OF_BUCKETS)); let meta = StatisticsMeta::new(histogram, sketch); @@ -121,20 +125,15 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze { throw!(transaction.save_table_meta(cache.1, &table_name, path_str, meta)); throw!(fs::rename(&temp_path, &path).map_err(DatabaseError::IO)); - if let Some(file_name) = path.file_name() { - active_index_paths.insert(file_name.to_os_string()); - } + active_index_paths.insert(index_file); } // clean expired index for entry in throw!(fs::read_dir(dir_path).map_err(DatabaseError::IO)) { - let entry = throw!(entry.map_err(DatabaseError::IO)); - let path = entry.path(); + let entry: DirEntry = throw!(entry.map_err(DatabaseError::IO)); - if let Some(file_name) = path.file_name() { - if !active_index_paths.remove(&file_name.to_os_string()) { - throw!(fs::remove_file(&path).map_err(DatabaseError::IO)); - } + if !active_index_paths.remove(&entry.file_name()) { + throw!(fs::remove_file(&entry.path()).map_err(DatabaseError::IO)); } } @@ -165,6 +164,13 @@ mod test { use tempfile::TempDir; #[test] + fn test_analyze() -> Result<(), DatabaseError> { + test_statistics_meta()?; + test_clean_expired_index()?; + + Ok(()) + } + fn test_statistics_meta() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build()?; @@ -183,19 +189,24 @@ mod test { .join(DEFAULT_STATISTICS_META_PATH) .join("t1"); - let mut paths = fs::read_dir(&dir_path)?; + let mut paths = Vec::new(); + + for entry in fs::read_dir(&dir_path)? { + paths.push(entry?.path()); + } + paths.sort(); - let statistics_meta_pk_index = StatisticsMeta::from_file(paths.next().unwrap()?.path())?; + let statistics_meta_pk_index = StatisticsMeta::from_file(&paths[0])?; assert_eq!(statistics_meta_pk_index.index_id(), 0); assert_eq!(statistics_meta_pk_index.histogram().values_len(), 101); - let statistics_meta_b_index = StatisticsMeta::from_file(paths.next().unwrap()?.path())?; + let statistics_meta_b_index = StatisticsMeta::from_file(&paths[1])?; assert_eq!(statistics_meta_b_index.index_id(), 1); assert_eq!(statistics_meta_b_index.histogram().values_len(), 101); - let statistics_meta_p_index = StatisticsMeta::from_file(paths.next().unwrap()?.path())?; + let statistics_meta_p_index = StatisticsMeta::from_file(&paths[2])?; assert_eq!(statistics_meta_p_index.index_id(), 2); assert_eq!(statistics_meta_p_index.histogram().values_len(), 101); @@ -203,7 +214,6 @@ mod test { Ok(()) } - #[test] fn test_clean_expired_index() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build()?; @@ -222,20 +232,25 @@ mod test { .join(DEFAULT_STATISTICS_META_PATH) .join("t1"); - let mut paths = fs::read_dir(&dir_path)?; + let mut file_names = Vec::new(); + + for entry in fs::read_dir(&dir_path)? { + file_names.push(entry?.file_name()); + } + file_names.sort(); - assert_eq!(paths.next().unwrap()?.file_name(), OsStr::new("0")); - assert_eq!(paths.next().unwrap()?.file_name(), OsStr::new("1")); - assert_eq!(paths.next().unwrap()?.file_name(), OsStr::new("2")); - assert!(paths.next().is_none()); + assert_eq!(file_names.len(), 3); + assert_eq!(file_names[0], OsStr::new("0")); + assert_eq!(file_names[1], OsStr::new("1")); + assert_eq!(file_names[2], OsStr::new("2")); let _ = fnck_sql.run("alter table t1 drop column b")?; let _ = fnck_sql.run("analyze table t1")?; - let mut paths = fs::read_dir(&dir_path)?; + let mut entries = fs::read_dir(&dir_path)?; - assert_eq!(paths.next().unwrap()?.file_name(), OsStr::new("0")); - assert!(paths.next().is_none()); + assert_eq!(entries.next().unwrap()?.file_name(), OsStr::new("0")); + assert!(entries.next().is_none()); Ok(()) }