Skip to content

Commit

Permalink
Committer pushes one update through store
Browse files Browse the repository at this point in the history
  • Loading branch information
wlmyng committed Oct 4, 2024
1 parent da281ff commit 3b48da6
Show file tree
Hide file tree
Showing 12 changed files with 247 additions and 3 deletions.
6 changes: 6 additions & 0 deletions crates/sui-indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,9 @@ Note that you need an existing database for this to work. Using the DATABASE_URL
# Change the RPC_CLIENT_URL to http://0.0.0.0:9000 to run indexer against local validator & fullnode
cargo run --bin sui-indexer --features mysql-feature --no-default-features -- --db-url "<DATABASE_URL>" --rpc-client-url "https://fullnode.devnet.sui.io:443" --fullnode-sync-worker --reset-db
```

### Extending the indexer

To add a new table, run `diesel migration generate your_table_name`, and modify the newly created `up.sql` and `down.sql` files.

Then apply the migration with `diesel migration run`, and run `./scripts/generate_indexer_schema.sh` to regenerate the `schema.rs` file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Remove the `watermarks` table. `IF EXISTS` makes this a no-op when the table is already absent.
DROP TABLE IF EXISTS watermarks;
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
-- One row per entity (table). The `*_hi` columns are advanced by the committer and the `*_lo`
-- columns by the pruner, as described on each column below.
CREATE TABLE watermarks
(
-- The table governed by this watermark, e.g. `epochs`, `checkpoints`, `transactions`.
entity TEXT NOT NULL,
-- Inclusive upper bound epoch this entity has data for. The committer updates this field. The
-- pruner uses it for per-entity epoch-level retention; it is mostly useful for pruning
-- unpartitioned tables.
epoch_hi BIGINT NOT NULL,
-- Inclusive lower bound epoch this entity has data for. The pruner updates this field, and uses
-- it in tandem with `epoch_hi` for per-entity epoch-level retention. This is mostly useful for
-- pruning unpartitioned tables.
epoch_lo BIGINT NOT NULL,
-- Inclusive upper bound checkpoint this entity has data for. The committer updates this field.
-- All data of this entity in the checkpoint must be persisted before advancing this watermark.
-- The committer or ingestion task refers to this on disaster recovery.
checkpoint_hi BIGINT NOT NULL,
-- Inclusive high watermark that the committer advances, expressed in the entity's own unit: for
-- `checkpoints` this is the checkpoint sequence number, for `transactions` the transaction
-- sequence number, etc.
reader_hi BIGINT NOT NULL,
-- Inclusive low watermark that the pruner advances. Data before this watermark is considered
-- pruned by a reader. The underlying data may still exist in the db instance.
reader_lo BIGINT NOT NULL,
-- Updated using the database's current timestamp when the pruner sees that some data needs to
-- be dropped. The pruner uses this column to determine whether to prune, or to wait long enough
-- that all in-flight reads complete or time out before it acts on an updated watermark.
timestamp_ms BIGINT NOT NULL,
-- Column used by the pruner to track its true progress. Data at and below this watermark has
-- been truly pruned from the db, and should no longer exist. When recovering from a crash, the
-- pruner will consult this column to determine where to continue. Nullable, unlike the other
-- columns.
pruned_lo BIGINT,
PRIMARY KEY (entity)
);
36 changes: 34 additions & 2 deletions crates/sui-indexer/src/handlers/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ use tracing::{error, info};

use sui_types::messages_checkpoint::CheckpointSequenceNumber;

use crate::handlers::pruner::PrunableTable;
use crate::metrics::IndexerMetrics;
use crate::store::IndexerStore;
use crate::types::IndexerResult;
use strum::IntoEnumIterator;

use super::{CheckpointDataToCommit, EpochToCommit};

Expand Down Expand Up @@ -57,6 +59,8 @@ where
batch.push(checkpoint);
next_checkpoint_sequence_number += 1;
let epoch_number_option = epoch.as_ref().map(|epoch| epoch.new_epoch.epoch);
// The batch will consist of contiguous checkpoints and at most one epoch boundary at
// the end.
if batch.len() == checkpoint_commit_batch_size || epoch.is_some() {
commit_checkpoints(&state, batch, epoch, &metrics).await;
batch = vec![];
Expand All @@ -79,6 +83,10 @@ where
Ok(())
}

/// Writes indexed checkpoint data to the database, and then updates watermark upper bounds and
/// metrics. Expects `indexed_checkpoint_batch` to be non-empty and to contain contiguous
/// checkpoints, with at most one epoch boundary at the end. If an epoch boundary is detected,
/// epoch-partitioned tables must be advanced.
// Unwrap: Caller needs to make sure indexed_checkpoint_batch is not empty
#[instrument(skip_all, fields(
first = indexed_checkpoint_batch.first().as_ref().unwrap().checkpoint.sequence_number,
Expand Down Expand Up @@ -130,7 +138,14 @@ async fn commit_checkpoints<S>(
}

let first_checkpoint_seq = checkpoint_batch.first().as_ref().unwrap().sequence_number;
let last_checkpoint_seq = checkpoint_batch.last().as_ref().unwrap().sequence_number;
let (epoch_id, last_checkpoint_seq, last_tx_seq) = {
let checkpoint = checkpoint_batch.last().unwrap();
(
checkpoint.epoch,
checkpoint.sequence_number,
checkpoint.max_tx_sequence_number,
)
};

let guard = metrics.checkpoint_db_commit_latency.start_timer();
let tx_batch = tx_batch.into_iter().flatten().collect::<Vec<_>>();
Expand Down Expand Up @@ -189,7 +204,8 @@ async fn commit_checkpoints<S>(

let is_epoch_end = epoch.is_some();

// handle partitioning on epoch boundary
// On epoch boundary, we need to modify the existing partitions' upper bound, and introduce a
// new partition for incoming data for the upcoming epoch.
if let Some(epoch_data) = epoch {
state
.advance_epoch(epoch_data)
Expand All @@ -212,6 +228,22 @@ async fn commit_checkpoints<S>(
})
.expect("Persisting data into DB should not fail.");

state
.update_watermarks_upper_bound(
PrunableTable::iter().collect(),
epoch_id,
last_checkpoint_seq,
last_tx_seq,
)
.await
.tap_err(|e| {
error!(
"Failed to update watermark upper bound with error: {}",
e.to_string()
);
})
.expect("Updating watermark upper bound in DB should not fail.");

if is_epoch_end {
// The epoch has advanced so we update the configs for the new protocol version, if it has changed.
let chain_id = state
Expand Down
13 changes: 13 additions & 0 deletions crates/sui-indexer/src/handlers/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,19 @@ pub enum PrunableTable {
PrunerCpWatermark,
}

impl PrunableTable {
    /// Picks the reader-facing upper bound for this table out of the committer's latest
    /// checkpoint (`cp`) and transaction (`tx`) sequence numbers.
    ///
    /// `ObjectsHistory`, `Checkpoints` and `PrunerCpWatermark` are tracked in checkpoint units
    /// and therefore yield `cp`; every other variant yields `tx`.
    pub fn map_to_reader_unit(&self, cp: u64, tx: u64) -> u64 {
        let tracked_by_checkpoint = matches!(
            self,
            PrunableTable::ObjectsHistory
                | PrunableTable::Checkpoints
                | PrunableTable::PrunerCpWatermark
        );

        if tracked_by_checkpoint {
            cp
        } else {
            tx
        }
    }
}

impl Pruner {
/// Instantiates a pruner with default retention and overrides. Pruner will finalize the
/// retention policies so there is a value for every prunable table.
Expand Down
2 changes: 2 additions & 0 deletions crates/sui-indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ impl Indexer {

let mut exit_senders = vec![];
let mut executors = vec![];
// Ingestion task watermarks are snapshotted once on indexer startup based on the
// corresponding watermark table before being handed off to the ingestion task.
let progress_store = ShimIndexerProgressStore::new(vec![
("primary".to_string(), primary_watermark),
("object_snapshot".to_string(), object_snapshot_watermark),
Expand Down
8 changes: 8 additions & 0 deletions crates/sui-indexer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ pub struct IndexerMetrics {
pub checkpoint_db_commit_latency_tx_indices_chunks: Histogram,
pub checkpoint_db_commit_latency_checkpoints: Histogram,
pub checkpoint_db_commit_latency_epoch: Histogram,
pub checkpoint_db_commit_latency_watermarks: Histogram,
pub thousand_transaction_avg_db_commit_latency: Histogram,
pub object_db_commit_latency: Histogram,
pub object_mutation_db_commit_latency: Histogram,
Expand Down Expand Up @@ -536,6 +537,13 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
checkpoint_db_commit_latency_watermarks: register_histogram_with_registry!(
"checkpoint_db_commit_latency_watermarks",
"Time spent committing watermarks",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
tokio_blocking_task_wait_latency: register_histogram_with_registry!(
"tokio_blocking_task_wait_latency",
"Time spent to wait for tokio blocking task pool",
Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ pub mod packages;
pub mod raw_checkpoints;
pub mod transactions;
pub mod tx_indices;
pub mod watermarks;
56 changes: 56 additions & 0 deletions crates/sui-indexer/src/models/watermarks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::schema::watermarks::{self};
use diesel::prelude::*;

/// Represents a row in the `watermarks` table. Fields mirror the table's columns one-to-one;
/// `pruned_lo` is the only nullable column.
#[derive(Queryable, Insertable, Default, QueryableByName)]
#[diesel(table_name = watermarks, primary_key(entity))]
pub struct StoredWatermark {
/// The table governed by this watermark, e.g. `epochs`, `checkpoints`, `transactions`.
pub entity: String,
/// Inclusive upper bound epoch this entity has data for. The committer updates this field. The
/// pruner uses it for per-entity epoch-level retention; it is mostly useful for pruning
/// unpartitioned tables.
pub epoch_hi: i64,
/// Inclusive lower bound epoch this entity has data for. The pruner updates this field, and
/// uses it in tandem with `epoch_hi` for per-entity epoch-level retention. This is mostly
/// useful for pruning unpartitioned tables.
pub epoch_lo: i64,
/// Inclusive upper bound checkpoint this entity has data for. The committer updates this field.
/// All data of this entity in the checkpoint must be persisted before advancing this
/// watermark. The committer or ingestion task refers to this on disaster recovery.
pub checkpoint_hi: i64,
/// Inclusive high watermark that the committer advances, expressed in the entity's own unit:
/// for `checkpoints` this is the checkpoint sequence number, for `transactions` the
/// transaction sequence number, etc.
pub reader_hi: i64,
/// Inclusive low watermark that the pruner advances. Data before this watermark is considered
/// pruned by a reader. The underlying data may still exist in the db instance.
pub reader_lo: i64,
/// Updated using the database's current timestamp when the pruner sees that some data needs to
/// be dropped. The pruner uses this column to determine whether to prune, or to wait long
/// enough that all in-flight reads complete or time out before it acts on an updated
/// watermark.
pub timestamp_ms: i64,
/// Column used by the pruner to track its true progress. Data at and below this watermark has
/// been truly pruned from the db, and should no longer exist. When recovering from a crash, the
/// pruner will consult this column to determine where to continue.
pub pruned_lo: Option<i64>,
}

impl StoredWatermark {
    /// Builds the row used to advance an entity's upper bounds.
    ///
    /// Only `entity`, `epoch_hi`, `checkpoint_hi` and `reader_hi` are taken from the arguments;
    /// every remaining field is left at its `Default` value.
    ///
    /// The stored columns are signed 64-bit integers, so an input above `i64::MAX` is clamped to
    /// `i64::MAX`. A plain `as i64` cast would instead wrap such a value to a negative number,
    /// which would order *below* every valid watermark.
    pub fn from_upper_bound_update(
        entity: &str,
        epoch_hi: u64,
        checkpoint_hi: u64,
        reader_hi: u64,
    ) -> Self {
        /// Converts to the signed column type, saturating at `i64::MAX` instead of wrapping.
        fn to_bigint(value: u64) -> i64 {
            // After the `min`, the value is guaranteed to fit, so the cast is lossless.
            value.min(i64::MAX as u64) as i64
        }

        StoredWatermark {
            entity: entity.to_string(),
            epoch_hi: to_bigint(epoch_hi),
            checkpoint_hi: to_bigint(checkpoint_hi),
            reader_hi: to_bigint(reader_hi),
            ..StoredWatermark::default()
        }
    }
}
14 changes: 14 additions & 0 deletions crates/sui-indexer/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,19 @@ diesel::table! {
}
}

// Diesel schema for the `watermarks` table, keyed by `entity`. `pruned_lo` is the only nullable
// column.
// NOTE(review): the README says `schema.rs` is updated via `scripts/generate_indexer_schema.sh`,
// so hand edits here are presumably overwritten on regeneration — confirm.
diesel::table! {
watermarks (entity) {
entity -> Text,
epoch_hi -> Int8,
epoch_lo -> Int8,
checkpoint_hi -> Int8,
reader_hi -> Int8,
reader_lo -> Int8,
timestamp_ms -> Int8,
pruned_lo -> Nullable<Int8>,
}
}

diesel::allow_tables_to_appear_in_same_query!(
chain_identifier,
checkpoints,
Expand Down Expand Up @@ -403,4 +416,5 @@ diesel::allow_tables_to_appear_in_same_query!(
tx_kinds,
tx_recipients,
tx_senders,
watermarks,
);
9 changes: 9 additions & 0 deletions crates/sui-indexer/src/store/indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::collections::BTreeMap;
use async_trait::async_trait;

use crate::errors::IndexerError;
use crate::handlers::pruner::PrunableTable;
use crate::handlers::{EpochToCommit, TransactionObjectChangesToCommit};
use crate::models::display::StoredDisplay;
use crate::models::obj_indices::StoredObjectVersion;
Expand Down Expand Up @@ -114,4 +115,12 @@ pub trait IndexerStore: Clone + Sync + Send + 'static {
&self,
checkpoints: Vec<StoredRawCheckpoint>,
) -> Result<(), IndexerError>;

/// Advances the upper-bound watermarks of every table listed in `tables`.
///
/// `epoch`, `cp` and `tx` are the epoch, checkpoint sequence number and transaction sequence
/// number up to which data has been written; the committer passes the values taken from the last
/// checkpoint of the batch it just persisted.
async fn update_watermarks_upper_bound(
&self,
tables: Vec<PrunableTable>,
epoch: u64,
cp: u64,
tx: u64,
) -> Result<(), IndexerError>;
}
72 changes: 71 additions & 1 deletion crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use sui_storage::object_store::util::put;
use crate::config::UploadOptions;
use crate::database::ConnectionPool;
use crate::errors::{Context, IndexerError};
use crate::handlers::pruner::PrunableTable;
use crate::handlers::EpochToCommit;
use crate::handlers::TransactionObjectChangesToCommit;
use crate::metrics::IndexerMetrics;
Expand All @@ -44,14 +45,15 @@ use crate::models::objects::{
};
use crate::models::packages::StoredPackage;
use crate::models::transactions::StoredTransaction;
use crate::models::watermarks::StoredWatermark;
use crate::schema::{
chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
event_struct_package, events, feature_flags, full_objects_history, objects, objects_history,
objects_snapshot, objects_version, packages, protocol_configs, pruner_cp_watermark,
raw_checkpoints, transactions, tx_affected_addresses, tx_affected_objects, tx_calls_fun,
tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, tx_kinds,
tx_recipients, tx_senders,
tx_recipients, tx_senders, watermarks,
};
use crate::store::transaction_with_retry;
use crate::types::{EventIndex, IndexedDeletedObject, IndexedObject};
Expand Down Expand Up @@ -1454,6 +1456,63 @@ impl PgIndexerStore {
.context("Failed to get network total transactions in epoch")
.map(|v| v as u64)
}

/// Upserts one `watermarks` row per entry of `tables`, recording `epoch` as `epoch_hi`, `cp` as
/// `checkpoint_hi`, and either `cp` or `tx` (chosen per table by `map_to_reader_unit`) as
/// `reader_hi`.
///
/// When a row for the entity already exists, only those three columns are overwritten; the
/// remaining columns keep their stored values. Returns the error from the database layer on
/// failure, after logging it.
async fn update_watermarks_upper_bound(
&self,
tables: Vec<PrunableTable>,
epoch: u64,
cp: u64,
tx: u64,
) -> Result<(), IndexerError> {
use diesel_async::RunQueryDsl;

// Started before any work so the recorded latency covers the whole call.
let guard = self
.metrics
.checkpoint_db_commit_latency_watermarks
.start_timer();

// NOTE(review): presumably `transaction_with_retry` re-runs the closure on transient failures —
// confirm against its definition.
transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
// Build the candidate rows up front; all tables share `epoch` and `cp`, and differ only in
// the unit used for `reader_hi`.
let upper_bound_updates = tables
.into_iter()
.map(|table| {
let reader_upper_bound = table.map_to_reader_unit(cp, tx);

StoredWatermark::from_upper_bound_update(
table.as_ref(),
epoch,
cp,
reader_upper_bound,
)
})
.collect::<Vec<_>>();
async {
// Insert-or-update keyed on `entity`. On conflict only the three `*_hi` columns are taken
// from the incoming row (`excluded`).
// NOTE(review): the new values are written unconditionally; nothing here checks that they
// are >= the stored ones — confirm callers only ever move forward.
diesel::insert_into(watermarks::table)
.values(upper_bound_updates)
.on_conflict(watermarks::entity)
.do_update()
.set((
watermarks::epoch_hi.eq(excluded(watermarks::epoch_hi)),
watermarks::checkpoint_hi.eq(excluded(watermarks::checkpoint_hi)),
watermarks::reader_hi.eq(excluded(watermarks::reader_hi)),
))
.execute(conn)
.await
.map_err(IndexerError::from)
.context("Failed to update watermarks upper bound")?;

Ok::<(), IndexerError>(())
}
.scope_boxed()
})
.await
// On success, stop the timer explicitly so the elapsed time can be logged as well as recorded.
.tap_ok(|_| {
let elapsed = guard.stop_and_record();
info!(elapsed, "Persisted watermarks");
})
.tap_err(|e| {
tracing::error!("Failed to persist watermarks with error: {}", e);
})
}
}

#[async_trait]
Expand Down Expand Up @@ -2110,6 +2169,17 @@ impl IndexerStore for PgIndexerStore {
) -> Result<(), IndexerError> {
self.persist_raw_checkpoints_impl(&checkpoints).await
}

/// `IndexerStore` entry point for advancing watermark upper bounds; forwards all arguments
/// unchanged to the method of the same name that `PgIndexerStore` defines on itself.
async fn update_watermarks_upper_bound(
    &self,
    tables: Vec<PrunableTable>,
    epoch: u64,
    cp: u64,
    tx: u64,
) -> Result<(), IndexerError> {
    // Method-call syntax prefers the inherent method over this trait method, so this delegates
    // rather than recursing.
    let upsert = self.update_watermarks_upper_bound(tables, epoch, cp, tx);
    upsert.await
}
}

fn make_objects_history_to_commit(
Expand Down

0 comments on commit 3b48da6

Please sign in to comment.