From cc7e506b3cef2a29d9c30cc16ccbedd6d9380e08 Mon Sep 17 00:00:00 2001
From: xiangjinwu <17769960+xiangjinwu@users.noreply.github.com>
Date: Mon, 18 Sep 2023 17:46:26 +0800
Subject: [PATCH] refactor(sink): impl SinkFormatter for DebeziumJsonFormatter
 (#12372)
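
Move the Debezium JSON generation out of the free-standing
`gen_debezium_message_stream` in `utils.rs` and into a `DebeziumJsonFormatter`
that implements `SinkFormatter`, so the Kafka and Kinesis sinks can reuse the
shared `write_chunk` driver instead of each consuming an async stream. The
driver loop looks roughly like this (an illustrative sketch following the
removed Kafka code, not part of this patch):

    for kv in formatter.format_chunk(&chunk) {
        let (key, value) = kv?;
        sink.write_inner(
            key.map(|j| j.to_string().into_bytes()),
            value.map(|j| j.to_string().into_bytes()),
        )
        .await?;
    }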
---
 .../src/sink/formatter/debezium_json.rs       | 352 ++++++++++++++++++
 src/connector/src/sink/formatter/mod.rs       |   2 +
 src/connector/src/sink/kafka.rs               |  92 +----
 src/connector/src/sink/kinesis.rs             |  32 +-
 src/connector/src/sink/utils.rs               | 226 +----------
 5 files changed, 372 insertions(+), 332 deletions(-)
 create mode 100644 src/connector/src/sink/formatter/debezium_json.rs

diff --git a/src/connector/src/sink/formatter/debezium_json.rs b/src/connector/src/sink/formatter/debezium_json.rs
new file mode 100644
index 0000000000000..f5b48e836c64d
--- /dev/null
+++ b/src/connector/src/sink/formatter/debezium_json.rs
@@ -0,0 +1,352 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use risingwave_common::array::Op;
+use risingwave_common::catalog::{Field, Schema};
+use serde_json::{json, Map, Value};
+use tracing::warn;
+
+use super::{Result, SinkFormatter, StreamChunk};
+use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
+use crate::tri;
+
+const DEBEZIUM_NAME_FIELD_PREFIX: &str = "RisingWave";
+
+pub struct DebeziumAdapterOpts {
+    gen_tombstone: bool,
+}
+
+impl Default for DebeziumAdapterOpts {
+    fn default() -> Self {
+        Self {
+            gen_tombstone: true,
+        }
+    }
+}
+
+fn concat_debezium_name_field(db_name: &str, sink_from_name: &str, value: &str) -> String {
+    DEBEZIUM_NAME_FIELD_PREFIX.to_owned() + "." + db_name + "." + sink_from_name + "." + value
+}
+
+pub struct DebeziumJsonFormatter<'a> {
+    schema: &'a Schema,
+    pk_indices: &'a [usize],
+    db_name: &'a str,
+    sink_from_name: &'a str,
+    opts: DebeziumAdapterOpts,
+    ts_ms: u64,
+}
+
+impl<'a> DebeziumJsonFormatter<'a> {
+    pub fn new(
+        schema: &'a Schema,
+        pk_indices: &'a [usize],
+        db_name: &'a str,
+        sink_from_name: &'a str,
+        opts: DebeziumAdapterOpts,
+        ts_ms: u64,
+    ) -> Self {
+        Self {
+            schema,
+            pk_indices,
+            db_name,
+            sink_from_name,
+            opts,
+            ts_ms,
+        }
+    }
+}
+
+impl<'a> SinkFormatter for DebeziumJsonFormatter<'a> {
+    type K = Value;
+    type V = Value;
+
+    fn format_chunk(
+        &self,
+        chunk: &StreamChunk,
+    ) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>> {
+        std::iter::from_generator(|| {
+            let DebeziumJsonFormatter {
+                schema,
+                pk_indices,
+                db_name,
+                sink_from_name,
+                opts,
+                ts_ms,
+            } = self;
+            let source_field = json!({
+                "db": db_name,
+                "table": sink_from_name,
+            });
+
+            // Buffers the `before` image between an `UpdateDelete` and the
+            // `UpdateInsert` that immediately follows it.
+            let mut update_cache: Option<Map<String, Value>> = None;
+
+            let key_encoder =
+                JsonEncoder::new(schema, Some(pk_indices), TimestampHandlingMode::Milli);
+            let val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);
+
+            for (op, row) in chunk.rows() {
+                let event_key_object: Option<Value> = Some(json!({
+                    "schema": json!({
+                        "type": "struct",
+                        "fields": fields_pk_to_json(&schema.fields, pk_indices),
+                        "optional": false,
+                        "name": concat_debezium_name_field(db_name, sink_from_name, "Key"),
+                    }),
+                    "payload": tri!(key_encoder.encode(row)),
+                }));
+                let event_object: Option<Value> = match op {
+                    Op::Insert => Some(json!({
+                        "schema": schema_to_json(schema, db_name, sink_from_name),
+                        "payload": {
+                            "before": null,
+                            "after": tri!(val_encoder.encode(row)),
+                            "op": "c",
+                            "ts_ms": ts_ms,
+                            "source": source_field,
+                        }
+                    })),
+                    Op::Delete => {
+                        let value_obj = Some(json!({
+                            "schema": schema_to_json(schema, db_name, sink_from_name),
+                            "payload": {
+                                "before": tri!(val_encoder.encode(row)),
+                                "after": null,
+                                "op": "d",
+                                "ts_ms": ts_ms,
+                                "source": source_field,
+                            }
+                        }));
+                        yield Ok((event_key_object.clone(), value_obj));
+
+                        if opts.gen_tombstone {
+                            // Tombstone event
+                            // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events
+                            yield Ok((event_key_object, None));
+                        }
+
+                        continue;
+                    }
+                    Op::UpdateDelete => {
+                        update_cache = Some(tri!(val_encoder.encode(row)));
+                        continue;
+                    }
+                    Op::UpdateInsert => {
+                        if let Some(before) = update_cache.take() {
+                            Some(json!({
+                                "schema": schema_to_json(schema, db_name, sink_from_name),
+                                "payload": {
+                                    "before": before,
+                                    "after": tri!(val_encoder.encode(row)),
+                                    "op": "u",
+                                    "ts_ms": ts_ms,
+                                    "source": source_field,
+                                }
+                            }))
+                        } else {
+                            warn!(
+                                "UpdateInsert without a preceding UpdateDelete, skipping, row index {:?}",
+                                row.index()
+                            );
+                            continue;
+                        }
+                    }
+                };
+                yield Ok((event_key_object, event_object));
+            }
+        })
+    }
+}
+
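+// For reference, each value emitted by `format_chunk` above has roughly the
+// following shape, with `schema_to_json` below producing the "schema" half
+// (illustrative values, pretty-printed and trimmed):
+//
+// {
+//   "schema": { "type": "struct", "name": "RisingWave.db.tbl.Envelope", ... },
+//   "payload": {
+//     "before": null,
+//     "after": { "v1": 1 },
+//     "op": "c",
+//     "ts_ms": 1695000000000,
+//     "source": { "db": "db", "table": "tbl" }
+//   }
+// }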
"string", + "optional": false, + "field": "db" + }), + json!({ + "type": "string", + "optional": true, + "field": "table" + })], + "field": "source" + })); + schema_fields.push(json!({ + "type": "string", + "optional": false, + "field": "op" + })); + schema_fields.push(json!({ + "type": "int64", + "optional": false, + "field": "ts_ms" + })); + + json!({ + "type": "struct", + "fields": schema_fields, + "optional": false, + "name": concat_debezium_name_field(db_name, sink_from_name, "Envelope"), + }) +} + +pub(crate) fn fields_pk_to_json(fields: &[Field], pk_indices: &[usize]) -> Value { + let mut res = Vec::new(); + for idx in pk_indices { + res.push(field_to_json(&fields[*idx])); + } + json!(res) +} + +pub(crate) fn fields_to_json(fields: &[Field]) -> Value { + let mut res = Vec::new(); + + fields + .iter() + .for_each(|field| res.push(field_to_json(field))); + + json!(res) +} + +pub(crate) fn field_to_json(field: &Field) -> Value { + // mapping from 'https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-data-types' + let r#type = match field.data_type() { + risingwave_common::types::DataType::Boolean => "boolean", + risingwave_common::types::DataType::Int16 => "int16", + risingwave_common::types::DataType::Int32 => "int32", + risingwave_common::types::DataType::Int64 => "int64", + risingwave_common::types::DataType::Int256 => "string", + risingwave_common::types::DataType::Float32 => "float", + risingwave_common::types::DataType::Float64 => "double", + // currently, we only support handling decimal as string. + // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-decimal-types + risingwave_common::types::DataType::Decimal => "string", + + risingwave_common::types::DataType::Varchar => "string", + + risingwave_common::types::DataType::Date => "int32", + risingwave_common::types::DataType::Time => "int64", + risingwave_common::types::DataType::Timestamp => "int64", + risingwave_common::types::DataType::Timestamptz => "string", + risingwave_common::types::DataType::Interval => "string", + + risingwave_common::types::DataType::Bytea => "bytes", + risingwave_common::types::DataType::Jsonb => "string", + risingwave_common::types::DataType::Serial => "int32", + // since the original debezium pg support HSTORE via encoded as json string by default, + // we do the same here + risingwave_common::types::DataType::Struct(_) => "string", + risingwave_common::types::DataType::List { .. 
} => "string", + }; + json!({ + "field": field.name, + "optional": true, + "type": r#type, + }) +} + +#[cfg(test)] +mod tests { + use risingwave_common::test_prelude::StreamChunkTestExt; + use risingwave_common::types::DataType; + + use super::*; + use crate::sink::utils::chunk_to_json; + + #[test] + fn test_chunk_to_json() -> Result<()> { + let chunk = StreamChunk::from_pretty( + " i f {i,f} + + 0 0.0 {0,0.0} + + 1 1.0 {1,1.0} + + 2 2.0 {2,2.0} + + 3 3.0 {3,3.0} + + 4 4.0 {4,4.0} + + 5 5.0 {5,5.0} + + 6 6.0 {6,6.0} + + 7 7.0 {7,7.0} + + 8 8.0 {8,8.0} + + 9 9.0 {9,9.0}", + ); + + let schema = Schema::new(vec![ + Field { + data_type: DataType::Int32, + name: "v1".into(), + sub_fields: vec![], + type_name: "".into(), + }, + Field { + data_type: DataType::Float32, + name: "v2".into(), + sub_fields: vec![], + type_name: "".into(), + }, + Field { + data_type: DataType::new_struct( + vec![DataType::Int32, DataType::Float32], + vec!["v4".to_string(), "v5".to_string()], + ), + name: "v3".into(), + sub_fields: vec![ + Field { + data_type: DataType::Int32, + name: "v4".into(), + sub_fields: vec![], + type_name: "".into(), + }, + Field { + data_type: DataType::Float32, + name: "v5".into(), + sub_fields: vec![], + type_name: "".into(), + }, + ], + type_name: "".into(), + }, + ]); + + let json_chunk = chunk_to_json(chunk, &schema).unwrap(); + let schema_json = schema_to_json(&schema, "test_db", "test_table"); + assert_eq!(schema_json, serde_json::from_str::("{\"fields\":[{\"field\":\"before\",\"fields\":[{\"field\":\"v1\",\"optional\":true,\"type\":\"int32\"},{\"field\":\"v2\",\"optional\":true,\"type\":\"float\"},{\"field\":\"v3\",\"optional\":true,\"type\":\"string\"}],\"name\":\"RisingWave.test_db.test_table.Key\",\"optional\":true,\"type\":\"struct\"},{\"field\":\"after\",\"fields\":[{\"field\":\"v1\",\"optional\":true,\"type\":\"int32\"},{\"field\":\"v2\",\"optional\":true,\"type\":\"float\"},{\"field\":\"v3\",\"optional\":true,\"type\":\"string\"}],\"name\":\"RisingWave.test_db.test_table.Key\",\"optional\":true,\"type\":\"struct\"},{\"field\":\"source\",\"fields\":[{\"field\":\"db\",\"optional\":false,\"type\":\"string\"},{\"field\":\"table\",\"optional\":true,\"type\":\"string\"}],\"name\":\"RisingWave.test_db.test_table.Source\",\"optional\":false,\"type\":\"struct\"},{\"field\":\"op\",\"optional\":false,\"type\":\"string\"},{\"field\":\"ts_ms\",\"optional\":false,\"type\":\"int64\"}],\"name\":\"RisingWave.test_db.test_table.Envelope\",\"optional\":false,\"type\":\"struct\"}").unwrap()); + assert_eq!( + serde_json::from_str::(&json_chunk[0]).unwrap(), + serde_json::from_str::("{\"v1\":0,\"v2\":0.0,\"v3\":{\"v4\":0,\"v5\":0.0}}") + .unwrap() + ); + + Ok(()) + } +} diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 432e4d33e0f2b..585752b03327c 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -17,9 +17,11 @@ use risingwave_common::array::StreamChunk; use crate::sink::Result; mod append_only; +mod debezium_json; mod upsert; pub use append_only::AppendOnlyFormatter; +pub use debezium_json::{DebeziumAdapterOpts, DebeziumJsonFormatter}; pub use upsert::UpsertFormatter; /// Transforms a `StreamChunk` into a sequence of key-value pairs according a specific format, diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 378c4ee8d930a..ec5192666916c 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -20,7 +20,6 @@ use std::time::{Duration, 
+#[cfg(test)]
+mod tests {
+    use risingwave_common::test_prelude::StreamChunkTestExt;
+    use risingwave_common::types::DataType;
+
+    use super::*;
+    use crate::sink::utils::chunk_to_json;
+
+    #[test]
+    fn test_chunk_to_json() -> Result<()> {
+        let chunk = StreamChunk::from_pretty(
+            " i f {i,f}
+            + 0 0.0 {0,0.0}
+            + 1 1.0 {1,1.0}
+            + 2 2.0 {2,2.0}
+            + 3 3.0 {3,3.0}
+            + 4 4.0 {4,4.0}
+            + 5 5.0 {5,5.0}
+            + 6 6.0 {6,6.0}
+            + 7 7.0 {7,7.0}
+            + 8 8.0 {8,8.0}
+            + 9 9.0 {9,9.0}",
+        );
+
+        let schema = Schema::new(vec![
+            Field {
+                data_type: DataType::Int32,
+                name: "v1".into(),
+                sub_fields: vec![],
+                type_name: "".into(),
+            },
+            Field {
+                data_type: DataType::Float32,
+                name: "v2".into(),
+                sub_fields: vec![],
+                type_name: "".into(),
+            },
+            Field {
+                data_type: DataType::new_struct(
+                    vec![DataType::Int32, DataType::Float32],
+                    vec!["v4".to_string(), "v5".to_string()],
+                ),
+                name: "v3".into(),
+                sub_fields: vec![
+                    Field {
+                        data_type: DataType::Int32,
+                        name: "v4".into(),
+                        sub_fields: vec![],
+                        type_name: "".into(),
+                    },
+                    Field {
+                        data_type: DataType::Float32,
+                        name: "v5".into(),
+                        sub_fields: vec![],
+                        type_name: "".into(),
+                    },
+                ],
+                type_name: "".into(),
+            },
+        ]);
+
+        let json_chunk = chunk_to_json(chunk, &schema).unwrap();
+        let schema_json = schema_to_json(&schema, "test_db", "test_table");
+        assert_eq!(schema_json, serde_json::from_str::<Value>("{\"fields\":[{\"field\":\"before\",\"fields\":[{\"field\":\"v1\",\"optional\":true,\"type\":\"int32\"},{\"field\":\"v2\",\"optional\":true,\"type\":\"float\"},{\"field\":\"v3\",\"optional\":true,\"type\":\"string\"}],\"name\":\"RisingWave.test_db.test_table.Key\",\"optional\":true,\"type\":\"struct\"},{\"field\":\"after\",\"fields\":[{\"field\":\"v1\",\"optional\":true,\"type\":\"int32\"},{\"field\":\"v2\",\"optional\":true,\"type\":\"float\"},{\"field\":\"v3\",\"optional\":true,\"type\":\"string\"}],\"name\":\"RisingWave.test_db.test_table.Key\",\"optional\":true,\"type\":\"struct\"},{\"field\":\"source\",\"fields\":[{\"field\":\"db\",\"optional\":false,\"type\":\"string\"},{\"field\":\"table\",\"optional\":true,\"type\":\"string\"}],\"name\":\"RisingWave.test_db.test_table.Source\",\"optional\":false,\"type\":\"struct\"},{\"field\":\"op\",\"optional\":false,\"type\":\"string\"},{\"field\":\"ts_ms\",\"optional\":false,\"type\":\"int64\"}],\"name\":\"RisingWave.test_db.test_table.Envelope\",\"optional\":false,\"type\":\"struct\"}").unwrap());
+        assert_eq!(
+            serde_json::from_str::<Value>(&json_chunk[0]).unwrap(),
+            serde_json::from_str::<Value>("{\"v1\":0,\"v2\":0.0,\"v3\":{\"v4\":0,\"v5\":0.0}}")
+                .unwrap()
+        );
+
+        Ok(())
+    }
+}
diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs
index 432e4d33e0f2b..585752b03327c 100644
--- a/src/connector/src/sink/formatter/mod.rs
+++ b/src/connector/src/sink/formatter/mod.rs
@@ -17,9 +17,11 @@ use risingwave_common::array::StreamChunk;
 use crate::sink::Result;
 
 mod append_only;
+mod debezium_json;
 mod upsert;
 
 pub use append_only::AppendOnlyFormatter;
+pub use debezium_json::{DebeziumAdapterOpts, DebeziumJsonFormatter};
 pub use upsert::UpsertFormatter;
 
 /// Transforms a `StreamChunk` into a sequence of key-value pairs according to a specific format,
diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs
index 378c4ee8d930a..ec5192666916c 100644
--- a/src/connector/src/sink/kafka.rs
+++ b/src/connector/src/sink/kafka.rs
@@ -20,7 +20,6 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use anyhow::anyhow;
 use futures::future::try_join_all;
 use futures::{Future, FutureExt};
-use futures_async_stream::for_await;
 use rdkafka::error::{KafkaError, KafkaResult};
 use rdkafka::message::ToBytes;
 use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord};
@@ -33,13 +32,14 @@ use serde_derive::{Deserialize, Serialize};
 use serde_with::{serde_as, DisplayFromStr};
 
 use super::encoder::{JsonEncoder, TimestampHandlingMode};
-use super::formatter::{AppendOnlyFormatter, UpsertFormatter};
+use super::formatter::{
+    AppendOnlyFormatter, DebeziumAdapterOpts, DebeziumJsonFormatter, UpsertFormatter,
+};
 use super::{
     FormattedSink, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM,
     SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
 };
 use crate::common::KafkaCommon;
-use crate::sink::utils::{gen_debezium_message_stream, DebeziumAdapterOpts};
 use crate::sink::{
     DummySinkCommitCoordinator, Result, SinkWriterParam, SinkWriterV1, SinkWriterV1Adapter,
 };
@@ -528,26 +528,16 @@ impl KafkaSinkWriter {
         let sink_from_name = self.sink_from_name.clone();
 
-        // Initialize the dbz_stream
-        let dbz_stream = gen_debezium_message_stream(
+        // Initialize the Debezium formatter
+        let f = DebeziumJsonFormatter::new(
             &schema,
             &pk_indices,
-            chunk,
-            ts_ms,
-            DebeziumAdapterOpts::default(),
             &db_name,
             &sink_from_name,
+            DebeziumAdapterOpts::default(),
+            ts_ms,
         );
 
-        #[for_await]
-        for msg in dbz_stream {
-            let (event_key_object, event_object) = msg?;
-            self.write_inner(
-                event_key_object.map(|j| j.to_string().into_bytes()),
-                event_object.map(|j| j.to_string().into_bytes()),
-            )
-            .await?;
-        }
-        Ok(())
+        self.write_chunk(chunk, f).await
     }
 
     async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
@@ -637,12 +627,9 @@ impl SinkWriterV1 for KafkaSinkWriter {
 mod test {
     use maplit::hashmap;
     use risingwave_common::catalog::Field;
-    use risingwave_common::test_prelude::StreamChunkTestExt;
     use risingwave_common::types::DataType;
-    use serde_json::Value;
 
     use super::*;
-    use crate::sink::utils::*;
 
     #[test]
     fn parse_rdkafka_props() {
@@ -840,69 +827,4 @@ mod test {
 
         Ok(())
     }
-
-    #[test]
-    fn test_chunk_to_json() -> Result<()> {
-        let chunk = StreamChunk::from_pretty(
-            " i f {i,f}
-            + 0 0.0 {0,0.0}
-            + 1 1.0 {1,1.0}
-            + 2 2.0 {2,2.0}
-            + 3 3.0 {3,3.0}
-            + 4 4.0 {4,4.0}
-            + 5 5.0 {5,5.0}
-            + 6 6.0 {6,6.0}
-            + 7 7.0 {7,7.0}
-            + 8 8.0 {8,8.0}
-            + 9 9.0 {9,9.0}",
-        );
-
-        let schema = Schema::new(vec![
-            Field {
-                data_type: DataType::Int32,
-                name: "v1".into(),
-                sub_fields: vec![],
-                type_name: "".into(),
-            },
-            Field {
-                data_type: DataType::Float32,
-                name: "v2".into(),
-                sub_fields: vec![],
-                type_name: "".into(),
-            },
-            Field {
-                data_type: DataType::new_struct(
-                    vec![DataType::Int32, DataType::Float32],
-                    vec!["v4".to_string(), "v5".to_string()],
-                ),
-                name: "v3".into(),
-                sub_fields: vec![
-                    Field {
-                        data_type: DataType::Int32,
-                        name: "v4".into(),
-                        sub_fields: vec![],
-                        type_name: "".into(),
-                    },
-                    Field {
-                        data_type: DataType::Float32,
-                        name: "v5".into(),
-                        sub_fields: vec![],
-                        type_name: "".into(),
-                    },
-                ],
-                type_name: "".into(),
-            },
-        ]);
-
-        let json_chunk = chunk_to_json(chunk, &schema).unwrap();
-        let schema_json = schema_to_json(&schema, "test_db", "test_table");
-        assert_eq!(schema_json, serde_json::from_str::<Value>("{\"fields\":[{\"field\":\"before\",\"fields\":[{\"field\":\"v1\",\"optional\":true,\"type\":\"int32\"},{\"field\":\"v2\",\"optional\":true,\"type\":\"float\"},{\"field\":\"v3\",\"optional\":true,\"type\":\"string\"}],\"name\":\"RisingWave.test_db.test_table.Key\",\"optional\":true,\"type\":\"struct\"},{\"field\":\"after\",\"fields\":[{\"field\":\"v1\",\"optional\":true,\"type\":\"int32\"},{\"field\":\"v2\",\"optional\":true,\"type\":\"float\"},{\"field\":\"v3\",\"optional\":true,\"type\":\"string\"}],\"name\":\"RisingWave.test_db.test_table.Key\",\"optional\":true,\"type\":\"struct\"},{\"field\":\"source\",\"fields\":[{\"field\":\"db\",\"optional\":false,\"type\":\"string\"},{\"field\":\"table\",\"optional\":true,\"type\":\"string\"}],\"name\":\"RisingWave.test_db.test_table.Source\",\"optional\":false,\"type\":\"struct\"},{\"field\":\"op\",\"optional\":false,\"type\":\"string\"},{\"field\":\"ts_ms\",\"optional\":false,\"type\":\"int64\"}],\"name\":\"RisingWave.test_db.test_table.Envelope\",\"optional\":false,\"type\":\"struct\"}").unwrap());
-        assert_eq!(
-            serde_json::from_str::<Value>(&json_chunk[0]).unwrap(),
-            serde_json::from_str::<Value>("{\"v1\":0,\"v2\":0.0,\"v3\":{\"v4\":0,\"v5\":0.0}}")
-                .unwrap()
-        );
-
-        Ok(())
-    }
 }
diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs
index e319984de8a34..2789b63aae23c 100644
--- a/src/connector/src/sink/kinesis.rs
+++ b/src/connector/src/sink/kinesis.rs
@@ -20,7 +20,6 @@ use aws_sdk_kinesis::error::DisplayErrorContext;
 use aws_sdk_kinesis::operation::put_record::PutRecordOutput;
 use aws_sdk_kinesis::primitives::Blob;
 use aws_sdk_kinesis::Client as KinesisClient;
-use futures_async_stream::for_await;
 use risingwave_common::array::StreamChunk;
 use risingwave_common::catalog::Schema;
 use risingwave_rpc_client::ConnectorClient;
@@ -29,11 +28,12 @@ use serde_with::serde_as;
 use tokio_retry::strategy::{jitter, ExponentialBackoff};
 use tokio_retry::Retry;
 
-use super::formatter::{AppendOnlyFormatter, UpsertFormatter};
+use super::formatter::{
+    AppendOnlyFormatter, DebeziumAdapterOpts, DebeziumJsonFormatter, UpsertFormatter,
+};
 use super::{FormattedSink, SinkParam};
 use crate::common::KinesisCommon;
 use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode};
-use crate::sink::utils::{gen_debezium_message_stream, DebeziumAdapterOpts};
 use crate::sink::{
     DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriter, SinkWriterParam,
     SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
@@ -225,33 +225,17 @@ impl KinesisSinkWriter {
         self.write_chunk(chunk, f).await
     }
 
-    async fn debezium_update(&self, chunk: StreamChunk, ts_ms: u64) -> Result<()> {
-        let dbz_stream = gen_debezium_message_stream(
+    async fn debezium_update(mut self: &Self, chunk: StreamChunk, ts_ms: u64) -> Result<()> {
+        let f = DebeziumJsonFormatter::new(
             &self.schema,
             &self.pk_indices,
-            chunk,
-            ts_ms,
-            DebeziumAdapterOpts::default(),
             &self.db_name,
             &self.sink_from_name,
+            DebeziumAdapterOpts::default(),
+            ts_ms,
         );
 
-        #[for_await]
-        for msg in dbz_stream {
-            let (event_key_object, event_object) = msg?;
-            let key_str = event_key_object.unwrap().to_string();
-            self.put_record(
-                &key_str,
-                if let Some(value) = event_object {
-                    value.to_string().into_bytes()
-                } else {
-                    vec![]
-                },
-            )
-            .await?;
-        }
-
-        Ok(())
+        self.write_chunk(chunk, f).await
     }
 }
diff --git a/src/connector/src/sink/utils.rs b/src/connector/src/sink/utils.rs
index e1bae62c2b5c2..b68164beea206 100644
--- a/src/connector/src/sink/utils.rs
+++ b/src/connector/src/sink/utils.rs
@@ -12,232 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use futures_async_stream::try_stream;
-use risingwave_common::array::stream_chunk::Op;
 use risingwave_common::array::StreamChunk;
-use risingwave_common::catalog::{Field, Schema};
-use serde_json::{json, Map, Value};
-use tracing::warn;
+use risingwave_common::catalog::Schema;
+use serde_json::Value;
 
 use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
-use crate::sink::{Result, SinkError};
-
-const DEBEZIUM_NAME_FIELD_PREFIX: &str = "RisingWave";
-
-pub struct DebeziumAdapterOpts {
-    gen_tombstone: bool,
-}
-
-impl Default for DebeziumAdapterOpts {
-    fn default() -> Self {
-        Self {
-            gen_tombstone: true,
-        }
-    }
-}
-
-fn concat_debezium_name_field(db_name: &str, sink_from_name: &str, value: &str) -> String {
-    DEBEZIUM_NAME_FIELD_PREFIX.to_owned() + "." + db_name + "." + sink_from_name + "." + value
-}
-
-#[try_stream(ok = (Option<Value>, Option<Value>), error = SinkError)]
-pub async fn gen_debezium_message_stream<'a>(
-    schema: &'a Schema,
-    pk_indices: &'a [usize],
-    chunk: StreamChunk,
-    ts_ms: u64,
-    opts: DebeziumAdapterOpts,
-    db_name: &'a str,
-    sink_from_name: &'a str,
-) {
-    let source_field = json!({
-        "db": db_name,
-        "table": sink_from_name,
-    });
-
-    let mut update_cache: Option<Map<String, Value>> = None;
-
-    let key_encoder = JsonEncoder::new(schema, Some(pk_indices), TimestampHandlingMode::Milli);
-    let val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);
-
-    for (op, row) in chunk.rows() {
-        let event_key_object: Option<Value> = Some(json!({
-            "schema": json!({
-                "type": "struct",
-                "fields": fields_pk_to_json(&schema.fields, pk_indices),
-                "optional": false,
-                "name": concat_debezium_name_field(db_name, sink_from_name, "Key"),
-            }),
-            "payload": key_encoder.encode(row)?,
-        }));
-        let event_object: Option<Value> = match op {
-            Op::Insert => Some(json!({
-                "schema": schema_to_json(schema, db_name, sink_from_name),
-                "payload": {
-                    "before": null,
-                    "after": val_encoder.encode(row)?,
-                    "op": "c",
-                    "ts_ms": ts_ms,
-                    "source": source_field,
-                }
-            })),
-            Op::Delete => {
-                let value_obj = Some(json!({
-                    "schema": schema_to_json(schema, db_name, sink_from_name),
-                    "payload": {
-                        "before": val_encoder.encode(row)?,
-                        "after": null,
-                        "op": "d",
-                        "ts_ms": ts_ms,
-                        "source": source_field,
-                    }
-                }));
-                yield (event_key_object.clone(), value_obj);
-
-                if opts.gen_tombstone {
-                    // Tomestone event
-                    // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events
-                    yield (event_key_object, None);
-                }
-
-                continue;
-            }
-            Op::UpdateDelete => {
-                update_cache = Some(val_encoder.encode(row)?);
-                continue;
-            }
-            Op::UpdateInsert => {
-                if let Some(before) = update_cache.take() {
-                    Some(json!({
-                        "schema": schema_to_json(schema, db_name, sink_from_name),
-                        "payload": {
-                            "before": before,
-                            "after": val_encoder.encode(row)?,
-                            "op": "u",
-                            "ts_ms": ts_ms,
-                            "source": source_field,
-                        }
-                    }))
-                } else {
-                    warn!(
-                        "not found UpdateDelete in prev row, skipping, row index {:?}",
-                        row.index()
-                    );
-                    continue;
-                }
-            }
-        };
-        yield (event_key_object, event_object);
-    }
-}
-
-pub(crate) fn schema_to_json(schema: &Schema, db_name: &str, sink_from_name: &str) -> Value {
-    let mut schema_fields = Vec::new();
-    schema_fields.push(json!({
-        "type": "struct",
-        "fields": fields_to_json(&schema.fields),
-        "optional": true,
-        "field": "before",
-        "name": concat_debezium_name_field(db_name, sink_from_name, "Key"),
-    }));
-    schema_fields.push(json!({
-        "type": "struct",
-        "fields": fields_to_json(&schema.fields),
-        "optional": true,
-        "field": "after",
-        "name": concat_debezium_name_field(db_name, sink_from_name, "Key"),
-    }));
-
-    schema_fields.push(json!({
-        "type": "struct",
-        "optional": false,
-        "name": concat_debezium_name_field(db_name, sink_from_name, "Source"),
-        "fields": vec![
-            json!({
-                "type": "string",
-                "optional": false,
-                "field": "db"
-            }),
-            json!({
-                "type": "string",
-                "optional": true,
-                "field": "table"
-            })],
-        "field": "source"
-    }));
-    schema_fields.push(json!({
-        "type": "string",
-        "optional": false,
-        "field": "op"
-    }));
-    schema_fields.push(json!({
-        "type": "int64",
-        "optional": false,
-        "field": "ts_ms"
-    }));
-
-    json!({
-        "type": "struct",
-        "fields": schema_fields,
-        "optional": false,
-        "name": concat_debezium_name_field(db_name, sink_from_name, "Envelope"),
-    })
-}
-
-pub(crate) fn fields_pk_to_json(fields: &[Field], pk_indices: &[usize]) -> Value {
-    let mut res = Vec::new();
-    for idx in pk_indices {
-        res.push(field_to_json(&fields[*idx]));
-    }
-    json!(res)
-}
-
-pub(crate) fn fields_to_json(fields: &[Field]) -> Value {
-    let mut res = Vec::new();
-
-    fields
-        .iter()
-        .for_each(|field| res.push(field_to_json(field)));
-
-    json!(res)
-}
-
-pub(crate) fn field_to_json(field: &Field) -> Value {
-    // mapping from 'https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-data-types'
-    let r#type = match field.data_type() {
-        risingwave_common::types::DataType::Boolean => "boolean",
-        risingwave_common::types::DataType::Int16 => "int16",
-        risingwave_common::types::DataType::Int32 => "int32",
-        risingwave_common::types::DataType::Int64 => "int64",
-        risingwave_common::types::DataType::Int256 => "string",
-        risingwave_common::types::DataType::Float32 => "float",
-        risingwave_common::types::DataType::Float64 => "double",
-        // currently, we only support handling decimal as string.
-        // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-decimal-types
-        risingwave_common::types::DataType::Decimal => "string",
-
-        risingwave_common::types::DataType::Varchar => "string",
-
-        risingwave_common::types::DataType::Date => "int32",
-        risingwave_common::types::DataType::Time => "int64",
-        risingwave_common::types::DataType::Timestamp => "int64",
-        risingwave_common::types::DataType::Timestamptz => "string",
-        risingwave_common::types::DataType::Interval => "string",
-
-        risingwave_common::types::DataType::Bytea => "bytes",
-        risingwave_common::types::DataType::Jsonb => "string",
-        risingwave_common::types::DataType::Serial => "int32",
-        // since the original debezium pg support HSTORE via encoded as json string by default,
-        // we do the same here
-        risingwave_common::types::DataType::Struct(_) => "string",
-        risingwave_common::types::DataType::List { .. } => "string",
-    };
-    json!({
-        "field": field.name,
-        "optional": true,
-        "type": r#type,
-    })
-}
+use crate::sink::Result;
 
 pub fn chunk_to_json(chunk: StreamChunk, schema: &Schema) -> Result<Vec<String>> {
     let encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);