From ccf49454e2e7042d28e76d4dd1124005dcbe567d Mon Sep 17 00:00:00 2001 From: Dylan Date: Mon, 22 Jul 2024 16:16:39 +0800 Subject: [PATCH] fix(batch): support dedicated runtime for file scan (#17764) --- src/frontend/src/expr/table_function.rs | 57 +++++++++++++++---------- 1 file changed, 34 insertions(+), 23 deletions(-) diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index f4dda8d8176b..d0fe845378ae 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use itertools::Itertools; use risingwave_common::array::arrow::IcebergArrowConvert; @@ -20,6 +20,7 @@ use risingwave_common::types::{DataType, ScalarImpl, StructType}; use risingwave_connector::source::iceberg::create_parquet_stream_builder; pub use risingwave_pb::expr::table_function::PbType as TableFunctionType; use risingwave_pb::expr::PbTableFunction; +use tokio::runtime::Runtime; use super::{infer_type, Expr, ExprImpl, ExprRewriter, RwResult}; use crate::catalog::function_catalog::{FunctionCatalog, FunctionKind}; @@ -139,29 +140,39 @@ impl TableFunction { .into()); #[cfg(not(madsim))] - tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(async { - let parquet_stream_builder = create_parquet_stream_builder( - eval_args[2].clone(), - eval_args[3].clone(), - eval_args[4].clone(), - eval_args[5].clone(), - ) - .await?; - - let mut rw_types = vec![]; - for field in parquet_stream_builder.schema().fields() { - rw_types.push(( - field.name().to_string(), - IcebergArrowConvert.type_from_field(field)?, - )); - } + { + static RUNTIME: LazyLock = LazyLock::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("rw-file-scan") + .enable_all() + .build() + .expect("failed to build file-scan runtime") + }); + + tokio::task::block_in_place(|| { + RUNTIME.block_on(async { + let parquet_stream_builder = create_parquet_stream_builder( + eval_args[2].clone(), + eval_args[3].clone(), + eval_args[4].clone(), + eval_args[5].clone(), + ) + .await?; + + let mut rw_types = vec![]; + for field in parquet_stream_builder.schema().fields() { + rw_types.push(( + field.name().to_string(), + IcebergArrowConvert.type_from_field(field)?, + )); + } - Ok::(DataType::Struct( - StructType::new(rw_types), - )) - }) - })? + Ok::(DataType::Struct( + StructType::new(rw_types), + )) + }) + })? + } }; Ok(TableFunction {