diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index be4e0d375a923..dd0b00359a631 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -8,7 +8,7 @@ use async_trait::async_trait; use itertools::Itertools; use sui_types::dynamic_field::DynamicFieldInfo; use tokio_util::sync::CancellationToken; -use tracing::info; +use tracing::{info, warn}; use move_core_types::language_storage::{StructTag, TypeTag}; use mysten_metrics::{get_metrics, spawn_monitored_task}; @@ -29,7 +29,7 @@ use crate::errors::IndexerError; use crate::handlers::committer::start_tx_checkpoint_commit_task; use crate::metrics::IndexerMetrics; use crate::models::display::StoredDisplay; -use crate::models::epoch::{EndOfEpochUpdate, StartOfEpochUpdate}; +use crate::models::epoch::{EndOfEpochUpdate, EpochEndInfo, EpochStartInfo, StartOfEpochUpdate}; use crate::models::obj_indices::StoredObjectVersion; use crate::store::{IndexerStore, PgIndexerStore}; use crate::types::{ @@ -153,12 +153,7 @@ impl CheckpointHandler { get_sui_system_state(&checkpoint_object_store)?.into_sui_system_state_summary(); return Ok(Some(EpochToCommit { last_epoch: None, - new_epoch: StartOfEpochUpdate::new( - system_state_summary, - 0, //first_checkpoint_id - 0, // first_tx_sequence_number - None, - ), + new_epoch: StartOfEpochUpdate::new(system_state_summary, EpochStartInfo::default()), })); } @@ -170,24 +165,34 @@ impl CheckpointHandler { let system_state_summary = get_sui_system_state(&checkpoint_object_store)?.into_sui_system_state_summary(); - let epoch_event = transactions + let epoch_event_opt = transactions .iter() - .flat_map(|t| t.events.as_ref().map(|e| &e.data)) - .flatten() - .find(|ev| ev.is_system_epoch_info_event()) - .unwrap_or_else(|| { - panic!( - "Can't find SystemEpochInfoEvent in epoch end checkpoint {}", - checkpoint_summary.sequence_number() - ) - }); - - let event = bcs::from_bytes::(&epoch_event.contents)?; + .find_map(|t| { + t.events.as_ref()?.data.iter().find_map(|ev| { + if ev.is_system_epoch_info_event() { + Some(bcs::from_bytes::(&ev.contents)) + } else { + None + } + }) + }) + .transpose()?; + if epoch_event_opt.is_none() { + warn!( + "No SystemEpochInfoEvent found at end of epoch {}, some epoch data will be set to default.", + checkpoint_summary.epoch, + ); + assert!( + system_state_summary.safe_mode, + "Sui is not in safe mode but no SystemEpochInfoEvent found at end of epoch {}", + checkpoint_summary.epoch + ); + } // At some point while committing data in epoch X - 1, we will encounter a new epoch X. We // want to retrieve X - 2's network total transactions to calculate the number of // transactions that occurred in epoch X - 1. - let network_tx_count_prev_epoch = match system_state_summary.epoch { + let first_tx_sequence_number = match system_state_summary.epoch { // If first epoch change, this number is 0 1 => Ok(0), _ => { @@ -204,18 +209,20 @@ impl CheckpointHandler { } }?; + let epoch_end_info = EpochEndInfo::new(epoch_event_opt.as_ref()); + let epoch_start_info = EpochStartInfo::new( + checkpoint_summary.sequence_number.saturating_add(1), + checkpoint_summary.network_total_transactions, + epoch_event_opt.as_ref(), + ); + Ok(Some(EpochToCommit { last_epoch: Some(EndOfEpochUpdate::new( checkpoint_summary, - &event, - network_tx_count_prev_epoch, + first_tx_sequence_number, + epoch_end_info, )), - new_epoch: StartOfEpochUpdate::new( - system_state_summary, - checkpoint_summary.sequence_number + 1, // first_checkpoint_id - checkpoint_summary.network_total_transactions, - Some(&event), - ), + new_epoch: StartOfEpochUpdate::new(system_state_summary, epoch_start_info), })) } diff --git a/crates/sui-indexer/src/models/epoch.rs b/crates/sui-indexer/src/models/epoch.rs index 0918e50c72c35..d8e943f4c245c 100644 --- a/crates/sui-indexer/src/models/epoch.rs +++ b/crates/sui-indexer/src/models/epoch.rs @@ -117,36 +117,81 @@ pub struct QueryableEpochSystemState { pub system_state: Vec, } -impl StartOfEpochUpdate { +#[derive(Default)] +pub struct EpochStartInfo { + pub first_checkpoint_id: u64, + pub first_tx_sequence_number: u64, + pub total_stake: u64, + pub storage_fund_balance: u64, +} + +impl EpochStartInfo { pub fn new( - new_system_state_summary: SuiSystemStateSummary, first_checkpoint_id: u64, first_tx_sequence_number: u64, - event: Option<&SystemEpochInfoEvent>, + epoch_event_opt: Option<&SystemEpochInfoEvent>, + ) -> Self { + Self { + first_checkpoint_id, + first_tx_sequence_number, + total_stake: epoch_event_opt.map(|e| e.total_stake).unwrap_or_default(), + storage_fund_balance: epoch_event_opt + .map(|e| e.storage_fund_balance) + .unwrap_or_default(), + } + } +} + +impl StartOfEpochUpdate { + pub fn new( + new_system_state_summary: SuiSystemStateSummary, + epoch_start_info: EpochStartInfo, ) -> Self { Self { epoch: new_system_state_summary.epoch as i64, system_state_summary_json: serde_json::to_value(new_system_state_summary.clone()) .unwrap(), - first_checkpoint_id: first_checkpoint_id as i64, - first_tx_sequence_number: first_tx_sequence_number as i64, + first_checkpoint_id: epoch_start_info.first_checkpoint_id as i64, + first_tx_sequence_number: epoch_start_info.first_tx_sequence_number as i64, epoch_start_timestamp: new_system_state_summary.epoch_start_timestamp_ms as i64, reference_gas_price: new_system_state_summary.reference_gas_price as i64, protocol_version: new_system_state_summary.protocol_version as i64, - // NOTE: total_stake and storage_fund_balance are about new epoch, - // although the event is generated at the end of the previous epoch, - // the event is optional b/c no such event for the first epoch. - total_stake: event.map(|e| e.total_stake as i64).unwrap_or(0), - storage_fund_balance: event.map(|e| e.storage_fund_balance as i64).unwrap_or(0), + total_stake: epoch_start_info.total_stake as i64, + storage_fund_balance: epoch_start_info.storage_fund_balance as i64, } } } +#[derive(Default)] +pub struct EpochEndInfo { + pub storage_fund_reinvestment: u64, + pub storage_charge: u64, + pub storage_rebate: u64, + pub leftover_storage_fund_inflow: u64, + pub stake_subsidy_amount: u64, + pub total_gas_fees: u64, + pub total_stake_rewards_distributed: u64, +} + +impl EpochEndInfo { + pub fn new(epoch_event_opt: Option<&SystemEpochInfoEvent>) -> Self { + epoch_event_opt.map_or_else(Self::default, |epoch_event| Self { + storage_fund_reinvestment: epoch_event.storage_fund_reinvestment, + storage_charge: epoch_event.storage_charge, + storage_rebate: epoch_event.storage_rebate, + leftover_storage_fund_inflow: epoch_event.leftover_storage_fund_inflow, + stake_subsidy_amount: epoch_event.stake_subsidy_amount, + total_gas_fees: epoch_event.total_gas_fees, + total_stake_rewards_distributed: epoch_event.total_stake_rewards_distributed, + }) + } +} + impl EndOfEpochUpdate { pub fn new( last_checkpoint_summary: &CertifiedCheckpointSummary, - event: &SystemEpochInfoEvent, first_tx_sequence_number: u64, + epoch_end_info: EpochEndInfo, ) -> Self { Self { epoch: last_checkpoint_summary.epoch as i64, @@ -154,13 +199,13 @@ impl EndOfEpochUpdate { - first_tx_sequence_number) as i64, last_checkpoint_id: *last_checkpoint_summary.sequence_number() as i64, epoch_end_timestamp: last_checkpoint_summary.timestamp_ms as i64, - storage_fund_reinvestment: event.storage_fund_reinvestment as i64, - storage_charge: event.storage_charge as i64, - storage_rebate: event.storage_rebate as i64, - leftover_storage_fund_inflow: event.leftover_storage_fund_inflow as i64, - stake_subsidy_amount: event.stake_subsidy_amount as i64, - total_gas_fees: event.total_gas_fees as i64, - total_stake_rewards_distributed: event.total_stake_rewards_distributed as i64, + storage_fund_reinvestment: epoch_end_info.storage_fund_reinvestment as i64, + storage_charge: epoch_end_info.storage_charge as i64, + storage_rebate: epoch_end_info.storage_rebate as i64, + leftover_storage_fund_inflow: epoch_end_info.leftover_storage_fund_inflow as i64, + stake_subsidy_amount: epoch_end_info.stake_subsidy_amount as i64, + total_gas_fees: epoch_end_info.total_gas_fees as i64, + total_stake_rewards_distributed: epoch_end_info.total_stake_rewards_distributed as i64, epoch_commitments: bcs::to_bytes( &last_checkpoint_summary .end_of_epoch_data