Skip to content

Commit

Permalink
re-consolidate tests ...
Browse files Browse the repository at this point in the history
... use rstest to run same tests for standalone
and remote.
  • Loading branch information
milenkovicm committed Nov 23, 2024
1 parent b3685d2 commit 441affb
Show file tree
Hide file tree
Showing 15 changed files with 852 additions and 1,029 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ tonic-build = { version = "0.12", default-features = false, features = [
tracing = "0.1.36"
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
ctor = { version = "0.2" }

tokio = { version = "1" }
uuid = { version = "1.10", features = ["v4", "v7"] }
Expand Down
7 changes: 3 additions & 4 deletions ballista/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,11 @@ url = { workspace = true }
[dev-dependencies]
ballista-executor = { path = "../executor", version = "0.12.0" }
ballista-scheduler = { path = "../scheduler", version = "0.12.0" }
ctor = { version = "0.2" }
ctor = { workspace = true }
env_logger = { workspace = true }
object_store = { workspace = true, features = ["aws"] }
testcontainers-modules = { version = "0.11", features = ["minio"] }
rstest = { version = "0.23" }
tonic = { workspace = true }

[features]
default = ["standalone"]
standalone = ["ballista-executor", "ballista-scheduler"]
testcontainers = []
11 changes: 6 additions & 5 deletions ballista/client/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl SessionContextExt for SessionContext {
let config = SessionConfig::new_with_ballista();
let scheduler_url = Extension::parse_url(url)?;
log::info!(
"Connecting to Ballista scheduler at {}",
"Connecting to Ballista scheduler at: {}",
scheduler_url.clone()
);
let remote_session_id =
Expand Down Expand Up @@ -245,10 +245,11 @@ impl Extension {
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;
}
Some(session_state) => {
ballista_executor::new_standalone_executor_from_state::<
datafusion_proto::protobuf::LogicalPlanNode,
datafusion_proto::protobuf::PhysicalPlanNode,
>(scheduler, concurrent_tasks, session_state)
ballista_executor::new_standalone_executor_from_state(
scheduler,
concurrent_tasks,
session_state,
)
.await
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;
}
Expand Down
135 changes: 44 additions & 91 deletions ballista/client/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,64 +19,14 @@ use std::env;
use std::error::Error;
use std::path::PathBuf;

use ballista::prelude::SessionConfigExt;
use ballista::prelude::{SessionConfigExt, SessionContextExt};
use ballista_core::serde::{
protobuf::scheduler_grpc_client::SchedulerGrpcClient, BallistaCodec,
};
use ballista_core::{ConfigProducer, RuntimeProducer};
use ballista_scheduler::SessionBuilder;
use datafusion::execution::SessionState;
use datafusion::prelude::SessionConfig;
use object_store::aws::AmazonS3Builder;
use testcontainers_modules::minio::MinIO;
use testcontainers_modules::testcontainers::core::{CmdWaitFor, ExecCommand};
use testcontainers_modules::testcontainers::ContainerRequest;
use testcontainers_modules::{minio, testcontainers::ImageExt};

/// Region passed to the S3 client builder in `create_s3_store`.
pub const REGION: &str = "eu-west-1";
/// Bucket name used by the S3 client and by the bucket-creation command.
pub const BUCKET: &str = "ballista";
/// Access key id shared by the MinIO container environment and the S3 client.
pub const ACCESS_KEY_ID: &str = "MINIO";
/// Secret key shared by the MinIO container environment and the S3 client.
pub const SECRET_KEY: &str = "MINIOMINIO";

/// Builds an S3 object store client that talks plain HTTP to
/// `localhost:<port>`, using the test region, bucket and credentials
/// declared in this module.
///
/// # Errors
/// Returns the `object_store::Error` produced by the builder if the
/// configuration is rejected.
// Not every test binary that includes this module uses this helper.
#[allow(dead_code)]
pub fn create_s3_store(
    port: u16,
) -> std::result::Result<object_store::aws::AmazonS3, object_store::Error> {
    let endpoint = format!("http://localhost:{port}");

    let builder = AmazonS3Builder::new()
        .with_endpoint(endpoint)
        .with_region(REGION)
        .with_bucket_name(BUCKET)
        .with_access_key_id(ACCESS_KEY_ID)
        .with_secret_access_key(SECRET_KEY)
        // The local endpoint is served over http, not https.
        .with_allow_http(true);

    builder.build()
}

/// Creates a container request for the default MinIO image with the access
/// and secret keys set to the test credentials declared in this module.
// Not every test binary that includes this module uses this helper.
#[allow(dead_code)]
pub fn create_minio_container() -> ContainerRequest<minio::MinIO> {
    let image = MinIO::default();

    image
        .with_env_var("MINIO_ACCESS_KEY", ACCESS_KEY_ID)
        .with_env_var("MINIO_SECRET_KEY", SECRET_KEY)
}

/// Returns the command, to be executed inside the MinIO container, that
/// creates the test bucket.
///
/// This is a hack that creates the bucket without constructing an S3 client:
/// it simply makes a directory under `/data`. It works with the current
/// testcontainers image version 'RELEASE.2022-02-07T08-17-33Z'
/// (testcontainers does not await properly on the latest image version).
///
/// If the testcontainers image version changes to something newer, use
/// "mc mb /data/ballista" to create the bucket instead.
// Not every test binary that includes this module uses this helper.
#[allow(dead_code)]
pub fn create_bucket_command() -> ExecCommand {
    let bucket_dir = format!("/data/{}", crate::common::BUCKET);
    let args = vec!["mkdir".to_string(), bucket_dir];

    // The command is considered ready after a fixed one-second wait.
    ExecCommand::new(args).with_cmd_ready_condition(CmdWaitFor::seconds(1))
}

// /// Remote ballista cluster to be used for local testing.
// static BALLISTA_CLUSTER: tokio::sync::OnceCell<(String, u16)> =
// tokio::sync::OnceCell::const_new();
use datafusion::prelude::{SessionConfig, SessionContext};

/// Returns the parquet test data directory, which is by default
/// stored in a git submodule rooted at
Expand Down Expand Up @@ -161,17 +111,8 @@ pub async fn setup_test_cluster() -> (String, u16) {

let host = "localhost".to_string();

let scheduler_url = format!("http://{}:{}", host, addr.port());

let scheduler = loop {
match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
Err(_) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
log::info!("Attempting to connect to test scheduler...");
}
Ok(scheduler) => break scheduler,
}
};
let scheduler =
connect_to_scheduler(format!("http://{}:{}", host, addr.port())).await;

ballista_executor::new_standalone_executor(
scheduler,
Expand All @@ -190,7 +131,6 @@ pub async fn setup_test_cluster() -> (String, u16) {
#[allow(dead_code)]
pub async fn setup_test_cluster_with_state(session_state: SessionState) -> (String, u16) {
let config = SessionConfig::new_with_ballista();
//let default_codec = BallistaCodec::default();

let addr = ballista_scheduler::standalone::new_standalone_scheduler_from_state(
&session_state,
Expand All @@ -200,22 +140,10 @@ pub async fn setup_test_cluster_with_state(session_state: SessionState) -> (Stri

let host = "localhost".to_string();

let scheduler_url = format!("http://{}:{}", host, addr.port());
let scheduler =
connect_to_scheduler(format!("http://{}:{}", host, addr.port())).await;

let scheduler = loop {
match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
Err(_) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
log::info!("Attempting to connect to test scheduler...");
}
Ok(scheduler) => break scheduler,
}
};

ballista_executor::new_standalone_executor_from_state::<
datafusion_proto::protobuf::LogicalPlanNode,
datafusion_proto::protobuf::PhysicalPlanNode,
>(
ballista_executor::new_standalone_executor_from_state(
scheduler,
config.ballista_standalone_parallelism(),
&session_state,
Expand Down Expand Up @@ -253,22 +181,13 @@ pub async fn setup_test_cluster_with_builders(

let host = "localhost".to_string();

let scheduler_url = format!("http://{}:{}", host, addr.port());

let scheduler = loop {
match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
Err(_) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
log::info!("Attempting to connect to test scheduler...");
}
Ok(scheduler) => break scheduler,
}
};
let scheduler =
connect_to_scheduler(format!("http://{}:{}", host, addr.port())).await;

ballista_executor::new_standalone_executor_from_builder(
scheduler,
config.ballista_standalone_parallelism(),
config_producer.clone(),
config_producer,
runtime_producer,
codec,
Default::default(),
Expand All @@ -281,6 +200,40 @@ pub async fn setup_test_cluster_with_builders(
(host, addr.port())
}

async fn connect_to_scheduler(
scheduler_url: String,
) -> SchedulerGrpcClient<tonic::transport::Channel> {
let mut retry = 50;
loop {
match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
Err(_) if retry > 0 => {
retry -= 1;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
log::debug!("Re-attempting to connect to test scheduler...");
}

Err(_) => {
log::error!("scheduler connection timed out");
panic!("scheduler connection timed out")
}
Ok(scheduler) => break scheduler,
}
}
}

/// Creates a standalone Ballista `SessionContext` for tests.
///
/// # Panics
/// Panics if the standalone context cannot be created.
// Not every test binary that includes this module uses this helper.
#[allow(dead_code)]
pub async fn standalone_context() -> SessionContext {
    let context = SessionContext::standalone().await;
    context.unwrap()
}

/// Starts a test cluster via `setup_test_cluster` and returns a
/// `SessionContext` connected to it through a `df://host:port` URL.
///
/// # Panics
/// Panics if the remote context cannot be created.
// Not every test binary that includes this module uses this helper.
#[allow(dead_code)]
pub async fn remote_context() -> SessionContext {
    let (host, port) = setup_test_cluster().await;
    let url = format!("df://{host}:{port}");

    let context = SessionContext::remote(&url).await;
    context.unwrap()
}

#[ctor::ctor]
fn init() {
// Enable RUST_LOG logging configuration for test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ mod common;
//
#[cfg(test)]
#[cfg(feature = "standalone")]
mod standalone_tests {
mod basic {
use ballista::prelude::SessionContextExt;
use datafusion::arrow;
use datafusion::arrow::util::pretty::pretty_format_batches;
Expand Down
Loading

0 comments on commit 441affb

Please sign in to comment.