Skip to content

Commit

Permalink
feat(iota-indexer): Refactor ExtendedApi tests in indexer to use shar…
Browse files Browse the repository at this point in the history
…ed test cluster (#3018)

* feat(iota-indexer): Refactor ExtendedApi tests in indexer to use shared test cluster

* Reuse function for finding available port/socket

* Get rid of the `replace_db_name` function

* Use &str instad of String for database_name where possible

* Remove SimulacrumApiTestEnvDefinition type, rename InitializedSimulacrumEnv type

* Fixes after rebase on recent feature branch

* Update README to run tests with `--test-threads 1`
  • Loading branch information
tomxey authored and sergiupopescu199 committed Oct 31, 2024
1 parent 967ea66 commit f642074
Show file tree
Hide file tree
Showing 8 changed files with 478 additions and 425 deletions.
2 changes: 2 additions & 0 deletions crates/iota-cluster-test/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ impl Cluster for LocalNewCluster {
fullnode_url.clone(),
ReaderWriterConfig::writer_mode(None),
data_ingestion_path.clone(),
None,
)
.await;

Expand All @@ -274,6 +275,7 @@ impl Cluster for LocalNewCluster {
fullnode_url.clone(),
ReaderWriterConfig::reader_mode(indexer_address.to_string()),
data_ingestion_path,
None,
)
.await;
}
Expand Down
2 changes: 2 additions & 0 deletions crates/iota-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub async fn start_cluster(
true,
Some(data_ingestion_path),
cancellation_token.clone(),
None,
)
.await;

Expand Down Expand Up @@ -137,6 +138,7 @@ pub async fn serve_executor(
true,
Some(data_ingestion_path),
cancellation_token.clone(),
Some(&graphql_connection_config.db_name()),
)
.await;

