Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexer-alt: protocol_configs and feature_flags pipelines #20149

Merged
merged 10 commits into from
Nov 13, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ CREATE TABLE IF NOT EXISTS watermarks
-- this field once it can guarantee that all checkpoints at or before this
-- timestamp have been written to the database.
timestamp_ms_hi_inclusive BIGINT NOT NULL,
-- Inclusive lower epoch bound for this entity's data. Pruner updates this
-- field when the epoch range exceeds the retention policy.
epoch_lo BIGINT NOT NULL,
-- Inclusive low watermark that the pruner advances. Corresponds to the
-- epoch id, checkpoint sequence number, or tx sequence number depending on
-- the entity. Data before this watermark is considered pruned by a reader.
Expand All @@ -31,7 +28,7 @@ CREATE TABLE IF NOT EXISTS watermarks
-- some data needs to be dropped. The pruner uses this column to determine
-- whether to prune or wait long enough that all in-flight reads complete
-- or timeout before it acts on an updated watermark.
pruner_timestamp_ms BIGINT NOT NULL,
pruner_timestamp TIMESTAMP NOT NULL,
-- Column used by the pruner to track its true progress. Data below this
-- watermark can be immediately pruned.
pruner_hi BIGINT NOT NULL
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS kv_protocol_configs;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS kv_protocol_configs
(
protocol_version BIGINT NOT NULL,
config_name TEXT NOT NULL,
config_value TEXT,
PRIMARY KEY (protocol_version, config_name)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS kv_feature_flags;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS kv_feature_flags
(
protocol_version BIGINT NOT NULL,
flag_name TEXT NOT NULL,
flag_value BOOLEAN NOT NULL,
PRIMARY KEY (protocol_version, flag_name)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS kv_epoch_starts;
DROP TABLE IF EXISTS kv_epoch_ends;
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
-- Information related to an epoch that is available when it starts
CREATE TABLE IF NOT EXISTS kv_epoch_starts
(
epoch BIGINT PRIMARY KEY,
protocol_version BIGINT NOT NULL,

-- Inclusive checkpoint lowerbound of the epoch.
cp_lo BIGINT NOT NULL,
-- The timestamp that the epoch starts at. This is always extracted from
-- the system state object.
start_timestamp_ms BIGINT NOT NULL,
-- The reference gas price that will be used for the rest of the epoch.
reference_gas_price BIGINT NOT NULL,
-- BCS serialized SystemState.
system_state BYTEA NOT NULL
);

-- Information related to an epoch that is available when it ends (after the
-- epoch advancement to the next epoch)
CREATE TABLE IF NOT EXISTS kv_epoch_ends
(
epoch BIGINT PRIMARY KEY,

-- Exclusive checkpoint upperbound of the epoch.
cp_hi BIGINT NOT NULL,
-- Exclusive transaction upperbound of the epoch.
tx_hi BIGINT NOT NULL,

-- The epoch ends at the timestamp of its last checkpoint.
end_timestamp_ms BIGINT NOT NULL,

-- Whether the epoch advancement at the end of this epoch entered safe
-- mode.
safe_mode BOOLEAN NOT NULL,

-- Staking information after advancement to the next epoch. These fields
-- are extracted from the `SystemEpochInfoEvent` emitted during epoch
-- advancement. If the epoch advancement entered safe mode, these fields
-- will all be NULL (because a safe mode advance epoch does not emit this
-- event).
total_stake BIGINT,
storage_fund_balance BIGINT,
storage_fund_reinvestment BIGINT,
storage_charge BIGINT,
storage_rebate BIGINT,
stake_subsidy_amount BIGINT,
total_gas_fees BIGINT,
total_stake_rewards_distributed
BIGINT,
leftover_storage_fund_inflow
BIGINT,

-- BCS serialized `Vec<EpochCommitment>` bytes, found in last
-- `CheckpointSummary` of the epoch.
epoch_commitments BYTEA NOT NULL
);
21 changes: 21 additions & 0 deletions crates/sui-indexer-alt/src/args.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;

use crate::db::DbConfig;
use crate::IndexerConfig;
use clap::Subcommand;
Expand All @@ -22,6 +24,25 @@ pub enum Command {
#[command(flatten)]
indexer: IndexerConfig,

/// How often to check whether write-ahead logs related to the consistent range can be
/// pruned.
#[arg(
long,
default_value = "300",
value_name = "SECONDS",
value_parser = |s: &str| s.parse().map(Duration::from_secs),
)]
consistent_pruning_interval: Duration,

/// How long to wait before honouring reader low watermarks.
#[arg(
long,
default_value = "120",
value_name = "SECONDS",
value_parser = |s: &str| s.parse().map(Duration::from_secs),
)]
pruner_delay: Duration,

/// Number of checkpoints to delay indexing summary tables for.
#[clap(long)]
consistent_range: Option<u64>,
Expand Down
21 changes: 20 additions & 1 deletion crates/sui-indexer-alt/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::{
models::checkpoints::StoredGenesis, schema::kv_genesis, task::graceful_shutdown, Indexer,
models::{checkpoints::StoredGenesis, epochs::StoredEpochStart},
schema::{kv_epoch_starts, kv_genesis},
task::graceful_shutdown,
Indexer,
};

/// Ensures the genesis table has been populated before the rest of the indexer is run, and returns
Expand Down Expand Up @@ -91,6 +94,15 @@ pub async fn bootstrap(
initial_protocol_version: system_state.protocol_version() as i64,
};

