Skip to content

Commit

Permalink
[indexer-alt] Add coin_balance_buckets pipeline (#20500)
Browse files Browse the repository at this point in the history
## Description 

Describe the changes or additions included in this PR.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
lxfind authored Dec 11, 2024
1 parent 3cd7e2b commit a5ce934
Show file tree
Hide file tree
Showing 6 changed files with 311 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Remove the coin_balance_buckets table if it is present.
DROP TABLE IF EXISTS coin_balance_buckets;
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- A table of coin balance buckets, keyed on object ID and checkpoint sequence number.
-- At the end of each checkpoint, we insert a row for each coin whose owner or balance bucket
-- has changed.
-- We bucketize coin balances to reduce the number of distinct values and help with both write and
-- read performance. The bucket is calculated as floor(log10(coin_balance)); a zero balance is
-- stored as bucket 0.
-- We also keep a record when a coin is deleted or wrapped, or stops being owned by a single
-- address, which is needed for consistent queries.
-- All fields except the primary key will be `NULL` for such delete/wrap records.
CREATE TABLE IF NOT EXISTS coin_balance_buckets
(
object_id BYTEA NOT NULL,
cp_sequence_number BIGINT NOT NULL,
-- The kind of owner of this coin. We need this to support ConsensusV2 objects.
-- A coin can be owned either by an address through fast-path ownership, or
-- by an address through ConsensusV2 ownership.
-- This is represented by `StoredCoinOwnerKind` in `models/objects.rs`, which is different
-- from the `StoredOwnerKind` used in the `obj_info` table.
owner_kind SMALLINT,
-- The address that owns this version of the coin (it is guaranteed to be
-- address-owned).
owner_id BYTEA,
-- The type of the coin, as a BCS-serialized `TypeTag`. This is only the
-- marker type, and not the full object type (e.g. `0x0...02::sui::SUI`).
coin_type BYTEA,
-- The balance bucket of the coin, which is floor(log10(coin_balance)), or 0 for a
-- zero balance.
coin_balance_bucket SMALLINT,
PRIMARY KEY (object_id, cp_sequence_number)
);

-- Index over (owner kind, owner, coin type), with entries ordered by descending balance bucket
-- and then descending checkpoint. Presumably this serves "coins of type T owned by address A,
-- largest first" queries -- confirm against the readers of this table.
CREATE INDEX IF NOT EXISTS coin_balances_buckets_owner_type
ON coin_balance_buckets (owner_kind, owner_id, coin_type, coin_balance_bucket DESC, cp_sequence_number DESC, object_id);
70 changes: 57 additions & 13 deletions crates/sui-indexer-alt-schema/src/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use sui_field_count::FieldCount;
use sui_types::base_types::ObjectID;

use crate::schema::{
kv_objects, obj_info, obj_versions, sum_coin_balances, sum_obj_types, wal_coin_balances,
wal_obj_types,
coin_balance_buckets, kv_objects, obj_info, obj_versions, sum_coin_balances, sum_obj_types,
wal_coin_balances, wal_obj_types,
};

#[derive(Insertable, Debug, Clone, FieldCount)]
Expand Down Expand Up @@ -51,6 +51,14 @@ pub enum StoredOwnerKind {
Shared = 3,
}

/// How a coin is owned by a single address, stored in the database as a `SMALLINT`.
///
/// This is distinct from `StoredOwnerKind`: it only distinguishes the two ways an address can
/// own a coin, and is used by the `coin_balance_buckets` table.
#[derive(AsExpression, FromSqlRow, Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[diesel(sql_type = SmallInt)]
#[repr(i16)]
pub enum StoredCoinOwnerKind {
/// The coin is owned by an address through fast-path ownership.
Fastpath = 0,
/// The coin is owned by an address through ConsensusV2 ownership.
Consensus = 1,
}

#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = sum_coin_balances, primary_key(object_id))]
pub struct StoredSumCoinBalance {
Expand Down Expand Up @@ -99,6 +107,30 @@ pub struct StoredWalObjType {
pub cp_sequence_number: i64,
}

/// A row to insert into the `obj_info` table, keyed on `(object_id, cp_sequence_number)`.
///
/// Every column other than the key is optional.
// NOTE(review): presumably the optional columns are all `None` for delete/wrap records, as in
// `coin_balance_buckets` -- confirm against the `obj_info` pipeline.
#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = obj_info, primary_key(object_id, cp_sequence_number))]
pub struct StoredObjInfo {
pub object_id: Vec<u8>,
pub cp_sequence_number: i64,
pub owner_kind: Option<StoredOwnerKind>,
pub owner_id: Option<Vec<u8>>,
pub package: Option<Vec<u8>>,
pub module: Option<String>,
pub name: Option<String>,
pub instantiation: Option<Vec<u8>>,
}

/// A row to insert into the `coin_balance_buckets` table, keyed on
/// `(object_id, cp_sequence_number)`.
///
/// All optional fields are `None` for records marking that the coin was deleted, wrapped, or is
/// no longer owned by a single address.
#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = coin_balance_buckets, primary_key(object_id, cp_sequence_number))]
pub struct StoredCoinBalanceBucket {
/// ID of the coin object.
pub object_id: Vec<u8>,
/// Sequence number of the checkpoint in which this change was observed.
pub cp_sequence_number: i64,
/// Whether the owning address holds the coin via fast-path or ConsensusV2 ownership.
pub owner_kind: Option<StoredCoinOwnerKind>,
/// The address that owns the coin.
pub owner_id: Option<Vec<u8>>,
/// The coin's marker type, as a BCS-serialized `TypeTag`.
pub coin_type: Option<Vec<u8>>,
/// `floor(log10(balance))` of the coin, or 0 for a zero balance.
pub coin_balance_bucket: Option<i16>,
}

/// StoredObjectUpdate is a wrapper type, we want to count the fields of the inner type.
impl<T: FieldCount> FieldCount for StoredObjectUpdate<T> {
// Add one here for cp_sequence_number field, because StoredObjectUpdate is used for
Expand Down Expand Up @@ -135,15 +167,27 @@ where
}
}

#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = obj_info, primary_key(object_id, cp_sequence_number))]
pub struct StoredObjInfo {
pub object_id: Vec<u8>,
pub cp_sequence_number: i64,
pub owner_kind: Option<StoredOwnerKind>,
pub owner_id: Option<Vec<u8>>,
pub package: Option<Vec<u8>>,
pub module: Option<String>,
pub name: Option<String>,
pub instantiation: Option<Vec<u8>>,
impl<DB: Backend> serialize::ToSql<SmallInt, DB> for StoredCoinOwnerKind
where
    i16: serialize::ToSql<SmallInt, DB>,
{
    /// Serializes the owner kind as its `SMALLINT` discriminant by delegating to the `i16`
    /// implementation for the same backend.
    fn to_sql<'b>(&'b self, out: &mut serialize::Output<'b, '_, DB>) -> serialize::Result {
        // References to integer literals are promoted to `'static`, so they satisfy the `'b`
        // lifetime that `to_sql` requires of its receiver.
        let discriminant: &'static i16 = match self {
            StoredCoinOwnerKind::Fastpath => &0,
            StoredCoinOwnerKind::Consensus => &1,
        };
        <i16 as serialize::ToSql<SmallInt, DB>>::to_sql(discriminant, out)
    }
}

