From 6cc663c639ca3fa1421db7021e0af33b0466e1be Mon Sep 17 00:00:00 2001 From: wlmyng <127570466+wlmyng@users.noreply.github.com> Date: Tue, 15 Oct 2024 18:42:57 -0700 Subject: [PATCH] [indexer][writer] Add first_tx_sequence_number to epochs table to decouple from checkpoints table (#19773) ## Description The `epoch_total_transactions` field on the `epochs` table is calculated today from `checkpoints.network_total_transactions`, which is not ideal since the latter is prunable and `epochs` will not be pruned for the foreseeable future. To remove this dependency, we add `first_tx_sequence_number` to the `epochs` table at the epoch boundary. That is, the network total transaction count from the final checkpoint of the epoch becomes the first tx sequence number of the new epoch. This also means that at the epoch boundary, the current-to-be-previous epoch's `epoch_total_transactions` is derived as the final checkpoint's network total transactions minus that epoch's `first_tx_sequence_number`. Consequently, this will also help in the pruner implementation, since given an epoch we'd like to know the corresponding checkpoint (cp) and transaction (tx) sequence numbers. This encompasses just the writer change. Before updating the read path, we will need to backfill the instance. ## Test plan How did you test the new or updated feature? --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. 
- [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- .../tests/stable/epoch/epoch_start_to_end.exp | 231 ++++++++++++++++++ .../stable/epoch/epoch_start_to_end.move | 182 ++++++++++++++ .../sui-graphql-rpc/src/test_infra/cluster.rs | 56 +++++ crates/sui-graphql-rpc/tests/e2e_tests.rs | 6 +- .../down.sql | 1 + .../up.sql | 1 + .../src/handlers/checkpoint_handler.rs | 34 +-- crates/sui-indexer/src/handlers/committer.rs | 2 +- crates/sui-indexer/src/handlers/mod.rs | 35 ++- crates/sui-indexer/src/models/epoch.rs | 135 ++++++---- crates/sui-indexer/src/schema.rs | 1 + crates/sui-indexer/src/store/indexer_store.rs | 4 +- .../sui-indexer/src/store/pg_indexer_store.rs | 78 ++---- .../src/store/pg_partition_manager.rs | 12 +- crates/sui-indexer/src/types.rs | 100 -------- 15 files changed, 638 insertions(+), 240 deletions(-) create mode 100644 crates/sui-graphql-e2e-tests/tests/stable/epoch/epoch_start_to_end.exp create mode 100644 crates/sui-graphql-e2e-tests/tests/stable/epoch/epoch_start_to_end.move create mode 100644 crates/sui-indexer/migrations/pg/2024-10-09-180628_add_network_total_transactions_to_epochs/down.sql create mode 100644 crates/sui-indexer/migrations/pg/2024-10-09-180628_add_network_total_transactions_to_epochs/up.sql diff --git a/crates/sui-graphql-e2e-tests/tests/stable/epoch/epoch_start_to_end.exp b/crates/sui-graphql-e2e-tests/tests/stable/epoch/epoch_start_to_end.exp new file mode 100644 index 0000000000000..d2c454ee4e5a7 --- /dev/null +++ b/crates/sui-graphql-e2e-tests/tests/stable/epoch/epoch_start_to_end.exp @@ -0,0 +1,231 @@ +processed 13 tasks + +init: +C: object(0,0) + +task 1, line 6: +//# create-checkpoint +Checkpoint created: 1 + +task 2, line 8: +//# advance-epoch +Epoch advanced: 0 + +task 3, lines 10-12: +//# programmable --sender C --inputs 10000000000 @C +//> SplitCoins(Gas, [Input(0)]); +//> TransferObjects([Result(0)], Input(1)); +created: 
object(3,0) +mutated: object(0,0) +gas summary: computation_cost: 1000000, storage_cost: 1976000, storage_rebate: 0, non_refundable_storage_fee: 0 + +task 4, lines 14-16: +//# programmable --sender C --inputs 5000000000 @C +//> SplitCoins(Gas, [Input(0)]); +//> TransferObjects([Result(0)], Input(1)); +created: object(4,0) +mutated: object(0,0) +gas summary: computation_cost: 1000000, storage_cost: 1976000, storage_rebate: 978120, non_refundable_storage_fee: 9880 + +task 5, line 18: +//# run 0x3::sui_system::request_add_stake --args object(0x5) object(3,0) @validator_0 --sender C +events: Event { package_id: sui_system, transaction_module: Identifier("sui_system"), sender: C, type_: StructTag { address: sui_system, module: Identifier("validator"), name: Identifier("StakingRequestEvent"), type_params: [] }, contents: [135, 141, 242, 35, 38, 24, 124, 195, 86, 219, 178, 127, 110, 40, 201, 151, 112, 169, 166, 183, 93, 180, 71, 210, 141, 37, 35, 151, 110, 94, 69, 29, 218, 131, 22, 109, 1, 175, 215, 221, 207, 138, 245, 248, 68, 244, 90, 170, 83, 244, 133, 72, 229, 17, 124, 35, 245, 162, 151, 140, 253, 66, 34, 68, 252, 204, 154, 66, 27, 187, 19, 193, 166, 106, 26, 169, 143, 10, 215, 80, 41, 237, 233, 72, 87, 119, 156, 105, 21, 180, 79, 148, 6, 139, 146, 30, 1, 0, 0, 0, 0, 0, 0, 0, 0, 228, 11, 84, 2, 0, 0, 0] } +created: object(5,0) +mutated: object(_), 0x0000000000000000000000000000000000000000000000000000000000000005, object(0,0) +deleted: object(3,0) +gas summary: computation_cost: 1000000, storage_cost: 15078400, storage_rebate: 1956240, non_refundable_storage_fee: 19760 + +task 6, line 20: +//# create-checkpoint +Checkpoint created: 3 + +task 7, lines 22-72: +//# run-graphql +Response: { + "data": { + "epoch": { + "epochId": 1, + "referenceGasPrice": "1000", + "validatorSet": { + "totalStake": "20000000000000000", + "activeValidators": { + "nodes": [ + { + "name": "validator-0" + } + ] + } + }, + "startTimestamp": "1970-01-01T00:00:00Z", + "totalCheckpoints": 0, + 
"totalTransactions": null, + "totalGasFees": null, + "totalStakeRewards": null, + "totalStakeSubsidies": null, + "fundSize": "0", + "netInflow": null, + "fundInflow": null, + "fundOutflow": null, + "storageFund": { + "totalObjectStorageRebates": "0", + "nonRefundableBalance": "0" + }, + "safeMode": { + "enabled": false + }, + "systemStateVersion": 2, + "systemParameters": { + "stakeSubsidyStartEpoch": 0 + }, + "systemStakeSubsidy": { + "balance": "9949400000000000000", + "currentDistributionAmount": "1000000000000000" + }, + "checkpoints": { + "nodes": [ + { + "sequenceNumber": 3 + } + ] + }, + "transactionBlocks": { + "nodes": [ + { + "digest": "HqhN74u19oG1oVeEjLJx9Z5RdYxT7XWoGYHMmM1QNQ2q" + } + ] + }, + "endTimestamp": null + } + } +} + +task 8, line 74: +//# create-checkpoint +Checkpoint created: 4 + +task 9, line 76: +//# advance-epoch +Epoch advanced: 1 + +task 10, line 78: +//# create-checkpoint +Checkpoint created: 6 + +task 11, lines 80-130: +//# run-graphql +Response: { + "data": { + "epoch": { + "epochId": 1, + "referenceGasPrice": "1000", + "validatorSet": { + "totalStake": "20000000000000000", + "activeValidators": { + "nodes": [ + { + "name": "validator-0" + } + ] + } + }, + "startTimestamp": "1970-01-01T00:00:00Z", + "totalCheckpoints": 2, + "totalTransactions": 4, + "totalGasFees": "3000000", + "totalStakeRewards": "3000000", + "totalStakeSubsidies": "0", + "fundSize": "0", + "netInflow": "16096040", + "fundInflow": "19030400", + "fundOutflow": "2934360", + "storageFund": { + "totalObjectStorageRebates": "0", + "nonRefundableBalance": "0" + }, + "safeMode": { + "enabled": false + }, + "systemStateVersion": 2, + "systemParameters": { + "stakeSubsidyStartEpoch": 0 + }, + "systemStakeSubsidy": { + "balance": "9949400000000000000", + "currentDistributionAmount": "1000000000000000" + }, + "checkpoints": { + "nodes": [ + { + "sequenceNumber": 5 + } + ] + }, + "transactionBlocks": { + "nodes": [ + { + "digest": 
"BjA5MhqTJWEkp2gyPhBBBYcER2nYoYevdGSmagrrvVRb" + } + ] + }, + "endTimestamp": "1970-01-01T00:00:00Z" + } + } +} + +task 12, lines 132-182: +//# run-graphql +Response: { + "data": { + "epoch": { + "epochId": 2, + "referenceGasPrice": "1000", + "validatorSet": { + "totalStake": "20000010003000000", + "activeValidators": { + "nodes": [ + { + "name": "validator-0" + } + ] + } + }, + "startTimestamp": "1970-01-01T00:00:00Z", + "totalCheckpoints": 0, + "totalTransactions": null, + "totalGasFees": null, + "totalStakeRewards": null, + "totalStakeSubsidies": null, + "fundSize": "16096040", + "netInflow": null, + "fundInflow": null, + "fundOutflow": null, + "storageFund": { + "totalObjectStorageRebates": "16066400", + "nonRefundableBalance": "29640" + }, + "safeMode": { + "enabled": false + }, + "systemStateVersion": 2, + "systemParameters": { + "stakeSubsidyStartEpoch": 0 + }, + "systemStakeSubsidy": { + "balance": "9949400000000000000", + "currentDistributionAmount": "1000000000000000" + }, + "checkpoints": { + "nodes": [ + { + "sequenceNumber": 6 + } + ] + }, + "transactionBlocks": { + "nodes": [] + }, + "endTimestamp": null + } + } +} diff --git a/crates/sui-graphql-e2e-tests/tests/stable/epoch/epoch_start_to_end.move b/crates/sui-graphql-e2e-tests/tests/stable/epoch/epoch_start_to_end.move new file mode 100644 index 0000000000000..28b0b82822c92 --- /dev/null +++ b/crates/sui-graphql-e2e-tests/tests/stable/epoch/epoch_start_to_end.move @@ -0,0 +1,182 @@ +// Copyright (c) Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +//# init --protocol-version 51 --simulator --accounts C + +//# create-checkpoint + +//# advance-epoch + +//# programmable --sender C --inputs 10000000000 @C +//> SplitCoins(Gas, [Input(0)]); +//> TransferObjects([Result(0)], Input(1)); + +//# programmable --sender C --inputs 5000000000 @C +//> SplitCoins(Gas, [Input(0)]); +//> TransferObjects([Result(0)], Input(1)); + +//# run 0x3::sui_system::request_add_stake --args object(0x5) object(3,0) @validator_0 --sender C + +//# create-checkpoint + +//# run-graphql +{ + epoch(id: 1) { + epochId + referenceGasPrice + validatorSet { + totalStake + activeValidators { + nodes { + name + } + } + } + startTimestamp + totalCheckpoints + totalTransactions + totalGasFees + totalStakeRewards + totalStakeSubsidies + fundSize + netInflow + fundInflow + fundOutflow + storageFund { + totalObjectStorageRebates + nonRefundableBalance + } + safeMode { + enabled + } + systemStateVersion + systemParameters { + stakeSubsidyStartEpoch + } + systemStakeSubsidy { + balance + currentDistributionAmount + } + checkpoints(last: 1) { + nodes { + sequenceNumber + } + } + transactionBlocks(last: 1) { + nodes { + digest + } + } + endTimestamp + } +} + +//# create-checkpoint + +//# advance-epoch + +//# create-checkpoint + +//# run-graphql +{ + epoch(id: 1) { + epochId + referenceGasPrice + validatorSet { + totalStake + activeValidators { + nodes { + name + } + } + } + startTimestamp + totalCheckpoints + totalTransactions + totalGasFees + totalStakeRewards + totalStakeSubsidies + fundSize + netInflow + fundInflow + fundOutflow + storageFund { + totalObjectStorageRebates + nonRefundableBalance + } + safeMode { + enabled + } + systemStateVersion + systemParameters { + stakeSubsidyStartEpoch + } + systemStakeSubsidy { + balance + currentDistributionAmount + } + checkpoints(last: 1) { + nodes { + sequenceNumber + } + } + transactionBlocks(last: 1) { + nodes { + digest + } + } + endTimestamp + } +} + +//# run-graphql 
+{ + epoch(id: 2) { + epochId + referenceGasPrice + validatorSet { + totalStake + activeValidators { + nodes { + name + } + } + } + startTimestamp + totalCheckpoints + totalTransactions + totalGasFees + totalStakeRewards + totalStakeSubsidies + fundSize + netInflow + fundInflow + fundOutflow + storageFund { + totalObjectStorageRebates + nonRefundableBalance + } + safeMode { + enabled + } + systemStateVersion + systemParameters { + stakeSubsidyStartEpoch + } + systemStakeSubsidy { + balance + currentDistributionAmount + } + checkpoints(last: 1) { + nodes { + sequenceNumber + } + } + transactionBlocks(last: 1) { + nodes { + digest + } + } + endTimestamp + } +} diff --git a/crates/sui-graphql-rpc/src/test_infra/cluster.rs b/crates/sui-graphql-rpc/src/test_infra/cluster.rs index 28b451a958902..27ee4f5a23b31 100644 --- a/crates/sui-graphql-rpc/src/test_infra/cluster.rs +++ b/crates/sui-graphql-rpc/src/test_infra/cluster.rs @@ -372,6 +372,56 @@ pub async fn wait_for_graphql_checkpoint_catchup( .expect("Timeout waiting for graphql to catchup to checkpoint"); } +/// Ping the GraphQL server until the latest epoch it reports has caught up to the +/// desired epoch. 
+pub async fn wait_for_graphql_epoch_catchup( + client: &SimpleClient, + epoch: u64, + base_timeout: Duration, +) { + info!( + "Waiting for graphql to catchup to epoch {}, base time out is {}", + epoch, + base_timeout.as_secs() + ); + let query = r#" + { + epoch { + epochId + } + }"#; + + let timeout = base_timeout.mul_f64(epoch.max(1) as f64); + + tokio::time::timeout(timeout, async { + loop { + let resp = client + .execute_to_graphql(query.to_string(), false, vec![], vec![]) + .await + .unwrap() + .response_body_json(); + + let latest_epoch = resp["data"]["epoch"].get("epochId"); + info!("Latest epoch: {:?}", latest_epoch); + // Indexer has not picked up any epochs yet + let Some(latest_epoch) = latest_epoch else { + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + }; + + // Indexer has picked up an epoch, but it's not the one we're waiting for + let latest_epoch = latest_epoch.as_u64().unwrap(); + if latest_epoch < epoch { + tokio::time::sleep(Duration::from_secs(1)).await; + } else { + break; + } + } + }) + .await + .expect("Timeout waiting for graphql to catchup to epoch"); +} + /// Ping the GraphQL server for a checkpoint until an empty response is returned, indicating that /// the checkpoint has been pruned. pub async fn wait_for_graphql_checkpoint_pruned( @@ -423,6 +473,12 @@ impl Cluster { wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await } + /// Waits for the indexer to index up to the given epoch, then waits for the graphql service's + /// background task to update the corresponding watermark. + pub async fn wait_for_epoch_catchup(&self, epoch: u64, base_timeout: Duration) { + wait_for_graphql_epoch_catchup(&self.graphql_client, epoch, base_timeout).await + } + /// Waits for the indexer to prune a given checkpoint. 
pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) { wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await diff --git a/crates/sui-graphql-rpc/tests/e2e_tests.rs b/crates/sui-graphql-rpc/tests/e2e_tests.rs index b51f94e40ccea..7f4321f59594c 100644 --- a/crates/sui-graphql-rpc/tests/e2e_tests.rs +++ b/crates/sui-graphql-rpc/tests/e2e_tests.rs @@ -744,7 +744,7 @@ async fn test_dry_run_failed_execution() { } #[tokio::test] -async fn test_epoch_data() { +async fn test_epoch_live_object_set_digest() { telemetry_subscribers::init_for_testing(); let cluster = start_cluster(ServiceConfig::test_defaults()).await; @@ -756,7 +756,9 @@ async fn test_epoch_data() { .await; // Wait for the epoch to be indexed - sleep(Duration::from_secs(10)).await; + cluster + .wait_for_epoch_catchup(0, Duration::from_secs(30)) + .await; // Query the epoch let query = " diff --git a/crates/sui-indexer/migrations/pg/2024-10-09-180628_add_network_total_transactions_to_epochs/down.sql b/crates/sui-indexer/migrations/pg/2024-10-09-180628_add_network_total_transactions_to_epochs/down.sql new file mode 100644 index 0000000000000..e088120452e58 --- /dev/null +++ b/crates/sui-indexer/migrations/pg/2024-10-09-180628_add_network_total_transactions_to_epochs/down.sql @@ -0,0 +1 @@ +ALTER TABLE epochs DROP COLUMN first_tx_sequence_number; diff --git a/crates/sui-indexer/migrations/pg/2024-10-09-180628_add_network_total_transactions_to_epochs/up.sql b/crates/sui-indexer/migrations/pg/2024-10-09-180628_add_network_total_transactions_to_epochs/up.sql new file mode 100644 index 0000000000000..becdb61fe5e83 --- /dev/null +++ b/crates/sui-indexer/migrations/pg/2024-10-09-180628_add_network_total_transactions_to_epochs/up.sql @@ -0,0 +1 @@ +ALTER TABLE epochs ADD COLUMN first_tx_sequence_number bigint; diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index 
4f722ba8d0b3b..be4e0d375a923 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -29,11 +29,12 @@ use crate::errors::IndexerError; use crate::handlers::committer::start_tx_checkpoint_commit_task; use crate::metrics::IndexerMetrics; use crate::models::display::StoredDisplay; +use crate::models::epoch::{EndOfEpochUpdate, StartOfEpochUpdate}; use crate::models::obj_indices::StoredObjectVersion; use crate::store::{IndexerStore, PgIndexerStore}; use crate::types::{ - EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, - IndexedObject, IndexedPackage, IndexedTransaction, IndexerResult, TransactionKind, TxIndex, + EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject, + IndexedPackage, IndexedTransaction, IndexerResult, TransactionKind, TxIndex, }; use super::tx_processor::EpochEndIndexingObjectStore; @@ -152,12 +153,12 @@ impl CheckpointHandler { get_sui_system_state(&checkpoint_object_store)?.into_sui_system_state_summary(); return Ok(Some(EpochToCommit { last_epoch: None, - new_epoch: IndexedEpochInfo::from_new_system_state_summary( + new_epoch: StartOfEpochUpdate::new( system_state_summary, 0, //first_checkpoint_id + 0, // first_tx_sequence_number None, ), - network_total_transactions: 0, })); } @@ -183,13 +184,9 @@ impl CheckpointHandler { let event = bcs::from_bytes::(&epoch_event.contents)?; - // Now we just entered epoch X, we want to calculate the diff between - // TotalTransactionsByEndOfEpoch(X-1) and TotalTransactionsByEndOfEpoch(X-2). Note that on - // the indexer's chain-reading side, this is not guaranteed to have the latest data. Rather - // than impose a wait on the reading side, however, we overwrite this on the persisting - // side, where we can guarantee that the previous epoch's checkpoints have been written to - // db. - + // At some point while committing data in epoch X - 1, we will encounter a new epoch X. 
We + // want to retrieve X - 2's network total transactions to calculate the number of + // transactions that occurred in epoch X - 1. let network_tx_count_prev_epoch = match system_state_summary.epoch { // If first epoch change, this number is 0 1 => Ok(0), @@ -197,23 +194,28 @@ impl CheckpointHandler { let last_epoch = system_state_summary.epoch - 2; state .get_network_total_transactions_by_end_of_epoch(last_epoch) - .await + .await? + .ok_or_else(|| { + IndexerError::PersistentStorageDataCorruptionError(format!( + "Network total transactions for epoch {} not found", + last_epoch + )) + }) } }?; Ok(Some(EpochToCommit { - last_epoch: Some(IndexedEpochInfo::from_end_of_epoch_data( - system_state_summary.clone(), + last_epoch: Some(EndOfEpochUpdate::new( checkpoint_summary, &event, network_tx_count_prev_epoch, )), - new_epoch: IndexedEpochInfo::from_new_system_state_summary( + new_epoch: StartOfEpochUpdate::new( system_state_summary, checkpoint_summary.sequence_number + 1, // first_checkpoint_id + checkpoint_summary.network_total_transactions, Some(&event), ), - network_total_transactions: checkpoint_summary.network_total_transactions, })) } diff --git a/crates/sui-indexer/src/handlers/committer.rs b/crates/sui-indexer/src/handlers/committer.rs index f67f2fad6f007..ad9df09be4894 100644 --- a/crates/sui-indexer/src/handlers/committer.rs +++ b/crates/sui-indexer/src/handlers/committer.rs @@ -56,7 +56,7 @@ where let epoch = checkpoint.epoch.clone(); batch.push(checkpoint); next_checkpoint_sequence_number += 1; - let epoch_number_option = epoch.as_ref().map(|epoch| epoch.new_epoch.epoch); + let epoch_number_option = epoch.as_ref().map(|epoch| epoch.new_epoch_id()); // The batch will consist of contiguous checkpoints and at most one epoch boundary at // the end. 
if batch.len() == checkpoint_commit_batch_size || epoch.is_some() { diff --git a/crates/sui-indexer/src/handlers/mod.rs b/crates/sui-indexer/src/handlers/mod.rs index 75b64394f1ba3..eb99d56abe36b 100644 --- a/crates/sui-indexer/src/handlers/mod.rs +++ b/crates/sui-indexer/src/handlers/mod.rs @@ -12,10 +12,14 @@ use tokio_util::sync::CancellationToken; use crate::{ errors::IndexerError, - models::{display::StoredDisplay, obj_indices::StoredObjectVersion}, + models::{ + display::StoredDisplay, + epoch::{EndOfEpochUpdate, StartOfEpochUpdate}, + obj_indices::StoredObjectVersion, + }, types::{ - EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, - IndexedObject, IndexedPackage, IndexedTransaction, IndexerResult, TxIndex, + EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject, + IndexedPackage, IndexedTransaction, IndexerResult, TxIndex, }, }; @@ -50,9 +54,28 @@ pub struct TransactionObjectChangesToCommit { #[derive(Clone, Debug)] pub struct EpochToCommit { - pub last_epoch: Option, - pub new_epoch: IndexedEpochInfo, - pub network_total_transactions: u64, + pub last_epoch: Option, + pub new_epoch: StartOfEpochUpdate, +} + +impl EpochToCommit { + pub fn new_epoch_id(&self) -> u64 { + self.new_epoch.epoch as u64 + } + + pub fn new_epoch_first_checkpoint_id(&self) -> u64 { + self.new_epoch.first_checkpoint_id as u64 + } + + pub fn last_epoch_total_transactions(&self) -> Option { + self.last_epoch + .as_ref() + .map(|e| e.epoch_total_transactions as u64) + } + + pub fn new_epoch_first_tx_sequence_number(&self) -> u64 { + self.new_epoch.first_tx_sequence_number as u64 + } } pub struct CommonHandler { diff --git a/crates/sui-indexer/src/models/epoch.rs b/crates/sui-indexer/src/models/epoch.rs index 482e1747ede2b..0918e50c72c35 100644 --- a/crates/sui-indexer/src/models/epoch.rs +++ b/crates/sui-indexer/src/models/epoch.rs @@ -2,10 +2,12 @@ // SPDX-License-Identifier: Apache-2.0 use crate::schema::epochs; -use 
crate::types::IndexedEpochInfo; use crate::{errors::IndexerError, schema::feature_flags, schema::protocol_configs}; +use diesel::prelude::{AsChangeset, Identifiable}; use diesel::{Insertable, Queryable, Selectable}; use sui_json_rpc_types::{EndOfEpochInfo, EpochInfo}; +use sui_types::event::SystemEpochInfoEvent; +use sui_types::messages_checkpoint::CertifiedCheckpointSummary; use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary; #[derive(Queryable, Insertable, Debug, Clone, Default)] @@ -32,6 +34,41 @@ pub struct StoredEpochInfo { pub epoch_commitments: Option>, /// This is the system state summary at the beginning of the epoch, serialized as JSON. pub system_state_summary_json: Option, + /// First transaction sequence number of this epoch. + pub first_tx_sequence_number: Option, +} + +#[derive(Insertable, Identifiable, AsChangeset, Clone, Debug)] +#[diesel(primary_key(epoch))] +#[diesel(table_name = epochs)] +pub struct StartOfEpochUpdate { + pub epoch: i64, + pub first_checkpoint_id: i64, + pub first_tx_sequence_number: i64, + pub epoch_start_timestamp: i64, + pub reference_gas_price: i64, + pub protocol_version: i64, + pub total_stake: i64, + pub storage_fund_balance: i64, + pub system_state_summary_json: serde_json::Value, +} + +#[derive(Identifiable, AsChangeset, Clone, Debug)] +#[diesel(primary_key(epoch))] +#[diesel(table_name = epochs)] +pub struct EndOfEpochUpdate { + pub epoch: i64, + pub epoch_total_transactions: i64, + pub last_checkpoint_id: i64, + pub epoch_end_timestamp: i64, + pub storage_fund_reinvestment: i64, + pub storage_charge: i64, + pub storage_rebate: i64, + pub stake_subsidy_amount: i64, + pub total_gas_fees: i64, + pub total_stake_rewards_distributed: i64, + pub leftover_storage_fund_inflow: i64, + pub epoch_commitments: Vec, } #[derive(Queryable, Insertable, Debug, Clone, Default)] @@ -61,6 +98,7 @@ pub struct QueryableEpochInfo { pub total_stake: i64, pub storage_fund_balance: i64, pub 
epoch_total_transactions: Option, + pub first_tx_sequence_number: Option, pub last_checkpoint_id: Option, pub epoch_end_timestamp: Option, pub storage_fund_reinvestment: Option, @@ -79,60 +117,63 @@ pub struct QueryableEpochSystemState { pub system_state: Vec, } -impl StoredEpochInfo { - pub fn from_epoch_beginning_info(e: &IndexedEpochInfo) -> Self { +impl StartOfEpochUpdate { + pub fn new( + new_system_state_summary: SuiSystemStateSummary, + first_checkpoint_id: u64, + first_tx_sequence_number: u64, + event: Option<&SystemEpochInfoEvent>, + ) -> Self { Self { - epoch: e.epoch as i64, - system_state_summary_json: Some( - serde_json::to_value(e.system_state_summary.clone()).unwrap(), - ), - first_checkpoint_id: e.first_checkpoint_id as i64, - epoch_start_timestamp: e.epoch_start_timestamp as i64, - reference_gas_price: e.reference_gas_price as i64, - protocol_version: e.protocol_version as i64, - total_stake: e.total_stake as i64, - storage_fund_balance: e.storage_fund_balance as i64, - ..Default::default() + epoch: new_system_state_summary.epoch as i64, + system_state_summary_json: serde_json::to_value(new_system_state_summary.clone()) + .unwrap(), + first_checkpoint_id: first_checkpoint_id as i64, + first_tx_sequence_number: first_tx_sequence_number as i64, + epoch_start_timestamp: new_system_state_summary.epoch_start_timestamp_ms as i64, + reference_gas_price: new_system_state_summary.reference_gas_price as i64, + protocol_version: new_system_state_summary.protocol_version as i64, + // NOTE: total_stake and storage_fund_balance are about new epoch, + // although the event is generated at the end of the previous epoch, + // the event is optional b/c no such event for the first epoch. + total_stake: event.map(|e| e.total_stake as i64).unwrap_or(0), + storage_fund_balance: event.map(|e| e.storage_fund_balance as i64).unwrap_or(0), } } +} - // TODO: It's a bit fragile to construct the full data structure but only - // commit partial data. We should refactor this. 
- pub fn from_epoch_end_info(e: &IndexedEpochInfo) -> Self { +impl EndOfEpochUpdate { + pub fn new( + last_checkpoint_summary: &CertifiedCheckpointSummary, + event: &SystemEpochInfoEvent, + first_tx_sequence_number: u64, + ) -> Self { Self { - epoch: e.epoch as i64, - // TODO: Deprecate this. - system_state: None, - // At epoch end the system state would be the state of the next epoch, so we ignore it. - system_state_summary_json: None, - epoch_total_transactions: e.epoch_total_transactions.map(|v| v as i64), - last_checkpoint_id: e.last_checkpoint_id.map(|v| v as i64), - epoch_end_timestamp: e.epoch_end_timestamp.map(|v| v as i64), - storage_fund_reinvestment: e.storage_fund_reinvestment.map(|v| v as i64), - storage_charge: e.storage_charge.map(|v| v as i64), - storage_rebate: e.storage_rebate.map(|v| v as i64), - stake_subsidy_amount: e.stake_subsidy_amount.map(|v| v as i64), - total_gas_fees: e.total_gas_fees.map(|v| v as i64), - total_stake_rewards_distributed: e.total_stake_rewards_distributed.map(|v| v as i64), - leftover_storage_fund_inflow: e.leftover_storage_fund_inflow.map(|v| v as i64), - epoch_commitments: e - .epoch_commitments - .as_ref() - .map(|v| bcs::to_bytes(&v).unwrap()), - - // For the following fields: - // we don't update these columns when persisting EndOfEpoch data. - // However if the data is partial, diesel would interpret them - // as Null and hence cause errors. 
- first_checkpoint_id: 0, - epoch_start_timestamp: 0, - reference_gas_price: 0, - protocol_version: 0, - total_stake: 0, - storage_fund_balance: 0, + epoch: last_checkpoint_summary.epoch as i64, + epoch_total_transactions: (last_checkpoint_summary.network_total_transactions + - first_tx_sequence_number) as i64, + last_checkpoint_id: *last_checkpoint_summary.sequence_number() as i64, + epoch_end_timestamp: last_checkpoint_summary.timestamp_ms as i64, + storage_fund_reinvestment: event.storage_fund_reinvestment as i64, + storage_charge: event.storage_charge as i64, + storage_rebate: event.storage_rebate as i64, + leftover_storage_fund_inflow: event.leftover_storage_fund_inflow as i64, + stake_subsidy_amount: event.stake_subsidy_amount as i64, + total_gas_fees: event.total_gas_fees as i64, + total_stake_rewards_distributed: event.total_stake_rewards_distributed as i64, + epoch_commitments: bcs::to_bytes( + &last_checkpoint_summary + .end_of_epoch_data + .clone() + .unwrap() + .epoch_commitments, + ) + .unwrap(), } } +} +impl StoredEpochInfo { pub fn get_json_system_state_summary(&self) -> Result { let Some(system_state_summary_json) = self.system_state_summary_json.clone() else { return Err(IndexerError::PersistentStorageDataCorruptionError( diff --git a/crates/sui-indexer/src/schema.rs b/crates/sui-indexer/src/schema.rs index f56872fc80e90..a2c418db8e042 100644 --- a/crates/sui-indexer/src/schema.rs +++ b/crates/sui-indexer/src/schema.rs @@ -62,6 +62,7 @@ diesel::table! 
{ leftover_storage_fund_inflow -> Nullable, epoch_commitments -> Nullable, system_state_summary_json -> Nullable, + first_tx_sequence_number -> Nullable, } } diff --git a/crates/sui-indexer/src/store/indexer_store.rs b/crates/sui-indexer/src/store/indexer_store.rs index f623a72f70fe2..80c645951308e 100644 --- a/crates/sui-indexer/src/store/indexer_store.rs +++ b/crates/sui-indexer/src/store/indexer_store.rs @@ -96,8 +96,10 @@ pub trait IndexerStore: Clone + Sync + Send + 'static { async fn persist_packages(&self, packages: Vec) -> Result<(), IndexerError>; + /// Updates the current epoch with end-of-epoch data, and writes a new epoch to the database. async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError>; + /// Updates epoch-partitioned tables to accept data from the new epoch. async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError>; async fn prune_epoch(&self, epoch: u64) -> Result<(), IndexerError>; @@ -105,7 +107,7 @@ pub trait IndexerStore: Clone + Sync + Send + 'static { async fn get_network_total_transactions_by_end_of_epoch( &self, epoch: u64, - ) -> Result; + ) -> Result, IndexerError>; async fn upload_display(&self, epoch: u64) -> Result<(), IndexerError>; diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index 0101e4989697f..0750b65806707 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -1226,64 +1226,20 @@ impl PgIndexerStore { async { if let Some(last_epoch) = &epoch.last_epoch { let last_epoch_id = last_epoch.epoch; - // Overwrites the `epoch_total_transactions` field on `epoch.last_epoch` because - // we are not guaranteed to have the latest data in db when this is set on - // indexer's chain-reading side. However, when we `persist_epoch`, the - // checkpoints from an epoch ago must have been indexed. 
- let previous_epoch_network_total_transactions = match epoch_id { - 0 | 1 => 0, - _ => { - let prev_epoch_id = epoch_id - 2; - let result = checkpoints::table - .filter(checkpoints::epoch.eq(prev_epoch_id as i64)) - .select(max(checkpoints::network_total_transactions)) - .first::>(conn) - .await - .map(|o| o.unwrap_or(0))?; - - result as u64 - } - }; - - let epoch_total_transactions = epoch.network_total_transactions - - previous_epoch_network_total_transactions; - let mut last_epoch = StoredEpochInfo::from_epoch_end_info(last_epoch); - last_epoch.epoch_total_transactions = Some(epoch_total_transactions as i64); info!(last_epoch_id, "Persisting epoch end data."); - diesel::insert_into(epochs::table) - .values(vec![last_epoch]) - .on_conflict(epochs::epoch) - .do_update() - .set(( - epochs::system_state.eq(excluded(epochs::system_state)), - epochs::epoch_total_transactions - .eq(excluded(epochs::epoch_total_transactions)), - epochs::last_checkpoint_id.eq(excluded(epochs::last_checkpoint_id)), - epochs::epoch_end_timestamp.eq(excluded(epochs::epoch_end_timestamp)), - epochs::storage_fund_reinvestment - .eq(excluded(epochs::storage_fund_reinvestment)), - epochs::storage_charge.eq(excluded(epochs::storage_charge)), - epochs::storage_rebate.eq(excluded(epochs::storage_rebate)), - epochs::stake_subsidy_amount.eq(excluded(epochs::stake_subsidy_amount)), - epochs::total_gas_fees.eq(excluded(epochs::total_gas_fees)), - epochs::total_stake_rewards_distributed - .eq(excluded(epochs::total_stake_rewards_distributed)), - epochs::leftover_storage_fund_inflow - .eq(excluded(epochs::leftover_storage_fund_inflow)), - epochs::epoch_commitments.eq(excluded(epochs::epoch_commitments)), - )) + diesel::update(epochs::table.filter(epochs::epoch.eq(last_epoch_id))) + .set(last_epoch) .execute(conn) .await?; } let epoch_id = epoch.new_epoch.epoch; info!(epoch_id, "Persisting epoch beginning info"); - let new_epoch = StoredEpochInfo::from_epoch_beginning_info(&epoch.new_epoch); let 
error_message = concat!("Failed to write to ", stringify!((epochs::table)), " DB"); diesel::insert_into(epochs::table) - .values(new_epoch) + .values(epoch.new_epoch) .on_conflict_do_nothing() .execute(conn) .await @@ -1312,7 +1268,7 @@ impl PgIndexerStore { // partition_0 has been created, so no need to advance it. if let Some(last_epoch_id) = last_epoch_id { let last_db_epoch: Option = epochs::table - .filter(epochs::epoch.eq(last_epoch_id as i64)) + .filter(epochs::epoch.eq(last_epoch_id)) .first::(&mut connection) .await .optional() @@ -1527,20 +1483,24 @@ impl PgIndexerStore { async fn get_network_total_transactions_by_end_of_epoch( &self, epoch: u64, - ) -> Result { + ) -> Result, IndexerError> { use diesel_async::RunQueryDsl; let mut connection = self.pool.get().await?; - checkpoints::table - .filter(checkpoints::epoch.eq(epoch as i64)) - .select(checkpoints::network_total_transactions) - .order_by(checkpoints::sequence_number.desc()) - .first::(&mut connection) - .await - .map_err(Into::into) - .context("Failed to get network total transactions in epoch") - .map(|v| v as u64) + // TODO: (wlmyng) update to read from epochs::network_total_transactions + + Ok(Some( + checkpoints::table + .filter(checkpoints::epoch.eq(epoch as i64)) + .select(checkpoints::network_total_transactions) + .order_by(checkpoints::sequence_number.desc()) + .first::(&mut connection) + .await + .map_err(Into::into) + .context("Failed to get network total transactions in epoch") + .map(|v| v as u64)?, + )) } async fn update_watermarks_upper_bound( @@ -2162,7 +2122,7 @@ impl IndexerStore for PgIndexerStore { async fn get_network_total_transactions_by_end_of_epoch( &self, epoch: u64, - ) -> Result { + ) -> Result, IndexerError> { self.get_network_total_transactions_by_end_of_epoch(epoch) .await } diff --git a/crates/sui-indexer/src/store/pg_partition_manager.rs b/crates/sui-indexer/src/store/pg_partition_manager.rs index 2dbc031cc5f07..876a1b9c56146 100644 --- 
a/crates/sui-indexer/src/store/pg_partition_manager.rs +++ b/crates/sui-indexer/src/store/pg_partition_manager.rs @@ -64,15 +64,11 @@ impl EpochPartitionData { pub fn compose_data(epoch: EpochToCommit, last_db_epoch: StoredEpochInfo) -> Self { let last_epoch = last_db_epoch.epoch as u64; let last_epoch_start_cp = last_db_epoch.first_checkpoint_id as u64; - let next_epoch = epoch.new_epoch.epoch; - let next_epoch_start_cp = epoch.new_epoch.first_checkpoint_id; - - // Determining the tx_sequence_number range for the epoch partition differs from the - // checkpoint_sequence_number range, because the former is a sum of total transactions - - // this sum already addresses the off-by-one. - let next_epoch_start_tx = epoch.network_total_transactions; + let next_epoch = epoch.new_epoch_id(); + let next_epoch_start_cp = epoch.new_epoch_first_checkpoint_id(); + let next_epoch_start_tx = epoch.new_epoch_first_tx_sequence_number(); let last_epoch_start_tx = - next_epoch_start_tx - last_db_epoch.epoch_total_transactions.unwrap() as u64; + next_epoch_start_tx - epoch.last_epoch_total_transactions().unwrap(); Self { last_epoch, diff --git a/crates/sui-indexer/src/types.rs b/crates/sui-indexer/src/types.rs index c7628a593e14a..6c88e3d27641a 100644 --- a/crates/sui-indexer/src/types.rs +++ b/crates/sui-indexer/src/types.rs @@ -14,7 +14,6 @@ use sui_types::crypto::AggregateAuthoritySignature; use sui_types::digests::TransactionDigest; use sui_types::dynamic_field::DynamicFieldType; use sui_types::effects::TransactionEffects; -use sui_types::event::SystemEpochInfoEvent; use sui_types::messages_checkpoint::{ CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointDigest, CheckpointSequenceNumber, EndOfEpochData, @@ -22,7 +21,6 @@ use sui_types::messages_checkpoint::{ use sui_types::move_package::MovePackage; use sui_types::object::{Object, Owner}; use sui_types::sui_serde::SuiStructTag; -use 
sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary; use sui_types::transaction::SenderSignedData; use crate::errors::IndexerError; @@ -98,104 +96,6 @@ impl IndexedCheckpoint { } } -/// Represents system state and summary info at the start and end of an epoch. Optional fields are -/// populated at epoch boundary, since they cannot be determined at the start of the epoch. -#[derive(Clone, Debug)] -pub struct IndexedEpochInfo { - pub epoch: u64, - pub first_checkpoint_id: u64, - pub epoch_start_timestamp: u64, - pub reference_gas_price: u64, - pub protocol_version: u64, - pub total_stake: u64, - pub storage_fund_balance: u64, - pub system_state_summary: SuiSystemStateSummary, - pub epoch_total_transactions: Option, - pub last_checkpoint_id: Option, - pub epoch_end_timestamp: Option, - pub storage_fund_reinvestment: Option, - pub storage_charge: Option, - pub storage_rebate: Option, - pub stake_subsidy_amount: Option, - pub total_gas_fees: Option, - pub total_stake_rewards_distributed: Option, - pub leftover_storage_fund_inflow: Option, - pub epoch_commitments: Option>, -} - -impl IndexedEpochInfo { - pub fn from_new_system_state_summary( - new_system_state_summary: SuiSystemStateSummary, - first_checkpoint_id: u64, - event: Option<&SystemEpochInfoEvent>, - ) -> IndexedEpochInfo { - Self { - epoch: new_system_state_summary.epoch, - first_checkpoint_id, - epoch_start_timestamp: new_system_state_summary.epoch_start_timestamp_ms, - reference_gas_price: new_system_state_summary.reference_gas_price, - protocol_version: new_system_state_summary.protocol_version, - // NOTE: total_stake and storage_fund_balance are about new epoch, - // although the event is generated at the end of the previous epoch, - // the event is optional b/c no such event for the first epoch. 
- total_stake: event.map(|e| e.total_stake).unwrap_or(0), - storage_fund_balance: event.map(|e| e.storage_fund_balance).unwrap_or(0), - system_state_summary: new_system_state_summary, - epoch_total_transactions: None, - last_checkpoint_id: None, - epoch_end_timestamp: None, - storage_fund_reinvestment: None, - storage_charge: None, - storage_rebate: None, - stake_subsidy_amount: None, - total_gas_fees: None, - total_stake_rewards_distributed: None, - leftover_storage_fund_inflow: None, - epoch_commitments: None, - } - } - - /// Creates `IndexedEpochInfo` for epoch X-1 at the boundary of epoch X-1 to X. - /// `network_total_tx_num_at_last_epoch_end` is needed to determine the number of transactions - /// that occurred in the epoch X-1. - pub fn from_end_of_epoch_data( - system_state_summary: SuiSystemStateSummary, - last_checkpoint_summary: &CertifiedCheckpointSummary, - event: &SystemEpochInfoEvent, - network_total_tx_num_at_last_epoch_end: u64, - ) -> IndexedEpochInfo { - Self { - epoch: last_checkpoint_summary.epoch, - epoch_total_transactions: Some( - last_checkpoint_summary.network_total_transactions - - network_total_tx_num_at_last_epoch_end, - ), - last_checkpoint_id: Some(*last_checkpoint_summary.sequence_number()), - epoch_end_timestamp: Some(last_checkpoint_summary.timestamp_ms), - storage_fund_reinvestment: Some(event.storage_fund_reinvestment), - storage_charge: Some(event.storage_charge), - storage_rebate: Some(event.storage_rebate), - leftover_storage_fund_inflow: Some(event.leftover_storage_fund_inflow), - stake_subsidy_amount: Some(event.stake_subsidy_amount), - total_gas_fees: Some(event.total_gas_fees), - total_stake_rewards_distributed: Some(event.total_stake_rewards_distributed), - epoch_commitments: last_checkpoint_summary - .end_of_epoch_data - .as_ref() - .map(|e| e.epoch_commitments.clone()), - system_state_summary, - // The following felds will not and shall not be upserted - // into DB. 
We have them below to make compiler and diesel happy - first_checkpoint_id: 0, - epoch_start_timestamp: 0, - reference_gas_price: 0, - protocol_version: 0, - total_stake: 0, - storage_fund_balance: 0, - } - } -} - #[derive(Debug, Clone)] pub struct IndexedEvent { pub tx_sequence_number: u64,