Skip to content

Commit

Permalink
refactor: reconstruct the Key encoding of each structure of TableCodec
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Sep 27, 2023
1 parent cdf3694 commit 026f34b
Show file tree
Hide file tree
Showing 8 changed files with 368 additions and 327 deletions.
3 changes: 1 addition & 2 deletions src/catalog/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ impl RootCatalog {
}
let table = TableCatalog::new(
table_name.clone(),
columns,
vec![]
columns
)?;

self.table_idxs.insert(table_name.clone(), table);
Expand Down
32 changes: 23 additions & 9 deletions src/catalog/table.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use itertools::Itertools;

use crate::catalog::{CatalogError, ColumnCatalog, ColumnRef};
use crate::types::ColumnId;
Expand Down Expand Up @@ -73,20 +72,24 @@ impl TableCatalog {
Ok(col_id)
}

pub(crate) fn add_index_meta(&mut self, mut index: IndexMeta) -> &IndexMeta {
let index_id = self.indexes.len();

index.id = index_id as u32;
self.indexes.push(Arc::new(index));

&self.indexes[index_id]
}

pub(crate) fn new(
name: TableName,
columns: Vec<ColumnCatalog>,
indexes: Vec<IndexMeta>
columns: Vec<ColumnCatalog>
) -> Result<TableCatalog, CatalogError> {
let indexes = indexes.into_iter()
.map(Arc::new)
.collect_vec();

let mut table_catalog = TableCatalog {
name,
column_idxs: BTreeMap::new(),
columns: BTreeMap::new(),
indexes,
indexes: vec![],
};

for col_catalog in columns.into_iter() {
Expand All @@ -95,6 +98,17 @@ impl TableCatalog {

Ok(table_catalog)
}

pub(crate) fn new_with_indexes(
name: TableName,
columns: Vec<ColumnCatalog>,
indexes: Vec<IndexMetaRef>
) -> Result<TableCatalog, CatalogError> {
let mut catalog = TableCatalog::new(name, columns)?;
catalog.indexes = indexes;

Ok(catalog)
}
}

#[cfg(test)]
Expand All @@ -112,7 +126,7 @@ mod tests {
let col0 = ColumnCatalog::new("a".into(), false, ColumnDesc::new(LogicalType::Integer, false, false));
let col1 = ColumnCatalog::new("b".into(), false, ColumnDesc::new(LogicalType::Boolean, false, false));
let col_catalogs = vec![col0, col1];
let table_catalog = TableCatalog::new(Arc::new("test".to_string()), col_catalogs, vec![]).unwrap();
let table_catalog = TableCatalog::new(Arc::new("test".to_string()), col_catalogs).unwrap();

assert_eq!(table_catalog.contains_column(&"a".to_string()), true);
assert_eq!(table_catalog.contains_column(&"b".to_string()), true);
Expand Down
30 changes: 14 additions & 16 deletions src/execution/executor/show/show_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,21 @@ impl<S: Storage> Executor<S> for ShowTables {
impl ShowTables {
#[try_stream(boxed, ok = Tuple, error = ExecutorError)]
pub async fn _execute<S: Storage>(self, storage: S) {
if let Some(tables) = storage.show_tables().await {
for (table,column_count) in tables {
let columns: Vec<ColumnRef> = vec![
Arc::new(ColumnCatalog::new_dummy("TABLES".to_string())),
Arc::new(ColumnCatalog::new_dummy("COLUMN_COUNT".to_string())),
];
let values: Vec<ValueRef> = vec![
Arc::new(DataValue::Utf8(Some(table))),
Arc::new(DataValue::UInt32(Some(column_count as u32))),
];
let tables = storage.show_tables().await?;

yield Tuple {
id: None,
columns,
values,
};
}
for table in tables {
let columns: Vec<ColumnRef> = vec![
Arc::new(ColumnCatalog::new_dummy("TABLES".to_string())),
];
let values: Vec<ValueRef> = vec![
Arc::new(DataValue::Utf8(Some(table))),
];

yield Tuple {
id: None,
columns,
values,
};
}
}
}
102 changes: 52 additions & 50 deletions src/storage/kip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,36 +41,38 @@ impl KipStorage {
})
}

fn column_collect(name: &String, tx: &mvcc::Transaction) -> Option<(Vec<ColumnCatalog>, Option<TableName>)> {
fn column_collect(name: &String, tx: &mvcc::Transaction) -> Result<(Vec<ColumnCatalog>, Option<TableName>), StorageError> {
let (column_min, column_max) = TableCodec::columns_bound(name);
let mut column_iter = tx.iter(Bound::Included(&column_min), Bound::Included(&column_max)).ok()?;
let mut column_iter = tx.iter(Bound::Included(&column_min), Bound::Included(&column_max))?;

let mut columns = vec![];
let mut name_option = None;

while let Some((key, value_option)) = column_iter.try_next().ok().flatten() {
while let Some((_, value_option)) = column_iter.try_next().ok().flatten() {
if let Some(value) = value_option {
if let Some((table_name, column)) = TableCodec::decode_column(&key, &value) {
if name != table_name.as_str() { return None; }
let _ = name_option.insert(table_name);
let (table_name, column) = TableCodec::decode_column(&value)?;

columns.push(column);
if name != table_name.as_str() {
return Ok((vec![], None));
}
let _ = name_option.insert(table_name);

columns.push(column);
}
}

Some((columns, name_option))
Ok((columns, name_option))
}

fn index_meta_collect(name: &String, tx: &mvcc::Transaction) -> Option<Vec<IndexMeta>> {
fn index_meta_collect(name: &String, tx: &mvcc::Transaction) -> Option<Vec<IndexMetaRef>> {
let (index_min, index_max) = TableCodec::index_meta_bound(name);
let mut index_metas = vec![];
let mut index_iter = tx.iter(Bound::Included(&index_min), Bound::Included(&index_max)).ok()?;

while let Some((_, value_option)) = index_iter.try_next().ok().flatten() {
if let Some(value) = value_option {
if let Some(index_meta) = TableCodec::decode_index_meta(&value).ok() {
index_metas.push(index_meta);
index_metas.push(Arc::new(index_meta));
}
}
}
Expand All @@ -95,54 +97,54 @@ impl KipStorage {

Ok(())
}
}

#[async_trait]
impl Storage for KipStorage {
type TransactionType = KipTransaction;

async fn create_table(&self, table_name: TableName, mut columns: Vec<ColumnCatalog>) -> Result<TableName, StorageError> {
let mut tx = self.inner.new_transaction().await;

for (i, col) in columns.iter_mut().enumerate() {
col.id = Some(i as u32);
}

for (key, value) in columns
.iter()
.filter_map(|col| TableCodec::encode_column(&table_name, &col))
{
tx.set(key, value);
}
let mut indexes = Vec::new();
fn create_index_meta_for_table(
tx: &mut mvcc::Transaction,
table: &mut TableCatalog
) -> Result<(), StorageError> {
let table_name = table.name.clone();

for col in columns
.iter()
for col in table.all_columns()
.into_iter()
.filter(|col| col.desc.is_unique)
{
if let Some(col_id) = col.id {
let meta = IndexMeta {
id: indexes.len() as u32,
id: 0,
column_ids: vec![col_id],
name: format!("uk_{}", col.name),
is_unique: true,
};
let (key, value) = TableCodec::encode_index_meta(&table_name, &meta)?;
let meta_ref = table.add_index_meta(meta);
let (key, value) = TableCodec::encode_index_meta(&table_name, meta_ref)?;

indexes.push(meta);
tx.set(key, value);
}
}
Ok(())
}
}

#[async_trait]
impl Storage for KipStorage {
type TransactionType = KipTransaction;

async fn create_table(&self, table_name: TableName, columns: Vec<ColumnCatalog>) -> Result<TableName, StorageError> {
let mut tx = self.inner.new_transaction().await;
let mut table_catalog = TableCatalog::new(table_name.clone(), columns)?;

Self::create_index_meta_for_table(&mut tx, &mut table_catalog)?;

let (k, v)= TableCodec::encode_root_table(table_name.as_str(), columns.len())?;
for (_, column) in &table_catalog.columns {
let (key, value) = TableCodec::encode_column(column)?;
tx.set(key, value);
}

let (k, v)= TableCodec::encode_root_table(&table_name)?;
self.inner.set(k, v).await?;

tx.commit().await?;

self.cache.put(
table_name.to_string(),
TableCatalog::new(table_name.clone(), columns, indexes)?
);
self.cache.put(table_name.to_string(), table_catalog);

Ok(table_name)
}
Expand All @@ -165,7 +167,7 @@ impl Storage for KipStorage {
for col_key in col_keys {
tx.remove(&col_key)?
}
tx.remove(&TableCodec::encode_root_table_key(name.as_str()))?;
tx.remove(&TableCodec::encode_root_table_key(name))?;
tx.commit().await?;

let _ = self.cache.remove(name);
Expand Down Expand Up @@ -203,11 +205,11 @@ impl Storage for KipStorage {
if option.is_none() {
let tx = self.inner.new_transaction().await;
// TODO: unify the data into a `Meta` prefix and use one iteration to collect all data
let (columns, name_option) = Self::column_collect(name, &tx)?;
let (columns, name_option) = Self::column_collect(name, &tx).ok()?;
let indexes = Self::index_meta_collect(name, &tx)?;

if let Some(catalog) = name_option
.and_then(|table_name| TableCatalog::new(table_name, columns, indexes).ok())
.and_then(|table_name| TableCatalog::new_with_indexes(table_name, columns, indexes).ok())
{
option = self.cache.get_or_insert(name.to_string(), |_| Ok(catalog)).ok();
}
Expand All @@ -216,22 +218,22 @@ impl Storage for KipStorage {
option
}

async fn show_tables(&self) -> Option<Vec<(String,usize)>> {
async fn show_tables(&self) -> Result<Vec<String>, StorageError> {
let mut tables = vec![];
let (min, max) = TableCodec::root_table_bound();

let tx = self.inner.new_transaction().await;
let mut iter = tx.iter(Bound::Included(&min), Bound::Included(&max)).ok()?;
let mut iter = tx.iter(Bound::Included(&min), Bound::Included(&max))?;

while let Some((key, value_option)) = iter.try_next().ok().flatten() {
while let Some((_, value_option)) = iter.try_next().ok().flatten() {
if let Some(value) = value_option {
if let Some((table_name, column_count)) = TableCodec::decode_root_table(&key, &value) {
tables.push((table_name,column_count));
}
let table_name = TableCodec::decode_root_table(&value)?;

tables.push(table_name);
}
}

Some(tables)
Ok(tables)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl Storage for MemStorage {
}
}

async fn show_tables(&self) -> Option<Vec<(String, usize)>> {
async fn show_tables(&self) -> Result<Vec<String>, StorageError> {
todo!()
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub trait Storage: Sync + Send + Clone + 'static {
async fn transaction(&self, name: &String) -> Option<Self::TransactionType>;
async fn table(&self, name: &String) -> Option<&TableCatalog>;

async fn show_tables(&self) -> Option<Vec<(String,usize)>>;
async fn show_tables(&self) -> Result<Vec<String>, StorageError>;
}

/// Optional bounds of the reader, of the form (offset, limit).
Expand Down
Loading

0 comments on commit 026f34b

Please sign in to comment.