Expand Down
2 changes: 1 addition & 1 deletion crates/iota-indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ The crate provides following tests currently:
# run tests requiring only postgres integration
cargo nextest --features pg_integration --test-threads 1
# run rpc tests with shared runtime
cargo test --features shared_test_runtime
cargo test --features shared_test_runtime -- --test-threads 1
```

For a better testing experience is possible to use [nextest](https://nexte.st/)
Expand Down
12 changes: 10 additions & 2 deletions crates/iota-indexer/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub async fn start_test_indexer<T: R2D2Connection + Send + 'static>(
rpc_url: String,
reader_writer_config: ReaderWriterConfig,
data_ingestion_path: PathBuf,
new_database: Option<&str>,
) -> (PgIndexerStore<T>, JoinHandle<Result<(), IndexerError>>) {
start_test_indexer_impl(
db_url,
Expand All @@ -54,6 +55,7 @@ pub async fn start_test_indexer<T: R2D2Connection + Send + 'static>(
false,
Some(data_ingestion_path),
CancellationToken::new(),
new_database,
)
.await
}
Expand All @@ -65,17 +67,23 @@ pub async fn start_test_indexer_impl<T: R2D2Connection + 'static>(
db_url: Option<String>,
rpc_url: String,
reader_writer_config: ReaderWriterConfig,
reset_database: bool,
mut reset_database: bool,
data_ingestion_path: Option<PathBuf>,
cancel: CancellationToken,
new_database: Option<&str>,
) -> (PgIndexerStore<T>, JoinHandle<Result<(), IndexerError>>) {
let db_url = db_url.unwrap_or_else(|| {
let mut db_url = db_url.unwrap_or_else(|| {
let pg_host = env::var("POSTGRES_HOST").unwrap_or_else(|_| "localhost".into());
let pg_port = env::var("POSTGRES_PORT").unwrap_or_else(|_| "32770".into());
let pw = env::var("POSTGRES_PASSWORD").unwrap_or_else(|_| "postgrespw".into());
format!("postgres://postgres:{pw}@{pg_host}:{pg_port}")
});

if let Some(new_database) = new_database {
db_url = replace_db_name(&db_url, new_database).0;
reset_database = true;
};

let mut config = IndexerConfig {
db_url: Some(db_url.clone().into()),
rpc_client_url: rpc_url,
Expand Down
124 changes: 93 additions & 31 deletions crates/iota-indexer/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use std::{
};

use diesel::PgConnection;
use iota_config::node::RunWithRange;
use iota_config::{
local_ip_utils::{get_available_port, new_local_tcp_socket_for_testing},
node::RunWithRange,
};
use iota_indexer::{
IndexerConfig,
errors::IndexerError,
Expand All @@ -33,7 +36,8 @@ use tempfile::tempdir;
use test_cluster::{TestCluster, TestClusterBuilder};
use tokio::{runtime::Runtime, task::JoinHandle};

const DEFAULT_DB_URL: &str = "postgres://postgres:postgrespw@localhost:5432/iota_indexer";
const POSTGRES_URL: &str = "postgres://postgres:postgrespw@localhost:5432";
const DEFAULT_DB: &str = "iota_indexer";
const DEFAULT_INDEXER_IP: &str = "127.0.0.1";
const DEFAULT_INDEXER_PORT: u16 = 9005;
const DEFAULT_SERVER_PORT: u16 = 3000;
Expand All @@ -53,8 +57,9 @@ impl ApiTestSetup {
GLOBAL_API_TEST_SETUP.get_or_init(|| {
let runtime = tokio::runtime::Runtime::new().unwrap();

let (cluster, store, client) =
runtime.block_on(start_test_cluster_with_read_write_indexer(None));
let (cluster, store, client) = runtime.block_on(
start_test_cluster_with_read_write_indexer(None, Some("shared_test_indexer_db")),
);

Self {
runtime,
Expand All @@ -66,10 +71,50 @@ impl ApiTestSetup {
}
}

pub struct SimulacrumTestSetup {
pub runtime: Runtime,
pub sim: Arc<Simulacrum>,
pub store: PgIndexerStore<PgConnection>,
/// Indexer RPC Client
pub client: HttpClient,
}

impl SimulacrumTestSetup {
pub fn get_or_init<'a>(
unique_env_name: &str,
env_initializer: impl Fn(PathBuf) -> Simulacrum,
initialized_env_container: &'a OnceLock<SimulacrumTestSetup>,
) -> &'a SimulacrumTestSetup {
initialized_env_container.get_or_init(|| {
let runtime = tokio::runtime::Runtime::new().unwrap();
let data_ingestion_path = tempdir().unwrap().into_path();

let sim = env_initializer(data_ingestion_path.clone());
let sim = Arc::new(sim);

let db_name = format!("simulacrum_env_db_{}", unique_env_name);
let (_, store, _, client) =
runtime.block_on(start_simulacrum_rest_api_with_read_write_indexer(
sim.clone(),
data_ingestion_path,
Some(&db_name),
));

SimulacrumTestSetup {
runtime,
sim,
store,
client,
}
})
}
}

/// Start a [`TestCluster`][`test_cluster::TestCluster`] with a `Read` &
/// `Write` indexer
pub async fn start_test_cluster_with_read_write_indexer(
stop_cluster_after_checkpoint_seq: Option<u64>,
database_name: Option<&str>,
) -> (TestCluster, PgIndexerStore<PgConnection>, HttpClient) {
let temp = tempdir().unwrap().into_path();
let mut builder = TestClusterBuilder::new().with_data_ingestion_dir(temp.clone());
Expand All @@ -85,26 +130,32 @@ pub async fn start_test_cluster_with_read_write_indexer(

// start indexer in write mode
let (pg_store, _pg_store_handle) = start_test_indexer(
Some(DEFAULT_DB_URL.to_owned()),
Some(get_indexer_db_url(None)),
cluster.rpc_url().to_string(),
ReaderWriterConfig::writer_mode(None),
temp.clone(),
database_name,
)
.await;

// start indexer in read mode
start_indexer_reader(cluster.rpc_url().to_owned(), temp);
let indexer_port = start_indexer_reader(cluster.rpc_url().to_owned(), temp, database_name);

// create an RPC client by using the indexer url
let rpc_client = HttpClientBuilder::default()
.build(format!(
"http://{DEFAULT_INDEXER_IP}:{DEFAULT_INDEXER_PORT}"
))
.build(format!("http://{DEFAULT_INDEXER_IP}:{indexer_port}"))
.unwrap();

(cluster, pg_store, rpc_client)
}

fn get_indexer_db_url(database_name: Option<&str>) -> String {
database_name.map_or_else(
|| format!("{POSTGRES_URL}/{DEFAULT_DB}"),
|db_name| format!("{POSTGRES_URL}/{db_name}"),
)
}

/// Wait for the indexer to catch up to the given checkpoint sequence number
///
/// Indexer starts storing data after checkpoint 0
Expand Down Expand Up @@ -192,24 +243,31 @@ pub async fn indexer_wait_for_transaction(
}

/// Start an Indexer instance in `Read` mode
fn start_indexer_reader(fullnode_rpc_url: impl Into<String>, data_ingestion_path: PathBuf) {
fn start_indexer_reader(
fullnode_rpc_url: impl Into<String>,
data_ingestion_path: PathBuf,
database_name: Option<&str>,
) -> u16 {
let db_url = get_indexer_db_url(database_name);
let port = get_available_port(DEFAULT_INDEXER_IP);
let config = IndexerConfig {
db_url: Some(DEFAULT_DB_URL.to_owned().into()),
db_url: Some(db_url.clone().into()),
rpc_client_url: fullnode_rpc_url.into(),
reset_db: true,
rpc_server_worker: true,
rpc_server_url: DEFAULT_INDEXER_IP.to_owned(),
rpc_server_port: DEFAULT_INDEXER_PORT,
rpc_server_port: port,
data_ingestion_path: Some(data_ingestion_path),
..Default::default()
};

let registry = prometheus::Registry::default();
init_metrics(&registry);

tokio::spawn(async move {
Indexer::start_reader::<PgConnection>(&config, &registry, DEFAULT_DB_URL.to_owned()).await
});
tokio::spawn(
async move { Indexer::start_reader::<PgConnection>(&config, &registry, db_url).await },
);
port
}

/// Check if provided error message does match with
Expand All @@ -228,35 +286,31 @@ pub fn rpc_call_error_msg_matches<T>(
})
}

pub fn get_default_fullnode_rpc_api_addr() -> SocketAddr {
format!("127.0.0.1:{}", DEFAULT_SERVER_PORT)
.parse()
.unwrap()
}

/// Set up a test indexer fetching from a REST endpoint served by the given
/// Simulacrum.
pub async fn start_simulacrum_rest_api_with_write_indexer(
sim: Arc<Simulacrum>,
data_ingestion_path: PathBuf,
server_url: Option<SocketAddr>,
database_name: Option<&str>,
) -> (
JoinHandle<()>,
PgIndexerStore<PgConnection>,
JoinHandle<Result<(), IndexerError>>,
) {
let server_url = get_default_fullnode_rpc_api_addr();

let server_url = server_url.unwrap_or_else(new_local_tcp_socket_for_testing);
let server_handle = tokio::spawn(async move {
iota_rest_api::RestService::new_without_version(sim)
.start_service(server_url)
.await;
});
// Starts indexer
let (pg_store, pg_handle) = start_test_indexer(
Some(DEFAULT_DB_URL.to_owned()),
Some(get_indexer_db_url(None)),
format!("http://{}", server_url),
ReaderWriterConfig::writer_mode(None),
data_ingestion_path,
database_name,
)
.await;
(server_handle, pg_store, pg_handle)
Expand All @@ -265,24 +319,32 @@ pub async fn start_simulacrum_rest_api_with_write_indexer(
pub async fn start_simulacrum_rest_api_with_read_write_indexer(
sim: Arc<Simulacrum>,
data_ingestion_path: PathBuf,
database_name: Option<&str>,
) -> (
JoinHandle<()>,
PgIndexerStore<PgConnection>,
JoinHandle<Result<(), IndexerError>>,
HttpClient,
) {
let server_url = get_default_fullnode_rpc_api_addr();
let (server_handle, pg_store, pg_handle) =
start_simulacrum_rest_api_with_write_indexer(sim, data_ingestion_path.clone()).await;
let simulacrum_server_url = new_local_tcp_socket_for_testing();
let (server_handle, pg_store, pg_handle) = start_simulacrum_rest_api_with_write_indexer(
sim,
data_ingestion_path.clone(),
Some(simulacrum_server_url),
database_name,
)
.await;

// start indexer in read mode
start_indexer_reader(format!("http://{}", server_url), data_ingestion_path);
let indexer_port = start_indexer_reader(
format!("http://{}", simulacrum_server_url),
data_ingestion_path,
database_name,
);

// create an RPC client by using the indexer url
let rpc_client = HttpClientBuilder::default()
.build(format!(
"http://{DEFAULT_INDEXER_IP}:{DEFAULT_INDEXER_PORT}"
))
.build(format!("http://{DEFAULT_INDEXER_IP}:{indexer_port}"))
.unwrap();

(server_handle, pg_store, pg_handle, rpc_client)
Expand Down
18 changes: 14 additions & 4 deletions crates/iota-indexer/tests/ingestion_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,13 @@ mod ingestion_tests {
// Create a checkpoint which should include the transaction we executed.
let checkpoint = sim.create_checkpoint();

let (_, pg_store, _) =
start_simulacrum_rest_api_with_write_indexer(Arc::new(sim), data_ingestion_path).await;
let (_, pg_store, _) = start_simulacrum_rest_api_with_write_indexer(
Arc::new(sim),
data_ingestion_path,
None,
Some("indexer_ingestion_tests_db"),
)
.await;

indexer_wait_for_checkpoint(&pg_store, 1).await;

Expand Down Expand Up @@ -97,8 +102,13 @@ mod ingestion_tests {
// Create a checkpoint which should include the transaction we executed.
let _ = sim.create_checkpoint();

let (_, pg_store, _) =
start_simulacrum_rest_api_with_write_indexer(Arc::new(sim), data_ingestion_path).await;
let (_, pg_store, _) = start_simulacrum_rest_api_with_write_indexer(
Arc::new(sim),
data_ingestion_path,
None,
Some("indexer_ingestion_tests_db"),
)
.await;

indexer_wait_for_checkpoint(&pg_store, 1).await;

Expand Down
Loading

0 comments on commit f642074

Please sign in to comment.