From 4939ddfece25680a42f1068930a3f2aff985a379 Mon Sep 17 00:00:00 2001
From: Will Yang
Date: Mon, 30 Sep 2024 19:03:48 -0700
Subject: [PATCH] Committer pushes one update through store

---
 Cargo.lock                                    |  2 +-
 crates/sui-indexer/README.md                  |  8 ++-
 .../pg/2024-09-12-213234_watermarks/down.sql  |  1 +
 .../pg/2024-09-12-213234_watermarks/up.sql    | 32 +++++++++
 crates/sui-indexer/src/handlers/committer.rs  | 36 +++++++++-
 crates/sui-indexer/src/handlers/pruner.rs     | 13 ++++
 crates/sui-indexer/src/indexer.rs             |  2 +
 crates/sui-indexer/src/metrics.rs             |  8 +++
 crates/sui-indexer/src/models/mod.rs          |  1 +
 crates/sui-indexer/src/models/watermarks.rs   | 56 +++++++++++++++
 crates/sui-indexer/src/schema.rs              | 14 ++++
 crates/sui-indexer/src/store/indexer_store.rs |  9 +++
 .../sui-indexer/src/store/pg_indexer_store.rs | 72 ++++++++++++++++++-
 13 files changed, 249 insertions(+), 5 deletions(-)
 create mode 100644 crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/down.sql
 create mode 100644 crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql
 create mode 100644 crates/sui-indexer/src/models/watermarks.rs

diff --git a/Cargo.lock b/Cargo.lock
index 1613e267177fa..e0dd17bc1015a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -13667,7 +13667,7 @@ dependencies = [
  "thiserror",
  "tokio",
  "tokio-stream",
- "tokio-util 0.7.10 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-util 0.7.10",
  "toml 0.7.4",
  "tracing",
  "url",
diff --git a/crates/sui-indexer/README.md b/crates/sui-indexer/README.md
index dd57acf635aa1..cdf7a14a90947 100644
--- a/crates/sui-indexer/README.md
+++ b/crates/sui-indexer/README.md
@@ -32,7 +32,7 @@ See the [docs](https://docs.sui.io/guides/developer/getting-started/local-networ
 Start a local network using the `sui` binary:
 
 ```sh
-cargo run --bin sui -- start --with-faucet --force-regenesis
+cargo run --bin sui -- start --with-faucet --force-regenesis
 ```
 
 If you want to run a local network with the indexer enabled (note that `libpq` is required), you can run the following command after following the steps in the next section to set up an indexer DB:
@@ -124,3 +124,9 @@ Note that you need an existing database for this to work. Using the DATABASE_URL
 # Change the RPC_CLIENT_URL to http://0.0.0.0:9000 to run indexer against local validator & fullnode
 cargo run --bin sui-indexer --features mysql-feature --no-default-features -- --db-url "" --rpc-client-url "https://fullnode.devnet.sui.io:443" --fullnode-sync-worker --reset-db
 ```
+
+### Extending the indexer
+
+To add a new table, run `diesel migration generate your_table_name`, and modify the newly created `up.sql` and `down.sql` files.
+
+Apply the migration with `diesel migration run`, then run `./scripts/generate_indexer_schema.sh` to update the `schema.rs` file.
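As a concrete illustration of the new "Extending the indexer" section above: after a migration is applied, `./scripts/generate_indexer_schema.sh` regenerates `schema.rs` with one `diesel::table!` entry per table. A minimal sketch of such an entry, using a hypothetical `your_table_name` table (the table and its columns are placeholders, not part of this patch):

```rust
// Sketch only: the shape of a regenerated `diesel::table!` entry.
// `your_table_name` and its columns are illustrative placeholders.
diesel::table! {
    your_table_name (id) {
        id -> Int8,
        payload -> Bytea,
        created_at_ms -> Int8,
    }
}
```

The regenerated file also lists every table in `allow_tables_to_appear_in_same_query!`, which is exactly what the `watermarks` additions to `schema.rs` later in this patch do.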
diff --git a/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/down.sql b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/down.sql new file mode 100644 index 0000000000000..e9de336153f62 --- /dev/null +++ b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS watermarks; diff --git a/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql new file mode 100644 index 0000000000000..6efe18466b349 --- /dev/null +++ b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql @@ -0,0 +1,32 @@ +CREATE TABLE watermarks +( + -- The table governed by this watermark, i.e `epochs`, `checkpoints`, `transactions`. + entity TEXT NOT NULL, + -- Inclusive upper bound epoch this entity has data for. Committer updates this field. Pruner + -- uses this field for per-entity epoch-level retention, and is mostly useful for pruning + -- unpartitioned tables. + epoch_hi BIGINT NOT NULL, + -- Inclusive lower bound epoch this entity has data for. Pruner updates this field, and uses + -- this field in tandem with `epoch_hi` for per-entity epoch-level retention. This is mostly + -- useful for pruning unpartitioned tables. + epoch_lo BIGINT NOT NULL, + -- Inclusive upper bound checkpoint this entity has data for. Committer updates this field. All + -- data of this entity in the checkpoint must be persisted before advancing this watermark. The + -- committer or ingestion task refers to this on disaster recovery. + checkpoint_hi BIGINT NOT NULL, + -- Inclusive high watermark that the committer advances. For `checkpoints`, this represents the + -- checkpoint sequence number, for `transactions`, the transaction sequence number, etc. + reader_hi BIGINT NOT NULL, + -- Inclusive low watermark that the pruner advances. Data before this watermark is considered + -- pruned by a reader. The underlying data may still exist in the db instance. + reader_lo BIGINT NOT NULL, + -- Updated using the database's current timestamp when the pruner sees that some data needs to + -- be dropped. The pruner uses this column to determine whether to prune or wait long enough + -- that all in-flight reads complete or timeout before it acts on an updated watermark. + timestamp_ms BIGINT NOT NULL, + -- Column used by the pruner to track its true progress. Data at and below this watermark has + -- been truly pruned from the db, and should no longer exist. When recovering from a crash, the + -- pruner will consult this column to determine where to continue. + pruned_lo BIGINT, + PRIMARY KEY (entity) +); diff --git a/crates/sui-indexer/src/handlers/committer.rs b/crates/sui-indexer/src/handlers/committer.rs index ee964bbce2fc7..7558e108ac415 100644 --- a/crates/sui-indexer/src/handlers/committer.rs +++ b/crates/sui-indexer/src/handlers/committer.rs @@ -10,9 +10,11 @@ use tracing::{error, info}; use sui_types::messages_checkpoint::CheckpointSequenceNumber; +use crate::handlers::pruner::PrunableTable; use crate::metrics::IndexerMetrics; use crate::store::IndexerStore; use crate::types::IndexerResult; +use strum::IntoEnumIterator; use super::{CheckpointDataToCommit, EpochToCommit}; @@ -57,6 +59,8 @@ where batch.push(checkpoint); next_checkpoint_sequence_number += 1; let epoch_number_option = epoch.as_ref().map(|epoch| epoch.new_epoch.epoch); + // The batch will consist of contiguous checkpoints and at most one epoch boundary at + // the end. 
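A note on the `watermarks` migration above: the column comments split ownership between two writers. The committer advances `epoch_hi`, `checkpoint_hi`, and `reader_hi`; the pruner owns `epoch_lo`, `reader_lo`, `pruned_lo`, and `timestamp_ms`. Only the committer half is wired up in this patch. A rough sketch of how a pruner might later interpret those columns, assuming a grace period for in-flight reads (the struct and function names are illustrative, not code from this patch):

```rust
/// Illustrative view of one `watermarks` row from the pruner's side.
/// Field names mirror the columns added in the migration above.
struct WatermarkView {
    reader_lo: i64,
    pruned_lo: Option<i64>,
    timestamp_ms: i64,
}

impl WatermarkView {
    /// Rows readers may no longer rely on but that may still exist on disk:
    /// everything above `pruned_lo` (already dropped) and below `reader_lo`.
    fn prunable_range(&self) -> std::ops::Range<i64> {
        self.pruned_lo.map_or(0, |lo| lo + 1)..self.reader_lo
    }

    /// Per the `timestamp_ms` comment, wait until reads that started before
    /// the reader watermark moved have completed or timed out.
    fn safe_to_prune_at(&self, now_ms: i64, read_timeout_ms: i64) -> bool {
        now_ms - self.timestamp_ms >= read_timeout_ms
    }
}
```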
if batch.len() == checkpoint_commit_batch_size || epoch.is_some() { commit_checkpoints(&state, batch, epoch, &metrics).await; batch = vec![]; @@ -79,6 +83,10 @@ where Ok(()) } +/// Writes indexed checkpoint data to the database, and then update watermark upper bounds and +/// metrics. Expects `indexed_checkpoint_batch` to be non-empty, and contain contiguous checkpoints. +/// There can be at most one epoch boundary at the end. If an epoch boundary is detected, +/// epoch-partitioned tables must be advanced. // Unwrap: Caller needs to make sure indexed_checkpoint_batch is not empty #[instrument(skip_all, fields( first = indexed_checkpoint_batch.first().as_ref().unwrap().checkpoint.sequence_number, @@ -130,7 +138,14 @@ async fn commit_checkpoints( } let first_checkpoint_seq = checkpoint_batch.first().as_ref().unwrap().sequence_number; - let last_checkpoint_seq = checkpoint_batch.last().as_ref().unwrap().sequence_number; + let (epoch_id, last_checkpoint_seq, last_tx_seq) = { + let checkpoint = checkpoint_batch.last().unwrap(); + ( + checkpoint.epoch, + checkpoint.sequence_number, + checkpoint.max_tx_sequence_number, + ) + }; let guard = metrics.checkpoint_db_commit_latency.start_timer(); let tx_batch = tx_batch.into_iter().flatten().collect::>(); @@ -189,7 +204,8 @@ async fn commit_checkpoints( let is_epoch_end = epoch.is_some(); - // handle partitioning on epoch boundary + // On epoch boundary, we need to modify the existing partitions' upper bound, and introduce a + // new partition for incoming data for the upcoming epoch. if let Some(epoch_data) = epoch { state .advance_epoch(epoch_data) @@ -212,6 +228,22 @@ async fn commit_checkpoints( }) .expect("Persisting data into DB should not fail."); + state + .update_watermarks_upper_bound( + PrunableTable::iter().collect(), + epoch_id, + last_checkpoint_seq, + last_tx_seq, + ) + .await + .tap_err(|e| { + error!( + "Failed to update watermark upper bound with error: {}", + e.to_string() + ); + }) + .expect("Updating watermark upper bound in DB should not fail."); + if is_epoch_end { // The epoch has advanced so we update the configs for the new protocol version, if it has changed. let chain_id = state diff --git a/crates/sui-indexer/src/handlers/pruner.rs b/crates/sui-indexer/src/handlers/pruner.rs index 9f196a6d4f028..cc50000d26368 100644 --- a/crates/sui-indexer/src/handlers/pruner.rs +++ b/crates/sui-indexer/src/handlers/pruner.rs @@ -66,6 +66,19 @@ pub enum PrunableTable { PrunerCpWatermark, } +impl PrunableTable { + /// Given a committer's report of the latest written checkpoint and tx, return the value that + /// corresponds to the variant's unit to be used by readers. + pub fn map_to_reader_unit(&self, cp: u64, tx: u64) -> u64 { + match self { + PrunableTable::ObjectsHistory + | PrunableTable::Checkpoints + | PrunableTable::PrunerCpWatermark => cp, + _ => tx, + } + } +} + impl Pruner { /// Instantiates a pruner with default retention and overrides. Pruner will finalize the /// retention policies so there is a value for every prunable table. diff --git a/crates/sui-indexer/src/indexer.rs b/crates/sui-indexer/src/indexer.rs index 33698e995de88..7d264335afbcf 100644 --- a/crates/sui-indexer/src/indexer.rs +++ b/crates/sui-indexer/src/indexer.rs @@ -87,6 +87,8 @@ impl Indexer { let mut exit_senders = vec![]; let mut executors = vec![]; + // Ingestion task watermarks are snapshotted once on indexer startup based on the + // corresponding watermark table before being handed off to the ingestion task. 
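To make the new committer step concrete: each committed batch reports a single `(epoch, checkpoint, tx)` triple, and `update_watermarks_upper_bound` fans it out into one row per prunable table, with `reader_hi` expressed in that table's own unit via `map_to_reader_unit`. A minimal sketch of that fan-out using the types added in this patch (the free function is illustrative; the real version lives inside `PgIndexerStore::update_watermarks_upper_bound` further down):

```rust
use strum::IntoEnumIterator;

use crate::handlers::pruner::PrunableTable;
use crate::models::watermarks::StoredWatermark;

// Checkpoint-keyed tables (`objects_history`, `checkpoints`, `pruner_cp_watermark`)
// report `reader_hi` in checkpoint sequence numbers; the rest report it in
// transaction sequence numbers.
fn watermark_upper_bounds(epoch: u64, cp: u64, tx: u64) -> Vec<StoredWatermark> {
    PrunableTable::iter()
        .map(|table| {
            let reader_hi = table.map_to_reader_unit(cp, tx);
            StoredWatermark::from_upper_bound_update(table.as_ref(), epoch, cp, reader_hi)
        })
        .collect()
}
```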
let progress_store = ShimIndexerProgressStore::new(vec![ ("primary".to_string(), primary_watermark), ("object_snapshot".to_string(), object_snapshot_watermark), diff --git a/crates/sui-indexer/src/metrics.rs b/crates/sui-indexer/src/metrics.rs index 739a686d3cbfc..e6e3411ecdbbb 100644 --- a/crates/sui-indexer/src/metrics.rs +++ b/crates/sui-indexer/src/metrics.rs @@ -125,6 +125,7 @@ pub struct IndexerMetrics { pub checkpoint_db_commit_latency_tx_indices_chunks: Histogram, pub checkpoint_db_commit_latency_checkpoints: Histogram, pub checkpoint_db_commit_latency_epoch: Histogram, + pub checkpoint_db_commit_latency_watermarks: Histogram, pub thousand_transaction_avg_db_commit_latency: Histogram, pub object_db_commit_latency: Histogram, pub object_mutation_db_commit_latency: Histogram, @@ -536,6 +537,13 @@ impl IndexerMetrics { registry, ) .unwrap(), + checkpoint_db_commit_latency_watermarks: register_histogram_with_registry!( + "checkpoint_db_commit_latency_watermarks", + "Time spent committing watermarks", + DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(), + registry, + ) + .unwrap(), tokio_blocking_task_wait_latency: register_histogram_with_registry!( "tokio_blocking_task_wait_latency", "Time spent to wait for tokio blocking task pool", diff --git a/crates/sui-indexer/src/models/mod.rs b/crates/sui-indexer/src/models/mod.rs index e55efebb8c567..84e8b308bc0d5 100644 --- a/crates/sui-indexer/src/models/mod.rs +++ b/crates/sui-indexer/src/models/mod.rs @@ -12,3 +12,4 @@ pub mod packages; pub mod raw_checkpoints; pub mod transactions; pub mod tx_indices; +pub mod watermarks; diff --git a/crates/sui-indexer/src/models/watermarks.rs b/crates/sui-indexer/src/models/watermarks.rs new file mode 100644 index 0000000000000..5994cec97adb0 --- /dev/null +++ b/crates/sui-indexer/src/models/watermarks.rs @@ -0,0 +1,56 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::schema::watermarks::{self}; +use diesel::prelude::*; + +/// Represents a row in the `watermarks` table. +#[derive(Queryable, Insertable, Default, QueryableByName)] +#[diesel(table_name = watermarks, primary_key(entity))] +pub struct StoredWatermark { + /// The table governed by this watermark, i.e `epochs`, `checkpoints`, `transactions`. + pub entity: String, + /// Inclusive upper bound epoch this entity has data for. Committer updates this field. Pruner + /// uses this field for per-entity epoch-level retention, and is mostly useful for pruning + /// unpartitioned tables. + pub epoch_hi: i64, + /// Inclusive lower bound epoch this entity has data for. Pruner updates this field, and uses + /// this field in tandem with `epoch_hi` for per-entity epoch-level retention. This is mostly + /// useful for pruning unpartitioned tables. + pub epoch_lo: i64, + /// Inclusive upper bound checkpoint this entity has data for. Committer updates this field. All + /// data of this entity in the checkpoint must be persisted before advancing this watermark. The + /// committer or ingestion task refers to this on disaster recovery. + pub checkpoint_hi: i64, + /// Inclusive high watermark that the committer advances. For `checkpoints`, this represents the + /// checkpoint sequence number, for `transactions`, the transaction sequence number, etc. + pub reader_hi: i64, + /// Inclusive low watermark that the pruner advances. Data before this watermark is considered + /// pruned by a reader. The underlying data may still exist in the db instance. 
+ pub reader_lo: i64, + /// Updated using the database's current timestamp when the pruner sees that some data needs to + /// be dropped. The pruner uses this column to determine whether to prune or wait long enough + /// that all in-flight reads complete or timeout before it acts on an updated watermark. + pub timestamp_ms: i64, + /// Column used by the pruner to track its true progress. Data at and below this watermark has + /// been truly pruned from the db, and should no longer exist. When recovering from a crash, the + /// pruner will consult this column to determine where to continue. + pub pruned_lo: Option, +} + +impl StoredWatermark { + pub fn from_upper_bound_update( + entity: &str, + epoch_hi: u64, + checkpoint_hi: u64, + reader_hi: u64, + ) -> Self { + StoredWatermark { + entity: entity.to_string(), + epoch_hi: epoch_hi as i64, + checkpoint_hi: checkpoint_hi as i64, + reader_hi: reader_hi as i64, + ..StoredWatermark::default() + } + } +} diff --git a/crates/sui-indexer/src/schema.rs b/crates/sui-indexer/src/schema.rs index dfc12173279e6..5443ff4221261 100644 --- a/crates/sui-indexer/src/schema.rs +++ b/crates/sui-indexer/src/schema.rs @@ -368,6 +368,19 @@ diesel::table! { } } +diesel::table! { + watermarks (entity) { + entity -> Text, + epoch_hi -> Int8, + epoch_lo -> Int8, + checkpoint_hi -> Int8, + reader_hi -> Int8, + reader_lo -> Int8, + timestamp_ms -> Int8, + pruned_lo -> Nullable, + } +} + diesel::allow_tables_to_appear_in_same_query!( chain_identifier, checkpoints, @@ -403,4 +416,5 @@ diesel::allow_tables_to_appear_in_same_query!( tx_kinds, tx_recipients, tx_senders, + watermarks, ); diff --git a/crates/sui-indexer/src/store/indexer_store.rs b/crates/sui-indexer/src/store/indexer_store.rs index da4b86ba81016..3b072fd508233 100644 --- a/crates/sui-indexer/src/store/indexer_store.rs +++ b/crates/sui-indexer/src/store/indexer_store.rs @@ -6,6 +6,7 @@ use std::collections::BTreeMap; use async_trait::async_trait; use crate::errors::IndexerError; +use crate::handlers::pruner::PrunableTable; use crate::handlers::{EpochToCommit, TransactionObjectChangesToCommit}; use crate::models::display::StoredDisplay; use crate::models::obj_indices::StoredObjectVersion; @@ -114,4 +115,12 @@ pub trait IndexerStore: Clone + Sync + Send + 'static { &self, checkpoints: Vec, ) -> Result<(), IndexerError>; + + async fn update_watermarks_upper_bound( + &self, + tables: Vec, + epoch: u64, + cp: u64, + tx: u64, + ) -> Result<(), IndexerError>; } diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index 0c9438b01ced1..56dae6e7cb3bc 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -27,6 +27,7 @@ use sui_storage::object_store::util::put; use crate::config::UploadOptions; use crate::database::ConnectionPool; use crate::errors::{Context, IndexerError}; +use crate::handlers::pruner::PrunableTable; use crate::handlers::EpochToCommit; use crate::handlers::TransactionObjectChangesToCommit; use crate::metrics::IndexerMetrics; @@ -44,6 +45,7 @@ use crate::models::objects::{ }; use crate::models::packages::StoredPackage; use crate::models::transactions::StoredTransaction; +use crate::models::watermarks::StoredWatermark; use crate::schema::{ chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package, event_senders, event_struct_instantiation, event_struct_module, event_struct_name, @@ -51,7 +53,7 @@ use crate::schema::{ objects_snapshot, objects_version, 
packages, protocol_configs, pruner_cp_watermark, raw_checkpoints, transactions, tx_affected_addresses, tx_affected_objects, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, tx_kinds, - tx_recipients, tx_senders, + tx_recipients, tx_senders, watermarks, }; use crate::store::transaction_with_retry; use crate::types::{EventIndex, IndexedDeletedObject, IndexedObject}; @@ -1454,6 +1456,63 @@ impl PgIndexerStore { .context("Failed to get network total transactions in epoch") .map(|v| v as u64) } + + async fn update_watermarks_upper_bound( + &self, + tables: Vec, + epoch: u64, + cp: u64, + tx: u64, + ) -> Result<(), IndexerError> { + use diesel_async::RunQueryDsl; + + let guard = self + .metrics + .checkpoint_db_commit_latency_watermarks + .start_timer(); + + transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| { + let upper_bound_updates = tables + .into_iter() + .map(|table| { + let reader_upper_bound = table.map_to_reader_unit(cp, tx); + + StoredWatermark::from_upper_bound_update( + table.as_ref(), + epoch, + cp, + reader_upper_bound, + ) + }) + .collect::>(); + async { + diesel::insert_into(watermarks::table) + .values(upper_bound_updates) + .on_conflict(watermarks::entity) + .do_update() + .set(( + watermarks::epoch_hi.eq(excluded(watermarks::epoch_hi)), + watermarks::checkpoint_hi.eq(excluded(watermarks::checkpoint_hi)), + watermarks::reader_hi.eq(excluded(watermarks::reader_hi)), + )) + .execute(conn) + .await + .map_err(IndexerError::from) + .context("Failed to update watermarks upper bound")?; + + Ok::<(), IndexerError>(()) + } + .scope_boxed() + }) + .await + .tap_ok(|_| { + let elapsed = guard.stop_and_record(); + info!(elapsed, "Persisted watermarks"); + }) + .tap_err(|e| { + tracing::error!("Failed to persist watermarks with error: {}", e); + }) + } } #[async_trait] @@ -2110,6 +2169,17 @@ impl IndexerStore for PgIndexerStore { ) -> Result<(), IndexerError> { self.persist_raw_checkpoints_impl(&checkpoints).await } + + async fn update_watermarks_upper_bound( + &self, + tables: Vec, + epoch: u64, + cp: u64, + tx: u64, + ) -> Result<(), IndexerError> { + self.update_watermarks_upper_bound(tables, epoch, cp, tx) + .await + } } fn make_objects_history_to_commit(
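Two notes on the store changes above. First, the `ON CONFLICT` clause only updates `epoch_hi`, `checkpoint_hi`, and `reader_hi`, so the pruner-owned columns (`epoch_lo`, `reader_lo`, `pruned_lo`, `timestamp_ms`) are never clobbered by the committer's upserts, even though `StoredWatermark::from_upper_bound_update` fills them with defaults on the first insert. Second, the migration notes that `checkpoint_hi` is what the committer or ingestion task consults on disaster recovery; a hedged sketch of what that read could look like against the new schema (the function, connection type, and error handling are assumptions, not part of this patch):

```rust
use diesel::{OptionalExtension, QueryDsl, QueryResult};
use diesel_async::{AsyncPgConnection, RunQueryDsl};

use crate::schema::watermarks;

/// Sketch (not part of this patch): fetch the committed upper bound for one
/// entity so an ingestion task could resume from `checkpoint_hi + 1`.
async fn committed_checkpoint_hi(
    conn: &mut AsyncPgConnection,
    entity: &str,
) -> QueryResult<Option<i64>> {
    watermarks::table
        .find(entity)
        .select(watermarks::checkpoint_hi)
        .first::<i64>(conn)
        .await
        .optional()
}
```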