impl<DB: Backend> deserialize::FromSql<SmallInt, DB> for StoredCoinOwnerKind
where
    i16: deserialize::FromSql<SmallInt, DB>,
{
    /// Reads a `SMALLINT` and maps it back to the matching owner kind.
    ///
    /// Returns an error if the stored value does not correspond to any variant.
    fn from_sql(raw: DB::RawValue<'_>) -> deserialize::Result<Self> {
        let discriminant = i16::from_sql(raw)?;
        match discriminant {
            0 => Ok(StoredCoinOwnerKind::Fastpath),
            1 => Ok(StoredCoinOwnerKind::Consensus),
            o => Err(format!("Unexpected StoredCoinOwnerKind: {o}").into()),
        }
    }
}
12 changes: 12 additions & 0 deletions crates/sui-indexer-alt-schema/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@
// SPDX-License-Identifier: Apache-2.0
// @generated automatically by Diesel CLI.

// Schema for the `coin_balance_buckets` table, keyed on (object_id, cp_sequence_number).
// All non-key columns are nullable.
diesel::table! {
coin_balance_buckets (object_id, cp_sequence_number) {
object_id -> Bytea,
cp_sequence_number -> Int8,
owner_kind -> Nullable<Int2>,
owner_id -> Nullable<Bytea>,
coin_type -> Nullable<Bytea>,
coin_balance_bucket -> Nullable<Int2>,
}
}

