Skip to content

Commit

Permalink
Merge pull request #304 from Aiven-Open/jjaakola-aiven-update-checkstyle
Browse files Browse the repository at this point in the history
chore: update Checkstyle version and fix errors
  • Loading branch information
eliax1996 authored Apr 30, 2024
2 parents 50209cc + 60b75b2 commit 39babfb
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 10 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ tasks.wrapper {
}

checkstyle {
toolVersion = "8.35"
toolVersion = "10.16.0"
configDirectory.set(rootProject.file("checkstyle/"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void start(final Map<String, String> properties) {
log.info("Starting JDBC source task");
try {
config = new JdbcSourceTaskConfig(properties);
config.validate();
} catch (final ConfigException e) {
throw new ConnectException("Couldn't start JdbcSourceTask due to configuration error", e);
}
Expand All @@ -101,10 +102,7 @@ public void start(final Map<String, String> properties) {

final List<String> tables = config.getList(JdbcSourceTaskConfig.TABLES_CONFIG);
final String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG);
if ((tables.isEmpty() && query.isEmpty()) || (!tables.isEmpty() && !query.isEmpty())) {
throw new ConnectException("Invalid configuration: each JdbcSourceTask must have at "
+ "least one table assigned to it or one query specified");
}

final TableQuerier.QueryMode queryMode = !query.isEmpty()
? TableQuerier.QueryMode.QUERY
: TableQuerier.QueryMode.TABLE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package io.aiven.connect.jdbc.source;

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;

/**
* Configuration options for a single JdbcSourceTask. These are processed after all
Expand All @@ -30,12 +32,25 @@
public class JdbcSourceTaskConfig extends JdbcSourceConnectorConfig {

public static final String TABLES_CONFIG = "tables";
private static final String TABLES_DOC = "List of tables for this task to watch for changes.";
private static final String TABLES_DOC = "List of tables encoded as a comma separated string"
+ " for this task to watch for changes.";
public static final String TABLES_DEFAULT = "";


static ConfigDef config = baseConfigDef()
.define(TABLES_CONFIG, Type.LIST, Importance.HIGH, TABLES_DOC);
.define(TABLES_CONFIG, Type.LIST, TABLES_DEFAULT, Importance.HIGH, TABLES_DOC);

public JdbcSourceTaskConfig(final Map<String, String> props) {
super(config, props);
}

public void validate() throws ConfigException {
final List<String> tables = this.getList(JdbcSourceTaskConfig.TABLES_CONFIG);
final String query = this.getString(JdbcSourceTaskConfig.QUERY_CONFIG);
if (tables.isEmpty() && query.isEmpty() || !tables.isEmpty() && !query.isEmpty()) {
throw new org.apache.kafka.connect.errors.ConnectException(
"Invalid configuration: each JdbcSourceTask must have at "
+ "least one table assigned to it or one query specified");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,9 @@ public TimestampIncrementingOffset extractValues(
extractedTimestamp = extractOffsetTimestamp(schema, record);
assert previousOffset == null
|| previousOffset.getTimestampOffset() == null
|| (previousOffset.getTimestampOffset() != null
|| previousOffset.getTimestampOffset() != null
&& previousOffset.getTimestampOffset().compareTo(
extractedTimestamp) <= 0
);
extractedTimestamp) <= 0;
}
Long extractedId = null;
if (hasIncrementedColumn()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.connect.jdbc.source;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;

import io.aiven.connect.jdbc.config.JdbcConfig;

import org.junit.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;


public final class JdbcSourceTaskConfigTest {

@Test(expected = ConfigException.class)
public void testValidateEmptyConfig() {
new JdbcSourceTaskConfig(Collections.emptyMap());
}

@Test
public void testValidateTablesAndQueryMandatoryConfigPresent() {
final Map<String, String> properties = new HashMap<>();
properties.put(JdbcConfig.CONNECTION_URL_CONFIG, "connection-url");
properties.put(JdbcSourceConnectorConfig.MODE_CONFIG, "bulk");
properties.put(JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG, "test-prefix");
final JdbcSourceTaskConfig config = new JdbcSourceTaskConfig(properties);
assertThrowsExactly(ConnectException.class, config::validate,
"Invalid configuration: each JdbcSourceTask must"
+ " have at least one table assigned to it or one query specified");
}


@Test
public void testValidateQueryAndTablesGiven() {
final Map<String, String> properties = new HashMap<>();
properties.put(JdbcSourceTaskConfig.TABLES_CONFIG, "test-table-1, test-table-2");
properties.put(JdbcSourceTaskConfig.QUERY_CONFIG, "test-query");
properties.put(JdbcConfig.CONNECTION_URL_CONFIG, "connection-url");
properties.put(JdbcSourceConnectorConfig.MODE_CONFIG, "bulk");
properties.put(JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG, "test-prefix");
final JdbcSourceTaskConfig config = new JdbcSourceTaskConfig(properties);
assertThrowsExactly(ConnectException.class, config::validate,
"Invalid configuration: each JdbcSourceTask must"
+ " have at least one table assigned to it or one query specified");
}

@Test
public void testValidateQueryGiven() {
final Map<String, String> properties = new HashMap<>();
properties.put(JdbcSourceTaskConfig.QUERY_CONFIG, "test-query");
properties.put(JdbcConfig.CONNECTION_URL_CONFIG, "connection-url");
properties.put(JdbcSourceConnectorConfig.MODE_CONFIG, "bulk");
properties.put(JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG, "test-prefix");
final JdbcSourceTaskConfig config = new JdbcSourceTaskConfig(properties);
config.validate();
}

@Test
public void testValidateTablesGiven() {
final Map<String, String> properties = new HashMap<>();
properties.put(JdbcSourceTaskConfig.TABLES_CONFIG, "test-table-1, test-table-2");
properties.put(JdbcConfig.CONNECTION_URL_CONFIG, "connection-url");
properties.put(JdbcSourceConnectorConfig.MODE_CONFIG, "bulk");
properties.put(JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG, "test-prefix");
final JdbcSourceTaskConfig config = new JdbcSourceTaskConfig(properties);
config.validate();
assertEquals(
config.getList(JdbcSourceTaskConfig.TABLES_CONFIG),
List.of("test-table-1", "test-table-2")
);
}

}

0 comments on commit 39babfb

Please sign in to comment.