Skip to content

Commit

Permalink
store reference of SourceStreamChunkBuilder in SourceStreamChunkRowWr…
Browse files Browse the repository at this point in the history
…iter

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Dec 11, 2024
1 parent c7cdfb5 commit 753e867
Showing 1 changed file with 7 additions and 13 deletions.
20 changes: 7 additions & 13 deletions src/connector/src/parser/chunk_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ impl SourceStreamChunkBuilder {

pub fn row_writer(&mut self) -> SourceStreamChunkRowWriter<'_> {
SourceStreamChunkRowWriter {
descs: &self.descs,
builders: &mut self.builders,
op_builder: &mut self.op_builder,
vis_builder: &mut self.vis_builder,
builder: self,
visible: true, // write visible rows by default
row_meta: None,
}
Expand Down Expand Up @@ -104,10 +101,7 @@ impl SourceStreamChunkBuilder {
/// - errors for non-primary key columns will be ignored and filled with default value instead;
/// - other errors will be propagated.
pub struct SourceStreamChunkRowWriter<'a> {
descs: &'a [SourceColumnDesc],
builders: &'a mut [ArrayBuilderImpl],
op_builder: &'a mut Vec<Op>,
vis_builder: &'a mut BitmapBuilder,
builder: &'a mut SourceStreamChunkBuilder,

/// Whether the rows written by this writer should be visible in output `StreamChunk`.
visible: bool,
Expand Down Expand Up @@ -140,8 +134,8 @@ impl<'a> SourceStreamChunkRowWriter<'a> {

impl SourceStreamChunkRowWriter<'_> {
fn append_op(&mut self, op: Op) {
self.op_builder.push(op);
self.vis_builder.append(self.visible);
self.builder.op_builder.push(op);
self.builder.vis_builder.append(self.visible);
}

fn do_action<'a, A: OpAction>(
Expand Down Expand Up @@ -290,8 +284,8 @@ impl SourceStreamChunkRowWriter<'_> {
// Columns that changes have been applied to. Used to rollback when an error occurs.
let mut applied_columns = 0;

let result = (self.descs.iter())
.zip_eq_fast(self.builders.iter_mut())
let result = (self.builder.descs.iter())
.zip_eq_fast(self.builder.builders.iter_mut())
.try_for_each(|(desc, builder)| {
wrapped_f(desc).map(|output| {
A::apply(builder, output);
Expand All @@ -306,7 +300,7 @@ impl SourceStreamChunkRowWriter<'_> {
}
Err(e) => {
for i in 0..applied_columns {
A::rollback(&mut self.builders[i]);
A::rollback(&mut self.builder.builders[i]);
}
Err(e)
}
Expand Down

0 comments on commit 753e867

Please sign in to comment.