Skip to content

Commit

Permalink
support debezium dual-event on tombstone
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Sep 13, 2023
1 parent 3be7424 commit 32f47ed
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 30 deletions.
8 changes: 4 additions & 4 deletions src/connector/src/sink/formatter/append_only.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use risingwave_common::catalog::Schema;

use super::{Op, Result, RowRef, SinkFormatter};
use super::{FormattedRow, Op, Result, RowRef, SinkFormatter};
use crate::sink::encoder::RowEncoder;

pub struct AppendOnlyFormatter<'a, KE, VE> {
Expand Down Expand Up @@ -44,9 +44,9 @@ impl<'a, KE: RowEncoder, VE: RowEncoder> SinkFormatter for AppendOnlyFormatter<'
type K = Option<KE::Output>;
type V = Option<VE::Output>;

fn format_row(&mut self, op: Op, row: RowRef<'_>) -> Result<Option<(Self::K, Self::V)>> {
fn format_row(&mut self, op: Op, row: RowRef<'_>) -> Result<FormattedRow<Self::K, Self::V>> {
if op != Op::Insert {
return Ok(None);
return Ok(FormattedRow::Skip);
}
let event_key_object = Some(self.key_encoder.encode(
row,
Expand All @@ -55,6 +55,6 @@ impl<'a, KE: RowEncoder, VE: RowEncoder> SinkFormatter for AppendOnlyFormatter<'
)?);
let event_object = Some(self.val_encoder.encode_all(row, &self.schema.fields)?);

Ok(Some((event_key_object, event_object)))
Ok(FormattedRow::Pair(event_key_object, event_object))
}
}
18 changes: 7 additions & 11 deletions src/connector/src/sink/formatter/debezium_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use risingwave_common::catalog::{Field, Schema};
use serde_json::{json, Map, Value};
use tracing::warn;

use super::{Op, Result, RowRef, SinkFormatter};
use super::{FormattedRow, Op, Result, RowRef, SinkFormatter};
use crate::sink::encoder::{JsonEncoder, RowEncoder};
use crate::sink::utils::TimestampHandlingMode;

