diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java index 71739b57897..b634462fbf4 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java @@ -39,6 +39,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; public class RedisSinkWriter extends AbstractSinkWriter implements SupportMultiTableSinkWriter { @@ -78,8 +79,7 @@ public void write(SeaTunnelRow element) throws IOException { String value = getValue(element, fields); valueBuffer.add(value); if (keyBuffer.size() >= batchSize) { - doBatchWrite(); - clearBuffer(); + flush(); } } @@ -221,6 +221,16 @@ private void doBatchWrite() { @Override public void close() throws IOException { + flush(); + } + + @Override + public Optional prepareCommit() { + flush(); + return Optional.empty(); + } + + private synchronized void flush() { if (!keyBuffer.isEmpty()) { doBatchWrite(); clearBuffer(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java index 96ac20cbe6e..d21caa60f2e 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java @@ -57,8 +57,12 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import static org.awaitility.Awaitility.await; + @Slf4j public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements TestResource { @@ -492,7 +496,7 @@ public void testFakeToRedisDeleteSetTest(TestContainer container) } @TestTemplate - public void testMysqlCdcToRedisDeleteZSetTest(TestContainer container) + public void testFakeToToRedisDeleteZSetTest(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = container.executeJob("/fake-to-redis-test-delete-zset.conf"); @@ -501,6 +505,26 @@ public void testMysqlCdcToRedisDeleteZSetTest(TestContainer container) jedis.del("zset_check"); } + @TestTemplate + public void testFakeToRedisInRealTimeTest(TestContainer container) { + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob("/fake-to-redis-test-in-real-time.conf"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertEquals(3, jedis.llen("list_check")); + }); + jedis.del("list_check"); + } + @TestTemplate public void testFakeToRedisNormalKeyIsNullTest(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-in-real-time.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-in-real-time.conf new file mode 100644 index 00000000000..923d53b5a0c --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-in-real-time.conf @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 10000 + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + schema = { + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "list_check" + data_type = list + batch_size = 33 + } +} \ No newline at end of file