diesel::table! {
ev_emit_mod (package, module, tx_sequence_number) {
package -> Bytea,
Expand Down Expand Up @@ -240,6 +251,7 @@ diesel::table! {
}

diesel::allow_tables_to_appear_in_same_query!(
coin_balance_buckets,
ev_emit_mod,
ev_struct_inst,
kv_checkpoints,
Expand Down
211 changes: 211 additions & 0 deletions crates/sui-indexer-alt/src/handlers/coin_balance_buckets.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeMap, sync::Arc};

use anyhow::{anyhow, bail, Result};
use diesel_async::RunQueryDsl;
use sui_field_count::FieldCount;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{
objects::{StoredCoinBalanceBucket, StoredCoinOwnerKind},
schema::coin_balance_buckets,
};
use sui_pg_db as db;
use sui_types::{
base_types::{ObjectID, SuiAddress},
full_checkpoint_content::CheckpointData,
object::{Object, Owner},
TypeTag,
};

/// Pipeline that records, per checkpoint, changes to the owner or balance bucket of coins owned
/// by a single address, writing them to the `coin_balance_buckets` table.
pub(crate) struct CoinBalanceBuckets;

/// A single change to a coin's balance-bucket record, produced while processing one checkpoint.
pub(crate) struct ProcessedCoinBalanceBucket {
/// ID of the coin object the change applies to.
pub object_id: ObjectID,
/// Sequence number of the checkpoint in which the change was observed.
pub cp_sequence_number: u64,
/// Whether the coin gets a new bucket record or a deletion marker.
pub change: CoinBalanceBucketChangeKind,
}

/// The kind of change recorded for a coin at a checkpoint.
pub(crate) enum CoinBalanceBucketChangeKind {
/// The coin is owned by a single address after the checkpoint, and it is new or its owner
/// or balance bucket differs from before the checkpoint.
Insert {
/// How the address owns the coin (fast-path or ConsensusV2).
owner_kind: StoredCoinOwnerKind,
/// The address that owns the coin.
owner_id: SuiAddress,
/// The coin's type, as reported by `coin_type_maybe` on the object.
coin_type: TypeTag,
/// `floor(log10(balance))` of the coin, or 0 for a zero balance.
balance_bucket: i16,
},
/// The coin was owned by a single address before the checkpoint, but afterwards it is
/// either no longer live (deleted or wrapped) or no longer owned by a single address.
Delete,
}

impl Processor for CoinBalanceBuckets {
    const NAME: &'static str = "coin_balance_buckets";
    type Value = ProcessedCoinBalanceBucket;

    /// Computes the coin balance bucket changes introduced by `checkpoint`.
    ///
    /// Emits at most one value per coin object:
    /// - `Delete` when a coin that was owned by a single address before the checkpoint is no
    ///   longer live afterwards, or is live but no longer owned by a single address;
    /// - `Insert` when a coin is owned by a single address afterwards and it is either new to
    ///   the checkpoint or its owner or balance bucket changed.
    ///
    /// Returns an error if an object that reports a coin type cannot be read as a coin.
    // TODO: We need to add tests for this function.
    fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
        let cp_sequence_number = checkpoint.checkpoint_summary.sequence_number;
        let inputs = checkpoint.checkpoint_input_objects();
        let outputs: BTreeMap<_, _> = checkpoint
            .latest_live_output_objects()
            .into_iter()
            .map(|obj| (obj.id(), obj))
            .collect();

        // Keyed by object ID so that each coin contributes at most one change.
        let mut changes: BTreeMap<ObjectID, Self::Value> = BTreeMap::new();

        // Pass 1: coins that were owned by a single address before the checkpoint and are not
        // among the live outputs afterwards (i.e. deleted or wrapped).
        for (id, before) in inputs.iter() {
            let vanished = before.is_coin()
                && get_coin_owner(before).is_some()
                && !outputs.contains_key(id);
            if vanished {
                changes.insert(
                    *id,
                    ProcessedCoinBalanceBucket {
                        object_id: *id,
                        cp_sequence_number,
                        change: CoinBalanceBucketChangeKind::Delete,
                    },
                );
            }
        }

        // Pass 2: coins that are live after the checkpoint.
        for (id, after) in outputs.iter() {
            let Some(coin_type) = after.coin_type_maybe() else {
                continue;
            };

            // State of the same coin before the checkpoint, if it existed as an input.
            let (prev_bucket, prev_owner) = if let Some(before) = inputs.get(id) {
                let bucket = get_coin_balance_bucket(before)?;
                (Some(bucket), get_coin_owner(before))
            } else {
                (None, None)
            };

            let new_bucket = get_coin_balance_bucket(after)?;
            let new_owner = get_coin_owner(after);

            let change = match (prev_owner, new_owner) {
                // Previously owned by a single address, but now shared, immutable or
                // object-owned: from the perspective of the balance bucket this is the same as
                // the coin being deleted.
                (Some(_), None) => CoinBalanceBucketChangeKind::Delete,

                // Owned by a single address after the checkpoint, and either the owner or the
                // bucket differs from before. This also covers coins that did not exist prior
                // to the checkpoint and were created or unwrapped in it.
                (_, Some((owner_kind, owner_id)))
                    if prev_owner != new_owner || prev_bucket != Some(new_bucket) =>
                {
                    CoinBalanceBucketChangeKind::Insert {
                        owner_kind,
                        owner_id,
                        coin_type,
                        balance_bucket: new_bucket,
                    }
                }

                // Nothing relevant changed, or the coin was never owned by a single address.
                _ => continue,
            };

            changes.insert(
                *id,
                ProcessedCoinBalanceBucket {
                    object_id: *id,
                    cp_sequence_number,
                    change,
                },
            );
        }

        Ok(changes.into_values().collect())
    }
}

#[async_trait::async_trait]
impl Handler for CoinBalanceBuckets {
    /// Converts the processed values into table rows and inserts them into
    /// `coin_balance_buckets`, ignoring rows that conflict with existing ones.
    ///
    /// Returns the row count reported by the insert, or an error if a value cannot be converted
    /// or the query fails.
    async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
        let mut rows: Vec<StoredCoinBalanceBucket> = Vec::with_capacity(values.len());
        for value in values {
            let row: StoredCoinBalanceBucket = value.try_into()?;
            rows.push(row);
        }

        let inserted = diesel::insert_into(coin_balance_buckets::table)
            .values(rows)
            .on_conflict_do_nothing()
            .execute(conn)
            .await?;
        Ok(inserted)
    }
}

