From 52ea442ad9d4dda8a3d8ea6b18eb8035e5006d43 Mon Sep 17 00:00:00 2001
From: Xun Li
Date: Tue, 24 Dec 2024 14:06:42 -0800
Subject: [PATCH] Add tests

Add unit tests for concurrent pipeline registration in
sui-indexer-alt-framework (how `first_checkpoint_from_watermark` is derived
from committer and pruner watermarks) and for `PruningLookupTable` in
sui-indexer-alt.

Test helpers introduced by this patch: `Indexer::new_for_testing`,
`DbArgs::new_for_testing`, and `#[cfg(test)]`-only `new_for_testing`
constructors on `CommitterWatermark` and `PrunerWatermark`.
---
 crates/sui-indexer-alt-framework/Cargo.toml   |   1 +
 crates/sui-indexer-alt-framework/src/lib.rs   | 154 +++++++++++++++++-
 .../src/watermarks.rs                         |  21 +++
 .../sui-indexer-alt/src/consistent_pruning.rs |  85 ++++++++++
 .../sui-indexer-alt/src/handlers/obj_info.rs  |   1 +
 crates/sui-pg-db/src/lib.rs                   |   9 +
 6 files changed, 269 insertions(+), 2 deletions(-)

diff --git a/crates/sui-indexer-alt-framework/Cargo.toml b/crates/sui-indexer-alt-framework/Cargo.toml
index 9342f72c4c9ff..2c9a172be8efb 100644
--- a/crates/sui-indexer-alt-framework/Cargo.toml
+++ b/crates/sui-indexer-alt-framework/Cargo.toml
@@ -21,6 +21,7 @@ futures.workspace = true
 prometheus.workspace = true
 reqwest.workspace = true
 serde.workspace = true
+tempfile.workspace = true
 thiserror.workspace = true
 tokio.workspace = true
 tokio-stream.workspace = true
diff --git a/crates/sui-indexer-alt-framework/src/lib.rs b/crates/sui-indexer-alt-framework/src/lib.rs
index 1759873ec999d..2b482096d9671 100644
--- a/crates/sui-indexer-alt-framework/src/lib.rs
+++ b/crates/sui-indexer-alt-framework/src/lib.rs
@@ -16,8 +16,9 @@ use pipeline::{
     sequential::{self, SequentialConfig},
     Processor,
 };
-use sui_pg_db::{Db, DbArgs};
+use sui_pg_db::{temp::TempDb, Db, DbArgs};
 use task::graceful_shutdown;
+use tempfile::tempdir;
 use tokio::task::JoinHandle;
 use tokio_util::sync::CancellationToken;
 use tracing::{info, warn};
@@ -175,6 +176,25 @@ impl Indexer {
         })
     }
 
