Skip to content

Commit

Permalink
migration first working version
Browse files Browse the repository at this point in the history
  • Loading branch information
kariy committed Apr 16, 2024
1 parent cd077d9 commit 0f52ea0
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 30 deletions.
42 changes: 31 additions & 11 deletions crates/katana/storage/db/src/mdbx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,22 @@ impl<S: Schema> DbEnv<S> {

/// Creates all the defined tables in [`Tables`], if necessary.
pub fn create_tables(&self) -> Result<(), DatabaseError> {
let tx = self.inner.begin_rw_txn().map_err(DatabaseError::CreateRWTx)?;
// let tx = self.inner.begin_rw_txn().map_err(DatabaseError::CreateRWTx)?;

for table in dbg!(S::all()) {
let flags = match table.table_type() {
TableType::Table => DatabaseFlags::default(),
TableType::DupSort => DatabaseFlags::DUP_SORT,
};
// for table in dbg!(S::all()) {
// let flags = match table.table_type() {
// TableType::Table => DatabaseFlags::default(),
// TableType::DupSort => DatabaseFlags::DUP_SORT,
// };

tx.create_db(Some(table.name()), flags).map_err(DatabaseError::CreateTable)?;
}
// tx.create_db(Some(table.name()), flags).map_err(DatabaseError::CreateTable)?;
// }

tx.commit().map_err(DatabaseError::Commit)?;
// tx.commit().map_err(DatabaseError::Commit)?;

Ok(())
// Ok(())

self.create_tables_from_schema::<S>()
}

/// Begin a read-only transaction.
Expand All @@ -109,7 +111,7 @@ impl<S: Schema> DbEnv<S> {
/// committed in the end of the execution.
pub fn view<T, F>(&self, f: F) -> Result<T, DatabaseError>
where
F: FnOnce(&Tx<RO>) -> T,
F: FnOnce(&Tx<RO, S>) -> T,
{
let tx = self.tx()?;
let res = f(&tx);
Expand All @@ -128,6 +130,24 @@ impl<S: Schema> DbEnv<S> {
tx.commit()?;
Ok(res)
}

/// Creates all the defined tables in [`Tables`], if necessary.
pub(crate) fn create_tables_from_schema<R: Schema>(&self) -> Result<(), DatabaseError> {
let tx = self.inner.begin_rw_txn().map_err(DatabaseError::CreateRWTx)?;

for table in R::all() {
let flags = match table.table_type() {
TableType::Table => DatabaseFlags::default(),
TableType::DupSort => DatabaseFlags::DUP_SORT,
};

tx.create_db(Some(table.name()), flags).map_err(DatabaseError::CreateTable)?;
}

tx.commit().map_err(DatabaseError::Commit)?;

Ok(())
}
}

#[cfg(any(test, feature = "test-utils"))]
Expand Down
24 changes: 23 additions & 1 deletion crates/katana/storage/db/src/mdbx/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::marker::PhantomData;

use libmdbx::ffi::DBI;
use libmdbx::{TransactionKind, WriteFlags, RW};
use libmdbx::{DatabaseFlags, TransactionKind, WriteFlags, RW};
use parking_lot::RwLock;

use super::cursor::Cursor;
Expand Down Expand Up @@ -51,6 +51,12 @@ where
.map_err(DatabaseError::CreateCursor)
}

/// Creates a cursor to iterate over a table items.
pub fn cursor_unchecked<T: Table>(&self) -> Result<Cursor<K, T>, DatabaseError> {
let dbi = self.inner.open_db(Some(T::NAME)).map_err(DatabaseError::OpenDb)?.dbi();
self.inner.cursor_with_dbi(dbi).map(Cursor::new).map_err(DatabaseError::CreateCursor)
}

/// Gets a table database handle if it exists, otherwise creates it.
pub fn get_dbi<T: Table>(&self) -> Result<DBI, DatabaseError> {
// SAFETY:
Expand Down Expand Up @@ -111,6 +117,10 @@ where
}

impl<S: Schema> Tx<RW, S> {
pub fn create_table<T: Table>(&self, flags: DatabaseFlags) -> Result<DBI, DatabaseError> {
Ok(self.inner.create_db(Some(T::NAME), flags).unwrap().dbi())
}

/// Inserts an item into a database.
///
/// This function stores key/data pairs in the database. The default behavior is to enter the
Expand All @@ -123,6 +133,18 @@ impl<S: Schema> Tx<RW, S> {
Ok(())
}

pub fn put_unchecked<T: Table>(
&self,
key: T::Key,
value: T::Value,
) -> Result<(), DatabaseError> {
let key = key.encode();
let value = value.compress();
let dbi = self.inner.open_db(Some(T::NAME)).map_err(DatabaseError::OpenDb)?.dbi();
self.inner.put(dbi, key, value, WriteFlags::UPSERT).unwrap();
Ok(())
}

/// Delete items from a database, removing the key/data pair if it exists.
///
/// If the data parameter is [Some] only the matching data item will be deleted. Otherwise, if
Expand Down
71 changes: 53 additions & 18 deletions crates/katana/storage/db/src/migration.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
use std::collections::HashMap;
use std::fs;
use std::hash::Hash;
use std::path::Path;

use anyhow::Context;
use katana_primitives::contract::ContractAddress;
use libmdbx::DatabaseFlags;

use crate::error::DatabaseError;
use crate::mdbx::DbEnv;
use crate::models::list::BlockList;
use crate::models::storage::ContractStorageKey;
use crate::version::{create_db_version_file, get_db_version, DatabaseVersionError};
use crate::{open_db, tables, CURRENT_DB_VERSION};
use crate::tables::v0::{SchemaV0, StorageEntryChangeList};
use crate::version::{
create_db_version_file, get_db_version, remove_db_version_file, DatabaseVersionError,
};
use crate::{open_db, open_db_with_schema, tables, CURRENT_DB_VERSION};

#[derive(Debug, thiserror::Error)]
pub enum DatabaseMigrationError {
Expand All @@ -34,13 +42,15 @@ pub fn migrate_db<P: AsRef<Path>>(path: P) -> Result<(), DatabaseMigrationError>
let ver = get_db_version(&path)?;

match ver {
0 => migrate_from_v0_to_v1(open_db(&path)?)?,
0 => migrate_from_v0_to_v1(open_db_with_schema(&path)?)?,
_ => {
return Err(DatabaseMigrationError::UnsupportedVersion(ver));
}
}

// Update the db version to the current version

remove_db_version_file(&path)?;
create_db_version_file(path, CURRENT_DB_VERSION).context("Updating database version file")?;
Ok(())
}
Expand All @@ -63,47 +73,70 @@ pub fn migrate_db<P: AsRef<Path>>(path: P) -> Result<(), DatabaseMigrationError>
/// - Changed key type to [ContractStorageKey](crate::models::storage::ContractStorageKey).
/// - Changed value type to [BlockList](crate::models::list::BlockList).
///
fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), DatabaseMigrationError> {
env.create_tables()?;
fn migrate_from_v0_to_v1(env: DbEnv<tables::v0::SchemaV0>) -> Result<(), DatabaseMigrationError> {
// env.create_tables_from_schema::<tables::SchemaV1>()?;

env.update(|tx| {
{
// store in a static file first before putting it back in the new table
// let file = tempfile().expect("able to create temp file");
// let file = LineWriter::new(file);

let mut cursor = tx.cursor::<tables::v0::StorageChangeSet>()?;
cursor.walk(None)?.try_for_each(|entry| {
let (old_key, old_val) = entry?;
let key = ContractStorageKey { contract_address: old_key, key: old_val.key };
tx.put::<tables::StorageChangeSet>(key, BlockList::from_iter(old_val.block_list))?;
let mut old_entries: HashMap<ContractAddress, Vec<StorageEntryChangeList>> =
HashMap::new();

cursor.walk(None)?.enumerate().try_for_each(|(i, entry)| {
let (key, val) = entry?;
dbg!(i, &key, &val);
old_entries.entry(key).or_default().push(val);
Result::<(), DatabaseError>::Ok(())
})?;

drop(cursor);
unsafe {
tx.drop_table::<tables::v0::StorageChangeSet>()?;
}
tx.create_table::<tables::StorageChangeSet>(DatabaseFlags::default())?;

for (key, vals) in old_entries {
for val in vals {
let key = ContractStorageKey { contract_address: key, key: val.key };
let val = BlockList::from_iter(val.block_list);
tx.put::<tables::StorageChangeSet>(key, val)?;
}
}

// move data from `NonceChanges` to `NonceChangeHistory`
tx.create_table::<tables::NonceChangeHistory>(DatabaseFlags::DUP_SORT)?;
let mut cursor = tx.cursor::<tables::v0::NonceChanges>()?;
cursor.walk(None)?.try_for_each(|entry| {
let (key, val) = entry?;
tx.put::<tables::NonceChangeHistory>(key, val)?;
tx.put_unchecked::<tables::NonceChangeHistory>(key, val)?;
Result::<(), DatabaseError>::Ok(())
})?;

tx.create_table::<tables::StorageChangeHistory>(DatabaseFlags::DUP_SORT)?;
// move data from `StorageChanges` to `StorageChangeHistory`
let mut cursor = tx.cursor::<tables::v0::StorageChanges>()?;
cursor.walk(None)?.try_for_each(|entry| {
let (key, val) = entry?;
tx.put::<tables::StorageChangeHistory>(key, val)?;
tx.put_unchecked::<tables::StorageChangeHistory>(key, val)?;
Result::<(), DatabaseError>::Ok(())
})?;

tx.create_table::<tables::ClassChangeHistory>(DatabaseFlags::DUP_SORT)?;
// move data from `ContractClassChanges` to `ClassChangeHistory`
let mut cursor = tx.cursor::<tables::v0::ContractClassChanges>()?;
cursor.walk(None)?.try_for_each(|entry| {
let (key, val) = entry?;
tx.put::<tables::ClassChangeHistory>(key, val)?;
tx.put_unchecked::<tables::ClassChangeHistory>(key, val)?;
Result::<(), DatabaseError>::Ok(())
})?;
}

// drop the old tables
unsafe {
tx.drop_table::<tables::v0::StorageChangeSet>()?;
tx.drop_table::<tables::v0::NonceChanges>()?;
tx.drop_table::<tables::v0::StorageChanges>()?;
tx.drop_table::<tables::v0::ContractClassChanges>()?;
Expand All @@ -126,7 +159,7 @@ mod tests {
use crate::models::list::BlockList;
use crate::models::storage::{ContractStorageEntry, ContractStorageKey};
use crate::tables::v0::{self, StorageEntryChangeList};
use crate::{init_db, tables};
use crate::{init_db, open_db, tables};

const ERROR_CREATE_TEMP_DIR: &str = "Failed to create temp dir.";
const ERROR_MIGRATE_DB: &str = "Failed to migrate db.";
Expand All @@ -139,9 +172,9 @@ mod tests {
}

// TODO(kariy): create Arbitrary for database key/value types to easily create random test vectors
fn create_v0_test_db() -> (DbEnv<v0::Tables>, PathBuf) {
fn create_v0_test_db() -> (DbEnv<v0::SchemaV0>, PathBuf) {
let path = tempfile::TempDir::new().expect(ERROR_CREATE_TEMP_DIR).into_path();
let db = crate::init_db_with_schema::<v0::Tables>(&path).expect(ERROR_INIT_DB);
let db = crate::init_db_with_schema::<v0::SchemaV0>(&path).expect(ERROR_INIT_DB);

db.update(|tx| {
tx.put::<v0::NonceChanges>(
Expand Down Expand Up @@ -233,8 +266,10 @@ mod tests {

#[test]
fn migrate_from_v0() {
let (env, path) = create_v0_test_db();
let _ = migrate_db(path).expect(ERROR_MIGRATE_DB);
// we cant have multiple instances of the db open in the same process, so we drop here first before migrating
let (_, path) = create_v0_test_db();
let _ = migrate_db(&path).expect(ERROR_MIGRATE_DB);
let env = open_db(path).unwrap();

env.view(|tx| {
let mut cursor = tx.cursor::<tables::NonceChangeHistory>().unwrap();
Expand Down
6 changes: 6 additions & 0 deletions crates/katana/storage/db/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ pub(super) fn default_version_file_path(path: &Path) -> PathBuf {
path.join(DB_VERSION_FILE_NAME)
}

pub(super) fn remove_db_version_file(path: impl AsRef<Path>) -> Result<(), DatabaseVersionError> {
let path = path.as_ref();
let path = if path.is_dir() { default_version_file_path(path) } else { path.to_path_buf() };
fs::remove_file(path).map_err(DatabaseVersionError::Io)
}

#[cfg(test)]
mod tests {

Expand Down

0 comments on commit 0f52ea0

Please sign in to comment.