Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support archival queries by modifying the WorkingSet #1184

Merged
merged 16 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions full-node/db/sov-db/src/native_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use sov_schema_db::{SchemaBatch, DB};

use crate::rocks_db_config::gen_rocksdb_options;
use crate::schema::tables::{ModuleAccessoryState, NATIVE_TABLES};
use crate::schema::types::StateKey;
use crate::schema::types::AccessoryKey;

/// Specifies a particular version of the Accessory state.
pub type Version = u64;

/// A typed wrapper around RocksDB for storing native-only accessory state.
/// Internally, this is roughly just an [`Arc<SchemaDB>`].
Expand Down Expand Up @@ -37,20 +40,37 @@ impl NativeDB {
}

/// Queries for a value in the [`NativeDB`], given a key.
pub fn get_value_option(&self, key: &StateKey) -> anyhow::Result<Option<Vec<u8>>> {
self.db
.get::<ModuleAccessoryState>(key)
.map(Option::flatten)
pub fn get_value_option(
&self,
key: &AccessoryKey,
version: Version,
) -> anyhow::Result<Option<Vec<u8>>> {
let mut iter = self.db.iter::<ModuleAccessoryState>()?;
iter.seek_for_prev(&(key.to_vec(), version))?;
let found = iter.next();
match found {
Some(result) => {
let ((found_key, found_version), value) = result?;
if &found_key == key {
anyhow::ensure!(found_version <= version, "Bug! iterator isn't returning expected values. expected a version <= {version:} but found {found_version:}");
Ok(value)
} else {
Ok(None)
}
}
None => Ok(None),
}
}

/// Sets a sequence of key-value pairs in the [`NativeDB`]. The write is atomic.
pub fn set_values(
&self,
key_value_pairs: impl IntoIterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
version: Version,
) -> anyhow::Result<()> {
let mut batch = SchemaBatch::default();
for (key, value) in key_value_pairs {
batch.put::<ModuleAccessoryState>(&key, &value)?;
batch.put::<ModuleAccessoryState>(&(key, version), &value)?;
}
self.db.write_schemas(batch)
}
Expand Down Expand Up @@ -144,9 +164,13 @@ mod tests {

let key = b"foo".to_vec();
let value = b"bar".to_vec();
db.set_values(vec![(key.clone(), Some(value.clone()))])
db.set_values(vec![(key.clone(), Some(value.clone()))], 0)
.unwrap();
assert_eq!(db.get_value_option(&key, 0).unwrap(), Some(value.clone()));
let value2 = b"bar2".to_vec();
db.set_values(vec![(key.clone(), Some(value2.clone()))], 1)
.unwrap();
assert_eq!(db.get_value_option(&key).unwrap(), Some(value));
assert_eq!(db.get_value_option(&key, 0).unwrap(), Some(value));
}

#[test]
Expand All @@ -155,8 +179,8 @@ mod tests {
let db = NativeDB::with_path(tmpdir.path()).unwrap();

let key = b"deleted".to_vec();
db.set_values(vec![(key.clone(), None)]).unwrap();
assert_eq!(db.get_value_option(&key).unwrap(), None);
db.set_values(vec![(key.clone(), None)], 0).unwrap();
assert_eq!(db.get_value_option(&key, 0).unwrap(), None);
}

#[test]
Expand All @@ -165,6 +189,6 @@ mod tests {
let db = NativeDB::with_path(tmpdir.path()).unwrap();

let key = b"spam".to_vec();
assert_eq!(db.get_value_option(&key).unwrap(), None);
assert_eq!(db.get_value_option(&key, 0).unwrap(), None);
}
}
49 changes: 44 additions & 5 deletions full-node/db/sov-db/src/schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,6 @@ define_table_with_default_codec!(
(SlotByHash) DbHash => SlotNumber
);

define_table_with_default_codec!(
/// Non-JMT state stored by a module for JSON-RPC use.
(ModuleAccessoryState) AccessoryKey => AccessoryStateValue
);

