Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(katana-db): database schema #1837

Closed
wants to merge 13 commits into from
3 changes: 2 additions & 1 deletion crates/katana/storage/db/src/codecs/postcard.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use katana_primitives::block::Header;
use katana_primitives::block::{BlockNumber, Header};
use katana_primitives::contract::{ContractAddress, GenericContractInfo};
use katana_primitives::receipt::Receipt;
use katana_primitives::trace::TxExecInfo;
Expand Down Expand Up @@ -39,6 +39,7 @@ impl_compress_and_decompress_for_table_values!(
Receipt,
FieldElement,
ContractAddress,
Vec<BlockNumber>,
BlockList,
GenericContractInfo,
StoredBlockBodyIndices,
Expand Down
29 changes: 19 additions & 10 deletions crates/katana/storage/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,53 @@ pub mod utils;
pub mod version;

use mdbx::{DbEnv, DbEnvKind};
use tables::Schema;
use utils::is_database_empty;
use version::{check_db_version, create_db_version_file, DatabaseVersionError, CURRENT_DB_VERSION};
pub use version::CURRENT_DB_VERSION;
use version::{check_db_version, create_db_version_file, DatabaseVersionError};

/// Initializes the database at the given `path` and returns a handle to its environment.
///
/// The work is delegated to `init_db_with_schema` using the `tables::SchemaV1` schema.
/// The default tables are created if necessary.
pub fn init_db<P>(path: P) -> anyhow::Result<DbEnv>
where
    P: AsRef<Path>,
{
    init_db_with_schema::<tables::SchemaV1>(path)
}

/// Opens the database located at `path` in read-write mode.
///
/// The work is delegated to `open_db_with_schema` using the `tables::SchemaV1` schema.
pub fn open_db<P>(path: P) -> anyhow::Result<DbEnv>
where
    P: AsRef<Path>,
{
    open_db_with_schema::<tables::SchemaV1>(path)
}

pub(crate) fn init_db_with_schema<S: Schema>(path: impl AsRef<Path>) -> anyhow::Result<DbEnv<S>> {
if is_database_empty(path.as_ref()) {
fs::create_dir_all(&path).with_context(|| {
format!("Creating database directory at path {}", path.as_ref().display())
})?;
create_db_version_file(&path, CURRENT_DB_VERSION).with_context(|| {
create_db_version_file(&path, S::VERSION).with_context(|| {
format!("Inserting database version file at path {}", path.as_ref().display())
})?
} else {
match check_db_version(&path) {
Ok(_) => {}
Err(DatabaseVersionError::FileNotFound) => {
create_db_version_file(&path, CURRENT_DB_VERSION).with_context(|| {
Err(DatabaseVersionError::FileNotFound) => create_db_version_file(&path, S::VERSION)
.with_context(|| {
format!(
"No database version file found. Inserting version file at path {}",
path.as_ref().display()
)
})?
}
})?,
Err(err) => return Err(anyhow!(err)),
}
}

let env = open_db(path)?;
let env = open_db_with_schema::<S>(path)?;
env.create_tables()?;
Ok(env)
}

/// Open the database at the given `path` in read-write mode.
pub fn open_db<P: AsRef<Path>>(path: P) -> anyhow::Result<DbEnv> {
DbEnv::open(path.as_ref(), DbEnvKind::RW).with_context(|| {
fn open_db_with_schema<S: Schema>(path: impl AsRef<Path>) -> anyhow::Result<DbEnv<S>> {
DbEnv::<S>::open(path.as_ref(), DbEnvKind::RW).with_context(|| {
format!("Opening database in read-write mode at path {}", path.as_ref().display())
})
}
Expand Down
2 changes: 1 addition & 1 deletion crates/katana/storage/db/src/mdbx/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::borrow::Cow;
use std::marker::PhantomData;

use libmdbx::{self, TransactionKind, WriteFlags, RW};
use libmdbx::{TransactionKind, WriteFlags, RW};

use crate::codecs::{Compress, Encode};
use crate::error::DatabaseError;
Expand Down
42 changes: 25 additions & 17 deletions crates/katana/storage/db/src/mdbx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use libmdbx::{DatabaseFlags, EnvironmentFlags, Geometry, Mode, PageSize, SyncMod

use self::tx::Tx;
use crate::error::DatabaseError;
use crate::tables::{TableType, Tables};
use crate::tables::{Schema, SchemaV1, TableType};
use crate::utils;

const GIGABYTE: usize = 1024 * 1024 * 1024;
Expand All @@ -31,21 +31,27 @@ pub enum DbEnvKind {

/// Wrapper for `libmdbx-sys` environment.
#[derive(Debug)]
pub struct DbEnv(libmdbx::Environment);
pub struct DbEnv<S = SchemaV1>
where
S: Schema,
{
inner: libmdbx::Environment,
_tables: std::marker::PhantomData<S>,
}

impl DbEnv {
impl<S: Schema> DbEnv<S> {
/// Opens the database at the specified path with the given [`DbEnvKind`].
///
/// It does not create the tables; for that, call [`DbEnv::create_tables`].
pub fn open(path: impl AsRef<Path>, kind: DbEnvKind) -> Result<DbEnv, DatabaseError> {
pub fn open(path: impl AsRef<Path>, kind: DbEnvKind) -> Result<DbEnv<S>, DatabaseError> {
let mode = match kind {
DbEnvKind::RO => Mode::ReadOnly,
DbEnvKind::RW => Mode::ReadWrite { sync_mode: SyncMode::Durable },
};

let mut builder = libmdbx::Environment::builder();
builder
.set_max_dbs(Tables::ALL.len())
.set_max_dbs(S::all().len())
.set_geometry(Geometry {
// Maximum database size of 1 terabyte
size: Some(0..(TERABYTE)),
Expand All @@ -65,14 +71,17 @@ impl DbEnv {
})
.set_max_readers(DEFAULT_MAX_READERS);

Ok(DbEnv(builder.open(path.as_ref()).map_err(DatabaseError::OpenEnv)?))
Ok(DbEnv {
inner: builder.open(path.as_ref()).map_err(DatabaseError::OpenEnv)?,
_tables: std::marker::PhantomData,
})
}

/// Creates all the tables defined by the schema `S`, if necessary.
pub fn create_tables(&self) -> Result<(), DatabaseError> {
let tx = self.0.begin_rw_txn().map_err(DatabaseError::CreateRWTx)?;
let tx = self.inner.begin_rw_txn().map_err(DatabaseError::CreateRWTx)?;

for table in Tables::ALL {
for table in S::all() {
let flags = match table.table_type() {
TableType::Table => DatabaseFlags::default(),
TableType::DupSort => DatabaseFlags::DUP_SORT,
Expand All @@ -87,20 +96,20 @@ impl DbEnv {
}

/// Begin a read-only transaction.
pub fn tx(&self) -> Result<Tx<RO>, DatabaseError> {
Ok(Tx::new(self.0.begin_ro_txn().map_err(DatabaseError::CreateROTx)?))
pub fn tx(&self) -> Result<Tx<RO, S>, DatabaseError> {
Ok(Tx::new(self.inner.begin_ro_txn().map_err(DatabaseError::CreateROTx)?))
}

/// Begin a read-write transaction.
pub fn tx_mut(&self) -> Result<Tx<RW>, DatabaseError> {
Ok(Tx::new(self.0.begin_rw_txn().map_err(DatabaseError::CreateRWTx)?))
pub fn tx_mut(&self) -> Result<Tx<RW, S>, DatabaseError> {
Ok(Tx::new(self.inner.begin_rw_txn().map_err(DatabaseError::CreateRWTx)?))
}

/// Takes a function and passes a read-write transaction into it, making sure the transaction
/// is always committed at the end of the execution.
pub fn update<T, F>(&self, f: F) -> Result<T, DatabaseError>
where
F: FnOnce(&Tx<RW>) -> T,
F: FnOnce(&Tx<RW, S>) -> T,
{
let tx = self.tx_mut()?;
let res = f(&tx);
Expand All @@ -111,9 +120,8 @@ impl DbEnv {

#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils {
use std::path::Path;

use super::{DbEnv, DbEnvKind};
use super::*;
use crate::tables;

const ERROR_DB_CREATION: &str = "Not able to create the mdbx file.";

Expand All @@ -127,7 +135,7 @@ pub mod test_utils {

/// Create database for testing with specified path
pub fn create_test_db_with_path(kind: DbEnvKind, path: &Path) -> DbEnv {
let env = DbEnv::open(path, kind).expect(ERROR_DB_CREATION);
let env = DbEnv::<tables::SchemaV1>::open(path, kind).expect(ERROR_DB_CREATION);
env.create_tables().expect("Failed to create tables.");
env
}
Expand Down
58 changes: 44 additions & 14 deletions crates/katana/storage/db/src/mdbx/tx.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Transaction wrapper for libmdbx-sys.

use std::str::FromStr;
use std::marker::PhantomData;

use libmdbx::ffi::DBI;
use libmdbx::{TransactionKind, WriteFlags, RW};
Expand All @@ -9,29 +9,38 @@ use parking_lot::RwLock;
use super::cursor::Cursor;
use crate::codecs::{Compress, Encode};
use crate::error::DatabaseError;
use crate::tables::{Table, Tables, NUM_TABLES};
use crate::tables::{Schema, SchemaV1, Table, NUM_TABLES};
use crate::utils::decode_one;

/// Alias for read-only transaction.
pub type TxRO = Tx<libmdbx::RO>;
/// Alias for read-write transaction.
pub type TxRW = Tx<libmdbx::RW>;
/// Alias for read-only transaction on the default schema.
pub type TxRO = Tx<libmdbx::RO, SchemaV1>;
/// Alias for read-write transaction on the default schema.
pub type TxRW = Tx<libmdbx::RW, SchemaV1>;

/// Database transaction.
///
/// Wrapper for a `libmdbx` transaction.
#[derive(Debug)]
pub struct Tx<K: TransactionKind> {
pub struct Tx<K: TransactionKind, S: Schema> {
/// Libmdbx-sys transaction.
inner: libmdbx::Transaction<K>,
/// Marker for the db schema.
_schema: std::marker::PhantomData<S>,
// The array size is hardcoded to the number of tables in the current db version for now.
// Ideally we would use an associated constant from the schema trait, but that would require
// the `generic_const_exprs` feature.
/// Database table handle cache.
db_handles: RwLock<[Option<DBI>; NUM_TABLES]>,
}

impl<K: TransactionKind> Tx<K> {
impl<K, S> Tx<K, S>
where
K: TransactionKind,
S: Schema,
{
/// Creates new `Tx` object with a `RO` or `RW` transaction.
pub fn new(inner: libmdbx::Transaction<K>) -> Self {
Self { inner, db_handles: Default::default() }
Self { inner, _schema: PhantomData, db_handles: Default::default() }
}

/// Creates a cursor to iterate over a table items.
Expand All @@ -44,13 +53,18 @@ impl<K: TransactionKind> Tx<K> {

/// Gets a table database handle if it exists, otherwise creates it.
pub fn get_dbi<T: Table>(&self) -> Result<DBI, DatabaseError> {
// SAFETY:
// The index is only guaranteed to be in bounds for the current schema version, because
// the handle cache is sized to exactly the number of tables in the current db schema.
// See `tables::v1::NUM_TABLES`.
let table = S::index::<T>().expect(&format!("table {} not found in schema", T::NAME));

let mut handles = self.db_handles.write();
let table = Tables::from_str(T::NAME).expect("requested table should be part of `Tables`.");
let dbi_handle = handles.get_mut(table).expect("should exist");

let dbi_handle = handles.get_mut(table as usize).expect("should exist");
if dbi_handle.is_none() {
*dbi_handle =
Some(self.inner.open_db(Some(T::NAME)).map_err(DatabaseError::OpenDb)?.dbi());
let dbi = self.inner.open_db(Some(T::NAME)).map_err(DatabaseError::OpenDb)?.dbi();
*dbi_handle = Some(dbi);
}

Ok(dbi_handle.expect("is some; qed"))
Expand All @@ -66,6 +80,22 @@ impl<K: TransactionKind> Tx<K> {
.transpose()
}

/// Looks up the value stored under `key` in table `T`, without checking whether the table
/// exists in the schema.
///
/// The table is opened by name (`T::NAME`) directly on the underlying transaction.
/// Returns `Ok(None)` when the lookup yields no value.
///
/// # Errors
///
/// Returns [`DatabaseError::OpenDb`] if the table cannot be opened and
/// [`DatabaseError::Read`] if the read fails. A failure to decode the stored value is
/// propagated as well.
pub fn get_unchecked<T: Table>(
    &self,
    key: T::Key,
) -> Result<Option<<T as Table>::Value>, DatabaseError> {
    let handle = self.inner.open_db(Some(T::NAME)).map_err(DatabaseError::OpenDb)?.dbi();
    let encoded_key = Encode::encode(key);

    let raw = self.inner.get(handle, encoded_key.as_ref()).map_err(DatabaseError::Read)?;
    raw.map(decode_one::<T>).transpose()
}

/// Returns number of entries in the table using cheap DB stats invocation.
pub fn entries<T: Table>(&self) -> Result<usize, DatabaseError> {
self.inner
Expand All @@ -80,7 +110,7 @@ impl<K: TransactionKind> Tx<K> {
}
}

impl Tx<RW> {
impl<S: Schema> Tx<RW, S> {
/// Inserts an item into a database.
///
/// This function stores key/data pairs in the database. The default behavior is to enter the
Expand Down
Loading
Loading