Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: expired statistics meta file clean #225

Merged
merged 3 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/execution/ddl/drop_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropColumn {
for tuple in tuples {
throw!(transaction.append(&table_name, tuple, &types, true));
}
throw!(transaction.drop_column(cache.0, &table_name, &column_name));
throw!(transaction.drop_column(cache.0, cache.1, &table_name, &column_name));

yield Ok(TupleBuilder::build_result("1".to_string()));
} else if if_exists {
Expand Down
143 changes: 132 additions & 11 deletions src/execution/dml/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ use crate::types::tuple::Tuple;
use crate::types::value::{DataValue, Utf8Type};
use itertools::Itertools;
use sqlparser::ast::CharLengthUnits;
use std::collections::HashSet;
use std::ffi::OsStr;
use std::fmt::Formatter;
use std::fs::DirEntry;
use std::ops::Coroutine;
use std::ops::CoroutineState;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{fmt, fs};

const DEFAULT_NUM_OF_BUCKETS: usize = 100;
Expand Down Expand Up @@ -95,30 +97,46 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze {
}
drop(coroutine);
let mut values = Vec::with_capacity(builders.len());
let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("It's the end of the world!")
.as_secs();
let dir_path = dirs::config_dir()
.expect("Your system does not have a Config directory!")
.join(DEFAULT_STATISTICS_META_PATH)
.join(table_name.as_str())
.join(ts.to_string());
.join(table_name.as_str());
// For DEBUG
// println!("Statistics Path: {:#?}", dir_path);
throw!(fs::create_dir_all(&dir_path).map_err(DatabaseError::IO));

let mut active_index_paths = HashSet::new();

for (index_id, _, builder) in builders {
let path: String = dir_path.join(index_id.to_string()).to_string_lossy().into();
let index_file = OsStr::new(&index_id.to_string()).to_os_string();
let path = dir_path.join(&index_file);
let temp_path = path.with_extension("tmp");
let path_str: String = path.to_string_lossy().into();

let (histogram, sketch) = throw!(builder.build(DEFAULT_NUM_OF_BUCKETS));
let meta = StatisticsMeta::new(histogram, sketch);

throw!(meta.to_file(&path));
throw!(meta.to_file(&temp_path));
values.push(Arc::new(DataValue::Utf8 {
value: Some(path.clone()),
value: Some(path_str.clone()),
ty: Utf8Type::Variable(None),
unit: CharLengthUnits::Characters,
}));
throw!(transaction.save_table_meta(cache.1, &table_name, path, meta));
throw!(transaction.save_table_meta(cache.1, &table_name, path_str, meta));
throw!(fs::rename(&temp_path, &path).map_err(DatabaseError::IO));

active_index_paths.insert(index_file);
}

// clean expired index
for entry in throw!(fs::read_dir(dir_path).map_err(DatabaseError::IO)) {
let entry: DirEntry = throw!(entry.map_err(DatabaseError::IO));

if !active_index_paths.remove(&entry.file_name()) {
throw!(fs::remove_file(&entry.path()).map_err(DatabaseError::IO));
}
}

yield Ok(Tuple { id: None, values });
},
)
Expand All @@ -134,3 +152,106 @@ impl fmt::Display for AnalyzeOperator {
Ok(())
}
}

#[cfg(test)]
mod test {
    use crate::db::DataBaseBuilder;
    use crate::errors::DatabaseError;
    use crate::execution::dml::analyze::{DEFAULT_NUM_OF_BUCKETS, DEFAULT_STATISTICS_META_PATH};
    use crate::optimizer::core::statistics_meta::StatisticsMeta;
    use std::ffi::OsStr;
    use std::fs;
    use tempfile::TempDir;

    /// Entry point for both analyze cases.
    ///
    /// The statistics meta files live under the user's global config
    /// directory (shared state), so the two scenarios are chained inside a
    /// single `#[test]` rather than run as independent (possibly parallel)
    /// tests.
    #[test]
    fn test_analyze() -> Result<(), DatabaseError> {
        test_statistics_meta()?;
        test_clean_expired_index()?;

        Ok(())
    }

    /// `analyze table` must persist one StatisticsMeta file per index
    /// (primary key, single-column index, composite index) and each file
    /// must round-trip through `StatisticsMeta::from_file`.
    fn test_statistics_meta() -> Result<(), DatabaseError> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build()?;

        let _ = fnck_sql.run("create table t1 (a int primary key, b int)")?;
        let _ = fnck_sql.run("create index b_index on t1 (b)")?;
        let _ = fnck_sql.run("create index p_index on t1 (a, b)")?;

        // One row more than the bucket count so every histogram is full.
        for i in 0..DEFAULT_NUM_OF_BUCKETS + 1 {
            let _ = fnck_sql.run(format!("insert into t1 values({i}, {})", i % 20))?;
        }
        let _ = fnck_sql.run("analyze table t1")?;

        let meta_dir = dirs::config_dir()
            .expect("Your system does not have a Config directory!")
            .join(DEFAULT_STATISTICS_META_PATH)
            .join("t1");

        // Collect and sort so positions line up with index ids 0/1/2.
        let mut meta_paths: Vec<_> = fs::read_dir(&meta_dir)?
            .map(|entry| entry.map(|e| e.path()))
            .collect::<Result<_, _>>()?;
        meta_paths.sort();

