diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index d1ce2afcccf3..e43f6ab29abc 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -680,7 +680,7 @@ mod tests { use datafusion_common::cast::as_string_array; use datafusion_common::internal_err; use datafusion_common::stats::Precision; - use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; + use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_expr::{col, lit}; use crate::execution::session_state::SessionStateBuilder; @@ -863,7 +863,7 @@ mod tests { async fn query_compress_data( file_compression_type: FileCompressionType, ) -> Result<()> { - let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new()).unwrap()); + let runtime = Arc::new(RuntimeEnvBuilder::new().build()?); let mut cfg = SessionConfig::new(); cfg.options_mut().catalog.has_header = true; let session_state = SessionStateBuilder::new() diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 18943599b136..c67424c0fa53 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -212,15 +212,15 @@ where /// # use std::sync::Arc; /// # use datafusion::prelude::*; /// # use datafusion::execution::SessionStateBuilder; -/// # use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +/// # use datafusion_execution::runtime_env::RuntimeEnvBuilder; /// // Configure a 4k batch size /// let config = SessionConfig::new() .with_batch_size(4 * 1024); /// /// // configure a memory limit of 1GB with 20% slop -/// let runtime_env = RuntimeEnv::new( -/// RuntimeConfig::new() +/// let runtime_env = RuntimeEnvBuilder::new() /// .with_memory_limit(1024 * 1024 * 1024, 0.80) -/// ).unwrap(); +/// .build() +/// .unwrap(); /// /// // Create a SessionState using the config and runtime_env /// let 
state = SessionStateBuilder::new() @@ -1623,7 +1623,7 @@ mod tests { use super::{super::options::CsvReadOptions, *}; use crate::assert_batches_eq; use crate::execution::memory_pool::MemoryConsumer; - use crate::execution::runtime_env::RuntimeConfig; + use crate::execution::runtime_env::RuntimeEnvBuilder; use crate::test; use crate::test_util::{plan_and_collect, populate_csv_partitions}; @@ -1758,8 +1758,7 @@ mod tests { let path = path.join("tests/tpch-csv"); let url = format!("file://{}", path.display()); - let rt_cfg = RuntimeConfig::new(); - let runtime = Arc::new(RuntimeEnv::new(rt_cfg).unwrap()); + let runtime = Arc::new(RuntimeEnvBuilder::new().build()?); let cfg = SessionConfig::new() .set_str("datafusion.catalog.location", url.as_str()) .set_str("datafusion.catalog.format", "CSV") diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs index eda306dd3de5..1980589491a5 100644 --- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs @@ -22,7 +22,7 @@ use arrow::{ compute::SortOptions, record_batch::RecordBatch, }; -use datafusion::execution::runtime_env::RuntimeConfig; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::sorts::sort::SortExec; @@ -136,10 +136,12 @@ impl SortTest { .sort_spill_reservation_bytes, ); - let runtime_env = RuntimeConfig::new() - .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))) - .build(); - let runtime = Arc::new(runtime_env.unwrap()); + let runtime = Arc::new( + RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))) + .build() + .unwrap(), + ); SessionContext::new_with_config_rt(session_config, runtime) } else { SessionContext::new_with_config(session_config) diff --git a/datafusion/core/tests/memory_limit/mod.rs 
b/datafusion/core/tests/memory_limit/mod.rs index dbd5592e8020..592c25dedc50 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -40,7 +40,7 @@ use tokio::fs::File; use datafusion::datasource::streaming::StreamingTable; use datafusion::datasource::{MemTable, TableProvider}; use datafusion::execution::disk_manager::DiskManagerConfig; -use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::physical_optimizer::join_selection::JoinSelection; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; @@ -509,17 +509,17 @@ impl TestCase { let table = scenario.table(); - let mut rt_config = RuntimeConfig::new() + let rt_config = RuntimeEnvBuilder::new() // disk manager setting controls the spilling .with_disk_manager(disk_manager_config) .with_memory_limit(memory_limit, MEMORY_FRACTION); - if let Some(pool) = memory_pool { - rt_config = rt_config.with_memory_pool(pool); + let runtime = if let Some(pool) = memory_pool { + rt_config.with_memory_pool(pool).build().unwrap() + } else { + rt_config.build().unwrap() }; - let runtime = RuntimeEnv::new(rt_config).unwrap(); - // Configure execution let builder = SessionStateBuilder::new() .with_config(config) diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index bf25b36f48e8..bd251f1a6669 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -33,7 +33,7 @@ use datafusion_execution::cache::cache_unit::{ DefaultFileStatisticsCache, DefaultListFilesCache, }; use datafusion_execution::config::SessionConfig; -use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::session_state::SessionStateBuilder; use 
tempfile::tempdir; @@ -198,7 +198,10 @@ fn get_cache_runtime_state() -> ( .with_list_files_cache(Some(list_file_cache.clone())); let rt = Arc::new( - RuntimeEnv::new(RuntimeConfig::new().with_cache_manager(cache_config)).unwrap(), + RuntimeEnvBuilder::new() + .with_cache_manager(cache_config) + .build() + .expect("could not build runtime environment"), ); let state = SessionContext::new_with_config_rt(SessionConfig::default(), rt).state(); diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index bddce81f537b..e7b48be95cff 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -41,7 +41,7 @@ use url::Url; /// Execution runtime environment that manages system resources such /// as memory, disk, cache and storage. /// -/// A [`RuntimeEnv`] is created from a [`RuntimeConfig`] and has the +/// A [`RuntimeEnv`] is created from a [`RuntimeEnvBuilder`] and has the /// following resource management functionality: /// /// * [`MemoryPool`]: Manage memory @@ -147,13 +147,17 @@ impl RuntimeEnv { impl Default for RuntimeEnv { fn default() -> Self { - RuntimeEnv::new(RuntimeConfig::new()).unwrap() + RuntimeEnvBuilder::new().build().unwrap() } } +/// Please see: [`RuntimeEnvBuilder`] +/// This is a type alias for backwards compatibility. 
+pub type RuntimeConfig = RuntimeEnvBuilder; + #[derive(Clone)] /// Execution runtime configuration -pub struct RuntimeConfig { +pub struct RuntimeEnvBuilder { /// DiskManager to manage temporary disk file usage pub disk_manager: DiskManagerConfig, /// [`MemoryPool`] from which to allocate memory @@ -166,13 +170,13 @@ pub struct RuntimeConfig { pub object_store_registry: Arc, } -impl Default for RuntimeConfig { +impl Default for RuntimeEnvBuilder { fn default() -> Self { Self::new() } } -impl RuntimeConfig { +impl RuntimeEnvBuilder { /// New with default values pub fn new() -> Self { Self { @@ -229,8 +233,17 @@ impl RuntimeConfig { self.with_disk_manager(DiskManagerConfig::new_specified(vec![path.into()])) } - /// Build a `RuntimeEnv` object from the configuration + /// Build a [`RuntimeEnv`] from this builder's configuration pub fn build(self) -> Result { - RuntimeEnv::new(self) + let memory_pool = self + .memory_pool + .unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default())); + + Ok(RuntimeEnv { + memory_pool, + disk_manager: DiskManager::try_new(self.disk_manager)?, + cache_manager: CacheManager::try_new(&self.cache_manager)?, + object_store_registry: self.object_store_registry, + }) } } diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs index 21a644284c42..35689b8e08df 100644 --- a/datafusion/execution/src/task.rs +++ b/datafusion/execution/src/task.rs @@ -24,7 +24,7 @@ use crate::{ config::SessionConfig, memory_pool::MemoryPool, registry::FunctionRegistry, - runtime_env::{RuntimeConfig, RuntimeEnv}, + runtime_env::{RuntimeEnv, RuntimeEnvBuilder}, }; use datafusion_common::{plan_datafusion_err, DataFusionError, Result}; use datafusion_expr::planner::ExprPlanner; @@ -57,7 +57,8 @@ pub struct TaskContext { impl Default for TaskContext { fn default() -> Self { - let runtime = RuntimeEnv::new(RuntimeConfig::new()) + let runtime = RuntimeEnvBuilder::new() + .build() .expect("default runtime created successfully"); // Create a default task context, mostly useful for 
testing diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 6ca01928c80f..b5447a2d7317 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1212,7 +1212,7 @@ mod tests { }; use datafusion_execution::config::SessionConfig; use datafusion_execution::memory_pool::FairSpillPool; - use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; + use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_functions_aggregate::array_agg::array_agg_udaf; use datafusion_functions_aggregate::average::avg_udaf; use datafusion_functions_aggregate::count::count_udaf; @@ -1324,11 +1324,10 @@ mod tests { fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc { let session_config = SessionConfig::new().with_batch_size(batch_size); let runtime = Arc::new( - RuntimeEnv::new( - RuntimeConfig::default() - .with_memory_pool(Arc::new(FairSpillPool::new(max_memory))), - ) - .unwrap(), + RuntimeEnvBuilder::default() + .with_memory_pool(Arc::new(FairSpillPool::new(max_memory))) + .build() + .unwrap(), ); let task_ctx = TaskContext::default() .with_session_config(session_config) @@ -1809,7 +1808,9 @@ mod tests { let input_schema = input.schema(); let runtime = Arc::new( - RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0)).unwrap(), + RuntimeEnvBuilder::default() + .with_memory_limit(1, 1.0) + .build()?, ); let task_ctx = TaskContext::default().with_runtime(runtime); let task_ctx = Arc::new(task_ctx); diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 0868ee721665..b99d4f17c42a 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -488,7 +488,7 @@ mod tests { use crate::test::build_table_scan_i32; use datafusion_common::{assert_batches_sorted_eq, assert_contains}; - use 
datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; + use datafusion_execution::runtime_env::RuntimeEnvBuilder; async fn join_collect( left: Arc, @@ -673,8 +673,11 @@ mod tests { #[tokio::test] async fn test_overallocation() -> Result<()> { - let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0); - let runtime = Arc::new(RuntimeEnv::new(runtime_config)?); + let runtime = Arc::new( + RuntimeEnvBuilder::new() + .with_memory_limit(100, 1.0) + .build()?, + ); let task_ctx = TaskContext::default().with_runtime(runtime); let task_ctx = Arc::new(task_ctx); diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 7fac23ad5557..f20d00e1a298 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -1572,7 +1572,7 @@ mod tests { ScalarValue, }; use datafusion_execution::config::SessionConfig; - use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; + use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{BinaryExpr, Literal}; @@ -3798,8 +3798,11 @@ mod tests { ]; for join_type in join_types { - let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0); - let runtime = Arc::new(RuntimeEnv::new(runtime_config)?); + let runtime = Arc::new( + RuntimeEnvBuilder::new() + .with_memory_limit(100, 1.0) + .build()?, + ); let task_ctx = TaskContext::default().with_runtime(runtime); let task_ctx = Arc::new(task_ctx); @@ -3871,8 +3874,11 @@ mod tests { ]; for join_type in join_types { - let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0); - let runtime = Arc::new(RuntimeEnv::new(runtime_config)?); + let runtime = Arc::new( + RuntimeEnvBuilder::new() + .with_memory_limit(100, 1.0) + .build()?, + ); let session_config = SessionConfig::default().with_batch_size(50); let task_ctx = TaskContext::default() 
.with_session_config(session_config) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 18de2de03192..3cd373544157 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -644,7 +644,7 @@ mod tests { use arrow::datatypes::{DataType, Field}; use datafusion_common::{assert_batches_sorted_eq, assert_contains, ScalarValue}; - use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; + use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{BinaryExpr, Literal}; use datafusion_physical_expr::{Partitioning, PhysicalExpr}; @@ -1019,8 +1019,11 @@ mod tests { ]; for join_type in join_types { - let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0); - let runtime = Arc::new(RuntimeEnv::new(runtime_config)?); + let runtime = Arc::new( + RuntimeEnvBuilder::new() + .with_memory_limit(100, 1.0) + .build()?, + ); let task_ctx = TaskContext::default().with_runtime(runtime); let task_ctx = Arc::new(task_ctx); diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 511cb4c55fcd..09fe5d9ebc54 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -1978,7 +1978,7 @@ mod tests { }; use datafusion_execution::config::SessionConfig; use datafusion_execution::disk_manager::DiskManagerConfig; - use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; + use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_execution::TaskContext; use crate::expressions::Column; @@ -2900,10 +2900,12 @@ mod tests { ]; // Disable DiskManager to prevent spilling - let runtime_config = RuntimeConfig::new() - .with_memory_limit(100, 1.0) - 
.with_disk_manager(DiskManagerConfig::Disabled); - let runtime = Arc::new(RuntimeEnv::new(runtime_config)?); + let runtime = Arc::new( + RuntimeEnvBuilder::new() + .with_memory_limit(100, 1.0) + .with_disk_manager(DiskManagerConfig::Disabled) + .build()?, + ); let session_config = SessionConfig::default().with_batch_size(50); for join_type in join_types { @@ -2985,10 +2987,12 @@ mod tests { ]; // Disable DiskManager to prevent spilling - let runtime_config = RuntimeConfig::new() - .with_memory_limit(100, 1.0) - .with_disk_manager(DiskManagerConfig::Disabled); - let runtime = Arc::new(RuntimeEnv::new(runtime_config)?); + let runtime = Arc::new( + RuntimeEnvBuilder::new() + .with_memory_limit(100, 1.0) + .with_disk_manager(DiskManagerConfig::Disabled) + .build()?, + ); let session_config = SessionConfig::default().with_batch_size(50); for join_type in join_types { @@ -3048,10 +3052,12 @@ mod tests { ]; // Enable DiskManager to allow spilling - let runtime_config = RuntimeConfig::new() - .with_memory_limit(100, 1.0) - .with_disk_manager(DiskManagerConfig::NewOs); - let runtime = Arc::new(RuntimeEnv::new(runtime_config)?); + let runtime = Arc::new( + RuntimeEnvBuilder::new() + .with_memory_limit(100, 1.0) + .with_disk_manager(DiskManagerConfig::NewOs) + .build()?, + ); for batch_size in [1, 50] { let session_config = SessionConfig::default().with_batch_size(batch_size); @@ -3156,10 +3162,13 @@ mod tests { ]; // Enable DiskManager to allow spilling - let runtime_config = RuntimeConfig::new() - .with_memory_limit(500, 1.0) - .with_disk_manager(DiskManagerConfig::NewOs); - let runtime = Arc::new(RuntimeEnv::new(runtime_config)?); + let runtime = Arc::new( + RuntimeEnvBuilder::new() + .with_memory_limit(500, 1.0) + .with_disk_manager(DiskManagerConfig::NewOs) + .build()?, + ); + for batch_size in [1, 50] { let session_config = SessionConfig::default().with_batch_size(batch_size); diff --git a/datafusion/physical-plan/src/repartition/mod.rs 
b/datafusion/physical-plan/src/repartition/mod.rs index 5a3fcb5029e1..650006a9d02d 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1025,7 +1025,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::cast::as_string_array; use datafusion_common::{assert_batches_sorted_eq, exec_err}; - use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; + use datafusion_execution::runtime_env::RuntimeEnvBuilder; use tokio::task::JoinSet; @@ -1507,7 +1507,9 @@ mod tests { // setup up context let runtime = Arc::new( - RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0)).unwrap(), + RuntimeEnvBuilder::default() + .with_memory_limit(1, 1.0) + .build()?, ); let task_ctx = TaskContext::default().with_runtime(runtime); diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index a81b09948cca..e92a57493141 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -966,7 +966,7 @@ mod tests { use arrow::datatypes::*; use datafusion_common::cast::as_primitive_array; use datafusion_execution::config::SessionConfig; - use datafusion_execution::runtime_env::RuntimeConfig; + use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_common::ScalarValue; use datafusion_physical_expr::expressions::Literal; @@ -1009,9 +1009,11 @@ mod tests { .options() .execution .sort_spill_reservation_bytes; - let rt_config = RuntimeConfig::new() - .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0); - let runtime = Arc::new(RuntimeEnv::new(rt_config)?); + let runtime = Arc::new( + RuntimeEnvBuilder::new() + .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0) + .build()?, + ); let task_ctx = Arc::new( TaskContext::default() .with_session_config(session_config) @@ -1085,11 +1087,14 @@ mod tests { .execution .sort_spill_reservation_bytes; - let rt_config = 
RuntimeConfig::new().with_memory_limit( - sort_spill_reservation_bytes + avg_batch_size * (partitions - 1), - 1.0, + let runtime = Arc::new( + RuntimeEnvBuilder::new() + .with_memory_limit( + sort_spill_reservation_bytes + avg_batch_size * (partitions - 1), + 1.0, + ) + .build()?, ); - let runtime = Arc::new(RuntimeEnv::new(rt_config)?); let task_ctx = Arc::new( TaskContext::default() .with_runtime(runtime) diff --git a/datafusion/wasmtest/src/lib.rs b/datafusion/wasmtest/src/lib.rs index a74cce72ac64..50325d262d1d 100644 --- a/datafusion/wasmtest/src/lib.rs +++ b/datafusion/wasmtest/src/lib.rs @@ -78,9 +78,8 @@ mod test { use super::*; use datafusion::execution::context::SessionContext; use datafusion_execution::{ - config::SessionConfig, - disk_manager::DiskManagerConfig, - runtime_env::{RuntimeConfig, RuntimeEnv}, + config::SessionConfig, disk_manager::DiskManagerConfig, + runtime_env::RuntimeEnvBuilder, }; use datafusion_physical_plan::collect; use datafusion_sql::parser::DFParser; @@ -100,10 +99,10 @@ mod test { // Execute SQL (using datafusion) let rt = Arc::new( - RuntimeEnv::new( - RuntimeConfig::new().with_disk_manager(DiskManagerConfig::Disabled), - ) - .unwrap(), + RuntimeEnvBuilder::new() + .with_disk_manager(DiskManagerConfig::Disabled) + .build() + .unwrap(), ); let session_config = SessionConfig::new().with_target_partitions(1); let session_context =