diff --git a/pom.xml b/pom.xml
index 65733d5..f3b6c49 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,10 @@
       <artifactId>lettuce-core</artifactId>
       <version>5.2.1.RELEASE</version>
     </dependency>
+    <dependency>
+      <groupId>com.github.jcustenborder.kafka.connect</groupId>
+      <artifactId>connect-utils-jackson</artifactId>
+    </dependency>
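Note: connect-utils-jackson supplies the ObjectMapperFactory used below to serialize offset state as JSON. No <version> element is declared for it here, presumably because the parent POM manages the connect-utils versions.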
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/redis/RedisSinkTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/redis/RedisSinkTask.java
index ebe485c..4273873 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/redis/RedisSinkTask.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/redis/RedisSinkTask.java
@@ -17,12 +17,11 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.jcustenborder.kafka.connect.utils.VersionUtil;
-import com.github.jcustenborder.kafka.connect.utils.data.SinkOffsetState;
-import com.github.jcustenborder.kafka.connect.utils.data.TopicPartitionCounter;
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
import com.google.common.base.Charsets;
import io.lettuce.core.KeyValue;
import io.lettuce.core.RedisFuture;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.RetriableException;
@@ -146,8 +145,6 @@ public void put(Collection<SinkRecord> records) {
SinkOperation operation = SinkOperation.NONE;
- TopicPartitionCounter counter = new TopicPartitionCounter();
-
for (SinkRecord record : records) {
log.trace("put() - Processing record " + formatLocation(record));
if (null == record.key()) {
@@ -182,7 +179,6 @@ public void put(Collection<SinkRecord> records) {
operations.add(operation);
}
operation.add(key, value);
- counter.increment(record.topic(), record.kafkaPartition(), record.kafkaOffset());
}
log.debug(
@@ -191,33 +187,50 @@ public void put(Collection<SinkRecord> records) {
records.size()
);
- final List<SinkOffsetState> offsetData = counter.offsetStates();
- if (!offsetData.isEmpty()) {
- operation = SinkOperation.create(SinkOperation.Type.SET, this.config, offsetData.size());
- operations.add(operation);
- for (SinkOffsetState e : offsetData) {
- final byte[] key = String.format("__kafka.offset.%s.%s", e.topic(), e.partition()).getBytes(Charsets.UTF_8);
- final byte[] value;
- try {
- value = ObjectMapperFactory.INSTANCE.writeValueAsBytes(e);
- } catch (JsonProcessingException e1) {
- throw new DataException(e1);
- }
- operation.add(key, value);
- log.trace("put() - Setting offset: {}", e);
- }
- }
-
for (SinkOperation op : operations) {
log.debug("put() - Executing {} operation with {} values", op.type, op.size());
try {
op.execute(this.session.asyncCommands());
} catch (InterruptedException e) {
+ log.warn("Exception thrown while executing operation", e);
throw new RetriableException(e);
}
}
}
+ @Override
+ public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+ SinkOperation operation = SinkOperation.create(SinkOperation.Type.SET, this.config, currentOffsets.size());
+
+ List<SinkOffsetState> states = currentOffsets
+ .entrySet().stream()
+ .map(e -> ImmutableSinkOffsetState.builder()
+ .topic(e.getKey().topic())
+ .partition(e.getKey().partition())
+ .offset(e.getValue().offset())
+ .build()
+ ).collect(Collectors.toList());
+
+ for (SinkOffsetState e : states) {
+ final byte[] key = String.format("__kafka.offset.%s.%s", e.topic(), e.partition()).getBytes(Charsets.UTF_8);
+ final byte[] value;
+ try {
+ value = ObjectMapperFactory.INSTANCE.writeValueAsBytes(e);
+ } catch (JsonProcessingException e1) {
+ throw new DataException(e1);
+ }
+ operation.add(key, value);
+ log.trace("put() - Setting offset: {}", e);
+ }
+
+ try {
+ operation.execute(this.session.asyncCommands());
+ } catch (InterruptedException e) {
+ log.warn("Exception thrown while executing operation", e);
+ throw new RetriableException(e);
+ }
+ }
+
private static String redisOffsetKey(TopicPartition topicPartition) {
return String.format("__kafka.offset.%s.%s", topicPartition.topic(), topicPartition.partition());
}
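Taken together, these hunks move the offset bookkeeping out of the hot path: put() no longer tracks per-partition offsets on every batch, and the new flush() override, which Kafka Connect invokes at offset-commit time with the offsets it is about to commit, rewrites the __kafka.offset.<topic>.<partition> keys once per commit instead. A minimal sketch of what flush() stores for a single partition, using only names from the diff (this assumes java.util.stream.Collectors is already imported, since the import hunk above does not add it):

    // Hypothetical single-entry example of the key/value pair flush() writes.
    SinkOffsetState state = ImmutableSinkOffsetState.builder()
        .topic("topic")
        .partition(1)
        .offset(42L)
        .build();
    byte[] key = "__kafka.offset.topic.1".getBytes(Charsets.UTF_8);
    byte[] value = ObjectMapperFactory.INSTANCE.writeValueAsBytes(state);
    // value holds the UTF-8 JSON: {"topic":"topic","partition":1,"offset":42}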
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/redis/SinkOffsetState.java b/src/main/java/com/github/jcustenborder/kafka/connect/redis/SinkOffsetState.java
new file mode 100644
index 0000000..3e523da
--- /dev/null
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/redis/SinkOffsetState.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.jcustenborder.kafka.connect.redis;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.kafka.common.TopicPartition;
+import org.immutables.value.Value;
+
+@Value.Immutable
+@JsonDeserialize(as = ImmutableSinkOffsetState.class)
+@JsonAutoDetect(
+ fieldVisibility = Visibility.NONE,
+ getterVisibility = Visibility.NONE,
+ setterVisibility = Visibility.NONE,
+ isGetterVisibility = Visibility.NONE,
+ creatorVisibility = Visibility.NONE)
+public interface SinkOffsetState {
+ @JsonProperty("topic")
+ String topic();
+
+ @JsonProperty("partition")
+ Integer partition();
+
+ @JsonProperty("offset")
+ Long offset();
+
+ @JsonIgnore
+ @Value.Derived
+ default TopicPartition topicPartition() {
+ return new TopicPartition(topic(), partition());
+ }
+}
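The @JsonDeserialize(as = ImmutableSinkOffsetState.class) annotation is what lets a stored offset be read back through the interface type, e.g. when the task restores state on startup. A minimal round-trip sketch, assuming ObjectMapperFactory.INSTANCE behaves as a plain Jackson ObjectMapper (modulo the checked IOException from readValue):

    // Value bytes as they would come back from a GET on an offset key.
    byte[] stored = "{\"topic\":\"topic\",\"partition\":1,\"offset\":42}".getBytes(Charsets.UTF_8);
    SinkOffsetState state = ObjectMapperFactory.INSTANCE.readValue(stored, SinkOffsetState.class);
    TopicPartition tp = state.topicPartition(); // derived via @Value.Derived, excluded from JSON by @JsonIgnore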
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/redis/RedisSinkTaskTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/redis/RedisSinkTaskTest.java
index c0d6555..b8dcdd2 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/redis/RedisSinkTaskTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/redis/RedisSinkTaskTest.java
@@ -21,6 +21,8 @@
import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -48,6 +50,8 @@
public class RedisSinkTaskTest {
long offset = 1;
+ SinkRecord lastRecord;
+
SinkRecord record(String k, String v) {
final byte[] key = k.getBytes(Charsets.UTF_8);
final Schema keySchema = Schema.BYTES_SCHEMA;
@@ -62,7 +66,7 @@ SinkRecord record(String k, String v) {
valueSchema = Schema.BYTES_SCHEMA;
}
- return new SinkRecord(
+ return lastRecord = new SinkRecord(
"topic",
1,
keySchema,
@@ -147,6 +151,8 @@ public void put() throws InterruptedException {
InOrder inOrder = Mockito.inOrder(asyncCommands);
inOrder.verify(asyncCommands).mset(anyMap());
inOrder.verify(asyncCommands).del(any(byte[].class));
+
+ task.flush(ImmutableMap.of(new TopicPartition(lastRecord.topic(), lastRecord.kafkaPartition()), new OffsetAndMetadata(lastRecord.kafkaOffset())));
inOrder.verify(asyncCommands, times(2)).mset(anyMap());
}
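The updated test exercises the new contract end to end: the first mset/del pair is produced by put(), and the second mset that times(2) verifies comes from the explicit task.flush(...) call with the last record's TopicPartition and OffsetAndMetadata, matching the offset write that previously happened inside put().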