diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml
index 82197fd97d78f..f01fd233e86f3 100644
--- a/ci/workflows/main-cron.yml
+++ b/ci/workflows/main-cron.yml
@@ -540,6 +540,28 @@ steps:
     timeout_in_minutes: 25
     retry: *auto-retry
 
+  - label: "S3_v2 source batch read on AWS (json parser)"
+    key: "s3-v2-source-batch-read-check-aws-json-parser"
+    command: "ci/scripts/s3-source-test.sh -p ci-release -s 'fs_source_batch.py json'"
+    if: |
+      !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
+      || build.pull_request.labels includes "ci/run-s3-source-tests"
+      || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
+    depends_on: build
+    plugins:
+      - seek-oss/aws-sm#v2.3.1:
+          env:
+            S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
+      - docker-compose#v5.1.0:
+          run: rw-build-env
+          config: ci/docker-compose.yml
+          mount-buildkite-agent: true
+          environment:
+            - S3_SOURCE_TEST_CONF
+      - ./ci/plugins/upload-failure-logs
+    timeout_in_minutes: 25
+    retry: *auto-retry
+
   - label: "S3_v2 source check on AWS (csv parser)"
     key: "s3-v2-source-check-aws-csv-parser"
     command: "ci/scripts/s3-source-test.sh -p ci-release -s 'fs_source_v2.py csv_without_header'"
diff --git a/e2e_test/s3/fs_source_batch.py b/e2e_test/s3/fs_source_batch.py
new file mode 100644
index 0000000000000..d606be36f37f0
--- /dev/null
+++ b/e2e_test/s3/fs_source_batch.py
@@ -0,0 +1,155 @@
+import os
+import sys
+import csv
+import json
+import random
+import psycopg2
+
+from time import sleep
+from io import StringIO
+from minio import Minio
+from functools import partial
+
+def gen_data(file_num, item_num_per_file):
+    assert item_num_per_file % 2 == 0, \
+        f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}'
+    return [
+        [{
+            'id': file_id * item_num_per_file + item_id,
+            'name': f'{file_id}_{item_id}',
+            'sex': item_id % 2,
+            'mark': (-1) ** (item_id % 2),
+        } for item_id in range(item_num_per_file)]
+        for file_id in range(file_num)
+    ]
+
+def format_json(data):
+    return [
+        '\n'.join([json.dumps(item) for item in file])
+        for file in data
+    ]
+
+def format_csv(data, with_header):
+    csv_files = []
+
+    for file_data in data:
+        ostream = StringIO()
+        writer = csv.DictWriter(ostream, fieldnames=file_data[0].keys())
+        if with_header:
+            writer.writeheader()
+        for item_data in file_data:
+            writer.writerow(item_data)
+        csv_files.append(ostream.getvalue())
+    return csv_files
+
+def do_test(config, file_num, item_num_per_file, prefix, fmt):
+    conn = psycopg2.connect(
+        host="localhost",
+        port="4566",
+        user="root",
+        database="dev"
+    )
+
+    # Open a cursor to execute SQL statements
+    cur = conn.cursor()
+
+    def _source():
+        return f's3_test_{fmt}'
+
+    def _encode():
+        if fmt == 'json':
+            return 'JSON'
+        else:
+            return f"CSV (delimiter = ',', without_header = {str('without' in fmt).lower()})"
+
+    # Create the S3 source
+    cur.execute(f'''CREATE SOURCE {_source()}(
+        id int,
+        name TEXT,
+        sex int,
+        mark int,
+    ) WITH (
+        connector = 's3_v2',
+        match_pattern = '{prefix}*.{fmt}',
+        s3.region_name = '{config['S3_REGION']}',
+        s3.bucket_name = '{config['S3_BUCKET']}',
+        s3.credentials.access = '{config['S3_ACCESS_KEY']}',
+        s3.credentials.secret = '{config['S3_SECRET_KEY']}',
+        s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
+    ) FORMAT PLAIN ENCODE {_encode()};''')
+
+    total_rows = file_num * item_num_per_file
+    MAX_RETRIES = 40
+    for retry_no in range(MAX_RETRIES):
+        cur.execute(f'select count(*) from {_source()}')
+        result = cur.fetchone()
+        if result[0] == total_rows:
+            break
+        print(f"[retry {retry_no}] Now got {result[0]} rows in source, {total_rows} expected, wait 30s")
+        sleep(30)
+
+    stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_source()}'
+    print(f'Execute {stmt}')
+    cur.execute(stmt)
+    result = cur.fetchone()
+
+    print('Got:', result)
+
+    def _assert_eq(field, got, expect):
+        assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.'
+
+    _assert_eq('count(*)', result[0], total_rows)
+    _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)
+    _assert_eq('sum(sex)', result[2], total_rows / 2)
+    _assert_eq('sum(mark)', result[3], 0)
+
+    print('Test pass')
+
+    cur.execute(f'drop source {_source()}')
+    cur.close()
+    conn.close()
+
+
+if __name__ == "__main__":
+    FILE_NUM = 4001
+    ITEM_NUM_PER_FILE = 2
+    data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE)
+
+    fmt = sys.argv[1]
+    FORMATTER = {
+        'json': format_json,
+        'csv_with_header': partial(format_csv, with_header=True),
+        'csv_without_header': partial(format_csv, with_header=False),
+    }
+    assert fmt in FORMATTER, f"Unsupported format: {fmt}"
+    formatted_files = FORMATTER[fmt](data)
+
+    config = json.loads(os.environ["S3_SOURCE_TEST_CONF"])
+    client = Minio(
+        config["S3_ENDPOINT"],
+        access_key=config["S3_ACCESS_KEY"],
+        secret_key=config["S3_SECRET_KEY"],
+        secure=True,
+    )
+    run_id = str(random.randint(1000, 9999))
+    _local = lambda idx: f'data_{idx}.{fmt}'
+    _s3 = lambda idx: f"{run_id}_data_{idx}.{fmt}"
+
+    # put s3 files
+    for idx, file_str in enumerate(formatted_files):
+        with open(_local(idx), "w") as f:
+            f.write(file_str)
+            os.fsync(f.fileno())
+
+        client.fput_object(
+            config["S3_BUCKET"],
+            _s3(idx),
+            _local(idx)
+        )
+
+    # do test
+    do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt)
+
+    # clean up s3 files
+    for idx, _ in enumerate(formatted_files):
+        client.remove_object(config["S3_BUCKET"], _s3(idx))
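
For reference, a minimal sketch (not part of the patch) of the data this test generates and the aggregates it checks; the two sample rows follow from `gen_data()`/`format_json()` above, and the totals from `FILE_NUM = 4001` and `ITEM_NUM_PER_FILE = 2`:

```python
# Sketch only: illustrates the objects produced by gen_data()/format_json() above.
# Each uploaded object holds ITEM_NUM_PER_FILE newline-delimited JSON rows, e.g. for file_id == 0:
#   {"id": 0, "name": "0_0", "sex": 0, "mark": 1}
#   {"id": 1, "name": "0_1", "sex": 1, "mark": -1}
total_rows = 4001 * 2  # FILE_NUM * ITEM_NUM_PER_FILE
assert (total_rows - 1) * total_rows / 2 == 32_012_001  # expected sum(id): 0 + 1 + ... + 8001
assert total_rows / 2 == 4001                           # expected sum(sex): every other row has sex == 1
# sum(mark) is expected to be 0 because each file holds an equal number of +1 and -1 marks.
```

The parser name (`json` here) arrives as `sys.argv[1]`, matching the `fs_source_batch.py json` argument in the CI step added above.
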
diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto
index 689217a4f011d..b6c00b1a14aa5 100644
--- a/proto/batch_plan.proto
+++ b/proto/batch_plan.proto
@@ -59,7 +59,7 @@ message SourceNode {
   uint32 source_id = 1;
   repeated plan_common.ColumnCatalog columns = 2;
   map<string, string> with_properties = 3;
-  bytes split = 4;
+  repeated bytes split = 4;
   catalog.StreamSourceInfo info = 5;
 }
 
diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs
index 8c9894b726ec6..4a40a45a6c28a 100644
--- a/src/batch/src/executor/source.rs
+++ b/src/batch/src/executor/source.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
 
 use futures::StreamExt;
 use futures_async_stream::try_stream;
+use itertools::Itertools;
 use risingwave_common::array::{DataChunk, Op, StreamChunk};
 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
 use risingwave_common::types::DataType;
@@ -43,7 +44,7 @@ pub struct SourceExecutor {
     column_ids: Vec<ColumnId>,
     metrics: Arc<SourceMetrics>,
     source_id: TableId,
-    split: SplitImpl,
+    split_list: Vec<SplitImpl>,
 
     schema: Schema,
     identity: String,
@@ -89,7 +90,11 @@ impl BoxedExecutorBuilder for SourceExecutor {
             .map(|column| ColumnId::from(column.get_column_desc().unwrap().column_id))
             .collect();
 
-        let split = SplitImpl::restore_from_bytes(&source_node.split)?;
+        let split_list = source_node
+            .split
+            .iter()
+            .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
+            .collect_vec();
 
         let fields = source_node
             .columns
@@ -105,8 +110,9 @@ impl BoxedExecutorBuilder for SourceExecutor {
 
         if let ConnectorProperties::Iceberg(iceberg_properties) = config {
             let iceberg_properties: IcebergProperties = *iceberg_properties;
-            if let SplitImpl::Iceberg(split) = split {
-                let split: IcebergSplit = split;
+            assert_eq!(split_list.len(), 1);
+            if let SplitImpl::Iceberg(split) = &split_list[0] {
+                let split: IcebergSplit = split.clone();
                 Ok(Box::new(IcebergScanExecutor::new(
                     iceberg_properties.to_iceberg_config(),
                     Some(split.snapshot_id),
@@ -135,7 +141,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
             column_ids,
             metrics: source.context().source_metrics(),
             source_id: TableId::new(source_node.source_id),
-            split,
+            split_list,
             schema,
             identity: source.plan_node().get_identity().clone(),
             source_ctrl_opts,
@@ -173,7 +179,7 @@ impl SourceExecutor {
         ));
         let stream = self
             .source
-            .to_stream(Some(vec![self.split]), self.column_ids, source_ctx)
+            .to_stream(Some(self.split_list), self.column_ids, source_ctx)
             .await?;
 
         #[for_await]
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index b40fbc33eab55..86374f68e7450 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -453,6 +453,8 @@ impl ConnectorProperties {
 
     pub fn support_multiple_splits(&self) -> bool {
         matches!(self, ConnectorProperties::Kafka(_))
+            || matches!(self, ConnectorProperties::OpendalS3(_))
+            || matches!(self, ConnectorProperties::Gcs(_))
     }
 }
 
diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
index 5cb84652fbab6..99173327be76f 100644
--- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
+++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
@@ -151,6 +151,7 @@ impl<Src: OpendalSource> OpendalReader<Src> {
             offset += len;
             batch_size += len;
             batch.push(msg);
+
             if batch.len() >= max_chunk_size {
                 source_ctx
                     .metrics
diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs
index 833c9661c3ca1..5cfd10835998a 100644
--- a/src/connector/src/source/reader/reader.rs
+++ b/src/connector/src/source/reader/reader.rs
@@ -33,7 +33,7 @@ use crate::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
 use crate::source::filesystem::opendal_source::{
     OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
 };
-use crate::source::filesystem::FsPageItem;
+use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
 use crate::source::{
     create_split_reader, BoxChunkSourceStream, BoxTryStream, Column, ConnectorProperties,
     ConnectorState, SourceColumnDesc, SourceContext, SplitReader,
@@ -149,7 +149,6 @@ impl SourceReader {
             vec![reader]
         } else {
             let to_reader_splits = splits.into_iter().map(|split| vec![split]);
-
             try_join_all(to_reader_splits.into_iter().map(|splits| {
                 tracing::debug!(?splits, ?prop, "spawning connector split reader");
                 let props = prop.clone();
@@ -194,3 +193,30 @@ async fn build_opendal_fs_list_stream<Src: OpendalSource>(lister: OpendalEnumerator<Src>) {
         }
     }
 }
+
+#[try_stream(boxed, ok = OpendalFsSplit<Src>, error = crate::error::ConnectorError)]
+pub async fn build_opendal_fs_list_for_batch<Src: OpendalSource>(lister: OpendalEnumerator<Src>) {
+    let matcher = lister.get_matcher();
+    let mut object_metadata_iter = lister.list().await?;
+
+    while let Some(list_res) = object_metadata_iter.next().await {
+        match list_res {
+            Ok(res) => {
+                if matcher
+                    .as_ref()
+                    .map(|m| m.matches(&res.name))
+                    .unwrap_or(true)
+                {
+                    let split = OpendalFsSplit::new(res.name, 0, res.size as usize);
+                    yield split
+                } else {
+                    continue;
+                }
+            }
+            Err(err) => {
+                tracing::error!(error = %err.as_report(), "list object fail");
+                return Err(err);
+            }
+        }
+    }
+}
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index 43ec6d2a89de8..c155440ed32d0 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -19,7 +19,6 @@ use std::rc::Rc;
 
 use fixedbitset::FixedBitSet;
 use pretty_xmlish::{Pretty, XmlNode};
-use risingwave_common::bail_not_implemented;
 use risingwave_common::catalog::{
     ColumnCatalog, ColumnDesc, Field, Schema, KAFKA_TIMESTAMP_COLUMN_NAME,
 };
@@ -490,9 +489,6 @@ impl PredicatePushdown for LogicalSource {
 
 impl ToBatch for LogicalSource {
     fn to_batch(&self) -> Result<PlanRef> {
-        if self.core.is_new_fs_connector() {
-            bail_not_implemented!("New fs connector for batch");
-        }
         let mut plan: PlanRef = BatchSource::new(self.core.clone()).into();
 
         if let Some(exprs) = &self.output_exprs {
diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs
index 9585e18713d58..0343867b1f81f 100644
--- a/src/frontend/src/scheduler/distributed/stage.rs
+++ b/src/frontend/src/scheduler/distributed/stage.rs
@@ -377,14 +377,22 @@ impl StageRunner {
                 ));
             }
         } else if let Some(source_info) = self.stage.source_info.as_ref() {
-            for (id, split) in source_info.split_info().unwrap().iter().enumerate() {
+            let chunk_size = (source_info.split_info().unwrap().len() as f32
+                / self.stage.parallelism.unwrap() as f32)
+                .ceil() as usize;
+            for (id, split) in source_info
+                .split_info()
+                .unwrap()
+                .chunks(chunk_size)
+                .enumerate()
+            {
                 let task_id = TaskIdPb {
                     query_id: self.stage.query_id.id.clone(),
                     stage_id: self.stage.id,
                     task_id: id as u32,
                 };
                 let plan_fragment = self
-                    .create_plan_fragment(id as u32, Some(PartitionInfo::Source(split.clone())));
+                    .create_plan_fragment(id as u32, Some(PartitionInfo::Source(split.to_vec())));
                 let worker =
                     self.choose_worker(&plan_fragment, id as u32, self.stage.dml_table_id)?;
                 futures.push(self.schedule_task(
@@ -981,11 +989,15 @@ impl StageRunner {
         let NodeBody::Source(mut source_node) = node_body else {
             unreachable!();
         };
+
         let partition = partition
             .expect("no partition info for seq scan")
             .into_source()
             .expect("PartitionInfo should be SourcePartitionInfo");
-        source_node.split = partition.encode_to_bytes().into();
+        source_node.split = partition
+            .into_iter()
+            .map(|split| split.encode_to_bytes().into())
+            .collect_vec();
         PlanNodePb {
             children: vec![],
             identity,
diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs
index 5e1838e765a49..cf9e1646ec515 100644
--- a/src/frontend/src/scheduler/local.rs
+++ b/src/frontend/src/scheduler/local.rs
@@ -55,7 +55,6 @@ use crate::scheduler::{ReadSnapshot, SchedulerError, SchedulerResult};
 use crate::session::{FrontendEnv, SessionImpl};
 
 pub type LocalQueryStream = ReceiverStream<Result<DataChunk, BoxedError>>;
-
 pub struct LocalQueryExecution {
     sql: String,
     query: Query,
@@ -354,11 +353,21 @@ impl LocalQueryExecution {
                     sources.push(exchange_source);
                 }
             } else if let Some(source_info) = &second_stage.source_info {
-                for (id, split) in source_info.split_info().unwrap().iter().enumerate() {
+                // For file source batch reads, all files to be read are divided into several parts to prevent a single task from taking up too many resources.
+
+                let chunk_size = (source_info.split_info().unwrap().len() as f32
+                    / (self.worker_node_manager.schedule_unit_count()) as f32)
+                    .ceil() as usize;
+                for (id, split) in source_info
+                    .split_info()
+                    .unwrap()
+                    .chunks(chunk_size)
+                    .enumerate()
+                {
                     let second_stage_plan_node = self.convert_plan_node(
                         &second_stage.root,
                         &mut None,
-                        Some(PartitionInfo::Source(split.clone())),
+                        Some(PartitionInfo::Source(split.to_vec())),
                         next_executor_id.clone(),
                     )?;
                     let second_stage_plan_fragment = PlanFragment {
@@ -470,7 +479,10 @@ impl LocalQueryExecution {
                 let partition = partition
                     .into_source()
                     .expect("PartitionInfo should be SourcePartitionInfo here");
-                source_node.split = partition.encode_to_bytes().into();
+                source_node.split = partition
+                    .into_iter()
+                    .map(|split| split.encode_to_bytes().into())
+                    .collect_vec();
             }
         }
         _ => unreachable!(),
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 47784fc8c4d69..c68cd02c2eeeb 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -21,6 +21,7 @@ use std::sync::Arc;
 use anyhow::anyhow;
 use async_recursion::async_recursion;
 use enum_as_inner::EnumAsInner;
+use futures::TryStreamExt;
 use itertools::Itertools;
 use pgwire::pg_server::SessionId;
 use risingwave_common::buffer::{Bitmap, BitmapBuilder};
@@ -28,8 +29,11 @@ use risingwave_common::catalog::TableDesc;
 use risingwave_common::hash::table_distribution::TableDistribution;
 use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode};
 use risingwave_common::util::scan_range::ScanRange;
+use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
+use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3};
 use risingwave_connector::source::iceberg::IcebergSplitEnumerator;
 use risingwave_connector::source::kafka::KafkaSplitEnumerator;
+use risingwave_connector::source::reader::reader::build_opendal_fs_list_for_batch;
 use risingwave_connector::source::{
     ConnectorProperties, SourceEnumeratorContext, SplitEnumerator, SplitImpl,
 };
@@ -294,8 +298,31 @@ impl SourceScanInfo {
                     .into_iter()
                     .map(SplitImpl::Kafka)
                     .collect_vec();
+
                 Ok(SourceScanInfo::Complete(split_info))
             }
+            ConnectorProperties::OpendalS3(prop) => {
+                let lister: OpendalEnumerator<OpendalS3> =
+                    OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
+                let stream = build_opendal_fs_list_for_batch(lister);
+
+                let batch_res: Vec<_> = stream.try_collect().await?;
+                let res = batch_res
+                    .into_iter()
+                    .map(SplitImpl::OpendalS3)
+                    .collect_vec();
+
+                Ok(SourceScanInfo::Complete(res))
+            }
+            ConnectorProperties::Gcs(prop) => {
+                let lister: OpendalEnumerator<OpendalGcs> =
+                    OpendalEnumerator::new_gcs_source(*prop)?;
+                let stream = build_opendal_fs_list_for_batch(lister);
+                let batch_res: Vec<_> = stream.try_collect().await?;
+                let res = batch_res.into_iter().map(SplitImpl::Gcs).collect_vec();
+
+                Ok(SourceScanInfo::Complete(res))
+            }
             ConnectorProperties::Iceberg(prop) => {
                 let iceberg_enumerator =
                     IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::default().into())
@@ -376,7 +403,7 @@ pub struct TablePartitionInfo {
 #[derive(Clone, Debug, EnumAsInner)]
 pub enum PartitionInfo {
     Table(TablePartitionInfo),
-    Source(SplitImpl),
+    Source(Vec<SplitImpl>),
 }
 
 /// Fragment part of `Query`.
@@ -437,6 +464,7 @@ impl QueryStage {
         &self,
         exchange_info: Option<ExchangeInfo>,
         source_info: SourceScanInfo,
+        task_parallelism: u32,
     ) -> Self {
         assert!(matches!(source_info, SourceScanInfo::Complete(_)));
         let exchange_info = if let Some(exchange_info) = exchange_info {
@@ -444,13 +472,12 @@ impl QueryStage {
         } else {
             self.exchange_info.clone()
         };
-
         Self {
             query_id: self.query_id.clone(),
             id: self.id,
             root: self.root.clone(),
             exchange_info,
-            parallelism: Some(source_info.split_info().unwrap().len() as u32),
+            parallelism: Some(task_parallelism),
             table_scan_info: self.table_scan_info.clone(),
             source_info: Some(source_info),
             has_lookup_join: self.has_lookup_join,
@@ -652,6 +679,7 @@ impl StageGraph {
             stage.source_info,
             Some(SourceScanInfo::Incomplete(_))
         ));
+
        let complete_source_info = stage
            .source_info
            .as_ref()
@@ -660,9 +688,26 @@ impl StageGraph {
            .complete(self.batch_parallelism)
            .await?;
 
+        // For batch reads of a file source, the number of files involved is typically large.
+        // To avoid generating one task per file, the task parallelism is limited here.
+        // todo(wcy-fdu): Currently the splits are divided into batch_parallelism / 2 groups; this will be made configurable later.
+        let task_parallelism = match &stage.source_info {
+            Some(SourceScanInfo::Incomplete(source_fetch_info)) => {
+                match source_fetch_info.connector {
+                    ConnectorProperties::Gcs(_) | ConnectorProperties::OpendalS3(_) => {
+                        (self.batch_parallelism / 2) as u32
+                    }
+                    _ => complete_source_info.split_info().unwrap().len() as u32,
+                }
+            }
+            _ => unreachable!(),
+        };
+        // For file source batch reads, all files to be read are divided into several parts to prevent a single task from taking up too many resources.
+        // todo(wcy-fdu): Currently the splits are divided into batch_parallelism / 2 groups; this will be made configurable later.
        let complete_stage = Arc::new(stage.clone_with_exchange_info_and_complete_source_info(
            exchange_info,
            complete_source_info,
+            task_parallelism,
        ));
        let parallelism = complete_stage.parallelism;
        complete_stages.insert(stage.id, complete_stage);
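
To make the split-partitioning arithmetic above concrete, here is a small illustration (not part of the patch). `num_splits = 4001` matches `FILE_NUM` in the e2e test, while `task_parallelism = 4` is an assumed value standing in for `batch_parallelism / 2` (distributed path) or `schedule_unit_count` (local path); the real code does the same ceil division and then calls `chunks()` on the split list:

```python
import math

# Illustration only: mirrors the ceil-division plus chunks() grouping in stage.rs / local.rs.
num_splits = 4001         # e.g. one OpendalFsSplit per matched S3 object
task_parallelism = 4      # assumed value; see the todo comments above for the real source of this number
chunk_size = math.ceil(num_splits / task_parallelism)  # 1001 splits per task
tasks = [range(num_splits)[i:i + chunk_size] for i in range(0, num_splits, chunk_size)]
assert len(tasks) == 4         # never more than task_parallelism tasks
assert len(tasks[-1]) == 998   # the last task picks up the remainder (4001 - 3 * 1001)
```

Without this cap, the scheduler would create one task per split (4001 tasks for the e2e test above), which is exactly what the `chunks(chunk_size)` grouping avoids.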