+    pub async fn new_for_testing() -> (Self, TempDb) {
+        let temp_db = TempDb::new().unwrap();
+        let db_args = DbArgs::new_for_testing(temp_db.database().url().clone());
+        let indexer = Indexer::new(
+            db_args,
+            IndexerArgs::default(),
+            ClientArgs {
+                remote_store_url: None,
+                local_ingestion_path: Some(tempdir().unwrap().into_path()),
+            },
+            IngestionConfig::default(),
+            &MIGRATIONS,
+            CancellationToken::new(),
+        )
+        .await
+        .unwrap();
+        (indexer, temp_db)
+    }
+
     /// The database connection pool used by the indexer.
     pub fn db(&self) -> &Db {
         &self.db
@@ -404,7 +424,6 @@ impl Indexer {
                 .unwrap_or_default()
         };
 
-        // TODO(amnn): Test this (depends on supporting migrations and tempdb).
         self.first_checkpoint_from_watermark =
             expected_first_checkpoint.min(self.first_checkpoint_from_watermark);
 
@@ -423,3 +442,134 @@ impl Default for IndexerArgs {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use async_trait::async_trait;
+    use sui_field_count::FieldCount;
+    use sui_pg_db as db;
+    use sui_types::full_checkpoint_content::CheckpointData;
+
+    use super::*;
+
+    #[derive(FieldCount)]
+    struct V {
+        _v: u64,
+    }
+
+    macro_rules! define_test_concurrent_pipeline {
+        ($name:ident) => {
+            define_test_concurrent_pipeline!($name, false);
+        };
+        ($name:ident, $pruning_requires_processed_values:expr) => {
+            struct $name;
+            impl Processor for $name {
+                const NAME: &'static str = stringify!($name);
+                const PRUNING_REQUIRES_PROCESSED_VALUES: bool = $pruning_requires_processed_values;
+                type Value = V;
+                fn process(
+                    &self,
+                    _checkpoint: &Arc<CheckpointData>,
+                ) -> anyhow::Result<Vec<Self::Value>> {
+                    todo!()
+                }
+            }
+
+            #[async_trait]
+            impl concurrent::Handler for $name {
+                async fn commit(
+                    _values: &[Self::Value],
+                    _conn: &mut db::Connection<'_>,
+                ) -> anyhow::Result<usize> {
+                    todo!()
+                }
+            }
+        };
+    }
+
+    define_test_concurrent_pipeline!(ConcurrentPipeline1);
+    define_test_concurrent_pipeline!(ConcurrentPipeline2);
+    define_test_concurrent_pipeline!(ConcurrentPipeline3, true);
+
+    #[tokio::test]
+    async fn test_add_new_pipeline() {
+        let (mut indexer, _temp_db) = Indexer::new_for_testing().await;
+        indexer
+            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
+            .await
+            .unwrap();
+        assert_eq!(indexer.first_checkpoint_from_watermark, 1);
+    }
+
+    #[tokio::test]
+    async fn test_add_existing_pipeline() {
+        let (mut indexer, _temp_db) = Indexer::new_for_testing().await;
+        let watermark = CommitterWatermark::new_for_testing(ConcurrentPipeline1::NAME, 10);
+        watermark
+            .update(&mut indexer.db().connect().await.unwrap())
+            .await
+            .unwrap();
+        indexer
+            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
+            .await
+            .unwrap();
+        assert_eq!(indexer.first_checkpoint_from_watermark, 11);
+    }
+
+    #[tokio::test]
+    async fn test_add_multiple_pipelines() {
+        let (mut indexer, _temp_db) = Indexer::new_for_testing().await;
+        let watermark1 = CommitterWatermark::new_for_testing(ConcurrentPipeline1::NAME, 10);
+        watermark1
+            .update(&mut indexer.db().connect().await.unwrap())
+            .await
+            .unwrap();
+        let watermark2 = CommitterWatermark::new_for_testing(ConcurrentPipeline2::NAME, 20);
+        watermark2
+            .update(&mut indexer.db().connect().await.unwrap())
+            .await
+            .unwrap();
+
+        indexer
+            .concurrent_pipeline(ConcurrentPipeline2, ConcurrentConfig::default())
+            .await
+            .unwrap();
+        assert_eq!(indexer.first_checkpoint_from_watermark, 21);
+        indexer
+            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
+            .await
+            .unwrap();
+        assert_eq!(indexer.first_checkpoint_from_watermark, 11);
+    }
+
+    #[tokio::test]
+    async fn test_add_multiple_pipelines_pruning_requires_processed_values() {
+        let (mut indexer, _temp_db) = Indexer::new_for_testing().await;
+        let watermark1 = CommitterWatermark::new_for_testing(ConcurrentPipeline1::NAME, 10);
+        watermark1
+            .update(&mut indexer.db().connect().await.unwrap())
+            .await
+            .unwrap();
+        indexer
+            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
+            .await
+            .unwrap();
+        assert_eq!(indexer.first_checkpoint_from_watermark, 11);
+
+        let watermark3 = CommitterWatermark::new_for_testing(ConcurrentPipeline3::NAME, 20);
+        watermark3
+            .update(&mut indexer.db().connect().await.unwrap())
+            .await
+            .unwrap();
+        let pruner_watermark = PrunerWatermark::new_for_testing(ConcurrentPipeline3::NAME, 5);
+        assert!(pruner_watermark
+            .update(&mut indexer.db().connect().await.unwrap())
+            .await
+            .unwrap());
+        indexer
+            .concurrent_pipeline(ConcurrentPipeline3, ConcurrentConfig::default())
+            .await
+            .unwrap();
+        assert_eq!(indexer.first_checkpoint_from_watermark, 6);
+    }
+}
diff --git a/crates/sui-indexer-alt-framework/src/watermarks.rs b/crates/sui-indexer-alt-framework/src/watermarks.rs
index 88f7465a243e5..7710802b96099 100644
--- a/crates/sui-indexer-alt-framework/src/watermarks.rs
+++ b/crates/sui-indexer-alt-framework/src/watermarks.rs
@@ -101,6 +101,17 @@ impl<'p> CommitterWatermark<'p> {
         }
     }
 
