Add tests
lxfind committed Dec 26, 2024
1 parent b9e9130 commit 52ea442
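
This commit adds a test-only constructor, Indexer::new_for_testing, which runs the indexer against a temporary Postgres database (TempDb) and a throwaway local ingestion directory, along with new_for_testing helpers on CommitterWatermark, PrunerWatermark, and DbArgs. These are used to test watermark handling during pipeline registration in sui-indexer-alt-framework and the PruningLookupTable in sui-indexer-alt.

A minimal sketch of how a test inside the framework crate can use the new constructor; MyPipeline is a hypothetical type implementing Processor and concurrent::Handler, like the test pipelines defined further down:

#[tokio::test]
async fn my_pipeline_test() {
    // Keep `_temp_db` in scope so the temporary database outlives the test body.
    let (mut indexer, _temp_db) = Indexer::new_for_testing().await;
    indexer
        .concurrent_pipeline(MyPipeline, ConcurrentConfig::default())
        .await
        .unwrap();
    // With no prior watermark for this pipeline, registration succeeds and the indexer
    // records where ingestion should start (see test_add_new_pipeline below).
}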
Showing 6 changed files with 269 additions and 2 deletions.
1 change: 1 addition & 0 deletions crates/sui-indexer-alt-framework/Cargo.toml
@@ -21,6 +21,7 @@ futures.workspace = true
prometheus.workspace = true
reqwest.workspace = true
serde.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
154 changes: 152 additions & 2 deletions crates/sui-indexer-alt-framework/src/lib.rs
@@ -16,8 +16,9 @@ use pipeline::{
    sequential::{self, SequentialConfig},
    Processor,
};
use sui_pg_db::{Db, DbArgs};
use sui_pg_db::{temp::TempDb, Db, DbArgs};
use task::graceful_shutdown;
use tempfile::tempdir;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
@@ -175,6 +176,25 @@ impl Indexer {
        })
    }

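    /// Create an `Indexer` for tests, backed by a fresh temporary database and an empty local
    /// ingestion directory. The `TempDb` is returned alongside the indexer so that callers can
    /// keep it alive for the duration of their test.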
    pub async fn new_for_testing() -> (Self, TempDb) {
        let temp_db = TempDb::new().unwrap();
        let db_args = DbArgs::new_for_testing(temp_db.database().url().clone());
        let indexer = Indexer::new(
            db_args,
            IndexerArgs::default(),
            ClientArgs {
                remote_store_url: None,
                local_ingestion_path: Some(tempdir().unwrap().into_path()),
            },
            IngestionConfig::default(),
            &MIGRATIONS,
            CancellationToken::new(),
        )
        .await
        .unwrap();
        (indexer, temp_db)
    }

    /// The database connection pool used by the indexer.
    pub fn db(&self) -> &Db {
        &self.db
@@ -404,7 +424,6 @@ impl Indexer {
                .unwrap_or_default()
        };

        // TODO(amnn): Test this (depends on supporting migrations and tempdb).
        self.first_checkpoint_from_watermark =
            expected_first_checkpoint.min(self.first_checkpoint_from_watermark);

@@ -423,3 +442,134 @@ impl Default for IndexerArgs {
        }
    }
}

#[cfg(test)]
mod tests {
    use async_trait::async_trait;
    use sui_field_count::FieldCount;
    use sui_pg_db as db;
    use sui_types::full_checkpoint_content::CheckpointData;

    use super::*;

    #[derive(FieldCount)]
    struct V {
        _v: u64,
    }

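    // Defines a minimal concurrent pipeline whose `process` and `commit` bodies are `todo!()`;
    // these tests only exercise the watermark bookkeeping performed when a pipeline is
    // registered, so the handlers are never actually invoked.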
    macro_rules! define_test_concurrent_pipeline {
        ($name:ident) => {
            define_test_concurrent_pipeline!($name, false);
        };
        ($name:ident, $pruning_requires_processed_values:expr) => {
            struct $name;
            impl Processor for $name {
                const NAME: &'static str = stringify!($name);
                const PRUNING_REQUIRES_PROCESSED_VALUES: bool = $pruning_requires_processed_values;
                type Value = V;
                fn process(
                    &self,
                    _checkpoint: &Arc<CheckpointData>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    todo!()
                }
            }

            #[async_trait]
            impl concurrent::Handler for $name {
                async fn commit(
                    _values: &[Self::Value],
                    _conn: &mut db::Connection<'_>,
                ) -> anyhow::Result<usize> {
                    todo!()
                }
            }
        };
    }

    define_test_concurrent_pipeline!(ConcurrentPipeline1);
    define_test_concurrent_pipeline!(ConcurrentPipeline2);
    define_test_concurrent_pipeline!(ConcurrentPipeline3, true);

    #[tokio::test]
    async fn test_add_new_pipeline() {
        let (mut indexer, _temp_db) = Indexer::new_for_testing().await;
        indexer
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await
            .unwrap();
        assert_eq!(indexer.first_checkpoint_from_watermark, 1);
    }

    #[tokio::test]
    async fn test_add_existing_pipeline() {
        let (mut indexer, _temp_db) = Indexer::new_for_testing().await;
        let watermark = CommitterWatermark::new_for_testing(ConcurrentPipeline1::NAME, 10);
        watermark
            .update(&mut indexer.db().connect().await.unwrap())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await
            .unwrap();
        assert_eq!(indexer.first_checkpoint_from_watermark, 11);
    }

    #[tokio::test]
    async fn test_add_multiple_pipelines() {
        let (mut indexer, _temp_db) = Indexer::new_for_testing().await;
        let watermark1 = CommitterWatermark::new_for_testing(ConcurrentPipeline1::NAME, 10);
        watermark1
            .update(&mut indexer.db().connect().await.unwrap())
            .await
            .unwrap();
        let watermark2 = CommitterWatermark::new_for_testing(ConcurrentPipeline2::NAME, 20);
        watermark2
            .update(&mut indexer.db().connect().await.unwrap())
            .await
            .unwrap();

        indexer
            .concurrent_pipeline(ConcurrentPipeline2, ConcurrentConfig::default())
            .await
            .unwrap();
        assert_eq!(indexer.first_checkpoint_from_watermark, 21);
        indexer
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await
            .unwrap();
        assert_eq!(indexer.first_checkpoint_from_watermark, 11);
    }

    #[tokio::test]
    async fn test_add_multiple_pipelines_pruning_requires_processed_values() {
        let (mut indexer, _temp_db) = Indexer::new_for_testing().await;
        let watermark1 = CommitterWatermark::new_for_testing(ConcurrentPipeline1::NAME, 10);
        watermark1
            .update(&mut indexer.db().connect().await.unwrap())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await
            .unwrap();
        assert_eq!(indexer.first_checkpoint_from_watermark, 11);

        let watermark3 = CommitterWatermark::new_for_testing(ConcurrentPipeline3::NAME, 20);
        watermark3
            .update(&mut indexer.db().connect().await.unwrap())
            .await
            .unwrap();
        let pruner_watermark = PrunerWatermark::new_for_testing(ConcurrentPipeline3::NAME, 5);
        assert!(pruner_watermark
            .update(&mut indexer.db().connect().await.unwrap())
            .await
            .unwrap());
        indexer
            .concurrent_pipeline(ConcurrentPipeline3, ConcurrentConfig::default())
            .await
            .unwrap();
        assert_eq!(indexer.first_checkpoint_from_watermark, 6);
    }
}
21 changes: 21 additions & 0 deletions crates/sui-indexer-alt-framework/src/watermarks.rs
@@ -101,6 +101,17 @@ impl<'p> CommitterWatermark<'p> {
        }
    }

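    /// Test-only constructor: a watermark for `pipeline` at `checkpoint_hi_inclusive`, with all
    /// other fields zeroed.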
    #[cfg(test)]
    pub(crate) fn new_for_testing(pipeline: &'p str, checkpoint_hi_inclusive: u64) -> Self {
        CommitterWatermark {
            pipeline: pipeline.into(),
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: checkpoint_hi_inclusive as i64,
            tx_hi: 0,
            timestamp_ms_hi_inclusive: 0,
        }
    }

    /// The consensus timestamp associated with this checkpoint.
    pub(crate) fn timestamp(&self) -> DateTime<Utc> {
        DateTime::from_timestamp_millis(self.timestamp_ms_hi_inclusive).unwrap_or_default()
@@ -185,6 +196,16 @@ impl PrunerWatermark<'static> {
}

impl<'p> PrunerWatermark<'p> {
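    /// Test-only constructor: a pruner watermark for `pipeline` with the given `pruner_hi` and
    /// all other fields zeroed.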
    #[cfg(test)]
    pub(crate) fn new_for_testing(pipeline: &'p str, pruner_hi: u64) -> Self {
        PrunerWatermark {
            pipeline: pipeline.into(),
            wait_for: 0,
            reader_lo: 0,
            pruner_hi: pruner_hi as i64,
        }
    }

    /// How long to wait before the pruner can act on this information, or `None`, if there is no
    /// need to wait.
    pub(crate) fn wait_for(&self) -> Option<Duration> {
85 changes: 85 additions & 0 deletions crates/sui-indexer-alt/src/consistent_pruning.rs
@@ -76,3 +76,88 @@ impl PruningLookupTable {
        Ok(result)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pruning_lookup_table_mutations() {
        let table = PruningLookupTable::default();
        let obj1 = ObjectID::random();
        let obj2 = ObjectID::random();

        // Checkpoint 1: obj1 mutated
        let mut info1 = PruningInfo::new();
        info1.add_mutated_object(obj1);
        table.insert(1, info1);

        // Checkpoint 2: obj2 mutated
        let mut info2 = PruningInfo::new();
        info2.add_mutated_object(obj2);
        table.insert(2, info2);

        // Prune checkpoints 1-2
        let result = table.take(1, 2).unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result[&obj1], 1);
        assert_eq!(result[&obj2], 2);
    }

    #[test]
    fn test_pruning_lookup_table_deletions() {
        let table = PruningLookupTable::default();
        let obj = ObjectID::random();

        // Checkpoint 1: obj mutated
        let mut info1 = PruningInfo::new();
        info1.add_mutated_object(obj);
        table.insert(1, info1);

        // Checkpoint 2: obj deleted
        let mut info2 = PruningInfo::new();
        info2.add_deleted_object(obj);
        table.insert(2, info2);

        // Prune checkpoints 1-2
        let result = table.take(1, 2).unwrap();
        assert_eq!(result.len(), 1);
        // For deleted objects, we prune up to and including the deletion checkpoint,
        // so the returned (exclusive) prune upper bound is one past the deletion checkpoint.
        assert_eq!(result[&obj], 3);
    }

    #[test]
    fn test_missing_checkpoint() {
        let table = PruningLookupTable::default();
        let obj = ObjectID::random();

        let mut info = PruningInfo::new();
        info.add_mutated_object(obj);
        table.insert(1, info);

        // Try to prune checkpoint that doesn't exist in the lookup table.
        assert!(table.take(2, 2).is_err());
    }

    #[test]
    fn test_multiple_updates() {
        let table = PruningLookupTable::default();
        let obj = ObjectID::random();

        // Checkpoint 1: obj mutated
        let mut info1 = PruningInfo::new();
        info1.add_mutated_object(obj);
        table.insert(1, info1);

        // Checkpoint 2: obj mutated again
        let mut info2 = PruningInfo::new();
        info2.add_mutated_object(obj);
        table.insert(2, info2);

        // Prune checkpoints 1-2
        let result = table.take(1, 2).unwrap();
        assert_eq!(result.len(), 1);
        // Should use the latest mutation checkpoint
        assert_eq!(result[&obj], 2);
    }
}
1 change: 1 addition & 0 deletions crates/sui-indexer-alt/src/handlers/obj_info.rs
@@ -33,6 +33,7 @@ impl Processor for ObjInfo {
    const NAME: &'static str = "obj_info";
    type Value = ProcessedObjInfo;

    // TODO: Add tests for this function and the pruner.
    fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
        let cp_sequence_number = checkpoint.checkpoint_summary.sequence_number;
        let checkpoint_input_objects = checkpoint.checkpoint_input_objects();
9 changes: 9 additions & 0 deletions crates/sui-pg-db/src/lib.rs
@@ -162,6 +162,15 @@ impl Db {
    }
}

impl DbArgs {
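    /// `DbArgs` that connect to `database_url`, with every other setting left at its default.
    /// Intended for tests that run against a temporary database.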
    pub fn new_for_testing(database_url: Url) -> Self {
        Self {
            database_url,
            ..Default::default()
        }
    }
}

impl Default for DbArgs {
    fn default() -> Self {
        Self {
