Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Archival state support for state_db and native_db #1170

Draft
wants to merge 15 commits into
base: nightly
Choose a base branch
from
68 changes: 58 additions & 10 deletions full-node/db/sov-db/src/native_db.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use std::path::Path;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use sov_schema_db::{SchemaBatch, DB};

use crate::rocks_db_config::gen_rocksdb_options;
use crate::schema::tables::{ModuleAccessoryState, NATIVE_TABLES};
use crate::schema::types::StateKey;
use crate::schema::types::AccessoryKey;

/// Specifies a particular version of the Accessory state.
pub type Version = u64;

/// A typed wrapper around RocksDB for storing native-only accessory state.
/// Internally, this is roughly just an [`Arc<SchemaDB>`].
Expand All @@ -14,6 +17,8 @@ pub struct NativeDB {
/// The underlying RocksDB instance, wrapped in an [`Arc`] for convenience
/// and [`DB`] for type safety.
db: Arc<DB>,
/// The [`Version`] that will be used for the next batch of writes to the DB.
next_version: Arc<Mutex<Version>>,
}

impl NativeDB {
Expand All @@ -31,29 +36,72 @@ impl NativeDB {
&gen_rocksdb_options(&Default::default(), false),
)?;

let next_version = Self::last_version_written(&inner)?.unwrap_or_default() + 1;
Ok(Self {
db: Arc::new(inner),
next_version: Arc::new(Mutex::new(next_version)),
dubbelosix marked this conversation as resolved.
Show resolved Hide resolved
})
}

/// Queries for a value in the [`NativeDB`], given a key.
pub fn get_value_option(&self, key: &StateKey) -> anyhow::Result<Option<Vec<u8>>> {
self.db
.get::<ModuleAccessoryState>(key)
.map(Option::flatten)
pub fn get_value_option(
&self,
key: &AccessoryKey,
version: Option<Version>,
) -> anyhow::Result<Option<Vec<u8>>> {
let version_to_use = version.unwrap_or_else(|| self.get_next_version());
let mut iter = self.db.iter::<ModuleAccessoryState>()?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change, we use iterator on every access to the database.

Do we want to measure performance impact on regular execution?

Maybe it is too late to discuss it, but it looks like archival state taxes on regular execution.

I am raising this point, because together with ForkManagement our storage can become bottleneck.

iter.seek_for_prev(&(key.to_vec(), version_to_use))?;
let found = iter.next();
match found {
Some(result) => {
let ((found_key, found_version), value) = result?;
if &found_key == key {
anyhow::ensure!(found_version <= version_to_use, "Bug! iterator isn't returning expected values. expected a version <= {version_to_use:} but found {found_version:}");
Ok(value)
} else {
Ok(None)
}
}
None => Ok(None),
}
}

/// Sets a sequence of key-value pairs in the [`NativeDB`]. The write is atomic.
///
/// Every pair is stamped with the current `next_version`, so earlier
/// versions of the same key remain readable via archival lookups.
/// A `None` value records a deletion at this version.
pub fn set_values(
    &self,
    key_value_pairs: impl IntoIterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
) -> anyhow::Result<()> {
    let version = self.get_next_version();
    let mut batch = SchemaBatch::default();
    for (key, value) in key_value_pairs {
        // Store under the composite (key, version) so historical values
        // are preserved rather than overwritten.
        batch.put::<ModuleAccessoryState>(&(key, version), &value)?;
    }
    // A single `write_schemas` call makes the whole batch atomic.
    self.db.write_schemas(batch)
}

/// Increment the `next_version` counter by 1.
pub fn inc_next_version(&self) {
    // Lock, bump, and release in one expression; the guard drops at the
    // end of the statement.
    *self.next_version.lock().unwrap() += 1;
}

/// Get the current value of the `next_version` counter
pub fn get_next_version(&self) -> Version {
    // `Version` is `Copy`, so dereferencing the guard yields the value
    // and releases the lock immediately.
    *self.next_version.lock().unwrap()
}

/// Returns the highest version present in `db`, or `None` if the
/// accessory-state table is empty.
///
/// Because keys are ordered by (key bytes, big-endian version), the last
/// entry in the table carries the largest version ever written.
fn last_version_written(db: &DB) -> anyhow::Result<Option<Version>> {
    let mut iter = db.iter::<ModuleAccessoryState>()?;
    iter.seek_to_last();
    // Propagate iterator errors instead of silently treating them as
    // "no version written" (the previous catch-all arm swallowed `Err`).
    Ok(iter
        .next()
        .transpose()?
        .map(|((_, version), _)| version))
}
}

