From 03be8f59c51b456c8fcf2f494bc154a72a06afbc Mon Sep 17 00:00:00 2001 From: Will Yang Date: Mon, 30 Sep 2024 19:03:48 -0700 Subject: [PATCH] Committer pushes one update through store --- crates/sui-indexer/README.md | 8 +- .../pg/2024-09-12-213234_watermarks/down.sql | 1 + .../pg/2024-09-12-213234_watermarks/up.sql | 32 ++++++++ crates/sui-indexer/src/handlers/committer.rs | 36 ++++++++- crates/sui-indexer/src/handlers/pruner.rs | 32 ++++++++ crates/sui-indexer/src/indexer.rs | 2 + crates/sui-indexer/src/metrics.rs | 8 ++ crates/sui-indexer/src/models/mod.rs | 1 + crates/sui-indexer/src/models/watermarks.rs | 49 ++++++++++++ crates/sui-indexer/src/schema.rs | 14 ++++ crates/sui-indexer/src/store/indexer_store.rs | 9 +++ .../sui-indexer/src/store/pg_indexer_store.rs | 74 ++++++++++++++++++- 12 files changed, 262 insertions(+), 4 deletions(-) create mode 100644 crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/down.sql create mode 100644 crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql create mode 100644 crates/sui-indexer/src/models/watermarks.rs diff --git a/crates/sui-indexer/README.md b/crates/sui-indexer/README.md index dd57acf635aa14..cdf7a14a909473 100644 --- a/crates/sui-indexer/README.md +++ b/crates/sui-indexer/README.md @@ -32,7 +32,7 @@ See the [docs](https://docs.sui.io/guides/developer/getting-started/local-networ Start a local network using the `sui` binary: ```sh -cargo run --bin sui -- start --with-faucet --force-regenesis +cargo run --bin sui -- start --with-faucet --force-regenesis ``` If you want to run a local network with the indexer enabled (note that `libpq` is required), you can run the following command after following the steps in the next section to set up an indexer DB: @@ -124,3 +124,9 @@ Note that you need an existing database for this to work. 
Using the DATABASE_URL # Change the RPC_CLIENT_URL to http://0.0.0.0:9000 to run indexer against local validator & fullnode cargo run --bin sui-indexer --features mysql-feature --no-default-features -- --db-url "" --rpc-client-url "https://fullnode.devnet.sui.io:443" --fullnode-sync-worker --reset-db ``` + +### Extending the indexer + +To add a new table, run `diesel migration generate your_table_name`, and modify the newly created `up.sql` and `down.sql` files. + +Then apply the migration with `diesel migration run`, and run the script in `./scripts/generate_indexer_schema.sh` to update the `schema.rs` file. diff --git a/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/down.sql b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/down.sql new file mode 100644 index 00000000000000..e9de336153f629 --- /dev/null +++ b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS watermarks; diff --git a/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql new file mode 100644 index 00000000000000..6efe18466b3499 --- /dev/null +++ b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql @@ -0,0 +1,32 @@ +CREATE TABLE watermarks +( + -- The table governed by this watermark, e.g. `epochs`, `checkpoints`, `transactions`. + entity TEXT NOT NULL, + -- Inclusive upper bound epoch this entity has data for. Committer updates this field. Pruner + -- uses this field for per-entity epoch-level retention, and is mostly useful for pruning + -- unpartitioned tables. 
+ epoch_lo BIGINT NOT NULL, + -- Inclusive upper bound checkpoint this entity has data for. Committer updates this field. All + -- data of this entity in the checkpoint must be persisted before advancing this watermark. The + -- committer or ingestion task refers to this on disaster recovery. + checkpoint_hi BIGINT NOT NULL, + -- Inclusive high watermark that the committer advances. For `checkpoints`, this represents the + -- checkpoint sequence number, for `transactions`, the transaction sequence number, etc. + reader_hi BIGINT NOT NULL, + -- Inclusive low watermark that the pruner advances. Data before this watermark is considered + -- pruned by a reader. The underlying data may still exist in the db instance. + reader_lo BIGINT NOT NULL, + -- Updated using the database's current timestamp when the pruner sees that some data needs to + -- be dropped. The pruner uses this column to determine whether to prune or wait long enough + -- that all in-flight reads complete or time out before it acts on an updated watermark. + timestamp_ms BIGINT NOT NULL, + -- Column used by the pruner to track its true progress. Data at and below this watermark has + -- been truly pruned from the db, and should no longer exist. When recovering from a crash, the + -- pruner will consult this column to determine where to continue. 
+ pruned_lo BIGINT, + PRIMARY KEY (entity) +); diff --git a/crates/sui-indexer/src/handlers/committer.rs b/crates/sui-indexer/src/handlers/committer.rs index ee964bbce2fc76..7558e108ac4154 100644 --- a/crates/sui-indexer/src/handlers/committer.rs +++ b/crates/sui-indexer/src/handlers/committer.rs @@ -10,9 +10,11 @@ use tracing::{error, info}; use sui_types::messages_checkpoint::CheckpointSequenceNumber; +use crate::handlers::pruner::PrunableTable; use crate::metrics::IndexerMetrics; use crate::store::IndexerStore; use crate::types::IndexerResult; +use strum::IntoEnumIterator; use super::{CheckpointDataToCommit, EpochToCommit}; @@ -57,6 +59,8 @@ where batch.push(checkpoint); next_checkpoint_sequence_number += 1; let epoch_number_option = epoch.as_ref().map(|epoch| epoch.new_epoch.epoch); + // The batch will consist of contiguous checkpoints and at most one epoch boundary at + // the end. if batch.len() == checkpoint_commit_batch_size || epoch.is_some() { commit_checkpoints(&state, batch, epoch, &metrics).await; batch = vec![]; @@ -79,6 +83,10 @@ where Ok(()) } +/// Writes indexed checkpoint data to the database, and then update watermark upper bounds and +/// metrics. Expects `indexed_checkpoint_batch` to be non-empty, and contain contiguous checkpoints. +/// There can be at most one epoch boundary at the end. If an epoch boundary is detected, +/// epoch-partitioned tables must be advanced. 
// Unwrap: Caller needs to make sure indexed_checkpoint_batch is not empty #[instrument(skip_all, fields( first = indexed_checkpoint_batch.first().as_ref().unwrap().checkpoint.sequence_number, @@ -130,7 +138,14 @@ async fn commit_checkpoints( } let first_checkpoint_seq = checkpoint_batch.first().as_ref().unwrap().sequence_number; - let last_checkpoint_seq = checkpoint_batch.last().as_ref().unwrap().sequence_number; + let (epoch_id, last_checkpoint_seq, last_tx_seq) = { + let checkpoint = checkpoint_batch.last().unwrap(); + ( + checkpoint.epoch, + checkpoint.sequence_number, + checkpoint.max_tx_sequence_number, + ) + }; let guard = metrics.checkpoint_db_commit_latency.start_timer(); let tx_batch = tx_batch.into_iter().flatten().collect::>(); @@ -189,7 +204,8 @@ async fn commit_checkpoints( let is_epoch_end = epoch.is_some(); - // handle partitioning on epoch boundary + // On epoch boundary, we need to modify the existing partitions' upper bound, and introduce a + // new partition for incoming data for the upcoming epoch. if let Some(epoch_data) = epoch { state .advance_epoch(epoch_data) @@ -212,6 +228,22 @@ async fn commit_checkpoints( }) .expect("Persisting data into DB should not fail."); + state + .update_watermarks_upper_bound( + PrunableTable::iter().collect(), + epoch_id, + last_checkpoint_seq, + last_tx_seq, + ) + .await + .tap_err(|e| { + error!( + "Failed to update watermark upper bound with error: {}", + e.to_string() + ); + }) + .expect("Updating watermark upper bound in DB should not fail."); + if is_epoch_end { // The epoch has advanced so we update the configs for the new protocol version, if it has changed. 
let chain_id = state diff --git a/crates/sui-indexer/src/handlers/pruner.rs b/crates/sui-indexer/src/handlers/pruner.rs index 9f196a6d4f028f..a57c6eb47ab756 100644 --- a/crates/sui-indexer/src/handlers/pruner.rs +++ b/crates/sui-indexer/src/handlers/pruner.rs @@ -66,6 +66,38 @@ pub enum PrunableTable { PrunerCpWatermark, } +impl PrunableTable { + /// Given a committer's report of the latest written checkpoint and tx, return the value that + /// corresponds to the variant's unit to be used by readers. + pub fn map_to_reader_unit(&self, cp: u64, tx: u64) -> u64 { + match self { + PrunableTable::ObjectsHistory => cp, + PrunableTable::Checkpoints => cp, + PrunableTable::PrunerCpWatermark => cp, + PrunableTable::Events => tx, + PrunableTable::EventEmitPackage => tx, + PrunableTable::EventEmitModule => tx, + PrunableTable::EventSenders => tx, + PrunableTable::EventStructInstantiation => tx, + PrunableTable::EventStructModule => tx, + PrunableTable::EventStructName => tx, + PrunableTable::EventStructPackage => tx, + PrunableTable::Transactions => tx, + PrunableTable::TxAffectedAddresses => tx, + PrunableTable::TxAffectedObjects => tx, + PrunableTable::TxCallsPkg => tx, + PrunableTable::TxCallsMod => tx, + PrunableTable::TxCallsFun => tx, + PrunableTable::TxChangedObjects => tx, + PrunableTable::TxDigests => tx, + PrunableTable::TxInputObjects => tx, + PrunableTable::TxKinds => tx, + PrunableTable::TxRecipients => tx, + PrunableTable::TxSenders => tx, + } + } +} + impl Pruner { /// Instantiates a pruner with default retention and overrides. Pruner will finalize the /// retention policies so there is a value for every prunable table. 
diff --git a/crates/sui-indexer/src/indexer.rs b/crates/sui-indexer/src/indexer.rs index f9f1d357364c97..c37758097dcc69 100644 --- a/crates/sui-indexer/src/indexer.rs +++ b/crates/sui-indexer/src/indexer.rs @@ -106,6 +106,8 @@ impl Indexer { let mut exit_senders = vec![]; let mut executors = vec![]; + // Ingestion task watermarks are snapshotted once on indexer startup based on the + // corresponding watermark table before being handed off to the ingestion task. let progress_store = ShimIndexerProgressStore::new(vec![ ("primary".to_string(), primary_watermark), ("object_snapshot".to_string(), object_snapshot_watermark), diff --git a/crates/sui-indexer/src/metrics.rs b/crates/sui-indexer/src/metrics.rs index 739a686d3cbfc0..e6e3411ecdbbb7 100644 --- a/crates/sui-indexer/src/metrics.rs +++ b/crates/sui-indexer/src/metrics.rs @@ -125,6 +125,7 @@ pub struct IndexerMetrics { pub checkpoint_db_commit_latency_tx_indices_chunks: Histogram, pub checkpoint_db_commit_latency_checkpoints: Histogram, pub checkpoint_db_commit_latency_epoch: Histogram, + pub checkpoint_db_commit_latency_watermarks: Histogram, pub thousand_transaction_avg_db_commit_latency: Histogram, pub object_db_commit_latency: Histogram, pub object_mutation_db_commit_latency: Histogram, @@ -536,6 +537,13 @@ impl IndexerMetrics { registry, ) .unwrap(), + checkpoint_db_commit_latency_watermarks: register_histogram_with_registry!( + "checkpoint_db_commit_latency_watermarks", + "Time spent committing watermarks", + DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), + registry, + ) + .unwrap(), tokio_blocking_task_wait_latency: register_histogram_with_registry!( "tokio_blocking_task_wait_latency", "Time spent to wait for tokio blocking task pool", diff --git a/crates/sui-indexer/src/models/mod.rs b/crates/sui-indexer/src/models/mod.rs index e55efebb8c567e..84e8b308bc0d5c 100644 --- a/crates/sui-indexer/src/models/mod.rs +++ b/crates/sui-indexer/src/models/mod.rs @@ -12,3 +12,4 @@ pub mod packages; pub mod 
raw_checkpoints; pub mod transactions; pub mod tx_indices; +pub mod watermarks; diff --git a/crates/sui-indexer/src/models/watermarks.rs b/crates/sui-indexer/src/models/watermarks.rs new file mode 100644 index 00000000000000..87858105efd78a --- /dev/null +++ b/crates/sui-indexer/src/models/watermarks.rs @@ -0,0 +1,49 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::schema::watermarks::{self}; +use diesel::prelude::*; + +/// Represents a row in the `watermarks` table. +#[derive(Queryable, Insertable, Default, QueryableByName)] +#[diesel(table_name = watermarks, primary_key(entity))] +pub struct StoredWatermark { + /// The table name or group of tables governed by this watermark, e.g. `epochs`, `checkpoints`, + /// `transactions`. + pub entity: String, + /// Upper bound epoch range to enable per-entity epoch-level retention policy. Committer + /// advances this along with `reader_hi`. + pub epoch_hi: i64, + /// Lower bound epoch range to enable per-entity epoch-level retention policy. Pruner advances + /// this. + pub epoch_lo: i64, + pub checkpoint_hi: i64, + /// The inclusive high watermark that the committer advances. + pub reader_hi: i64, + /// The inclusive low watermark that the pruner advances. Data before this watermark is + /// considered pruned. + pub reader_lo: i64, + /// Pruner sets this, and uses this column to determine whether to prune or wait long enough + /// that all in-flight reads complete or time out before it acts on an updated watermark. + pub timestamp_ms: i64, + /// Pruner updates this, and uses this when recovering from a crash to determine where to + /// continue pruning. Represents the latest watermark pruned, inclusive. 
+ pub pruned_lo: Option, +} + +impl StoredWatermark { + pub fn from_upper_bound_update( + entity: &str, + epoch_hi: u64, + checkpoint_hi: u64, + reader_hi: u64, + ) -> Self { + StoredWatermark { + entity: entity.to_string(), + epoch_hi: epoch_hi as i64, + checkpoint_hi: checkpoint_hi as i64, + reader_hi: reader_hi as i64, + ..StoredWatermark::default() + } + } +} diff --git a/crates/sui-indexer/src/schema.rs b/crates/sui-indexer/src/schema.rs index dfc12173279e6a..5443ff42212612 100644 --- a/crates/sui-indexer/src/schema.rs +++ b/crates/sui-indexer/src/schema.rs @@ -368,6 +368,19 @@ diesel::table! { } } +diesel::table! { + watermarks (entity) { + entity -> Text, + epoch_hi -> Int8, + epoch_lo -> Int8, + checkpoint_hi -> Int8, + reader_hi -> Int8, + reader_lo -> Int8, + timestamp_ms -> Int8, + pruned_lo -> Nullable, + } +} + diesel::allow_tables_to_appear_in_same_query!( chain_identifier, checkpoints, @@ -403,4 +416,5 @@ diesel::allow_tables_to_appear_in_same_query!( tx_kinds, tx_recipients, tx_senders, + watermarks, ); diff --git a/crates/sui-indexer/src/store/indexer_store.rs b/crates/sui-indexer/src/store/indexer_store.rs index da4b86ba810165..3b072fd508233b 100644 --- a/crates/sui-indexer/src/store/indexer_store.rs +++ b/crates/sui-indexer/src/store/indexer_store.rs @@ -6,6 +6,7 @@ use std::collections::BTreeMap; use async_trait::async_trait; use crate::errors::IndexerError; +use crate::handlers::pruner::PrunableTable; use crate::handlers::{EpochToCommit, TransactionObjectChangesToCommit}; use crate::models::display::StoredDisplay; use crate::models::obj_indices::StoredObjectVersion; @@ -114,4 +115,12 @@ pub trait IndexerStore: Clone + Sync + Send + 'static { &self, checkpoints: Vec, ) -> Result<(), IndexerError>; + + async fn update_watermarks_upper_bound( + &self, + tables: Vec, + epoch: u64, + cp: u64, + tx: u64, + ) -> Result<(), IndexerError>; } diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs 
index bfd3360f222243..8579c5edf6e93f 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -27,6 +27,7 @@ use sui_storage::object_store::util::put; use crate::config::UploadOptions; use crate::database::ConnectionPool; use crate::errors::{Context, IndexerError}; +use crate::handlers::pruner::PrunableTable; use crate::handlers::EpochToCommit; use crate::handlers::TransactionObjectChangesToCommit; use crate::metrics::IndexerMetrics; @@ -44,6 +45,7 @@ use crate::models::objects::{ }; use crate::models::packages::StoredPackage; use crate::models::transactions::StoredTransaction; +use crate::models::watermarks::StoredWatermark; use crate::schema::{ chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package, event_senders, event_struct_instantiation, event_struct_module, event_struct_name, @@ -51,7 +53,7 @@ use crate::schema::{ objects_snapshot, objects_version, packages, protocol_configs, pruner_cp_watermark, raw_checkpoints, transactions, tx_affected_addresses, tx_affected_objects, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, tx_kinds, - tx_recipients, tx_senders, + tx_recipients, tx_senders, watermarks, }; use crate::store::transaction_with_retry; use crate::types::{EventIndex, IndexedDeletedObject, IndexedObject}; @@ -745,6 +747,8 @@ impl PgIndexerStore { .await .map_err(IndexerError::from) .context(error_message)?; + + // TODO: (wlmyng) - so should we update the `watermarks` table here actually? 
} Ok::<(), IndexerError>(()) } @@ -1453,6 +1457,63 @@ impl PgIndexerStore { .context("Failed to get network total transactions in epoch") .map(|v| v as u64) } + + async fn update_watermarks_upper_bound( + &self, + tables: Vec, + epoch: u64, + cp: u64, + tx: u64, + ) -> Result<(), IndexerError> { + use diesel_async::RunQueryDsl; + + let guard = self + .metrics + .checkpoint_db_commit_latency_watermarks + .start_timer(); + + transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| { + let upper_bound_updates = tables + .into_iter() + .map(|table| { + let reader_upper_bound = table.map_to_reader_unit(cp, tx); + + StoredWatermark::from_upper_bound_update( + table.as_ref(), + epoch, + cp, + reader_upper_bound, + ) + }) + .collect::>(); + async { + diesel::insert_into(watermarks::table) + .values(upper_bound_updates) + .on_conflict(watermarks::entity) + .do_update() + .set(( + watermarks::epoch_hi.eq(excluded(watermarks::epoch_hi)), + watermarks::checkpoint_hi.eq(excluded(watermarks::checkpoint_hi)), + watermarks::reader_hi.eq(excluded(watermarks::reader_hi)), + )) + .execute(conn) + .await + .map_err(IndexerError::from) + .context("Failed to update watermarks upper bound")?; + + Ok::<(), IndexerError>(()) + } + .scope_boxed() + }) + .await + .tap_ok(|_| { + let elapsed = guard.stop_and_record(); + info!(elapsed, "Persisted watermarks"); + }) + .tap_err(|e| { + tracing::error!("Failed to persist watermarks with error: {}", e); + }) + } } #[async_trait] @@ -2109,6 +2170,17 @@ impl IndexerStore for PgIndexerStore { ) -> Result<(), IndexerError> { self.persist_raw_checkpoints_impl(&checkpoints).await } + + async fn update_watermarks_upper_bound( + &self, + tables: Vec, + epoch: u64, + cp: u64, + tx: u64, + ) -> Result<(), IndexerError> { + self.update_watermarks_upper_bound(tables, epoch, cp, tx) + .await + } } fn make_objects_history_to_commit(