From d967d2e1670ad3943aed1061f6b42a375e34fc11 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Mon, 22 Jul 2024 13:52:55 +0800 Subject: [PATCH] support dedicated runtime for file scan --- src/frontend/src/expr/table_function.rs | 69 ++++++++++++++----------- 1 file changed, 39 insertions(+), 30 deletions(-) diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index f4dda8d8176be..0cfab3e102553 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use itertools::Itertools; use risingwave_common::array::arrow::IcebergArrowConvert; @@ -20,6 +20,7 @@ use risingwave_common::types::{DataType, ScalarImpl, StructType}; use risingwave_connector::source::iceberg::create_parquet_stream_builder; pub use risingwave_pb::expr::table_function::PbType as TableFunctionType; use risingwave_pb::expr::PbTableFunction; +use tokio::runtime::Runtime; use super::{infer_type, Expr, ExprImpl, ExprRewriter, RwResult}; use crate::catalog::function_catalog::{FunctionCatalog, FunctionKind}; @@ -132,36 +133,44 @@ impl TableFunction { .into()); } - #[cfg(madsim)] - return Err(crate::error::ErrorCode::BindError( - "file_scan can't be used in the madsim mode".to_string(), - ) - .into()); - - #[cfg(not(madsim))] - tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(async { - let parquet_stream_builder = create_parquet_stream_builder( - eval_args[2].clone(), - eval_args[3].clone(), - eval_args[4].clone(), - eval_args[5].clone(), - ) - .await?; - - let mut rw_types = vec![]; - for field in parquet_stream_builder.schema().fields() { - rw_types.push(( - field.name().to_string(), - IcebergArrowConvert.type_from_field(field)?, - )); - } + if cfg!(madsim) { + return Err(crate::error::ErrorCode::BindError( + "file_scan can't 
be used in the madsim mode".to_string(), + ) + .into()); + } else { + static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("rw-file-scan") + .enable_all() + .build() + .expect("failed to build file-scan runtime") + }); + + tokio::task::block_in_place(|| { + RUNTIME.block_on(async { + let parquet_stream_builder = create_parquet_stream_builder( + eval_args[2].clone(), + eval_args[3].clone(), + eval_args[4].clone(), + eval_args[5].clone(), + ) + .await?; + + let mut rw_types = vec![]; + for field in parquet_stream_builder.schema().fields() { + rw_types.push(( + field.name().to_string(), + IcebergArrowConvert.type_from_field(field)?, + )); + } - Ok::(DataType::Struct( - StructType::new(rw_types), - )) - }) - })? + Ok::(DataType::Struct( + StructType::new(rw_types), + )) + }) + })? + } }; Ok(TableFunction {