From a14b177c7a5431f85fc205b526f7f48afef2b13b Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 25 Nov 2024 00:44:00 +0800 Subject: [PATCH] reuse scalar adapter --- src/connector/src/sink/postgres.rs | 41 ++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index 98696757fa0b8..2dce9bdd8038f 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use anyhow::{anyhow, Context}; use async_trait::async_trait; use itertools::Itertools; +use risingwave_common::array::data_chunk_iter::RowRefIter; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::Schema; @@ -41,6 +42,20 @@ use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWrite pub const POSTGRES_SINK: &str = "postgres"; +macro_rules! rw_row_to_pg_values { + ($row:expr, $statement:expr) => { + $row.iter().enumerate().map(|(i, d)| { + d.and_then(|d| { + let ty = &$statement.params()[i]; + match ScalarAdapter::from_scalar(d, ty) { + Ok(scalar) => Some(scalar), + Err(e) => None, + } + }) + }) + }; +} + #[serde_as] #[derive(Clone, Debug, Deserialize)] pub struct PostgresConfig { @@ -356,7 +371,10 @@ impl PostgresSinkWriter { match op { Op::Insert => { self.client - .execute_raw(&self.insert_statement, row.iter()) + .execute_raw( + &self.insert_statement, + rw_row_to_pg_values!(row, self.insert_statement), + ) .await?; } Op::UpdateInsert | Op::Delete | Op::UpdateDelete => { @@ -375,13 +393,7 @@ impl PostgresSinkWriter { self.client .execute_raw( &self.insert_statement, - row.iter(), - // row - // .iter() - // .enumerate() - // .map(|(i, d)| d.map(|d| { - // ScalarAdapter::from_scalar(d) - // }))) + rw_row_to_pg_values!(row, self.insert_statement), ) .await?; } @@ -392,7 +404,13 @@ impl PostgresSinkWriter { // in that case it needs to be `INSERTED` rather than UPDATED. // On the other hand, if the record is there, it should be `UPDATED`. self.client - .execute_raw(self.merge_statement.as_ref().unwrap(), row.iter()) + .execute_raw( + self.merge_statement.as_ref().unwrap(), + rw_row_to_pg_values!( + row, + self.merge_statement.as_ref().unwrap() + ), + ) .await?; } Op::Delete => { @@ -400,7 +418,10 @@ impl PostgresSinkWriter { self.client .execute_raw( self.delete_statement.as_ref().unwrap(), - row.project(&self.pk_indices).iter(), + rw_row_to_pg_values!( + row.project(&self.pk_indices), + self.delete_statement.as_ref().unwrap() + ), ) .await?; }