diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 75a895a5f80cd..f103346bf14fa 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -30,6 +30,7 @@ #![feature(async_fn_in_trait)] #![feature(associated_type_defaults)] #![feature(impl_trait_in_assoc_type)] +#![feature(iter_from_generator)] use std::time::Duration; diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index eb57752e85849..73f4ffd7a0227 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -147,9 +147,9 @@ impl DorisSink { Err(SinkError::Doris("Don't support Jsonb".to_string())) } risingwave_common::types::DataType::Serial => Ok(doris_data_type.contains("BIGINT")), - risingwave_common::types::DataType::Int256 => Err(SinkError::Doris( - "doris can not support Int256".to_string(), - )), + risingwave_common::types::DataType::Int256 => { + Err(SinkError::Doris("doris can not support Int256".to_string())) + } } } } diff --git a/src/connector/src/sink/formatter/append_only.rs b/src/connector/src/sink/formatter/append_only.rs new file mode 100644 index 0000000000000..ba7d018cd7fbc --- /dev/null +++ b/src/connector/src/sink/formatter/append_only.rs @@ -0,0 +1,55 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::array::Op; + +use super::{Result, SinkFormatter, StreamChunk}; +use crate::sink::encoder::RowEncoder; +use crate::tri; + +pub struct AppendOnlyFormatter { + key_encoder: KE, + val_encoder: VE, +} + +impl AppendOnlyFormatter { + pub fn new(key_encoder: KE, val_encoder: VE) -> Self { + Self { + key_encoder, + val_encoder, + } + } +} + +impl SinkFormatter for AppendOnlyFormatter { + type K = KE::Output; + type V = VE::Output; + + fn format_chunk( + &self, + chunk: &StreamChunk, + ) -> impl Iterator, Option)>> { + std::iter::from_generator(|| { + for (op, row) in chunk.rows() { + if op != Op::Insert { + continue; + } + let event_key_object = Some(tri!(self.key_encoder.encode(row))); + let event_object = Some(tri!(self.val_encoder.encode(row))); + + yield Ok((event_key_object, event_object)) + } + }) + } +} diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs new file mode 100644 index 0000000000000..432e4d33e0f2b --- /dev/null +++ b/src/connector/src/sink/formatter/mod.rs @@ -0,0 +1,50 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::array::StreamChunk; + +use crate::sink::Result; + +mod append_only; +mod upsert; + +pub use append_only::AppendOnlyFormatter; +pub use upsert::UpsertFormatter; + +/// Transforms a `StreamChunk` into a sequence of key-value pairs according a specific format, +/// for example append-only, upsert or debezium. +pub trait SinkFormatter { + type K; + type V; + + fn format_chunk( + &self, + chunk: &StreamChunk, + ) -> impl Iterator, Option)>>; +} + +/// `tri!` in generators yield `Err` and return `()` +/// `?` in generators return `Err` +#[macro_export] +macro_rules! tri { + ($expr:expr) => { + match $expr { + Ok(val) => val, + Err(err) => { + yield Err(err); + return; + } + } + }; +} diff --git a/src/connector/src/sink/formatter/upsert.rs b/src/connector/src/sink/formatter/upsert.rs new file mode 100644 index 0000000000000..6ef2b5f2ca333 --- /dev/null +++ b/src/connector/src/sink/formatter/upsert.rs @@ -0,0 +1,61 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::array::Op; + +use super::{Result, SinkFormatter, StreamChunk}; +use crate::sink::encoder::RowEncoder; +use crate::tri; + +pub struct UpsertFormatter { + key_encoder: KE, + val_encoder: VE, +} + +impl UpsertFormatter { + pub fn new(key_encoder: KE, val_encoder: VE) -> Self { + Self { + key_encoder, + val_encoder, + } + } +} + +impl SinkFormatter for UpsertFormatter { + type K = KE::Output; + type V = VE::Output; + + fn format_chunk( + &self, + chunk: &StreamChunk, + ) -> impl Iterator, Option)>> { + std::iter::from_generator(|| { + for (op, row) in chunk.rows() { + let event_key_object = Some(tri!(self.key_encoder.encode(row))); + + let event_object = match op { + Op::Insert | Op::UpdateInsert => Some(tri!(self.val_encoder.encode(row))), + // Empty value with a key + Op::Delete => None, + Op::UpdateDelete => { + // upsert semantic does not require update delete event + continue; + } + }; + + yield Ok((event_key_object, event_object)) + } + }) + } +} diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index b317877925a57..9f46fb72a2529 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -30,19 +30,16 @@ use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use risingwave_rpc_client::ConnectorClient; use serde_derive::{Deserialize, Serialize}; -use serde_json::Value; use serde_with::{serde_as, DisplayFromStr}; use super::encoder::{CustomJsonType, JsonEncoder, TimestampHandlingMode}; +use super::formatter::{AppendOnlyFormatter, UpsertFormatter}; use super::{ - Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, - SINK_TYPE_UPSERT, + FormattedSink, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, + SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; use crate::common::KafkaCommon; -use crate::sink::utils::{ - gen_append_only_message_stream, gen_debezium_message_stream, gen_upsert_message_stream, - AppendOnlyAdapterOpts, DebeziumAdapterOpts, UpsertAdapterOpts, -}; +use crate::sink::utils::{gen_debezium_message_stream, DebeziumAdapterOpts}; use crate::sink::{ DummySinkCommitCoordinator, Result, SinkWriterParam, SinkWriterV1, SinkWriterV1Adapter, }; @@ -460,20 +457,20 @@ impl KafkaSinkWriter { ret } - async fn write_json_objects( + async fn write_inner( &mut self, - event_key_object: Option, - event_object: Option, + event_key_object: Option>, + event_object: Option>, ) -> Result<()> { let topic = self.config.common.topic.clone(); // here we assume the key part always exists and value part is optional. // if value is None, we will skip the payload part. - let key_str = event_key_object.unwrap().to_string(); - let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str()).key(key_str.as_bytes()); + let key_str = event_key_object.unwrap(); + let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str()).key(&key_str); let payload; if let Some(value) = event_object { - payload = value.to_string(); - record = record.payload(payload.as_bytes()); + payload = value; + record = record.payload(&payload); } // Send the data but not wait it to finish sinking // Will join all `DeliveryFuture` during commit @@ -544,8 +541,11 @@ impl KafkaSinkWriter { #[for_await] for msg in dbz_stream { let (event_key_object, event_object) = msg?; - self.write_json_objects(event_key_object, event_object) - .await?; + self.write_inner( + event_key_object.map(|j| j.to_string().into_bytes()), + event_object.map(|j| j.to_string().into_bytes()), + ) + .await?; } Ok(()) } @@ -568,20 +568,9 @@ impl KafkaSinkWriter { ); // Initialize the upsert_stream - let upsert_stream = gen_upsert_message_stream( - chunk, - UpsertAdapterOpts::default(), - key_encoder, - val_encoder, - ); + let f = UpsertFormatter::new(key_encoder, val_encoder); - #[for_await] - for msg in upsert_stream { - let (event_key_object, event_object) = msg?; - self.write_json_objects(event_key_object, event_object) - .await?; - } - Ok(()) + self.write_chunk(chunk, f).await } async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { @@ -602,20 +591,18 @@ impl KafkaSinkWriter { ); // Initialize the append_only_stream - let append_only_stream = gen_append_only_message_stream( - chunk, - AppendOnlyAdapterOpts::default(), - key_encoder, - val_encoder, - ); + let f = AppendOnlyFormatter::new(key_encoder, val_encoder); - #[for_await] - for msg in append_only_stream { - let (event_key_object, event_object) = msg?; - self.write_json_objects(event_key_object, event_object) - .await?; - } - Ok(()) + self.write_chunk(chunk, f).await + } +} + +impl FormattedSink for KafkaSinkWriter { + type K = Vec; + type V = Vec; + + async fn write_one(&mut self, k: Option, v: Option) -> Result<()> { + self.write_inner(k, v).await } } @@ -670,6 +657,7 @@ mod test { use risingwave_common::catalog::Field; use risingwave_common::test_prelude::StreamChunkTestExt; use risingwave_common::types::DataType; + use serde_json::Value; use super::*; use crate::sink::utils::*; diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs index 80759e033474a..70d46efc00864 100644 --- a/src/connector/src/sink/kinesis.rs +++ b/src/connector/src/sink/kinesis.rs @@ -29,13 +29,12 @@ use serde_with::serde_as; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; -use super::SinkParam; +use super::encoder::CustomJsonType; +use super::formatter::{AppendOnlyFormatter, UpsertFormatter}; +use super::{FormattedSink, SinkParam}; use crate::common::KinesisCommon; -use crate::sink::encoder::{CustomJsonType, JsonEncoder, TimestampHandlingMode}; -use crate::sink::utils::{ - gen_append_only_message_stream, gen_debezium_message_stream, gen_upsert_message_stream, - AppendOnlyAdapterOpts, DebeziumAdapterOpts, UpsertAdapterOpts, -}; +use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode}; +use crate::sink::utils::{gen_debezium_message_stream, DebeziumAdapterOpts}; use crate::sink::{ DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, @@ -173,7 +172,8 @@ impl KinesisSinkWriter { }) } - async fn put_record(&self, key: &str, payload: Blob) -> Result { + async fn put_record(&self, key: &str, payload: Vec) -> Result { + let payload = Blob::new(payload); // todo: switch to put_records() for batching Retry::spawn( ExponentialBackoff::from_millis(100).map(jitter).take(3), @@ -202,7 +202,7 @@ impl KinesisSinkWriter { }) } - async fn upsert(&self, chunk: StreamChunk) -> Result<()> { + async fn upsert(mut self: &Self, chunk: StreamChunk) -> Result<()> { let key_encoder = JsonEncoder::new( &self.schema, Some(&self.pk_indices), @@ -215,18 +215,11 @@ impl KinesisSinkWriter { TimestampHandlingMode::Milli, CustomJsonType::NoSPecial, ); - let upsert_stream = gen_upsert_message_stream( - chunk, - UpsertAdapterOpts::default(), - key_encoder, - val_encoder, - ); - - crate::impl_load_stream_write_record!(upsert_stream, self.put_record); - Ok(()) + let f = UpsertFormatter::new(key_encoder, val_encoder); + self.write_chunk(chunk, f).await } - async fn append_only(&self, chunk: StreamChunk) -> Result<()> { + async fn append_only(mut self: &Self, chunk: StreamChunk) -> Result<()> { let key_encoder = JsonEncoder::new( &self.schema, Some(&self.pk_indices), @@ -239,15 +232,8 @@ impl KinesisSinkWriter { TimestampHandlingMode::Milli, CustomJsonType::NoSPecial, ); - let append_only_stream = gen_append_only_message_stream( - chunk, - AppendOnlyAdapterOpts::default(), - key_encoder, - val_encoder, - ); - - crate::impl_load_stream_write_record!(append_only_stream, self.put_record); - Ok(()) + let f = AppendOnlyFormatter::new(key_encoder, val_encoder); + self.write_chunk(chunk, f).await } async fn debezium_update(&self, chunk: StreamChunk, ts_ms: u64) -> Result<()> { @@ -261,12 +247,36 @@ impl KinesisSinkWriter { &self.sink_from_name, ); - crate::impl_load_stream_write_record!(dbz_stream, self.put_record); + #[for_await] + for msg in dbz_stream { + let (event_key_object, event_object) = msg?; + let key_str = event_key_object.unwrap().to_string(); + self.put_record( + &key_str, + if let Some(value) = event_object { + value.to_string().into_bytes() + } else { + vec![] + }, + ) + .await?; + } Ok(()) } } +impl FormattedSink for &KinesisSinkWriter { + type K = String; + type V = Vec; + + async fn write_one(&mut self, k: Option, v: Option) -> Result<()> { + self.put_record(&k.unwrap(), v.unwrap_or_default()) + .await + .map(|_| ()) + } +} + #[async_trait::async_trait] impl SinkWriter for KinesisSinkWriter { async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { @@ -297,23 +307,3 @@ impl SinkWriter for KinesisSinkWriter { Ok(()) } } - -#[macro_export] -macro_rules! impl_load_stream_write_record { - ($stream:ident, $op_fn:stmt) => { - #[for_await] - for msg in $stream { - let (event_key_object, event_object) = msg?; - let key_str = event_key_object.unwrap().to_string(); - $op_fn( - &key_str, - Blob::new(if let Some(value) = event_object { - value.to_string().into_bytes() - } else { - vec![] - }), - ) - .await?; - } - }; -} diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 8f57f3fd0d888..c50c751b3a637 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -19,6 +19,7 @@ pub mod coordinate; pub mod doris; pub mod doris_connector; pub mod encoder; +pub mod formatter; pub mod iceberg; pub mod kafka; pub mod kinesis; @@ -50,6 +51,8 @@ pub use tracing; use self::catalog::SinkType; use self::clickhouse::{ClickHouseConfig, ClickHouseSink}; use self::doris::{DorisConfig, DorisSink}; +use self::encoder::SerTo; +use self::formatter::SinkFormatter; use self::iceberg::{IcebergSink, ICEBERG_SINK, REMOTE_ICEBERG_SINK}; use crate::sink::boxed::BoxSink; use crate::sink::catalog::{SinkCatalog, SinkId}; @@ -206,6 +209,44 @@ pub trait SinkWriterV1: Send + 'static { async fn abort(&mut self) -> Result<()>; } +/// A free-form sink that may output in multiple formats and encodings. Examples include kafka, +/// kinesis, nats and redis. +/// +/// The implementor specifies required key & value type (likely string or bytes), as well as how to +/// write a single pair. The provided `write_chunk` method would handle the interaction with a +/// `SinkFormatter`. +/// +/// Currently kafka takes `&mut self` while kinesis takes `&self`. So we use `&mut self` in trait +/// but implement it for `&Kinesis`. This allows us to hold `&mut &Kinesis` and `&Kinesis` +/// simultaneously, preventing the schema clone issue propagating from kafka to kinesis. +pub trait FormattedSink { + type K; + type V; + async fn write_one(&mut self, k: Option, v: Option) -> Result<()>; + + async fn write_chunk( + &mut self, + chunk: StreamChunk, + formatter: F, + ) -> Result<()> + where + F::K: SerTo, + F::V: SerTo, + { + for r in formatter.format_chunk(&chunk) { + let (event_key_object, event_object) = r?; + + self.write_one( + event_key_object.map(SerTo::ser_to).transpose()?, + event_object.map(SerTo::ser_to).transpose()?, + ) + .await?; + } + + Ok(()) + } +} + pub struct SinkWriterV1Adapter { is_empty: bool, epoch: u64, diff --git a/src/connector/src/sink/utils.rs b/src/connector/src/sink/utils.rs index 1b06850e31d4b..0cc2b3f7d84f8 100644 --- a/src/connector/src/sink/utils.rs +++ b/src/connector/src/sink/utils.rs @@ -281,50 +281,3 @@ pub fn doris_rows_to_json( let map = encoder.encode(row)?; Ok(map) } -#[derive(Debug, Clone, Default)] -pub struct UpsertAdapterOpts {} - -#[try_stream(ok = (Option, Option), error = SinkError)] -pub async fn gen_upsert_message_stream<'a>( - chunk: StreamChunk, - _opts: UpsertAdapterOpts, - key_encoder: JsonEncoder<'a>, - val_encoder: JsonEncoder<'a>, -) { - for (op, row) in chunk.rows() { - let event_key_object = Some(Value::Object(key_encoder.encode(row)?)); - - let event_object = match op { - Op::Insert => Some(Value::Object(val_encoder.encode(row)?)), - Op::Delete => Some(Value::Null), - Op::UpdateDelete => { - // upsert semantic does not require update delete event - continue; - } - Op::UpdateInsert => Some(Value::Object(val_encoder.encode(row)?)), - }; - - yield (event_key_object, event_object); - } -} - -#[derive(Debug, Clone, Default)] -pub struct AppendOnlyAdapterOpts {} - -#[try_stream(ok = (Option, Option), error = SinkError)] -pub async fn gen_append_only_message_stream<'a>( - chunk: StreamChunk, - _opts: AppendOnlyAdapterOpts, - key_encoder: JsonEncoder<'a>, - val_encoder: JsonEncoder<'a>, -) { - for (op, row) in chunk.rows() { - if op != Op::Insert { - continue; - } - let event_key_object = Some(Value::Object(key_encoder.encode(row)?)); - let event_object = Some(Value::Object(val_encoder.encode(row)?)); - - yield (event_key_object, event_object); - } -}