Skip to content

Commit

Permalink
perf: Optimize StatisticsMetaLoader::load
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Mar 15, 2024
1 parent f8488f4 commit 70da684
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 97 deletions.
7 changes: 2 additions & 5 deletions src/catalog/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ pub struct TableCatalog {
schema_ref: SchemaRef,
}

//TODO: can add some like Table description and other information as attributes
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TableMeta {
pub(crate) colum_meta_paths: Vec<String>,
pub(crate) table_name: TableName,
}

Expand Down Expand Up @@ -174,10 +174,7 @@ impl TableCatalog {

impl TableMeta {
pub(crate) fn empty(table_name: TableName) -> Self {
TableMeta {
colum_meta_paths: vec![],
table_name,
}
TableMeta { table_name }
}
}

Expand Down
21 changes: 7 additions & 14 deletions src/execution/volcano/dml/analyze.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::catalog::{TableMeta, TableName};
use crate::catalog::TableName;
use crate::errors::DatabaseError;
use crate::execution::volcano::dql::projection::Projection;
use crate::execution::volcano::{build_read, BoxedExecutor, WriteExecutor};
Expand Down Expand Up @@ -88,6 +88,7 @@ impl Analyze {
}
}
}
let mut values = Vec::with_capacity(builders.len());
let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("It's the end of the world!")
Expand All @@ -99,23 +100,15 @@ impl Analyze {
.join(ts.to_string());
fs::create_dir_all(&dir_path)?;

let mut meta = TableMeta::empty(table_name.clone());

for (index_id, _, builder) in builders {
let path = dir_path.join(index_id.to_string());
let path: String = dir_path.join(index_id.to_string()).to_string_lossy().into();
let (histogram, sketch) = builder.build(DEFAULT_NUM_OF_BUCKETS)?;
let meta = StatisticsMeta::new(histogram, sketch);

StatisticsMeta::new(histogram, sketch).to_file(&path)?;
meta.colum_meta_paths.push(path.to_string_lossy().into());
meta.to_file(&path)?;
values.push(Arc::new(DataValue::Utf8(Some(path.clone()))));
transaction.save_table_meta(&table_name, path, meta)?;
}
transaction.save_table_meta(&meta)?;

let values = meta
.colum_meta_paths
.into_iter()
.map(|path| Arc::new(DataValue::Utf8(Some(path))))
.collect_vec();

yield Tuple { id: None, values };
}
}
Expand Down
11 changes: 2 additions & 9 deletions src/execution/volcano/dql/show_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,8 @@ impl ShowTables {
pub async fn _execute<T: Transaction>(self, transaction: &T) {
let metas = transaction.table_metas()?;

for TableMeta {
table_name,
colum_meta_paths: histogram_paths,
} in metas
{
let values = vec![
Arc::new(DataValue::Utf8(Some(table_name.to_string()))),
Arc::new(DataValue::UInt32(Some(histogram_paths.len() as u32))),
];
for TableMeta { table_name } in metas {
let values = vec![Arc::new(DataValue::Utf8(Some(table_name.to_string())))];

yield Tuple { id: None, values };
}
Expand Down
40 changes: 20 additions & 20 deletions src/optimizer/core/statistics_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,45 @@ use crate::errors::DatabaseError;
use crate::expression::range_detacher::Range;
use crate::optimizer::core::cm_sketch::CountMinSketch;
use crate::optimizer::core::histogram::Histogram;
use crate::storage::kip::StatisticsMetaCache;
use crate::storage::Transaction;
use crate::types::index::IndexId;
use crate::types::value::DataValue;
use kip_db::kernel::utils::lru_cache::ShardingLruCache;
use serde::{Deserialize, Serialize};
use std::fs::OpenOptions;
use std::io::Write;
use std::path::Path;
use std::slice;

pub struct StatisticMetaLoader<'a, T: Transaction> {
cache: &'a ShardingLruCache<TableName, Vec<StatisticsMeta>>,
cache: &'a StatisticsMetaCache,
tx: &'a T,
}

impl<'a, T: Transaction> StatisticMetaLoader<'a, T> {
pub fn new(
tx: &'a T,
cache: &'a ShardingLruCache<TableName, Vec<StatisticsMeta>>,
) -> StatisticMetaLoader<'a, T> {
pub fn new(tx: &'a T, cache: &'a StatisticsMetaCache) -> StatisticMetaLoader<'a, T> {
StatisticMetaLoader { cache, tx }
}

pub fn load(&self, table_name: TableName) -> Result<&Vec<StatisticsMeta>, DatabaseError> {
let option = self.cache.get(&table_name);
pub fn load(
&self,
table_name: &TableName,
index_id: IndexId,
) -> Result<Option<&StatisticsMeta>, DatabaseError> {
let key = (table_name.clone(), index_id);
let option = self.cache.get(&key);

if let Some(statistics_metas) = option {
Ok(statistics_metas)
} else {
let paths = self.tx.statistics_meta_paths(&table_name)?;
let mut statistics_metas = Vec::with_capacity(paths.len());

for path in paths {
statistics_metas.push(StatisticsMeta::from_file(path)?);
}
if let Some(statistics_meta) = option {
return Ok(Some(statistics_meta));
}
if let Some(path) = self.tx.table_meta_path(table_name.as_str(), index_id)? {
let statistics_meta = StatisticsMeta::from_file(path)?;

Ok(self
.cache
.get_or_insert(table_name, |_| Ok(statistics_metas))?)
Ok(Some(
self.cache.get_or_insert(key, |_| Ok(statistics_meta))?,
))
} else {
Ok(None)
}
}
}
Expand Down
37 changes: 14 additions & 23 deletions src/optimizer/rule/implementation/dql/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use crate::errors::DatabaseError;
use crate::optimizer::core::memo::{Expression, GroupExpression};
use crate::optimizer::core::pattern::{Pattern, PatternChildrenPredicate};
use crate::optimizer::core::rule::{ImplementationRule, MatchPattern};
use crate::optimizer::core::statistics_meta::{StatisticMetaLoader, StatisticsMeta};
use crate::optimizer::core::statistics_meta::StatisticMetaLoader;
use crate::planner::operator::{Operator, PhysicalOption};
use crate::storage::Transaction;
use crate::types::index::{IndexId, IndexType};
use crate::types::index::IndexType;
use lazy_static::lazy_static;

lazy_static! {
Expand Down Expand Up @@ -34,14 +34,18 @@ impl<T: Transaction> ImplementationRule<T> for SeqScanImplementation {
group_expr: &mut GroupExpression,
) -> Result<(), DatabaseError> {
if let Operator::Scan(scan_op) = op {
let statistics_metas = loader.load(scan_op.table_name.clone())?;
let mut cost = None;
let cost = scan_op
.index_infos
.iter()
.find(|index_info| {
let column_ids = &index_info.meta.column_ids;

if let Some(statistics_meta) =
find_statistics_meta(statistics_metas, &scan_op.primary_key)
{
cost = Some(statistics_meta.histogram().values_len());
}
column_ids.len() == 1 && column_ids[0] == scan_op.primary_key
})
.map(|index_info| loader.load(&scan_op.table_name, index_info.meta.id))
.transpose()?
.flatten()
.map(|statistics_meta| statistics_meta.histogram().values_len());

group_expr.append_expr(Expression {
op: PhysicalOption::SeqScan,
Expand Down Expand Up @@ -71,17 +75,15 @@ impl<T: Transaction> ImplementationRule<T> for IndexScanImplementation {
group_expr: &mut GroupExpression,
) -> Result<(), DatabaseError> {
if let Operator::Scan(scan_op) = op {
let statistics_metas = loader.load(scan_op.table_name.clone())?;
for index_info in scan_op.index_infos.iter() {
if index_info.range.is_none() {
continue;
}
let mut cost = None;

if let Some(range) = &index_info.range {
// FIXME: Only UniqueIndex
if let Some(statistics_meta) =
find_statistics_meta(statistics_metas, &index_info.meta.id)
loader.load(&scan_op.table_name, index_info.meta.id)?
{
let mut row_count = statistics_meta.collect_count(range)?;

Expand All @@ -105,14 +107,3 @@ impl<T: Transaction> ImplementationRule<T> for IndexScanImplementation {
}
}
}

fn find_statistics_meta<'a>(
statistics_metas: &'a [StatisticsMeta],
index_id: &IndexId,
) -> Option<&'a StatisticsMeta> {
assert!(statistics_metas.is_sorted_by_key(StatisticsMeta::index_id));
statistics_metas
.binary_search_by(|statistics_meta| statistics_meta.index_id().cmp(index_id))
.ok()
.map(|i| &statistics_metas[i])
}
7 changes: 3 additions & 4 deletions src/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,9 @@ impl LogicalPlan {
..
}) => schema_ref.clone(),
Operator::Dummy => Arc::new(vec![]),
Operator::Show => Arc::new(vec![
Arc::new(ColumnCatalog::new_dummy("TABLE".to_string())),
Arc::new(ColumnCatalog::new_dummy("STATISTICS_METAS_LEN".to_string())),
]),
Operator::Show => Arc::new(vec![Arc::new(ColumnCatalog::new_dummy(
"TABLE".to_string(),
))]),
Operator::Explain => {
Arc::new(vec![Arc::new(ColumnCatalog::new_dummy("PLAN".to_string()))])
}
Expand Down
40 changes: 25 additions & 15 deletions src/storage/kip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::Arc;
#[derive(Clone)]
pub struct KipStorage {
pub inner: Arc<storage::KipStorage>,
pub(crate) meta_cache: Arc<ShardingLruCache<TableName, Vec<StatisticsMeta>>>,
pub(crate) meta_cache: Arc<StatisticsMetaCache>,
}

impl KipStorage {
Expand Down Expand Up @@ -53,10 +53,12 @@ impl Storage for KipStorage {
}
}

pub(crate) type StatisticsMetaCache = ShardingLruCache<(TableName, IndexId), StatisticsMeta>;

pub struct KipTransaction {
tx: mvcc::Transaction,
table_cache: ShardingLruCache<String, TableCatalog>,
meta_cache: Arc<ShardingLruCache<TableName, Vec<StatisticsMeta>>>,
meta_cache: Arc<StatisticsMetaCache>,
}

impl Transaction for KipTransaction {
Expand Down Expand Up @@ -388,26 +390,34 @@ impl Transaction for KipTransaction {
Ok(metas)
}

fn save_table_meta(&mut self, table_meta: &TableMeta) -> Result<(), DatabaseError> {
fn save_table_meta(
&mut self,
table_name: &TableName,
path: String,
statistics_meta: StatisticsMeta,
) -> Result<(), DatabaseError> {
// TODO: clean old meta file
let _ = self.meta_cache.remove(&table_meta.table_name);
let (key, value) = TableCodec::encode_root_table(table_meta)?;
let index_id = statistics_meta.index_id();
let _ = self
.meta_cache
.put((table_name.clone(), index_id), statistics_meta);
let (key, value) = TableCodec::encode_statistics_path(table_name.as_str(), index_id, path);
self.tx.set(key, value);

Ok(())
}

fn statistics_meta_paths(&self, table_name: &str) -> Result<Vec<String>, DatabaseError> {
if let Some(bytes) = self
fn table_meta_path(
&self,
table_name: &str,
index_id: IndexId,
) -> Result<Option<String>, DatabaseError> {
let key = TableCodec::encode_statistics_path_key(table_name, index_id);
self
.tx
.get(&TableCodec::encode_root_table_key(table_name))?
{
let meta = TableCodec::decode_root_table(&bytes)?;

return Ok(meta.colum_meta_paths);
}

Ok(vec![])
.get(&key)?
.map(|bytes| TableCodec::decode_statistics_path(&bytes))
.transpose()
}

fn meta_loader(&self) -> StatisticMetaLoader<Self>
Expand Down
15 changes: 12 additions & 3 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod table_codec;
use crate::catalog::{ColumnCatalog, ColumnRef, TableCatalog, TableMeta, TableName};
use crate::errors::DatabaseError;
use crate::expression::range_detacher::Range;
use crate::optimizer::core::statistics_meta::StatisticMetaLoader;
use crate::optimizer::core::statistics_meta::{StatisticMetaLoader, StatisticsMeta};
use crate::storage::table_codec::TableCodec;
use crate::types::index::{Index, IndexId, IndexMetaRef, IndexType};
use crate::types::tuple::{Tuple, TupleId};
Expand Down Expand Up @@ -100,8 +100,17 @@ pub trait Transaction: Sync + Send + 'static {
fn drop_data(&mut self, table_name: &str) -> Result<(), DatabaseError>;
fn table(&self, table_name: TableName) -> Option<&TableCatalog>;
fn table_metas(&self) -> Result<Vec<TableMeta>, DatabaseError>;
fn save_table_meta(&mut self, table_meta: &TableMeta) -> Result<(), DatabaseError>;
fn statistics_meta_paths(&self, table_name: &str) -> Result<Vec<String>, DatabaseError>;
fn save_table_meta(
&mut self,
table_name: &TableName,
path: String,
statistics_meta: StatisticsMeta,
) -> Result<(), DatabaseError>;
fn table_meta_path(
&self,
table_name: &str,
index_id: IndexId,
) -> Result<Option<String>, DatabaseError>;
fn meta_loader(&self) -> StatisticMetaLoader<Self>
where
Self: Sized;
Expand Down
Loading

0 comments on commit 70da684

Please sign in to comment.