perf: Optimize StatisticsMetaLoader::load #167

Merged · 2 commits · Mar 19, 2024
7 changes: 2 additions & 5 deletions src/catalog/table.rs
@@ -23,9 +23,9 @@ pub struct TableCatalog {
schema_ref: SchemaRef,
}

//TODO: can add some like Table description and other information as attributes
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TableMeta {
pub(crate) colum_meta_paths: Vec<String>,
pub(crate) table_name: TableName,
}

@@ -174,10 +174,7 @@ impl TableCatalog {

impl TableMeta {
pub(crate) fn empty(table_name: TableName) -> Self {
TableMeta {
colum_meta_paths: vec![],
table_name,
}
TableMeta { table_name }
}
}
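
Since the rendered diff interleaves old and new lines without `+`/`-` markers, here is a minimal self-contained sketch of the resulting `TableMeta` (the `TableName` alias is assumed to match the crate's `Arc<String>`, and the real struct also derives `Serialize`/`Deserialize`): the `colum_meta_paths` vector is gone, and statistics files are instead registered per `(table, index)` in the storage layer, as the later hunks show.

```rust
use std::sync::Arc;

// Assumed alias; the real struct also derives Serialize/Deserialize.
type TableName = Arc<String>;

#[derive(Debug, Clone, PartialEq)]
pub struct TableMeta {
    pub(crate) table_name: TableName,
}

impl TableMeta {
    pub(crate) fn empty(table_name: TableName) -> Self {
        TableMeta { table_name }
    }
}

fn main() {
    let meta = TableMeta::empty(Arc::new("t1".to_string()));
    println!("{meta:?}");
}
```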

21 changes: 7 additions & 14 deletions src/execution/volcano/dml/analyze.rs
@@ -1,4 +1,4 @@
use crate::catalog::{TableMeta, TableName};
use crate::catalog::TableName;
use crate::errors::DatabaseError;
use crate::execution::volcano::dql::projection::Projection;
use crate::execution::volcano::{build_read, BoxedExecutor, WriteExecutor};
@@ -88,6 +88,7 @@ impl Analyze {
}
}
}
let mut values = Vec::with_capacity(builders.len());
let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("It's the end of the world!")
@@ -99,23 +100,15 @@ impl Analyze {
.join(ts.to_string());
fs::create_dir_all(&dir_path)?;

let mut meta = TableMeta::empty(table_name.clone());

for (index_id, _, builder) in builders {
let path = dir_path.join(index_id.to_string());
let path: String = dir_path.join(index_id.to_string()).to_string_lossy().into();
let (histogram, sketch) = builder.build(DEFAULT_NUM_OF_BUCKETS)?;
let meta = StatisticsMeta::new(histogram, sketch);

StatisticsMeta::new(histogram, sketch).to_file(&path)?;
meta.colum_meta_paths.push(path.to_string_lossy().into());
meta.to_file(&path)?;
values.push(Arc::new(DataValue::Utf8(Some(path.clone()))));
transaction.save_table_meta(&table_name, path, meta)?;
}
transaction.save_table_meta(&meta)?;

let values = meta
.colum_meta_paths
.into_iter()
.map(|path| Arc::new(DataValue::Utf8(Some(path))))
.collect_vec();

yield Tuple { id: None, values };
}
}
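
A rough, self-contained sketch of the new ANALYZE flow with placeholder types (the real code uses `StatisticsMeta`, `Transaction::save_table_meta`, and a timestamped directory under the database path): each index gets its own statistics file, and the `(table, index) → path` mapping is registered through the transaction instead of being accumulated on a `TableMeta`.

```rust
use std::path::PathBuf;
use std::sync::Arc;

// Placeholder for the real StatisticsMeta (histogram + count-min sketch).
struct StatisticsMeta;

impl StatisticsMeta {
    fn to_file(&self, _path: &str) -> std::io::Result<()> {
        Ok(()) // the real implementation serializes the histogram/sketch to disk
    }
}

// Placeholder for Transaction::save_table_meta(table_name, path, statistics_meta).
fn save_table_meta(table: &str, path: String, _meta: StatisticsMeta) {
    println!("registered statistics of {table} at {path}");
}

fn analyze(table: &str, index_ids: &[u32], dir: PathBuf) -> std::io::Result<Vec<Arc<String>>> {
    let mut values = Vec::with_capacity(index_ids.len());
    for index_id in index_ids {
        // One statistics file per index, named after the index id; the real code
        // builds `dir` from a timestamp so repeated ANALYZE runs do not collide.
        let path: String = dir.join(index_id.to_string()).to_string_lossy().into();
        let meta = StatisticsMeta; // built from the column's histogram builder
        meta.to_file(&path)?;
        // The yielded tuple now reports the written paths directly...
        values.push(Arc::new(path.clone()));
        // ...and the (table, index_id) -> path mapping is persisted per index.
        save_table_meta(table, path, meta);
    }
    Ok(values)
}

fn main() -> std::io::Result<()> {
    for path in analyze("t1", &[0, 1], PathBuf::from("stats_dir"))? {
        println!("{path}");
    }
    Ok(())
}
```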
11 changes: 2 additions & 9 deletions src/execution/volcano/dql/show_table.rs
@@ -20,15 +20,8 @@ impl ShowTables {
pub async fn _execute<T: Transaction>(self, transaction: &T) {
let metas = transaction.table_metas()?;

for TableMeta {
table_name,
colum_meta_paths: histogram_paths,
} in metas
{
let values = vec![
Arc::new(DataValue::Utf8(Some(table_name.to_string()))),
Arc::new(DataValue::UInt32(Some(histogram_paths.len() as u32))),
];
for TableMeta { table_name } in metas {
let values = vec![Arc::new(DataValue::Utf8(Some(table_name.to_string())))];

yield Tuple { id: None, values };
}
40 changes: 20 additions & 20 deletions src/optimizer/core/statistics_meta.rs
@@ -3,45 +3,45 @@ use crate::errors::DatabaseError;
use crate::expression::range_detacher::Range;
use crate::optimizer::core::cm_sketch::CountMinSketch;
use crate::optimizer::core::histogram::Histogram;
use crate::storage::kip::StatisticsMetaCache;
use crate::storage::Transaction;
use crate::types::index::IndexId;
use crate::types::value::DataValue;
use kip_db::kernel::utils::lru_cache::ShardingLruCache;
use serde::{Deserialize, Serialize};
use std::fs::OpenOptions;
use std::io::Write;
use std::path::Path;
use std::slice;

pub struct StatisticMetaLoader<'a, T: Transaction> {
cache: &'a ShardingLruCache<TableName, Vec<StatisticsMeta>>,
cache: &'a StatisticsMetaCache,
tx: &'a T,
}

impl<'a, T: Transaction> StatisticMetaLoader<'a, T> {
pub fn new(
tx: &'a T,
cache: &'a ShardingLruCache<TableName, Vec<StatisticsMeta>>,
) -> StatisticMetaLoader<'a, T> {
pub fn new(tx: &'a T, cache: &'a StatisticsMetaCache) -> StatisticMetaLoader<'a, T> {
StatisticMetaLoader { cache, tx }
}

pub fn load(&self, table_name: TableName) -> Result<&Vec<StatisticsMeta>, DatabaseError> {
let option = self.cache.get(&table_name);
pub fn load(
&self,
table_name: &TableName,
index_id: IndexId,
) -> Result<Option<&StatisticsMeta>, DatabaseError> {
let key = (table_name.clone(), index_id);
let option = self.cache.get(&key);

if let Some(statistics_metas) = option {
Ok(statistics_metas)
} else {
let paths = self.tx.statistics_meta_paths(&table_name)?;
let mut statistics_metas = Vec::with_capacity(paths.len());

for path in paths {
statistics_metas.push(StatisticsMeta::from_file(path)?);
}
if let Some(statistics_meta) = option {
return Ok(Some(statistics_meta));
}
if let Some(path) = self.tx.table_meta_path(table_name.as_str(), index_id)? {
let statistics_meta = StatisticsMeta::from_file(path)?;

Ok(self
.cache
.get_or_insert(table_name, |_| Ok(statistics_metas))?)
Ok(Some(
self.cache.get_or_insert(key, |_| Ok(statistics_meta))?,
))
} else {
Ok(None)
}
}
}
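
The load path is now keyed by `(TableName, IndexId)`: a cache hit returns immediately, and a miss reads at most one statistics file instead of every file registered for the table. A minimal sketch of that memoization pattern, using a plain `HashMap` in place of kip_db's `ShardingLruCache` and a stub lookup in place of `Transaction::table_meta_path` / `StatisticsMeta::from_file`:

```rust
use std::collections::HashMap;

type TableName = String;
type IndexId = u32;

#[derive(Debug, Clone)]
struct StatisticsMeta {
    path: String, // stands in for the real histogram + count-min sketch
}

struct Loader {
    // The real code uses kip_db's ShardingLruCache<(TableName, IndexId), StatisticsMeta>.
    cache: HashMap<(TableName, IndexId), StatisticsMeta>,
}

impl Loader {
    fn load(&mut self, table: &str, index_id: IndexId) -> Option<&StatisticsMeta> {
        let key = (table.to_string(), index_id);
        if !self.cache.contains_key(&key) {
            // Cache miss: resolve the single (table, index) path and read one file,
            // instead of reading every statistics file registered for the table.
            let path = lookup_path(table, index_id)?;
            self.cache.insert(key.clone(), StatisticsMeta { path });
        }
        self.cache.get(&key)
    }
}

// Stand-in for Transaction::table_meta_path + StatisticsMeta::from_file.
fn lookup_path(table: &str, index_id: IndexId) -> Option<String> {
    Some(format!("stats_dir/{table}/{index_id}"))
}

fn main() {
    let mut loader = Loader { cache: HashMap::new() };
    println!("{:?}", loader.load("t1", 0)); // miss: reads one file, then caches it
    println!("{:?}", loader.load("t1", 0)); // hit: served straight from the cache
}
```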
37 changes: 14 additions & 23 deletions src/optimizer/rule/implementation/dql/scan.rs
@@ -2,10 +2,10 @@ use crate::errors::DatabaseError;
use crate::optimizer::core::memo::{Expression, GroupExpression};
use crate::optimizer::core::pattern::{Pattern, PatternChildrenPredicate};
use crate::optimizer::core::rule::{ImplementationRule, MatchPattern};
use crate::optimizer::core::statistics_meta::{StatisticMetaLoader, StatisticsMeta};
use crate::optimizer::core::statistics_meta::StatisticMetaLoader;
use crate::planner::operator::{Operator, PhysicalOption};
use crate::storage::Transaction;
use crate::types::index::{IndexId, IndexType};
use crate::types::index::IndexType;
use lazy_static::lazy_static;

lazy_static! {
@@ -34,14 +34,18 @@ impl<T: Transaction> ImplementationRule<T> for SeqScanImplementation {
group_expr: &mut GroupExpression,
) -> Result<(), DatabaseError> {
if let Operator::Scan(scan_op) = op {
let statistics_metas = loader.load(scan_op.table_name.clone())?;
let mut cost = None;
let cost = scan_op
.index_infos
.iter()
.find(|index_info| {
let column_ids = &index_info.meta.column_ids;

if let Some(statistics_meta) =
find_statistics_meta(statistics_metas, &scan_op.primary_key)
{
cost = Some(statistics_meta.histogram().values_len());
}
column_ids.len() == 1 && column_ids[0] == scan_op.primary_key
})
.map(|index_info| loader.load(&scan_op.table_name, index_info.meta.id))
.transpose()?
.flatten()
.map(|statistics_meta| statistics_meta.histogram().values_len());

group_expr.append_expr(Expression {
op: PhysicalOption::SeqScan,
@@ -71,17 +75,15 @@ impl<T: Transaction> ImplementationRule<T> for IndexScanImplementation {
group_expr: &mut GroupExpression,
) -> Result<(), DatabaseError> {
if let Operator::Scan(scan_op) = op {
let statistics_metas = loader.load(scan_op.table_name.clone())?;
for index_info in scan_op.index_infos.iter() {
if index_info.range.is_none() {
continue;
}
let mut cost = None;

if let Some(range) = &index_info.range {
// FIXME: Only UniqueIndex
if let Some(statistics_meta) =
find_statistics_meta(statistics_metas, &index_info.meta.id)
loader.load(&scan_op.table_name, index_info.meta.id)?
{
let mut row_count = statistics_meta.collect_count(range)?;

@@ -105,14 +107,3 @@ impl<T: Transaction> ImplementationRule<T> for IndexScanImplementation {
}
}
}

fn find_statistics_meta<'a>(
statistics_metas: &'a [StatisticsMeta],
index_id: &IndexId,
) -> Option<&'a StatisticsMeta> {
assert!(statistics_metas.is_sorted_by_key(StatisticsMeta::index_id));
statistics_metas
.binary_search_by(|statistics_meta| statistics_meta.index_id().cmp(index_id))
.ok()
.map(|i| &statistics_metas[i])
}
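
The SeqScan cost is now derived with a single `find → map → transpose → flatten` chain instead of pre-loading a `Vec<StatisticsMeta>` and binary-searching it. A small self-contained sketch of the same combinator pattern, with stub types standing in for `IndexInfo` and the loader:

```rust
#[derive(Debug)]
struct IndexInfo {
    id: u32,
    column_ids: Vec<u32>,
}

// Stand-in for StatisticMetaLoader::load: may fail, may find no statistics.
fn load(_table: &str, index_id: u32) -> Result<Option<usize>, String> {
    // Pretend only index 0 has statistics, with 1000 values in its histogram.
    Ok(if index_id == 0 { Some(1000) } else { None })
}

fn seq_scan_cost(
    table: &str,
    primary_key: u32,
    index_infos: &[IndexInfo],
) -> Result<Option<usize>, String> {
    let cost = index_infos
        .iter()
        // Pick the single-column index over the primary key, as in the diff.
        .find(|info| info.column_ids.len() == 1 && info.column_ids[0] == primary_key)
        // Option<&IndexInfo> -> Option<Result<Option<usize>, _>>
        .map(|info| load(table, info.id))
        // -> Result<Option<Option<usize>>, _>, so `?` can surface a loader error
        .transpose()?
        // -> Option<usize>: no matching index or no statistics means no estimate
        .flatten();
    Ok(cost)
}

fn main() {
    let indexes = vec![
        IndexInfo { id: 0, column_ids: vec![0] },
        IndexInfo { id: 1, column_ids: vec![1, 2] },
    ];
    println!("{:?}", seq_scan_cost("t1", 0, &indexes)); // Ok(Some(1000))
}
```

The real code then maps the found `StatisticsMeta` to `histogram().values_len()`; here the stub loader returns that length directly, while a loader error still propagates through `?` and a missing statistics file simply yields `None`.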
7 changes: 3 additions & 4 deletions src/planner/mod.rs
@@ -95,10 +95,9 @@ impl LogicalPlan {
..
}) => schema_ref.clone(),
Operator::Dummy => Arc::new(vec![]),
Operator::Show => Arc::new(vec![
Arc::new(ColumnCatalog::new_dummy("TABLE".to_string())),
Arc::new(ColumnCatalog::new_dummy("STATISTICS_METAS_LEN".to_string())),
]),
Operator::Show => Arc::new(vec![Arc::new(ColumnCatalog::new_dummy(
"TABLE".to_string(),
))]),
Operator::Explain => {
Arc::new(vec![Arc::new(ColumnCatalog::new_dummy("PLAN".to_string()))])
}
41 changes: 25 additions & 16 deletions src/storage/kip.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
#[derive(Clone)]
pub struct KipStorage {
pub inner: Arc<storage::KipStorage>,
pub(crate) meta_cache: Arc<ShardingLruCache<TableName, Vec<StatisticsMeta>>>,
pub(crate) meta_cache: Arc<StatisticsMetaCache>,
}

impl KipStorage {
@@ -53,10 +53,12 @@ impl Storage for KipStorage {
}
}

pub(crate) type StatisticsMetaCache = ShardingLruCache<(TableName, IndexId), StatisticsMeta>;

pub struct KipTransaction {
tx: mvcc::Transaction,
table_cache: ShardingLruCache<String, TableCatalog>,
meta_cache: Arc<ShardingLruCache<TableName, Vec<StatisticsMeta>>>,
meta_cache: Arc<StatisticsMetaCache>,
}

impl Transaction for KipTransaction {
@@ -388,26 +390,33 @@ impl Transaction for KipTransaction {
Ok(metas)
}

fn save_table_meta(&mut self, table_meta: &TableMeta) -> Result<(), DatabaseError> {
fn save_table_meta(
&mut self,
table_name: &TableName,
path: String,
statistics_meta: StatisticsMeta,
) -> Result<(), DatabaseError> {
// TODO: clean old meta file
let _ = self.meta_cache.remove(&table_meta.table_name);
let (key, value) = TableCodec::encode_root_table(table_meta)?;
let index_id = statistics_meta.index_id();
let _ = self
.meta_cache
.put((table_name.clone(), index_id), statistics_meta);
let (key, value) = TableCodec::encode_statistics_path(table_name.as_str(), index_id, path);
self.tx.set(key, value);

Ok(())
}

fn statistics_meta_paths(&self, table_name: &str) -> Result<Vec<String>, DatabaseError> {
if let Some(bytes) = self
.tx
.get(&TableCodec::encode_root_table_key(table_name))?
{
let meta = TableCodec::decode_root_table(&bytes)?;

return Ok(meta.colum_meta_paths);
}

Ok(vec![])
fn table_meta_path(
&self,
table_name: &str,
index_id: IndexId,
) -> Result<Option<String>, DatabaseError> {
let key = TableCodec::encode_statistics_path_key(table_name, index_id);
self.tx
.get(&key)?
.map(|bytes| TableCodec::decode_statistics_path(&bytes))
.transpose()
}

fn meta_loader(&self) -> StatisticMetaLoader<Self>
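
Both the LRU cache and the on-disk entry are now addressed by `(TableName, IndexId)`. The exact byte layout produced by `TableCodec::encode_statistics_path` is not shown in this diff, so the sketch below only illustrates the idea of a composite key; the prefix and separator are hypothetical.

```rust
// Hypothetical key layout; the real TableCodec::encode_statistics_path may differ.
type IndexId = u32;

fn encode_statistics_path_key(table_name: &str, index_id: IndexId) -> Vec<u8> {
    // Illustrative only: a "Statistics" prefix plus the composite (table, index) key.
    format!("Statistics_{table_name}_{index_id}").into_bytes()
}

fn encode_statistics_path(table_name: &str, index_id: IndexId, path: String) -> (Vec<u8>, Vec<u8>) {
    (encode_statistics_path_key(table_name, index_id), path.into_bytes())
}

fn decode_statistics_path(bytes: &[u8]) -> Result<String, std::string::FromUtf8Error> {
    String::from_utf8(bytes.to_vec())
}

fn main() {
    let (key, value) = encode_statistics_path("t1", 0, "stats_dir/0".to_string());
    assert_eq!(decode_statistics_path(&value).unwrap(), "stats_dir/0");
    println!("key = {}", String::from_utf8(key).unwrap());
}
```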
15 changes: 12 additions & 3 deletions src/storage/mod.rs
@@ -4,7 +4,7 @@ mod table_codec;
use crate::catalog::{ColumnCatalog, ColumnRef, TableCatalog, TableMeta, TableName};
use crate::errors::DatabaseError;
use crate::expression::range_detacher::Range;
use crate::optimizer::core::statistics_meta::StatisticMetaLoader;
use crate::optimizer::core::statistics_meta::{StatisticMetaLoader, StatisticsMeta};
use crate::storage::table_codec::TableCodec;
use crate::types::index::{Index, IndexId, IndexMetaRef, IndexType};
use crate::types::tuple::{Tuple, TupleId};
@@ -100,8 +100,17 @@ pub trait Transaction: Sync + Send + 'static {
fn drop_data(&mut self, table_name: &str) -> Result<(), DatabaseError>;
fn table(&self, table_name: TableName) -> Option<&TableCatalog>;
fn table_metas(&self) -> Result<Vec<TableMeta>, DatabaseError>;
fn save_table_meta(&mut self, table_meta: &TableMeta) -> Result<(), DatabaseError>;
fn statistics_meta_paths(&self, table_name: &str) -> Result<Vec<String>, DatabaseError>;
fn save_table_meta(
&mut self,
table_name: &TableName,
path: String,
statistics_meta: StatisticsMeta,
) -> Result<(), DatabaseError>;
fn table_meta_path(
&self,
table_name: &str,
index_id: IndexId,
) -> Result<Option<String>, DatabaseError>;
fn meta_loader(&self) -> StatisticMetaLoader<Self>
where
Self: Sized;
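
Taken together, the `Transaction` trait now exposes a per-`(table, index)` write side (`save_table_meta`) and read side (`table_meta_path`). Below is a condensed, hypothetical slice of that contract with an in-memory implementation, purely to illustrate how the two methods are meant to round-trip; the real trait has many more methods and also refreshes the `StatisticsMetaCache` on save.

```rust
use std::collections::HashMap;
use std::sync::Arc;

type TableName = Arc<String>;
type IndexId = u32;
type DatabaseError = String; // placeholder error type

struct StatisticsMeta {
    index_id: IndexId, // the real type also carries a histogram and a count-min sketch
}

// Condensed, hypothetical slice of the Transaction trait's new statistics methods.
trait StatisticsStore {
    fn save_table_meta(
        &mut self,
        table_name: &TableName,
        path: String,
        statistics_meta: StatisticsMeta,
    ) -> Result<(), DatabaseError>;

    fn table_meta_path(
        &self,
        table_name: &str,
        index_id: IndexId,
    ) -> Result<Option<String>, DatabaseError>;
}

// In-memory stand-in for KipTransaction, just to show the round trip.
#[derive(Default)]
struct MemTx {
    paths: HashMap<(String, IndexId), String>,
}

impl StatisticsStore for MemTx {
    fn save_table_meta(
        &mut self,
        table_name: &TableName,
        path: String,
        statistics_meta: StatisticsMeta,
    ) -> Result<(), DatabaseError> {
        // The real implementation also refreshes the StatisticsMetaCache here.
        self.paths
            .insert((table_name.to_string(), statistics_meta.index_id), path);
        Ok(())
    }

    fn table_meta_path(
        &self,
        table_name: &str,
        index_id: IndexId,
    ) -> Result<Option<String>, DatabaseError> {
        Ok(self.paths.get(&(table_name.to_string(), index_id)).cloned())
    }
}

fn main() -> Result<(), DatabaseError> {
    let mut tx = MemTx::default();
    let table: TableName = Arc::new("t1".to_string());
    tx.save_table_meta(&table, "stats_dir/0".to_string(), StatisticsMeta { index_id: 0 })?;
    assert_eq!(tx.table_meta_path("t1", 0)?, Some("stats_dir/0".to_string()));
    Ok(())
}
```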