Skip to content

Commit

Permalink
fix(batch): support dedicated runtime for file scan (#17764)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Jul 22, 2024
1 parent cd30ac0 commit ccf4945
Showing 1 changed file with 34 additions and 23 deletions.
57 changes: 34 additions & 23 deletions src/frontend/src/expr/table_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::{Arc, LazyLock};

use itertools::Itertools;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::types::{DataType, ScalarImpl, StructType};
use risingwave_connector::source::iceberg::create_parquet_stream_builder;
pub use risingwave_pb::expr::table_function::PbType as TableFunctionType;
use risingwave_pb::expr::PbTableFunction;
use tokio::runtime::Runtime;

use super::{infer_type, Expr, ExprImpl, ExprRewriter, RwResult};
use crate::catalog::function_catalog::{FunctionCatalog, FunctionKind};
Expand Down Expand Up @@ -139,29 +140,39 @@ impl TableFunction {
.into());

#[cfg(not(madsim))]
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
let parquet_stream_builder = create_parquet_stream_builder(
eval_args[2].clone(),
eval_args[3].clone(),
eval_args[4].clone(),
eval_args[5].clone(),
)
.await?;

let mut rw_types = vec![];
for field in parquet_stream_builder.schema().fields() {
rw_types.push((
field.name().to_string(),
IcebergArrowConvert.type_from_field(field)?,
));
}
{
static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("rw-file-scan")
.enable_all()
.build()
.expect("failed to build file-scan runtime")
});

tokio::task::block_in_place(|| {
RUNTIME.block_on(async {
let parquet_stream_builder = create_parquet_stream_builder(
eval_args[2].clone(),
eval_args[3].clone(),
eval_args[4].clone(),
eval_args[5].clone(),
)
.await?;

let mut rw_types = vec![];
for field in parquet_stream_builder.schema().fields() {
rw_types.push((
field.name().to_string(),
IcebergArrowConvert.type_from_field(field)?,
));
}

Ok::<risingwave_common::types::DataType, anyhow::Error>(DataType::Struct(
StructType::new(rw_types),
))
})
})?
Ok::<risingwave_common::types::DataType, anyhow::Error>(DataType::Struct(
StructType::new(rw_types),
))
})
})?
}
};

Ok(TableFunction {
Expand Down

0 comments on commit ccf4945

Please sign in to comment.