Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Dec 6, 2023
1 parent 5dd6c01 commit a9535c2
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
21 changes: 15 additions & 6 deletions src/connector/sink_impl/src/sink/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ use redis::aio::Connection;
use redis::{Client as RedisClient, Pipeline};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::{Deserialize, Serialize};
use risingwave_connector::sink::encoder::template::{KEY_FORMAT, VALUE_FORMAT};
use serde_derive::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

use super::catalog::SinkFormatDesc;
use super::encoder::template::TemplateEncoder;
Expand All @@ -36,9 +38,8 @@ use crate::sink::writer::{
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriterParam};

pub const REDIS_SINK: &str = "redis";
pub const KEY_FORMAT: &str = "key_format";
pub const VALUE_FORMAT: &str = "value_format";
#[derive(Deserialize, Serialize, Debug, Clone)]

#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct RedisCommon {
#[serde(rename = "redis.url")]
pub url: String,
Expand All @@ -51,7 +52,7 @@ impl RedisCommon {
}
}
#[serde_as]
#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct RedisConfig {
#[serde(flatten)]
pub common: RedisCommon,
Expand Down Expand Up @@ -188,6 +189,12 @@ impl RedisSinkPayloadWriter {
}

pub async fn commit(&mut self) -> Result<()> {
#[cfg(test)]
{
if self.conn.is_none() {
return Ok(());
}
}
self.pipe.query_async(self.conn.as_mut().unwrap()).await?;
self.pipe.clear();
Ok(())
Expand Down Expand Up @@ -269,7 +276,8 @@ impl AsyncTruncateSinkWriter for RedisSinkWriter {
_add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
) -> Result<()> {
dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
self.payload_writer.write_chunk(chunk, formatter).await
self.payload_writer.write_chunk(chunk, formatter).await?;
self.payload_writer.commit().await
})
}
}
Expand All @@ -283,6 +291,7 @@ mod test {
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::sink::encoder::template::{KEY_FORMAT, VALUE_FORMAT};

use super::*;
use crate::sink::catalog::{SinkEncode, SinkFormat};
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/with_options_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ fn connector_crate_path() -> PathBuf {
}

fn source_mod_path() -> PathBuf {
connector_crate_path().join("src").join("source")
connector_crate_path().join("src/source")
}

fn sink_mod_path() -> PathBuf {
connector_crate_path().join("src").join("sink")
connector_crate_path().join("sink_impl/src/sink")
}

fn common_mod_path() -> PathBuf {
connector_crate_path().join("src").join("common.rs")
connector_crate_path().join("src/common.rs")
}

pub fn generate_with_options_yaml_source() -> String {
Expand Down

0 comments on commit a9535c2

Please sign in to comment.