feat(batch): support batch read s3 parquet file #17673

Merged: 15 commits, Jul 18, 2024
21 changes: 21 additions & 0 deletions proto/batch_plan.proto
@@ -66,6 +66,26 @@ message SourceNode {
  map<string, secret.SecretRef> secret_refs = 6;
}

message FileScanNode {
  enum FileFormat {
    FILE_FORMAT_UNSPECIFIED = 0;
    PARQUET = 1;
  }

  enum StorageType {
    STORAGE_TYPE_UNSPECIFIED = 0;
    S3 = 1;
  }

  repeated plan_common.ColumnDesc columns = 1;
  FileFormat file_format = 2;
  StorageType storage_type = 3;
  string s3_region = 4;
  string s3_access_key = 5;
  string s3_secret_key = 6;
  string file_location = 7;
}

message ProjectNode {
  repeated expr.ExprNode select_list = 1;
}
@@ -344,6 +364,7 @@ message PlanNode {
    SortOverWindowNode sort_over_window = 35;
    MaxOneRowNode max_one_row = 36;
    LogRowSeqScanNode log_row_seq_scan = 37;
    FileScanNode file_scan = 38;
    // The following nodes are used for testing.
    bool block_executor = 100;
    bool busy_loop_executor = 101;
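For orientation, the prost-generated Rust struct for this plan node can be filled in by hand as sketched below. The values are placeholders only; in this PR the actual message is built by `BatchFileScan::to_batch_prost_body` (shown further down in the diff).

```rust
// Hypothetical, hand-filled FileScanNode; placeholder values, not a real query.
use risingwave_pb::batch_plan::file_scan_node::{FileFormat, StorageType};
use risingwave_pb::batch_plan::FileScanNode;

fn example_file_scan_node() -> FileScanNode {
    FileScanNode {
        columns: vec![], // plan_common.ColumnDesc protos describing the output schema
        file_format: FileFormat::Parquet as i32,
        storage_type: StorageType::S3 as i32,
        s3_region: "us-east-1".to_owned(),
        s3_access_key: "<access-key>".to_owned(),
        s3_secret_key: "<secret-key>".to_owned(),
        file_location: "s3://bucket/path/file.parquet".to_owned(),
    }
}
```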
2 changes: 2 additions & 0 deletions src/batch/src/executor/mod.rs
@@ -87,6 +87,7 @@ pub use values::*;
use self::log_row_seq_scan::LogStoreRowSeqScanExecutorBuilder;
use self::test_utils::{BlockExecutorBuilder, BusyLoopExecutorBuilder};
use crate::error::Result;
use crate::executor::s3_file_scan::FileScanExecutorBuilder;
use crate::executor::sys_row_seq_scan::SysRowSeqScanExecutorBuilder;
use crate::task::{BatchTaskContext, ShutdownToken, TaskId};

@@ -241,6 +242,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
            NodeBody::Source => SourceExecutor,
            NodeBody::SortOverWindow => SortOverWindowExecutor,
            NodeBody::MaxOneRow => MaxOneRowExecutor,
            NodeBody::FileScan => FileScanExecutorBuilder,
            // Follow NodeBody only used for test
            NodeBody::BlockExecutor => BlockExecutorBuilder,
            NodeBody::BusyLoopExecutor => BusyLoopExecutorBuilder,
40 changes: 37 additions & 3 deletions src/batch/src/executor/s3_file_scan.rs
@@ -17,11 +17,15 @@ use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use parquet::arrow::ProjectionMask;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::catalog::Schema;
use risingwave_common::catalog::{Field, Schema};
use risingwave_connector::source::iceberg::parquet_file_reader::create_parquet_stream_builder;
use risingwave_pb::batch_plan::file_scan_node;
use risingwave_pb::batch_plan::file_scan_node::StorageType;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::BatchError;
use crate::executor::{DataChunk, Executor};
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, DataChunk, Executor, ExecutorBuilder};
use crate::task::BatchTaskContext;

#[derive(PartialEq, Debug)]
pub enum FileFormat {
@@ -55,7 +59,6 @@ impl Executor for S3FileScanExecutor {
}

impl S3FileScanExecutor {
    #![expect(dead_code)]
    pub fn new(
        file_format: FileFormat,
        location: String,
@@ -113,3 +116,34 @@ impl S3FileScanExecutor {
        }
    }
}

pub struct FileScanExecutorBuilder {}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for FileScanExecutorBuilder {
    async fn new_boxed_executor<C: BatchTaskContext>(
        source: &ExecutorBuilder<'_, C>,
        _inputs: Vec<BoxedExecutor>,
    ) -> crate::error::Result<BoxedExecutor> {
        let file_scan_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::FileScan
        )?;

        assert_eq!(file_scan_node.storage_type, StorageType::S3 as i32);

        Ok(Box::new(S3FileScanExecutor::new(
            match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
                file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
                file_scan_node::FileFormat::Unspecified => unreachable!(),
            },
            file_scan_node.file_location.clone(),
            file_scan_node.s3_region.clone(),
            file_scan_node.s3_access_key.clone(),
            file_scan_node.s3_secret_key.clone(),
            source.context.get_config().developer.chunk_size,
            Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
            source.plan_node().get_identity().clone(),
        )))
    }
}
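The executor's read path (outside this hunk) streams Arrow record batches out of the Parquet file and converts each batch into a DataChunk via IcebergArrowConvert. As a rough, standalone sketch of that streaming pattern, using the parquet crate's async reader against a local file with a placeholder path, rather than the S3-backed reader this PR builds via `create_parquet_stream_builder`:

```rust
// Minimal sketch of streaming a Parquet file as Arrow record batches.
// Illustrative only: the executor in this PR reads from S3 and converts each
// RecordBatch into a DataChunk instead of printing row counts.
use futures_util::StreamExt;
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

async fn read_parquet(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open(path).await?;
    let mut stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_batch_size(1024)
        .build()?;
    while let Some(batch) = stream.next().await {
        let batch = batch?;
        println!("read {} rows x {} columns", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}
```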
14 changes: 9 additions & 5 deletions src/frontend/src/optimizer/logical_optimization.rs
@@ -126,10 +126,14 @@ static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::
    )
});

static TABLE_FUNCTION_TO_PROJECT_SET: LazyLock<OptimizationStage> = LazyLock::new(|| {
static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
    OptimizationStage::new(
        "Table Function To Project Set",
        vec![TableFunctionToProjectSetRule::create()],
        "Table Function Convert",
        vec![
            // Apply file scan rule first
            TableFunctionToFileScanRule::create(),
            TableFunctionToProjectSetRule::create(),
        ],
        ApplyOrder::TopDown,
    )
});
@@ -592,7 +596,7 @@ impl LogicalOptimizer {
        // Should be applied before converting table function to project set.
        plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW);
        // In order to unnest a table function, we need to convert it into a `project_set` first.
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_PROJECT_SET);
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT);

        plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
        if has_logical_max_one_row(plan.clone()) {
@@ -700,7 +704,7 @@ impl LogicalOptimizer {
        // Table function should be converted into `file_scan` before `project_set`.
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN);
        // In order to unnest a table function, we need to convert it into a `project_set` first.
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_PROJECT_SET);
        plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT);

        plan = Self::subquery_unnesting(plan, false, explain_trace, &ctx)?;

21 changes: 20 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_file_scan.rs
@@ -13,7 +13,9 @@
// limitations under the License.

use pretty_xmlish::XmlNode;
use risingwave_pb::batch_plan::file_scan_node::{FileFormat, StorageType};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::FileScanNode;

use super::batch::prelude::*;
use super::utils::{childless_record, column_names_pretty, Distill};
@@ -75,7 +77,24 @@ impl ToDistributedBatch for BatchFileScan {

impl ToBatchPb for BatchFileScan {
    fn to_batch_prost_body(&self) -> NodeBody {
        todo!()
        NodeBody::FileScan(FileScanNode {
            columns: self
                .core
                .columns()
                .into_iter()
                .map(|col| col.to_protobuf())
                .collect(),
            file_format: match self.core.file_format {
                generic::FileFormat::Parquet => FileFormat::Parquet as i32,
            },
            storage_type: match self.core.storage_type {
                generic::StorageType::S3 => StorageType::S3 as i32,
            },
            s3_region: self.core.s3_region.clone(),
            s3_access_key: self.core.s3_access_key.clone(),
            s3_secret_key: self.core.s3_secret_key.clone(),
            file_location: self.core.file_location.clone(),
        })
    }
}

15 changes: 14 additions & 1 deletion src/frontend/src/optimizer/plan_node/generic/file_scan.rs
@@ -13,7 +13,7 @@
// limitations under the License.

use educe::Educe;
use risingwave_common::catalog::Schema;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};

use super::GenericPlanNode;
use crate::optimizer::optimizer_context::OptimizerContextRef;
@@ -62,3 +62,16 @@ impl GenericPlanNode for FileScan {
        FunctionalDependencySet::new(self.schema.len())
    }
}

impl FileScan {
    pub fn columns(&self) -> Vec<ColumnDesc> {
        self.schema
            .fields
            .iter()
            .enumerate()
            .map(|(i, f)| {
                ColumnDesc::named(f.name.clone(), ColumnId::new(i as i32), f.data_type.clone())
            })
            .collect()
    }
}
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_file_scan.rs
@@ -106,13 +106,13 @@ impl ToBatch for LogicalFileScan {

impl ToStream for LogicalFileScan {
    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
        bail!("FileScan is not supported in streaming mode")
        bail!("file_scan function is not supported in streaming mode")
    }

    fn logical_rewrite_for_stream(
        &self,
        _ctx: &mut RewriteStreamContext,
    ) -> Result<(PlanRef, ColIndexMapping)> {
        bail!("FileScan is not supported in streaming mode")
        bail!("file_scan function is not supported in streaming mode")
    }
}
9 changes: 9 additions & 0 deletions src/frontend/src/scheduler/distributed/stage.rs
@@ -405,6 +405,15 @@ impl StageRunner {
                    expr_context.clone(),
                ));
            }
        } else if let Some(_file_scan_info) = self.stage.file_scan_info.as_ref() {
            let task_id = PbTaskId {
                query_id: self.stage.query_id.id.clone(),
                stage_id: self.stage.id,
                task_id: 0_u64,
            };
            let plan_fragment = self.create_plan_fragment(0_u64, Some(PartitionInfo::File));
            let worker = self.choose_worker(&plan_fragment, 0_u32, self.stage.dml_table_id)?;
            futures.push(self.schedule_task(task_id, plan_fragment, worker, expr_context.clone()));
        } else {
            for id in 0..self.stage.parallelism.unwrap() {
                let task_id = PbTaskId {
34 changes: 34 additions & 0 deletions src/frontend/src/scheduler/local.rs
@@ -402,6 +402,40 @@ impl LocalQueryExecution {
                };
                sources.push(exchange_source);
            }
        } else if let Some(_file_scan_info) = &second_stage.file_scan_info {
            let second_stage_plan_node = self.convert_plan_node(
                &second_stage.root,
                &mut None,
                Some(PartitionInfo::File),
                next_executor_id.clone(),
            )?;
            let second_stage_plan_fragment = PlanFragment {
                root: Some(second_stage_plan_node),
                exchange_info: Some(ExchangeInfo {
                    mode: DistributionMode::Single as i32,
                    ..Default::default()
                }),
            };
            let local_execute_plan = LocalExecutePlan {
                plan: Some(second_stage_plan_fragment),
                epoch: Some(self.snapshot.batch_query_epoch()),
                tracing_context: tracing_context.clone(),
            };
            // NOTE: select a random worker node here.
            let worker_node = self.worker_node_manager.next_random_worker()?;
            let exchange_source = ExchangeSource {
                task_output_id: Some(TaskOutputId {
                    task_id: Some(PbTaskId {
                        task_id: 0_u64,
                        stage_id: exchange_source_stage_id,
                        query_id: self.query.query_id.id.clone(),
                    }),
                    output_id: 0,
                }),
                host: Some(worker_node.host.as_ref().unwrap().clone()),
                local_execute_plan: Some(Plan(local_execute_plan)),
            };
            sources.push(exchange_source);
        } else {
            let second_stage_plan_node = self.convert_plan_node(
                &second_stage.root,