+    #[cfg(test)]
+    pub(crate) fn new_for_testing(pipeline: &'p str, checkpoint_hi_inclusive: u64) -> Self {
+        CommitterWatermark {
+            pipeline: pipeline.into(),
+            epoch_hi_inclusive: 0,
+            checkpoint_hi_inclusive: checkpoint_hi_inclusive as i64,
+            tx_hi: 0,
+            timestamp_ms_hi_inclusive: 0,
+        }
+    }
+
     /// The consensus timestamp associated with this checkpoint.
     pub(crate) fn timestamp(&self) -> DateTime<Utc> {
         DateTime::from_timestamp_millis(self.timestamp_ms_hi_inclusive).unwrap_or_default()
@@ -185,6 +196,16 @@ impl PrunerWatermark<'static> {
 }
 
 impl<'p> PrunerWatermark<'p> {
+    #[cfg(test)]
+    pub(crate) fn new_for_testing(pipeline: &'p str, pruner_hi: u64) -> Self {
+        PrunerWatermark {
+            pipeline: pipeline.into(),
+            wait_for: 0,
+            reader_lo: 0,
+            pruner_hi: pruner_hi as i64,
+        }
+    }
+
     /// How long to wait before the pruner can act on this information, or `None`, if there is no
     /// need to wait.
     pub(crate) fn wait_for(&self) -> Option<Duration> {
diff --git a/crates/sui-indexer-alt/src/consistent_pruning.rs b/crates/sui-indexer-alt/src/consistent_pruning.rs
index 5e32d8cd6b662..352f3a4843010 100644
--- a/crates/sui-indexer-alt/src/consistent_pruning.rs
+++ b/crates/sui-indexer-alt/src/consistent_pruning.rs
@@ -76,3 +76,88 @@ impl PruningLookupTable {
         Ok(result)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_pruning_lookup_table_mutations() {
+        let table = PruningLookupTable::default();
+        let obj1 = ObjectID::random();
+        let obj2 = ObjectID::random();
+
+        // Checkpoint 1: obj1 mutated
+        let mut info1 = PruningInfo::new();
+        info1.add_mutated_object(obj1);
+        table.insert(1, info1);
+
+        // Checkpoint 2: obj2 mutated
+        let mut info2 = PruningInfo::new();
+        info2.add_mutated_object(obj2);
+        table.insert(2, info2);
+
+        // Prune checkpoints 1-2
+        let result = table.take(1, 2).unwrap();
+        assert_eq!(result.len(), 2);
+        assert_eq!(result[&obj1], 1);
+        assert_eq!(result[&obj2], 2);
+    }
+
+    #[test]
+    fn test_pruning_lookup_table_deletions() {
+        let table = PruningLookupTable::default();
+        let obj = ObjectID::random();
+
+        // Checkpoint 1: obj mutated
+        let mut info1 = PruningInfo::new();
+        info1.add_mutated_object(obj);
+        table.insert(1, info1);
+
+        // Checkpoint 2: obj deleted
+        let mut info2 = PruningInfo::new();
+        info2.add_deleted_object(obj);
+        table.insert(2, info2);
+
+        // Prune checkpoints 1-2
+        let result = table.take(1, 2).unwrap();
+        assert_eq!(result.len(), 1);
+        // For deleted objects, we prune up to and including the deletion checkpoint
+        assert_eq!(result[&obj], 3);
+    }
+
+    #[test]
+    fn test_missing_checkpoint() {
+        let table = PruningLookupTable::default();
+        let obj = ObjectID::random();
+
+        let mut info = PruningInfo::new();
+        info.add_mutated_object(obj);
+        table.insert(1, info);
+
+        // Try to prune checkpoint that doesn't exist in the lookup table.
+        assert!(table.take(2, 2).is_err());
+    }
+
+    #[test]
+    fn test_multiple_updates() {
+        let table = PruningLookupTable::default();
+        let obj = ObjectID::random();
+
+        // Checkpoint 1: obj mutated
+        let mut info1 = PruningInfo::new();
+        info1.add_mutated_object(obj);
+        table.insert(1, info1);
+
+        // Checkpoint 2: obj mutated again
+        let mut info2 = PruningInfo::new();
+        info2.add_mutated_object(obj);
+        table.insert(2, info2);
+
+        // Prune checkpoints 1-2
+        let result = table.take(1, 2).unwrap();
+        assert_eq!(result.len(), 1);
+        // Should use the latest mutation checkpoint
+        assert_eq!(result[&obj], 2);
+    }
+}
diff --git a/crates/sui-indexer-alt/src/handlers/obj_info.rs b/crates/sui-indexer-alt/src/handlers/obj_info.rs
index f923250a4e59c..25c85dad75ddc 100644
--- a/crates/sui-indexer-alt/src/handlers/obj_info.rs
+++ b/crates/sui-indexer-alt/src/handlers/obj_info.rs
@@ -33,6 +33,7 @@ impl Processor for ObjInfo {
     const NAME: &'static str = "obj_info";
     type Value = ProcessedObjInfo;
 
+    // TODO: Add tests for this function and the pruner.
     fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
         let cp_sequence_number = checkpoint.checkpoint_summary.sequence_number;
         let checkpoint_input_objects = checkpoint.checkpoint_input_objects();
diff --git a/crates/sui-pg-db/src/lib.rs b/crates/sui-pg-db/src/lib.rs
index d3062848bcef4..b69adeec2debe 100644
--- a/crates/sui-pg-db/src/lib.rs
+++ b/crates/sui-pg-db/src/lib.rs
@@ -162,6 +162,15 @@ impl Db {
     }
 }
 
+impl DbArgs {
+    pub fn new_for_testing(database_url: Url) -> Self {
+        Self {
+            database_url,
+            ..Default::default()
+        }
+    }
+}
+
 impl Default for DbArgs {
     fn default() -> Self {
         Self {