Skip to content

Commit

Permalink
Support archival queries by modifying the WorkingSet (#1184)
Browse files Browse the repository at this point in the history
* use archival working set instead of archival storage

* add missing archival_query_test file

* use impl StateReaderAndWriter

* fix version import

* use separate trait to mark statemap accessed workingsets

* doc cleanup

* simplify by removing trait impl

* archival set

* fix typo

* use alloc

* fix the std dep issue

* fix issue with archival accessory writes

* make archival state functions private
  • Loading branch information
dubbelosix authored Dec 6, 2023
1 parent c6a012f commit a099cb3
Show file tree
Hide file tree
Showing 13 changed files with 602 additions and 73 deletions.
46 changes: 35 additions & 11 deletions full-node/db/sov-db/src/native_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use sov_schema_db::{SchemaBatch, DB};

use crate::rocks_db_config::gen_rocksdb_options;
use crate::schema::tables::{ModuleAccessoryState, NATIVE_TABLES};
use crate::schema::types::StateKey;
use crate::schema::types::AccessoryKey;

/// Specifies a particular version of the Accessory state.
pub type Version = u64;

/// A typed wrapper around RocksDB for storing native-only accessory state.
/// Internally, this is roughly just an [`Arc<SchemaDB>`].
Expand Down Expand Up @@ -37,20 +40,37 @@ impl NativeDB {
}

/// Queries for a value in the [`NativeDB`], given a key.
pub fn get_value_option(&self, key: &StateKey) -> anyhow::Result<Option<Vec<u8>>> {
self.db
.get::<ModuleAccessoryState>(key)
.map(Option::flatten)
pub fn get_value_option(
&self,
key: &AccessoryKey,
version: Version,
) -> anyhow::Result<Option<Vec<u8>>> {
let mut iter = self.db.iter::<ModuleAccessoryState>()?;
iter.seek_for_prev(&(key.to_vec(), version))?;
let found = iter.next();
match found {
Some(result) => {
let ((found_key, found_version), value) = result?;
if &found_key == key {
anyhow::ensure!(found_version <= version, "Bug! iterator isn't returning expected values. expected a version <= {version:} but found {found_version:}");
Ok(value)
} else {
Ok(None)
}
}
None => Ok(None),
}
}

/// Sets a sequence of key-value pairs in the [`NativeDB`]. The write is atomic.
pub fn set_values(
&self,
key_value_pairs: impl IntoIterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
version: Version,
) -> anyhow::Result<()> {
let mut batch = SchemaBatch::default();
for (key, value) in key_value_pairs {
batch.put::<ModuleAccessoryState>(&key, &value)?;
batch.put::<ModuleAccessoryState>(&(key, version), &value)?;
}
self.db.write_schemas(batch)
}
Expand Down Expand Up @@ -144,9 +164,13 @@ mod tests {

let key = b"foo".to_vec();
let value = b"bar".to_vec();
db.set_values(vec![(key.clone(), Some(value.clone()))])
db.set_values(vec![(key.clone(), Some(value.clone()))], 0)
.unwrap();
assert_eq!(db.get_value_option(&key, 0).unwrap(), Some(value.clone()));
let value2 = b"bar2".to_vec();
db.set_values(vec![(key.clone(), Some(value2.clone()))], 1)
.unwrap();
assert_eq!(db.get_value_option(&key).unwrap(), Some(value));
assert_eq!(db.get_value_option(&key, 0).unwrap(), Some(value));
}

#[test]
Expand All @@ -155,8 +179,8 @@ mod tests {
let db = NativeDB::with_path(tmpdir.path()).unwrap();

let key = b"deleted".to_vec();
db.set_values(vec![(key.clone(), None)]).unwrap();
assert_eq!(db.get_value_option(&key).unwrap(), None);
db.set_values(vec![(key.clone(), None)], 0).unwrap();
assert_eq!(db.get_value_option(&key, 0).unwrap(), None);
}

#[test]
Expand All @@ -165,6 +189,6 @@ mod tests {
let db = NativeDB::with_path(tmpdir.path()).unwrap();

let key = b"spam".to_vec();
assert_eq!(db.get_value_option(&key).unwrap(), None);
assert_eq!(db.get_value_option(&key, 0).unwrap(), None);
}
}
49 changes: 44 additions & 5 deletions full-node/db/sov-db/src/schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,6 @@ define_table_with_default_codec!(
(SlotByHash) DbHash => SlotNumber
);

define_table_with_default_codec!(
/// Non-JMT state stored by a module for JSON-RPC use.
(ModuleAccessoryState) AccessoryKey => AccessoryStateValue
);

define_table_with_seek_key_codec!(
/// The primary source for batch data
(BatchByNumber) BatchNumber => StoredBatch
Expand Down Expand Up @@ -347,3 +342,47 @@ define_table_with_default_codec!(
/// which requires the ability to fetch values by hash.
(KeyHashToKey) [u8;32] => StateKey
);

define_table_without_codec!(
/// Non-JMT state stored by a module for JSON-RPC use.
(ModuleAccessoryState) (AccessoryKey, Version) => AccessoryStateValue
);

impl KeyEncoder<ModuleAccessoryState> for (AccessoryKey, Version) {
fn encode_key(&self) -> sov_schema_db::schema::Result<Vec<u8>> {
let mut out = Vec::with_capacity(self.0.len() + std::mem::size_of::<Version>() + 8);
self.0
.as_slice()
.serialize(&mut out)
.map_err(CodecError::from)?;
// Write the version in big-endian order so that sorting order is based on the most-significant bytes of the key
out.write_u64::<BigEndian>(self.1)
.expect("serialization to vec is infallible");
Ok(out)
}
}

impl SeekKeyEncoder<ModuleAccessoryState> for (AccessoryKey, Version) {
fn encode_seek_key(&self) -> sov_schema_db::schema::Result<Vec<u8>> {
<(Vec<u8>, u64) as KeyEncoder<ModuleAccessoryState>>::encode_key(self)
}
}

impl KeyDecoder<ModuleAccessoryState> for (AccessoryKey, Version) {
fn decode_key(data: &[u8]) -> sov_schema_db::schema::Result<Self> {
let mut cursor = maybestd::io::Cursor::new(data);
let key = Vec::<u8>::deserialize_reader(&mut cursor)?;
let version = cursor.read_u64::<BigEndian>()?;
Ok((key, version))
}
}

impl ValueCodec<ModuleAccessoryState> for AccessoryStateValue {
fn encode_value(&self) -> sov_schema_db::schema::Result<Vec<u8>> {
self.try_to_vec().map_err(CodecError::from)
}

fn decode_value(data: &[u8]) -> sov_schema_db::schema::Result<Self> {
Ok(Self::deserialize_reader(&mut &data[..])?)
}
}
1 change: 0 additions & 1 deletion module-system/module-implementations/sov-bank/src/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ impl<C: sov_modules_api::Context> Token<C> {

self.balances.set(from, &from_balance, working_set);
self.balances.set(to, &to_balance, working_set);

Ok(())
}
/// Burns a specified `amount` of token from the address `from`. First check that the address has enough token to burn,
Expand Down
Loading

0 comments on commit a099cb3

Please sign in to comment.