define_table_with_seek_key_codec!(
/// The primary source for batch data
(BatchByNumber) BatchNumber => StoredBatch
Expand Down Expand Up @@ -347,3 +342,47 @@ define_table_with_default_codec!(
/// which requires the ability to fetch values by hash.
(KeyHashToKey) [u8;32] => StateKey
);

define_table_without_codec!(
/// Non-JMT state stored by a module for JSON-RPC use.
(ModuleAccessoryState) (AccessoryKey, Version) => AccessoryStateValue
);

impl KeyEncoder<ModuleAccessoryState> for (AccessoryKey, Version) {
fn encode_key(&self) -> sov_schema_db::schema::Result<Vec<u8>> {
let mut out = Vec::with_capacity(self.0.len() + std::mem::size_of::<Version>() + 8);
self.0
.as_slice()
.serialize(&mut out)
.map_err(CodecError::from)?;
// Write the version in big-endian order so that sorting order is based on the most-significant bytes of the key
out.write_u64::<BigEndian>(self.1)
.expect("serialization to vec is infallible");
Ok(out)
}
}

impl SeekKeyEncoder<ModuleAccessoryState> for (AccessoryKey, Version) {
fn encode_seek_key(&self) -> sov_schema_db::schema::Result<Vec<u8>> {
<(Vec<u8>, u64) as KeyEncoder<ModuleAccessoryState>>::encode_key(self)
}
}

impl KeyDecoder<ModuleAccessoryState> for (AccessoryKey, Version) {
fn decode_key(data: &[u8]) -> sov_schema_db::schema::Result<Self> {
let mut cursor = maybestd::io::Cursor::new(data);
let key = Vec::<u8>::deserialize_reader(&mut cursor)?;
let version = cursor.read_u64::<BigEndian>()?;
Ok((key, version))
}
}

impl ValueCodec<ModuleAccessoryState> for AccessoryStateValue {
fn encode_value(&self) -> sov_schema_db::schema::Result<Vec<u8>> {
self.try_to_vec().map_err(CodecError::from)
}

fn decode_value(data: &[u8]) -> sov_schema_db::schema::Result<Self> {
Ok(Self::deserialize_reader(&mut &data[..])?)
}
}
6 changes: 3 additions & 3 deletions module-system/module-implementations/sov-bank/src/call.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{bail, Context, Result};
#[cfg(feature = "native")]
use sov_modules_api::macros::CliWalletArg;
use sov_modules_api::{CallResponse, StateMapAccessor, WorkingSet};
use sov_modules_api::{CallResponse, StateMapAccessor, StateMapWorkingSet, WorkingSet};

use crate::{Amount, Bank, Coins, Token};

Expand Down Expand Up @@ -252,7 +252,7 @@ impl<C: sov_modules_api::Context> Bank<C> {
&self,
user_address: C::Address,
token_address: C::Address,
working_set: &mut WorkingSet<C>,
working_set: &mut impl StateMapWorkingSet,
) -> Option<u64> {
self.tokens
.get(&token_address, working_set)
Expand All @@ -263,7 +263,7 @@ impl<C: sov_modules_api::Context> Bank<C> {
pub fn get_token_name(
&self,
token_address: &C::Address,
working_set: &mut WorkingSet<C>,
working_set: &mut impl StateMapWorkingSet,
dubbelosix marked this conversation as resolved.
Show resolved Hide resolved
) -> Option<String> {
let token = self.tokens.get(token_address, working_set);
token.map(|token| token.name)
Expand Down
4 changes: 2 additions & 2 deletions module-system/module-implementations/sov-bank/src/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::num::ParseIntError;

use anyhow::{bail, Context, Result};
use serde::{Deserialize, Serialize};
use sov_modules_api::{StateMapAccessor, WorkingSet};
use sov_modules_api::{StateMapAccessor, StateMapWorkingSet, WorkingSet};
use sov_state::Prefix;
#[cfg(feature = "native")]
use thiserror::Error;
Expand Down Expand Up @@ -226,7 +226,7 @@ impl<C: sov_modules_api::Context> Token<C> {
&self,
from: &C::Address,
amount: Amount,
working_set: &mut WorkingSet<C>,
working_set: &mut impl StateMapWorkingSet,
) -> Result<Amount> {
let balance = self.balances.get_or_err(from, working_set)?;
let new_balance = match balance.checked_sub(amount) {
Expand Down
Loading
Loading