/// A processed value reports the same field count as the `StoredCoinBalanceBucket` row it is
/// converted into.
// NOTE(review): presumably the framework uses this to size insert batches -- confirm.
impl FieldCount for ProcessedCoinBalanceBucket {
const FIELD_COUNT: usize = StoredCoinBalanceBucket::FIELD_COUNT;
}

impl TryInto<StoredCoinBalanceBucket> for &ProcessedCoinBalanceBucket {
    type Error = anyhow::Error;

    /// Builds the database row for this change.
    ///
    /// An `Insert` fills every column, BCS-serializing the coin type; a `Delete` leaves all
    /// non-key columns as `None`. Fails only if the coin type cannot be serialized.
    fn try_into(self) -> Result<StoredCoinBalanceBucket> {
        // Work out the nullable columns first; the key columns are the same for both kinds.
        let (owner_kind, owner_id, coin_type, coin_balance_bucket) = match &self.change {
            CoinBalanceBucketChangeKind::Insert {
                owner_kind,
                owner_id,
                coin_type,
                balance_bucket,
            } => {
                let type_bytes = bcs::to_bytes(&coin_type)
                    .map_err(|_| anyhow!("Failed to serialize type for {}", self.object_id))?;
                (
                    Some(*owner_kind),
                    Some(owner_id.to_vec()),
                    Some(type_bytes),
                    Some(*balance_bucket),
                )
            }
            CoinBalanceBucketChangeKind::Delete => (None, None, None, None),
        };

        Ok(StoredCoinBalanceBucket {
            object_id: self.object_id.to_vec(),
            cp_sequence_number: self.cp_sequence_number as i64,
            owner_kind,
            owner_id,
            coin_type,
            coin_balance_bucket,
        })
    }
}

/// Returns how, and by which address, `object` is owned when it belongs to a single address,
/// whether through fast-path ownership or ConsensusV2 ownership.
///
/// Returns `None` for immutable, object-owned and shared objects.
fn get_coin_owner(object: &Object) -> Option<(StoredCoinOwnerKind, SuiAddress)> {
    let (kind, address) = match object.owner() {
        // Not owned by a single address: there is nothing to report.
        Owner::Immutable | Owner::ObjectOwner(_) | Owner::Shared { .. } => return None,
        Owner::AddressOwner(address) => (StoredCoinOwnerKind::Fastpath, *address),
        Owner::ConsensusV2 { authenticator, .. } => (
            StoredCoinOwnerKind::Consensus,
            *authenticator.as_single_owner(),
        ),
    };
    Some((kind, address))
}

fn get_coin_balance_bucket(coin: &Object) -> anyhow::Result<i16> {
let Some(coin) = coin.as_coin_maybe() else {
bail!("Failed to deserialize Coin for {}", coin.id());
};
let balance = coin.balance.value();
if balance == 0 {
return Ok(0);
}
let bucket = balance.ilog10() as i16;
Ok(bucket)
}
1 change: 1 addition & 0 deletions crates/sui-indexer-alt/src/handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

pub(crate) mod coin_balance_buckets;
pub(crate) mod ev_emit_mod;
pub(crate) mod ev_struct_inst;
pub(crate) mod kv_checkpoints;
Expand Down

0 comments on commit a5ce934

Please sign in to comment.