diff --git a/crates/sui-indexer-alt-schema/migrations/2024-12-03-011506_coin_balance_buckets/down.sql b/crates/sui-indexer-alt-schema/migrations/2024-12-03-011506_coin_balance_buckets/down.sql new file mode 100644 index 0000000000000..d1bfe36a8deae --- /dev/null +++ b/crates/sui-indexer-alt-schema/migrations/2024-12-03-011506_coin_balance_buckets/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS coin_balance_buckets; diff --git a/crates/sui-indexer-alt-schema/migrations/2024-12-03-011506_coin_balance_buckets/up.sql b/crates/sui-indexer-alt-schema/migrations/2024-12-03-011506_coin_balance_buckets/up.sql new file mode 100644 index 0000000000000..f2f5a0016bff0 --- /dev/null +++ b/crates/sui-indexer-alt-schema/migrations/2024-12-03-011506_coin_balance_buckets/up.sql @@ -0,0 +1,29 @@ +-- A table of coin balance buckets, keyed on object ID and checkpoint sequence number. +-- At the end of each checkpoint, we insert a row for each coin balance bucket, if it has changed. +-- We bucketize coin balances to reduce the number of distinct values and help with both write and +-- read performance. Bucket is calculated as floor(log10(coin_balance)). +-- We also keep a record when we delete or wrap an object, which we would need for consistency queries. +-- All fields except the primary key will be `NULL` for delete/wrap records. +CREATE TABLE IF NOT EXISTS coin_balance_buckets +( + object_id BYTEA NOT NULL, + cp_sequence_number BIGINT NOT NULL, + -- The kind of owner of this coin. We need this to support ConsensusV2 objects. + -- A coin can be either owned by an address through fast-path ownership, or + -- by an address through ConsensusV2 ownership. + -- This is represented by `StoredCoinOwnerKind` in `models/objects.rs`, which is different + -- from `StoredOwnerKind` used in `obj_info` table. + owner_kind SMALLINT, + -- The address that owns this version of the coin (it is guaranteed to be + -- address-owned). + owner_id BYTEA, + -- The type of the coin, as a BCS-serialized `TypeTag`. 
This is only the + -- marker type, and not the full object type (e.g. `0x0...02::sui::SUI`). + coin_type BYTEA, + -- The balance bucket of the coin, which is log10(coin_balance). + coin_balance_bucket SMALLINT, + PRIMARY KEY (object_id, cp_sequence_number) +); + +CREATE INDEX IF NOT EXISTS coin_balances_buckets_owner_type +ON coin_balance_buckets (owner_kind, owner_id, coin_type, coin_balance_bucket DESC, cp_sequence_number DESC, object_id); diff --git a/crates/sui-indexer-alt-schema/src/objects.rs b/crates/sui-indexer-alt-schema/src/objects.rs index 9584e381d96f5..cc4211d0645d8 100644 --- a/crates/sui-indexer-alt-schema/src/objects.rs +++ b/crates/sui-indexer-alt-schema/src/objects.rs @@ -9,8 +9,8 @@ use sui_field_count::FieldCount; use sui_types::base_types::ObjectID; use crate::schema::{ - kv_objects, obj_info, obj_versions, sum_coin_balances, sum_obj_types, wal_coin_balances, - wal_obj_types, + coin_balance_buckets, kv_objects, obj_info, obj_versions, sum_coin_balances, sum_obj_types, + wal_coin_balances, wal_obj_types, }; #[derive(Insertable, Debug, Clone, FieldCount)] @@ -51,6 +51,14 @@ pub enum StoredOwnerKind { Shared = 3, } +#[derive(AsExpression, FromSqlRow, Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +#[diesel(sql_type = SmallInt)] +#[repr(i16)] +pub enum StoredCoinOwnerKind { + Fastpath = 0, + Consensus = 1, +} + #[derive(Insertable, Debug, Clone, FieldCount)] #[diesel(table_name = sum_coin_balances, primary_key(object_id))] pub struct StoredSumCoinBalance { @@ -99,6 +107,30 @@ pub struct StoredWalObjType { pub cp_sequence_number: i64, } +#[derive(Insertable, Debug, Clone, FieldCount)] +#[diesel(table_name = obj_info, primary_key(object_id, cp_sequence_number))] +pub struct StoredObjInfo { + pub object_id: Vec, + pub cp_sequence_number: i64, + pub owner_kind: Option, + pub owner_id: Option>, + pub package: Option>, + pub module: Option, + pub name: Option, + pub instantiation: Option>, +} + +#[derive(Insertable, Debug, Clone, FieldCount)] 
+#[diesel(table_name = coin_balance_buckets, primary_key(object_id, cp_sequence_number))] +pub struct StoredCoinBalanceBucket { + pub object_id: Vec, + pub cp_sequence_number: i64, + pub owner_kind: Option, + pub owner_id: Option>, + pub coin_type: Option>, + pub coin_balance_bucket: Option, +} + /// StoredObjectUpdate is a wrapper type, we want to count the fields of the inner type. impl FieldCount for StoredObjectUpdate { // Add one here for cp_sequence_number field, because StoredObjectUpdate is used for @@ -135,15 +167,27 @@ where } } -#[derive(Insertable, Debug, Clone, FieldCount)] -#[diesel(table_name = obj_info, primary_key(object_id, cp_sequence_number))] -pub struct StoredObjInfo { - pub object_id: Vec, - pub cp_sequence_number: i64, - pub owner_kind: Option, - pub owner_id: Option>, - pub package: Option>, - pub module: Option, - pub name: Option, - pub instantiation: Option>, +impl serialize::ToSql for StoredCoinOwnerKind +where + i16: serialize::ToSql, +{ + fn to_sql<'b>(&'b self, out: &mut serialize::Output<'b, '_, DB>) -> serialize::Result { + match self { + StoredCoinOwnerKind::Fastpath => 0.to_sql(out), + StoredCoinOwnerKind::Consensus => 1.to_sql(out), + } + } +} + +impl deserialize::FromSql for StoredCoinOwnerKind +where + i16: deserialize::FromSql, +{ + fn from_sql(raw: DB::RawValue<'_>) -> deserialize::Result { + Ok(match i16::from_sql(raw)? { + 0 => StoredCoinOwnerKind::Fastpath, + 1 => StoredCoinOwnerKind::Consensus, + o => return Err(format!("Unexpected StoredCoinOwnerKind: {o}").into()), + }) + } } diff --git a/crates/sui-indexer-alt-schema/src/schema.rs b/crates/sui-indexer-alt-schema/src/schema.rs index c519fb4192d6f..9c59ec662d066 100644 --- a/crates/sui-indexer-alt-schema/src/schema.rs +++ b/crates/sui-indexer-alt-schema/src/schema.rs @@ -2,6 +2,17 @@ // SPDX-License-Identifier: Apache-2.0 // @generated automatically by Diesel CLI. +diesel::table! 
{ + coin_balance_buckets (object_id, cp_sequence_number) { + object_id -> Bytea, + cp_sequence_number -> Int8, + owner_kind -> Nullable, + owner_id -> Nullable, + coin_type -> Nullable, + coin_balance_bucket -> Nullable, + } +} + diesel::table! { ev_emit_mod (package, module, tx_sequence_number) { package -> Bytea, @@ -240,6 +251,7 @@ diesel::table! { } diesel::allow_tables_to_appear_in_same_query!( + coin_balance_buckets, ev_emit_mod, ev_struct_inst, kv_checkpoints, diff --git a/crates/sui-indexer-alt/src/handlers/coin_balance_buckets.rs b/crates/sui-indexer-alt/src/handlers/coin_balance_buckets.rs new file mode 100644 index 0000000000000..b1bba4ec74ba8 --- /dev/null +++ b/crates/sui-indexer-alt/src/handlers/coin_balance_buckets.rs @@ -0,0 +1,211 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::{collections::BTreeMap, sync::Arc}; + +use anyhow::{anyhow, bail, Result}; +use diesel_async::RunQueryDsl; +use sui_field_count::FieldCount; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_schema::{ + objects::{StoredCoinBalanceBucket, StoredCoinOwnerKind}, + schema::coin_balance_buckets, +}; +use sui_pg_db as db; +use sui_types::{ + base_types::{ObjectID, SuiAddress}, + full_checkpoint_content::CheckpointData, + object::{Object, Owner}, + TypeTag, +}; + +pub(crate) struct CoinBalanceBuckets; + +pub(crate) struct ProcessedCoinBalanceBucket { + pub object_id: ObjectID, + pub cp_sequence_number: u64, + pub change: CoinBalanceBucketChangeKind, +} + +pub(crate) enum CoinBalanceBucketChangeKind { + Insert { + owner_kind: StoredCoinOwnerKind, + owner_id: SuiAddress, + coin_type: TypeTag, + balance_bucket: i16, + }, + Delete, +} + +impl Processor for CoinBalanceBuckets { + const NAME: &'static str = "coin_balance_buckets"; + type Value = ProcessedCoinBalanceBucket; + + // TODO: We need to add tests for this function. 
+ fn process(&self, checkpoint: &Arc) -> Result> { + let cp_sequence_number = checkpoint.checkpoint_summary.sequence_number; + let checkpoint_input_objects = checkpoint.checkpoint_input_objects(); + let latest_live_output_objects: BTreeMap<_, _> = checkpoint + .latest_live_output_objects() + .into_iter() + .map(|o| (o.id(), o)) + .collect(); + let mut values: BTreeMap = BTreeMap::new(); + for (object_id, input_object) in checkpoint_input_objects.iter() { + // This loop processes all coins that were owned by a single address prior to the checkpoint, + // but is now deleted or wrapped after the checkpoint. + if !input_object.is_coin() { + continue; + } + if get_coin_owner(input_object).is_none() { + continue; + } + if latest_live_output_objects.contains_key(object_id) { + continue; + } + values.insert( + *object_id, + ProcessedCoinBalanceBucket { + object_id: *object_id, + cp_sequence_number, + change: CoinBalanceBucketChangeKind::Delete, + }, + ); + } + for (object_id, output_object) in latest_live_output_objects.iter() { + let Some(coin_type) = output_object.coin_type_maybe() else { + continue; + }; + + let (input_bucket, input_owner) = match checkpoint_input_objects.get(object_id) { + Some(input_object) => { + let bucket = get_coin_balance_bucket(input_object)?; + let owner = get_coin_owner(input_object); + (Some(bucket), owner) + } + None => (None, None), + }; + + let output_balance_bucket = get_coin_balance_bucket(output_object)?; + let output_owner = get_coin_owner(output_object); + + match (input_owner, output_owner) { + (Some(_), None) => { + // In this case, the coin was owned by a single address prior to the checkpoint, + // but is now either shared or immutable after the checkpoint. We treat this the same + // as if the coin was deleted, from the perspective of the balance bucket. 
+ values.insert( + *object_id, + ProcessedCoinBalanceBucket { + object_id: *object_id, + cp_sequence_number, + change: CoinBalanceBucketChangeKind::Delete, + }, + ); + } + (_, Some(new_owner)) + if input_owner != output_owner + || input_bucket != Some(output_balance_bucket) => + { + // In this case, the coin is still owned by a single address after the checkpoint, + // but either the owner or the balance bucket has changed. This also includes the case + // where the coin did not exist prior to the checkpoint, and is now created/unwrapped. + values.insert( + *object_id, + ProcessedCoinBalanceBucket { + object_id: *object_id, + cp_sequence_number, + change: CoinBalanceBucketChangeKind::Insert { + owner_kind: new_owner.0, + owner_id: new_owner.1, + coin_type, + balance_bucket: output_balance_bucket, + }, + }, + ); + } + _ => {} + } + } + + Ok(values.into_values().collect()) + } +} + +#[async_trait::async_trait] +impl Handler for CoinBalanceBuckets { + async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result { + let values = values + .iter() + .map(|v| v.try_into()) + .collect::>>()?; + Ok(diesel::insert_into(coin_balance_buckets::table) + .values(values) + .on_conflict_do_nothing() + .execute(conn) + .await?) 
+ } +} + +impl FieldCount for ProcessedCoinBalanceBucket { + const FIELD_COUNT: usize = StoredCoinBalanceBucket::FIELD_COUNT; +} + +impl TryInto for &ProcessedCoinBalanceBucket { + type Error = anyhow::Error; + + fn try_into(self) -> Result { + match &self.change { + CoinBalanceBucketChangeKind::Insert { + owner_kind, + owner_id, + coin_type, + balance_bucket, + } => { + let serialized_coin_type = bcs::to_bytes(&coin_type) + .map_err(|_| anyhow!("Failed to serialize type for {}", self.object_id))?; + Ok(StoredCoinBalanceBucket { + object_id: self.object_id.to_vec(), + cp_sequence_number: self.cp_sequence_number as i64, + owner_kind: Some(*owner_kind), + owner_id: Some(owner_id.to_vec()), + coin_type: Some(serialized_coin_type), + coin_balance_bucket: Some(*balance_bucket), + }) + } + CoinBalanceBucketChangeKind::Delete => Ok(StoredCoinBalanceBucket { + object_id: self.object_id.to_vec(), + cp_sequence_number: self.cp_sequence_number as i64, + owner_kind: None, + owner_id: None, + coin_type: None, + coin_balance_bucket: None, + }), + } + } +} + +/// Get the owner kind and address of a coin, if it is owned by a single address, +/// either through fast-path ownership or ConsensusV2 ownership. +fn get_coin_owner(object: &Object) -> Option<(StoredCoinOwnerKind, SuiAddress)> { + match object.owner() { + Owner::AddressOwner(owner_id) => Some((StoredCoinOwnerKind::Fastpath, *owner_id)), + Owner::ConsensusV2 { authenticator, .. } => Some(( + StoredCoinOwnerKind::Consensus, + *authenticator.as_single_owner(), + )), + Owner::Immutable | Owner::ObjectOwner(_) | Owner::Shared { .. 
} => None, + } +} + +fn get_coin_balance_bucket(coin: &Object) -> anyhow::Result { + let Some(coin) = coin.as_coin_maybe() else { + bail!("Failed to deserialize Coin for {}", coin.id()); + }; + let balance = coin.balance.value(); + if balance == 0 { + return Ok(0); + } + let bucket = balance.ilog10() as i16; + Ok(bucket) +} diff --git a/crates/sui-indexer-alt/src/handlers/mod.rs b/crates/sui-indexer-alt/src/handlers/mod.rs index bca68a046865b..c4c2684e6f9eb 100644 --- a/crates/sui-indexer-alt/src/handlers/mod.rs +++ b/crates/sui-indexer-alt/src/handlers/mod.rs @@ -1,6 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +pub(crate) mod coin_balance_buckets; pub(crate) mod ev_emit_mod; pub(crate) mod ev_struct_inst; pub(crate) mod kv_checkpoints;