From 1b4220fa4801506a1b55ea30f22998230938d823 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Mon, 14 Aug 2023 08:19:47 -0400 Subject: [PATCH] feat: implement KIP-618 source connector API related to exactly-once support --- build.gradle.kts | 2 +- .../connect/jdbc/JdbcSourceConnector.java | 12 +++++++++ .../connect/jdbc/JdbcSourceConnectorTest.java | 26 +++++++++++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/build.gradle.kts b/build.gradle.kts index 9007eda7..75480471 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -117,7 +117,7 @@ publishing { } } -val kafkaVersion = "3.0.2" +val kafkaVersion = "3.3.2" val slf4jVersion = "2.0.13" val avroVersion = "1.8.1" diff --git a/src/main/java/io/aiven/connect/jdbc/JdbcSourceConnector.java b/src/main/java/io/aiven/connect/jdbc/JdbcSourceConnector.java index 64e8a2af..843ee7da 100644 --- a/src/main/java/io/aiven/connect/jdbc/JdbcSourceConnector.java +++ b/src/main/java/io/aiven/connect/jdbc/JdbcSourceConnector.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.source.ExactlyOnceSupport; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.util.ConnectorUtils; @@ -215,4 +216,15 @@ public void stop() throws ConnectException { public ConfigDef config() { return JdbcSourceConnectorConfig.CONFIG_DEF; } + + @Override + public ExactlyOnceSupport exactlyOnceSupport(final Map props) { + final String rawMode = props.get(JdbcSourceConnectorConfig.MODE_CONFIG); + // We don't support exactly-once in bulk mode (there's no offsets tracking), and we + // don't currently support exactly-once in timestamp mode (there may be multiple rows + // with the same timestamp, which can lead to either data loss or duplicates on restart) + final boolean supported = JdbcSourceConnectorConfig.MODE_INCREMENTING.equals(rawMode) + || JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING.equals(rawMode); + return supported ? ExactlyOnceSupport.SUPPORTED : ExactlyOnceSupport.UNSUPPORTED; + } } diff --git a/src/test/java/io/aiven/connect/jdbc/JdbcSourceConnectorTest.java b/src/test/java/io/aiven/connect/jdbc/JdbcSourceConnectorTest.java index 7d3d7c72..8729c4b2 100644 --- a/src/test/java/io/aiven/connect/jdbc/JdbcSourceConnectorTest.java +++ b/src/test/java/io/aiven/connect/jdbc/JdbcSourceConnectorTest.java @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.source.ExactlyOnceSupport; import io.aiven.connect.jdbc.config.JdbcConfig; import io.aiven.connect.jdbc.source.EmbeddedDerby; @@ -43,6 +44,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockConstruction; @@ -212,6 +214,30 @@ public void testConflictingQueryTableSettings() { assertThatThrownBy(() -> connector.start(connProps)).isInstanceOf(ConnectException.class); } + @Test + public void testExactlyOnceSupport() { + connProps.remove(JdbcSourceConnectorConfig.MODE_CONFIG); + assertEquals(ExactlyOnceSupport.UNSUPPORTED, connector.exactlyOnceSupport(connProps)); + + connProps.put(JdbcSourceConnectorConfig.MODE_CONFIG, null); + assertEquals(ExactlyOnceSupport.UNSUPPORTED, connector.exactlyOnceSupport(connProps)); + + connProps.put(JdbcSourceConnectorConfig.MODE_CONFIG, "unsupported mode"); + assertEquals(ExactlyOnceSupport.UNSUPPORTED, connector.exactlyOnceSupport(connProps)); + + connProps.put(JdbcSourceConnectorConfig.MODE_CONFIG, JdbcSourceConnectorConfig.MODE_BULK); + assertEquals(ExactlyOnceSupport.UNSUPPORTED, connector.exactlyOnceSupport(connProps)); + + connProps.put(JdbcSourceConnectorConfig.MODE_CONFIG, JdbcSourceConnectorConfig.MODE_TIMESTAMP); + assertEquals(ExactlyOnceSupport.UNSUPPORTED, connector.exactlyOnceSupport(connProps)); + + connProps.put(JdbcSourceConnectorConfig.MODE_CONFIG, JdbcSourceConnectorConfig.MODE_INCREMENTING); + assertEquals(ExactlyOnceSupport.SUPPORTED, connector.exactlyOnceSupport(connProps)); + + connProps.put(JdbcSourceConnectorConfig.MODE_CONFIG, JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING); + assertEquals(ExactlyOnceSupport.SUPPORTED, connector.exactlyOnceSupport(connProps)); + } + private void assertTaskConfigsHaveParentConfigs(final List> configs) { for (final Map config : configs) { assertThat(config).containsEntry(JdbcConfig.CONNECTION_URL_CONFIG, this.db.getUrl());