
feat(iota-indexer): Refactor ExtendedApi tests in indexer to use shared test cluster
tomxey committed Oct 10, 2024
1 parent a84eda7 commit e0539ec
Showing 7 changed files with 451 additions and 395 deletions.
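The core of the refactor is a process-wide test fixture: ExtendedApi tests no longer start their own cluster and indexer, but lazily initialize one `ApiTestSetup` (see `crates/iota-indexer/tests/common/mod.rs` below) and share it across the whole test binary. The sketch below shows how a test might consume such a fixture; the `ApiTestSetup::get_or_init()` accessor and the `get_latest_checkpoint_sequence_number` RPC method are illustrative assumptions, not lines from this commit.

// Sketch of a shared-fixture test; accessor and RPC method names are assumed.
#[test]
fn extended_api_smoke_test() {
    // Only the first test to run pays the cluster/indexer startup cost; the
    // fixture lives behind a OnceLock for the rest of the test binary.
    let setup = ApiTestSetup::get_or_init();
    setup.runtime.block_on(async {
        // Wait until the writer-mode indexer has ingested the first checkpoint...
        indexer_wait_for_checkpoint(&setup.store, 1).await;
        // ...then query the reader through the indexer's JSON-RPC endpoint.
        let latest = setup
            .client
            .get_latest_checkpoint_sequence_number() // assumed ReadApi-style client method
            .await
            .unwrap();
        println!("indexer serves checkpoints up to {latest:?}");
    });
}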
2 changes: 2 additions & 0 deletions crates/iota-cluster-test/src/cluster.rs
@@ -259,6 +259,7 @@ impl Cluster for LocalNewCluster {
Some(pg_address.clone()),
fullnode_url.clone(),
ReaderWriterConfig::writer_mode(None),
None,
)
.await;

@@ -267,6 +268,7 @@ impl Cluster for LocalNewCluster {
Some(pg_address),
fullnode_url.clone(),
ReaderWriterConfig::reader_mode(indexer_address.to_string()),
None,
)
.await;
}
1 change: 1 addition & 0 deletions crates/iota-graphql-rpc/src/test_infra/cluster.rs
@@ -65,6 +65,7 @@ pub async fn start_cluster(
Some(db_url),
val_fn.rpc_url().to_string(),
ReaderWriterConfig::writer_mode(None),
None,
)
.await;

3 changes: 2 additions & 1 deletion crates/iota-indexer/src/test_utils.rs
@@ -42,8 +42,9 @@ pub async fn start_test_indexer(
db_url: Option<String>,
rpc_url: String,
reader_writer_config: ReaderWriterConfig,
new_database: Option<String>,
) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
start_test_indexer_impl(db_url, rpc_url, reader_writer_config, None).await
start_test_indexer_impl(db_url, rpc_url, reader_writer_config, new_database).await
}

pub async fn start_test_indexer_impl(
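The new trailing parameter is what the added `None` arguments at the call sites above correspond to: callers can keep the default database by passing `None`, or have the indexer run against its own database by naming one. A usage sketch, where the connection URL, fullnode URL, and database name are made up for illustration:

// Sketch only: the connection URL, fullnode URL and database name are illustrative.
let (pg_store, _indexer_handle) = start_test_indexer(
    Some("postgres://postgres:postgrespw@localhost:5432/iota_indexer".to_string()),
    "http://127.0.0.1:9000".to_string(),     // fullnode JSON-RPC URL
    ReaderWriterConfig::writer_mode(None),
    Some("my_isolated_test_db".to_string()), // new: write into a dedicated database
)
.await;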
108 changes: 84 additions & 24 deletions crates/iota-indexer/tests/common/mod.rs
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use std::{
net::SocketAddr,
net::{SocketAddr, TcpListener},
sync::{Arc, OnceLock},
time::Duration,
};
@@ -46,7 +46,10 @@ impl ApiTestSetup {
let runtime = tokio::runtime::Runtime::new().unwrap();

let (cluster, store, client) =
runtime.block_on(start_test_cluster_with_read_write_indexer(None));
runtime.block_on(start_test_cluster_with_read_write_indexer(
None,
Some("shared_test_indexer_db".to_string()),
));

Self {
runtime,
@@ -58,10 +61,47 @@
}
}

pub struct SimulacrumApiTestEnvDefinition {
pub unique_env_name: String,
pub env_initializer: Box<dyn Fn() -> Simulacrum>,
}

pub struct InitializedSimulacrumEnv {
pub runtime: Runtime,
pub sim: Arc<Simulacrum>,
pub store: PgIndexerStore,
/// Indexer RPC Client
pub client: HttpClient,
}

impl SimulacrumApiTestEnvDefinition {
pub fn get_or_init_env<'a>(
&self,
initialized_env_container: &'a OnceLock<InitializedSimulacrumEnv>,
) -> &'a InitializedSimulacrumEnv {
initialized_env_container.get_or_init(|| {
let runtime = tokio::runtime::Runtime::new().unwrap();
let sim = Arc::new((self.env_initializer)());
let db_name = format!("simulacrum_env_db_{}", self.unique_env_name);
let (_, store, _, client) = runtime.block_on(
start_simulacrum_rest_api_with_read_write_indexer(sim.clone(), Some(db_name)),
);

InitializedSimulacrumEnv {
runtime,
sim,
store,
client,
}
})
}
}
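`SimulacrumApiTestEnvDefinition` lets a test module describe its Simulacrum-backed environment once and share the initialized store and client between tests via a `OnceLock`. A sketch of the intended call pattern follows; the static, the environment name, and the initializer body are assumptions for illustration, not part of this commit's visible diff.

// Assumed usage pattern (names made up for illustration).
static SHARED_ENV: OnceLock<InitializedSimulacrumEnv> = OnceLock::new();

fn shared_env() -> &'static InitializedSimulacrumEnv {
    let definition = SimulacrumApiTestEnvDefinition {
        unique_env_name: "extended_api".to_string(), // becomes part of the database name
        env_initializer: Box::new(|| {
            let mut sim = Simulacrum::new();
            sim.create_checkpoint(); // give the indexer something to ingest
            sim
        }),
    };
    definition.get_or_init_env(&SHARED_ENV)
}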

/// Start a [`TestCluster`][`test_cluster::TestCluster`] with a `Read` &
/// `Write` indexer
pub async fn start_test_cluster_with_read_write_indexer(
stop_cluster_after_checkpoint_seq: Option<u64>,
database_name: Option<String>,
) -> (TestCluster, PgIndexerStore, HttpClient) {
let mut builder = TestClusterBuilder::new();

@@ -79,17 +119,16 @@ pub async fn start_test_cluster_with_read_write_indexer(
Some(DEFAULT_DB_URL.to_owned()),
cluster.rpc_url().to_string(),
ReaderWriterConfig::writer_mode(None),
database_name.clone(),
)
.await;

// start indexer in read mode
start_indexer_reader(cluster.rpc_url().to_owned());
let indexer_port = start_indexer_reader(cluster.rpc_url().to_owned(), database_name);

// create an RPC client by using the indexer url
let rpc_client = HttpClientBuilder::default()
.build(format!(
"http://{DEFAULT_INDEXER_IP}:{DEFAULT_INDEXER_PORT}"
))
.build(format!("http://{DEFAULT_INDEXER_IP}:{indexer_port}"))
.unwrap();

(cluster, pg_store, rpc_client)
@@ -117,24 +156,44 @@ pub async fn indexer_wait_for_checkpoint(
.expect("Timeout waiting for indexer to catchup to checkpoint");
}

fn get_available_port() -> u16 {
// Let the OS assign an available port, read it, and then release it again.
// This is inherently racy: another process may claim the port in the short
// window between this call and the moment the port is actually used.
let tl = TcpListener::bind(("127.0.0.1", 0)).unwrap();
tl.local_addr().unwrap().port()
}
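If that race window ever becomes a problem, one common mitigation (not used here) is to keep the listener alive and hand the bound socket to whoever needs the port; a minimal sketch:

// Alternative sketch that narrows the race window: the port stays reserved
// for as long as the returned TcpListener is kept alive.
fn reserve_port() -> (u16, TcpListener) {
    let listener = TcpListener::bind(("127.0.0.1", 0)).unwrap();
    let port = listener.local_addr().unwrap().port();
    (port, listener) // dropping the listener re-opens the race window
}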

fn replace_db_name(db_url: &str, new_db_name: &str) -> String {
let pos = db_url.rfind('/').expect("Unable to find / in db_url");
format!("{}/{}", &db_url[..pos], new_db_name)
}
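`replace_db_name` only swaps the path segment after the last `/` of the connection URL, which is how each test environment gets pointed at its own Postgres database. With illustrative credentials:

// Everything before the last '/' is preserved; only the database name changes.
assert_eq!(
    replace_db_name(
        "postgres://postgres:postgrespw@localhost:5432/iota_indexer",
        "shared_test_indexer_db"
    ),
    "postgres://postgres:postgrespw@localhost:5432/shared_test_indexer_db"
);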

/// Start an Indexer instance in `Read` mode
fn start_indexer_reader(fullnode_rpc_url: impl Into<String>) {
fn start_indexer_reader(fullnode_rpc_url: impl Into<String>, database_name: Option<String>) -> u16 {
let db_url = match database_name {
Some(database_name) => replace_db_name(&DEFAULT_DB_URL.to_owned(), &database_name),
None => DEFAULT_DB_URL.to_owned(),
};

let port = get_available_port();
let config = IndexerConfig {
db_url: Some(DEFAULT_DB_URL.to_owned()),
db_url: Some(db_url.clone()),
rpc_client_url: fullnode_rpc_url.into(),
reset_db: true,
rpc_server_worker: true,
rpc_server_url: DEFAULT_INDEXER_IP.to_owned(),
rpc_server_port: DEFAULT_INDEXER_PORT,
rpc_server_port: port,
..Default::default()
};

let registry = prometheus::Registry::default();
init_metrics(&registry);

tokio::spawn(async move {
Indexer::start_reader(&config, &registry, DEFAULT_DB_URL.to_owned()).await
});
tokio::spawn(async move { Indexer::start_reader(&config, &registry, db_url).await });

port
}

/// Check if provided error message does match with
@@ -153,23 +212,23 @@ pub fn rpc_call_error_msg_matches<T>(
})
}

pub fn get_default_fullnode_rpc_api_addr() -> SocketAddr {
format!("127.0.0.1:{}", DEFAULT_SERVER_PORT)
.parse()
.unwrap()
pub fn get_available_fullnode_rpc_api_addr() -> SocketAddr {
let port = get_available_port();
format!("127.0.0.1:{}", port).parse().unwrap()
}

/// Set up a test indexer fetching from a REST endpoint served by the given
/// Simulacrum.
pub async fn start_simulacrum_rest_api_with_write_indexer(
sim: Arc<Simulacrum>,
server_url: Option<SocketAddr>,
database_name: Option<String>,
) -> (
JoinHandle<()>,
PgIndexerStore,
JoinHandle<Result<(), IndexerError>>,
) {
let server_url = get_default_fullnode_rpc_api_addr();

let server_url = server_url.unwrap_or_else(get_available_fullnode_rpc_api_addr);
let server_handle = tokio::spawn(async move {
let chain_id = (*sim
.get_checkpoint_by_sequence_number(0)
@@ -187,31 +246,32 @@ pub async fn start_simulacrum_rest_api_with_write_indexer(
Some(DEFAULT_DB_URL.to_owned()),
format!("http://{}", server_url),
ReaderWriterConfig::writer_mode(None),
database_name,
)
.await;
(server_handle, pg_store, pg_handle)
}

pub async fn start_simulacrum_rest_api_with_read_write_indexer(
sim: Arc<Simulacrum>,
database_name: Option<String>,
) -> (
JoinHandle<()>,
PgIndexerStore,
JoinHandle<Result<(), IndexerError>>,
HttpClient,
) {
let server_url = get_default_fullnode_rpc_api_addr();
let server_url = get_available_fullnode_rpc_api_addr();
let (server_handle, pg_store, pg_handle) =
start_simulacrum_rest_api_with_write_indexer(sim).await;
start_simulacrum_rest_api_with_write_indexer(sim, Some(server_url), database_name.clone())
.await;

// start indexer in read mode
start_indexer_reader(format!("http://{}", server_url));
let indexer_port = start_indexer_reader(format!("http://{}", server_url), database_name);

// create an RPC client by using the indexer url
let rpc_client = HttpClientBuilder::default()
.build(format!(
"http://{DEFAULT_INDEXER_IP}:{DEFAULT_INDEXER_PORT}"
))
.build(format!("http://{DEFAULT_INDEXER_IP}:{indexer_port}"))
.unwrap();

(server_handle, pg_store, pg_handle, rpc_client)
7 changes: 6 additions & 1 deletion crates/iota-indexer/tests/ingestion_tests.rs
@@ -46,7 +46,12 @@ mod ingestion_tests {
// Create a checkpoint which should include the transaction we executed.
let checkpoint = sim.create_checkpoint();

let (_, pg_store, _) = start_simulacrum_rest_api_with_write_indexer(Arc::new(sim)).await;
let (_, pg_store, _) = start_simulacrum_rest_api_with_write_indexer(
Arc::new(sim),
None,
Some("indexer_ingestion_tests_db".to_string()),
)
.await;

// Wait for the indexer to catch up to the checkpoint.
indexer_wait_for_checkpoint(&pg_store, 1).await;
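Each test module now passes its own database name, so writer-mode indexers started by different test binaries no longer interfere with each other's Postgres state. A hypothetical sibling test would follow the same pattern:

// Hypothetical second test; the database name is made up but must be unique per test module.
let (_, pg_store, _) = start_simulacrum_rest_api_with_write_indexer(
    Arc::new(sim),
    None,                                               // let the helper pick a free REST API port
    Some("indexer_object_events_tests_db".to_string()),
)
.await;
indexer_wait_for_checkpoint(&pg_store, 1).await;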