#[cfg(feature = "arbitrary")]
Expand Down Expand Up @@ -146,7 +194,7 @@ mod tests {
let value = b"bar".to_vec();
db.set_values(vec![(key.clone(), Some(value.clone()))])
.unwrap();
assert_eq!(db.get_value_option(&key).unwrap(), Some(value));
assert_eq!(db.get_value_option(&key, None).unwrap(), Some(value));
}

#[test]
Expand All @@ -156,7 +204,7 @@ mod tests {

let key = b"deleted".to_vec();
db.set_values(vec![(key.clone(), None)]).unwrap();
assert_eq!(db.get_value_option(&key).unwrap(), None);
assert_eq!(db.get_value_option(&key, None).unwrap(), None);
}

#[test]
Expand All @@ -165,6 +213,6 @@ mod tests {
let db = NativeDB::with_path(tmpdir.path()).unwrap();

let key = b"spam".to_vec();
assert_eq!(db.get_value_option(&key).unwrap(), None);
assert_eq!(db.get_value_option(&key, None).unwrap(), None);
}
}
49 changes: 44 additions & 5 deletions full-node/db/sov-db/src/schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,6 @@ define_table_with_default_codec!(
(SlotByHash) DbHash => SlotNumber
);

define_table_with_default_codec!(
/// Non-JMT state stored by a module for JSON-RPC use.
(ModuleAccessoryState) AccessoryKey => AccessoryStateValue
);

define_table_with_seek_key_codec!(
/// The primary source for batch data
(BatchByNumber) BatchNumber => StoredBatch
Expand Down Expand Up @@ -332,3 +327,47 @@ define_table_with_default_codec!(
/// which requires the ability to fetch values by hash.
(KeyHashToKey) [u8;32] => StateKey
);

define_table_without_codec!(
    /// Non-JMT state stored by a module for JSON-RPC use.
    /// Keyed by `(AccessoryKey, Version)` so historical values survive
    /// later writes; the codec impls below are hand-written to keep the
    /// version in big-endian order for correct RocksDB iteration.
    (ModuleAccessoryState) (AccessoryKey, Version) => AccessoryStateValue
);

impl KeyEncoder<ModuleAccessoryState> for (AccessoryKey, Version) {
    fn encode_key(&self) -> sov_schema_db::schema::Result<Vec<u8>> {
        // Borsh writes the key bytes with a `u32` length prefix, and we
        // append the 8-byte version — size the buffer for exactly that.
        // (The previous capacity added `size_of::<Version>() + 8`,
        // double-counting the version instead of accounting for the
        // 4-byte borsh prefix; harmless over-allocation, now exact.)
        let mut out = Vec::with_capacity(
            self.0.len() + std::mem::size_of::<u32>() + std::mem::size_of::<Version>(),
        );
        self.0
            .as_slice()
            .serialize(&mut out)
            .map_err(CodecError::from)?;
        // Write the version in big-endian order so that sorting order is based on the most-significant bytes of the key
        out.write_u64::<BigEndian>(self.1)
            .expect("serialization to vec is infallible");
        Ok(out)
    }
}

impl SeekKeyEncoder<ModuleAccessoryState> for (AccessoryKey, Version) {
    fn encode_seek_key(&self) -> sov_schema_db::schema::Result<Vec<u8>> {
        // Seek keys use exactly the same byte layout as ordinary keys.
        <Self as KeyEncoder<ModuleAccessoryState>>::encode_key(self)
    }
}

impl KeyDecoder<ModuleAccessoryState> for (AccessoryKey, Version) {
    fn decode_key(data: &[u8]) -> sov_schema_db::schema::Result<Self> {
        // Mirror `encode_key`: borsh-encoded key bytes first, then the
        // trailing big-endian version.
        let mut reader = maybestd::io::Cursor::new(data);
        let key_bytes = Vec::<u8>::deserialize_reader(&mut reader)?;
        let version = reader.read_u64::<BigEndian>()?;
        Ok((key_bytes, version))
    }
}

impl ValueCodec<ModuleAccessoryState> for AccessoryStateValue {
    fn encode_value(&self) -> sov_schema_db::schema::Result<Vec<u8>> {
        // Values are plain borsh with no extra framing.
        Ok(self.try_to_vec().map_err(CodecError::from)?)
    }

    fn decode_value(data: &[u8]) -> sov_schema_db::schema::Result<Self> {
        let mut bytes = data;
        Ok(Self::deserialize_reader(&mut bytes)?)
    }
}
Loading
Loading