diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index f9b08b2e21ccf..66ff544a9fa82 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -166,6 +166,9 @@ jobs: - name: Indexer Alt schema run: | ./crates/sui-indexer-alt/generate_schema.sh + - name: Indexer Alt Framework schema + run: | + ./crates/sui-indexer-alt-framework/generate_schema.sh # Ensure there are no uncommitted changes in the repo after running tests - run: scripts/changed-files.sh shell: bash diff --git a/Cargo.lock b/Cargo.lock index a9dac90a8351a..86ec6811a157f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14093,30 +14093,55 @@ dependencies = [ [[package]] name = "sui-indexer-alt" version = "1.39.0" +dependencies = [ + "anyhow", + "async-trait", + "bcs", + "clap", + "diesel", + "diesel-async", + "diesel_migrations", + "futures", + "itertools 0.13.0", + "rand 0.8.5", + "serde", + "sui-default-config", + "sui-field-count", + "sui-indexer-alt-framework", + "sui-protocol-config", + "sui-synthetic-ingestion", + "sui-types", + "telemetry-subscribers", + "tempfile", + "tokio", + "tokio-util 0.7.10", + "toml 0.7.4", + "tracing", + "wiremock", +] + +[[package]] +name = "sui-indexer-alt-framework" +version = "1.39.0" dependencies = [ "anyhow", "async-trait", "axum 0.7.5", "backoff", "bb8", - "bcs", "chrono", "clap", "diesel", "diesel-async", "diesel_migrations", "futures", - "itertools 0.13.0", "prometheus", "rand 0.8.5", "reqwest 0.12.5", "serde", - "sui-default-config", "sui-field-count", "sui-pg-temp-db", - "sui-protocol-config", "sui-storage", - "sui-synthetic-ingestion", "sui-types", "telemetry-subscribers", "tempfile", @@ -14124,7 +14149,6 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util 0.7.10", - "toml 0.7.4", "tracing", "url", "wiremock", diff --git a/Cargo.toml b/Cargo.toml index ffb5f2589ae8f..f41c0ddf85be2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -117,6 +117,7 @@ members = [ "crates/sui-graphql-rpc-headers", "crates/sui-indexer", "crates/sui-indexer-alt", + "crates/sui-indexer-alt-framework", "crates/sui-indexer-builder", "crates/sui-json", "crates/sui-json-rpc", @@ -646,6 +647,7 @@ sui-graphql-rpc-client = { path = "crates/sui-graphql-rpc-client" } sui-graphql-rpc-headers = { path = "crates/sui-graphql-rpc-headers" } sui-genesis-builder = { path = "crates/sui-genesis-builder" } sui-indexer = { path = "crates/sui-indexer" } +sui-indexer-alt-framework = { path = "crates/sui-indexer-alt-framework" } sui-indexer-builder = { path = "crates/sui-indexer-builder" } sui-json = { path = "crates/sui-json" } sui-json-rpc = { path = "crates/sui-json-rpc" } diff --git a/crates/sui-indexer-alt-framework/Cargo.toml b/crates/sui-indexer-alt-framework/Cargo.toml new file mode 100644 index 0000000000000..1d4fe70045dba --- /dev/null +++ b/crates/sui-indexer-alt-framework/Cargo.toml @@ -0,0 +1,41 @@ +[package] +name = "sui-indexer-alt-framework" +version.workspace = true +authors = ["Mysten Labs "] +license = "Apache-2.0" +publish = false +edition = "2021" + +[dependencies] +anyhow.workspace = true +async-trait.workspace = true +axum.workspace = true +backoff.workspace = true +bb8 = "0.8.5" +chrono.workspace = true +clap.workspace = true +diesel = { workspace = true, features = ["chrono"] } +diesel-async = { workspace = true, features = ["bb8", "postgres", "async-connection-wrapper"] } +diesel_migrations.workspace = true +futures.workspace = true +prometheus.workspace = true +reqwest.workspace = true +serde.workspace = true +thiserror.workspace = true +tokio.workspace = true +tokio-stream.workspace = true +tokio-util.workspace = true +tracing.workspace = true +url.workspace = true + +sui-field-count.workspace = true +sui-storage.workspace = true +sui-types.workspace = true + +[dev-dependencies] +rand.workspace = true +telemetry-subscribers.workspace = true +tempfile.workspace = true +wiremock.workspace = true + +sui-pg-temp-db.workspace = true diff --git a/crates/sui-indexer-alt-framework/diesel.toml b/crates/sui-indexer-alt-framework/diesel.toml new file mode 100644 index 0000000000000..054029ff39a8a --- /dev/null +++ b/crates/sui-indexer-alt-framework/diesel.toml @@ -0,0 +1,6 @@ +[print_schema] +file = "src/schema.rs" +patch_file = "schema.patch" + +[migrations_directory] +dir = "migrations" diff --git a/crates/sui-indexer-alt-framework/generate_schema.sh b/crates/sui-indexer-alt-framework/generate_schema.sh new file mode 100755 index 0000000000000..718b67f6f354f --- /dev/null +++ b/crates/sui-indexer-alt-framework/generate_schema.sh @@ -0,0 +1,77 @@ +#!/bin/bash +# Copyright (c) Mysten Labs, Inc. +# SPDX-License-Identifier: Apache-2.0 +# +# Update sui-indexer's generated src/schema.rs based on the schema after +# running all its migrations on a clean database. Expects the first argument to +# be a port to run the temporary database on (defaults to 5433). + +set -x +set -e + +if ! command -v git &> /dev/null; then + echo "Please install git: e.g. brew install git" >&2 + exit 1 +fi + +for PG in psql initdb postgres pg_isready pg_ctl; do + if ! command -v $PG &> /dev/null; then + echo "Could not find $PG. Please install postgres: e.g. brew install postgresql@15" >&2 + exit 1 + fi +done + +if ! command -v diesel &> /dev/null; then + echo "Please install diesel: e.g. cargo install diesel_cli --features postgres" >&2 + exit 1 +fi + +REPO=$(git rev-parse --show-toplevel) + +# Create a temporary directory to store the ephemeral DB. +TMP=$(mktemp -d) + +# Set-up a trap to clean everything up on EXIT (stop DB, delete temp directory) +function cleanup { + pg_ctl stop -D "$TMP" -mfast + set +x + echo "Postgres STDOUT:" + cat "$TMP/db.stdout" + echo "Postgres STDERR:" + cat "$TMP/db.stderr" + set -x + rm -rf "$TMP" +} +trap cleanup EXIT + +# Create a new database in the temporary directory +initdb -D "$TMP" --user postgres + +# Run the DB in the background, on the port provided and capture its output +PORT=${1:-5433} +postgres -D "$TMP" -p "$PORT" -c unix_socket_directories= \ + > "$TMP/db.stdout" \ + 2> "$TMP/db.stderr" & + +# Wait for postgres to report as ready +RETRIES=0 +while ! pg_isready -p "$PORT" --host "localhost" --username "postgres"; do + if [ $RETRIES -gt 5 ]; then + echo "Postgres failed to start" >&2 + exit 1 + fi + sleep 1 + RETRIES=$((RETRIES + 1)) +done + +# Run all migrations on the new database +diesel migration run \ + --database-url "postgres://postgres:postgrespw@localhost:$PORT" \ + --migration-dir "$REPO/crates/sui-indexer-alt-framework/migrations" + +# Generate the schema.rs file, excluding partition tables and including the +# copyright notice. +diesel print-schema \ + --database-url "postgres://postgres:postgrespw@localhost:$PORT" \ + --patch-file "$REPO/crates/sui-indexer-alt-framework/schema.patch" \ + > "$REPO/crates/sui-indexer-alt-framework/src/schema.rs" diff --git a/crates/sui-indexer-alt-framework/migrations/00000000000000_diesel_initial_setup/down.sql b/crates/sui-indexer-alt-framework/migrations/00000000000000_diesel_initial_setup/down.sql new file mode 100644 index 0000000000000..a9f526091194b --- /dev/null +++ b/crates/sui-indexer-alt-framework/migrations/00000000000000_diesel_initial_setup/down.sql @@ -0,0 +1,6 @@ +-- This file was automatically created by Diesel to setup helper functions +-- and other internal bookkeeping. This file is safe to edit, any future +-- changes will be added to existing projects as new migrations. + +DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass); +DROP FUNCTION IF EXISTS diesel_set_updated_at(); diff --git a/crates/sui-indexer-alt-framework/migrations/00000000000000_diesel_initial_setup/up.sql b/crates/sui-indexer-alt-framework/migrations/00000000000000_diesel_initial_setup/up.sql new file mode 100644 index 0000000000000..d68895b1a7b7d --- /dev/null +++ b/crates/sui-indexer-alt-framework/migrations/00000000000000_diesel_initial_setup/up.sql @@ -0,0 +1,36 @@ +-- This file was automatically created by Diesel to setup helper functions +-- and other internal bookkeeping. This file is safe to edit, any future +-- changes will be added to existing projects as new migrations. + + + + +-- Sets up a trigger for the given table to automatically set a column called +-- `updated_at` whenever the row is modified (unless `updated_at` was included +-- in the modified columns) +-- +-- # Example +-- +-- ```sql +-- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW()); +-- +-- SELECT diesel_manage_updated_at('users'); +-- ``` +CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS $$ +BEGIN + EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s + FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl); +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS $$ +BEGIN + IF ( + NEW IS DISTINCT FROM OLD AND + NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at + ) THEN + NEW.updated_at := current_timestamp; + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; diff --git a/crates/sui-indexer-alt/migrations/2024-10-16-225607_watermarks/down.sql b/crates/sui-indexer-alt-framework/migrations/2024-10-16-225607_watermarks/down.sql similarity index 100% rename from crates/sui-indexer-alt/migrations/2024-10-16-225607_watermarks/down.sql rename to crates/sui-indexer-alt-framework/migrations/2024-10-16-225607_watermarks/down.sql diff --git a/crates/sui-indexer-alt/migrations/2024-10-16-225607_watermarks/up.sql b/crates/sui-indexer-alt-framework/migrations/2024-10-16-225607_watermarks/up.sql similarity index 100% rename from crates/sui-indexer-alt/migrations/2024-10-16-225607_watermarks/up.sql rename to crates/sui-indexer-alt-framework/migrations/2024-10-16-225607_watermarks/up.sql diff --git a/crates/sui-indexer-alt-framework/schema.patch b/crates/sui-indexer-alt-framework/schema.patch new file mode 100644 index 0000000000000..fc16ecc785f2c --- /dev/null +++ b/crates/sui-indexer-alt-framework/schema.patch @@ -0,0 +1,7 @@ +diff --git a/crates/sui-indexer-alt-framework/src/schema.rs b/crates/sui-indexer-alt-framework/src/schema.rs +--- a/crates/sui-indexer-alt-framework/src/schema.rs ++++ b/crates/sui-indexer-alt-framework/src/schema.rs +@@ -1 +1,3 @@ ++// Copyright (c) Mysten Labs, Inc. ++// SPDX-License-Identifier: Apache-2.0 + // @generated automatically by Diesel CLI. diff --git a/crates/sui-indexer-alt/src/db.rs b/crates/sui-indexer-alt-framework/src/db.rs similarity index 82% rename from crates/sui-indexer-alt/src/db.rs rename to crates/sui-indexer-alt-framework/src/db.rs index b5cda7896516f..0f04613787a30 100644 --- a/crates/sui-indexer-alt/src/db.rs +++ b/crates/sui-indexer-alt-framework/src/db.rs @@ -2,7 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::anyhow; -use diesel::migration::MigrationVersion; +use diesel::migration::{self, Migration, MigrationSource, MigrationVersion}; +use diesel::pg::Pg; use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; use diesel_async::{ pooled_connection::{ @@ -16,6 +17,8 @@ use std::time::Duration; use tracing::info; use url::Url; +/// Migrations for schema that the indexer framework needs, regardless of the specific data being +/// indexed. const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations"); #[derive(clap::Args, Debug, Clone)] @@ -62,8 +65,8 @@ impl Db { } /// Retrieves a connection from the pool. Can fail with a timeout if a connection cannot be - /// established before the [DbConfig::connection_timeout] has elapsed. - pub(crate) async fn connect(&self) -> Result, RunError> { + /// established before the [DbArgs::connection_timeout] has elapsed. + pub async fn connect(&self) -> Result, RunError> { self.pool.get().await } @@ -123,11 +126,27 @@ impl Db { Ok(()) } + /// Run migrations on the database. Use the `migrations` parameter to pass in the migrations + /// that are specific to the indexer being run. Migrations that the indexer framework needs + /// will be added automatically. + /// + /// Use Diesel's `embed_migrations!` macro to generate the `migrations` parameter for your + /// indexer. pub(crate) async fn run_migrations( &self, + migrations: &'static EmbeddedMigrations, ) -> Result>, anyhow::Error> { use diesel_migrations::MigrationHarness; + struct WithFrameworkMigrations(&'static EmbeddedMigrations); + impl MigrationSource for WithFrameworkMigrations { + fn migrations(&self) -> migration::Result>>> { + let mut migrations = self.0.migrations()?; + migrations.extend(MIGRATIONS.migrations()?); + Ok(migrations) + } + } + info!("Running migrations ..."); let conn = self.pool.dedicated_connection().await?; let mut wrapper: AsyncConnectionWrapper = @@ -135,7 +154,7 @@ impl Db { let finished_migrations = tokio::task::spawn_blocking(move || { wrapper - .run_pending_migrations(MIGRATIONS) + .run_pending_migrations(WithFrameworkMigrations(migrations)) .map(|versions| versions.iter().map(MigrationVersion::as_owned).collect()) }) .await? @@ -158,13 +177,18 @@ impl Default for DbArgs { } } -/// Drop all tables and rerunning migrations. -pub async fn reset_database(db_config: DbArgs, skip_migrations: bool) -> Result<(), anyhow::Error> { +/// Drop all tables, and re-run migrations if supplied. +pub async fn reset_database( + db_config: DbArgs, + migrations: Option<&'static EmbeddedMigrations>, +) -> Result<(), anyhow::Error> { let db = Db::new(db_config).await?; db.clear_database().await?; - if !skip_migrations { - db.run_migrations().await?; + + if let Some(migrations) = migrations { + db.run_migrations(migrations).await?; } + Ok(()) } @@ -230,7 +254,7 @@ mod tests { .unwrap(); assert_eq!(cnt.cnt, 1); - reset_database(db_args, true).await.unwrap(); + reset_database(db_args, None).await.unwrap(); let mut conn = db.connect().await.unwrap(); let cnt = diesel::sql_query( diff --git a/crates/sui-indexer-alt/src/ingestion/broadcaster.rs b/crates/sui-indexer-alt-framework/src/ingestion/broadcaster.rs similarity index 100% rename from crates/sui-indexer-alt/src/ingestion/broadcaster.rs rename to crates/sui-indexer-alt-framework/src/ingestion/broadcaster.rs diff --git a/crates/sui-indexer-alt/src/ingestion/client.rs b/crates/sui-indexer-alt-framework/src/ingestion/client.rs similarity index 100% rename from crates/sui-indexer-alt/src/ingestion/client.rs rename to crates/sui-indexer-alt-framework/src/ingestion/client.rs diff --git a/crates/sui-indexer-alt/src/ingestion/error.rs b/crates/sui-indexer-alt-framework/src/ingestion/error.rs similarity index 100% rename from crates/sui-indexer-alt/src/ingestion/error.rs rename to crates/sui-indexer-alt-framework/src/ingestion/error.rs diff --git a/crates/sui-indexer-alt/src/ingestion/local_client.rs b/crates/sui-indexer-alt-framework/src/ingestion/local_client.rs similarity index 100% rename from crates/sui-indexer-alt/src/ingestion/local_client.rs rename to crates/sui-indexer-alt-framework/src/ingestion/local_client.rs diff --git a/crates/sui-indexer-alt/src/ingestion/mod.rs b/crates/sui-indexer-alt-framework/src/ingestion/mod.rs similarity index 97% rename from crates/sui-indexer-alt/src/ingestion/mod.rs rename to crates/sui-indexer-alt-framework/src/ingestion/mod.rs index 88818a25214d5..036c3ed0185e9 100644 --- a/crates/sui-indexer-alt/src/ingestion/mod.rs +++ b/crates/sui-indexer-alt-framework/src/ingestion/mod.rs @@ -53,7 +53,7 @@ pub struct IngestionConfig { pub retry_interval_ms: u64, } -pub struct IngestionService { +pub(crate) struct IngestionService { config: IngestionConfig, client: IngestionClient, ingest_hi_tx: mpsc::UnboundedSender<(&'static str, u64)>, @@ -69,7 +69,9 @@ impl IngestionConfig { } impl IngestionService { - pub fn new( + /// TODO: If we want to expose this as part of the framework, so people can run just an + /// ingestion service, we will need to split `IngestionMetrics` out from `IndexerMetrics`. + pub(crate) fn new( args: ClientArgs, config: IngestionConfig, metrics: Arc, @@ -97,7 +99,7 @@ impl IngestionService { } /// The client this service uses to fetch checkpoints. - pub fn client(&self) -> &IngestionClient { + pub(crate) fn client(&self) -> &IngestionClient { &self.client } @@ -110,7 +112,7 @@ impl IngestionService { /// run ahead of the watermark by more than the config's buffer_size. /// /// Returns the channel to receive checkpoints from and the channel to accept watermarks from. - pub fn subscribe( + pub(crate) fn subscribe( &mut self, ) -> ( mpsc::Receiver>, @@ -135,7 +137,7 @@ impl IngestionService { /// If ingestion reaches the leading edge of the network, it will encounter checkpoints that do /// not exist yet. These will be retried repeatedly on a fixed `retry_interval` until they /// become available. - pub async fn run(self, checkpoints: I) -> Result<(JoinHandle<()>, JoinHandle<()>)> + pub(crate) async fn run(self, checkpoints: I) -> Result<(JoinHandle<()>, JoinHandle<()>)> where I: IntoIterator + Send + Sync + 'static, I::IntoIter: Send + Sync + 'static, diff --git a/crates/sui-indexer-alt/src/ingestion/regulator.rs b/crates/sui-indexer-alt-framework/src/ingestion/regulator.rs similarity index 100% rename from crates/sui-indexer-alt/src/ingestion/regulator.rs rename to crates/sui-indexer-alt-framework/src/ingestion/regulator.rs diff --git a/crates/sui-indexer-alt/src/ingestion/remote_client.rs b/crates/sui-indexer-alt-framework/src/ingestion/remote_client.rs similarity index 100% rename from crates/sui-indexer-alt/src/ingestion/remote_client.rs rename to crates/sui-indexer-alt-framework/src/ingestion/remote_client.rs diff --git a/crates/sui-indexer-alt/src/ingestion/test_utils.rs b/crates/sui-indexer-alt-framework/src/ingestion/test_utils.rs similarity index 100% rename from crates/sui-indexer-alt/src/ingestion/test_utils.rs rename to crates/sui-indexer-alt-framework/src/ingestion/test_utils.rs diff --git a/crates/sui-indexer-alt-framework/src/lib.rs b/crates/sui-indexer-alt-framework/src/lib.rs new file mode 100644 index 0000000000000..9ebe7ed77e47c --- /dev/null +++ b/crates/sui-indexer-alt-framework/src/lib.rs @@ -0,0 +1,386 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::{collections::BTreeSet, net::SocketAddr, sync::Arc}; + +use anyhow::{ensure, Context, Result}; +use db::{Db, DbArgs}; +use diesel_migrations::EmbeddedMigrations; +use ingestion::{client::IngestionClient, ClientArgs, IngestionConfig, IngestionService}; +use metrics::{IndexerMetrics, MetricsService}; +use pipeline::{ + concurrent::{self, ConcurrentConfig}, + sequential::{self, SequentialConfig}, + Processor, +}; +use task::graceful_shutdown; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; +use tracing::{info, warn}; +use watermarks::CommitterWatermark; + +pub mod db; +pub mod ingestion; +pub(crate) mod metrics; +pub mod pipeline; +pub(crate) mod schema; +pub mod task; +pub(crate) mod watermarks; + +/// Command-line arguments for the indexer +#[derive(clap::Args, Debug, Clone)] +pub struct IndexerArgs { + /// Override for the checkpoint to start ingestion from -- useful for backfills. By default, + /// ingestion will start just after the lowest checkpoint watermark across all active + /// pipelines. + #[arg(long)] + pub first_checkpoint: Option, + + /// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By + /// default, ingestion will not stop, and will continue to poll for new checkpoints. + #[arg(long)] + pub last_checkpoint: Option, + + /// Only run the following pipelines. If not provided, all pipelines found in the + /// configuration file will be run. + #[arg(long, action = clap::ArgAction::Append)] + pub pipeline: Vec, + + /// Don't write to the watermark tables for concurrent pipelines. + #[arg(long)] + pub skip_watermark: bool, + + /// Address to serve Prometheus Metrics from. + #[arg(long, default_value_t = Self::default().metrics_address)] + pub metrics_address: SocketAddr, +} + +pub struct Indexer { + /// Connection pool to the database. + db: Db, + + /// Prometheus Metrics. + metrics: Arc, + + /// Service for serving Prometheis metrics. + metrics_service: MetricsService, + + /// Service for downloading and disseminating checkpoint data. + ingestion_service: IngestionService, + + /// Optional override of the checkpoint lowerbound. + first_checkpoint: Option, + + /// Optional override of the checkpoint upperbound. + last_checkpoint: Option, + + /// Don't write to the watermark tables for concurrent pipelines. + skip_watermark: bool, + + /// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will + /// run. Any pipelines that are present in this filter but not added to the indexer will yield + /// a warning when the indexer is run. + enabled_pipelines: Option>, + + /// Pipelines that have already been registered with the indexer. Used to make sure a pipeline + /// with the same name isn't added twice. + added_pipelines: BTreeSet<&'static str>, + + /// Cancellation token shared among all continuous tasks in the service. + cancel: CancellationToken, + + /// The checkpoint lowerbound derived from watermarks of pipelines added to the indexer. When + /// the indexer runs, it will start from this point, unless this has been overridden by + /// [Self::first_checkpoint]. + first_checkpoint_from_watermark: u64, + + /// The handles for every task spawned by this indexer, used to manage graceful shutdown. + handles: Vec>, +} + +impl Indexer { + /// Create a new instance of the indexer framework. `db_args`, `indexer_args,`, `client_args`, + /// and `ingestion_config` contain configurations for the following, respectively: + /// + /// - Connecting to the database, + /// - What is indexed (which checkpoints, which pipelines, whether to update the watermarks + /// table) and where to serve metrics from, + /// - Where to download checkpoints from, + /// - Concurrency and buffering parameters for downloading checkpoints. + /// + /// `migrations` contains the SQL to run in order to bring the database schema up-to-date for + /// the specific instance of the indexer, generated using diesel's `embed_migrations!` macro. + /// These migrations will be run as part of initializing the indexer. + /// + /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline] + /// or [Self::sequential_pipeline], before the indexer is started using [Self::run]. + pub async fn new( + db_args: DbArgs, + indexer_args: IndexerArgs, + client_args: ClientArgs, + ingestion_config: IngestionConfig, + migrations: &'static EmbeddedMigrations, + cancel: CancellationToken, + ) -> Result { + let IndexerArgs { + first_checkpoint, + last_checkpoint, + pipeline, + skip_watermark, + metrics_address, + } = indexer_args; + + let db = Db::new(db_args) + .await + .context("Failed to connect to database")?; + + // At indexer initialization, we ensure that the DB schema is up-to-date. + db.run_migrations(migrations) + .await + .context("Failed to run pending migrations")?; + + let (metrics, metrics_service) = + MetricsService::new(metrics_address, db.clone(), cancel.clone())?; + + let ingestion_service = IngestionService::new( + client_args, + ingestion_config, + metrics.clone(), + cancel.clone(), + )?; + + Ok(Self { + db, + metrics, + metrics_service, + ingestion_service, + first_checkpoint, + last_checkpoint, + skip_watermark, + enabled_pipelines: if pipeline.is_empty() { + None + } else { + Some(pipeline.into_iter().collect()) + }, + added_pipelines: BTreeSet::new(), + cancel, + first_checkpoint_from_watermark: u64::MAX, + handles: vec![], + }) + } + + /// The database connection pool used by the indexer. + pub fn db(&self) -> &Db { + &self.db + } + + /// The ingestion client used by the indexer to fetch checkpoints. + pub fn ingestion_client(&self) -> &IngestionClient { + self.ingestion_service.client() + } + + /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started, + /// they will be idle until the ingestion service starts, and serves it checkpoint data. + /// + /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they + /// keep the watermark table up-to-date with the highest point they can guarantee all data + /// exists for, for their pipeline. + pub async fn concurrent_pipeline( + &mut self, + handler: H, + config: ConcurrentConfig, + ) -> Result<()> { + let Some(watermark) = self.add_pipeline::().await? else { + return Ok(()); + }; + + // For a concurrent pipeline, if skip_watermark is set, we don't really care about the + // watermark consistency. first_checkpoint can be anything since we don't update watermark, + // and writes should be idempotent. + if !self.skip_watermark { + self.check_first_checkpoint_consistency::(&watermark)?; + } + + self.handles.push(concurrent::pipeline( + handler, + watermark, + config, + self.skip_watermark, + self.db.clone(), + self.ingestion_service.subscribe().0, + self.metrics.clone(), + self.cancel.clone(), + )); + + Ok(()) + } + + /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started, + /// they will be idle until the ingestion service starts, and serves it checkpoint data. + /// + /// Sequential pipelines commit checkpoint data in-order which sacrifices throughput, but may + /// be required to handle pipelines that modify data in-place (where each update is not an + /// insert, but could be a modification of an existing row, where ordering between updates is + /// important). + /// + /// The pipeline can optionally be configured to lag behind the ingestion service by a fixed + /// number of checkpoints (configured by `checkpoint_lag`). + pub async fn sequential_pipeline( + &mut self, + handler: H, + config: SequentialConfig, + ) -> Result<()> { + let Some(watermark) = self.add_pipeline::().await? else { + return Ok(()); + }; + + if self.skip_watermark { + warn!( + pipeline = H::NAME, + "--skip-watermarks enabled and ignored for sequential pipeline" + ); + } + + // For a sequential pipeline, data must be written in the order of checkpoints. + // Hence, we do not allow the first_checkpoint override to be in arbitrary positions. + self.check_first_checkpoint_consistency::(&watermark)?; + + let (checkpoint_rx, watermark_tx) = self.ingestion_service.subscribe(); + + self.handles.push(sequential::pipeline( + handler, + watermark, + config, + self.db.clone(), + checkpoint_rx, + watermark_tx, + self.metrics.clone(), + self.cancel.clone(), + )); + + Ok(()) + } + + /// Checks that the first checkpoint override is consistent with the watermark for the pipeline. + /// If the watermark does not exist, the override can be anything. If the watermark exists, the + /// override must not leave any gap in the data: it can be in the past, or at the tip of the + /// network, but not in the future. + fn check_first_checkpoint_consistency( + &self, + watermark: &Option, + ) -> Result<()> { + if let (Some(watermark), Some(first_checkpoint)) = (watermark, self.first_checkpoint) { + ensure!( + first_checkpoint as i64 <= watermark.checkpoint_hi_inclusive + 1, + "For pipeline {}, first checkpoint override {} is too far ahead of watermark {}. \ + This could create gaps in the data.", + P::NAME, + first_checkpoint, + watermark.checkpoint_hi_inclusive, + ); + } + + Ok(()) + } + + /// Start ingesting checkpoints. Ingestion either starts from the configured + /// `first_checkpoint`, or it is calculated based on the watermarks of all active pipelines. + /// Ingestion will stop after consuming the configured `last_checkpoint`, if one is provided, + /// or will continue until it tracks the tip of the network. + pub async fn run(mut self) -> Result> { + if let Some(enabled_pipelines) = self.enabled_pipelines { + ensure!( + enabled_pipelines.is_empty(), + "Tried to enable pipelines that this indexer does not know about: \ + {enabled_pipelines:#?}", + ); + } + + let metrics_handle = self + .metrics_service + .run() + .await + .context("Failed to start metrics service")?; + + // If an override has been provided, start ingestion from there, otherwise start ingestion + // from just after the lowest committer watermark across all enabled pipelines. + let first_checkpoint = self + .first_checkpoint + .unwrap_or(self.first_checkpoint_from_watermark); + + let last_checkpoint = self.last_checkpoint.unwrap_or(u64::MAX); + + info!(first_checkpoint, last_checkpoint = ?self.last_checkpoint, "Ingestion range"); + + let (regulator_handle, broadcaster_handle) = self + .ingestion_service + .run(first_checkpoint..=last_checkpoint) + .await + .context("Failed to start ingestion service")?; + + self.handles.push(regulator_handle); + self.handles.push(broadcaster_handle); + + let cancel = self.cancel.clone(); + Ok(tokio::spawn(async move { + // Wait for the ingestion service and all its related tasks to wind down gracefully: + // If ingestion has been configured to only handle a specific range of checkpoints, we + // want to make sure that tasks are allowed to run to completion before shutting them + // down. + graceful_shutdown(self.handles, self.cancel).await; + + info!("Indexing pipeline gracefully shut down"); + + // Pick off any stragglers (in this case, just the metrics service). + cancel.cancel(); + metrics_handle.await.unwrap(); + })) + } + + /// Update the indexer's first checkpoint based on the watermark for the pipeline by adding for + /// handler `H` (as long as it's enabled). Returns `Ok(None)` if the pipeline is disabled, + /// `Ok(Some(None))` if the pipeline is enabled but its watermark is not found, and + /// `Ok(Some(Some(watermark)))` if the pipeline is enabled and the watermark is found. + async fn add_pipeline( + &mut self, + ) -> Result>>> { + ensure!( + self.added_pipelines.insert(P::NAME), + "Pipeline {:?} already added", + P::NAME, + ); + + if let Some(enabled_pipelines) = &mut self.enabled_pipelines { + if !enabled_pipelines.remove(P::NAME) { + info!(pipeline = P::NAME, "Skipping"); + return Ok(None); + } + } + + let mut conn = self.db.connect().await.context("Failed DB connection")?; + + let watermark = CommitterWatermark::get(&mut conn, P::NAME) + .await + .with_context(|| format!("Failed to get watermark for {}", P::NAME))?; + + // TODO(amnn): Test this (depends on supporting migrations and tempdb). + self.first_checkpoint_from_watermark = watermark + .as_ref() + .map_or(0, |w| w.checkpoint_hi_inclusive as u64 + 1) + .min(self.first_checkpoint_from_watermark); + + Ok(Some(watermark)) + } +} + +impl Default for IndexerArgs { + fn default() -> Self { + Self { + first_checkpoint: None, + last_checkpoint: None, + pipeline: vec![], + skip_watermark: false, + metrics_address: "0.0.0.0:9184".parse().unwrap(), + } + } +} diff --git a/crates/sui-indexer-alt/src/metrics.rs b/crates/sui-indexer-alt-framework/src/metrics.rs similarity index 99% rename from crates/sui-indexer-alt/src/metrics.rs rename to crates/sui-indexer-alt-framework/src/metrics.rs index 3627524d73c12..820beaf527b8c 100644 --- a/crates/sui-indexer-alt/src/metrics.rs +++ b/crates/sui-indexer-alt-framework/src/metrics.rs @@ -49,14 +49,14 @@ const BATCH_SIZE_BUCKETS: &[f64] = &[ ]; /// Service to expose prometheus metrics from the indexer. -pub struct MetricsService { +pub(crate) struct MetricsService { addr: SocketAddr, registry: Registry, cancel: CancellationToken, } #[derive(Clone)] -pub struct IndexerMetrics { +pub(crate) struct IndexerMetrics { // Statistics related to fetching data from the remote store. pub total_ingested_checkpoints: IntCounter, pub total_ingested_transactions: IntCounter, @@ -131,7 +131,7 @@ impl MetricsService { /// Create a new metrics service, exposing Mysten-wide metrics, and Indexer-specific metrics. /// Returns the Indexer-specific metrics and the service itself (which must be run with /// [Self::run]). - pub fn new( + pub(crate) fn new( addr: SocketAddr, db: Db, cancel: CancellationToken, @@ -151,7 +151,7 @@ impl MetricsService { } /// Start the service. The service will run until the cancellation token is triggered. - pub async fn run(self) -> Result> { + pub(crate) async fn run(self) -> Result> { let listener = TcpListener::bind(&self.addr).await?; let app = Router::new() .route("/metrics", get(metrics)) @@ -171,7 +171,7 @@ impl MetricsService { } impl IndexerMetrics { - pub fn new(registry: &Registry) -> Self { + pub(crate) fn new(registry: &Registry) -> Self { Self { total_ingested_checkpoints: register_int_counter_with_registry!( "indexer_total_ingested_checkpoints", diff --git a/crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/collector.rs similarity index 100% rename from crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs rename to crates/sui-indexer-alt-framework/src/pipeline/concurrent/collector.rs diff --git a/crates/sui-indexer-alt/src/pipeline/concurrent/commit_watermark.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs similarity index 99% rename from crates/sui-indexer-alt/src/pipeline/concurrent/commit_watermark.rs rename to crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs index e4072197e4a62..d92ae3d6ba36c 100644 --- a/crates/sui-indexer-alt/src/pipeline/concurrent/commit_watermark.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs @@ -18,10 +18,10 @@ use tracing::{debug, error, info, warn}; use crate::{ db::Db, metrics::IndexerMetrics, - models::watermarks::CommitterWatermark, pipeline::{ CommitterConfig, WatermarkPart, LOUD_WATERMARK_UPDATE_INTERVAL, WARN_PENDING_WATERMARKS, }, + watermarks::CommitterWatermark, }; use super::Handler; diff --git a/crates/sui-indexer-alt/src/pipeline/concurrent/committer.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs similarity index 100% rename from crates/sui-indexer-alt/src/pipeline/concurrent/committer.rs rename to crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs diff --git a/crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs similarity index 99% rename from crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs rename to crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs index c0864d4718bc1..ce783e98c26c4 100644 --- a/crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs @@ -12,7 +12,7 @@ use tokio_util::sync::CancellationToken; use crate::{ db::{self, Db}, metrics::IndexerMetrics, - models::watermarks::CommitterWatermark, + watermarks::CommitterWatermark, }; use super::{processor::processor, CommitterConfig, Processor, WatermarkPart, PIPELINE_BUFFER}; diff --git a/crates/sui-indexer-alt/src/pipeline/concurrent/pruner.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs similarity index 98% rename from crates/sui-indexer-alt/src/pipeline/concurrent/pruner.rs rename to crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs index 8c2b7f9bbc601..aa2648654a2b7 100644 --- a/crates/sui-indexer-alt/src/pipeline/concurrent/pruner.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs @@ -11,8 +11,8 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use crate::{ - db::Db, metrics::IndexerMetrics, models::watermarks::PrunerWatermark, - pipeline::LOUD_WATERMARK_UPDATE_INTERVAL, + db::Db, metrics::IndexerMetrics, pipeline::LOUD_WATERMARK_UPDATE_INTERVAL, + watermarks::PrunerWatermark, }; use super::{Handler, PrunerConfig}; diff --git a/crates/sui-indexer-alt/src/pipeline/concurrent/reader_watermark.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/reader_watermark.rs similarity index 98% rename from crates/sui-indexer-alt/src/pipeline/concurrent/reader_watermark.rs rename to crates/sui-indexer-alt-framework/src/pipeline/concurrent/reader_watermark.rs index 0ef037fa6e478..03b18ee4ac352 100644 --- a/crates/sui-indexer-alt/src/pipeline/concurrent/reader_watermark.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/reader_watermark.rs @@ -10,7 +10,7 @@ use tracing::{debug, info, warn}; use crate::{ db::Db, metrics::IndexerMetrics, - models::watermarks::{ReaderWatermark, StoredWatermark}, + watermarks::{ReaderWatermark, StoredWatermark}, }; use super::{Handler, PrunerConfig}; diff --git a/crates/sui-indexer-alt/src/pipeline/mod.rs b/crates/sui-indexer-alt-framework/src/pipeline/mod.rs similarity index 98% rename from crates/sui-indexer-alt/src/pipeline/mod.rs rename to crates/sui-indexer-alt-framework/src/pipeline/mod.rs index b977f4ccff8db..026a803f4fb04 100644 --- a/crates/sui-indexer-alt/src/pipeline/mod.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/mod.rs @@ -3,14 +3,14 @@ use std::time::Duration; -use crate::models::watermarks::CommitterWatermark; +use crate::watermarks::CommitterWatermark; pub use processor::Processor; use serde::{Deserialize, Serialize}; pub mod concurrent; mod processor; -pub(crate) mod sequential; +pub mod sequential; /// Tracing message for the watermark update will be logged at info level at least this many /// checkpoints. diff --git a/crates/sui-indexer-alt/src/pipeline/processor.rs b/crates/sui-indexer-alt-framework/src/pipeline/processor.rs similarity index 100% rename from crates/sui-indexer-alt/src/pipeline/processor.rs rename to crates/sui-indexer-alt-framework/src/pipeline/processor.rs diff --git a/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs b/crates/sui-indexer-alt-framework/src/pipeline/sequential/committer.rs similarity index 99% rename from crates/sui-indexer-alt/src/pipeline/sequential/committer.rs rename to crates/sui-indexer-alt-framework/src/pipeline/sequential/committer.rs index 116b8ef9edf7f..73e3a437c0cb9 100644 --- a/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/sequential/committer.rs @@ -15,8 +15,8 @@ use tracing::{debug, info, warn}; use crate::{ db::Db, metrics::IndexerMetrics, - models::watermarks::CommitterWatermark, pipeline::{Indexed, LOUD_WATERMARK_UPDATE_INTERVAL, WARN_PENDING_WATERMARKS}, + watermarks::CommitterWatermark, }; use super::{Handler, SequentialConfig}; diff --git a/crates/sui-indexer-alt/src/pipeline/sequential/mod.rs b/crates/sui-indexer-alt-framework/src/pipeline/sequential/mod.rs similarity index 99% rename from crates/sui-indexer-alt/src/pipeline/sequential/mod.rs rename to crates/sui-indexer-alt-framework/src/pipeline/sequential/mod.rs index 0a6fb15d93d95..5cac521e30260 100644 --- a/crates/sui-indexer-alt/src/pipeline/sequential/mod.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/sequential/mod.rs @@ -11,7 +11,7 @@ use tokio_util::sync::CancellationToken; use crate::{ db::{self, Db}, metrics::IndexerMetrics, - models::watermarks::CommitterWatermark, + watermarks::CommitterWatermark, }; use super::{processor::processor, CommitterConfig, Processor, PIPELINE_BUFFER}; diff --git a/crates/sui-indexer-alt-framework/src/schema.rs b/crates/sui-indexer-alt-framework/src/schema.rs new file mode 100644 index 0000000000000..baf83f70e41a9 --- /dev/null +++ b/crates/sui-indexer-alt-framework/src/schema.rs @@ -0,0 +1,16 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 +// @generated automatically by Diesel CLI. + +diesel::table! { + watermarks (pipeline) { + pipeline -> Text, + epoch_hi_inclusive -> Int8, + checkpoint_hi_inclusive -> Int8, + tx_hi -> Int8, + timestamp_ms_hi_inclusive -> Int8, + reader_lo -> Int8, + pruner_timestamp -> Timestamp, + pruner_hi -> Int8, + } +} diff --git a/crates/sui-indexer-alt/src/task.rs b/crates/sui-indexer-alt-framework/src/task.rs similarity index 100% rename from crates/sui-indexer-alt/src/task.rs rename to crates/sui-indexer-alt-framework/src/task.rs diff --git a/crates/sui-indexer-alt/src/models/watermarks.rs b/crates/sui-indexer-alt-framework/src/watermarks.rs similarity index 89% rename from crates/sui-indexer-alt/src/models/watermarks.rs rename to crates/sui-indexer-alt-framework/src/watermarks.rs index b27359afef0da..1ba188b04e0ba 100644 --- a/crates/sui-indexer-alt/src/models/watermarks.rs +++ b/crates/sui-indexer-alt-framework/src/watermarks.rs @@ -12,7 +12,7 @@ use crate::{db::Connection, schema::watermarks}; #[derive(Insertable, Selectable, Queryable, Debug, Clone, FieldCount)] #[diesel(table_name = watermarks)] -pub struct StoredWatermark { +pub(crate) struct StoredWatermark { pub pipeline: String, pub epoch_hi_inclusive: i64, pub checkpoint_hi_inclusive: i64, @@ -26,7 +26,7 @@ pub struct StoredWatermark { /// Fields that the committer is responsible for setting. #[derive(AsChangeset, Selectable, Queryable, Debug, Clone, FieldCount)] #[diesel(table_name = watermarks)] -pub struct CommitterWatermark<'p> { +pub(crate) struct CommitterWatermark<'p> { pub pipeline: Cow<'p, str>, pub epoch_hi_inclusive: i64, pub checkpoint_hi_inclusive: i64, @@ -36,14 +36,14 @@ pub struct CommitterWatermark<'p> { #[derive(AsChangeset, Selectable, Queryable, Debug, Clone, FieldCount)] #[diesel(table_name = watermarks)] -pub struct ReaderWatermark<'p> { +pub(crate) struct ReaderWatermark<'p> { pub pipeline: Cow<'p, str>, pub reader_lo: i64, } #[derive(Queryable, Debug, Clone, FieldCount)] #[diesel(table_name = watermarks)] -pub struct PrunerWatermark<'p> { +pub(crate) struct PrunerWatermark<'p> { /// The pipeline in question pub pipeline: Cow<'p, str>, @@ -60,7 +60,7 @@ pub struct PrunerWatermark<'p> { } impl StoredWatermark { - pub async fn get( + pub(crate) async fn get( conn: &mut Connection<'_>, pipeline: &'static str, ) -> QueryResult> { @@ -75,7 +75,7 @@ impl StoredWatermark { impl CommitterWatermark<'static> { /// Get the current high watermark for the pipeline. - pub async fn get( + pub(crate) async fn get( conn: &mut Connection<'_>, pipeline: &'static str, ) -> QueryResult> { @@ -90,7 +90,7 @@ impl CommitterWatermark<'static> { impl<'p> CommitterWatermark<'p> { /// A new watermark with the given pipeline name indicating zero progress. - pub fn initial(pipeline: Cow<'p, str>) -> Self { + pub(crate) fn initial(pipeline: Cow<'p, str>) -> Self { CommitterWatermark { pipeline, epoch_hi_inclusive: 0, @@ -101,7 +101,7 @@ impl<'p> CommitterWatermark<'p> { } /// The consensus timestamp associated with this checkpoint. - pub fn timestamp(&self) -> DateTime { + pub(crate) fn timestamp(&self) -> DateTime { DateTime::from_timestamp_millis(self.timestamp_ms_hi_inclusive).unwrap_or_default() } @@ -109,7 +109,7 @@ impl<'p> CommitterWatermark<'p> { /// Returns a boolean indicating whether the watermark was actually updated or not. /// /// TODO(amnn): Test this (depends on supporting migrations and tempdb). - pub async fn update(&self, conn: &mut Connection<'_>) -> QueryResult { + pub(crate) async fn update(&self, conn: &mut Connection<'_>) -> QueryResult { use diesel::query_dsl::methods::FilterDsl; Ok(diesel::insert_into(watermarks::table) .values(StoredWatermark::from(self.clone())) @@ -124,7 +124,7 @@ impl<'p> CommitterWatermark<'p> { } impl<'p> ReaderWatermark<'p> { - pub fn new(pipeline: impl Into>, reader_lo: u64) -> Self { + pub(crate) fn new(pipeline: impl Into>, reader_lo: u64) -> Self { ReaderWatermark { pipeline: pipeline.into(), reader_lo: reader_lo as i64, @@ -135,7 +135,7 @@ impl<'p> ReaderWatermark<'p> { /// watermark, and updates the timestamp this update happened to the database's current time. /// /// Returns a boolean indicating whether the watermark was actually updated or not. - pub async fn update(&self, conn: &mut Connection<'_>) -> QueryResult { + pub(crate) async fn update(&self, conn: &mut Connection<'_>) -> QueryResult { Ok(diesel::update(watermarks::table) .set((self, watermarks::pruner_timestamp.eq(diesel::dsl::now))) .filter(watermarks::pipeline.eq(&self.pipeline)) @@ -154,7 +154,7 @@ impl PrunerWatermark<'static> { /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and /// `reader_lo` (exclusive) after `wait_for` milliseconds have passed since this response was /// returned. - pub async fn get( + pub(crate) async fn get( conn: &mut Connection<'_>, pipeline: &'static str, delay: Duration, @@ -186,17 +186,17 @@ impl PrunerWatermark<'static> { impl<'p> PrunerWatermark<'p> { /// How long to wait before the pruner can act on this information, or `None`, if there is no /// need to wait. - pub fn wait_for(&self) -> Option { + pub(crate) fn wait_for(&self) -> Option { (self.wait_for > 0).then(|| Duration::from_millis(self.wait_for as u64)) } /// Whether the pruner has any work left to do on the range in this watermark. - pub fn is_empty(&self) -> bool { + pub(crate) fn is_empty(&self) -> bool { self.pruner_hi >= self.reader_lo } /// The next chunk that the pruner should work on, to advance the watermark. - pub fn next_chunk(&mut self, size: u64) -> (u64, u64) { + pub(crate) fn next_chunk(&mut self, size: u64) -> (u64, u64) { let from = self.pruner_hi as u64; let to = (from + size).min(self.reader_lo as u64); (from, to) @@ -206,7 +206,7 @@ impl<'p> PrunerWatermark<'p> { /// raises the watermark. /// /// Returns a boolean indicating whether the watermark was actually updated or not. - pub async fn update(&self, conn: &mut Connection<'_>) -> QueryResult { + pub(crate) async fn update(&self, conn: &mut Connection<'_>) -> QueryResult { Ok(diesel::update(watermarks::table) .set(watermarks::pruner_hi.eq(self.pruner_hi)) .filter(watermarks::pipeline.eq(&self.pipeline)) diff --git a/crates/sui-indexer-alt/Cargo.toml b/crates/sui-indexer-alt/Cargo.toml index 4cabc6da97ecf..5c29142db1ec5 100644 --- a/crates/sui-indexer-alt/Cargo.toml +++ b/crates/sui-indexer-alt/Cargo.toml @@ -13,34 +13,24 @@ path = "src/main.rs" [dependencies] anyhow.workspace = true async-trait.workspace = true -axum.workspace = true -backoff.workspace = true -bb8 = "0.8.5" bcs.workspace = true -chrono.workspace = true clap.workspace = true diesel = { workspace = true, features = ["chrono"] } diesel-async = { workspace = true, features = ["bb8", "postgres", "async-connection-wrapper"] } diesel_migrations.workspace = true futures.workspace = true itertools.workspace = true -prometheus.workspace = true -reqwest.workspace = true serde.workspace = true telemetry-subscribers.workspace = true -thiserror.workspace = true tokio.workspace = true -tokio-stream.workspace = true tokio-util.workspace = true toml.workspace = true tracing.workspace = true -url.workspace = true sui-default-config.workspace = true sui-field-count.workspace = true -sui-pg-temp-db.workspace = true +sui-indexer-alt-framework.workspace = true sui-protocol-config.workspace = true -sui-storage.workspace = true sui-types.workspace = true sui-synthetic-ingestion = { workspace = true, optional = true } diff --git a/crates/sui-indexer-alt/src/args.rs b/crates/sui-indexer-alt/src/args.rs index 5949ef89367cf..ab799683b876e 100644 --- a/crates/sui-indexer-alt/src/args.rs +++ b/crates/sui-indexer-alt/src/args.rs @@ -5,10 +5,9 @@ use std::path::PathBuf; #[cfg(feature = "benchmark")] use crate::benchmark::BenchmarkArgs; -use crate::db::DbArgs; -use crate::ingestion::ClientArgs; use crate::IndexerArgs; use clap::Subcommand; +use sui_indexer_alt_framework::{db::DbArgs, ingestion::ClientArgs}; #[derive(clap::Parser, Debug, Clone)] pub struct Args { diff --git a/crates/sui-indexer-alt/src/benchmark.rs b/crates/sui-indexer-alt/src/benchmark.rs index 0cb622c0716db..d9caa911da129 100644 --- a/crates/sui-indexer-alt/src/benchmark.rs +++ b/crates/sui-indexer-alt/src/benchmark.rs @@ -3,13 +3,15 @@ use std::{path::PathBuf, time::Instant}; -use crate::{ +use sui_indexer_alt_framework::{ db::{reset_database, DbArgs}, ingestion::ClientArgs, - start_indexer, IndexerArgs, IndexerConfig, + IndexerArgs, }; use sui_synthetic_ingestion::synthetic_ingestion::read_ingestion_data; +use crate::{config::IndexerConfig, models::MIGRATIONS, start_indexer}; + #[derive(clap::Args, Debug, Clone)] pub struct BenchmarkArgs { /// Path to the local ingestion directory to read checkpoints data from. @@ -37,7 +39,7 @@ pub async fn run_benchmark( let last_checkpoint = *ingestion_data.keys().last().unwrap(); let num_transactions: usize = ingestion_data.values().map(|c| c.transactions.len()).sum(); - reset_database(db_args.clone(), false /* do not skip migrations */).await?; + reset_database(db_args.clone(), Some(&MIGRATIONS)).await?; let indexer_args = IndexerArgs { first_checkpoint: Some(first_checkpoint), diff --git a/crates/sui-indexer-alt/src/bootstrap.rs b/crates/sui-indexer-alt/src/bootstrap.rs index 6d5fe26338b39..045c09057f6db 100644 --- a/crates/sui-indexer-alt/src/bootstrap.rs +++ b/crates/sui-indexer-alt/src/bootstrap.rs @@ -6,6 +6,7 @@ use std::time::Duration; use anyhow::{bail, Context, Result}; use diesel::{OptionalExtension, QueryDsl, SelectableHelper}; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::task::graceful_shutdown; use sui_types::{ full_checkpoint_content::CheckpointData, sui_system_state::{get_sui_system_state, SuiSystemStateTrait}, @@ -17,7 +18,6 @@ use tracing::info; use crate::{ models::{checkpoints::StoredGenesis, epochs::StoredEpochStart}, schema::{kv_epoch_starts, kv_genesis}, - task::graceful_shutdown, Indexer, }; diff --git a/crates/sui-indexer-alt/src/config.rs b/crates/sui-indexer-alt/src/config.rs index ddf72c5a77f49..1d00aadd653cc 100644 --- a/crates/sui-indexer-alt/src/config.rs +++ b/crates/sui-indexer-alt/src/config.rs @@ -4,9 +4,7 @@ use std::mem; use sui_default_config::DefaultConfig; -use tracing::warn; - -use crate::{ +use sui_indexer_alt_framework::{ ingestion::IngestionConfig, pipeline::{ concurrent::{ConcurrentConfig, PrunerConfig}, @@ -14,6 +12,7 @@ use crate::{ CommitterConfig, }, }; +use tracing::warn; /// Trait for merging configuration structs together. pub trait Merge { diff --git a/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs b/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs index 592008fc86137..b7dee21d56c13 100644 --- a/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs +++ b/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs @@ -5,13 +5,14 @@ use std::{collections::BTreeSet, sync::Arc}; use anyhow::Result; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_types::full_checkpoint_content::CheckpointData; -use crate::{ - db, models::events::StoredEvEmitMod, pipeline::concurrent::Handler, pipeline::Processor, - schema::ev_emit_mod, -}; -pub struct EvEmitMod; +use crate::{models::events::StoredEvEmitMod, schema::ev_emit_mod}; +pub(crate) struct EvEmitMod; impl Processor for EvEmitMod { const NAME: &'static str = "ev_emit_mod"; diff --git a/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs b/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs index 826a85ff7f881..576d664ac3017 100644 --- a/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs +++ b/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs @@ -5,14 +5,15 @@ use std::{collections::BTreeSet, sync::Arc}; use anyhow::{Context, Result}; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_types::full_checkpoint_content::CheckpointData; -use crate::{ - db, models::events::StoredEvStructInst, pipeline::concurrent::Handler, pipeline::Processor, - schema::ev_struct_inst, -}; +use crate::{models::events::StoredEvStructInst, schema::ev_struct_inst}; -pub struct EvStructInst; +pub(crate) struct EvStructInst; impl Processor for EvStructInst { const NAME: &'static str = "ev_struct_inst"; diff --git a/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs b/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs index 14edb7e4de97e..203fab2b5c1cc 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs @@ -5,14 +5,15 @@ use std::sync::Arc; use anyhow::{Context, Result}; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_types::full_checkpoint_content::CheckpointData; -use crate::{ - db, models::checkpoints::StoredCheckpoint, pipeline::concurrent::Handler, pipeline::Processor, - schema::kv_checkpoints, -}; +use crate::{models::checkpoints::StoredCheckpoint, schema::kv_checkpoints}; -pub struct KvCheckpoints; +pub(crate) struct KvCheckpoints; impl Processor for KvCheckpoints { const NAME: &'static str = "kv_checkpoints"; diff --git a/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs b/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs index fa4fe6f024a6c..57f2582243172 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs @@ -5,20 +5,19 @@ use std::sync::Arc; use anyhow::{bail, Context, Result}; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_types::{ event::SystemEpochInfoEvent, full_checkpoint_content::CheckpointData, transaction::{TransactionDataAPI, TransactionKind}, }; -use crate::{ - db, - models::epochs::StoredEpochEnd, - pipeline::{concurrent::Handler, Processor}, - schema::kv_epoch_ends, -}; +use crate::{models::epochs::StoredEpochEnd, schema::kv_epoch_ends}; -pub struct KvEpochEnds; +pub(crate) struct KvEpochEnds; impl Processor for KvEpochEnds { const NAME: &'static str = "kv_epoch_ends"; diff --git a/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs b/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs index d4ac53ebebaf4..13533b04fc8f7 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs @@ -5,18 +5,19 @@ use std::sync::Arc; use anyhow::{bail, Context, Result}; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_types::{ full_checkpoint_content::CheckpointData, sui_system_state::{get_sui_system_state, SuiSystemStateTrait}, transaction::{TransactionDataAPI, TransactionKind}, }; -use crate::{ - db, models::epochs::StoredEpochStart, pipeline::concurrent::Handler, pipeline::Processor, - schema::kv_epoch_starts, -}; +use crate::{models::epochs::StoredEpochStart, schema::kv_epoch_starts}; -pub struct KvEpochStarts; +pub(crate) struct KvEpochStarts; impl Processor for KvEpochStarts { const NAME: &'static str = "kv_epoch_starts"; diff --git a/crates/sui-indexer-alt/src/handlers/kv_feature_flags.rs b/crates/sui-indexer-alt/src/handlers/kv_feature_flags.rs index b2e1c3f96d3c9..90135dc2b12b5 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_feature_flags.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_feature_flags.rs @@ -5,17 +5,19 @@ use std::sync::Arc; use anyhow::{Context, Result}; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_protocol_config::ProtocolConfig; use sui_types::full_checkpoint_content::CheckpointData; use crate::{ - db, models::{checkpoints::StoredGenesis, epochs::StoredFeatureFlag}, - pipeline::{concurrent::Handler, Processor}, schema::kv_feature_flags, }; -pub struct KvFeatureFlags(pub StoredGenesis); +pub(crate) struct KvFeatureFlags(pub(crate) StoredGenesis); impl Processor for KvFeatureFlags { const NAME: &'static str = "kv_feature_flags"; diff --git a/crates/sui-indexer-alt/src/handlers/kv_objects.rs b/crates/sui-indexer-alt/src/handlers/kv_objects.rs index 7fa0dc5111003..cdcc0283f1688 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_objects.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_objects.rs @@ -5,14 +5,15 @@ use std::sync::Arc; use anyhow::{Context, Result}; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_types::full_checkpoint_content::CheckpointData; -use crate::{ - db, models::objects::StoredObject, pipeline::concurrent::Handler, pipeline::Processor, - schema::kv_objects, -}; +use crate::{models::objects::StoredObject, schema::kv_objects}; -pub struct KvObjects; +pub(crate) struct KvObjects; impl Processor for KvObjects { const NAME: &'static str = "kv_objects"; diff --git a/crates/sui-indexer-alt/src/handlers/kv_protocol_configs.rs b/crates/sui-indexer-alt/src/handlers/kv_protocol_configs.rs index d364aa8f171cd..5a8a483a72ae1 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_protocol_configs.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_protocol_configs.rs @@ -5,17 +5,19 @@ use std::sync::Arc; use anyhow::{Context, Result}; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_protocol_config::ProtocolConfig; use sui_types::full_checkpoint_content::CheckpointData; use crate::{ - db, models::{checkpoints::StoredGenesis, epochs::StoredProtocolConfig}, - pipeline::{concurrent::Handler, Processor}, schema::kv_protocol_configs, }; -pub struct KvProtocolConfigs(pub StoredGenesis); +pub(crate) struct KvProtocolConfigs(pub(crate) StoredGenesis); impl Processor for KvProtocolConfigs { const NAME: &'static str = "kv_protocol_configs"; diff --git a/crates/sui-indexer-alt/src/handlers/kv_transactions.rs b/crates/sui-indexer-alt/src/handlers/kv_transactions.rs index 12dbf45ebcf14..fb2d9237b20e0 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_transactions.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_transactions.rs @@ -5,14 +5,15 @@ use std::sync::Arc; use anyhow::{Context, Result}; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_types::full_checkpoint_content::CheckpointData; -use crate::{ - db, models::transactions::StoredTransaction, pipeline::concurrent::Handler, - pipeline::Processor, schema::kv_transactions, -}; +use crate::{models::transactions::StoredTransaction, schema::kv_transactions}; -pub struct KvTransactions; +pub(crate) struct KvTransactions; impl Processor for KvTransactions { const NAME: &'static str = "kv_transactions"; diff --git a/crates/sui-indexer-alt/src/handlers/mod.rs b/crates/sui-indexer-alt/src/handlers/mod.rs index 9346a087250cc..2ab452d870b5b 100644 --- a/crates/sui-indexer-alt/src/handlers/mod.rs +++ b/crates/sui-indexer-alt/src/handlers/mod.rs @@ -1,26 +1,26 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -pub mod ev_emit_mod; -pub mod ev_struct_inst; -pub mod kv_checkpoints; -pub mod kv_epoch_ends; -pub mod kv_epoch_starts; -pub mod kv_feature_flags; -pub mod kv_objects; -pub mod kv_protocol_configs; -pub mod kv_transactions; -pub mod obj_info; -pub mod obj_versions; -pub mod sum_coin_balances; -pub mod sum_displays; -pub mod sum_obj_types; -pub mod sum_packages; -pub mod tx_affected_addresses; -pub mod tx_affected_objects; -pub mod tx_balance_changes; -pub mod tx_calls; -pub mod tx_digests; -pub mod tx_kinds; -pub mod wal_coin_balances; -pub mod wal_obj_types; +pub(crate) mod ev_emit_mod; +pub(crate) mod ev_struct_inst; +pub(crate) mod kv_checkpoints; +pub(crate) mod kv_epoch_ends; +pub(crate) mod kv_epoch_starts; +pub(crate) mod kv_feature_flags; +pub(crate) mod kv_objects; +pub(crate) mod kv_protocol_configs; +pub(crate) mod kv_transactions; +pub(crate) mod obj_info; +pub(crate) mod obj_versions; +pub(crate) mod sum_coin_balances; +pub(crate) mod sum_displays; +pub(crate) mod sum_obj_types; +pub(crate) mod sum_packages; +pub(crate) mod tx_affected_addresses; +pub(crate) mod tx_affected_objects; +pub(crate) mod tx_balance_changes; +pub(crate) mod tx_calls; +pub(crate) mod tx_digests; +pub(crate) mod tx_kinds; +pub(crate) mod wal_coin_balances; +pub(crate) mod wal_obj_types; diff --git a/crates/sui-indexer-alt/src/handlers/obj_info.rs b/crates/sui-indexer-alt/src/handlers/obj_info.rs index 4b673788cdaee..da78a3fa16860 100644 --- a/crates/sui-indexer-alt/src/handlers/obj_info.rs +++ b/crates/sui-indexer-alt/src/handlers/obj_info.rs @@ -5,16 +5,18 @@ use std::{collections::BTreeMap, sync::Arc}; use anyhow::{anyhow, Result}; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_types::{base_types::ObjectID, full_checkpoint_content::CheckpointData, object::Owner}; use crate::{ - db, models::objects::{StoredObjInfo, StoredOwnerKind}, - pipeline::{concurrent::Handler, Processor}, schema::obj_info, }; -pub struct ObjInfo; +pub(crate) struct ObjInfo; impl Processor for ObjInfo { const NAME: &'static str = "obj_info"; diff --git a/crates/sui-indexer-alt/src/handlers/obj_versions.rs b/crates/sui-indexer-alt/src/handlers/obj_versions.rs index 09a03f151714f..99665870f03a5 100644 --- a/crates/sui-indexer-alt/src/handlers/obj_versions.rs +++ b/crates/sui-indexer-alt/src/handlers/obj_versions.rs @@ -5,16 +5,15 @@ use std::sync::Arc; use anyhow::Result; use diesel_async::RunQueryDsl; -use sui_types::full_checkpoint_content::CheckpointData; - -use crate::{ +use sui_indexer_alt_framework::{ db, - models::objects::StoredObjVersion, pipeline::{concurrent::Handler, Processor}, - schema::obj_versions, }; +use sui_types::full_checkpoint_content::CheckpointData; + +use crate::{models::objects::StoredObjVersion, schema::obj_versions}; -pub struct ObjVersions; +pub(crate) struct ObjVersions; impl Processor for ObjVersions { const NAME: &'static str = "obj_versions"; diff --git a/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs b/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs index 578d019ffb4a3..be196f04b7b86 100644 --- a/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs +++ b/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs @@ -11,21 +11,24 @@ use diesel::{upsert::excluded, ExpressionMethods}; use diesel_async::RunQueryDsl; use futures::future::{try_join_all, Either}; use sui_field_count::FieldCount; +use sui_indexer_alt_framework::{ + db, + pipeline::{sequential::Handler, Processor}, +}; use sui_types::{ base_types::ObjectID, effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData, object::Owner, }; use crate::{ - db, models::objects::{StoredObjectUpdate, StoredSumCoinBalance}, - pipeline::{sequential::Handler, Processor}, schema::sum_coin_balances, }; const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredSumCoinBalance::FIELD_COUNT; const MAX_DELETE_CHUNK_ROWS: usize = i16::MAX as usize; -pub struct SumCoinBalances; + +pub(crate) struct SumCoinBalances; impl Processor for SumCoinBalances { const NAME: &'static str = "sum_coin_balances"; diff --git a/crates/sui-indexer-alt/src/handlers/sum_displays.rs b/crates/sui-indexer-alt/src/handlers/sum_displays.rs index 8d8803493281a..4e29bbd2faca2 100644 --- a/crates/sui-indexer-alt/src/handlers/sum_displays.rs +++ b/crates/sui-indexer-alt/src/handlers/sum_displays.rs @@ -8,17 +8,17 @@ use diesel::{upsert::excluded, ExpressionMethods}; use diesel_async::RunQueryDsl; use futures::future::try_join_all; use sui_field_count::FieldCount; -use sui_types::{display::DisplayVersionUpdatedEvent, full_checkpoint_content::CheckpointData}; - -use crate::{ +use sui_indexer_alt_framework::{ db, - models::displays::StoredDisplay, pipeline::{sequential::Handler, Processor}, - schema::sum_displays, }; +use sui_types::{display::DisplayVersionUpdatedEvent, full_checkpoint_content::CheckpointData}; + +use crate::{models::displays::StoredDisplay, schema::sum_displays}; const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredDisplay::FIELD_COUNT; -pub struct SumDisplays; + +pub(crate) struct SumDisplays; impl Processor for SumDisplays { const NAME: &'static str = "sum_displays"; diff --git a/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs b/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs index 0d043e6d622f2..21264c2a54070 100644 --- a/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs +++ b/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs @@ -11,22 +11,24 @@ use diesel::{upsert::excluded, ExpressionMethods}; use diesel_async::RunQueryDsl; use futures::future::{try_join_all, Either}; use sui_field_count::FieldCount; +use sui_indexer_alt_framework::{ + db, + pipeline::{sequential::Handler, Processor}, +}; use sui_types::{ base_types::ObjectID, effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData, object::Owner, }; use crate::{ - db, models::objects::{StoredObjectUpdate, StoredOwnerKind, StoredSumObjType}, - pipeline::{sequential::Handler, Processor}, schema::sum_obj_types, }; const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredSumObjType::FIELD_COUNT; const MAX_DELETE_CHUNK_ROWS: usize = i16::MAX as usize; -pub struct SumObjTypes; +pub(crate) struct SumObjTypes; impl Processor for SumObjTypes { const NAME: &'static str = "sum_obj_types"; diff --git a/crates/sui-indexer-alt/src/handlers/sum_packages.rs b/crates/sui-indexer-alt/src/handlers/sum_packages.rs index 2cb1f81586735..8ef08338ba80e 100644 --- a/crates/sui-indexer-alt/src/handlers/sum_packages.rs +++ b/crates/sui-indexer-alt/src/handlers/sum_packages.rs @@ -8,18 +8,17 @@ use diesel::{upsert::excluded, ExpressionMethods}; use diesel_async::RunQueryDsl; use futures::future::try_join_all; use sui_field_count::FieldCount; -use sui_types::full_checkpoint_content::CheckpointData; - -use crate::{ +use sui_indexer_alt_framework::{ db, - models::packages::StoredPackage, pipeline::{sequential::Handler, Processor}, - schema::sum_packages, }; +use sui_types::full_checkpoint_content::CheckpointData; + +use crate::{models::packages::StoredPackage, schema::sum_packages}; const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredPackage::FIELD_COUNT; -pub struct SumPackages; +pub(crate) struct SumPackages; impl Processor for SumPackages { const NAME: &'static str = "sum_packages"; diff --git a/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs b/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs index c9a6edca892a7..37046df48e68e 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs @@ -6,14 +6,15 @@ use std::sync::Arc; use anyhow::Result; use diesel_async::RunQueryDsl; use itertools::Itertools; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_types::{full_checkpoint_content::CheckpointData, object::Owner}; -use crate::{ - db, models::transactions::StoredTxAffectedAddress, pipeline::concurrent::Handler, - pipeline::Processor, schema::tx_affected_addresses, -}; +use crate::{models::transactions::StoredTxAffectedAddress, schema::tx_affected_addresses}; -pub struct TxAffectedAddresses; +pub(crate) struct TxAffectedAddresses; impl Processor for TxAffectedAddresses { const NAME: &'static str = "tx_affected_addresses"; diff --git a/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs b/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs index 7a98cdcb95408..8bbd1b6694483 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs @@ -5,14 +5,15 @@ use std::sync::Arc; use anyhow::Result; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_types::{effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData}; -use crate::{ - db, models::transactions::StoredTxAffectedObject, pipeline::concurrent::Handler, - pipeline::Processor, schema::tx_affected_objects, -}; +use crate::{models::transactions::StoredTxAffectedObject, schema::tx_affected_objects}; -pub struct TxAffectedObjects; +pub(crate) struct TxAffectedObjects; impl Processor for TxAffectedObjects { const NAME: &'static str = "tx_affected_objects"; diff --git a/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs b/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs index d0f300a2cd753..17d26ed76f6d0 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs @@ -5,6 +5,10 @@ use std::{collections::BTreeMap, sync::Arc}; use anyhow::{Context, Result}; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_types::{ coin::Coin, effects::TransactionEffectsAPI, @@ -13,14 +17,11 @@ use sui_types::{ }; use crate::{ - db, models::transactions::{BalanceChange, StoredTxBalanceChange}, - pipeline::concurrent::Handler, - pipeline::Processor, schema::tx_balance_changes, }; -pub struct TxBalanceChanges; +pub(crate) struct TxBalanceChanges; impl Processor for TxBalanceChanges { const NAME: &'static str = "tx_balance_changes"; diff --git a/crates/sui-indexer-alt/src/handlers/tx_calls.rs b/crates/sui-indexer-alt/src/handlers/tx_calls.rs index 815fb2a7b3744..e0ef7a748619b 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_calls.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_calls.rs @@ -5,15 +5,16 @@ use std::sync::Arc; use anyhow::{Ok, Result}; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_types::full_checkpoint_content::CheckpointData; use sui_types::transaction::TransactionDataAPI; -use crate::{ - db, models::transactions::StoredTxCalls, pipeline::concurrent::Handler, pipeline::Processor, - schema::tx_calls, -}; +use crate::{models::transactions::StoredTxCalls, schema::tx_calls}; -pub struct TxCalls; +pub(crate) struct TxCalls; impl Processor for TxCalls { const NAME: &'static str = "tx_calls"; diff --git a/crates/sui-indexer-alt/src/handlers/tx_digests.rs b/crates/sui-indexer-alt/src/handlers/tx_digests.rs index 280289b6819b7..489d1176d83ab 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_digests.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_digests.rs @@ -5,14 +5,15 @@ use std::sync::Arc; use anyhow::Result; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_types::full_checkpoint_content::CheckpointData; -use crate::{ - db, models::transactions::StoredTxDigest, pipeline::concurrent::Handler, pipeline::Processor, - schema::tx_digests, -}; +use crate::{models::transactions::StoredTxDigest, schema::tx_digests}; -pub struct TxDigests; +pub(crate) struct TxDigests; impl Processor for TxDigests { const NAME: &'static str = "tx_digests"; diff --git a/crates/sui-indexer-alt/src/handlers/tx_kinds.rs b/crates/sui-indexer-alt/src/handlers/tx_kinds.rs index 11bd5f750f3f3..90fc09929cca7 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_kinds.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_kinds.rs @@ -5,16 +5,18 @@ use std::sync::Arc; use anyhow::Result; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_types::full_checkpoint_content::CheckpointData; use crate::{ - db, models::transactions::{StoredKind, StoredTxKind}, - pipeline::{concurrent::Handler, Processor}, schema::tx_kinds, }; -pub struct TxKinds; +pub(crate) struct TxKinds; impl Processor for TxKinds { const NAME: &'static str = "tx_kinds"; diff --git a/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs b/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs index 18af515e934f6..ebc28dd8f5672 100644 --- a/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs +++ b/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs @@ -6,18 +6,20 @@ use std::sync::Arc; use anyhow::Result; use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_types::full_checkpoint_content::CheckpointData; use crate::{ - db, models::objects::{StoredObjectUpdate, StoredSumCoinBalance, StoredWalCoinBalance}, - pipeline::{concurrent::Handler, Processor}, schema::wal_coin_balances, }; use super::sum_coin_balances::SumCoinBalances; -pub struct WalCoinBalances; +pub(crate) struct WalCoinBalances; impl Processor for WalCoinBalances { const NAME: &'static str = "wal_coin_balances"; diff --git a/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs b/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs index 6fbb3ae54f2b2..32207e4e203c8 100644 --- a/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs +++ b/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs @@ -6,18 +6,20 @@ use std::sync::Arc; use anyhow::Result; use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::{ + db, + pipeline::{concurrent::Handler, Processor}, +}; use sui_types::full_checkpoint_content::CheckpointData; use crate::{ - db, models::objects::{StoredObjectUpdate, StoredSumObjType, StoredWalObjType}, - pipeline::{concurrent::Handler, Processor}, schema::wal_obj_types, }; use super::sum_obj_types::SumObjTypes; -pub struct WalObjTypes; +pub(crate) struct WalObjTypes; impl Processor for WalObjTypes { const NAME: &'static str = "wal_obj_types"; diff --git a/crates/sui-indexer-alt/src/lib.rs b/crates/sui-indexer-alt/src/lib.rs index 5272faf3edb98..1997ea9ee7b7b 100644 --- a/crates/sui-indexer-alt/src/lib.rs +++ b/crates/sui-indexer-alt/src/lib.rs @@ -1,12 +1,9 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::BTreeSet, net::SocketAddr, sync::Arc}; - -use anyhow::{ensure, Context, Result}; +use anyhow::Context; use bootstrap::bootstrap; use config::{ConsistencyConfig, IndexerConfig, PipelineLayer}; -use db::{Db, DbArgs}; use handlers::{ ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints, kv_epoch_ends::KvEpochEnds, kv_epoch_starts::KvEpochStarts, kv_feature_flags::KvFeatureFlags, @@ -17,376 +14,27 @@ use handlers::{ tx_balance_changes::TxBalanceChanges, tx_calls::TxCalls, tx_digests::TxDigests, tx_kinds::TxKinds, wal_coin_balances::WalCoinBalances, wal_obj_types::WalObjTypes, }; -use ingestion::{client::IngestionClient, ClientArgs, IngestionConfig, IngestionService}; -use metrics::{IndexerMetrics, MetricsService}; -use models::watermarks::CommitterWatermark; -use pipeline::{ - concurrent::{self, ConcurrentConfig, PrunerConfig}, - sequential::{self, SequentialConfig}, - CommitterConfig, Processor, +use models::MIGRATIONS; +use sui_indexer_alt_framework::db::DbArgs; +use sui_indexer_alt_framework::ingestion::{ClientArgs, IngestionConfig}; +use sui_indexer_alt_framework::pipeline::{ + concurrent::{ConcurrentConfig, PrunerConfig}, + sequential::SequentialConfig, + CommitterConfig, }; -use task::graceful_shutdown; -use tokio::task::JoinHandle; +use sui_indexer_alt_framework::{Indexer, IndexerArgs}; use tokio_util::sync::CancellationToken; -use tracing::{info, warn}; pub mod args; -pub mod bootstrap; +pub(crate) mod bootstrap; pub mod config; -pub mod db; -pub mod handlers; -pub mod ingestion; -pub mod metrics; +pub(crate) mod handlers; pub mod models; -pub mod pipeline; pub mod schema; -pub mod task; #[cfg(feature = "benchmark")] pub mod benchmark; -/// Command-line arguments for the indexer -#[derive(clap::Args, Debug, Clone)] -pub struct IndexerArgs { - /// Override for the checkpoint to start ingestion from -- useful for backfills. By default, - /// ingestion will start just after the lowest checkpoint watermark across all active - /// pipelines. - #[arg(long)] - pub first_checkpoint: Option, - - /// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By - /// default, ingestion will not stop, and will continue to poll for new checkpoints. - #[arg(long)] - pub last_checkpoint: Option, - - /// Only run the following pipelines. If not provided, all pipelines found in the - /// configuration file will be run. - #[arg(long, action = clap::ArgAction::Append)] - pipeline: Vec, - - /// Don't write to the watermark tables for concurrent pipelines. - #[arg(long)] - pub skip_watermark: bool, - - /// Address to serve Prometheus Metrics from. - #[arg(long, default_value_t = Self::default().metrics_address)] - pub metrics_address: SocketAddr, -} - -pub struct Indexer { - /// Connection pool to the database. - db: Db, - - /// Prometheus Metrics. - metrics: Arc, - - /// Service for serving Prometheis metrics. - metrics_service: MetricsService, - - /// Service for downloading and disseminating checkpoint data. - ingestion_service: IngestionService, - - /// Optional override of the checkpoint lowerbound. - first_checkpoint: Option, - - /// Optional override of the checkpoint upperbound. - last_checkpoint: Option, - - /// Don't write to the watermark tables for concurrent pipelines. - skip_watermark: bool, - - /// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will - /// run. Any pipelines that are present in this filter but not added to the indexer will yield - /// a warning when the indexer is run. - enabled_pipelines: Option>, - - /// Pipelines that have already been registered with the indexer. Used to make sure a pipeline - /// with the same name isn't added twice. - added_pipelines: BTreeSet<&'static str>, - - /// Cancellation token shared among all continuous tasks in the service. - cancel: CancellationToken, - - /// The checkpoint lowerbound derived from watermarks of pipelines added to the indexer. When - /// the indexer runs, it will start from this point, unless this has been overridden by - /// [Self::first_checkpoint]. - first_checkpoint_from_watermark: u64, - - /// The handles for every task spawned by this indexer, used to manage graceful shutdown. - handles: Vec>, -} - -impl Indexer { - pub async fn new( - db_args: DbArgs, - indexer_args: IndexerArgs, - client_args: ClientArgs, - ingestion_config: IngestionConfig, - cancel: CancellationToken, - ) -> Result { - let IndexerArgs { - first_checkpoint, - last_checkpoint, - pipeline, - skip_watermark, - metrics_address, - } = indexer_args; - - let db = Db::new(db_args) - .await - .context("Failed to connect to database")?; - - // At indexer initialization, we ensure that the DB schema is up-to-date. - db.run_migrations() - .await - .context("Failed to run pending migrations")?; - - let (metrics, metrics_service) = - MetricsService::new(metrics_address, db.clone(), cancel.clone())?; - - let ingestion_service = IngestionService::new( - client_args, - ingestion_config, - metrics.clone(), - cancel.clone(), - )?; - - Ok(Self { - db, - metrics, - metrics_service, - ingestion_service, - first_checkpoint, - last_checkpoint, - skip_watermark, - enabled_pipelines: if pipeline.is_empty() { - None - } else { - Some(pipeline.into_iter().collect()) - }, - added_pipelines: BTreeSet::new(), - cancel, - first_checkpoint_from_watermark: u64::MAX, - handles: vec![], - }) - } - - /// The database connection pool used by the indexer. - pub fn db(&self) -> &Db { - &self.db - } - - /// The ingestion client used by the indexer to fetch checkpoints. - pub fn ingestion_client(&self) -> &IngestionClient { - self.ingestion_service.client() - } - - /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started, - /// they will be idle until the ingestion service starts, and serves it checkpoint data. - /// - /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they - /// keep the watermark table up-to-date with the highest point they can guarantee all data - /// exists for, for their pipeline. - pub async fn concurrent_pipeline( - &mut self, - handler: H, - config: ConcurrentConfig, - ) -> Result<()> { - let Some(watermark) = self.add_pipeline::().await? else { - return Ok(()); - }; - - // For a concurrent pipeline, if skip_watermark is set, we don't really care about the - // watermark consistency. first_checkpoint can be anything since we don't update watermark, - // and writes should be idempotent. - if !self.skip_watermark { - self.check_first_checkpoint_consistency::(&watermark)?; - } - - self.handles.push(concurrent::pipeline( - handler, - watermark, - config, - self.skip_watermark, - self.db.clone(), - self.ingestion_service.subscribe().0, - self.metrics.clone(), - self.cancel.clone(), - )); - - Ok(()) - } - - /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started, - /// they will be idle until the ingestion service starts, and serves it checkpoint data. - /// - /// Sequential pipelines commit checkpoint data in-order which sacrifices throughput, but may - /// be required to handle pipelines that modify data in-place (where each update is not an - /// insert, but could be a modification of an existing row, where ordering between updates is - /// important). - /// - /// The pipeline can optionally be configured to lag behind the ingestion service by a fixed - /// number of checkpoints (configured by `checkpoint_lag`). - pub async fn sequential_pipeline( - &mut self, - handler: H, - config: SequentialConfig, - ) -> Result<()> { - let Some(watermark) = self.add_pipeline::().await? else { - return Ok(()); - }; - - if self.skip_watermark { - warn!( - pipeline = H::NAME, - "--skip-watermarks enabled and ignored for sequential pipeline" - ); - } - - // For a sequential pipeline, data must be written in the order of checkpoints. - // Hence, we do not allow the first_checkpoint override to be in arbitrary positions. - self.check_first_checkpoint_consistency::(&watermark)?; - - let (checkpoint_rx, watermark_tx) = self.ingestion_service.subscribe(); - - self.handles.push(sequential::pipeline( - handler, - watermark, - config, - self.db.clone(), - checkpoint_rx, - watermark_tx, - self.metrics.clone(), - self.cancel.clone(), - )); - - Ok(()) - } - - /// Checks that the first checkpoint override is consistent with the watermark for the pipeline. - /// If the watermark does not exist, the override can be anything. If the watermark exists, the - /// override must not leave any gap in the data: it can be in the past, or at the tip of the - /// network, but not in the future. - fn check_first_checkpoint_consistency( - &self, - watermark: &Option, - ) -> Result<()> { - if let (Some(watermark), Some(first_checkpoint)) = (watermark, self.first_checkpoint) { - ensure!( - first_checkpoint as i64 <= watermark.checkpoint_hi_inclusive + 1, - "For pipeline {}, first checkpoint override {} is too far ahead of watermark {}. \ - This could create gaps in the data.", - P::NAME, - first_checkpoint, - watermark.checkpoint_hi_inclusive, - ); - } - - Ok(()) - } - - /// Start ingesting checkpoints. Ingestion either starts from the configured - /// `first_checkpoint`, or it is calculated based on the watermarks of all active pipelines. - /// Ingestion will stop after consuming the configured `last_checkpoint`, if one is provided, - /// or will continue until it tracks the tip of the network. - pub async fn run(mut self) -> Result> { - if let Some(enabled_pipelines) = self.enabled_pipelines { - ensure!( - enabled_pipelines.is_empty(), - "Tried to enable pipelines that this indexer does not know about: \ - {enabled_pipelines:#?}", - ); - } - - let metrics_handle = self - .metrics_service - .run() - .await - .context("Failed to start metrics service")?; - - // If an override has been provided, start ingestion from there, otherwise start ingestion - // from just after the lowest committer watermark across all enabled pipelines. - let first_checkpoint = self - .first_checkpoint - .unwrap_or(self.first_checkpoint_from_watermark); - - let last_checkpoint = self.last_checkpoint.unwrap_or(u64::MAX); - - info!(first_checkpoint, last_checkpoint = ?self.last_checkpoint, "Ingestion range"); - - let (regulator_handle, broadcaster_handle) = self - .ingestion_service - .run(first_checkpoint..=last_checkpoint) - .await - .context("Failed to start ingestion service")?; - - self.handles.push(regulator_handle); - self.handles.push(broadcaster_handle); - - let cancel = self.cancel.clone(); - Ok(tokio::spawn(async move { - // Wait for the ingestion service and all its related tasks to wind down gracefully: - // If ingestion has been configured to only handle a specific range of checkpoints, we - // want to make sure that tasks are allowed to run to completion before shutting them - // down. - graceful_shutdown(self.handles, self.cancel).await; - - info!("Indexing pipeline gracefully shut down"); - - // Pick off any stragglers (in this case, just the metrics service). - cancel.cancel(); - metrics_handle.await.unwrap(); - })) - } - - /// Update the indexer's first checkpoint based on the watermark for the pipeline by adding for - /// handler `H` (as long as it's enabled). Returns `Ok(None)` if the pipeline is disabled, - /// `Ok(Some(None))` if the pipeline is enabled but its watermark is not found, and - /// `Ok(Some(Some(watermark)))` if the pipeline is enabled and the watermark is found. - async fn add_pipeline( - &mut self, - ) -> Result>>> { - ensure!( - self.added_pipelines.insert(P::NAME), - "Pipeline {:?} already added", - P::NAME, - ); - - if let Some(enabled_pipelines) = &mut self.enabled_pipelines { - if !enabled_pipelines.remove(P::NAME) { - info!(pipeline = P::NAME, "Skipping"); - return Ok(None); - } - } - - let mut conn = self.db.connect().await.context("Failed DB connection")?; - - let watermark = CommitterWatermark::get(&mut conn, P::NAME) - .await - .with_context(|| format!("Failed to get watermark for {}", P::NAME))?; - - // TODO(amnn): Test this (depends on supporting migrations and tempdb). - self.first_checkpoint_from_watermark = watermark - .as_ref() - .map_or(0, |w| w.checkpoint_hi_inclusive as u64 + 1) - .min(self.first_checkpoint_from_watermark); - - Ok(Some(watermark)) - } -} - -impl Default for IndexerArgs { - fn default() -> Self { - Self { - first_checkpoint: None, - last_checkpoint: None, - pipeline: vec![], - skip_watermark: false, - metrics_address: "0.0.0.0:9184".parse().unwrap(), - } - } -} - pub async fn start_indexer( db_args: DbArgs, indexer_args: IndexerArgs, @@ -466,6 +114,7 @@ pub async fn start_indexer( indexer_args, client_args, ingestion, + &MIGRATIONS, cancel.clone(), ) .await?; diff --git a/crates/sui-indexer-alt/src/main.rs b/crates/sui-indexer-alt/src/main.rs index d72a6650825ac..d05b5cdad6c6e 100644 --- a/crates/sui-indexer-alt/src/main.rs +++ b/crates/sui-indexer-alt/src/main.rs @@ -11,8 +11,9 @@ use sui_indexer_alt::args::Args; use sui_indexer_alt::args::Command; use sui_indexer_alt::config::IndexerConfig; use sui_indexer_alt::config::Merge; -use sui_indexer_alt::db::reset_database; +use sui_indexer_alt::models::MIGRATIONS; use sui_indexer_alt::start_indexer; +use sui_indexer_alt_framework::db::reset_database; use tokio::fs; #[tokio::main] @@ -72,7 +73,7 @@ async fn main() -> Result<()> { } Command::ResetDatabase { skip_migrations } => { - reset_database(args.db_args, skip_migrations).await?; + reset_database(args.db_args, (!skip_migrations).then_some(&MIGRATIONS)).await?; } #[cfg(feature = "benchmark")] diff --git a/crates/sui-indexer-alt/src/models/mod.rs b/crates/sui-indexer-alt/src/models/mod.rs index 0db405170f6d5..5baf5fb04cd83 100644 --- a/crates/sui-indexer-alt/src/models/mod.rs +++ b/crates/sui-indexer-alt/src/models/mod.rs @@ -1,6 +1,8 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use diesel_migrations::{embed_migrations, EmbeddedMigrations}; + pub mod checkpoints; pub mod displays; pub mod epochs; @@ -8,4 +10,5 @@ pub mod events; pub mod objects; pub mod packages; pub mod transactions; -pub mod watermarks; + +pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations"); diff --git a/crates/sui-indexer-alt/src/schema.rs b/crates/sui-indexer-alt/src/schema.rs index e1872ad90194c..c519fb4192d6f 100644 --- a/crates/sui-indexer-alt/src/schema.rs +++ b/crates/sui-indexer-alt/src/schema.rs @@ -239,19 +239,6 @@ diesel::table! { } } -diesel::table! { - watermarks (pipeline) { - pipeline -> Text, - epoch_hi_inclusive -> Int8, - checkpoint_hi_inclusive -> Int8, - tx_hi -> Int8, - timestamp_ms_hi_inclusive -> Int8, - reader_lo -> Int8, - pruner_timestamp -> Timestamp, - pruner_hi -> Int8, - } -} - diesel::allow_tables_to_appear_in_same_query!( ev_emit_mod, ev_struct_inst, @@ -277,5 +264,4 @@ diesel::allow_tables_to_appear_in_same_query!( tx_kinds, wal_coin_balances, wal_obj_types, - watermarks, );