Skip to content

Commit

Permalink
[indexer] Simplify setting up a test indexer writer or reader (#19663)
Browse files Browse the repository at this point in the history
## Description 

Currently, tests that require spinning up an indexer do so through
`start_test_indexer_impl` or `start_test_indexer`. This is further
complicated by a single `start` function that accepts a
`ReaderWriterConfig`. We can simplify this by exposing two functions
with optional parameters that can be configured by the caller.

Part of a stack of PRs for watermarks
1. simplify setting up test indexer:
#19663
2. update pruner config: #19637
3. committer writes upper bounds
#19649
4. pruner writes lower bounds:
#19650
5. pruner prunes (wip)

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
wlmyng authored Oct 4, 2024
1 parent d61f3f4 commit be82841
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 151 deletions.
20 changes: 11 additions & 9 deletions crates/sui-cluster-test/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use sui_config::{PersistedConfig, SUI_KEYSTORE_FILENAME, SUI_NETWORK_CONFIG};
use sui_graphql_rpc::config::{ConnectionConfig, ServiceConfig};
use sui_graphql_rpc::test_infra::cluster::start_graphql_server_with_fn_rpc;
use sui_indexer::tempdb::TempDb;
use sui_indexer::test_utils::{start_test_indexer, ReaderWriterConfig};
use sui_indexer::test_utils::{
start_indexer_jsonrpc_for_testing, start_indexer_writer_for_testing,
};
use sui_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore};
use sui_sdk::sui_client_config::{SuiClientConfig, SuiEnv};
use sui_sdk::wallet_context::WalletContext;
Expand Down Expand Up @@ -229,22 +231,22 @@ impl Cluster for LocalNewCluster {
let graphql_address = format!("127.0.0.1:{}", get_available_port("127.0.0.1"));
let graphql_url = format!("http://{graphql_address}");

// Start indexer writer
let (_, _, writer_token) = start_test_indexer(
let (_, _, writer_token) = start_indexer_writer_for_testing(
pg_address.clone(),
fullnode_url.clone(),
ReaderWriterConfig::writer_mode(None, None),
data_ingestion_path.path().to_path_buf(),
None,
None,
Some(data_ingestion_path.path().to_path_buf()),
None, /* cancel */
)
.await;
cancellation_tokens.push(writer_token.drop_guard());

// Start indexer jsonrpc service
let (_, _, reader_token) = start_test_indexer(
let (_, reader_token) = start_indexer_jsonrpc_for_testing(
pg_address.clone(),
fullnode_url.clone(),
ReaderWriterConfig::reader_mode(indexer_jsonrpc_address.clone()),
data_ingestion_path.path().to_path_buf(),
indexer_jsonrpc_address.clone(),
None, /* cancel */
)
.await;
cancellation_tokens.push(reader_token.drop_guard());
Expand Down
24 changes: 13 additions & 11 deletions crates/sui-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use sui_graphql_rpc_client::simple_client::SimpleClient;
use sui_indexer::config::PruningOptions;
pub use sui_indexer::config::SnapshotLagConfig;
use sui_indexer::errors::IndexerError;
use sui_indexer::store::PgIndexerStore;
use sui_indexer::tempdb::get_available_port;
use sui_indexer::tempdb::TempDb;
use sui_indexer::test_utils::start_test_indexer_impl;
use sui_indexer::test_utils::ReaderWriterConfig;
use sui_indexer::test_utils::start_indexer_writer_for_testing;
use sui_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
use sui_types::storage::RestStateReader;
use tempfile::tempdir;
Expand Down Expand Up @@ -126,12 +126,12 @@ pub async fn start_network_cluster() -> NetworkCluster {
let val_fn = start_validator_with_fullnode(data_ingestion_path.path().to_path_buf()).await;

// Starts indexer
let (pg_store, pg_handle) = start_test_indexer_impl(
let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
db_url,
val_fn.rpc_url().to_string(),
ReaderWriterConfig::writer_mode(None, None),
None,
None,
Some(data_ingestion_path.path().to_path_buf()),
cancellation_token.clone(),
Some(cancellation_token.clone()),
)
.await;

Expand Down Expand Up @@ -179,12 +179,14 @@ pub async fn serve_executor(
.await;
});

let (pg_store, pg_handle) = start_test_indexer_impl(
let snapshot_config = snapshot_config.unwrap_or_default();

let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
db_url,
format!("http://{}", executor_server_url),
ReaderWriterConfig::writer_mode(snapshot_config.clone(), epochs_to_keep),
Some(snapshot_config.clone()),
Some(PruningOptions { epochs_to_keep }),
Some(data_ingestion_path),
cancellation_token.clone(),
Some(cancellation_token.clone()),
)
.await;

Expand All @@ -211,7 +213,7 @@ pub async fn serve_executor(
indexer_join_handle: pg_handle,
graphql_server_join_handle: graphql_server_handle,
graphql_client: client,
snapshot_config: snapshot_config.unwrap_or_default(),
snapshot_config,
graphql_connection_config,
cancellation_token,
database,
Expand Down
24 changes: 4 additions & 20 deletions crates/sui-indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,6 @@ use crate::store::{IndexerStore, PgIndexerStore};
pub struct Indexer;

impl Indexer {
#[cfg(test)]
pub async fn start_writer_for_testing(
config: &IngestionConfig,
store: PgIndexerStore,
metrics: IndexerMetrics,
) -> Result<(), IndexerError> {
let snapshot_config = SnapshotLagConfig::default();
Indexer::start_writer(
config,
store,
metrics,
snapshot_config,
PruningOptions::default(),
CancellationToken::new(),
)
.await
}

pub async fn start_writer(
config: &IngestionConfig,
store: PgIndexerStore,
Expand Down Expand Up @@ -94,7 +76,8 @@ impl Indexer {
);
assert!(epochs_to_keep > 0, "Epochs to keep must be positive");
let pruner = Pruner::new(store.clone(), epochs_to_keep, metrics.clone())?;
spawn_monitored_task!(pruner.start(CancellationToken::new()));
let cancel_clone = cancel.clone();
spawn_monitored_task!(pruner.start(cancel_clone));
}

// If we already have chain identifier indexed (i.e. the first checkpoint has been indexed),
Expand Down Expand Up @@ -183,13 +166,14 @@ impl Indexer {
config: &JsonRpcConfig,
registry: &Registry,
pool: ConnectionPool,
cancel: CancellationToken,
) -> Result<(), IndexerError> {
info!(
"Sui Indexer Reader (version {:?}) started...",
env!("CARGO_PKG_VERSION")
);
let indexer_reader = IndexerReader::new(pool);
let handle = build_json_rpc_server(registry, indexer_reader, config)
let handle = build_json_rpc_server(registry, indexer_reader, config, cancel)
.await
.expect("Json rpc server should not run into errors upon start.");
tokio::spawn(async move { handle.stopped().await })
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub async fn build_json_rpc_server(
prometheus_registry: &Registry,
reader: IndexerReader,
config: &JsonRpcConfig,
cancel: CancellationToken,
) -> Result<ServerHandle, IndexerError> {
let mut builder =
JsonRpcServerBuilder::new(env!("CARGO_PKG_VERSION"), prometheus_registry, None, None);
Expand All @@ -65,7 +66,6 @@ pub async fn build_json_rpc_server(
builder.register_module(CoinReadApi::new(reader.clone()))?;
builder.register_module(ExtendedApi::new(reader.clone()))?;

let cancel = CancellationToken::new();
let system_package_task =
SystemPackageTask::new(reader.clone(), cancel.clone(), Duration::from_secs(10));

Expand Down
3 changes: 2 additions & 1 deletion crates/sui-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ async fn main() -> anyhow::Result<()> {
Command::JsonRpcService(json_rpc_config) => {
check_db_migration_consistency(&mut pool.get().await?).await?;

Indexer::start_reader(&json_rpc_config, &registry, pool).await?;
Indexer::start_reader(&json_rpc_config, &registry, pool, CancellationToken::new())
.await?;
}
Command::ResetDatabase { force } => {
if !force {
Expand Down
Loading

0 comments on commit be82841

Please sign in to comment.