let epoch_start = StoredEpochStart {
epoch: 0,
protocol_version: system_state.protocol_version() as i64,
cp_lo: 0,
start_timestamp_ms: system_state.epoch_start_timestamp_ms() as i64,
reference_gas_price: system_state.reference_gas_price() as i64,
system_state: bcs::to_bytes(&system_state).context("Failed to serialize SystemState")?,
};

info!(
chain = genesis.chain()?.as_str(),
protocol = ?genesis.initial_protocol_version(),
Expand All @@ -104,5 +116,12 @@ pub async fn bootstrap(
.await
.context("Failed to write genesis record")?;

diesel::insert_into(kv_epoch_starts::table)
.values(&epoch_start)
.on_conflict_do_nothing()
.execute(&mut conn)
.await
.context("Failed to write genesis epoch start record")?;

Ok(genesis)
}
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl Processor for EvEmitMod {

type Value = StoredEvEmitMod;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
transactions,
checkpoint_summary,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl Processor for EvStructInst {

type Value = StoredEvStructInst;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
transactions,
checkpoint_summary,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl Processor for KvCheckpoints {

type Value = StoredCheckpoint;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let sequence_number = checkpoint.checkpoint_summary.sequence_number as i64;
Ok(vec![StoredCheckpoint {
sequence_number,
Expand Down
132 changes: 132 additions & 0 deletions crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use anyhow::{bail, Context, Result};
use diesel_async::RunQueryDsl;
use sui_types::{
event::SystemEpochInfoEvent,
full_checkpoint_content::CheckpointData,
transaction::{TransactionDataAPI, TransactionKind},
};

use crate::{
db,
models::epochs::StoredEpochEnd,
pipeline::{concurrent::Handler, Processor},
schema::kv_epoch_ends,
};

pub struct KvEpochEnds;

impl Processor for KvEpochEnds {
const NAME: &'static str = "kv_epoch_ends";

type Value = StoredEpochEnd;

fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
checkpoint_summary,
transactions,
..
} = checkpoint.as_ref();

let Some(end_of_epoch) = checkpoint_summary.end_of_epoch_data.as_ref() else {
return Ok(vec![]);
};

let Some(transaction) = transactions.iter().find(|tx| {
matches!(
tx.transaction.intent_message().value.kind(),
TransactionKind::ChangeEpoch(_) | TransactionKind::EndOfEpochTransaction(_)
)
}) else {
bail!(
"Failed to get end of epoch transaction in checkpoint {} with EndOfEpochData",
checkpoint_summary.sequence_number,
);
};

if let Some(SystemEpochInfoEvent {
total_stake,
storage_fund_reinvestment,
storage_charge,
storage_rebate,
storage_fund_balance,
stake_subsidy_amount,
total_gas_fees,
total_stake_rewards_distributed,
leftover_storage_fund_inflow,
..
}) = transaction
.events
.iter()
.flat_map(|events| &events.data)
.find_map(|event| {
event
.is_system_epoch_info_event()
.then(|| bcs::from_bytes(&event.contents))
})
.transpose()
.context("Failed to deserialize SystemEpochInfoEvent")?
{
Ok(vec![StoredEpochEnd {
epoch: checkpoint_summary.epoch as i64,
cp_hi: checkpoint_summary.sequence_number as i64 + 1,
tx_hi: checkpoint_summary.network_total_transactions as i64,
end_timestamp_ms: checkpoint_summary.timestamp_ms as i64,

safe_mode: false,

total_stake: Some(total_stake as i64),
storage_fund_balance: Some(storage_fund_balance as i64),
storage_fund_reinvestment: Some(storage_fund_reinvestment as i64),
storage_charge: Some(storage_charge as i64),
storage_rebate: Some(storage_rebate as i64),
stake_subsidy_amount: Some(stake_subsidy_amount as i64),
total_gas_fees: Some(total_gas_fees as i64),
total_stake_rewards_distributed: Some(total_stake_rewards_distributed as i64),
leftover_storage_fund_inflow: Some(leftover_storage_fund_inflow as i64),

epoch_commitments: bcs::to_bytes(&end_of_epoch.epoch_commitments)
.context("Failed to serialize EpochCommitment-s")?,
}])
} else {
Ok(vec![StoredEpochEnd {
epoch: checkpoint_summary.epoch as i64,
cp_hi: checkpoint_summary.sequence_number as i64 + 1,
tx_hi: checkpoint_summary.network_total_transactions as i64,
end_timestamp_ms: checkpoint_summary.timestamp_ms as i64,

safe_mode: true,

total_stake: None,
storage_fund_balance: None,
storage_fund_reinvestment: None,
storage_charge: None,
storage_rebate: None,
stake_subsidy_amount: None,
total_gas_fees: None,
total_stake_rewards_distributed: None,
leftover_storage_fund_inflow: None,

epoch_commitments: bcs::to_bytes(&end_of_epoch.epoch_commitments)
.context("Failed to serialize EpochCommitment-s")?,
}])
}
}
}

#[async_trait::async_trait]
impl Handler for KvEpochEnds {
const MIN_EAGER_ROWS: usize = 1;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(kv_epoch_ends::table)
.values(values)
.on_conflict_do_nothing()
.execute(conn)
.await?)
}
}
Loading
Loading