From f9dcc8268cd80457ff0605f4c00fecc2b1a5d94c Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Mon, 15 Apr 2024 20:38:07 +0800 Subject: [PATCH 01/26] init db migration --- crates/katana/storage/db/src/lib.rs | 1 + crates/katana/storage/db/src/migration.rs | 51 +++++++++++++++++++++++ 2 files changed, 52 insertions(+) create mode 100644 crates/katana/storage/db/src/migration.rs diff --git a/crates/katana/storage/db/src/lib.rs b/crates/katana/storage/db/src/lib.rs index 8fbfb45882..907abf5345 100644 --- a/crates/katana/storage/db/src/lib.rs +++ b/crates/katana/storage/db/src/lib.rs @@ -8,6 +8,7 @@ use anyhow::{anyhow, Context}; pub mod codecs; pub mod error; pub mod mdbx; +pub mod migration; pub mod models; pub mod tables; pub mod utils; diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs new file mode 100644 index 0000000000..ecd187c72a --- /dev/null +++ b/crates/katana/storage/db/src/migration.rs @@ -0,0 +1,51 @@ +use std::path::Path; + +use crate::{ + error::DatabaseError, + open_db, tables, + version::{get_db_version, DatabaseVersionError}, + CURRENT_DB_VERSION, +}; + +#[derive(Debug, thiserror::Error)] +pub enum DatabaseMigrationError { + #[error("Unsupported database version for migration: {0}")] + UnsupportedVersion(u32), + + #[error(transparent)] + DatabaseVersion(#[from] DatabaseVersionError), + + #[error(transparent)] + Database(#[from] DatabaseError), + + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +/// Performs a database migration for an already initialized database with an older +/// version of the database schema. +/// +/// DB migration can only be done on a supported older version of the database schema, +/// meaning not all older versions can be migrated. +pub fn migrate_db>(path: P) -> Result<(), DatabaseMigrationError> { + // check that the db version is supported, otherwise return an error + let ver = get_db_version(&path)?; + + if ver != 0 { + return Err(DatabaseMigrationError::UnsupportedVersion(ver)); + } + + // perform the migration + // 1. create all the tables exist in the current schema + // 2. migrate all the data from the old schema to the new schema + // 3. update the db version to the current version + + let env = open_db(path)?; + env.create_tables()?; + + env.update(|tx| { + let mut cursor = tx.cursor::()?; + })?; + + Ok(()) +} From bc93b342550a1dc1df3b65dadfc5d0647bc9d907 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Mon, 15 Apr 2024 22:16:05 +0800 Subject: [PATCH 02/26] add more comment --- crates/katana/storage/db/src/tables/v0.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/katana/storage/db/src/tables/v0.rs b/crates/katana/storage/db/src/tables/v0.rs index ea30d2924d..5c19e35bd3 100644 --- a/crates/katana/storage/db/src/tables/v0.rs +++ b/crates/katana/storage/db/src/tables/v0.rs @@ -38,6 +38,11 @@ define_schema_enum! { (StorageChangeSet, TableType::DupSort) ]} +// TODO(kariy): maybe add database changelog ? +// +// Refer to: +// - https://github.com/dojoengine/dojo/pull/1773 +// - https://github.com/dojoengine/dojo/pull/1774 tables! { /// Contract nonce changes by block. NonceChanges: (BlockNumber, ContractAddress) => ContractNonceChange, From b5479e9b250aa049acb8c273ba68c6699bdd6e02 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Mon, 15 Apr 2024 23:13:39 +0800 Subject: [PATCH 03/26] migrate block list --- crates/katana/storage/db/src/migration.rs | 48 ++++++++++++++++----- crates/katana/storage/db/src/models/list.rs | 6 +++ 2 files changed, 43 insertions(+), 11 deletions(-) diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index ecd187c72a..3dbb30513d 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -1,9 +1,13 @@ use std::path::Path; +use anyhow::Context; + use crate::{ error::DatabaseError, + mdbx::DbEnv, + models::{list::BlockList, storage::ContractStorageKey}, open_db, tables, - version::{get_db_version, DatabaseVersionError}, + version::{create_db_version_file, get_db_version, DatabaseVersionError}, CURRENT_DB_VERSION, }; @@ -31,21 +35,43 @@ pub fn migrate_db>(path: P) -> Result<(), DatabaseMigrationError> // check that the db version is supported, otherwise return an error let ver = get_db_version(&path)?; - if ver != 0 { + if ver == 0 { + // perform the migration + // 1. create all the tables exist in the current schema + // 2. migrate all the data from the old schema to the new schema + let db = open_db(&path)?; + migrate_from_v0_to_v1(db)?; + } else { return Err(DatabaseMigrationError::UnsupportedVersion(ver)); } - // perform the migration - // 1. create all the tables exist in the current schema - // 2. migrate all the data from the old schema to the new schema - // 3. update the db version to the current version + // Update the db version to the current version + create_db_version_file(path, CURRENT_DB_VERSION).context("Updating database version file")?; + Ok(()) +} - let env = open_db(path)?; +fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), DatabaseMigrationError> { env.create_tables()?; - env.update(|tx| { - let mut cursor = tx.cursor::()?; - })?; + // migrate the block list + let mut cursor = tx.cursor::()?; + let walker = cursor.walk(None)?; + for old_entry in walker { + let (old_key, old_val) = old_entry?; - Ok(()) + let key = ContractStorageKey { contract_address: old_key, key: old_val.key }; + let value = BlockList::from_iter(old_val.block_list); + + tx.put::(key, value)?; + } + + drop(cursor); + + // drop the old table + unsafe { + tx.drop_table::()?; + } + + Ok(()) + })? } diff --git a/crates/katana/storage/db/src/models/list.rs b/crates/katana/storage/db/src/models/list.rs index 4623341732..a456c20461 100644 --- a/crates/katana/storage/db/src/models/list.rs +++ b/crates/katana/storage/db/src/models/list.rs @@ -40,6 +40,12 @@ impl IntegerSet { } } +impl FromIterator for IntegerSet { + fn from_iter>(iter: T) -> Self { + Self(RoaringTreemap::from_iter(iter)) + } +} + impl From<[u64; N]> for IntegerSet { fn from(arr: [u64; N]) -> Self { Self(RoaringTreemap::from_iter(arr)) From 6dea0a29bfa41b99136e9830b545cf29bb47b9d0 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 16 Apr 2024 00:29:12 +0800 Subject: [PATCH 04/26] add test --- crates/katana/storage/db/src/migration.rs | 92 +++++++++++++++++------ 1 file changed, 70 insertions(+), 22 deletions(-) diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index 3dbb30513d..d923807259 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -1,6 +1,6 @@ use std::path::Path; -use anyhow::Context; +use anyhow::{anyhow, Context}; use crate::{ error::DatabaseError, @@ -29,20 +29,17 @@ pub enum DatabaseMigrationError { /// Performs a database migration for an already initialized database with an older /// version of the database schema. /// -/// DB migration can only be done on a supported older version of the database schema, -/// meaning not all older versions can be migrated. +/// Database migration can only be done on a supported older version of the database schema, +/// meaning not all older versions can be migrated from. pub fn migrate_db>(path: P) -> Result<(), DatabaseMigrationError> { // check that the db version is supported, otherwise return an error let ver = get_db_version(&path)?; - if ver == 0 { - // perform the migration - // 1. create all the tables exist in the current schema - // 2. migrate all the data from the old schema to the new schema - let db = open_db(&path)?; - migrate_from_v0_to_v1(db)?; - } else { - return Err(DatabaseMigrationError::UnsupportedVersion(ver)); + match ver { + 0 => migrate_from_v0_to_v1(open_db(&path)?)?, + _ => { + return Err(DatabaseMigrationError::UnsupportedVersion(ver)); + } } // Update the db version to the current version @@ -50,28 +47,79 @@ pub fn migrate_db>(path: P) -> Result<(), DatabaseMigrationError> Ok(()) } +/// Perform migration for database version 0 to version 1. fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), DatabaseMigrationError> { env.create_tables()?; + env.update(|tx| { - // migrate the block list - let mut cursor = tx.cursor::()?; - let walker = cursor.walk(None)?; - for old_entry in walker { - let (old_key, old_val) = old_entry?; + { + let mut cursor = tx.cursor::()?; + cursor.walk(None)?.try_for_each(|entry| { + let (old_key, old_val) = entry?; + let key = ContractStorageKey { contract_address: old_key, key: old_val.key }; + tx.put::(key, BlockList::from_iter(old_val.block_list))?; + Result::<(), DatabaseError>::Ok(()) + })?; - let key = ContractStorageKey { contract_address: old_key, key: old_val.key }; - let value = BlockList::from_iter(old_val.block_list); + // move data from `NonceChanges` to `NonceChangeHistory` + let mut cursor = tx.cursor::()?; + cursor.walk(None)?.try_for_each(|entry| { + let (key, val) = entry?; + tx.put::(key, val)?; + Result::<(), DatabaseError>::Ok(()) + })?; - tx.put::(key, value)?; - } + // move data from `StorageChanges` to `StorageChangeHistory` + let mut cursor = tx.cursor::()?; + cursor.walk(None)?.try_for_each(|entry| { + let (key, val) = entry?; + tx.put::(key, val)?; + Result::<(), DatabaseError>::Ok(()) + })?; - drop(cursor); + // move data from `ContractClassChanges` to `ClassChangeHistory` + let mut cursor = tx.cursor::()?; + cursor.walk(None)?.try_for_each(|entry| { + let (key, val) = entry?; + tx.put::(key, val)?; + Result::<(), DatabaseError>::Ok(()) + })?; + } - // drop the old table + // drop the old tables unsafe { tx.drop_table::()?; + tx.drop_table::()?; + tx.drop_table::()?; + tx.drop_table::()?; } Ok(()) })? } + +#[cfg(test)] +mod tests { + + use crate::{init_db, mdbx::DbEnv}; + use std::path::PathBuf; + + use super::migrate_db; + + fn create_test_db() -> (DbEnv, PathBuf) { + let path = tempfile::TempDir::new().expect("Failed to create temp dir.").into_path(); + let db = init_db(&path).expect("Failed to initialize db"); + (db, path) + } + + #[test] + fn migrate_from_current_version() { + let (_, path) = create_test_db(); + let err = migrate_db(path).unwrap_err(); + assert_eq!( + err.to_string(), + "Unsupported database version for migration: 1", + "Can't migrate from the current version" + ); + } +} From 216b004e23a72d2eb08d57c814af6a3b6b138447 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 16 Apr 2024 00:55:22 +0800 Subject: [PATCH 05/26] wip --- crates/katana/storage/db/src/migration.rs | 31 +++++++++++++++++++---- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index d923807259..8fe2320fa0 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -101,25 +101,46 @@ fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), DatabaseMigrationError> { #[cfg(test)] mod tests { - use crate::{init_db, mdbx::DbEnv}; + use crate::{init_db, mdbx::DbEnv, open_db, version::create_db_version_file}; use std::path::PathBuf; use super::migrate_db; + const ERROR_CREATE_TEMP_DIR: &str = "Failed to create temp dir."; + const ERROR_MIGRATE_DB: &str = "Failed to migrate db."; + const ERROR_INIT_DB: &str = "Failed to initialize db."; + const ERROR_CREATE_TABLES: &str = "Failed to create tables."; + const ERROR_CREATE_VER_FILE: &str = "Failed to create version file."; + fn create_test_db() -> (DbEnv, PathBuf) { - let path = tempfile::TempDir::new().expect("Failed to create temp dir.").into_path(); - let db = init_db(&path).expect("Failed to initialize db"); + let path = tempfile::TempDir::new().expect(ERROR_CREATE_TEMP_DIR).into_path(); + let db = init_db(&path).expect(ERROR_INIT_DB); + (db, path) + } + + fn create_v0_test_db() -> (DbEnv, PathBuf) { + let path = tempfile::TempDir::new().expect(ERROR_CREATE_TEMP_DIR).into_path(); + + let db = open_db(&path).expect(ERROR_INIT_DB); + let _ = db.create_v0_tables().expect(ERROR_CREATE_TABLES); + let _ = create_db_version_file(&path, 0).expect(ERROR_CREATE_VER_FILE); + (db, path) } #[test] fn migrate_from_current_version() { let (_, path) = create_test_db(); - let err = migrate_db(path).unwrap_err(); assert_eq!( - err.to_string(), + migrate_db(path).unwrap_err().to_string(), "Unsupported database version for migration: 1", "Can't migrate from the current version" ); } + + #[test] + fn migrate_from_v0() { + let (env, path) = create_v0_test_db(); + let _ = migrate_db(path).expect(ERROR_MIGRATE_DB); + } } From 5210fb98adebfba341202dc41e87174ea1776684 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 16 Apr 2024 01:13:01 +0800 Subject: [PATCH 06/26] fmt --- crates/katana/storage/db/src/migration.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index 8fe2320fa0..7b6e153e55 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -2,14 +2,12 @@ use std::path::Path; use anyhow::{anyhow, Context}; -use crate::{ - error::DatabaseError, - mdbx::DbEnv, - models::{list::BlockList, storage::ContractStorageKey}, - open_db, tables, - version::{create_db_version_file, get_db_version, DatabaseVersionError}, - CURRENT_DB_VERSION, -}; +use crate::error::DatabaseError; +use crate::mdbx::DbEnv; +use crate::models::list::BlockList; +use crate::models::storage::ContractStorageKey; +use crate::version::{create_db_version_file, get_db_version, DatabaseVersionError}; +use crate::{open_db, tables, CURRENT_DB_VERSION}; #[derive(Debug, thiserror::Error)] pub enum DatabaseMigrationError { @@ -101,10 +99,12 @@ fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), DatabaseMigrationError> { #[cfg(test)] mod tests { - use crate::{init_db, mdbx::DbEnv, open_db, version::create_db_version_file}; use std::path::PathBuf; use super::migrate_db; + use crate::mdbx::DbEnv; + use crate::version::create_db_version_file; + use crate::{init_db, open_db}; const ERROR_CREATE_TEMP_DIR: &str = "Failed to create temp dir."; const ERROR_MIGRATE_DB: &str = "Failed to migrate db."; From a83ad2dfbe1dcc9b9720c4da39f9f6274c85b5c8 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 16 Apr 2024 16:06:12 +0800 Subject: [PATCH 07/26] wip --- crates/katana/storage/db/src/migration.rs | 68 ++++++++++++++++++++++- 1 file changed, 66 insertions(+), 2 deletions(-) diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index 7b6e153e55..13151f97fa 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -1,6 +1,6 @@ use std::path::Path; -use anyhow::{anyhow, Context}; +use anyhow::Context; use crate::error::DatabaseError; use crate::mdbx::DbEnv; @@ -30,7 +30,7 @@ pub enum DatabaseMigrationError { /// Database migration can only be done on a supported older version of the database schema, /// meaning not all older versions can be migrated from. pub fn migrate_db>(path: P) -> Result<(), DatabaseMigrationError> { - // check that the db version is supported, otherwise return an error + // check that the db version is supported let ver = get_db_version(&path)?; match ver { @@ -46,6 +46,23 @@ pub fn migrate_db>(path: P) -> Result<(), DatabaseMigrationError> } /// Perform migration for database version 0 to version 1. +/// +/// # Changelog from v0 to v1 +/// +/// 1. [ContractClassChanges](tables::v0::ContractClassChanges) +/// - Renamed to [ClassChangeHistory](tables::ClassCh +/// +/// 2. [StorageChanges](tables::v0::StorageChanges) +/// - Renamed to [StorageChangeHistory](tables::StorageChangeHistory) +/// +/// 3. [NonceChanges](tables::v0::NonceChanges) +/// - Renamed to [NonceChangeHistory](tables::NonceChangeHistory) +/// +/// 4. [StorageChangeSet](tables::v0::StorageChangeSet) +/// - Changed table type from dupsort to normal table. +/// - Changed key type to [ContractStorageKey](crate::models::storage::ContractStorageKey). +/// - Changed value type to [BlockList](crate::models::list::BlockList). +/// fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), DatabaseMigrationError> { env.create_tables()?; @@ -101,8 +118,13 @@ mod tests { use std::path::PathBuf; + use starknet::macros::felt; + use super::migrate_db; + use super::tables::v0; use crate::mdbx::DbEnv; + use crate::models::contract::ContractNonceChange; + use crate::tables::v0::StorageEntryChangeList; use crate::version::create_db_version_file; use crate::{init_db, open_db}; @@ -118,6 +140,7 @@ mod tests { (db, path) } + // TODO(kariy): create Arbitrary for database key/value types to easily create random test vectors fn create_v0_test_db() -> (DbEnv, PathBuf) { let path = tempfile::TempDir::new().expect(ERROR_CREATE_TEMP_DIR).into_path(); @@ -125,6 +148,47 @@ mod tests { let _ = db.create_v0_tables().expect(ERROR_CREATE_TABLES); let _ = create_db_version_file(&path, 0).expect(ERROR_CREATE_VER_FILE); + db.update(|tx| { + tx.put::( + felt!("0x1").into(), + StorageEntryChangeList { key: felt!("0x1"), block_list: vec![1, 2] }, + ) + .unwrap(); + tx.put::( + felt!("0x1").into(), + StorageEntryChangeList { key: felt!("0x2"), block_list: vec![1, 3] }, + ) + .unwrap(); + tx.put::( + felt!("0x2").into(), + StorageEntryChangeList { key: felt!("0x3"), block_list: vec![4, 5] }, + ) + .unwrap(); + + tx.put::( + 1, + ContractNonceChange { contract_address: felt!("0x1").into(), nonce: felt!("0x2") }, + ) + .unwrap(); + tx.put::( + 1, + ContractNonceChange { contract_address: felt!("0x2").into(), nonce: felt!("0x2") }, + ) + .unwrap(); + tx.put::( + 3, + ContractNonceChange { contract_address: felt!("0x3").into(), nonce: felt!("0x2") }, + ) + .unwrap(); + + tx.put::( + 1, + ContractNonceChange { contract_address: felt!("0x1").into(), nonce: felt!("0x2") }, + ) + .unwrap(); + }) + .expect(ERROR_INIT_DB); + (db, path) } From 5d54f90542918e0260b05fa50478527a466ae153 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 16 Apr 2024 16:29:15 +0800 Subject: [PATCH 08/26] wip --- crates/katana/storage/db/src/migration.rs | 69 +++++++++++++++++------ 1 file changed, 52 insertions(+), 17 deletions(-) diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index 13151f97fa..6396044cfc 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -123,8 +123,9 @@ mod tests { use super::migrate_db; use super::tables::v0; use crate::mdbx::DbEnv; - use crate::models::contract::ContractNonceChange; - use crate::tables::v0::StorageEntryChangeList; + use crate::models::contract::{ContractClassChange, ContractNonceChange}; + use crate::models::storage::{ContractStorageEntry, ContractStorageKey}; + use crate::tables::v0::{ContractClassChanges, StorageEntryChangeList}; use crate::version::create_db_version_file; use crate::{init_db, open_db}; @@ -149,6 +150,39 @@ mod tests { let _ = create_db_version_file(&path, 0).expect(ERROR_CREATE_VER_FILE); db.update(|tx| { + tx.put::( + 1, + ContractNonceChange { contract_address: felt!("0x1").into(), nonce: felt!("0x2") }, + ) + .unwrap(); + tx.put::( + 1, + ContractNonceChange { contract_address: felt!("0x2").into(), nonce: felt!("0x2") }, + ) + .unwrap(); + tx.put::( + 3, + ContractNonceChange { contract_address: felt!("0x3").into(), nonce: felt!("0x2") }, + ) + .unwrap(); + + tx.put::( + 1, + ContractClassChange { + contract_address: felt!("0x1").into(), + class_hash: felt!("0x1"), + }, + ) + .unwrap(); + tx.put::( + 1, + ContractClassChange { + contract_address: felt!("0x2").into(), + class_hash: felt!("0x1"), + }, + ) + .unwrap(); + tx.put::( felt!("0x1").into(), StorageEntryChangeList { key: felt!("0x1"), block_list: vec![1, 2] }, @@ -165,25 +199,26 @@ mod tests { ) .unwrap(); - tx.put::( + tx.put::( 1, - ContractNonceChange { contract_address: felt!("0x1").into(), nonce: felt!("0x2") }, + ContractStorageEntry { + key: ContractStorageKey { + contract_address: felt!("0x1").into(), + key: felt!("0x1"), + }, + value: felt!("0x1"), + }, ) .unwrap(); - tx.put::( - 1, - ContractNonceChange { contract_address: felt!("0x2").into(), nonce: felt!("0x2") }, - ) - .unwrap(); - tx.put::( + tx.put::( 3, - ContractNonceChange { contract_address: felt!("0x3").into(), nonce: felt!("0x2") }, - ) - .unwrap(); - - tx.put::( - 1, - ContractNonceChange { contract_address: felt!("0x1").into(), nonce: felt!("0x2") }, + ContractStorageEntry { + key: ContractStorageKey { + contract_address: felt!("0x1").into(), + key: felt!("0x2"), + }, + value: felt!("0x2"), + }, ) .unwrap(); }) From 612f6fc2159d662b7b0436e96c894a1091c97b92 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 16 Apr 2024 17:16:08 +0800 Subject: [PATCH 09/26] update test --- crates/katana/storage/db/src/migration.rs | 41 ++++++++++++++++++- .../katana/storage/db/src/models/contract.rs | 12 ++++++ .../katana/storage/db/src/models/storage.rs | 6 +++ 3 files changed, 58 insertions(+), 1 deletion(-) diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index 6396044cfc..1b6352c337 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -124,10 +124,11 @@ mod tests { use super::tables::v0; use crate::mdbx::DbEnv; use crate::models::contract::{ContractClassChange, ContractNonceChange}; + use crate::models::list::BlockList; use crate::models::storage::{ContractStorageEntry, ContractStorageKey}; use crate::tables::v0::{ContractClassChanges, StorageEntryChangeList}; use crate::version::create_db_version_file; - use crate::{init_db, open_db}; + use crate::{init_db, open_db, tables}; const ERROR_CREATE_TEMP_DIR: &str = "Failed to create temp dir."; const ERROR_MIGRATE_DB: &str = "Failed to migrate db."; @@ -241,5 +242,43 @@ mod tests { fn migrate_from_v0() { let (env, path) = create_v0_test_db(); let _ = migrate_db(path).expect(ERROR_MIGRATE_DB); + + env.view(|tx| { + let mut cursor = tx.cursor::().unwrap(); + let val1 = cursor.seek_by_key_subkey(1, felt!("0x1").into()).unwrap(); + let val2 = cursor.seek_by_key_subkey(1, felt!("0x2").into()).unwrap(); + let val3 = cursor.seek_by_key_subkey(3, felt!("0x3").into()).unwrap(); + + let exp_val1 = ContractNonceChange::new(felt!("0x1").into(), felt!("0x2")); + let exp_val2 = ContractNonceChange::new(felt!("0x2").into(), felt!("0x2")); + let exp_val3 = ContractNonceChange::new(felt!("0x3").into(), felt!("0x2")); + assert_eq!(val1, Some(exp_val1)); + assert_eq!(val2, Some(exp_val2)); + assert_eq!(val3, Some(exp_val3)); + + let mut cursor = tx.cursor::().unwrap(); + let val1 = cursor.seek_by_key_subkey(1, felt!("0x1").into()).unwrap(); + let val2 = cursor.seek_by_key_subkey(1, felt!("0x2").into()).unwrap(); + + let exp_val1 = ContractClassChange::new(felt!("0x1").into(), felt!("0x1")); + let exp_val2 = ContractClassChange::new(felt!("0x2").into(), felt!("0x1")); + assert_eq!(val1, Some(exp_val1)); + assert_eq!(val2, Some(exp_val2)); + + let key1 = ContractStorageKey::new(felt!("0x1").into(), felt!("0x1")); + let key2 = ContractStorageKey::new(felt!("0x1").into(), felt!("0x2")); + let key3 = ContractStorageKey::new(felt!("0x2").into(), felt!("0x3")); + let val1 = tx.get::(key1).unwrap(); + let val2 = tx.get::(key2).unwrap(); + let val3 = tx.get::(key3).unwrap(); + + let exp_val1 = BlockList::from([1, 2]); + let exp_val2 = BlockList::from([1, 3]); + let exp_val3 = BlockList::from([4, 5]); + assert_eq!(val1, Some(exp_val1)); + assert_eq!(val2, Some(exp_val2)); + assert_eq!(val3, Some(exp_val3)); + }) + .unwrap(); } } diff --git a/crates/katana/storage/db/src/models/contract.rs b/crates/katana/storage/db/src/models/contract.rs index 241e29dd59..070a3088cd 100644 --- a/crates/katana/storage/db/src/models/contract.rs +++ b/crates/katana/storage/db/src/models/contract.rs @@ -18,6 +18,12 @@ pub struct ContractClassChange { pub class_hash: ClassHash, } +impl ContractClassChange { + pub fn new(contract_address: ContractAddress, class_hash: ClassHash) -> Self { + Self { contract_address, class_hash } + } +} + impl Compress for ContractClassChange { type Compressed = Vec; fn compress(self) -> Self::Compressed { @@ -44,6 +50,12 @@ pub struct ContractNonceChange { pub nonce: Nonce, } +impl ContractNonceChange { + pub fn new(contract_address: ContractAddress, nonce: Nonce) -> Self { + Self { contract_address, nonce } + } +} + impl Compress for ContractNonceChange { type Compressed = Vec; fn compress(self) -> Self::Compressed { diff --git a/crates/katana/storage/db/src/models/storage.rs b/crates/katana/storage/db/src/models/storage.rs index 5412b22b1f..047491246b 100644 --- a/crates/katana/storage/db/src/models/storage.rs +++ b/crates/katana/storage/db/src/models/storage.rs @@ -40,6 +40,12 @@ pub struct ContractStorageKey { pub key: StorageKey, } +impl ContractStorageKey { + pub fn new(contract_address: ContractAddress, key: StorageKey) -> Self { + Self { contract_address, key } + } +} + impl Encode for ContractStorageKey { type Encoded = [u8; 64]; fn encode(self) -> Self::Encoded { From dca1973b5a789044ccf9b188bb91255a6f30213b Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 16 Apr 2024 18:01:14 +0800 Subject: [PATCH 10/26] fmt --- crates/katana/storage/db/src/migration.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index 1b6352c337..aa49797c6c 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -126,7 +126,7 @@ mod tests { use crate::models::contract::{ContractClassChange, ContractNonceChange}; use crate::models::list::BlockList; use crate::models::storage::{ContractStorageEntry, ContractStorageKey}; - use crate::tables::v0::{ContractClassChanges, StorageEntryChangeList}; + use crate::tables::v0::StorageEntryChangeList; use crate::version::create_db_version_file; use crate::{init_db, open_db, tables}; From 284f7b1feeea1b9bfa4b9a0a1905e700e9bd017b Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 16 Apr 2024 22:37:59 +0800 Subject: [PATCH 11/26] wip --- crates/katana/storage/db/src/migration.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index aa49797c6c..efe6c0311d 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -127,14 +127,11 @@ mod tests { use crate::models::list::BlockList; use crate::models::storage::{ContractStorageEntry, ContractStorageKey}; use crate::tables::v0::StorageEntryChangeList; - use crate::version::create_db_version_file; - use crate::{init_db, open_db, tables}; + use crate::{init_db, tables}; const ERROR_CREATE_TEMP_DIR: &str = "Failed to create temp dir."; const ERROR_MIGRATE_DB: &str = "Failed to migrate db."; const ERROR_INIT_DB: &str = "Failed to initialize db."; - const ERROR_CREATE_TABLES: &str = "Failed to create tables."; - const ERROR_CREATE_VER_FILE: &str = "Failed to create version file."; fn create_test_db() -> (DbEnv, PathBuf) { let path = tempfile::TempDir::new().expect(ERROR_CREATE_TEMP_DIR).into_path(); @@ -145,10 +142,7 @@ mod tests { // TODO(kariy): create Arbitrary for database key/value types to easily create random test vectors fn create_v0_test_db() -> (DbEnv, PathBuf) { let path = tempfile::TempDir::new().expect(ERROR_CREATE_TEMP_DIR).into_path(); - - let db = open_db(&path).expect(ERROR_INIT_DB); - let _ = db.create_v0_tables().expect(ERROR_CREATE_TABLES); - let _ = create_db_version_file(&path, 0).expect(ERROR_CREATE_VER_FILE); + let db = crate::init_db_with_schema::(&path).expect(ERROR_INIT_DB); db.update(|tx| { tx.put::( From fd9e7ebec498b933d634a60e10b52d8f26b1cf4a Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 16 Apr 2024 22:46:49 +0800 Subject: [PATCH 12/26] wip --- crates/katana/storage/db/src/mdbx/mod.rs | 2 +- crates/katana/storage/db/src/migration.rs | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/katana/storage/db/src/mdbx/mod.rs b/crates/katana/storage/db/src/mdbx/mod.rs index 6d941b91c4..5ea92e919b 100644 --- a/crates/katana/storage/db/src/mdbx/mod.rs +++ b/crates/katana/storage/db/src/mdbx/mod.rs @@ -81,7 +81,7 @@ impl DbEnv { pub fn create_tables(&self) -> Result<(), DatabaseError> { let tx = self.inner.begin_rw_txn().map_err(DatabaseError::CreateRWTx)?; - for table in S::all() { + for table in dbg!(S::all()) { let flags = match table.table_type() { TableType::Table => DatabaseFlags::default(), TableType::DupSort => DatabaseFlags::DUP_SORT, diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index efe6c0311d..e238753981 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -121,12 +121,11 @@ mod tests { use starknet::macros::felt; use super::migrate_db; - use super::tables::v0; use crate::mdbx::DbEnv; use crate::models::contract::{ContractClassChange, ContractNonceChange}; use crate::models::list::BlockList; use crate::models::storage::{ContractStorageEntry, ContractStorageKey}; - use crate::tables::v0::StorageEntryChangeList; + use crate::tables::v0::{self, StorageEntryChangeList}; use crate::{init_db, tables}; const ERROR_CREATE_TEMP_DIR: &str = "Failed to create temp dir."; @@ -140,9 +139,9 @@ mod tests { } // TODO(kariy): create Arbitrary for database key/value types to easily create random test vectors - fn create_v0_test_db() -> (DbEnv, PathBuf) { + fn create_v0_test_db() -> (DbEnv, PathBuf) { let path = tempfile::TempDir::new().expect(ERROR_CREATE_TEMP_DIR).into_path(); - let db = crate::init_db_with_schema::(&path).expect(ERROR_INIT_DB); + let db = crate::init_db_with_schema::(&path).expect(ERROR_INIT_DB); db.update(|tx| { tx.put::( From d98afc2e6510f54e1e95fd116911381a5bb715eb Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Wed, 17 Apr 2024 04:04:43 +0800 Subject: [PATCH 13/26] migration first working version --- crates/katana/storage/db/src/mdbx/mod.rs | 40 +++++++++---- crates/katana/storage/db/src/mdbx/tx.rs | 24 +++++++- crates/katana/storage/db/src/migration.rs | 71 +++++++++++++++++------ crates/katana/storage/db/src/version.rs | 6 ++ 4 files changed, 112 insertions(+), 29 deletions(-) diff --git a/crates/katana/storage/db/src/mdbx/mod.rs b/crates/katana/storage/db/src/mdbx/mod.rs index 5ea92e919b..b4a4b290bc 100644 --- a/crates/katana/storage/db/src/mdbx/mod.rs +++ b/crates/katana/storage/db/src/mdbx/mod.rs @@ -79,20 +79,22 @@ impl DbEnv { /// Creates all the defined tables in [`Tables`], if necessary. pub fn create_tables(&self) -> Result<(), DatabaseError> { - let tx = self.inner.begin_rw_txn().map_err(DatabaseError::CreateRWTx)?; + // let tx = self.inner.begin_rw_txn().map_err(DatabaseError::CreateRWTx)?; - for table in dbg!(S::all()) { - let flags = match table.table_type() { - TableType::Table => DatabaseFlags::default(), - TableType::DupSort => DatabaseFlags::DUP_SORT, - }; + // for table in dbg!(S::all()) { + // let flags = match table.table_type() { + // TableType::Table => DatabaseFlags::default(), + // TableType::DupSort => DatabaseFlags::DUP_SORT, + // }; - tx.create_db(Some(table.name()), flags).map_err(DatabaseError::CreateTable)?; - } + // tx.create_db(Some(table.name()), flags).map_err(DatabaseError::CreateTable)?; + // } - tx.commit().map_err(DatabaseError::Commit)?; + // tx.commit().map_err(DatabaseError::Commit)?; - Ok(()) + // Ok(()) + + self.create_tables_from_schema::() } /// Begin a read-only transaction. @@ -128,6 +130,24 @@ impl DbEnv { tx.commit()?; Ok(res) } + + /// Creates all the defined tables in [`Tables`], if necessary. + pub(crate) fn create_tables_from_schema(&self) -> Result<(), DatabaseError> { + let tx = self.inner.begin_rw_txn().map_err(DatabaseError::CreateRWTx)?; + + for table in R::all() { + let flags = match table.table_type() { + TableType::Table => DatabaseFlags::default(), + TableType::DupSort => DatabaseFlags::DUP_SORT, + }; + + tx.create_db(Some(table.name()), flags).map_err(DatabaseError::CreateTable)?; + } + + tx.commit().map_err(DatabaseError::Commit)?; + + Ok(()) + } } #[cfg(any(test, feature = "test-utils"))] diff --git a/crates/katana/storage/db/src/mdbx/tx.rs b/crates/katana/storage/db/src/mdbx/tx.rs index 60570e612c..f11b670a5c 100644 --- a/crates/katana/storage/db/src/mdbx/tx.rs +++ b/crates/katana/storage/db/src/mdbx/tx.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use libmdbx::ffi::DBI; -use libmdbx::{TransactionKind, WriteFlags, RW}; +use libmdbx::{DatabaseFlags, TransactionKind, WriteFlags, RW}; use parking_lot::RwLock; use super::cursor::Cursor; @@ -51,6 +51,12 @@ where .map_err(DatabaseError::CreateCursor) } + /// Creates a cursor to iterate over a table items. + pub fn cursor_unchecked(&self) -> Result, DatabaseError> { + let dbi = self.inner.open_db(Some(T::NAME)).map_err(DatabaseError::OpenDb)?.dbi(); + self.inner.cursor_with_dbi(dbi).map(Cursor::new).map_err(DatabaseError::CreateCursor) + } + /// Gets a table database handle if it exists, otherwise creates it. pub fn get_dbi(&self) -> Result { // SAFETY: @@ -112,6 +118,10 @@ where } impl Tx { + pub fn create_table(&self, flags: DatabaseFlags) -> Result { + Ok(self.inner.create_db(Some(T::NAME), flags).unwrap().dbi()) + } + /// Inserts an item into a database. /// /// This function stores key/data pairs in the database. The default behavior is to enter the @@ -124,6 +134,18 @@ impl Tx { Ok(()) } + pub fn put_unchecked( + &self, + key: T::Key, + value: T::Value, + ) -> Result<(), DatabaseError> { + let key = key.encode(); + let value = value.compress(); + let dbi = self.inner.open_db(Some(T::NAME)).map_err(DatabaseError::OpenDb)?.dbi(); + self.inner.put(dbi, key, value, WriteFlags::UPSERT).unwrap(); + Ok(()) + } + /// Delete items from a database, removing the key/data pair if it exists. /// /// If the data parameter is [Some] only the matching data item will be deleted. Otherwise, if diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index e238753981..b9be69e0f7 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -1,13 +1,21 @@ +use std::collections::HashMap; +use std::fs; +use std::hash::Hash; use std::path::Path; use anyhow::Context; +use katana_primitives::contract::ContractAddress; +use libmdbx::DatabaseFlags; use crate::error::DatabaseError; use crate::mdbx::DbEnv; use crate::models::list::BlockList; use crate::models::storage::ContractStorageKey; -use crate::version::{create_db_version_file, get_db_version, DatabaseVersionError}; -use crate::{open_db, tables, CURRENT_DB_VERSION}; +use crate::tables::v0::{SchemaV0, StorageEntryChangeList}; +use crate::version::{ + create_db_version_file, get_db_version, remove_db_version_file, DatabaseVersionError, +}; +use crate::{open_db, open_db_with_schema, tables, CURRENT_DB_VERSION}; #[derive(Debug, thiserror::Error)] pub enum DatabaseMigrationError { @@ -34,13 +42,15 @@ pub fn migrate_db>(path: P) -> Result<(), DatabaseMigrationError> let ver = get_db_version(&path)?; match ver { - 0 => migrate_from_v0_to_v1(open_db(&path)?)?, + 0 => migrate_from_v0_to_v1(open_db_with_schema(&path)?)?, _ => { return Err(DatabaseMigrationError::UnsupportedVersion(ver)); } } // Update the db version to the current version + + remove_db_version_file(&path)?; create_db_version_file(path, CURRENT_DB_VERSION).context("Updating database version file")?; Ok(()) } @@ -63,47 +73,70 @@ pub fn migrate_db>(path: P) -> Result<(), DatabaseMigrationError> /// - Changed key type to [ContractStorageKey](crate::models::storage::ContractStorageKey). /// - Changed value type to [BlockList](crate::models::list::BlockList). /// -fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), DatabaseMigrationError> { - env.create_tables()?; +fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), DatabaseMigrationError> { + // env.create_tables_from_schema::()?; env.update(|tx| { { + // store in a static file first before putting it back in the new table + // let file = tempfile().expect("able to create temp file"); + // let file = LineWriter::new(file); + let mut cursor = tx.cursor::()?; - cursor.walk(None)?.try_for_each(|entry| { - let (old_key, old_val) = entry?; - let key = ContractStorageKey { contract_address: old_key, key: old_val.key }; - tx.put::(key, BlockList::from_iter(old_val.block_list))?; + let mut old_entries: HashMap> = + HashMap::new(); + + cursor.walk(None)?.enumerate().try_for_each(|(i, entry)| { + let (key, val) = entry?; + dbg!(i, &key, &val); + old_entries.entry(key).or_default().push(val); Result::<(), DatabaseError>::Ok(()) })?; + drop(cursor); + unsafe { + tx.drop_table::()?; + } + tx.create_table::(DatabaseFlags::default())?; + + for (key, vals) in old_entries { + for val in vals { + let key = ContractStorageKey { contract_address: key, key: val.key }; + let val = BlockList::from_iter(val.block_list); + tx.put::(key, val)?; + } + } + // move data from `NonceChanges` to `NonceChangeHistory` + tx.create_table::(DatabaseFlags::DUP_SORT)?; let mut cursor = tx.cursor::()?; cursor.walk(None)?.try_for_each(|entry| { let (key, val) = entry?; - tx.put::(key, val)?; + tx.put_unchecked::(key, val)?; Result::<(), DatabaseError>::Ok(()) })?; + tx.create_table::(DatabaseFlags::DUP_SORT)?; // move data from `StorageChanges` to `StorageChangeHistory` let mut cursor = tx.cursor::()?; cursor.walk(None)?.try_for_each(|entry| { let (key, val) = entry?; - tx.put::(key, val)?; + tx.put_unchecked::(key, val)?; Result::<(), DatabaseError>::Ok(()) })?; + tx.create_table::(DatabaseFlags::DUP_SORT)?; // move data from `ContractClassChanges` to `ClassChangeHistory` let mut cursor = tx.cursor::()?; cursor.walk(None)?.try_for_each(|entry| { let (key, val) = entry?; - tx.put::(key, val)?; + tx.put_unchecked::(key, val)?; Result::<(), DatabaseError>::Ok(()) })?; } // drop the old tables unsafe { - tx.drop_table::()?; tx.drop_table::()?; tx.drop_table::()?; tx.drop_table::()?; @@ -126,7 +159,7 @@ mod tests { use crate::models::list::BlockList; use crate::models::storage::{ContractStorageEntry, ContractStorageKey}; use crate::tables::v0::{self, StorageEntryChangeList}; - use crate::{init_db, tables}; + use crate::{init_db, open_db, tables}; const ERROR_CREATE_TEMP_DIR: &str = "Failed to create temp dir."; const ERROR_MIGRATE_DB: &str = "Failed to migrate db."; @@ -139,9 +172,9 @@ mod tests { } // TODO(kariy): create Arbitrary for database key/value types to easily create random test vectors - fn create_v0_test_db() -> (DbEnv, PathBuf) { + fn create_v0_test_db() -> (DbEnv, PathBuf) { let path = tempfile::TempDir::new().expect(ERROR_CREATE_TEMP_DIR).into_path(); - let db = crate::init_db_with_schema::(&path).expect(ERROR_INIT_DB); + let db = crate::init_db_with_schema::(&path).expect(ERROR_INIT_DB); db.update(|tx| { tx.put::( @@ -233,8 +266,10 @@ mod tests { #[test] fn migrate_from_v0() { - let (env, path) = create_v0_test_db(); - let _ = migrate_db(path).expect(ERROR_MIGRATE_DB); + // we cant have multiple instances of the db open in the same process, so we drop here first before migrating + let (_, path) = create_v0_test_db(); + let _ = migrate_db(&path).expect(ERROR_MIGRATE_DB); + let env = open_db(path).unwrap(); env.view(|tx| { let mut cursor = tx.cursor::().unwrap(); diff --git a/crates/katana/storage/db/src/version.rs b/crates/katana/storage/db/src/version.rs index 813ea19edd..5ca874bf27 100644 --- a/crates/katana/storage/db/src/version.rs +++ b/crates/katana/storage/db/src/version.rs @@ -77,6 +77,12 @@ pub(super) fn default_version_file_path(path: &Path) -> PathBuf { path.join(DB_VERSION_FILE_NAME) } +pub(super) fn remove_db_version_file(path: impl AsRef) -> Result<(), DatabaseVersionError> { + let path = path.as_ref(); + let path = if path.is_dir() { default_version_file_path(path) } else { path.to_path_buf() }; + fs::remove_file(path).map_err(DatabaseVersionError::Io) +} + #[cfg(test)] mod tests { From ceee7cb80e7798af11aae27d0b6827304adbe7cb Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Wed, 17 Apr 2024 04:40:20 +0800 Subject: [PATCH 14/26] remove create table method and use inner tx instead --- crates/katana/storage/db/src/error.rs | 4 ++-- crates/katana/storage/db/src/mdbx/mod.rs | 19 +++---------------- crates/katana/storage/db/src/mdbx/tx.rs | 8 ++------ crates/katana/storage/db/src/migration.rs | 21 +++++++++++++++------ 4 files changed, 22 insertions(+), 30 deletions(-) diff --git a/crates/katana/storage/db/src/error.rs b/crates/katana/storage/db/src/error.rs index a527756edb..37a8d43823 100644 --- a/crates/katana/storage/db/src/error.rs +++ b/crates/katana/storage/db/src/error.rs @@ -6,8 +6,8 @@ pub enum DatabaseError { #[error(transparent)] Codec(#[from] CodecError), - #[error("failed to create db table: {0}")] - CreateTable(libmdbx::Error), + #[error("failed to create db table '{table}': {error}")] + CreateTable { table: &'static str, error: libmdbx::Error }, #[error("failed to commit db transaction: {0}")] Commit(libmdbx::Error), diff --git a/crates/katana/storage/db/src/mdbx/mod.rs b/crates/katana/storage/db/src/mdbx/mod.rs index b4a4b290bc..248f4ddb71 100644 --- a/crates/katana/storage/db/src/mdbx/mod.rs +++ b/crates/katana/storage/db/src/mdbx/mod.rs @@ -79,21 +79,6 @@ impl DbEnv { /// Creates all the defined tables in [`Tables`], if necessary. pub fn create_tables(&self) -> Result<(), DatabaseError> { - // let tx = self.inner.begin_rw_txn().map_err(DatabaseError::CreateRWTx)?; - - // for table in dbg!(S::all()) { - // let flags = match table.table_type() { - // TableType::Table => DatabaseFlags::default(), - // TableType::DupSort => DatabaseFlags::DUP_SORT, - // }; - - // tx.create_db(Some(table.name()), flags).map_err(DatabaseError::CreateTable)?; - // } - - // tx.commit().map_err(DatabaseError::Commit)?; - - // Ok(()) - self.create_tables_from_schema::() } @@ -141,7 +126,9 @@ impl DbEnv { TableType::DupSort => DatabaseFlags::DUP_SORT, }; - tx.create_db(Some(table.name()), flags).map_err(DatabaseError::CreateTable)?; + let name = table.name(); + tx.create_db(Some(name), flags) + .map_err(|error| DatabaseError::CreateTable { table: name, error })?; } tx.commit().map_err(DatabaseError::Commit)?; diff --git a/crates/katana/storage/db/src/mdbx/tx.rs b/crates/katana/storage/db/src/mdbx/tx.rs index f11b670a5c..a2372931e4 100644 --- a/crates/katana/storage/db/src/mdbx/tx.rs +++ b/crates/katana/storage/db/src/mdbx/tx.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use libmdbx::ffi::DBI; -use libmdbx::{DatabaseFlags, TransactionKind, WriteFlags, RW}; +use libmdbx::{TransactionKind, WriteFlags, RW}; use parking_lot::RwLock; use super::cursor::Cursor; @@ -23,7 +23,7 @@ pub type TxRW = Tx; #[derive(Debug)] pub struct Tx { /// Libmdbx-sys transaction. - inner: libmdbx::Transaction, + pub(crate) inner: libmdbx::Transaction, /// Marker for the db schema. _schema: std::marker::PhantomData, // the array size is hardcoded to the number of tables in current db version for now. ideally @@ -118,10 +118,6 @@ where } impl Tx { - pub fn create_table(&self, flags: DatabaseFlags) -> Result { - Ok(self.inner.create_db(Some(T::NAME), flags).unwrap().dbi()) - } - /// Inserts an item into a database. /// /// This function stores key/data pairs in the database. The default behavior is to enter the diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index b9be69e0f7..10d5bb7920 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -11,11 +11,12 @@ use crate::error::DatabaseError; use crate::mdbx::DbEnv; use crate::models::list::BlockList; use crate::models::storage::ContractStorageKey; -use crate::tables::v0::{SchemaV0, StorageEntryChangeList}; +use crate::tables::v0::StorageEntryChangeList; +use crate::tables::Table; use crate::version::{ create_db_version_file, get_db_version, remove_db_version_file, DatabaseVersionError, }; -use crate::{open_db, open_db_with_schema, tables, CURRENT_DB_VERSION}; +use crate::{open_db_with_schema, tables, CURRENT_DB_VERSION}; #[derive(Debug, thiserror::Error)] pub enum DatabaseMigrationError { @@ -76,6 +77,14 @@ pub fn migrate_db>(path: P) -> Result<(), DatabaseMigrationError> fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), DatabaseMigrationError> { // env.create_tables_from_schema::()?; + macro_rules! create_table { + ($tx:expr, $table:ty, $flags:expr) => { + $tx.inner.create_db(Some(<$table as Table>::NAME), $flags).map_err(|error| { + DatabaseError::CreateTable { table: <$table as Table>::NAME, error } + })?; + }; + } + env.update(|tx| { { // store in a static file first before putting it back in the new table @@ -97,7 +106,7 @@ fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), Databas unsafe { tx.drop_table::()?; } - tx.create_table::(DatabaseFlags::default())?; + create_table!(tx, tables::StorageChangeSet, DatabaseFlags::default()); for (key, vals) in old_entries { for val in vals { @@ -108,7 +117,7 @@ fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), Databas } // move data from `NonceChanges` to `NonceChangeHistory` - tx.create_table::(DatabaseFlags::DUP_SORT)?; + create_table!(tx, tables::NonceChangeHistory, DatabaseFlags::DUP_SORT); let mut cursor = tx.cursor::()?; cursor.walk(None)?.try_for_each(|entry| { let (key, val) = entry?; @@ -116,7 +125,7 @@ fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), Databas Result::<(), DatabaseError>::Ok(()) })?; - tx.create_table::(DatabaseFlags::DUP_SORT)?; + create_table!(tx, tables::StorageChangeHistory, DatabaseFlags::DUP_SORT); // move data from `StorageChanges` to `StorageChangeHistory` let mut cursor = tx.cursor::()?; cursor.walk(None)?.try_for_each(|entry| { @@ -125,7 +134,7 @@ fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), Databas Result::<(), DatabaseError>::Ok(()) })?; - tx.create_table::(DatabaseFlags::DUP_SORT)?; + create_table!(tx, tables::ClassChangeHistory, DatabaseFlags::DUP_SORT); // move data from `ContractClassChanges` to `ClassChangeHistory` let mut cursor = tx.cursor::()?; cursor.walk(None)?.try_for_each(|entry| { From d2acee72425176bc9ab7702be01d7bf2bd8d67c3 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Wed, 17 Apr 2024 04:44:16 +0800 Subject: [PATCH 15/26] wip --- crates/katana/storage/db/src/mdbx/mod.rs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/crates/katana/storage/db/src/mdbx/mod.rs b/crates/katana/storage/db/src/mdbx/mod.rs index 248f4ddb71..7f2be15f81 100644 --- a/crates/katana/storage/db/src/mdbx/mod.rs +++ b/crates/katana/storage/db/src/mdbx/mod.rs @@ -92,18 +92,6 @@ impl DbEnv { Ok(Tx::new(self.inner.begin_rw_txn().map_err(DatabaseError::CreateRWTx)?)) } - /// Takes a function and passes a read-only transaction into it, making sure it's always - /// committed in the end of the execution. - pub fn view(&self, f: F) -> Result - where - F: FnOnce(&Tx) -> T, - { - let tx = self.tx()?; - let res = f(&tx); - tx.commit()?; - Ok(res) - } - /// Takes a function and passes a read-write transaction into it, making sure it's always /// committed in the end of the execution. pub fn update(&self, f: F) -> Result From dc5a23060af63a48d16fbd8e09d89c1d1c6af080 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Wed, 17 Apr 2024 04:56:38 +0800 Subject: [PATCH 16/26] wip --- crates/katana/storage/db/src/mdbx/mod.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/crates/katana/storage/db/src/mdbx/mod.rs b/crates/katana/storage/db/src/mdbx/mod.rs index 7f2be15f81..248f4ddb71 100644 --- a/crates/katana/storage/db/src/mdbx/mod.rs +++ b/crates/katana/storage/db/src/mdbx/mod.rs @@ -92,6 +92,18 @@ impl DbEnv { Ok(Tx::new(self.inner.begin_rw_txn().map_err(DatabaseError::CreateRWTx)?)) } + /// Takes a function and passes a read-only transaction into it, making sure it's always + /// committed in the end of the execution. + pub fn view(&self, f: F) -> Result + where + F: FnOnce(&Tx) -> T, + { + let tx = self.tx()?; + let res = f(&tx); + tx.commit()?; + Ok(res) + } + /// Takes a function and passes a read-write transaction into it, making sure it's always /// committed in the end of the execution. pub fn update(&self, f: F) -> Result From 5141e15b7295bfc223b43e3bc04dd8fd1708fe99 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Wed, 17 Apr 2024 19:43:57 +0800 Subject: [PATCH 17/26] add test for unchecked operations --- crates/katana/storage/db/src/mdbx/mod.rs | 35 ++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/crates/katana/storage/db/src/mdbx/mod.rs b/crates/katana/storage/db/src/mdbx/mod.rs index 248f4ddb71..511b7f8a30 100644 --- a/crates/katana/storage/db/src/mdbx/mod.rs +++ b/crates/katana/storage/db/src/mdbx/mod.rs @@ -172,8 +172,10 @@ mod tests { use crate::codecs::Encode; use crate::mdbx::cursor::Walker; use crate::mdbx::test_utils::create_test_db; - use crate::models::storage::StorageEntry; - use crate::tables::{BlockHashes, ContractInfo, ContractStorage, Headers, Table}; + use crate::models::contract::ContractNonceChange; + use crate::models::list::BlockList; + use crate::models::storage::{ContractStorageKey, StorageEntry}; + use crate::tables::{self, BlockHashes, ContractInfo, ContractStorage, Headers, Table}; const ERROR_PUT: &str = "Not able to insert value into table."; const ERROR_DELETE: &str = "Failed to delete value from table."; @@ -429,4 +431,33 @@ mod tests { ); } } + + #[test] + fn db_get_put_unchecked() { + // create a database with the v1 schema + let env = create_test_db(DbEnvKind::RW); + let tx = env.tx_mut().expect(ERROR_INIT_TX); + + // drop a table first bcs database already reached max tables limit + unsafe { tx.drop_table::().unwrap() } + + // get a value from an existing table from the database will not return error + let result = tx.get_unchecked::(ContractStorageKey::default()); + assert!(result.is_ok()); + + // get a value from a nonexistent table from the database will return error + let result = tx.get_unchecked::(1); + assert_eq!(result, Err(DatabaseError::OpenDb(libmdbx::Error::NotFound))); + + // put a value into an existing table in the database will not return error + let key = ContractStorageKey::default(); + let val = BlockList::new(); + let result = tx.put_unchecked::(key, val); + assert!(result.is_ok()); + + // put a value into a nonexistent table in the database will return error + let val = ContractNonceChange::new(felt!("0x1").into(), felt!("0x1")); + let result = tx.put_unchecked::(1, val); + assert_eq!(result, Err(DatabaseError::OpenDb(libmdbx::Error::NotFound))); + } } From 0b2b6ede9abdf25b8619730b356f6a651fea0d96 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Wed, 17 Apr 2024 20:25:43 +0800 Subject: [PATCH 18/26] clean up test --- crates/katana/storage/db/src/migration.rs | 96 +++++-------------- .../katana/storage/db/src/models/storage.rs | 6 ++ crates/katana/storage/db/src/tables/v0.rs | 6 ++ 3 files changed, 37 insertions(+), 71 deletions(-) diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index 10d5bb7920..da1db33c0a 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -186,77 +186,31 @@ mod tests { let db = crate::init_db_with_schema::(&path).expect(ERROR_INIT_DB); db.update(|tx| { - tx.put::( - 1, - ContractNonceChange { contract_address: felt!("0x1").into(), nonce: felt!("0x2") }, - ) - .unwrap(); - tx.put::( - 1, - ContractNonceChange { contract_address: felt!("0x2").into(), nonce: felt!("0x2") }, - ) - .unwrap(); - tx.put::( - 3, - ContractNonceChange { contract_address: felt!("0x3").into(), nonce: felt!("0x2") }, - ) - .unwrap(); - - tx.put::( - 1, - ContractClassChange { - contract_address: felt!("0x1").into(), - class_hash: felt!("0x1"), - }, - ) - .unwrap(); - tx.put::( - 1, - ContractClassChange { - contract_address: felt!("0x2").into(), - class_hash: felt!("0x1"), - }, - ) - .unwrap(); - - tx.put::( - felt!("0x1").into(), - StorageEntryChangeList { key: felt!("0x1"), block_list: vec![1, 2] }, - ) - .unwrap(); - tx.put::( - felt!("0x1").into(), - StorageEntryChangeList { key: felt!("0x2"), block_list: vec![1, 3] }, - ) - .unwrap(); - tx.put::( - felt!("0x2").into(), - StorageEntryChangeList { key: felt!("0x3"), block_list: vec![4, 5] }, - ) - .unwrap(); - - tx.put::( - 1, - ContractStorageEntry { - key: ContractStorageKey { - contract_address: felt!("0x1").into(), - key: felt!("0x1"), - }, - value: felt!("0x1"), - }, - ) - .unwrap(); - tx.put::( - 3, - ContractStorageEntry { - key: ContractStorageKey { - contract_address: felt!("0x1").into(), - key: felt!("0x2"), - }, - value: felt!("0x2"), - }, - ) - .unwrap(); + let val1 = ContractNonceChange::new(felt!("0x1").into(), felt!("0x2")); + let val2 = ContractNonceChange::new(felt!("0x2").into(), felt!("0x2")); + let val3 = ContractNonceChange::new(felt!("0x3").into(), felt!("0x2")); + tx.put::(1, val1).unwrap(); + tx.put::(1, val2).unwrap(); + tx.put::(3, val3).unwrap(); + + let val1 = ContractClassChange::new(felt!("0x1").into(), felt!("0x1")); + let val2 = ContractClassChange::new(felt!("0x2").into(), felt!("0x1")); + tx.put::(1, val1).unwrap(); + tx.put::(1, val2).unwrap(); + + let val1 = StorageEntryChangeList::new(felt!("0x1"), vec![1, 2]); + let val2 = StorageEntryChangeList::new(felt!("0x2"), vec![1, 3]); + let val3 = StorageEntryChangeList::new(felt!("0x3"), vec![4, 5]); + tx.put::(felt!("0x1").into(), val1).unwrap(); + tx.put::(felt!("0x1").into(), val2).unwrap(); + tx.put::(felt!("0x2").into(), val3).unwrap(); + + let subkey = ContractStorageKey::new(felt!("0x1").into(), felt!("0x1")); + let val1 = ContractStorageEntry::new(subkey, felt!("0x1")); + let subkey = ContractStorageKey::new(felt!("0x1").into(), felt!("0x2")); + let val2 = ContractStorageEntry::new(subkey, felt!("0x2")); + tx.put::(1, val1).unwrap(); + tx.put::(3, val2).unwrap(); }) .expect(ERROR_INIT_DB); diff --git a/crates/katana/storage/db/src/models/storage.rs b/crates/katana/storage/db/src/models/storage.rs index 047491246b..48abef394e 100644 --- a/crates/katana/storage/db/src/models/storage.rs +++ b/crates/katana/storage/db/src/models/storage.rs @@ -71,6 +71,12 @@ pub struct ContractStorageEntry { pub value: StorageValue, } +impl ContractStorageEntry { + pub fn new(key: ContractStorageKey, value: StorageValue) -> Self { + Self { key, value } + } +} + impl Compress for ContractStorageEntry { type Compressed = Vec; fn compress(self) -> Self::Compressed { diff --git a/crates/katana/storage/db/src/tables/v0.rs b/crates/katana/storage/db/src/tables/v0.rs index 5c19e35bd3..5c7f5a6ca1 100644 --- a/crates/katana/storage/db/src/tables/v0.rs +++ b/crates/katana/storage/db/src/tables/v0.rs @@ -64,6 +64,12 @@ pub struct StorageEntryChangeList { pub block_list: Vec, } +impl StorageEntryChangeList { + pub fn new(key: StorageKey, block_list: Vec) -> Self { + Self { key, block_list } + } +} + // The `key` field is the subkey of the dupsort table, so we must use // the Encode and Decode traits when de/serializing it to the database. impl Compress for StorageEntryChangeList { From bba53be3d4e7ce4b9541c37a85e63e01d11967d2 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Wed, 17 Apr 2024 21:16:14 +0800 Subject: [PATCH 19/26] add etl processor for dumping temporarily to file before inserting it again to the tables --- crates/katana/storage/db/src/mdbx/mod.rs | 2 + crates/katana/storage/db/src/migration.rs | 106 +++++++++++++++++++--- 2 files changed, 97 insertions(+), 11 deletions(-) diff --git a/crates/katana/storage/db/src/mdbx/mod.rs b/crates/katana/storage/db/src/mdbx/mod.rs index 511b7f8a30..4c0a4e7134 100644 --- a/crates/katana/storage/db/src/mdbx/mod.rs +++ b/crates/katana/storage/db/src/mdbx/mod.rs @@ -163,6 +163,8 @@ pub mod test_utils { #[cfg(test)] mod tests { + use std::os; + use katana_primitives::block::Header; use katana_primitives::contract::{ContractAddress, GenericContractInfo}; use katana_primitives::FieldElement; diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index da1db33c0a..c5cb9503ad 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -1,18 +1,20 @@ use std::collections::HashMap; -use std::fs; -use std::hash::Hash; +use std::io::{self, Read, Write}; use std::path::Path; use anyhow::Context; use katana_primitives::contract::ContractAddress; +use katana_primitives::genesis::json; use libmdbx::DatabaseFlags; +use tempfile::NamedTempFile; +use crate::codecs::{Compress, Encode}; use crate::error::DatabaseError; use crate::mdbx::DbEnv; use crate::models::list::BlockList; use crate::models::storage::ContractStorageKey; use crate::tables::v0::StorageEntryChangeList; -use crate::tables::Table; +use crate::tables::{Key, Table}; use crate::version::{ create_db_version_file, get_db_version, remove_db_version_file, DatabaseVersionError, }; @@ -29,6 +31,9 @@ pub enum DatabaseMigrationError { #[error(transparent)] Database(#[from] DatabaseError), + #[error("failed to open temporary file: {0}")] + Io(#[from] io::Error), + #[error(transparent)] Other(#[from] anyhow::Error), } @@ -49,10 +54,14 @@ pub fn migrate_db>(path: P) -> Result<(), DatabaseMigrationError> } } - // Update the db version to the current version + // Update the db version to the migrated version + { + // we have to remove it first because the version file is read-only + remove_db_version_file(&path)?; + create_db_version_file(path, CURRENT_DB_VERSION) + } + .context("Updating database version file")?; - remove_db_version_file(&path)?; - create_db_version_file(path, CURRENT_DB_VERSION).context("Updating database version file")?; Ok(()) } @@ -87,17 +96,12 @@ fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), Databas env.update(|tx| { { - // store in a static file first before putting it back in the new table - // let file = tempfile().expect("able to create temp file"); - // let file = LineWriter::new(file); - let mut cursor = tx.cursor::()?; let mut old_entries: HashMap> = HashMap::new(); cursor.walk(None)?.enumerate().try_for_each(|(i, entry)| { let (key, val) = entry?; - dbg!(i, &key, &val); old_entries.entry(key).or_default().push(val); Result::<(), DatabaseError>::Ok(()) })?; @@ -155,6 +159,86 @@ fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), Databas })? } +// - collects all the buffer that we want to push to a temp file +// - each yeeter will be speciifc to a particular table +// - when the buffer is full, we write to a temp file + +struct Yeeter { + files: Vec, + buffer: Vec<(::Encoded, ::Compressed)>, +} + +impl Yeeter { + fn push(&mut self, key: T::Key, value: T::Value) { + self.buffer.push((key.encode(), value.compress())); + } + + fn flush(&mut self) -> Result<(), io::Error> { + let mut buffer = Vec::with_capacity(self.buffer.len()); + std::mem::swap(&mut buffer, &mut self.buffer); + + // write to file + let mut file = YeeterFile::new()?; + for (key, value) in buffer { + file.write(key.as_ref(), value.as_ref())?; + } + + self.files.push(file); + Ok(()) + } +} + +struct YeeterFile { + // the underlying file used to store the buffer + file: NamedTempFile, + // the total number of key/value pairs written to the file + len: usize, +} + +impl YeeterFile { + fn new() -> Result { + let file = NamedTempFile::new()?; + Ok(Self { file, len: 0 }) + } + + fn write(&mut self, key: &[u8], value: &[u8]) -> Result<(), io::Error> { + let key_size = key.len().to_be_bytes(); + let value_size = value.len().to_be_bytes(); + + self.file.write_all(&key_size)?; + self.file.write_all(&value_size)?; + self.file.write_all(&key)?; + self.file.write_all(&value)?; + + self.len += 1; + + Ok(()) + } + + fn read_next(&mut self) -> Result, Vec)>, io::Error> { + // check if we have reached the end of the file + if self.len == 0 { + return Ok(None); + } + + // get thee sizes of the key and value + let mut key_size = [0u8; 8]; + let mut value_size = [0u8; 8]; + self.file.read_exact(&mut key_size)?; + self.file.read_exact(&mut value_size)?; + + // read the key and value + let mut key = Vec::with_capacity(u64::from_be_bytes(key_size) as usize); + let mut value = Vec::with_capacity(u64::from_be_bytes(value_size) as usize); + self.file.read_exact(&mut key)?; + self.file.read_exact(&mut value)?; + + self.len -= 1; + + Ok(Some((key, value))) + } +} + #[cfg(test)] mod tests { From f03a0257f2c9768e2228fa3784e6c96b48a4619c Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Wed, 17 Apr 2024 21:55:08 +0800 Subject: [PATCH 20/26] update etl --- crates/katana/storage/db/src/migration.rs | 91 +++++++++++++++++------ 1 file changed, 69 insertions(+), 22 deletions(-) diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index c5cb9503ad..d863846142 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::io::{self, Read, Write}; +use std::io::{self, BufReader, Read, Seek, SeekFrom, Write}; use std::path::Path; use anyhow::Context; @@ -8,7 +8,7 @@ use katana_primitives::genesis::json; use libmdbx::DatabaseFlags; use tempfile::NamedTempFile; -use crate::codecs::{Compress, Encode}; +use crate::codecs::{Compress, Decode, Decompress, Encode}; use crate::error::DatabaseError; use crate::mdbx::DbEnv; use crate::models::list::BlockList; @@ -173,46 +173,93 @@ impl Yeeter { self.buffer.push((key.encode(), value.compress())); } + // write to file fn flush(&mut self) -> Result<(), io::Error> { let mut buffer = Vec::with_capacity(self.buffer.len()); std::mem::swap(&mut buffer, &mut self.buffer); + self.files.push(YeeterFile::new(buffer)?); + Ok(()) + } +} + +// impl IntoIterator for Yeeter { +// type Item = Result<(T::Key, T::Value), io::Error>; +// type IntoIter = YeeterIter<'a, T>; + +// fn into_iter(self) -> Self::IntoIter { +// YeeterIter { yeeter: self, index: (0, 0) } +// } +// } - // write to file - let mut file = YeeterFile::new()?; - for (key, value) in buffer { - file.write(key.as_ref(), value.as_ref())?; +struct YeeterIter<'a, T: Table> { + yeeter: &'a mut Yeeter, + index: (usize, usize), // (file, entry) +} + +impl Iterator for YeeterIter<'_, T> { + type Item = Result<(T::Key, T::Value), io::Error>; + + fn next(&mut self) -> Option { + if self.index.0 >= self.yeeter.files.len() { + return None; } - self.files.push(file); - Ok(()) + let file = &mut self.yeeter.files[self.index.0]; + if self.index.1 >= file.len { + self.index.0 += 1; // move to the next file + self.index.1 = 0; // reset the entry index + return self.next(); + } + + match file.read_next() { + Ok(Some((key_buf, value_buf))) => { + let key = ::decode(&key_buf).unwrap(); + let value = ::decompress(&value_buf).unwrap(); + + self.index.1 += 1; + + Some(Ok((key, value))) + } + Ok(None) => None, + Err(error) => Some(Err(error)), + } } } struct YeeterFile { // the underlying file used to store the buffer - file: NamedTempFile, + file: BufReader, // the total number of key/value pairs written to the file len: usize, } impl YeeterFile { - fn new() -> Result { - let file = NamedTempFile::new()?; - Ok(Self { file, len: 0 }) - } + fn new(buf: Vec<(K, V)>) -> Result + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + let mut file = NamedTempFile::new()?; + let mut len = 0; - fn write(&mut self, key: &[u8], value: &[u8]) -> Result<(), io::Error> { - let key_size = key.len().to_be_bytes(); - let value_size = value.len().to_be_bytes(); + for (key, val) in buf { + let key = key.as_ref(); + let val = val.as_ref(); - self.file.write_all(&key_size)?; - self.file.write_all(&value_size)?; - self.file.write_all(&key)?; - self.file.write_all(&value)?; + let key_size = key.len().to_be_bytes(); + let val_size = val.len().to_be_bytes(); - self.len += 1; + file.write_all(&key_size)?; + file.write_all(&val_size)?; + file.write_all(&key)?; + file.write_all(&val)?; - Ok(()) + len += 1; + } + + // reset the file cursor to the beginning + file.seek(SeekFrom::Start(0))?; + Ok(Self { file: BufReader::new(file), len }) } fn read_next(&mut self) -> Result, Vec)>, io::Error> { From 8c6eb72239cb23a31017aaf36935daeeca183269 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Wed, 17 Apr 2024 22:36:03 +0800 Subject: [PATCH 21/26] use yeeter as intermediary when moving migrating data to new table --- crates/katana/storage/db/Cargo.toml | 5 +- crates/katana/storage/db/src/migration.rs | 79 ++++++++++++----------- 2 files changed, 42 insertions(+), 42 deletions(-) diff --git a/crates/katana/storage/db/Cargo.toml b/crates/katana/storage/db/Cargo.toml index 1c0efbe423..8a6cc9029e 100644 --- a/crates/katana/storage/db/Cargo.toml +++ b/crates/katana/storage/db/Cargo.toml @@ -14,7 +14,7 @@ page_size = "0.6.0" parking_lot.workspace = true serde.workspace = true serde_json.workspace = true -tempfile = { version = "3.8.1", optional = true } +tempfile = "3.8.1" thiserror.workspace = true cairo-vm.workspace = true @@ -37,12 +37,11 @@ rev = "b34b0d3" cairo-lang-starknet.workspace = true criterion = "0.5.1" starknet.workspace = true -tempfile = "3.8.1" [features] default = [ "postcard" ] postcard = [ "dep:postcard" ] -test-utils = [ "dep:tempfile" ] +test-utils = [ ] [[bench]] harness = false diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index d863846142..153bb72b3f 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; -use std::io::{self, BufReader, Read, Seek, SeekFrom, Write}; +use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write}; +use std::marker::PhantomData; use std::path::Path; use anyhow::Context; @@ -97,27 +98,27 @@ fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), Databas env.update(|tx| { { let mut cursor = tx.cursor::()?; - let mut old_entries: HashMap> = - HashMap::new(); + let mut yeeter = Yeeter::::new(); - cursor.walk(None)?.enumerate().try_for_each(|(i, entry)| { + cursor.walk(None)?.try_for_each(|entry| { let (key, val) = entry?; - old_entries.entry(key).or_default().push(val); + yeeter.push(key, val); Result::<(), DatabaseError>::Ok(()) })?; + yeeter.flush()?; + drop(cursor); unsafe { tx.drop_table::()?; } create_table!(tx, tables::StorageChangeSet, DatabaseFlags::default()); - for (key, vals) in old_entries { - for val in vals { - let key = ContractStorageKey { contract_address: key, key: val.key }; - let val = BlockList::from_iter(val.block_list); - tx.put::(key, val)?; - } + for entry in yeeter.iter() { + let (key, val) = entry?; + let key = ContractStorageKey { contract_address: key, key: val.key }; + let val = BlockList::from_iter(val.block_list); + tx.put::(key, val)?; } // move data from `NonceChanges` to `NonceChangeHistory` @@ -169,6 +170,10 @@ struct Yeeter { } impl Yeeter { + fn new() -> Self { + Self { files: Vec::new(), buffer: Vec::new() } + } + fn push(&mut self, key: T::Key, value: T::Value) { self.buffer.push((key.encode(), value.compress())); } @@ -180,37 +185,27 @@ impl Yeeter { self.files.push(YeeterFile::new(buffer)?); Ok(()) } -} -// impl IntoIterator for Yeeter { -// type Item = Result<(T::Key, T::Value), io::Error>; -// type IntoIter = YeeterIter<'a, T>; - -// fn into_iter(self) -> Self::IntoIter { -// YeeterIter { yeeter: self, index: (0, 0) } -// } -// } + fn iter(&mut self) -> YeeterIter { + YeeterIter { files: &mut self.files, index: (0, 0), _table: PhantomData } + } +} struct YeeterIter<'a, T: Table> { - yeeter: &'a mut Yeeter, + files: &'a mut Vec, index: (usize, usize), // (file, entry) + _table: PhantomData, } impl Iterator for YeeterIter<'_, T> { type Item = Result<(T::Key, T::Value), io::Error>; fn next(&mut self) -> Option { - if self.index.0 >= self.yeeter.files.len() { + if self.index.0 >= self.files.len() { return None; } - let file = &mut self.yeeter.files[self.index.0]; - if self.index.1 >= file.len { - self.index.0 += 1; // move to the next file - self.index.1 = 0; // reset the entry index - return self.next(); - } - + let file = &mut self.files[self.index.0]; match file.read_next() { Ok(Some((key_buf, value_buf))) => { let key = ::decode(&key_buf).unwrap(); @@ -220,7 +215,13 @@ impl Iterator for YeeterIter<'_, T> { Some(Ok((key, value))) } - Ok(None) => None, + + Ok(None) => { + self.index.0 += 1; // move to the next file + self.index.1 = 0; // reset the entry index + return self.next(); + } + Err(error) => Some(Err(error)), } } @@ -239,8 +240,8 @@ impl YeeterFile { K: AsRef<[u8]>, V: AsRef<[u8]>, { - let mut file = NamedTempFile::new()?; - let mut len = 0; + let mut file = BufWriter::new(NamedTempFile::new()?); + let len = buf.len(); for (key, val) in buf { let key = key.as_ref(); @@ -251,15 +252,14 @@ impl YeeterFile { file.write_all(&key_size)?; file.write_all(&val_size)?; - file.write_all(&key)?; - file.write_all(&val)?; - - len += 1; + file.write_all(key)?; + file.write_all(val)?; } + let mut file = BufReader::new(file.into_inner()?); // reset the file cursor to the beginning file.seek(SeekFrom::Start(0))?; - Ok(Self { file: BufReader::new(file), len }) + Ok(Self { file, len }) } fn read_next(&mut self) -> Result, Vec)>, io::Error> { @@ -275,8 +275,9 @@ impl YeeterFile { self.file.read_exact(&mut value_size)?; // read the key and value - let mut key = Vec::with_capacity(u64::from_be_bytes(key_size) as usize); - let mut value = Vec::with_capacity(u64::from_be_bytes(value_size) as usize); + + let mut key = vec![0; u64::from_be_bytes(key_size) as usize]; + let mut value = vec![0; u64::from_be_bytes(value_size) as usize]; self.file.read_exact(&mut key)?; self.file.read_exact(&mut value)?; From 6597cf761ea6386ea2901714d78a909509a4ffcd Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Thu, 18 Apr 2024 01:08:54 +0800 Subject: [PATCH 22/26] add comment --- crates/katana/storage/db/src/mdbx/mod.rs | 2 - crates/katana/storage/db/src/migration.rs | 82 ++++++++++++++--------- 2 files changed, 51 insertions(+), 33 deletions(-) diff --git a/crates/katana/storage/db/src/mdbx/mod.rs b/crates/katana/storage/db/src/mdbx/mod.rs index 4c0a4e7134..511b7f8a30 100644 --- a/crates/katana/storage/db/src/mdbx/mod.rs +++ b/crates/katana/storage/db/src/mdbx/mod.rs @@ -163,8 +163,6 @@ pub mod test_utils { #[cfg(test)] mod tests { - use std::os; - use katana_primitives::block::Header; use katana_primitives::contract::{ContractAddress, GenericContractInfo}; use katana_primitives::FieldElement; diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index 153bb72b3f..01a921c02d 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -1,11 +1,10 @@ -use std::collections::HashMap; use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write}; use std::marker::PhantomData; +use std::mem; use std::path::Path; -use anyhow::Context; -use katana_primitives::contract::ContractAddress; -use katana_primitives::genesis::json; +use anyhow::{anyhow, Context}; + use libmdbx::DatabaseFlags; use tempfile::NamedTempFile; @@ -14,8 +13,7 @@ use crate::error::DatabaseError; use crate::mdbx::DbEnv; use crate::models::list::BlockList; use crate::models::storage::ContractStorageKey; -use crate::tables::v0::StorageEntryChangeList; -use crate::tables::{Key, Table}; +use crate::tables::Table; use crate::version::{ create_db_version_file, get_db_version, remove_db_version_file, DatabaseVersionError, }; @@ -85,8 +83,6 @@ pub fn migrate_db>(path: P) -> Result<(), DatabaseMigrationError> /// - Changed value type to [BlockList](crate::models::list::BlockList). /// fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), DatabaseMigrationError> { - // env.create_tables_from_schema::()?; - macro_rules! create_table { ($tx:expr, $table:ty, $flags:expr) => { $tx.inner.create_db(Some(<$table as Table>::NAME), $flags).map_err(|error| { @@ -97,24 +93,23 @@ fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), Databas env.update(|tx| { { - let mut cursor = tx.cursor::()?; let mut yeeter = Yeeter::::new(); - cursor.walk(None)?.try_for_each(|entry| { - let (key, val) = entry?; - yeeter.push(key, val); - Result::<(), DatabaseError>::Ok(()) - })?; - - yeeter.flush()?; + { + let mut cursor = tx.cursor::()?; + cursor.walk(None)?.try_for_each(|entry| { + let (key, val) = entry?; + yeeter.push(key, val)?; + Result::<(), DatabaseMigrationError>::Ok(()) + })?; + } - drop(cursor); unsafe { tx.drop_table::()?; } create_table!(tx, tables::StorageChangeSet, DatabaseFlags::default()); - for entry in yeeter.iter() { + for entry in yeeter.iter()? { let (key, val) = entry?; let key = ContractStorageKey { contract_address: key, key: val.key }; let val = BlockList::from_iter(val.block_list); @@ -163,10 +158,13 @@ fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), Databas // - collects all the buffer that we want to push to a temp file // - each yeeter will be speciifc to a particular table // - when the buffer is full, we write to a temp file - +// +/// use to temporarily store data in the disk. use when migrating tables. +/// storing data may not be feasible in memory as the size may be too large. +/// so we store the data in a temp file first and then read it back when needed. struct Yeeter { files: Vec, - buffer: Vec<(::Encoded, ::Compressed)>, + buffer: Vec<(T::Key, T::Value)>, } impl Yeeter { @@ -174,20 +172,41 @@ impl Yeeter { Self { files: Vec::new(), buffer: Vec::new() } } - fn push(&mut self, key: T::Key, value: T::Value) { - self.buffer.push((key.encode(), value.compress())); + fn push(&mut self, key: T::Key, value: T::Value) -> Result<(), io::Error> { + const BUFFER_THRESHOLD: usize = 500 * (1024 * 1024); // 500 MB + + self.buffer.push((key, value)); + + // check size of buffer, in terms of bytes + // if buffer size is greater than the threshold, flush to file + let buffer_size = mem::size_of_val(self.buffer.as_slice()); + dbg!(buffer_size); + if buffer_size > BUFFER_THRESHOLD { + self.flush()?; + } + + Ok(()) } // write to file fn flush(&mut self) -> Result<(), io::Error> { let mut buffer = Vec::with_capacity(self.buffer.len()); std::mem::swap(&mut buffer, &mut self.buffer); - self.files.push(YeeterFile::new(buffer)?); + + let buffer = buffer.into_iter().map(|(k, v)| (k.encode(), v.compress())); + let file = YeeterFile::new(buffer)?; + self.files.push(file); + Ok(()) } - fn iter(&mut self) -> YeeterIter { - YeeterIter { files: &mut self.files, index: (0, 0), _table: PhantomData } + fn iter(&mut self) -> Result, io::Error> { + // flush the remaining buffer + if !self.buffer.is_empty() { + self.flush()?; + } + + Ok(YeeterIter { files: &mut self.files, index: (0, 0), _table: PhantomData }) } } @@ -198,7 +217,7 @@ struct YeeterIter<'a, T: Table> { } impl Iterator for YeeterIter<'_, T> { - type Item = Result<(T::Key, T::Value), io::Error>; + type Item = Result<(T::Key, T::Value), anyhow::Error>; fn next(&mut self) -> Option { if self.index.0 >= self.files.len() { @@ -210,9 +229,7 @@ impl Iterator for YeeterIter<'_, T> { Ok(Some((key_buf, value_buf))) => { let key = ::decode(&key_buf).unwrap(); let value = ::decompress(&value_buf).unwrap(); - self.index.1 += 1; - Some(Ok((key, value))) } @@ -222,7 +239,7 @@ impl Iterator for YeeterIter<'_, T> { return self.next(); } - Err(error) => Some(Err(error)), + Err(error) => Some(Err(anyhow!(error))), } } } @@ -235,13 +252,14 @@ struct YeeterFile { } impl YeeterFile { - fn new(buf: Vec<(K, V)>) -> Result + fn new(buf: I) -> Result where + I: Iterator, K: AsRef<[u8]>, V: AsRef<[u8]>, { let mut file = BufWriter::new(NamedTempFile::new()?); - let len = buf.len(); + let mut len = 0; for (key, val) in buf { let key = key.as_ref(); @@ -254,6 +272,8 @@ impl YeeterFile { file.write_all(&val_size)?; file.write_all(key)?; file.write_all(val)?; + + len += 1; } let mut file = BufReader::new(file.into_inner()?); From b149fe24680beba55330a2958ef0bbfe9f0689ba Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Thu, 18 Apr 2024 02:12:54 +0800 Subject: [PATCH 23/26] add tracing --- Cargo.lock | 1 + crates/katana/storage/db/Cargo.toml | 1 + crates/katana/storage/db/src/lib.rs | 9 +++++++++ crates/katana/storage/db/src/migration.rs | 11 +++++++++-- 4 files changed, 20 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index efcf48e4a5..7167ba180b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6781,6 +6781,7 @@ dependencies = [ "starknet_api", "tempfile", "thiserror", + "tracing", ] [[package]] diff --git a/crates/katana/storage/db/Cargo.toml b/crates/katana/storage/db/Cargo.toml index 8a6cc9029e..2a0f4ea555 100644 --- a/crates/katana/storage/db/Cargo.toml +++ b/crates/katana/storage/db/Cargo.toml @@ -16,6 +16,7 @@ serde.workspace = true serde_json.workspace = true tempfile = "3.8.1" thiserror.workspace = true +tracing.workspace = true cairo-vm.workspace = true roaring = { version = "0.10.3", features = [ "serde" ] } diff --git a/crates/katana/storage/db/src/lib.rs b/crates/katana/storage/db/src/lib.rs index 907abf5345..ba05a70b03 100644 --- a/crates/katana/storage/db/src/lib.rs +++ b/crates/katana/storage/db/src/lib.rs @@ -16,10 +16,13 @@ pub mod version; use mdbx::{DbEnv, DbEnvKind}; use tables::Schema; +use tracing::{info, warn}; use utils::is_database_empty; pub use version::CURRENT_DB_VERSION; use version::{check_db_version, create_db_version_file, DatabaseVersionError}; +pub(crate) const LOG_TARGET: &str = "katana::db"; + /// Initialize the database at the given path and returning a handle to the its /// environment. /// @@ -44,6 +47,12 @@ pub(crate) fn init_db_with_schema(path: impl AsRef) -> anyhow:: } else { match check_db_version(&path) { Ok(_) => {} + Err(DatabaseVersionError::MismatchVersion { expected, found }) => { + warn!(target: LOG_TARGET, %expected, %found, "Mismatch database version."); + + info!(target: LOG_TARGET, "Attempting to migrate database."); + migration::migrate_db(path.as_ref()).context("Migrating database")?; + } Err(DatabaseVersionError::FileNotFound) => create_db_version_file(&path, S::VERSION) .with_context(|| { format!( diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index 01a921c02d..2a15a6d847 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -7,17 +7,20 @@ use anyhow::{anyhow, Context}; use libmdbx::DatabaseFlags; use tempfile::NamedTempFile; +use tracing::trace; use crate::codecs::{Compress, Decode, Decompress, Encode}; use crate::error::DatabaseError; use crate::mdbx::DbEnv; use crate::models::list::BlockList; use crate::models::storage::ContractStorageKey; +use crate::tables::Schema; use crate::tables::Table; +use crate::tables::{v0::SchemaV0, SchemaV1}; use crate::version::{ create_db_version_file, get_db_version, remove_db_version_file, DatabaseVersionError, }; -use crate::{open_db_with_schema, tables, CURRENT_DB_VERSION}; +use crate::{open_db_with_schema, tables, CURRENT_DB_VERSION, LOG_TARGET}; #[derive(Debug, thiserror::Error)] pub enum DatabaseMigrationError { @@ -82,6 +85,7 @@ pub fn migrate_db>(path: P) -> Result<(), DatabaseMigrationError> /// - Changed key type to [ContractStorageKey](crate::models::storage::ContractStorageKey). /// - Changed value type to [BlockList](crate::models::list::BlockList). /// +#[tracing::instrument(target = "katana::db", skip(env))] fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), DatabaseMigrationError> { macro_rules! create_table { ($tx:expr, $table:ty, $flags:expr) => { @@ -95,6 +99,7 @@ fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), Databas { let mut yeeter = Yeeter::::new(); + trace!(target: LOG_TARGET, table = tables::v0::StorageChangeSet::NAME, "Migrating table."); { let mut cursor = tx.cursor::()?; cursor.walk(None)?.try_for_each(|entry| { @@ -116,6 +121,7 @@ fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), Databas tx.put::(key, val)?; } + trace!(target: LOG_TARGET, table = tables::v0::NonceChanges::NAME, "Migrating table."); // move data from `NonceChanges` to `NonceChangeHistory` create_table!(tx, tables::NonceChangeHistory, DatabaseFlags::DUP_SORT); let mut cursor = tx.cursor::()?; @@ -125,6 +131,7 @@ fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), Databas Result::<(), DatabaseError>::Ok(()) })?; + trace!(target: LOG_TARGET, table = tables::v0::StorageChanges::NAME, "Migrating table."); create_table!(tx, tables::StorageChangeHistory, DatabaseFlags::DUP_SORT); // move data from `StorageChanges` to `StorageChangeHistory` let mut cursor = tx.cursor::()?; @@ -134,6 +141,7 @@ fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), Databas Result::<(), DatabaseError>::Ok(()) })?; + trace!(target: LOG_TARGET, table = tables::v0::ContractClassChanges::NAME, "Migrating table."); create_table!(tx, tables::ClassChangeHistory, DatabaseFlags::DUP_SORT); // move data from `ContractClassChanges` to `ClassChangeHistory` let mut cursor = tx.cursor::()?; @@ -180,7 +188,6 @@ impl Yeeter { // check size of buffer, in terms of bytes // if buffer size is greater than the threshold, flush to file let buffer_size = mem::size_of_val(self.buffer.as_slice()); - dbg!(buffer_size); if buffer_size > BUFFER_THRESHOLD { self.flush()?; } From f6056c304048beea1b3ae7d2d3272394f46dc57d Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Thu, 18 Apr 2024 02:13:58 +0800 Subject: [PATCH 24/26] fmt --- crates/katana/storage/db/src/migration.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index 2a15a6d847..4c481f8370 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -4,7 +4,6 @@ use std::mem; use std::path::Path; use anyhow::{anyhow, Context}; - use libmdbx::DatabaseFlags; use tempfile::NamedTempFile; use tracing::trace; @@ -14,9 +13,8 @@ use crate::error::DatabaseError; use crate::mdbx::DbEnv; use crate::models::list::BlockList; use crate::models::storage::ContractStorageKey; -use crate::tables::Schema; -use crate::tables::Table; -use crate::tables::{v0::SchemaV0, SchemaV1}; +use crate::tables::v0::SchemaV0; +use crate::tables::{Schema, SchemaV1, Table}; use crate::version::{ create_db_version_file, get_db_version, remove_db_version_file, DatabaseVersionError, }; @@ -84,7 +82,6 @@ pub fn migrate_db>(path: P) -> Result<(), DatabaseMigrationError> /// - Changed table type from dupsort to normal table. /// - Changed key type to [ContractStorageKey](crate::models::storage::ContractStorageKey). /// - Changed value type to [BlockList](crate::models::list::BlockList). -/// #[tracing::instrument(target = "katana::db", skip(env))] fn migrate_from_v0_to_v1(env: DbEnv) -> Result<(), DatabaseMigrationError> { macro_rules! create_table { @@ -339,7 +336,8 @@ mod tests { (db, path) } - // TODO(kariy): create Arbitrary for database key/value types to easily create random test vectors + // TODO(kariy): create Arbitrary for database key/value types to easily create random test + // vectors fn create_v0_test_db() -> (DbEnv, PathBuf) { let path = tempfile::TempDir::new().expect(ERROR_CREATE_TEMP_DIR).into_path(); let db = crate::init_db_with_schema::(&path).expect(ERROR_INIT_DB); @@ -388,7 +386,8 @@ mod tests { #[test] fn migrate_from_v0() { - // we cant have multiple instances of the db open in the same process, so we drop here first before migrating + // we cant have multiple instances of the db open in the same process, so we drop here first + // before migrating let (_, path) = create_v0_test_db(); let _ = migrate_db(&path).expect(ERROR_MIGRATE_DB); let env = open_db(path).unwrap(); From 16c1397093de72cf4b149895d8f57953a50fa1d5 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Thu, 18 Apr 2024 02:17:25 +0800 Subject: [PATCH 25/26] clippy --- crates/katana/storage/db/src/migration.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/katana/storage/db/src/migration.rs b/crates/katana/storage/db/src/migration.rs index 4c481f8370..84b87ec44c 100644 --- a/crates/katana/storage/db/src/migration.rs +++ b/crates/katana/storage/db/src/migration.rs @@ -13,8 +13,7 @@ use crate::error::DatabaseError; use crate::mdbx::DbEnv; use crate::models::list::BlockList; use crate::models::storage::ContractStorageKey; -use crate::tables::v0::SchemaV0; -use crate::tables::{Schema, SchemaV1, Table}; +use crate::tables::Table; use crate::version::{ create_db_version_file, get_db_version, remove_db_version_file, DatabaseVersionError, }; @@ -204,7 +203,7 @@ impl Yeeter { Ok(()) } - fn iter(&mut self) -> Result, io::Error> { + fn iter(&mut self) -> Result, io::Error> { // flush the remaining buffer if !self.buffer.is_empty() { self.flush()?; @@ -231,8 +230,8 @@ impl Iterator for YeeterIter<'_, T> { let file = &mut self.files[self.index.0]; match file.read_next() { Ok(Some((key_buf, value_buf))) => { - let key = ::decode(&key_buf).unwrap(); - let value = ::decompress(&value_buf).unwrap(); + let key = ::decode(key_buf).unwrap(); + let value = ::decompress(value_buf).unwrap(); self.index.1 += 1; Some(Ok((key, value))) } @@ -240,7 +239,7 @@ impl Iterator for YeeterIter<'_, T> { Ok(None) => { self.index.0 += 1; // move to the next file self.index.1 = 0; // reset the entry index - return self.next(); + self.next() } Err(error) => Some(Err(anyhow!(error))), @@ -286,6 +285,7 @@ impl YeeterFile { Ok(Self { file, len }) } + #[allow(clippy::type_complexity)] fn read_next(&mut self) -> Result, Vec)>, io::Error> { // check if we have reached the end of the file if self.len == 0 { @@ -389,7 +389,7 @@ mod tests { // we cant have multiple instances of the db open in the same process, so we drop here first // before migrating let (_, path) = create_v0_test_db(); - let _ = migrate_db(&path).expect(ERROR_MIGRATE_DB); + migrate_db(&path).expect(ERROR_MIGRATE_DB); let env = open_db(path).unwrap(); env.view(|tx| { From d9a0e7581d03881b9ca7d98bd28b555edc2795ec Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Thu, 18 Apr 2024 02:20:48 +0800 Subject: [PATCH 26/26] update check version to be based on schema version --- crates/katana/storage/db/src/lib.rs | 2 +- crates/katana/storage/db/src/version.rs | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/katana/storage/db/src/lib.rs b/crates/katana/storage/db/src/lib.rs index ba05a70b03..131cab4bb0 100644 --- a/crates/katana/storage/db/src/lib.rs +++ b/crates/katana/storage/db/src/lib.rs @@ -45,7 +45,7 @@ pub(crate) fn init_db_with_schema(path: impl AsRef) -> anyhow:: format!("Inserting database version file at path {}", path.as_ref().display()) })? } else { - match check_db_version(&path) { + match check_db_version::(&path) { Ok(_) => {} Err(DatabaseVersionError::MismatchVersion { expected, found }) => { warn!(target: LOG_TARGET, %expected, %found, "Mismatch database version."); diff --git a/crates/katana/storage/db/src/version.rs b/crates/katana/storage/db/src/version.rs index 5ca874bf27..43c8cfaec6 100644 --- a/crates/katana/storage/db/src/version.rs +++ b/crates/katana/storage/db/src/version.rs @@ -51,10 +51,12 @@ pub(super) fn create_db_version_file( /// Check the version of the database at the given `path`. /// /// Returning `Ok` if the version matches with [`CURRENT_DB_VERSION`], otherwise `Err` is returned. -pub(super) fn check_db_version(path: impl AsRef) -> Result<(), DatabaseVersionError> { +pub(super) fn check_db_version( + path: impl AsRef, +) -> Result<(), DatabaseVersionError> { let version = get_db_version(path)?; - if version != CURRENT_DB_VERSION { - Err(DatabaseVersionError::MismatchVersion { expected: CURRENT_DB_VERSION, found: version }) + if version != S::VERSION { + Err(DatabaseVersionError::MismatchVersion { expected: S::VERSION, found: version }) } else { Ok(()) }