diff --git a/src/connector/src/sink/formatter/append_only.rs b/src/connector/src/sink/formatter/append_only.rs index 4dc65a750d817..a5848127f3c12 100644 --- a/src/connector/src/sink/formatter/append_only.rs +++ b/src/connector/src/sink/formatter/append_only.rs @@ -14,7 +14,7 @@ use risingwave_common::catalog::Schema; -use super::{Op, Result, RowRef, SinkFormatter}; +use super::{FormattedRow, Op, Result, RowRef, SinkFormatter}; use crate::sink::encoder::RowEncoder; pub struct AppendOnlyFormatter<'a, KE, VE> { @@ -44,9 +44,9 @@ impl<'a, KE: RowEncoder, VE: RowEncoder> SinkFormatter for AppendOnlyFormatter<' type K = Option; type V = Option; - fn format_row(&mut self, op: Op, row: RowRef<'_>) -> Result> { + fn format_row(&mut self, op: Op, row: RowRef<'_>) -> Result> { if op != Op::Insert { - return Ok(None); + return Ok(FormattedRow::Skip); } let event_key_object = Some(self.key_encoder.encode( row, @@ -55,6 +55,6 @@ impl<'a, KE: RowEncoder, VE: RowEncoder> SinkFormatter for AppendOnlyFormatter<' )?); let event_object = Some(self.val_encoder.encode_all(row, &self.schema.fields)?); - Ok(Some((event_key_object, event_object))) + Ok(FormattedRow::Pair(event_key_object, event_object)) } } diff --git a/src/connector/src/sink/formatter/debezium_json.rs b/src/connector/src/sink/formatter/debezium_json.rs index 37f2e9d26e19d..aaa5c50eedd08 100644 --- a/src/connector/src/sink/formatter/debezium_json.rs +++ b/src/connector/src/sink/formatter/debezium_json.rs @@ -16,7 +16,7 @@ use risingwave_common::catalog::{Field, Schema}; use serde_json::{json, Map, Value}; use tracing::warn; -use super::{Op, Result, RowRef, SinkFormatter}; +use super::{FormattedRow, Op, Result, RowRef, SinkFormatter}; use crate::sink::encoder::{JsonEncoder, RowEncoder}; use crate::sink::utils::TimestampHandlingMode; @@ -62,7 +62,7 @@ impl<'a> SinkFormatter for DebeziumJsonFormatter<'a> { type K = Option; type V = Option; - fn format_row(&mut self, op: Op, row: RowRef<'_>) -> Result> { + fn format_row(&mut 
self, op: Op, row: RowRef<'_>) -> Result> { let event_key_object: Option = Some(json!({ "schema": json!({ "type": "struct", @@ -99,19 +99,15 @@ impl<'a> SinkFormatter for DebeziumJsonFormatter<'a> { "source": self.source_field, } })); - // yield (event_key_object.clone(), value_obj); if self.opts.gen_tombstone { - // Tomestone event - // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events - // yield (event_key_object, None); + return Ok(FormattedRow::WithTombstone(event_key_object, value_obj)); } - - return Ok(None); + value_obj } Op::UpdateDelete => { self.update_cache = Some(self.encoder.encode_all(row, &self.schema.fields)?); - return Ok(None); + return Ok(FormattedRow::Skip); } Op::UpdateInsert => { if let Some(before) = self.update_cache.take() { @@ -130,12 +126,12 @@ impl<'a> SinkFormatter for DebeziumJsonFormatter<'a> { "not found UpdateDelete in prev row, skipping, row index {:?}", row.index() ); - return Ok(None); + return Ok(FormattedRow::Skip); } } }; - Ok(Some((event_key_object, event_object))) + Ok(FormattedRow::Pair(event_key_object, event_object)) } } diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 34ff8ea75282f..6023076a5e354 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -29,5 +29,13 @@ pub use upsert::UpsertFormatter; pub trait SinkFormatter { type K; type V; - fn format_row(&mut self, op: Op, row: RowRef<'_>) -> Result>; + fn format_row(&mut self, op: Op, row: RowRef<'_>) -> Result>; +} + +pub enum FormattedRow { + Skip, + Pair(K, V), + // Tombstone event + // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events + WithTombstone(K, V), } diff --git a/src/connector/src/sink/formatter/upsert.rs b/src/connector/src/sink/formatter/upsert.rs index c1dc8d27cf9a8..dd951305620cb 100644 --- a/src/connector/src/sink/formatter/upsert.rs +++ 
b/src/connector/src/sink/formatter/upsert.rs @@ -14,7 +14,7 @@ use risingwave_common::catalog::Schema; -use super::{Op, Result, RowRef, SinkFormatter}; +use super::{FormattedRow, Op, Result, RowRef, SinkFormatter}; use crate::sink::encoder::RowEncoder; pub struct UpsertFormatter<'a, KE, VE> { @@ -44,7 +44,7 @@ impl<'a, KE: RowEncoder, VE: RowEncoder> SinkFormatter for UpsertFormatter<'a, K type K = Option; type V = Option; - fn format_row(&mut self, op: Op, row: RowRef<'_>) -> Result> { + fn format_row(&mut self, op: Op, row: RowRef<'_>) -> Result> { let event_key_object = Some(self.key_encoder.encode( row, &self.schema.fields, @@ -59,10 +59,10 @@ impl<'a, KE: RowEncoder, VE: RowEncoder> SinkFormatter for UpsertFormatter<'a, K Op::Delete => None, Op::UpdateDelete => { // upsert semantic does not require update delete event - return Ok(None); + return Ok(FormattedRow::Skip); } }; - Ok(Some((event_key_object, event_object))) + Ok(FormattedRow::Pair(event_key_object, event_object)) } } diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index cb9c104ae0e03..6b9139374740d 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -546,7 +546,7 @@ mod test { use super::*; use crate::sink::encoder::{EmptyEncoder, SerToString}; - use crate::sink::formatter::{schema_to_json, SinkFormatter}; + use crate::sink::formatter::{schema_to_json, FormattedRow, SinkFormatter}; use crate::sink::utils::*; #[test] @@ -807,10 +807,10 @@ mod test { ); let json_chunk: Vec<_> = chunk .rows() - .flat_map(|(op, row)| { - f.format_row(op, row) - .unwrap() - .map(|(_, v)| v.unwrap().ser_to_string().unwrap()) + .flat_map(|(op, row)| match f.format_row(op, row).unwrap() { + FormattedRow::Skip => None, + FormattedRow::Pair(_, v) => Some(v.unwrap().ser_to_string().unwrap()), + FormattedRow::WithTombstone(_, _) => unreachable!(), }) .collect(); let schema_json = schema_to_json(&schema, "test_db", "test_table"); diff --git 
a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 77431b6965edd..ce216ff4bd53c 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -49,7 +49,7 @@ pub use tracing; use self::catalog::SinkType; use self::clickhouse::{ClickHouseConfig, ClickHouseSink}; use self::encoder::{SerToBytes, SerToString}; -use self::formatter::SinkFormatter; +use self::formatter::{FormattedRow, SinkFormatter}; use self::iceberg::{IcebergSink, ICEBERG_SINK, REMOTE_ICEBERG_SINK}; use crate::sink::boxed::BoxSink; use crate::sink::catalog::{SinkCatalog, SinkId}; @@ -212,7 +212,7 @@ pub trait MessageSink { async fn write_one(&self, k: Option, v: Option) -> Result<()>; async fn write_chunk< - K0: SerTo + Send, + K0: SerTo + Send + Clone, V0: SerTo + Send, F: SinkFormatter, V = Option> + Send, >( @@ -221,12 +221,21 @@ pub trait MessageSink { mut formatter: F, ) -> Result<()> { for (op, row) in chunk.rows() { - let Some((event_key_object, event_object)) = formatter.format_row(op, row)? else {continue}; + let (event_key_object, event_object, tombstone_key) = + match formatter.format_row(op, row)? 
{ + FormattedRow::Skip => continue, + FormattedRow::Pair(k, v) => (k, v, None), + FormattedRow::WithTombstone(k, v) => (k.clone(), v, Some(k)), + }; self.write_one( event_key_object.map(|x| x.ser_to()).transpose()?, event_object.map(|x| x.ser_to()).transpose()?, ) .await?; + if let Some(event_key_object) = tombstone_key { + self.write_one(event_key_object.map(|x| x.ser_to()).transpose()?, None) + .await?; + } } Ok(()) } diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs index 7fe45ac2b6a62..a2f0eb2da991a 100644 --- a/src/connector/src/sink/nats.rs +++ b/src/connector/src/sink/nats.rs @@ -26,7 +26,7 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; use super::encoder::{EmptyEncoder, JsonEncoder, SerToBytes}; -use super::formatter::{AppendOnlyFormatter, SinkFormatter}; +use super::formatter::{AppendOnlyFormatter, FormattedRow, SinkFormatter}; use super::utils::TimestampHandlingMode; use super::{DummySinkCommitCoordinator, SinkWriter, SinkWriterParam}; use crate::common::NatsCommon; @@ -135,7 +135,9 @@ impl NatsSinkWriter { &[], ); for (op, row) in chunk.rows() { - let Some((_, item)) = f.format_row(op, row)? else {continue}; + let FormattedRow::Pair(_, item) = f.format_row(op, row)? else { + continue; + }; self.context .publish( self.config.common.subject.clone(),