From 80c9d870c39fdbfbb90d748d3839b25e9662c9d6 Mon Sep 17 00:00:00 2001
From: congyi <15605187270@163.com>
Date: Fri, 12 Jul 2024 13:21:41 +0800
Subject: [PATCH] minor

---
 src/connector/src/parser/parquet_parser.rs | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/src/connector/src/parser/parquet_parser.rs b/src/connector/src/parser/parquet_parser.rs
index 91e18ec93f8e7..5696e54b98ac4 100644
--- a/src/connector/src/parser/parquet_parser.rs
+++ b/src/connector/src/parser/parquet_parser.rs
@@ -53,17 +53,15 @@ impl ParquetParser {
         #[for_await]
         for record_batch in record_batch_stream {
             let record_batch: RecordBatch = record_batch?;
-            let record_batch_num_rows = record_batch.num_rows();
             // Convert each record batch into a stream chunk according to user defined schema.
             let chunk: StreamChunk = self.convert_record_batch_to_stream_chunk(record_batch)?;
 
             yield chunk;
-            self.update_offset(record_batch_num_rows);
         }
     }
 
-    fn update_offset(&mut self, record_batch_num_rows: usize) {
-        self.offset += record_batch_num_rows;
+    fn inc_offset(&mut self) {
+        self.offset += 1;
     }
 
     /// The function `convert_record_batch_to_stream_chunk` is designed to transform the given `RecordBatch` into a `StreamChunk`.
@@ -145,8 +143,9 @@ impl ParquetParser {
             risingwave_pb::plan_common::additional_column::ColumnType::Offset(_) =>{
                 let mut array_builder =
                     ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
-                for i in 0..record_batch.num_rows(){
-                    let datum: Datum = Some(ScalarImpl::Utf8((self.offset+i).to_string().into()));
+                for _ in 0..record_batch.num_rows(){
+                    let datum: Datum = Some(ScalarImpl::Utf8((self.offset).to_string().into()));
+                    self.inc_offset();
                     array_builder.append( datum);
                 }
                 let res = array_builder.finish();
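
A minimal, self-contained sketch of the offset bookkeeping after this patch, assuming a simplified stand-in for the parser (`OffsetTracker`, `build_offset_column`, and the `Vec<String>` column below are illustrative only, not RisingWave types): the counter now advances once per row emitted into the Offset column rather than once per record batch, so offsets continue monotonically across batches.

// Illustrative model of the patched behavior, not the actual ParquetParser:
// `offset` plays the role of `self.offset`, and `build_offset_column` mimics
// the Offset-column branch of `convert_record_batch_to_stream_chunk`.
struct OffsetTracker {
    offset: usize,
}

impl OffsetTracker {
    // Mirrors the patch's `inc_offset`: advance by exactly one row.
    fn inc_offset(&mut self) {
        self.offset += 1;
    }

    // For each row in a batch, record the current offset, then bump it,
    // just as the patched loop appends a datum and calls `inc_offset`.
    fn build_offset_column(&mut self, num_rows: usize) -> Vec<String> {
        let mut col = Vec::with_capacity(num_rows);
        for _ in 0..num_rows {
            col.push(self.offset.to_string());
            self.inc_offset();
        }
        col
    }
}

fn main() {
    let mut tracker = OffsetTracker { offset: 0 };
    // Two batches of 3 and 2 rows: offsets keep counting across batches.
    assert_eq!(tracker.build_offset_column(3), ["0", "1", "2"]);
    assert_eq!(tracker.build_offset_column(2), ["3", "4"]);
    println!("final offset = {}", tracker.offset);
}

Folding the increment into the column-building loop is also what lets the patch drop the separate `self.update_offset(record_batch_num_rows)` call after `yield chunk;`: the counter has already been advanced row by row while the Offset column was built.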