diff --git a/crates/sui-core/src/authority/test_authority_builder.rs b/crates/sui-core/src/authority/test_authority_builder.rs index bcbf4c05b1f9d6..3427153f8395a2 100644 --- a/crates/sui-core/src/authority/test_authority_builder.rs +++ b/crates/sui-core/src/authority/test_authority_builder.rs @@ -270,6 +270,7 @@ impl<'a> TestAuthorityBuilder<'a> { .protocol_config() .max_move_identifier_len_as_option(), false, + &authority_store, ))) }; let rest_index = if self.disable_indexer { diff --git a/crates/sui-core/src/jsonrpc_index.rs b/crates/sui-core/src/jsonrpc_index.rs index 168342814a1b9e..9041f99b72b8a0 100644 --- a/crates/sui-core/src/jsonrpc_index.rs +++ b/crates/sui-core/src/jsonrpc_index.rs @@ -31,8 +31,9 @@ use sui_types::error::{SuiError, SuiResult, UserInputError}; use sui_types::inner_temporary_store::TxCoins; use sui_types::object::{Object, Owner}; use sui_types::parse_sui_struct_tag; +use sui_types::storage::error::Error as StorageError; use tokio::task::spawn_blocking; -use tracing::{debug, instrument, trace}; +use tracing::{debug, info, instrument, trace}; use typed_store::rocks::{ default_db_options, read_size_from_env, DBBatch, DBMap, DBOptions, MetricConf, }; @@ -40,6 +41,9 @@ use typed_store::traits::Map; use typed_store::traits::{TableSummary, TypedStoreDebug}; use typed_store::DBMapUtils; +use crate::authority::AuthorityStore; +use crate::par_index_live_object_set::{LiveObjectIndexer, ParMakeLiveObjectIndexer}; + type OwnerIndexKey = (SuiAddress, ObjectID); type CoinIndexKey = (SuiAddress, String, ObjectID); type DynamicFieldKey = (ObjectID, ObjectID); @@ -223,6 +227,7 @@ pub struct IndexStoreTables { owner_index: DBMap, #[default_options_override_fn = "coin_index_table_default_config"] + #[deprecated] coin_index: DBMap, #[default_options_override_fn = "coin_index_table_default_config"] coin_index_2: DBMap, @@ -261,6 +266,38 @@ impl IndexStoreTables { pub fn coin_index(&self) -> &DBMap { &self.coin_index_2 } + + #[allow(deprecated)] + fn init(&mut self, authority_store: &AuthorityStore) -> Result<(), StorageError> { + // If the new coin index is empty, populate it + if self.coin_index_2.is_empty() { + info!("Initializing JSON-RPC coin index"); + + let make_live_object_indexer = CoinParLiveObjectSetIndexer { tables: self }; + + crate::par_index_live_object_set::par_index_live_object_set( + authority_store, + &make_live_object_indexer, + )?; + + info!("Finished initializing JSON-RPC coin index"); + } + + // Clear old, deprecated column families + if !self.coin_index.is_empty() { + self.coin_index.unsafe_clear()?; + } + + if !self.loaded_child_object_versions.is_empty() { + self.loaded_child_object_versions.unsafe_clear()?; + } + + if !self.timestamps.is_empty() { + self.timestamps.unsafe_clear()?; + } + + Ok(()) + } } pub struct IndexStore { @@ -312,7 +349,7 @@ fn coin_index_table_default_config() -> DBOptions { } impl IndexStore { - pub fn new( + pub fn new_without_init( path: PathBuf, registry: &Registry, max_type_length: Option, @@ -325,6 +362,7 @@ impl IndexStore { None, remove_deprecated_tables, ); + let metrics = IndexStoreMetrics::new(registry); let caches = IndexStoreCaches { per_coin_type_balance: ShardedLruCache::new(1_000_000, 1000), @@ -350,6 +388,19 @@ impl IndexStore { } } + pub fn new( + path: PathBuf, + registry: &Registry, + max_type_length: Option, + remove_deprecated_tables: bool, + authority_store: &AuthorityStore, + ) -> Self { + let mut store = + Self::new_without_init(path, registry, max_type_length, remove_deprecated_tables); + store.tables.init(authority_store).unwrap(); + store + } + pub fn tables(&self) -> &IndexStoreTables { &self.tables } @@ -1657,6 +1708,71 @@ impl IndexStore { } } +struct CoinParLiveObjectSetIndexer<'a> { + tables: &'a IndexStoreTables, +} + +struct CoinLiveObjectIndexer<'a> { + tables: &'a IndexStoreTables, + batch: typed_store::rocks::DBBatch, +} + +impl<'a> ParMakeLiveObjectIndexer for CoinParLiveObjectSetIndexer<'a> { + type ObjectIndexer = CoinLiveObjectIndexer<'a>; + + fn make_live_object_indexer(&self) -> Self::ObjectIndexer { + CoinLiveObjectIndexer { + tables: self.tables, + batch: self.tables.coin_index_2.batch(), + } + } +} + +impl<'a> LiveObjectIndexer for CoinLiveObjectIndexer<'a> { + fn index_object(&mut self, object: Object) -> Result<(), StorageError> { + let Owner::AddressOwner(owner) = object.owner() else { + return Ok(()); + }; + + // only process coin types + let Some((coin_type, coin)) = object + .coin_type_maybe() + .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin))) + else { + return Ok(()); + }; + + let key = CoinIndexKey2::new( + *owner, + coin_type.to_string(), + coin.balance.value(), + object.id(), + ); + let value = CoinInfo { + version: object.version(), + digest: object.digest(), + balance: coin.balance.value(), + previous_transaction: object.previous_transaction, + }; + + self.batch + .insert_batch(&self.tables.coin_index_2, [(key, value)])?; + + // If the batch size grows to greater that 256MB then write out to the DB so that the + // data we need to hold in memory doesn't grown unbounded. + if self.batch.size_in_bytes() >= 1 << 28 { + std::mem::replace(&mut self.batch, self.tables.coin_index_2.batch()).write()?; + } + + Ok(()) + } + + fn finish(self) -> Result<(), StorageError> { + self.batch.write()?; + Ok(()) + } +} + #[cfg(test)] mod tests { use super::IndexStore; @@ -1681,7 +1797,8 @@ mod tests { // and verified from both db and cache. // This tests make sure we are invalidating entries in the cache and always reading latest // balance. - let index_store = IndexStore::new(temp_dir(), &Registry::default(), Some(128), false); + let index_store = + IndexStore::new_without_init(temp_dir(), &Registry::default(), Some(128), false); let address: SuiAddress = AccountAddress::random().into(); let mut written_objects = BTreeMap::new(); let mut object_map = BTreeMap::new(); diff --git a/crates/sui-core/src/lib.rs b/crates/sui-core/src/lib.rs index 88b4b23edb40fd..2260f210a509ca 100644 --- a/crates/sui-core/src/lib.rs +++ b/crates/sui-core/src/lib.rs @@ -26,10 +26,10 @@ pub mod mock_consensus; pub mod module_cache_metrics; pub mod mysticeti_adapter; pub mod overload_monitor; +mod par_index_live_object_set; pub(crate) mod post_consensus_tx_reorder; pub mod quorum_driver; pub mod rest_index; -mod par_index_live_object_set; pub mod safe_client; mod scoring_decision; mod stake_aggregator; diff --git a/crates/sui-core/src/rest_index.rs b/crates/sui-core/src/rest_index.rs index bfe305ee3b9df3..438ca125ed6cfc 100644 --- a/crates/sui-core/src/rest_index.rs +++ b/crates/sui-core/src/rest_index.rs @@ -231,7 +231,10 @@ impl IndexStoreTables { package_store, }; - crate::par_index_live_object_set::par_index_live_object_set(authority_store, &make_live_object_indexer)?; + crate::par_index_live_object_set::par_index_live_object_set( + authority_store, + &make_live_object_indexer, + )?; self.coin.multi_insert(coin_index.into_inner().unwrap())?; diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs index fb0c2c0996b512..efcf87b326afaa 100644 --- a/crates/sui-node/src/lib.rs +++ b/crates/sui-node/src/lib.rs @@ -573,6 +573,7 @@ impl SuiNode { .protocol_config() .max_move_identifier_len_as_option(), config.remove_deprecated_tables, + &store, ))) } else { None