Skip to content

Commit

Permalink
simplify SourceStreamChunkRowWriter
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Dec 11, 2024
1 parent 28490e4 commit c77b7d0
Showing 1 changed file with 25 additions and 35 deletions.
60 changes: 25 additions & 35 deletions src/connector/src/parser/chunk_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::sync::LazyLock;

use risingwave_common::array::stream_record::RecordType;
use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk};
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::log::LogSuppresser;
Expand Down Expand Up @@ -133,12 +134,7 @@ impl<'a> SourceStreamChunkRowWriter<'a> {
}

impl SourceStreamChunkRowWriter<'_> {
fn append_op(&mut self, op: Op) {
self.builder.op_builder.push(op);
self.builder.vis_builder.append(self.visible);
}

fn do_action<'a, A: OpAction>(
fn do_action<'a, A: RowWriterAction>(
&'a mut self,
mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<A::Output<'a>>,
) -> AccessResult<()> {
Expand Down Expand Up @@ -295,7 +291,12 @@ impl SourceStreamChunkRowWriter<'_> {

match result {
Ok(_) => {
A::finish(self);
// commit the action by appending `Op`s and visibility
for op in A::RECORD_TYPE.ops() {
self.builder.op_builder.push(*op);
self.builder.vis_builder.append(self.visible);
}

Ok(())
}
Err(e) => {
Expand All @@ -319,7 +320,7 @@ impl SourceStreamChunkRowWriter<'_> {
where
D: Into<DatumCow<'a>>,
{
self.do_action::<OpActionInsert>(|desc| f(desc).map(Into::into))
self.do_action::<InsertAction>(|desc| f(desc).map(Into::into))
}

/// Write a `Delete` record to the [`StreamChunk`], with the given fallible closure that
Expand All @@ -334,7 +335,7 @@ impl SourceStreamChunkRowWriter<'_> {
where
D: Into<DatumCow<'a>>,
{
self.do_action::<OpActionDelete>(|desc| f(desc).map(Into::into))
self.do_action::<DeleteAction>(|desc| f(desc).map(Into::into))
}

/// Write a `Update` record to the [`StreamChunk`], with the given fallible closure that
Expand All @@ -350,27 +351,28 @@ impl SourceStreamChunkRowWriter<'_> {
D1: Into<DatumCow<'a>>,
D2: Into<DatumCow<'a>>,
{
self.do_action::<OpActionUpdate>(|desc| f(desc).map(|(old, new)| (old.into(), new.into())))
self.do_action::<UpdateAction>(|desc| f(desc).map(|(old, new)| (old.into(), new.into())))
}
}

trait OpAction {
trait RowWriterAction {
type Output<'a>;
const RECORD_TYPE: RecordType;

fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a>;

fn apply(builder: &mut ArrayBuilderImpl, output: Self::Output<'_>);

fn rollback(builder: &mut ArrayBuilderImpl);

fn finish(writer: &mut SourceStreamChunkRowWriter<'_>);
}

struct OpActionInsert;
struct InsertAction;

impl OpAction for OpActionInsert {
impl RowWriterAction for InsertAction {
type Output<'a> = DatumCow<'a>;

const RECORD_TYPE: RecordType = RecordType::Insert;

#[inline(always)]
fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
datum.into()
Expand All @@ -385,18 +387,15 @@ impl OpAction for OpActionInsert {
fn rollback(builder: &mut ArrayBuilderImpl) {
builder.pop().unwrap()
}

#[inline(always)]
fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) {
writer.append_op(Op::Insert);
}
}

struct OpActionDelete;
struct DeleteAction;

impl OpAction for OpActionDelete {
impl RowWriterAction for DeleteAction {
type Output<'a> = DatumCow<'a>;

const RECORD_TYPE: RecordType = RecordType::Delete;

#[inline(always)]
fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
datum.into()
Expand All @@ -411,18 +410,15 @@ impl OpAction for OpActionDelete {
fn rollback(builder: &mut ArrayBuilderImpl) {
builder.pop().unwrap()
}

#[inline(always)]
fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) {
writer.append_op(Op::Delete);
}
}

struct OpActionUpdate;
struct UpdateAction;

impl OpAction for OpActionUpdate {
impl RowWriterAction for UpdateAction {
type Output<'a> = (DatumCow<'a>, DatumCow<'a>);

const RECORD_TYPE: RecordType = RecordType::Update;

#[inline(always)]
fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
let datum = datum.into();
Expand All @@ -440,10 +436,4 @@ impl OpAction for OpActionUpdate {
builder.pop().unwrap();
builder.pop().unwrap();
}

#[inline(always)]
fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) {
writer.append_op(Op::UpdateDelete);
writer.append_op(Op::UpdateInsert);
}
}

0 comments on commit c77b7d0

Please sign in to comment.