diff --git a/crates/sui-indexer-alt/tests/pruning/pruner_tests.rs b/crates/sui-indexer-alt/tests/pruning/pruner_tests.rs index d660806a6ef0d..4769516d0d2af 100644 --- a/crates/sui-indexer-alt/tests/pruning/pruner_tests.rs +++ b/crates/sui-indexer-alt/tests/pruning/pruner_tests.rs @@ -1,10 +1,12 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use diesel::query_dsl::select_dsl::SelectDsl; +use diesel_async::RunQueryDsl; use rand::rngs::StdRng; use rand::SeedableRng; use simulacrum::Simulacrum; -use std::{path::PathBuf, time::Duration}; +use std::{ops::Range, path::PathBuf, time::Duration}; use sui_indexer_alt::{ config::{IndexerConfig, Merge}, start_indexer, @@ -15,18 +17,15 @@ use sui_indexer_alt_framework::{ use sui_indexer_alt_schema::schema::{kv_checkpoints, kv_epoch_starts}; use sui_pg_db::{ temp::{get_available_port, TempDb}, - Db, DbArgs, + Connection, Db, DbArgs, }; use tempfile::TempDir; use tokio::task::JoinHandle; -use tokio::time::timeout; use tokio_util::sync::CancellationToken; -// test for a pruner that doesn't need interval lookups /// Prepares a test indexer configuration, deferring to the default config for top-level fields, but /// explicitly setting all fields of the `cp_sequence_numbers` pipeline layer. This can then be /// passed to the `indexer_config` arg of `start_indexer`. -#[cfg(test)] fn load_indexer_config(path_str: &str) -> IndexerConfig { let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); path.push(format!("tests/pruning/configs/{}", path_str)); @@ -35,14 +34,14 @@ fn load_indexer_config(path_str: &str) -> IndexerConfig { toml::from_str(&config_str).expect("Failed to parse test config TOML") } -#[cfg(test)] +/// The TempDir and TempDb need to be kept alive for the duration of the test, otherwise parts of +/// the test env will hang indefinitely. async fn setup_temp_resources() -> (TempDb, TempDir) { let temp_db = TempDb::new().unwrap(); let temp_dir = tempfile::tempdir().unwrap(); (temp_db, temp_dir) } -#[cfg(test)] async fn setup_test_env( db_url: String, data_ingestion_path: PathBuf, @@ -106,6 +105,40 @@ async fn setup_test_env( (sim, db, indexer_handle, cancel) } +/// Even though the indexer consists of several independent pipelines, the `cp_sequence_numbers` +/// table governs checkpoint -> tx and epoch lookups and provides such information for prunable +/// tables. This waits for the lookup table to be updated with the expected changes. +async fn wait_for_tx_interval( + conn: &mut Connection<'_>, + duration: Duration, + cp_range: Range, +) -> anyhow::Result<()> { + tokio::select! { + _ = tokio::time::sleep(duration) => { + anyhow::bail!("Timeout occurred while waiting for tx interval of checkpoints [{}, {})", cp_range.start, cp_range.end); + } + result = async { + loop { + match tx_interval(conn, cp_range.clone()).await { + Ok(_) => break Ok(()), + Err(_) => { + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + } + } => result + } +} + +async fn cleanup_test_env( + cancel: CancellationToken, + indexer_handle: JoinHandle>, +) { + cancel.cancel(); + let _ = indexer_handle.await.expect("Indexer task panicked"); +} + +/// Test that the `cp_sequence_numbers` is correctly committed to. #[tokio::test] pub async fn test_cp_sequence_numbers() -> () { let indexer_config = load_indexer_config("base_config.toml"); @@ -118,8 +151,6 @@ pub async fn test_cp_sequence_numbers() -> () { let (mut sim, db, indexer_handle, cancel) = setup_test_env(db_url, data_ingestion_path, merged_config).await; - // Do your test assertions here - // ... sim.create_checkpoint(); sim.create_checkpoint(); @@ -128,31 +159,12 @@ pub async fn test_cp_sequence_numbers() -> () { .await .expect("Failed to retrieve DB connection"); - let timeout_duration = Duration::from_secs(5); // Adjust timeout as needed - loop { - match timeout(timeout_duration, tx_interval(&mut conn, 0..2)).await { - // Timeout occurred - Err(_elapsed) => { - // If we hit timeout, return early from the test - cancel.cancel(); - return; - } - // Got a result within timeout - Ok(result) => match result { - // Got valid range, break loop - Ok(_) => break, - Err(_) => { - tokio::time::sleep(Duration::from_millis(100)).await; - continue; - } - }, - } + if let Err(e) = wait_for_tx_interval(&mut conn, Duration::from_secs(5), 0..2).await { + cleanup_test_env(cancel, indexer_handle).await; + panic!("{:?}", e); } - cancel.cancel(); - - // Wait for the indexer to shut down - let _ = indexer_handle.await.expect("Indexer task panicked"); + cleanup_test_env(cancel, indexer_handle).await; } #[tokio::test] @@ -176,30 +188,11 @@ pub async fn test_kv_epoch_starts_cross_epoch() -> () { .await .expect("Failed to retrieve DB connection"); - let timeout_duration = Duration::from_secs(5); // Adjust timeout as needed - loop { - match timeout(timeout_duration, tx_interval(&mut conn, 0..3)).await { - // Timeout occurred - Err(_elapsed) => { - // If we hit timeout, return early from the test - cancel.cancel(); - return; - } - // Got a result within timeout - Ok(result) => match result { - // Got valid range, break loop - Ok(_) => break, - Err(_) => { - tokio::time::sleep(Duration::from_millis(100)).await; - continue; - } - }, - } + if let Err(e) = wait_for_tx_interval(&mut conn, Duration::from_secs(5), 0..3).await { + cleanup_test_env(cancel, indexer_handle).await; + panic!("{:?}", e); } - use diesel::query_dsl::select_dsl::SelectDsl; - use diesel_async::RunQueryDsl; - let timeout_duration = Duration::from_secs(5); tokio::select! { _ = tokio::time::sleep(timeout_duration) => { @@ -222,12 +215,11 @@ pub async fn test_kv_epoch_starts_cross_epoch() -> () { } => {} } - cancel.cancel(); - - // Wait for the indexer to shut down - let _ = indexer_handle.await.expect("Indexer task panicked"); + cleanup_test_env(cancel, indexer_handle).await; } +/// The checkpoint-based pruner watermark continuously updates the `pruner_hi`, but we don't want to +/// prune epoch-related data until the `[from, to)` checkpoints are across epochs. #[tokio::test] pub async fn test_kv_epoch_starts_same_epoch() -> () { let merged_config = load_indexer_config("base_config.toml") @@ -250,30 +242,11 @@ pub async fn test_kv_epoch_starts_same_epoch() -> () { .await .expect("Failed to retrieve DB connection"); - let timeout_duration = Duration::from_secs(5); // Adjust timeout as needed - loop { - match timeout(timeout_duration, tx_interval(&mut conn, 0..4)).await { - // Timeout occurred - Err(_elapsed) => { - // If we hit timeout, return early from the test - cancel.cancel(); - return; - } - // Got a result within timeout - Ok(result) => match result { - // Got valid range, break loop - Ok(_) => break, - Err(_) => { - tokio::time::sleep(Duration::from_millis(100)).await; - continue; - } - }, - } + if let Err(e) = wait_for_tx_interval(&mut conn, Duration::from_secs(5), 0..4).await { + cleanup_test_env(cancel, indexer_handle).await; + panic!("{:?}", e); } - use diesel::query_dsl::select_dsl::SelectDsl; - use diesel_async::RunQueryDsl; - let timeout_duration = Duration::from_secs(5); tokio::select! { _ = tokio::time::sleep(timeout_duration) => { @@ -296,12 +269,13 @@ pub async fn test_kv_epoch_starts_same_epoch() -> () { } => {} } - cancel.cancel(); - - // Wait for the indexer to shut down - let _ = indexer_handle.await.expect("Indexer task panicked"); + cleanup_test_env(cancel, indexer_handle).await; } +/// Not all tables require a mapping to the `cp_sequence_numbers` table. For example, +/// `kv_checkpoints` table can be pruned directly with checkpoint-based watermarks from the pruner +/// watermark task. In this test, the `cp_sequence_numbers` table is not enabled. The indexer should +/// still be able to prune `kv_checkpoints`. #[tokio::test] pub async fn test_kv_checkpoints_no_mapping() -> () { let merged_config = @@ -322,9 +296,6 @@ pub async fn test_kv_checkpoints_no_mapping() -> () { .await .expect("Failed to retrieve DB connection"); - use diesel::query_dsl::select_dsl::SelectDsl; - use diesel_async::RunQueryDsl; - let timeout_duration = Duration::from_secs(5); tokio::select! { _ = tokio::time::sleep(timeout_duration) => {