diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index 8bf9fc5b7e610..ae3fc7056a6a6 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -159,7 +159,10 @@ impl SourceExecutor { for chunk in stream { match chunk { Ok(chunk) => { - yield covert_stream_chunk_to_batch_chunk(chunk.chunk)?; + let data_chunk = covert_stream_chunk_to_batch_chunk(chunk.chunk)?; + if data_chunk.capacity() > 0 { + yield data_chunk; + } } Err(e) => { return Err(e); diff --git a/src/common/src/util/chunk_coalesce.rs b/src/common/src/util/chunk_coalesce.rs index 9a41fc83e8f0e..3bd56b19e434d 100644 --- a/src/common/src/util/chunk_coalesce.rs +++ b/src/common/src/util/chunk_coalesce.rs @@ -285,7 +285,12 @@ impl SlicedDataChunk { } pub fn with_offset_checked(data_chunk: DataChunk, offset: usize) -> Self { - assert!(offset < data_chunk.capacity()); + assert!( + offset < data_chunk.capacity(), + "offset {}, data_chunk capacity {}", + offset, + data_chunk.capacity() + ); Self { data_chunk, offset } }