Skip to content

Commit

Permalink
support dedicated runtime for file scan
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Jul 22, 2024
1 parent bd3b9a1 commit d967d2e
Showing 1 changed file with 39 additions and 30 deletions.
69 changes: 39 additions & 30 deletions src/frontend/src/expr/table_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::{Arc, LazyLock};

use itertools::Itertools;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::types::{DataType, ScalarImpl, StructType};
use risingwave_connector::source::iceberg::create_parquet_stream_builder;
pub use risingwave_pb::expr::table_function::PbType as TableFunctionType;
use risingwave_pb::expr::PbTableFunction;
use tokio::runtime::Runtime;

use super::{infer_type, Expr, ExprImpl, ExprRewriter, RwResult};
use crate::catalog::function_catalog::{FunctionCatalog, FunctionKind};
Expand Down Expand Up @@ -132,36 +133,44 @@ impl TableFunction {
.into());
}

#[cfg(madsim)]
return Err(crate::error::ErrorCode::BindError(
"file_scan can't be used in the madsim mode".to_string(),
)
.into());

#[cfg(not(madsim))]
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
let parquet_stream_builder = create_parquet_stream_builder(
eval_args[2].clone(),
eval_args[3].clone(),
eval_args[4].clone(),
eval_args[5].clone(),
)
.await?;

let mut rw_types = vec![];
for field in parquet_stream_builder.schema().fields() {
rw_types.push((
field.name().to_string(),
IcebergArrowConvert.type_from_field(field)?,
));
}
if cfg!(madsim) {
return Err(crate::error::ErrorCode::BindError(
"file_scan can't be used in the madsim mode".to_string(),
)
.into());
} else {
static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("rw-file-scan")
.enable_all()
.build()
.expect("failed to build file-scan runtime")
});

tokio::task::block_in_place(|| {
RUNTIME.block_on(async {
let parquet_stream_builder = create_parquet_stream_builder(
eval_args[2].clone(),
eval_args[3].clone(),
eval_args[4].clone(),
eval_args[5].clone(),
)
.await?;

let mut rw_types = vec![];
for field in parquet_stream_builder.schema().fields() {
rw_types.push((
field.name().to_string(),
IcebergArrowConvert.type_from_field(field)?,
));
}

Ok::<risingwave_common::types::DataType, anyhow::Error>(DataType::Struct(
StructType::new(rw_types),
))
})
})?
Ok::<risingwave_common::types::DataType, anyhow::Error>(DataType::Struct(
StructType::new(rw_types),
))
})
})?
}
};

Ok(TableFunction {
Expand Down

0 comments on commit d967d2e

Please sign in to comment.