Skip to content

Commit

Permalink
Fixes after rebase on recent feature branch
Browse files Browse the repository at this point in the history
  • Loading branch information
tomxey committed Oct 29, 2024
1 parent 4fbf950 commit 3e109c6
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 30 deletions.
4 changes: 2 additions & 2 deletions crates/iota-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ pub async fn start_cluster(
Some(db_url),
val_fn.rpc_url().to_string(),
ReaderWriterConfig::writer_mode(None),
None,
// reset_database
true,
Some(data_ingestion_path),
cancellation_token.clone(),
None,
)
.await;

Expand Down Expand Up @@ -134,11 +134,11 @@ pub async fn serve_executor(
Some(db_url),
format!("http://{}", executor_server_url),
ReaderWriterConfig::writer_mode(snapshot_config.clone()),
Some(&graphql_connection_config.db_name()),
// reset_database
true,
Some(data_ingestion_path),
cancellation_token.clone(),
Some(&graphql_connection_config.db_name()),
)
.await;

Expand Down
14 changes: 9 additions & 5 deletions crates/iota-indexer/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ pub async fn start_test_indexer<T: R2D2Connection + Send + 'static>(
db_url,
rpc_url,
reader_writer_config,
new_database,
// reset_database
false,
Some(data_ingestion_path),
CancellationToken::new(),
new_database,
)
.await
}
Expand All @@ -67,18 +67,23 @@ pub async fn start_test_indexer_impl<T: R2D2Connection + 'static>(
db_url: Option<String>,
rpc_url: String,
reader_writer_config: ReaderWriterConfig,
new_database: Option<&str>,
reset_database: bool,
mut reset_database: bool,
data_ingestion_path: Option<PathBuf>,
cancel: CancellationToken,
new_database: Option<&str>,
) -> (PgIndexerStore<T>, JoinHandle<Result<(), IndexerError>>) {
let db_url = db_url.unwrap_or_else(|| {
let mut db_url = db_url.unwrap_or_else(|| {
let pg_host = env::var("POSTGRES_HOST").unwrap_or_else(|_| "localhost".into());
let pg_port = env::var("POSTGRES_PORT").unwrap_or_else(|_| "32770".into());
let pw = env::var("POSTGRES_PASSWORD").unwrap_or_else(|_| "postgrespw".into());
format!("postgres://postgres:{pw}@{pg_host}:{pg_port}")
});

if let Some(new_database) = new_database {
db_url = replace_db_name(&db_url, new_database).0;
reset_database = true;
};

let mut config = IndexerConfig {
db_url: Some(db_url.clone().into()),
rpc_client_url: rpc_url,
Expand Down Expand Up @@ -140,7 +145,6 @@ pub async fn start_test_indexer_impl<T: R2D2Connection + 'static>(
pub fn create_pg_store<T: R2D2Connection + Send + 'static>(
db_url: Secret<String>,
reset_database: bool,
new_database: Option<&str>
) -> PgIndexerStore<T> {
// Reduce the connection pool size to 10 for testing
// to prevent maxing out
Expand Down
42 changes: 28 additions & 14 deletions crates/iota-indexer/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,24 +70,31 @@ impl ApiTestSetup {
pub struct SimulacrumTestSetup {
pub runtime: Runtime,
pub sim: Arc<Simulacrum>,
pub store: PgIndexerStore,
pub store: PgIndexerStore<PgConnection>,
/// Indexer RPC Client
pub client: HttpClient,
}

impl SimulacrumTestSetup {
pub fn get_or_init<'a>(
unique_env_name: &str,
env_initializer: impl Fn() -> Simulacrum,
env_initializer: impl Fn(PathBuf) -> Simulacrum,
initialized_env_container: &'a OnceLock<SimulacrumTestSetup>,
) -> &'a SimulacrumTestSetup {
initialized_env_container.get_or_init(|| {
let runtime = tokio::runtime::Runtime::new().unwrap();
let sim = Arc::new(env_initializer());
let data_ingestion_path = tempdir().unwrap().into_path();

let sim = env_initializer(data_ingestion_path.clone());
let sim = Arc::new(sim);

let db_name = format!("simulacrum_env_db_{}", unique_env_name);
let (_, store, _, client) = runtime.block_on(
start_simulacrum_rest_api_with_read_write_indexer(sim.clone(), Some(&db_name)),
);
let (_, store, _, client) =
runtime.block_on(start_simulacrum_rest_api_with_read_write_indexer(
sim.clone(),
data_ingestion_path,
Some(&db_name),
));

SimulacrumTestSetup {
runtime,
Expand Down Expand Up @@ -193,11 +200,15 @@ pub async fn indexer_wait_for_object(
}

/// Start an Indexer instance in `Read` mode
fn start_indexer_reader(fullnode_rpc_url: impl Into<String>, data_ingestion_path: PathBuf, database_name: Option<&str>) -> u16 {
fn start_indexer_reader(
fullnode_rpc_url: impl Into<String>,
data_ingestion_path: PathBuf,
database_name: Option<&str>,
) -> u16 {
let db_url = get_indexer_db_url(database_name);
let port = get_available_port(DEFAULT_INDEXER_IP);
let config = IndexerConfig {
db_url: Some(db_url.clone()),
db_url: Some(db_url.clone().into()),
rpc_client_url: fullnode_rpc_url.into(),
reset_db: true,
rpc_server_worker: true,
Expand All @@ -210,9 +221,9 @@ fn start_indexer_reader(fullnode_rpc_url: impl Into<String>, data_ingestion_path
let registry = prometheus::Registry::default();
init_metrics(&registry);

tokio::spawn(async move {
Indexer::start_reader::<PgConnection>(&config, &registry, db_url).await
});
tokio::spawn(
async move { Indexer::start_reader::<PgConnection>(&config, &registry, db_url).await },
);
port
}

Expand Down Expand Up @@ -275,15 +286,18 @@ pub async fn start_simulacrum_rest_api_with_read_write_indexer(
let simulacrum_server_url = new_local_tcp_socket_for_testing();
let (server_handle, pg_store, pg_handle) = start_simulacrum_rest_api_with_write_indexer(
sim,
data_ingestion_path.clone()
data_ingestion_path.clone(),
Some(simulacrum_server_url),
database_name,
)
.await;

// start indexer in read mode
let indexer_port =
start_indexer_reader(format!("http://{}", simulacrum_server_url), data_ingestion_path, database_name);
let indexer_port = start_indexer_reader(
format!("http://{}", simulacrum_server_url),
data_ingestion_path,
database_name,
);

// create an RPC client by using the indexer url
let rpc_client = HttpClientBuilder::default()
Expand Down
13 changes: 9 additions & 4 deletions crates/iota-indexer/tests/ingestion_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ mod ingestion_tests {

let (_, pg_store, _) = start_simulacrum_rest_api_with_write_indexer(
Arc::new(sim),
data_ingestion_path
data_ingestion_path,
None,
Some("indexer_ingestion_tests_db".to_string()),
Some("indexer_ingestion_tests_db"),
)
.await;

Expand Down Expand Up @@ -102,8 +102,13 @@ mod ingestion_tests {
// Create a checkpoint which should include the transaction we executed.
let _ = sim.create_checkpoint();

let (_, pg_store, _) =
start_simulacrum_rest_api_with_write_indexer(Arc::new(sim), data_ingestion_path).await;
let (_, pg_store, _) = start_simulacrum_rest_api_with_write_indexer(
Arc::new(sim),
data_ingestion_path,
None,
Some("indexer_ingestion_tests_db"),
)
.await;

indexer_wait_for_checkpoint(&pg_store, 1).await;

Expand Down
9 changes: 4 additions & 5 deletions crates/iota-indexer/tests/rpc-tests/extended_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,19 @@ static EXTENDED_API_SHARED_SIMULACRUM_INITIALIZED_ENV: OnceLock<SimulacrumTestSe
OnceLock::new();

fn get_or_init_shared_extended_api_simulacrum_env() -> &'static SimulacrumTestSetup {
let data_ingestion_path = tempdir().unwrap().into_path();
SimulacrumTestSetup::get_or_init(
"extended_api",
|| {
|data_ingestion_path| {
let mut sim = Simulacrum::new();
sim.set_data_ingestion_path(data_ingestion_path.clone());
sim.set_data_ingestion_path(data_ingestion_path);

execute_simulacrum_transactions(&mut sim, 15);
add_checkpoints(&mut sim, 300);
sim.advance_epoch();
sim.advance_epoch();

execute_simulacrum_transactions(&mut sim, 10);
add_checkpoints(&mut sim, 300);
sim.advance_epoch();
sim.advance_epoch();

execute_simulacrum_transactions(&mut sim, 5);
add_checkpoints(&mut sim, 300);
Expand Down

0 comments on commit 3e109c6

Please sign in to comment.