        for (pos, expected_index_id) in [0, 1, 2].into_iter().enumerate() {
            let meta = StatisticsMeta::from_file(&meta_paths[pos])?;

            assert_eq!(meta.index_id(), expected_index_id);
            assert_eq!(meta.histogram().values_len(), 101);
        }

        Ok(())
    }

    /// Dropping a column removes its indexes; a subsequent `analyze table`
    /// must delete the now-stale statistics meta files and keep only the
    /// primary key's file ("0").
    fn test_clean_expired_index() -> Result<(), DatabaseError> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build()?;

        let _ = fnck_sql.run("create table t1 (a int primary key, b int)")?;
        let _ = fnck_sql.run("create index b_index on t1 (b)")?;
        let _ = fnck_sql.run("create index p_index on t1 (a, b)")?;

        for i in 0..DEFAULT_NUM_OF_BUCKETS + 1 {
            let _ = fnck_sql.run(format!("insert into t1 values({i}, {i})"))?;
        }
        let _ = fnck_sql.run("analyze table t1")?;

        let meta_dir = dirs::config_dir()
            .expect("Your system does not have a Config directory!")
            .join(DEFAULT_STATISTICS_META_PATH)
            .join("t1");

        // Before the drop: one meta file per index, named by index id.
        let mut names: Vec<_> = fs::read_dir(&meta_dir)?
            .map(|entry| entry.map(|e| e.file_name()))
            .collect::<Result<_, _>>()?;
        names.sort();

        assert_eq!(
            names,
            vec![OsStr::new("0"), OsStr::new("1"), OsStr::new("2")]
        );

        let _ = fnck_sql.run("alter table t1 drop column b")?;
        let _ = fnck_sql.run("analyze table t1")?;

        // After the drop + re-analyze: only the primary key's file survives.
        let remaining: Vec<_> = fs::read_dir(&meta_dir)?
            .map(|entry| entry.map(|e| e.file_name()))
            .collect::<Result<_, _>>()?;

        assert_eq!(remaining, vec![OsStr::new("0")]);

        Ok(())
    }
}
23 changes: 21 additions & 2 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ pub trait Transaction: Sized {
fn drop_column(
&mut self,
table_cache: &TableCache,
meta_cache: &StatisticsMetaCache,
table_name: &TableName,
column_name: &str,
) -> Result<(), DatabaseError> {
Expand All @@ -270,6 +271,8 @@ pub trait Transaction: Sized {

let (index_min, index_max) = TableCodec::index_bound(table_name, &index_meta.id);
self._drop_data(&index_min, &index_max)?;

self.remove_table_meta(meta_cache, table_name, index_meta.id)?;
}
table_cache.remove(table_name);

Expand Down Expand Up @@ -332,6 +335,9 @@ pub trait Transaction: Sized {
let (index_meta_min, index_meta_max) = TableCodec::index_meta_bound(table_name.as_str());
self._drop_data(&index_meta_min, &index_meta_max)?;

let (statistics_min, statistics_max) = TableCodec::statistics_bound(table_name.as_str());
self._drop_data(&statistics_min, &statistics_max)?;

self.remove(&TableCodec::encode_root_table_key(table_name.as_str()))?;
table_cache.remove(&table_name);

Expand Down Expand Up @@ -383,11 +389,10 @@ pub trait Transaction: Sized {
path: String,
statistics_meta: StatisticsMeta,
) -> Result<(), DatabaseError> {
// TODO: clean old meta file
let index_id = statistics_meta.index_id();
meta_cache.put((table_name.clone(), index_id), statistics_meta);
let (key, value) = TableCodec::encode_statistics_path(table_name.as_str(), index_id, path);

let (key, value) = TableCodec::encode_statistics_path(table_name.as_str(), index_id, path);
self.set(key, value)?;

Ok(())
Expand All @@ -404,6 +409,20 @@ pub trait Transaction: Sized {
.transpose()
}

/// Remove the persisted statistics-meta path entry for `index_id` of
/// `table_name`, and evict the matching entry from `meta_cache` so stale
/// statistics are not served after the file reference is gone.
fn remove_table_meta(
    &mut self,
    meta_cache: &StatisticsMetaCache,
    table_name: &TableName,
    index_id: IndexId,
) -> Result<(), DatabaseError> {
    // Delete the stored key first; only then invalidate the cache entry.
    self.remove(&TableCodec::encode_statistics_path_key(table_name, index_id))?;

    meta_cache.remove(&(table_name.clone(), index_id));

    Ok(())
}

fn meta_loader<'a>(&'a self, meta_cache: &'a StatisticsMetaCache) -> StatisticMetaLoader<Self>
where
Self: Sized,
Expand Down
11 changes: 11 additions & 0 deletions src/storage/table_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,17 @@ impl TableCodec {
(op(BOUND_MIN_TAG), op(BOUND_MAX_TAG))
}

/// Build the (min, max) key pair that brackets every statistics entry
/// belonging to `table_name`, for range deletion/scans over that prefix.
pub fn statistics_bound(table_name: &str) -> (Vec<u8>, Vec<u8>) {
    // Shared prefix plus a single trailing tag byte selects the bound.
    let bound = |tag| {
        let mut key = Self::key_prefix(CodecType::Statistics, table_name);
        key.push(tag);
        key
    };

    (bound(BOUND_MIN_TAG), bound(BOUND_MAX_TAG))
}

/// Key: {TableName}{TUPLE_TAG}{BOUND_MIN_TAG}{RowID}(Sorted)
/// Value: Tuple
pub fn encode_tuple(
Expand Down
Loading