Skip to content

Commit

Permalink
cleanups and what not
Browse files Browse the repository at this point in the history
  • Loading branch information
wlmyng committed Dec 27, 2024
1 parent 63a0762 commit 72e140e
Showing 1 changed file with 58 additions and 87 deletions.
145 changes: 58 additions & 87 deletions crates/sui-indexer-alt/tests/pruning/pruner_tests.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use diesel::query_dsl::select_dsl::SelectDsl;
use diesel_async::RunQueryDsl;
use rand::rngs::StdRng;
use rand::SeedableRng;
use simulacrum::Simulacrum;
use std::{path::PathBuf, time::Duration};
use std::{ops::Range, path::PathBuf, time::Duration};
use sui_indexer_alt::{
config::{IndexerConfig, Merge},
start_indexer,
Expand All @@ -15,18 +17,15 @@ use sui_indexer_alt_framework::{
use sui_indexer_alt_schema::schema::{kv_checkpoints, kv_epoch_starts};
use sui_pg_db::{
temp::{get_available_port, TempDb},
Db, DbArgs,
Connection, Db, DbArgs,
};
use tempfile::TempDir;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
// test for a pruner that doesn't need interval lookups

/// Prepares a test indexer configuration, deferring to the default config for top-level fields, but
/// explicitly setting all fields of the `cp_sequence_numbers` pipeline layer. This can then be
/// passed to the `indexer_config` arg of `start_indexer`.
#[cfg(test)]
fn load_indexer_config(path_str: &str) -> IndexerConfig {
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push(format!("tests/pruning/configs/{}", path_str));
Expand All @@ -35,14 +34,14 @@ fn load_indexer_config(path_str: &str) -> IndexerConfig {
toml::from_str(&config_str).expect("Failed to parse test config TOML")
}

#[cfg(test)]
/// Creates the temporary database and directory backing a test run. Both values
/// must be held for the full duration of the test: dropping either tears down
/// the underlying resource, which causes parts of the test env to hang
/// indefinitely.
async fn setup_temp_resources() -> (TempDb, TempDir) {
    (TempDb::new().unwrap(), tempfile::tempdir().unwrap())
}

#[cfg(test)]
async fn setup_test_env(
db_url: String,
data_ingestion_path: PathBuf,
Expand Down Expand Up @@ -106,6 +105,40 @@ async fn setup_test_env(
(sim, db, indexer_handle, cancel)
}

/// Even though the indexer consists of several independent pipelines, the `cp_sequence_numbers`
/// table governs checkpoint -> tx and epoch lookups and provides such information for prunable
/// tables. This waits for the lookup table to be updated with the expected changes.
async fn wait_for_tx_interval(
conn: &mut Connection<'_>,
duration: Duration,
cp_range: Range<u64>,
) -> anyhow::Result<()> {
tokio::select! {
_ = tokio::time::sleep(duration) => {
anyhow::bail!("Timeout occurred while waiting for tx interval of checkpoints [{}, {})", cp_range.start, cp_range.end);
}
result = async {
loop {
match tx_interval(conn, cp_range.clone()).await {
Ok(_) => break Ok(()),
Err(_) => {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
} => result
}
}

/// Shuts the indexer down: signals cancellation, then waits for its task to
/// finish. Panics if the task itself panicked; the indexer's own
/// `anyhow::Result` is deliberately ignored, since a cancelled indexer may
/// legitimately return an error during teardown.
async fn cleanup_test_env(
    cancel: CancellationToken,
    indexer_handle: JoinHandle<anyhow::Result<()>>,
) {
    cancel.cancel();
    let join_result = indexer_handle.await;
    let _indexer_result = join_result.expect("Indexer task panicked");
}

/// Test that the `cp_sequence_numbers` is correctly committed to.
#[tokio::test]
pub async fn test_cp_sequence_numbers() -> () {
let indexer_config = load_indexer_config("base_config.toml");
Expand All @@ -118,8 +151,6 @@ pub async fn test_cp_sequence_numbers() -> () {
let (mut sim, db, indexer_handle, cancel) =
setup_test_env(db_url, data_ingestion_path, merged_config).await;

// Do your test assertions here
// ...
sim.create_checkpoint();
sim.create_checkpoint();

Expand All @@ -128,31 +159,12 @@ pub async fn test_cp_sequence_numbers() -> () {
.await
.expect("Failed to retrieve DB connection");

let timeout_duration = Duration::from_secs(5); // Adjust timeout as needed
loop {
match timeout(timeout_duration, tx_interval(&mut conn, 0..2)).await {
// Timeout occurred
Err(_elapsed) => {
// If we hit timeout, return early from the test
cancel.cancel();
return;
}
// Got a result within timeout
Ok(result) => match result {
// Got valid range, break loop
Ok(_) => break,
Err(_) => {
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
},
}
if let Err(e) = wait_for_tx_interval(&mut conn, Duration::from_secs(5), 0..2).await {
cleanup_test_env(cancel, indexer_handle).await;
panic!("{:?}", e);
}

cancel.cancel();

// Wait for the indexer to shut down
let _ = indexer_handle.await.expect("Indexer task panicked");
cleanup_test_env(cancel, indexer_handle).await;
}

#[tokio::test]
Expand All @@ -176,30 +188,11 @@ pub async fn test_kv_epoch_starts_cross_epoch() -> () {
.await
.expect("Failed to retrieve DB connection");

let timeout_duration = Duration::from_secs(5); // Adjust timeout as needed
loop {
match timeout(timeout_duration, tx_interval(&mut conn, 0..3)).await {
// Timeout occurred
Err(_elapsed) => {
// If we hit timeout, return early from the test
cancel.cancel();
return;
}
// Got a result within timeout
Ok(result) => match result {
// Got valid range, break loop
Ok(_) => break,
Err(_) => {
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
},
}
if let Err(e) = wait_for_tx_interval(&mut conn, Duration::from_secs(5), 0..3).await {
cleanup_test_env(cancel, indexer_handle).await;
panic!("{:?}", e);
}

use diesel::query_dsl::select_dsl::SelectDsl;
use diesel_async::RunQueryDsl;

let timeout_duration = Duration::from_secs(5);
tokio::select! {
_ = tokio::time::sleep(timeout_duration) => {
Expand All @@ -222,12 +215,11 @@ pub async fn test_kv_epoch_starts_cross_epoch() -> () {
} => {}
}

cancel.cancel();

// Wait for the indexer to shut down
let _ = indexer_handle.await.expect("Indexer task panicked");
cleanup_test_env(cancel, indexer_handle).await;
}

/// The checkpoint-based pruner watermark continuously updates the `pruner_hi`, but we don't want to
/// prune epoch-related data until the `[from, to)` checkpoints are across epochs.
#[tokio::test]
pub async fn test_kv_epoch_starts_same_epoch() -> () {
let merged_config = load_indexer_config("base_config.toml")
Expand All @@ -250,30 +242,11 @@ pub async fn test_kv_epoch_starts_same_epoch() -> () {
.await
.expect("Failed to retrieve DB connection");

let timeout_duration = Duration::from_secs(5); // Adjust timeout as needed
loop {
match timeout(timeout_duration, tx_interval(&mut conn, 0..4)).await {
// Timeout occurred
Err(_elapsed) => {
// If we hit timeout, return early from the test
cancel.cancel();
return;
}
// Got a result within timeout
Ok(result) => match result {
// Got valid range, break loop
Ok(_) => break,
Err(_) => {
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
},
}
if let Err(e) = wait_for_tx_interval(&mut conn, Duration::from_secs(5), 0..4).await {
cleanup_test_env(cancel, indexer_handle).await;
panic!("{:?}", e);
}

use diesel::query_dsl::select_dsl::SelectDsl;
use diesel_async::RunQueryDsl;

let timeout_duration = Duration::from_secs(5);
tokio::select! {
_ = tokio::time::sleep(timeout_duration) => {
Expand All @@ -296,12 +269,13 @@ pub async fn test_kv_epoch_starts_same_epoch() -> () {
} => {}
}

cancel.cancel();

// Wait for the indexer to shut down
let _ = indexer_handle.await.expect("Indexer task panicked");
cleanup_test_env(cancel, indexer_handle).await;
}

/// Not all tables require a mapping to the `cp_sequence_numbers` table. For example,
/// `kv_checkpoints` table can be pruned directly with checkpoint-based watermarks from the pruner
/// watermark task. In this test, the `cp_sequence_numbers` table is not enabled. The indexer should
/// still be able to prune `kv_checkpoints`.
#[tokio::test]
pub async fn test_kv_checkpoints_no_mapping() -> () {
let merged_config =
Expand All @@ -322,9 +296,6 @@ pub async fn test_kv_checkpoints_no_mapping() -> () {
.await
.expect("Failed to retrieve DB connection");

use diesel::query_dsl::select_dsl::SelectDsl;
use diesel_async::RunQueryDsl;

let timeout_duration = Duration::from_secs(5);
tokio::select! {
_ = tokio::time::sleep(timeout_duration) => {
Expand Down

0 comments on commit 72e140e

Please sign in to comment.