From 0ecc8387d473fa4e8a3c77d3f9e1609469f945d2 Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Thu, 23 Nov 2023 17:48:12 +0800 Subject: [PATCH] fix(sink): Fix redis commit bug (#13538) --- src/connector/src/sink/redis.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index f1b0d7ac66de2..b52f9d19b3cde 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -190,6 +190,12 @@ impl RedisSinkPayloadWriter { } pub async fn commit(&mut self) -> Result<()> { + #[cfg(test)] + { + if self.conn.is_none() { + return Ok(()); + } + } self.pipe.query_async(self.conn.as_mut().unwrap()).await?; self.pipe.clear(); Ok(()) @@ -271,7 +277,8 @@ impl AsyncTruncateSinkWriter for RedisSinkWriter { _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, ) -> Result<()> { dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, { - self.payload_writer.write_chunk(chunk, formatter).await + self.payload_writer.write_chunk(chunk, formatter).await?; + self.payload_writer.commit().await }) } }