diff --git a/e2e_test/s3/fs_source_batch.py b/e2e_test/s3/fs_source_batch.py index d606be36f37f0..9f8da63533a68 100644 --- a/e2e_test/s3/fs_source_batch.py +++ b/e2e_test/s3/fs_source_batch.py @@ -109,6 +109,59 @@ def _assert_eq(field, got, expect): cur.close() conn.close() +def test_empty_source(config, prefix, fmt): + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + + # Open a cursor to execute SQL statements + cur = conn.cursor() + + def _source(): + return f's3_test_empty_{fmt}' + + def _encode(): + if fmt == 'json': + return 'JSON' + else: + return f"CSV (delimiter = ',', without_header = {str('without' in fmt).lower()})" + + # Execute a SELECT statement + cur.execute(f'''CREATE SOURCE {_source()}( + id int, + name TEXT, + sex int, + mark int, + ) WITH ( + connector = 's3_v2', + match_pattern = '{prefix}*.{fmt}', + s3.region_name = '{config['S3_REGION']}', + s3.bucket_name = '{config['S3_BUCKET']}', + s3.credentials.access = '{config['S3_ACCESS_KEY']}', + s3.credentials.secret = '{config['S3_SECRET_KEY']}', + s3.endpoint_url = 'https://{config['S3_ENDPOINT']}' + ) FORMAT PLAIN ENCODE {_encode()};''') + + stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_source()}' + print(f'Execute {stmt}') + cur.execute(stmt) + result = cur.fetchone() + + print('Got:', result) + + def _assert_eq(field, got, expect): + assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' 
+ + _assert_eq('count(*)', result[0], 0) + + print('Empty source test pass') + + cur.execute(f'drop source {_source()}') + cur.close() + conn.close() if __name__ == "__main__": FILE_NUM = 4001 @@ -153,3 +206,5 @@ def _assert_eq(field, got, expect): # clean up s3 files for idx, _ in enumerate(formatted_files): client.remove_object(config["S3_BUCKET"], _s3(idx)) + + test_empty_source(config, run_id, fmt) \ No newline at end of file diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index 543d0c0a3ae6f..a6c76c78bf23d 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -380,30 +380,57 @@ impl StageRunner { )); } } else if let Some(source_info) = self.stage.source_info.as_ref() { - let chunk_size = (source_info.split_info().unwrap().len() as f32 + // If there is no file in source, the `chunk_size` is set to 1. + let chunk_size = ((source_info.split_info().unwrap().len() as f32 / self.stage.parallelism.unwrap() as f32) - .ceil() as usize; - for (id, split) in source_info - .split_info() - .unwrap() - .chunks(chunk_size) - .enumerate() - { + .ceil() as usize) + .max(1); + if source_info.split_info().unwrap().is_empty() { + // No file in source, schedule an empty task. 
+ const EMPTY_TASK_ID: u64 = 0; let task_id = PbTaskId { query_id: self.stage.query_id.id.clone(), stage_id: self.stage.id, - task_id: id as u64, + task_id: EMPTY_TASK_ID, }; - let plan_fragment = self - .create_plan_fragment(id as u64, Some(PartitionInfo::Source(split.to_vec()))); - let worker = - self.choose_worker(&plan_fragment, id as u32, self.stage.dml_table_id)?; + let plan_fragment = + self.create_plan_fragment(EMPTY_TASK_ID, Some(PartitionInfo::Source(vec![]))); + let worker = self.choose_worker( + &plan_fragment, + EMPTY_TASK_ID as u32, + self.stage.dml_table_id, + )?; futures.push(self.schedule_task( task_id, plan_fragment, worker, expr_context.clone(), )); + } else { + for (id, split) in source_info + .split_info() + .unwrap() + .chunks(chunk_size) + .enumerate() + { + let task_id = PbTaskId { + query_id: self.stage.query_id.id.clone(), + stage_id: self.stage.id, + task_id: id as u64, + }; + let plan_fragment = self.create_plan_fragment( + id as u64, + Some(PartitionInfo::Source(split.to_vec())), + ); + let worker = + self.choose_worker(&plan_fragment, id as u32, self.stage.dml_table_id)?; + futures.push(self.schedule_task( + task_id, + plan_fragment, + worker, + expr_context.clone(), + )); + } } } else if let Some(file_scan_info) = self.stage.file_scan_info.as_ref() { let chunk_size = (file_scan_info.file_location.len() as f32 diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 7643e5c5e7ba2..65bfbe09c54b0 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -732,13 +732,19 @@ impl StageGraph { // For batch reading file source, the number of files involved is typically large. // In order to avoid generating a task for each file, the parallelism of tasks is limited here. - // todo(wcy-fdu): Currently it will be divided into half of schedule_unit_count groups, and this will be changed to configurable later. 
+ The minimum `task_parallelism` + // must not be greater than the number of files to read. Therefore, we first take the + // minimum of the number of files and (self.batch_parallelism / 2). If the number of + // files is 0, we set task_parallelism to 1. + let task_parallelism = match &stage.source_info { Some(SourceScanInfo::Incomplete(source_fetch_info)) => { match source_fetch_info.connector { - ConnectorProperties::Gcs(_) | ConnectorProperties::OpendalS3(_) => { - (self.batch_parallelism / 2) as u32 - } + ConnectorProperties::Gcs(_) | ConnectorProperties::OpendalS3(_) => (min( + complete_source_info.split_info().unwrap().len() as u32, + (self.batch_parallelism / 2) as u32, + )) + .max(1), _ => complete_source_info.split_info().unwrap().len() as u32, } }