Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iota-indexer): Refactor ExtendedApi tests in indexer to use shared test cluster #3018

2 changes: 2 additions & 0 deletions crates/iota-cluster-test/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ impl Cluster for LocalNewCluster {
fullnode_url.clone(),
ReaderWriterConfig::writer_mode(None),
data_ingestion_path.clone(),
None,
)
.await;

Expand All @@ -274,6 +275,7 @@ impl Cluster for LocalNewCluster {
fullnode_url.clone(),
ReaderWriterConfig::reader_mode(indexer_address.to_string()),
data_ingestion_path,
None,
)
.await;
}
Expand Down
2 changes: 2 additions & 0 deletions crates/iota-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub async fn start_cluster(
true,
Some(data_ingestion_path),
cancellation_token.clone(),
None,
)
.await;

Expand Down Expand Up @@ -137,6 +138,7 @@ pub async fn serve_executor(
true,
Some(data_ingestion_path),
cancellation_token.clone(),
Some(&graphql_connection_config.db_name()),
)
.await;

Expand Down
2 changes: 1 addition & 1 deletion crates/iota-indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ The crate provides following tests currently:
# run tests requiring only postgres integration
cargo test --features pg_integration -- --test-threads 1
# run rpc tests with shared runtime
cargo test --features shared_test_runtime
cargo test --features shared_test_runtime -- --test-threads 1
```

For a better testing experience it is possible to use [nextest](https://nexte.st/)
Expand Down
12 changes: 10 additions & 2 deletions crates/iota-indexer/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub async fn start_test_indexer<T: R2D2Connection + Send + 'static>(
rpc_url: String,
reader_writer_config: ReaderWriterConfig,
data_ingestion_path: PathBuf,
new_database: Option<&str>,
) -> (PgIndexerStore<T>, JoinHandle<Result<(), IndexerError>>) {
start_test_indexer_impl(
db_url,
Expand All @@ -54,6 +55,7 @@ pub async fn start_test_indexer<T: R2D2Connection + Send + 'static>(
false,
Some(data_ingestion_path),
CancellationToken::new(),
new_database,
)
.await
}
Expand All @@ -65,17 +67,23 @@ pub async fn start_test_indexer_impl<T: R2D2Connection + 'static>(
db_url: Option<String>,
rpc_url: String,
reader_writer_config: ReaderWriterConfig,
reset_database: bool,
mut reset_database: bool,
data_ingestion_path: Option<PathBuf>,
cancel: CancellationToken,
new_database: Option<&str>,
) -> (PgIndexerStore<T>, JoinHandle<Result<(), IndexerError>>) {
let db_url = db_url.unwrap_or_else(|| {
let mut db_url = db_url.unwrap_or_else(|| {
let pg_host = env::var("POSTGRES_HOST").unwrap_or_else(|_| "localhost".into());
let pg_port = env::var("POSTGRES_PORT").unwrap_or_else(|_| "32770".into());
let pw = env::var("POSTGRES_PASSWORD").unwrap_or_else(|_| "postgrespw".into());
format!("postgres://postgres:{pw}@{pg_host}:{pg_port}")
});

if let Some(new_database) = new_database {
db_url = replace_db_name(&db_url, new_database).0;
reset_database = true;
};

let mut config = IndexerConfig {
db_url: Some(db_url.clone().into()),
rpc_client_url: rpc_url,
Expand Down
124 changes: 93 additions & 31 deletions crates/iota-indexer/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use std::{
};

use diesel::PgConnection;
use iota_config::node::RunWithRange;
use iota_config::{
local_ip_utils::{get_available_port, new_local_tcp_socket_for_testing},
node::RunWithRange,
};
use iota_indexer::{
IndexerConfig,
errors::IndexerError,
Expand All @@ -33,7 +36,8 @@ use tempfile::tempdir;
use test_cluster::{TestCluster, TestClusterBuilder};
use tokio::{runtime::Runtime, task::JoinHandle};

const DEFAULT_DB_URL: &str = "postgres://postgres:postgrespw@localhost:5432/iota_indexer";
const POSTGRES_URL: &str = "postgres://postgres:postgrespw@localhost:5432";
const DEFAULT_DB: &str = "iota_indexer";
const DEFAULT_INDEXER_IP: &str = "127.0.0.1";
const DEFAULT_INDEXER_PORT: u16 = 9005;
const DEFAULT_SERVER_PORT: u16 = 3000;
Expand All @@ -53,8 +57,9 @@ impl ApiTestSetup {
GLOBAL_API_TEST_SETUP.get_or_init(|| {
let runtime = tokio::runtime::Runtime::new().unwrap();

let (cluster, store, client) =
runtime.block_on(start_test_cluster_with_read_write_indexer(None));
let (cluster, store, client) = runtime.block_on(
start_test_cluster_with_read_write_indexer(None, Some("shared_test_indexer_db")),
);

Self {
runtime,
Expand All @@ -66,10 +71,50 @@ impl ApiTestSetup {
}
}

/// Lazily-initialized test environment backed by a [`Simulacrum`] network
/// and a read/write indexer, intended to be shared across tests via a
/// `OnceLock` (see `SimulacrumTestSetup::get_or_init`).
pub struct SimulacrumTestSetup {
    /// Tokio runtime the environment was initialized on; stored so tests
    /// can `block_on` async calls against the same runtime.
    pub runtime: Runtime,
    /// Shared handle to the simulated network feeding the indexer.
    pub sim: Arc<Simulacrum>,
    /// Write-side indexer store (Postgres-backed).
    pub store: PgIndexerStore<PgConnection>,
    /// Indexer RPC Client
    pub client: HttpClient,
}

impl SimulacrumTestSetup {
    /// Return the shared setup stored in `initialized_env_container`,
    /// constructing it exactly once on first use.
    ///
    /// `unique_env_name` is folded into the database name so that each
    /// distinct environment gets its own isolated indexer database;
    /// `env_initializer` builds the `Simulacrum` from the data-ingestion
    /// directory created here.
    pub fn get_or_init<'a>(
        unique_env_name: &str,
        env_initializer: impl Fn(PathBuf) -> Simulacrum,
        initialized_env_container: &'a OnceLock<SimulacrumTestSetup>,
    ) -> &'a SimulacrumTestSetup {
        initialized_env_container.get_or_init(|| {
            // The initializer runs from synchronous test code, so it
            // needs its own runtime to drive the async setup.
            let rt = tokio::runtime::Runtime::new().unwrap();
            let ingestion_dir = tempdir().unwrap().into_path();

            let simulacrum = Arc::new(env_initializer(ingestion_dir.clone()));

            // Per-environment database keeps concurrent setups isolated.
            let database = format!("simulacrum_env_db_{unique_env_name}");
            let (_, store, _, client) =
                rt.block_on(start_simulacrum_rest_api_with_read_write_indexer(
                    simulacrum.clone(),
                    ingestion_dir,
                    Some(&database),
                ));

            Self {
                runtime: rt,
                sim: simulacrum,
                store,
                client,
            }
        })
    }
}

/// Start a [`TestCluster`][`test_cluster::TestCluster`] with a `Read` &
/// `Write` indexer
pub async fn start_test_cluster_with_read_write_indexer(
stop_cluster_after_checkpoint_seq: Option<u64>,
database_name: Option<&str>,
) -> (TestCluster, PgIndexerStore<PgConnection>, HttpClient) {
let temp = tempdir().unwrap().into_path();
let mut builder = TestClusterBuilder::new().with_data_ingestion_dir(temp.clone());
Expand All @@ -85,26 +130,32 @@ pub async fn start_test_cluster_with_read_write_indexer(

// start indexer in write mode
let (pg_store, _pg_store_handle) = start_test_indexer(
Some(DEFAULT_DB_URL.to_owned()),
Some(get_indexer_db_url(None)),
cluster.rpc_url().to_string(),
ReaderWriterConfig::writer_mode(None),
temp.clone(),
database_name,
)
.await;

// start indexer in read mode
start_indexer_reader(cluster.rpc_url().to_owned(), temp);
let indexer_port = start_indexer_reader(cluster.rpc_url().to_owned(), temp, database_name);

// create an RPC client by using the indexer url
let rpc_client = HttpClientBuilder::default()
.build(format!(
"http://{DEFAULT_INDEXER_IP}:{DEFAULT_INDEXER_PORT}"
))
.build(format!("http://{DEFAULT_INDEXER_IP}:{indexer_port}"))
.unwrap();

(cluster, pg_store, rpc_client)
}

/// Build the Postgres connection URL for the indexer database.
///
/// Uses `database_name` when provided, otherwise falls back to
/// `DEFAULT_DB`; the host portion always comes from `POSTGRES_URL`.
fn get_indexer_db_url(database_name: Option<&str>) -> String {
    // Both former `map_or_else` branches differed only in the database
    // name, so resolve the name first and format once.
    let db_name = database_name.unwrap_or(DEFAULT_DB);
    format!("{POSTGRES_URL}/{db_name}")
}

/// Wait for the indexer to catch up to the given checkpoint sequence number
///
/// Indexer starts storing data after checkpoint 0
Expand Down Expand Up @@ -192,24 +243,31 @@ pub async fn indexer_wait_for_transaction(
}

/// Start an Indexer instance in `Read` mode
fn start_indexer_reader(fullnode_rpc_url: impl Into<String>, data_ingestion_path: PathBuf) {
fn start_indexer_reader(
fullnode_rpc_url: impl Into<String>,
data_ingestion_path: PathBuf,
database_name: Option<&str>,
) -> u16 {
let db_url = get_indexer_db_url(database_name);
let port = get_available_port(DEFAULT_INDEXER_IP);
let config = IndexerConfig {
db_url: Some(DEFAULT_DB_URL.to_owned().into()),
db_url: Some(db_url.clone().into()),
rpc_client_url: fullnode_rpc_url.into(),
reset_db: true,
rpc_server_worker: true,
rpc_server_url: DEFAULT_INDEXER_IP.to_owned(),
rpc_server_port: DEFAULT_INDEXER_PORT,
rpc_server_port: port,
data_ingestion_path: Some(data_ingestion_path),
..Default::default()
};

let registry = prometheus::Registry::default();
init_metrics(&registry);

tokio::spawn(async move {
Indexer::start_reader::<PgConnection>(&config, &registry, DEFAULT_DB_URL.to_owned()).await
});
tokio::spawn(
async move { Indexer::start_reader::<PgConnection>(&config, &registry, db_url).await },
);
port
}

/// Check if provided error message does match with
Expand All @@ -228,35 +286,31 @@ pub fn rpc_call_error_msg_matches<T>(
})
}

pub fn get_default_fullnode_rpc_api_addr() -> SocketAddr {
format!("127.0.0.1:{}", DEFAULT_SERVER_PORT)
.parse()
.unwrap()
}

/// Set up a test indexer fetching from a REST endpoint served by the given
/// Simulacrum.
pub async fn start_simulacrum_rest_api_with_write_indexer(
sim: Arc<Simulacrum>,
data_ingestion_path: PathBuf,
server_url: Option<SocketAddr>,
database_name: Option<&str>,
) -> (
JoinHandle<()>,
PgIndexerStore<PgConnection>,
JoinHandle<Result<(), IndexerError>>,
) {
let server_url = get_default_fullnode_rpc_api_addr();

let server_url = server_url.unwrap_or_else(new_local_tcp_socket_for_testing);
let server_handle = tokio::spawn(async move {
iota_rest_api::RestService::new_without_version(sim)
.start_service(server_url)
.await;
});
// Starts indexer
let (pg_store, pg_handle) = start_test_indexer(
Some(DEFAULT_DB_URL.to_owned()),
Some(get_indexer_db_url(None)),
format!("http://{}", server_url),
ReaderWriterConfig::writer_mode(None),
data_ingestion_path,
database_name,
)
.await;
(server_handle, pg_store, pg_handle)
Expand All @@ -265,24 +319,32 @@ pub async fn start_simulacrum_rest_api_with_write_indexer(
pub async fn start_simulacrum_rest_api_with_read_write_indexer(
sim: Arc<Simulacrum>,
data_ingestion_path: PathBuf,
database_name: Option<&str>,
) -> (
JoinHandle<()>,
PgIndexerStore<PgConnection>,
JoinHandle<Result<(), IndexerError>>,
HttpClient,
) {
let server_url = get_default_fullnode_rpc_api_addr();
let (server_handle, pg_store, pg_handle) =
start_simulacrum_rest_api_with_write_indexer(sim, data_ingestion_path.clone()).await;
let simulacrum_server_url = new_local_tcp_socket_for_testing();
let (server_handle, pg_store, pg_handle) = start_simulacrum_rest_api_with_write_indexer(
sim,
data_ingestion_path.clone(),
Some(simulacrum_server_url),
database_name,
)
.await;

// start indexer in read mode
start_indexer_reader(format!("http://{}", server_url), data_ingestion_path);
let indexer_port = start_indexer_reader(
format!("http://{}", simulacrum_server_url),
data_ingestion_path,
database_name,
);

// create an RPC client by using the indexer url
let rpc_client = HttpClientBuilder::default()
.build(format!(
"http://{DEFAULT_INDEXER_IP}:{DEFAULT_INDEXER_PORT}"
))
.build(format!("http://{DEFAULT_INDEXER_IP}:{indexer_port}"))
.unwrap();

(server_handle, pg_store, pg_handle, rpc_client)
Expand Down
18 changes: 14 additions & 4 deletions crates/iota-indexer/tests/ingestion_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,13 @@ mod ingestion_tests {
// Create a checkpoint which should include the transaction we executed.
let checkpoint = sim.create_checkpoint();

let (_, pg_store, _) =
start_simulacrum_rest_api_with_write_indexer(Arc::new(sim), data_ingestion_path).await;
let (_, pg_store, _) = start_simulacrum_rest_api_with_write_indexer(
Arc::new(sim),
data_ingestion_path,
None,
Some("indexer_ingestion_tests_db"),
)
.await;

indexer_wait_for_checkpoint(&pg_store, 1).await;

Expand Down Expand Up @@ -97,8 +102,13 @@ mod ingestion_tests {
// Create a checkpoint which should include the transaction we executed.
let _ = sim.create_checkpoint();

let (_, pg_store, _) =
start_simulacrum_rest_api_with_write_indexer(Arc::new(sim), data_ingestion_path).await;
let (_, pg_store, _) = start_simulacrum_rest_api_with_write_indexer(
Arc::new(sim),
data_ingestion_path,
None,
Some("indexer_ingestion_tests_db"),
)
.await;

indexer_wait_for_checkpoint(&pg_store, 1).await;

Expand Down
Loading
Loading