[Feature][Redis] Flush data when the time reaches checkpoint.interval (apache#8198)

Co-authored-by: limin <[email protected]>
lm-ylj and limin authored Dec 10, 2024
1 parent d783f94 commit 2e24941
Showing 3 changed files with 112 additions and 3 deletions.
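
The change itself is compact: RedisSinkWriter keeps buffering rows as before, but the write path, prepareCommit(), and close() now all funnel into one synchronized flush(), so buffered rows are written either when batchSize is reached or when the engine checkpoints (every checkpoint.interval in streaming mode). A minimal, self-contained sketch of that pattern, using illustrative names rather than the connector's actual classes:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Illustrative buffered writer (not the connector's real classes): rows leave the
// buffer when batchSize is reached, when a checkpoint calls prepareCommit(), or
// when the writer is closed.
public class BufferedRedisStyleWriter {

    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();

    public BufferedRedisStyleWriter(int batchSize) {
        this.batchSize = batchSize;
    }

    public void write(String value) {
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            flush(); // size-based flush, unchanged behaviour
        }
    }

    // In SeaTunnel this corresponds to SinkWriter#prepareCommit, which the engine
    // calls once per checkpoint (every checkpoint.interval in streaming jobs).
    public Optional<Void> prepareCommit() {
        flush(); // time-based flush added by this commit
        return Optional.empty();
    }

    public void close() throws IOException {
        flush(); // drain whatever is left on shutdown
    }

    private synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        doBatchWrite(buffer); // stand-in for the batched Redis write
        buffer.clear();
    }

    private void doBatchWrite(List<String> rows) {
        // placeholder: the real writer sends the buffered keys/values to Redis
    }
}

The new e2e test below exercises exactly this path: it generates fewer rows than batch_size, so the data can only appear in Redis through the checkpoint-driven flush.
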
RedisSinkWriter.java
@@ -39,6 +39,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
        implements SupportMultiTableSinkWriter<Void> {
@@ -78,8 +79,7 @@ public void write(SeaTunnelRow element) throws IOException {
        String value = getValue(element, fields);
        valueBuffer.add(value);
        if (keyBuffer.size() >= batchSize) {
            doBatchWrite();
            clearBuffer();
            flush();
        }
    }

@@ -221,6 +221,16 @@ private void doBatchWrite() {

    @Override
    public void close() throws IOException {
        flush();
    }

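    // prepareCommit() is invoked by the engine at each checkpoint (every
    // checkpoint.interval in streaming mode); flushing here writes buffered rows
    // out even when batchSize has not been reached.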
    @Override
    public Optional<Void> prepareCommit() {
        flush();
        return Optional.empty();
    }

    private synchronized void flush() {
        if (!keyBuffer.isEmpty()) {
            doBatchWrite();
            clearBuffer();
RedisTestCaseTemplateIT.java
@@ -57,8 +57,12 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.awaitility.Awaitility.await;

@Slf4j
public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements TestResource {

Expand Down Expand Up @@ -492,7 +496,7 @@ public void testFakeToRedisDeleteSetTest(TestContainer container)
    }

    @TestTemplate
    public void testMysqlCdcToRedisDeleteZSetTest(TestContainer container)
    public void testFakeToRedisDeleteZSetTest(TestContainer container)
            throws IOException, InterruptedException {
        Container.ExecResult execResult =
                container.executeJob("/fake-to-redis-test-delete-zset.conf");
@@ -501,6 +505,26 @@ public void testMysqlCdcToRedisDeleteZSetTest(TestContainer container)
jedis.del("zset_check");
}

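    // The streaming job below writes 3 rows with batch_size = 33, so the assertion
    // can only pass if the sink's checkpoint-triggered flush (prepareCommit) pushes
    // the buffered rows to Redis before the 60 s timeout.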
    @TestTemplate
    public void testFakeToRedisInRealTimeTest(TestContainer container) {
        CompletableFuture.supplyAsync(
                () -> {
                    try {
                        container.executeJob("/fake-to-redis-test-in-real-time.conf");
                    } catch (Exception e) {
                        log.error("Commit task exception :" + e.getMessage());
                        throw new RuntimeException(e);
                    }
                    return null;
                });
        await().atMost(60000, TimeUnit.MILLISECONDS)
                .untilAsserted(
                        () -> {
                            Assertions.assertEquals(3, jedis.llen("list_check"));
                        });
        jedis.del("list_check");
    }

    @TestTemplate
    public void testFakeToRedisNormalKeyIsNullTest(TestContainer container)
            throws IOException, InterruptedException {
fake-to-redis-test-in-real-time.conf (new file)
@@ -0,0 +1,75 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

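# This streaming job emits only 3 rows while the Redis sink's batch_size is 33, so the
# size-based flush never triggers; the rows can reach Redis only via the checkpoint-driven
# flush (prepareCommit), which runs every checkpoint.interval (10000 ms) in STREAMING mode.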
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
  shade.identifier = "base64"

  #spark config
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
}

source {
  FakeSource {
    schema = {
      fields {
        id = int
        val_bool = boolean
        val_int8 = tinyint
        val_int16 = smallint
        val_int32 = int
        val_int64 = bigint
        val_float = float
        val_double = double
        val_decimal = "decimal(16, 1)"
        val_string = string
        val_unixtime_micros = timestamp
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      },
      {
        kind = INSERT
        fields = [2, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      },
      {
        kind = INSERT
        fields = [3, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      }
    ]
  }
}

sink {
  Redis {
    host = "redis-e2e"
    port = 6379
    auth = "U2VhVHVubmVs"
    key = "list_check"
    data_type = list
    batch_size = 33
  }
}