Expand Down Expand Up @@ -62,7 +62,7 @@ impl<'a> SinkFormatter for DebeziumJsonFormatter<'a> {
type K = Option<Value>;
type V = Option<Value>;

fn format_row(&mut self, op: Op, row: RowRef<'_>) -> Result<Option<(Self::K, Self::V)>> {
fn format_row(&mut self, op: Op, row: RowRef<'_>) -> Result<FormattedRow<Self::K, Self::V>> {
let event_key_object: Option<Value> = Some(json!({
"schema": json!({
"type": "struct",
Expand Down Expand Up @@ -99,19 +99,15 @@ impl<'a> SinkFormatter for DebeziumJsonFormatter<'a> {
"source": self.source_field,
}
}));
// yield (event_key_object.clone(), value_obj);

if self.opts.gen_tombstone {
// Tomestone event
// https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events
// yield (event_key_object, None);
return Ok(FormattedRow::WithTombstone(event_key_object, value_obj));
}

return Ok(None);
value_obj
}
Op::UpdateDelete => {
self.update_cache = Some(self.encoder.encode_all(row, &self.schema.fields)?);
return Ok(None);
return Ok(FormattedRow::Skip);
}
Op::UpdateInsert => {
if let Some(before) = self.update_cache.take() {
Expand All @@ -130,12 +126,12 @@ impl<'a> SinkFormatter for DebeziumJsonFormatter<'a> {
"not found UpdateDelete in prev row, skipping, row index {:?}",
row.index()
);
return Ok(None);
return Ok(FormattedRow::Skip);
}
}
};

Ok(Some((event_key_object, event_object)))
Ok(FormattedRow::Pair(event_key_object, event_object))
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,13 @@ pub use upsert::UpsertFormatter;
pub trait SinkFormatter {
type K;
type V;
fn format_row(&mut self, op: Op, row: RowRef<'_>) -> Result<Option<(Self::K, Self::V)>>;
fn format_row(&mut self, op: Op, row: RowRef<'_>) -> Result<FormattedRow<Self::K, Self::V>>;
}

/// Outcome of formatting a single `(op, row)` with a [`SinkFormatter`].
///
/// The variant tells the caller how many messages the row should turn into.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FormattedRow<K, V> {
    /// The row produces no message and should be skipped by the caller.
    Skip,
    /// The row produces exactly one message made of this key and value.
    Pair(K, V),
    /// The row produces the key/value message, followed by a tombstone event:
    /// a second message carrying the same key and no value.
    ///
    /// See the Debezium delete-event documentation:
    /// <https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events>
    WithTombstone(K, V),
}
8 changes: 4 additions & 4 deletions src/connector/src/sink/formatter/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use risingwave_common::catalog::Schema;

use super::{Op, Result, RowRef, SinkFormatter};
use super::{FormattedRow, Op, Result, RowRef, SinkFormatter};
use crate::sink::encoder::RowEncoder;

pub struct UpsertFormatter<'a, KE, VE> {
Expand Down Expand Up @@ -44,7 +44,7 @@ impl<'a, KE: RowEncoder, VE: RowEncoder> SinkFormatter for UpsertFormatter<'a, K
type K = Option<KE::Output>;
type V = Option<VE::Output>;

fn format_row(&mut self, op: Op, row: RowRef<'_>) -> Result<Option<(Self::K, Self::V)>> {
fn format_row(&mut self, op: Op, row: RowRef<'_>) -> Result<FormattedRow<Self::K, Self::V>> {
let event_key_object = Some(self.key_encoder.encode(
row,
&self.schema.fields,
Expand All @@ -59,10 +59,10 @@ impl<'a, KE: RowEncoder, VE: RowEncoder> SinkFormatter for UpsertFormatter<'a, K
Op::Delete => None,
Op::UpdateDelete => {
// upsert semantic does not require update delete event
return Ok(None);
return Ok(FormattedRow::Skip);
}
};

Ok(Some((event_key_object, event_object)))
Ok(FormattedRow::Pair(event_key_object, event_object))
}
}
10 changes: 5 additions & 5 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ mod test {

use super::*;
use crate::sink::encoder::{EmptyEncoder, SerToString};
use crate::sink::formatter::{schema_to_json, SinkFormatter};
use crate::sink::formatter::{schema_to_json, FormattedRow, SinkFormatter};
use crate::sink::utils::*;

#[test]
Expand Down Expand Up @@ -807,10 +807,10 @@ mod test {
);
let json_chunk: Vec<_> = chunk
.rows()
.flat_map(|(op, row)| {
f.format_row(op, row)
.unwrap()
.map(|(_, v)| v.unwrap().ser_to_string().unwrap())
.flat_map(|(op, row)| match f.format_row(op, row).unwrap() {
FormattedRow::Skip => None,
FormattedRow::Pair(_, v) => Some(v.unwrap().ser_to_string().unwrap()),
FormattedRow::WithTombstone(_, _) => unreachable!(),
})
.collect();
let schema_json = schema_to_json(&schema, "test_db", "test_table");
Expand Down
15 changes: 12 additions & 3 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub use tracing;
use self::catalog::SinkType;
use self::clickhouse::{ClickHouseConfig, ClickHouseSink};
use self::encoder::{SerToBytes, SerToString};
use self::formatter::SinkFormatter;
use self::formatter::{FormattedRow, SinkFormatter};
use self::iceberg::{IcebergSink, ICEBERG_SINK, REMOTE_ICEBERG_SINK};
use crate::sink::boxed::BoxSink;
use crate::sink::catalog::{SinkCatalog, SinkId};
Expand Down Expand Up @@ -212,7 +212,7 @@ pub trait MessageSink {
async fn write_one(&self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()>;

async fn write_chunk<
K0: SerTo<Self::K> + Send,
K0: SerTo<Self::K> + Send + Clone,
V0: SerTo<Self::V> + Send,
F: SinkFormatter<K = Option<K0>, V = Option<V0>> + Send,
>(
Expand All @@ -221,12 +221,21 @@ pub trait MessageSink {
mut formatter: F,
) -> Result<()> {
for (op, row) in chunk.rows() {
let Some((event_key_object, event_object)) = formatter.format_row(op, row)? else {continue};
let (event_key_object, event_object, tombstone_key) =
match formatter.format_row(op, row)? {
FormattedRow::Skip => continue,
FormattedRow::Pair(k, v) => (k, v, None),
FormattedRow::WithTombstone(k, v) => (k.clone(), v, Some(k)),
};
self.write_one(
event_key_object.map(|x| x.ser_to()).transpose()?,
event_object.map(|x| x.ser_to()).transpose()?,
)
.await?;
if let Some(event_key_object) = tombstone_key {
self.write_one(event_key_object.map(|x| x.ser_to()).transpose()?, None)
.await?;
}
}
Ok(())
}
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/sink/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

use super::encoder::{EmptyEncoder, JsonEncoder, SerToBytes};
use super::formatter::{AppendOnlyFormatter, SinkFormatter};
use super::formatter::{AppendOnlyFormatter, FormattedRow, SinkFormatter};
use super::utils::TimestampHandlingMode;
use super::{DummySinkCommitCoordinator, SinkWriter, SinkWriterParam};
use crate::common::NatsCommon;
Expand Down Expand Up @@ -135,7 +135,9 @@ impl NatsSinkWriter {
&[],
);
for (op, row) in chunk.rows() {
let Some((_, item)) = f.format_row(op, row)? else {continue};
let FormattedRow::Pair(_, item) = f.format_row(op, row)? else {
continue;
};
self.context
.publish(
self.config.common.subject.clone(),
Expand Down

0 comments on commit 32f47ed

Please sign in to comment.