Skip to content

Commit

Permalink
pick: indexer: handle sui safe mode (#20015) (#20025)
Browse files Browse the repository at this point in the history
## Description 

title 

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:

## Description 

Describe the changes or additions included in this PR.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
gegaowp authored Oct 24, 2024
1 parent 29ff3e3 commit 81da0c7
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 47 deletions.
65 changes: 36 additions & 29 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use async_trait::async_trait;
use itertools::Itertools;
use sui_types::dynamic_field::DynamicFieldInfo;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::{info, warn};

use move_core_types::language_storage::{StructTag, TypeTag};
use mysten_metrics::{get_metrics, spawn_monitored_task};
Expand All @@ -29,7 +29,7 @@ use crate::errors::IndexerError;
use crate::handlers::committer::start_tx_checkpoint_commit_task;
use crate::metrics::IndexerMetrics;
use crate::models::display::StoredDisplay;
use crate::models::epoch::{EndOfEpochUpdate, StartOfEpochUpdate};
use crate::models::epoch::{EndOfEpochUpdate, EpochEndInfo, EpochStartInfo, StartOfEpochUpdate};
use crate::models::obj_indices::StoredObjectVersion;
use crate::store::{IndexerStore, PgIndexerStore};
use crate::types::{
Expand Down Expand Up @@ -153,12 +153,7 @@ impl CheckpointHandler {
get_sui_system_state(&checkpoint_object_store)?.into_sui_system_state_summary();
return Ok(Some(EpochToCommit {
last_epoch: None,
new_epoch: StartOfEpochUpdate::new(
system_state_summary,
0, //first_checkpoint_id
0, // first_tx_sequence_number
None,
),
new_epoch: StartOfEpochUpdate::new(system_state_summary, EpochStartInfo::default()),
}));
}

Expand All @@ -170,24 +165,34 @@ impl CheckpointHandler {
let system_state_summary =
get_sui_system_state(&checkpoint_object_store)?.into_sui_system_state_summary();

let epoch_event = transactions
let epoch_event_opt = transactions
.iter()
.flat_map(|t| t.events.as_ref().map(|e| &e.data))
.flatten()
.find(|ev| ev.is_system_epoch_info_event())
.unwrap_or_else(|| {
panic!(
"Can't find SystemEpochInfoEvent in epoch end checkpoint {}",
checkpoint_summary.sequence_number()
)
});

let event = bcs::from_bytes::<SystemEpochInfoEvent>(&epoch_event.contents)?;
.find_map(|t| {
t.events.as_ref()?.data.iter().find_map(|ev| {
if ev.is_system_epoch_info_event() {
Some(bcs::from_bytes::<SystemEpochInfoEvent>(&ev.contents))
} else {
None
}
})
})
.transpose()?;
if epoch_event_opt.is_none() {
warn!(
"No SystemEpochInfoEvent found at end of epoch {}, some epoch data will be set to default.",
checkpoint_summary.epoch,
);
assert!(
system_state_summary.safe_mode,
"Sui is not in safe mode but no SystemEpochInfoEvent found at end of epoch {}",
checkpoint_summary.epoch
);
}

// At some point while committing data in epoch X - 1, we will encounter a new epoch X. We
// want to retrieve X - 2's network total transactions to calculate the number of
// transactions that occurred in epoch X - 1.
let network_tx_count_prev_epoch = match system_state_summary.epoch {
let first_tx_sequence_number = match system_state_summary.epoch {
// If first epoch change, this number is 0
1 => Ok(0),
_ => {
Expand All @@ -204,18 +209,20 @@ impl CheckpointHandler {
}
}?;

let epoch_end_info = EpochEndInfo::new(epoch_event_opt.as_ref());
let epoch_start_info = EpochStartInfo::new(
checkpoint_summary.sequence_number.saturating_add(1),
checkpoint_summary.network_total_transactions,
epoch_event_opt.as_ref(),
);

Ok(Some(EpochToCommit {
last_epoch: Some(EndOfEpochUpdate::new(
checkpoint_summary,
&event,
network_tx_count_prev_epoch,
first_tx_sequence_number,
epoch_end_info,
)),
new_epoch: StartOfEpochUpdate::new(
system_state_summary,
checkpoint_summary.sequence_number + 1, // first_checkpoint_id
checkpoint_summary.network_total_transactions,
Some(&event),
),
new_epoch: StartOfEpochUpdate::new(system_state_summary, epoch_start_info),
}))
}

Expand Down
81 changes: 63 additions & 18 deletions crates/sui-indexer/src/models/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,50 +117,95 @@ pub struct QueryableEpochSystemState {
pub system_state: Vec<u8>,
}

impl StartOfEpochUpdate {
#[derive(Default)]
pub struct EpochStartInfo {
pub first_checkpoint_id: u64,
pub first_tx_sequence_number: u64,
pub total_stake: u64,
pub storage_fund_balance: u64,
}

impl EpochStartInfo {
pub fn new(
new_system_state_summary: SuiSystemStateSummary,
first_checkpoint_id: u64,
first_tx_sequence_number: u64,
event: Option<&SystemEpochInfoEvent>,
epoch_event_opt: Option<&SystemEpochInfoEvent>,
) -> Self {
Self {
first_checkpoint_id,
first_tx_sequence_number,
total_stake: epoch_event_opt.map(|e| e.total_stake).unwrap_or_default(),
storage_fund_balance: epoch_event_opt
.map(|e| e.storage_fund_balance)
.unwrap_or_default(),
}
}
}

impl StartOfEpochUpdate {
pub fn new(
new_system_state_summary: SuiSystemStateSummary,
epoch_start_info: EpochStartInfo,
) -> Self {
Self {
epoch: new_system_state_summary.epoch as i64,
system_state_summary_json: serde_json::to_value(new_system_state_summary.clone())
.unwrap(),
first_checkpoint_id: first_checkpoint_id as i64,
first_tx_sequence_number: first_tx_sequence_number as i64,
first_checkpoint_id: epoch_start_info.first_checkpoint_id as i64,
first_tx_sequence_number: epoch_start_info.first_tx_sequence_number as i64,
epoch_start_timestamp: new_system_state_summary.epoch_start_timestamp_ms as i64,
reference_gas_price: new_system_state_summary.reference_gas_price as i64,
protocol_version: new_system_state_summary.protocol_version as i64,
// NOTE: total_stake and storage_fund_balance are about new epoch,
// although the event is generated at the end of the previous epoch,
// the event is optional b/c no such event for the first epoch.
total_stake: event.map(|e| e.total_stake as i64).unwrap_or(0),
storage_fund_balance: event.map(|e| e.storage_fund_balance as i64).unwrap_or(0),
total_stake: epoch_start_info.total_stake as i64,
storage_fund_balance: epoch_start_info.storage_fund_balance as i64,
}
}
}

#[derive(Default)]
pub struct EpochEndInfo {
pub storage_fund_reinvestment: u64,
pub storage_charge: u64,
pub storage_rebate: u64,
pub leftover_storage_fund_inflow: u64,
pub stake_subsidy_amount: u64,
pub total_gas_fees: u64,
pub total_stake_rewards_distributed: u64,
}

impl EpochEndInfo {
pub fn new(epoch_event_opt: Option<&SystemEpochInfoEvent>) -> Self {
epoch_event_opt.map_or_else(Self::default, |epoch_event| Self {
storage_fund_reinvestment: epoch_event.storage_fund_reinvestment,
storage_charge: epoch_event.storage_charge,
storage_rebate: epoch_event.storage_rebate,
leftover_storage_fund_inflow: epoch_event.leftover_storage_fund_inflow,
stake_subsidy_amount: epoch_event.stake_subsidy_amount,
total_gas_fees: epoch_event.total_gas_fees,
total_stake_rewards_distributed: epoch_event.total_stake_rewards_distributed,
})
}
}

impl EndOfEpochUpdate {
pub fn new(
last_checkpoint_summary: &CertifiedCheckpointSummary,
event: &SystemEpochInfoEvent,
first_tx_sequence_number: u64,
epoch_end_info: EpochEndInfo,
) -> Self {
Self {
epoch: last_checkpoint_summary.epoch as i64,
epoch_total_transactions: (last_checkpoint_summary.network_total_transactions
- first_tx_sequence_number) as i64,
last_checkpoint_id: *last_checkpoint_summary.sequence_number() as i64,
epoch_end_timestamp: last_checkpoint_summary.timestamp_ms as i64,
storage_fund_reinvestment: event.storage_fund_reinvestment as i64,
storage_charge: event.storage_charge as i64,
storage_rebate: event.storage_rebate as i64,
leftover_storage_fund_inflow: event.leftover_storage_fund_inflow as i64,
stake_subsidy_amount: event.stake_subsidy_amount as i64,
total_gas_fees: event.total_gas_fees as i64,
total_stake_rewards_distributed: event.total_stake_rewards_distributed as i64,
storage_fund_reinvestment: epoch_end_info.storage_fund_reinvestment as i64,
storage_charge: epoch_end_info.storage_charge as i64,
storage_rebate: epoch_end_info.storage_rebate as i64,
leftover_storage_fund_inflow: epoch_end_info.leftover_storage_fund_inflow as i64,
stake_subsidy_amount: epoch_end_info.stake_subsidy_amount as i64,
total_gas_fees: epoch_end_info.total_gas_fees as i64,
total_stake_rewards_distributed: epoch_end_info.total_stake_rewards_distributed as i64,
epoch_commitments: bcs::to_bytes(
&last_checkpoint_summary
.end_of_epoch_data
Expand Down

0 comments on commit 81da0c7

Please sign in to comment.