Skip to content

Commit

Permalink
jsonrpc: initialize new coin_index_2
Browse files Browse the repository at this point in the history
Initialize the new coin_index_2 index and clear the old coin_index
column family.
  • Loading branch information
bmwill committed Oct 11, 2024
1 parent b875eba commit eccf6e3
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 5 deletions.
1 change: 1 addition & 0 deletions crates/sui-core/src/authority/test_authority_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ impl<'a> TestAuthorityBuilder<'a> {
.protocol_config()
.max_move_identifier_len_as_option(),
false,
&authority_store,
)))
};
let rest_index = if self.disable_indexer {
Expand Down
123 changes: 120 additions & 3 deletions crates/sui-core/src/jsonrpc_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,19 @@ use sui_types::error::{SuiError, SuiResult, UserInputError};
use sui_types::inner_temporary_store::TxCoins;
use sui_types::object::{Object, Owner};
use sui_types::parse_sui_struct_tag;
use sui_types::storage::error::Error as StorageError;
use tokio::task::spawn_blocking;
use tracing::{debug, instrument, trace};
use tracing::{debug, info, instrument, trace};
use typed_store::rocks::{
default_db_options, read_size_from_env, DBBatch, DBMap, DBOptions, MetricConf,
};
use typed_store::traits::Map;
use typed_store::traits::{TableSummary, TypedStoreDebug};
use typed_store::DBMapUtils;

use crate::authority::AuthorityStore;
use crate::par_index_live_object_set::{LiveObjectIndexer, ParMakeLiveObjectIndexer};

type OwnerIndexKey = (SuiAddress, ObjectID);
type CoinIndexKey = (SuiAddress, String, ObjectID);
type DynamicFieldKey = (ObjectID, ObjectID);
Expand Down Expand Up @@ -223,6 +227,7 @@ pub struct IndexStoreTables {
owner_index: DBMap<OwnerIndexKey, ObjectInfo>,

#[default_options_override_fn = "coin_index_table_default_config"]
#[deprecated]
coin_index: DBMap<CoinIndexKey, CoinInfo>,
#[default_options_override_fn = "coin_index_table_default_config"]
coin_index_2: DBMap<CoinIndexKey2, CoinInfo>,
Expand Down Expand Up @@ -261,6 +266,38 @@ impl IndexStoreTables {
pub fn coin_index(&self) -> &DBMap<CoinIndexKey2, CoinInfo> {
&self.coin_index_2
}

#[allow(deprecated)]
fn init(&mut self, authority_store: &AuthorityStore) -> Result<(), StorageError> {
// If the new coin index is empty, populate it
if self.coin_index_2.is_empty() {
info!("Initializing JSON-RPC coin index");

let make_live_object_indexer = CoinParLiveObjectSetIndexer { tables: self };

crate::par_index_live_object_set::par_index_live_object_set(
authority_store,
&make_live_object_indexer,
)?;

info!("Finished initializing JSON-RPC coin index");
}

// Clear old, deprecated column families
if !self.coin_index.is_empty() {
self.coin_index.unsafe_clear()?;
}

if !self.loaded_child_object_versions.is_empty() {
self.loaded_child_object_versions.unsafe_clear()?;
}

if !self.timestamps.is_empty() {
self.timestamps.unsafe_clear()?;
}

Ok(())
}
}

pub struct IndexStore {
Expand Down Expand Up @@ -312,7 +349,7 @@ fn coin_index_table_default_config() -> DBOptions {
}

impl IndexStore {
pub fn new(
pub fn new_without_init(
path: PathBuf,
registry: &Registry,
max_type_length: Option<u64>,
Expand All @@ -325,6 +362,7 @@ impl IndexStore {
None,
remove_deprecated_tables,
);

let metrics = IndexStoreMetrics::new(registry);
let caches = IndexStoreCaches {
per_coin_type_balance: ShardedLruCache::new(1_000_000, 1000),
Expand All @@ -350,6 +388,19 @@ impl IndexStore {
}
}

pub fn new(
path: PathBuf,
registry: &Registry,
max_type_length: Option<u64>,
remove_deprecated_tables: bool,
authority_store: &AuthorityStore,
) -> Self {
let mut store =
Self::new_without_init(path, registry, max_type_length, remove_deprecated_tables);
store.tables.init(authority_store).unwrap();
store
}

pub fn tables(&self) -> &IndexStoreTables {
&self.tables
}
Expand Down Expand Up @@ -1657,6 +1708,71 @@ impl IndexStore {
}
}

struct CoinParLiveObjectSetIndexer<'a> {
tables: &'a IndexStoreTables,
}

struct CoinLiveObjectIndexer<'a> {
tables: &'a IndexStoreTables,
batch: typed_store::rocks::DBBatch,
}

impl<'a> ParMakeLiveObjectIndexer for CoinParLiveObjectSetIndexer<'a> {
type ObjectIndexer = CoinLiveObjectIndexer<'a>;

fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
CoinLiveObjectIndexer {
tables: self.tables,
batch: self.tables.coin_index_2.batch(),
}
}
}

impl<'a> LiveObjectIndexer for CoinLiveObjectIndexer<'a> {
fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
let Owner::AddressOwner(owner) = object.owner() else {
return Ok(());
};

// only process coin types
let Some((coin_type, coin)) = object
.coin_type_maybe()
.and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))
else {
return Ok(());
};

let key = CoinIndexKey2::new(
*owner,
coin_type.to_string(),
coin.balance.value(),
object.id(),
);
let value = CoinInfo {
version: object.version(),
digest: object.digest(),
balance: coin.balance.value(),
previous_transaction: object.previous_transaction,
};

self.batch
.insert_batch(&self.tables.coin_index_2, [(key, value)])?;

// If the batch size grows to greater that 256MB then write out to the DB so that the
// data we need to hold in memory doesn't grown unbounded.
if self.batch.size_in_bytes() >= 1 << 28 {
std::mem::replace(&mut self.batch, self.tables.coin_index_2.batch()).write()?;
}

Ok(())
}

