Skip to content

Commit

Permalink
minor
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu committed Jul 12, 2024
1 parent a960216 commit 80c9d87
Showing 1 changed file with 5 additions and 6 deletions.
11 changes: 5 additions & 6 deletions src/connector/src/parser/parquet_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,15 @@ impl ParquetParser {
#[for_await]
for record_batch in record_batch_stream {
let record_batch: RecordBatch = record_batch?;
let record_batch_num_rows = record_batch.num_rows();
// Convert each record batch into a stream chunk according to user defined schema.
let chunk: StreamChunk = self.convert_record_batch_to_stream_chunk(record_batch)?;

yield chunk;
self.update_offset(record_batch_num_rows);
}
}

fn update_offset(&mut self, record_batch_num_rows: usize) {
self.offset += record_batch_num_rows;
fn inc_offset(&mut self) {
self.offset += 1;
}

/// The function `convert_record_batch_to_stream_chunk` is designed to transform the given `RecordBatch` into a `StreamChunk`.
Expand Down Expand Up @@ -145,8 +143,9 @@ impl ParquetParser {
risingwave_pb::plan_common::additional_column::ColumnType::Offset(_) =>{
let mut array_builder =
ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
for i in 0..record_batch.num_rows(){
let datum: Datum = Some(ScalarImpl::Utf8((self.offset+i).to_string().into()));
for _ in 0..record_batch.num_rows(){
let datum: Datum = Some(ScalarImpl::Utf8((self.offset).to_string().into()));
self.inc_offset();
array_builder.append( datum);
}
let res = array_builder.finish();
Expand Down

0 comments on commit 80c9d87

Please sign in to comment.