diff --git a/src/connector/sink_impl/src/sink/redis.rs b/src/connector/sink_impl/src/sink/redis.rs index 910582b9662b7..710c47e922c92 100644 --- a/src/connector/sink_impl/src/sink/redis.rs +++ b/src/connector/sink_impl/src/sink/redis.rs @@ -20,8 +20,10 @@ use redis::aio::Connection; use redis::{Client as RedisClient, Pipeline}; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; -use serde_derive::{Deserialize, Serialize}; +use risingwave_connector::sink::encoder::template::{KEY_FORMAT, VALUE_FORMAT}; +use serde_derive::Deserialize; use serde_with::serde_as; +use with_options::WithOptions; use super::catalog::SinkFormatDesc; use super::encoder::template::TemplateEncoder; @@ -36,9 +38,8 @@ use crate::sink::writer::{ use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriterParam}; pub const REDIS_SINK: &str = "redis"; -pub const KEY_FORMAT: &str = "key_format"; -pub const VALUE_FORMAT: &str = "value_format"; -#[derive(Deserialize, Serialize, Debug, Clone)] + +#[derive(Deserialize, Debug, Clone, WithOptions)] pub struct RedisCommon { #[serde(rename = "redis.url")] pub url: String, @@ -51,7 +52,7 @@ impl RedisCommon { } } #[serde_as] -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize, WithOptions)] pub struct RedisConfig { #[serde(flatten)] pub common: RedisCommon, @@ -188,6 +189,12 @@ impl RedisSinkPayloadWriter { } pub async fn commit(&mut self) -> Result<()> { + #[cfg(test)] + { + if self.conn.is_none() { + return Ok(()); + } + } self.pipe.query_async(self.conn.as_mut().unwrap()).await?; self.pipe.clear(); Ok(()) @@ -269,7 +276,8 @@ impl AsyncTruncateSinkWriter for RedisSinkWriter { _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, ) -> Result<()> { dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, { - self.payload_writer.write_chunk(chunk, formatter).await + self.payload_writer.write_chunk(chunk, formatter).await?; + self.payload_writer.commit().await }) } } @@ -283,6 +291,7 @@ mod test { use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqDebug; + use risingwave_connector::sink::encoder::template::{KEY_FORMAT, VALUE_FORMAT}; use super::*; use crate::sink::catalog::{SinkEncode, SinkFormat}; diff --git a/src/connector/src/with_options_test/mod.rs b/src/connector/src/with_options_test/mod.rs index 548ee038f1de8..0d47d709f0785 100644 --- a/src/connector/src/with_options_test/mod.rs +++ b/src/connector/src/with_options_test/mod.rs @@ -27,15 +27,15 @@ fn connector_crate_path() -> PathBuf { } fn source_mod_path() -> PathBuf { - connector_crate_path().join("src").join("source") + connector_crate_path().join("src/source") } fn sink_mod_path() -> PathBuf { - connector_crate_path().join("src").join("sink") + connector_crate_path().join("sink_impl/src/sink") } fn common_mod_path() -> PathBuf { - connector_crate_path().join("src").join("common.rs") + connector_crate_path().join("src/common.rs") } pub fn generate_with_options_yaml_source() -> String {