fn finish(self) -> Result<(), StorageError> {
self.batch.write()?;
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::IndexStore;
Expand All @@ -1681,7 +1797,8 @@ mod tests {
// and verified from both db and cache.
// This tests make sure we are invalidating entries in the cache and always reading latest
// balance.
let index_store = IndexStore::new(temp_dir(), &Registry::default(), Some(128), false);
let index_store =
IndexStore::new_without_init(temp_dir(), &Registry::default(), Some(128), false);
let address: SuiAddress = AccountAddress::random().into();
let mut written_objects = BTreeMap::new();
let mut object_map = BTreeMap::new();
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ pub mod mock_consensus;
pub mod module_cache_metrics;
pub mod mysticeti_adapter;
pub mod overload_monitor;
mod par_index_live_object_set;
pub(crate) mod post_consensus_tx_reorder;
pub mod quorum_driver;
pub mod rest_index;
mod par_index_live_object_set;
pub mod safe_client;
mod scoring_decision;
mod stake_aggregator;
Expand Down
5 changes: 4 additions & 1 deletion crates/sui-core/src/rest_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,10 @@ impl IndexStoreTables {
package_store,
};

crate::par_index_live_object_set::par_index_live_object_set(authority_store, &make_live_object_indexer)?;
crate::par_index_live_object_set::par_index_live_object_set(
authority_store,
&make_live_object_indexer,
)?;

self.coin.multi_insert(coin_index.into_inner().unwrap())?;

Expand Down
1 change: 1 addition & 0 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ impl SuiNode {
.protocol_config()
.max_move_identifier_len_as_option(),
config.remove_deprecated_tables,
&store,
)))
} else {
None
Expand Down

0 comments on commit eccf6e3

Please sign in to comment.