fix(connector): fix batch file source when there are no files (#18018)
Co-authored-by: Kexiang Wang <[email protected]>
wcy-fdu and KeXiangWang authored Aug 13, 2024
1 parent c6ed9bb commit 44d25a5
Showing 3 changed files with 105 additions and 17 deletions.
55 changes: 55 additions & 0 deletions e2e_test/s3/fs_source_batch.py
@@ -109,6 +109,59 @@ def _assert_eq(field, got, expect):
     cur.close()
     conn.close()
 
+def test_empty_source(config, prefix, fmt):
+    conn = psycopg2.connect(
+        host="localhost",
+        port="4566",
+        user="root",
+        database="dev"
+    )
+
+    # Open a cursor to execute SQL statements
+    cur = conn.cursor()
+
+    def _source():
+        return f's3_test_empty_{fmt}'
+
+    def _encode():
+        if fmt == 'json':
+            return 'JSON'
+        else:
+            return f"CSV (delimiter = ',', without_header = {str('without' in fmt).lower()})"
+
+    # Create a source whose match_pattern matches no files
+    cur.execute(f'''CREATE SOURCE {_source()}(
+        id int,
+        name TEXT,
+        sex int,
+        mark int,
+    ) WITH (
+        connector = 's3_v2',
+        match_pattern = '{prefix}*.{fmt}',
+        s3.region_name = '{config['S3_REGION']}',
+        s3.bucket_name = '{config['S3_BUCKET']}',
+        s3.credentials.access = '{config['S3_ACCESS_KEY']}',
+        s3.credentials.secret = '{config['S3_SECRET_KEY']}',
+        s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
+    ) FORMAT PLAIN ENCODE {_encode()};''')
+
+    stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_source()}'
+    print(f'Execute {stmt}')
+    cur.execute(stmt)
+    result = cur.fetchone()
+
+    print('Got:', result)
+
+    def _assert_eq(field, got, expect):
+        assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.'
+
+    _assert_eq('count(*)', result[0], 0)
+
+    print('Empty source test pass')
+
+    cur.execute(f'drop source {_source()}')
+    cur.close()
+    conn.close()
 
 if __name__ == "__main__":
     FILE_NUM = 4001
@@ -153,3 +206,5 @@ def _assert_eq(field, got, expect):
     # clean up s3 files
     for idx, _ in enumerate(formatted_files):
         client.remove_object(config["S3_BUCKET"], _s3(idx))
+
+    test_empty_source(config, run_id, fmt)
53 changes: 40 additions & 13 deletions src/frontend/src/scheduler/distributed/stage.rs
@@ -380,30 +380,57 @@ impl StageRunner {
                 ));
             }
         } else if let Some(source_info) = self.stage.source_info.as_ref() {
-            let chunk_size = (source_info.split_info().unwrap().len() as f32
+            // If there is no file in source, the `chunk_size` is set to 1.
+            let chunk_size = ((source_info.split_info().unwrap().len() as f32
                 / self.stage.parallelism.unwrap() as f32)
-                .ceil() as usize;
-            for (id, split) in source_info
-                .split_info()
-                .unwrap()
-                .chunks(chunk_size)
-                .enumerate()
-            {
+                .ceil() as usize)
+                .max(1);
+            if source_info.split_info().unwrap().is_empty() {
+                // No file in source, schedule an empty task.
+                const EMPTY_TASK_ID: u64 = 0;
                 let task_id = PbTaskId {
                     query_id: self.stage.query_id.id.clone(),
                     stage_id: self.stage.id,
-                    task_id: id as u64,
+                    task_id: EMPTY_TASK_ID,
                 };
-                let plan_fragment = self
-                    .create_plan_fragment(id as u64, Some(PartitionInfo::Source(split.to_vec())));
-                let worker =
-                    self.choose_worker(&plan_fragment, id as u32, self.stage.dml_table_id)?;
+                let plan_fragment =
+                    self.create_plan_fragment(EMPTY_TASK_ID, Some(PartitionInfo::Source(vec![])));
+                let worker = self.choose_worker(
+                    &plan_fragment,
+                    EMPTY_TASK_ID as u32,
+                    self.stage.dml_table_id,
+                )?;
                 futures.push(self.schedule_task(
                     task_id,
                     plan_fragment,
                     worker,
                     expr_context.clone(),
                 ));
-            }
+            } else {
+                for (id, split) in source_info
+                    .split_info()
+                    .unwrap()
+                    .chunks(chunk_size)
+                    .enumerate()
+                {
+                    let task_id = PbTaskId {
+                        query_id: self.stage.query_id.id.clone(),
+                        stage_id: self.stage.id,
+                        task_id: id as u64,
+                    };
+                    let plan_fragment = self.create_plan_fragment(
+                        id as u64,
+                        Some(PartitionInfo::Source(split.to_vec())),
+                    );
+                    let worker =
+                        self.choose_worker(&plan_fragment, id as u32, self.stage.dml_table_id)?;
+                    futures.push(self.schedule_task(
+                        task_id,
+                        plan_fragment,
+                        worker,
+                        expr_context.clone(),
+                    ));
+                }
+            }
         } else if let Some(file_scan_info) = self.stage.file_scan_info.as_ref() {
             let chunk_size = (file_scan_info.file_location.len() as f32
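The `.max(1)` clamp on `chunk_size` is the heart of this hunk: with zero files, `ceil(0 / parallelism)` is 0, and Rust's `slice::chunks` panics when given a chunk size of 0. After clamping, `chunks(1)` on an empty split list simply yields no chunks, which is why the branch above also schedules one explicit empty task so the query returns a result instead of scheduling no tasks at all. A minimal standalone sketch of the arithmetic (illustrative only, not code from this commit):

// Sketch of the clamped chunk size used to group splits into tasks.
fn chunk_size(num_splits: usize, parallelism: usize) -> usize {
    ((num_splits as f32 / parallelism as f32).ceil() as usize).max(1)
}

fn main() {
    // Files present: each task receives ceil(num_splits / parallelism) splits.
    assert_eq!(chunk_size(10, 4), 3);
    // No files: the clamp keeps the value at 1 instead of 0.
    assert_eq!(chunk_size(0, 4), 1);
    // chunks(1) on an empty slice yields no chunks; chunks(0) would panic.
    let splits: Vec<u32> = vec![];
    assert_eq!(splits.chunks(chunk_size(0, 4)).count(), 0);
}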
14 changes: 10 additions & 4 deletions src/frontend/src/scheduler/plan_fragmenter.rs
@@ -732,13 +732,19 @@ impl StageGraph {
 
         // For batch reading file source, the number of files involved is typically large.
         // In order to avoid generating a task for each file, the parallelism of tasks is limited here.
-        // todo(wcy-fdu): Currently it will be divided into half of schedule_unit_count groups, and this will be changed to configurable later.
+        // The minimum `task_parallelism` is 1. Additionally, `task_parallelism`
+        // must not exceed the number of files to read. Therefore, we first take the
+        // minimum of the number of files and (self.batch_parallelism / 2). If the number of
+        // files is 0, we set task_parallelism to 1.
+
         let task_parallelism = match &stage.source_info {
             Some(SourceScanInfo::Incomplete(source_fetch_info)) => {
                 match source_fetch_info.connector {
-                    ConnectorProperties::Gcs(_) | ConnectorProperties::OpendalS3(_) => {
-                        (self.batch_parallelism / 2) as u32
-                    }
+                    ConnectorProperties::Gcs(_) | ConnectorProperties::OpendalS3(_) => (min(
+                        complete_source_info.split_info().unwrap().len() as u32,
+                        (self.batch_parallelism / 2) as u32,
+                    ))
+                    .max(1),
                     _ => complete_source_info.split_info().unwrap().len() as u32,
                 }
             }
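In other words, the new arm clamps the task parallelism to at least 1 and at most the smaller of the file count and `batch_parallelism / 2`. A standalone sketch of the same arithmetic (illustrative only, not code from this commit):

use std::cmp::min;

// Sketch of the clamped task parallelism for batch file sources.
fn task_parallelism(num_files: u32, batch_parallelism: u32) -> u32 {
    min(num_files, batch_parallelism / 2).max(1)
}

fn main() {
    assert_eq!(task_parallelism(0, 8), 1); // no files: still schedule one task
    assert_eq!(task_parallelism(3, 8), 3); // fewer files than half the slots
    assert_eq!(task_parallelism(100, 8), 4); // capped at batch_parallelism / 2
}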
