Commit fc1cda4
fix: expired statistics meta file clean (#225)
* fix: expired statistics meta file clean

* chore: codefmt

* chore: sort paths to ensure normal unit test under different systems
KKould authored Sep 25, 2024
1 parent 12a8e98 commit fc1cda4
Showing 4 changed files with 165 additions and 14 deletions.
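
The core of the fix is in analyze.rs below: statistics meta files used to be written under a fresh UNIX-timestamp directory on every run and were never cleaned up, so each ANALYZE left another expired copy behind. The commit instead writes each index's file to a stable path inside the table's directory and then deletes whatever the current run did not touch. Below is a minimal standalone sketch of that write-temp, rename, sweep pattern; the function and parameter names are illustrative assumptions, and fs::write stands in for StatisticsMeta::to_file.

use std::collections::HashSet;
use std::ffi::OsString;
use std::fs;
use std::io;
use std::path::Path;

fn rewrite_statistics_dir(dir: &Path, metas: &[(u32, Vec<u8>)]) -> io::Result<()> {
    fs::create_dir_all(dir)?;
    let mut active: HashSet<OsString> = HashSet::new();

    for (index_id, bytes) in metas {
        let file_name = OsString::from(index_id.to_string());
        let path = dir.join(&file_name);
        let temp_path = path.with_extension("tmp");

        fs::write(&temp_path, bytes)?;  // stand-in for StatisticsMeta::to_file
        fs::rename(&temp_path, &path)?; // swap the finished file into place
        active.insert(file_name);
    }

    // Any entry this run did not produce belongs to a dropped index (or is a
    // stale .tmp from an interrupted run), so it is expired: delete it.
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        if !active.remove(&entry.file_name()) {
            fs::remove_file(entry.path())?;
        }
    }
    Ok(())
}

In the actual diff the rename happens only after StatisticsMeta::to_file has finished writing the temp file, so a reader never observes a half-written meta file at the final path; sweeping by file name also disposes of stray .tmp files, since they are never inserted into the active set.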
src/execution/ddl/drop_column.rs (1 addition, 1 deletion)

@@ -69,7 +69,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn {
                 for tuple in tuples {
                     throw!(transaction.append(&table_name, tuple, &types, true));
                 }
-                throw!(transaction.drop_column(cache.0, &table_name, &column_name));
+                throw!(transaction.drop_column(cache.0, cache.1, &table_name, &column_name));
 
                 yield Ok(TupleBuilder::build_result("1".to_string()));
             } else if if_exists {
src/execution/dml/analyze.rs (132 additions, 11 deletions)

@@ -13,12 +13,14 @@
 use crate::types::tuple::Tuple;
 use crate::types::value::{DataValue, Utf8Type};
 use itertools::Itertools;
 use sqlparser::ast::CharLengthUnits;
+use std::collections::HashSet;
+use std::ffi::OsStr;
 use std::fmt::Formatter;
+use std::fs::DirEntry;
 use std::ops::Coroutine;
 use std::ops::CoroutineState;
 use std::pin::Pin;
 use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
 use std::{fmt, fs};
 
 const DEFAULT_NUM_OF_BUCKETS: usize = 100;
@@ -95,30 +97,46 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze {
             }
             drop(coroutine);
             let mut values = Vec::with_capacity(builders.len());
-            let ts = SystemTime::now()
-                .duration_since(UNIX_EPOCH)
-                .expect("It's the end of the world!")
-                .as_secs();
             let dir_path = dirs::config_dir()
                 .expect("Your system does not have a Config directory!")
                 .join(DEFAULT_STATISTICS_META_PATH)
-                .join(table_name.as_str())
-                .join(ts.to_string());
+                .join(table_name.as_str());
             // For DEBUG
             // println!("Statistics Path: {:#?}", dir_path);
             throw!(fs::create_dir_all(&dir_path).map_err(DatabaseError::IO));
 
+            let mut active_index_paths = HashSet::new();
+
             for (index_id, _, builder) in builders {
-                let path: String = dir_path.join(index_id.to_string()).to_string_lossy().into();
+                let index_file = OsStr::new(&index_id.to_string()).to_os_string();
+                let path = dir_path.join(&index_file);
+                let temp_path = path.with_extension("tmp");
+                let path_str: String = path.to_string_lossy().into();
 
                 let (histogram, sketch) = throw!(builder.build(DEFAULT_NUM_OF_BUCKETS));
                 let meta = StatisticsMeta::new(histogram, sketch);
 
-                throw!(meta.to_file(&path));
+                throw!(meta.to_file(&temp_path));
                 values.push(Arc::new(DataValue::Utf8 {
-                    value: Some(path.clone()),
+                    value: Some(path_str.clone()),
                     ty: Utf8Type::Variable(None),
                     unit: CharLengthUnits::Characters,
                 }));
-                throw!(transaction.save_table_meta(cache.1, &table_name, path, meta));
+                throw!(transaction.save_table_meta(cache.1, &table_name, path_str, meta));
+                throw!(fs::rename(&temp_path, &path).map_err(DatabaseError::IO));
+
+                active_index_paths.insert(index_file);
             }
+
+            // clean expired index
+            for entry in throw!(fs::read_dir(dir_path).map_err(DatabaseError::IO)) {
+                let entry: DirEntry = throw!(entry.map_err(DatabaseError::IO));
+
+                if !active_index_paths.remove(&entry.file_name()) {
+                    throw!(fs::remove_file(&entry.path()).map_err(DatabaseError::IO));
+                }
+            }
 
             yield Ok(Tuple { id: None, values });
         },
     )
@@ -134,3 +152,106 @@ impl fmt::Display for AnalyzeOperator {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod test {
+    use crate::db::DataBaseBuilder;
+    use crate::errors::DatabaseError;
+    use crate::execution::dml::analyze::{DEFAULT_NUM_OF_BUCKETS, DEFAULT_STATISTICS_META_PATH};
+    use crate::optimizer::core::statistics_meta::StatisticsMeta;
+    use std::ffi::OsStr;
+    use std::fs;
+    use tempfile::TempDir;
+
+    #[test]
+    fn test_analyze() -> Result<(), DatabaseError> {
+        test_statistics_meta()?;
+        test_clean_expired_index()?;
+
+        Ok(())
+    }
+
+    fn test_statistics_meta() -> Result<(), DatabaseError> {
+        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
+        let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build()?;
+
+        let _ = fnck_sql.run("create table t1 (a int primary key, b int)")?;
+        let _ = fnck_sql.run("create index b_index on t1 (b)")?;
+        let _ = fnck_sql.run("create index p_index on t1 (a, b)")?;
+
+        for i in 0..DEFAULT_NUM_OF_BUCKETS + 1 {
+            let _ = fnck_sql.run(format!("insert into t1 values({i}, {})", i % 20))?;
+        }
+        let _ = fnck_sql.run("analyze table t1")?;
+
+        let dir_path = dirs::config_dir()
+            .expect("Your system does not have a Config directory!")
+            .join(DEFAULT_STATISTICS_META_PATH)
+            .join("t1");
+
+        let mut paths = Vec::new();
+
+        for entry in fs::read_dir(&dir_path)? {
+            paths.push(entry?.path());
+        }
+        paths.sort();
+
+        let statistics_meta_pk_index = StatisticsMeta::from_file(&paths[0])?;
+
+        assert_eq!(statistics_meta_pk_index.index_id(), 0);
+        assert_eq!(statistics_meta_pk_index.histogram().values_len(), 101);
+
+        let statistics_meta_b_index = StatisticsMeta::from_file(&paths[1])?;
+
+        assert_eq!(statistics_meta_b_index.index_id(), 1);
+        assert_eq!(statistics_meta_b_index.histogram().values_len(), 101);
+
+        let statistics_meta_p_index = StatisticsMeta::from_file(&paths[2])?;
+
+        assert_eq!(statistics_meta_p_index.index_id(), 2);
+        assert_eq!(statistics_meta_p_index.histogram().values_len(), 101);
+
+        Ok(())
+    }
+
+    fn test_clean_expired_index() -> Result<(), DatabaseError> {
+        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
+        let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build()?;
+
+        let _ = fnck_sql.run("create table t1 (a int primary key, b int)")?;
+        let _ = fnck_sql.run("create index b_index on t1 (b)")?;
+        let _ = fnck_sql.run("create index p_index on t1 (a, b)")?;
+
+        for i in 0..DEFAULT_NUM_OF_BUCKETS + 1 {
+            let _ = fnck_sql.run(format!("insert into t1 values({i}, {i})"))?;
+        }
+        let _ = fnck_sql.run("analyze table t1")?;
+
+        let dir_path = dirs::config_dir()
+            .expect("Your system does not have a Config directory!")
+            .join(DEFAULT_STATISTICS_META_PATH)
+            .join("t1");
+
+        let mut file_names = Vec::new();
+
+        for entry in fs::read_dir(&dir_path)? {
+            file_names.push(entry?.file_name());
+        }
+        file_names.sort();
+
+        assert_eq!(file_names.len(), 3);
+        assert_eq!(file_names[0], OsStr::new("0"));
+        assert_eq!(file_names[1], OsStr::new("1"));
+        assert_eq!(file_names[2], OsStr::new("2"));
+
+        let _ = fnck_sql.run("alter table t1 drop column b")?;
+        let _ = fnck_sql.run("analyze table t1")?;
+
+        let mut entries = fs::read_dir(&dir_path)?;
+
+        assert_eq!(entries.next().unwrap()?.file_name(), OsStr::new("0"));
+        assert!(entries.next().is_none());
+
+        Ok(())
+    }
+}
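
The third commit message bullet ("sort paths to ensure normal unit test under different systems") is visible in test_statistics_meta above: std::fs::read_dir makes no ordering guarantee, and the order really does differ across filesystems and operating systems, so the test collects the entries and sorts them before indexing. A small standalone sketch of the idiom (the helper name is ours, not the repository's):

use std::ffi::OsString;
use std::fs;
use std::io;
use std::path::Path;

fn sorted_file_names(dir: &Path) -> io::Result<Vec<OsString>> {
    let mut names = fs::read_dir(dir)?
        .map(|entry| entry.map(|e| e.file_name()))
        .collect::<io::Result<Vec<_>>>()?;
    names.sort(); // "0" < "1" < "2" on every platform
    Ok(names)
}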
src/storage/mod.rs (21 additions, 2 deletions)

@@ -252,6 +252,7 @@ pub trait Transaction: Sized {
     fn drop_column(
         &mut self,
         table_cache: &TableCache,
+        meta_cache: &StatisticsMetaCache,
         table_name: &TableName,
         column_name: &str,
     ) -> Result<(), DatabaseError> {
@@ -270,6 +271,8 @@
 
             let (index_min, index_max) = TableCodec::index_bound(table_name, &index_meta.id);
             self._drop_data(&index_min, &index_max)?;
+
+            self.remove_table_meta(meta_cache, table_name, index_meta.id)?;
         }
         table_cache.remove(table_name);
@@ -332,6 +335,9 @@
         let (index_meta_min, index_meta_max) = TableCodec::index_meta_bound(table_name.as_str());
         self._drop_data(&index_meta_min, &index_meta_max)?;
 
+        let (statistics_min, statistics_max) = TableCodec::statistics_bound(table_name.as_str());
+        self._drop_data(&statistics_min, &statistics_max)?;
+
         self.remove(&TableCodec::encode_root_table_key(table_name.as_str()))?;
         table_cache.remove(&table_name);
@@ -383,11 +389,10 @@
         path: String,
         statistics_meta: StatisticsMeta,
     ) -> Result<(), DatabaseError> {
-        // TODO: clean old meta file
         let index_id = statistics_meta.index_id();
         meta_cache.put((table_name.clone(), index_id), statistics_meta);
+        let (key, value) = TableCodec::encode_statistics_path(table_name.as_str(), index_id, path);
 
-        let (key, value) = TableCodec::encode_statistics_path(table_name.as_str(), index_id, path);
         self.set(key, value)?;
 
         Ok(())
@@ -404,6 +409,20 @@
             .transpose()
     }
 
+    fn remove_table_meta(
+        &mut self,
+        meta_cache: &StatisticsMetaCache,
+        table_name: &TableName,
+        index_id: IndexId,
+    ) -> Result<(), DatabaseError> {
+        let key = TableCodec::encode_statistics_path_key(table_name, index_id);
+        self.remove(&key)?;
+
+        meta_cache.remove(&(table_name.clone(), index_id));
+
+        Ok(())
+    }
+
     fn meta_loader<'a>(&'a self, meta_cache: &'a StatisticsMetaCache) -> StatisticMetaLoader<Self>
     where
         Self: Sized,
src/storage/table_codec.rs (11 additions, 0 deletions)

@@ -161,6 +161,17 @@ impl TableCodec {
         (op(BOUND_MIN_TAG), op(BOUND_MAX_TAG))
     }
 
+    pub fn statistics_bound(table_name: &str) -> (Vec<u8>, Vec<u8>) {
+        let op = |bound_id| {
+            let mut key_prefix = Self::key_prefix(CodecType::Statistics, table_name);
+
+            key_prefix.push(bound_id);
+            key_prefix
+        };
+
+        (op(BOUND_MIN_TAG), op(BOUND_MAX_TAG))
+    }
+
     /// Key: {TableName}{TUPLE_TAG}{BOUND_MIN_TAG}{RowID}(Sorted)
     /// Value: Tuple
     pub fn encode_tuple(
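
statistics_bound follows the same min/max-tag scheme as the other *_bound helpers in TableCodec: pushing BOUND_MIN_TAG or BOUND_MAX_TAG onto the shared key prefix produces two keys that bracket every statistics entry of the table in the sorted key space, which is what lets the table-drop path in mod.rs hand the whole range to _drop_data. Here is a toy illustration with a BTreeMap standing in for the storage engine; the tag values and the key layout after the prefix are assumptions made for the demo, not TableCodec's actual encoding.

use std::collections::BTreeMap;
use std::ops::Bound;

const BOUND_MIN_TAG: u8 = 0;
const BOUND_MAX_TAG: u8 = 1;

fn bound(prefix: &[u8]) -> (Vec<u8>, Vec<u8>) {
    let op = |tag: u8| {
        let mut key = prefix.to_vec();
        key.push(tag);
        key
    };
    (op(BOUND_MIN_TAG), op(BOUND_MAX_TAG))
}

fn stat_key(prefix: &[u8], index_id: u8) -> Vec<u8> {
    let mut key = prefix.to_vec();
    key.push(BOUND_MIN_TAG); // assumed: real keys sort just above the min bound
    key.push(index_id);
    key
}

fn main() {
    let mut kv: BTreeMap<Vec<u8>, &str> = BTreeMap::new();
    kv.insert(stat_key(b"stats_t1", b'0'), "statistics meta, index 0");
    kv.insert(stat_key(b"stats_t1", b'1'), "statistics meta, index 1");
    kv.insert(b"table_t1".to_vec(), "unrelated entry");

    // Everything between the two bounds (the table's statistics entries and
    // nothing else) is what _drop_data would delete.
    let (min, max) = bound(b"stats_t1");
    let in_range: Vec<_> = kv
        .range((Bound::Included(min), Bound::Included(max)))
        .map(|(_, v)| *v)
        .collect();
    assert_eq!(
        in_range,
        vec!["statistics meta, index 0", "statistics meta, index 1"]
    );
}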
