> tasksConfig = buildTasksConfig(connName);
+ if (tasksConfig.isEmpty()) {
+ callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), tasksConfig);
+ return;
+ }
+ callback.onCompletion(null, tasksConfig);
+ }
+
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index fbcc35bb4b..d4e6358e2e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -62,6 +62,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
/**
*
@@ -224,6 +225,7 @@ public static String COMMIT_TASKS_KEY(String connectorName) {
// Connector and task configs: name or id -> config map
private final Map> connectorConfigs = new HashMap<>();
private final Map> taskConfigs = new HashMap<>();
+ private final Supplier topicAdminSupplier;
// Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
// is in an inconsistent state and we cannot safely use them until they have been refreshed.
@@ -241,11 +243,17 @@ public static String COMMIT_TASKS_KEY(String connectorName) {
private final WorkerConfigTransformer configTransformer;
+ @Deprecated
public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer) {
+ this(converter, config, configTransformer, null);
+ }
+
+ public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer, Supplier adminSupplier) {
this.lock = new Object();
this.started = false;
this.converter = converter;
this.offset = -1;
+ this.topicAdminSupplier = adminSupplier;
this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
if (this.topic == null || this.topic.trim().length() == 0)
@@ -471,6 +479,7 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo
Map adminProps = new HashMap<>(originals);
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
+ Supplier adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
Map topicSettings = config instanceof DistributedConfig
? ((DistributedConfig) config).configStorageTopicSettings()
: Collections.emptyMap();
@@ -481,30 +490,25 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo
.replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG))
.build();
- return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
+ return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
}
private KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps,
Map consumerProps,
Callback> consumedCallback,
- final NewTopic topicDescription, final Map adminProps) {
- Runnable createTopics = new Runnable() {
- @Override
- public void run() {
- log.debug("Creating admin client to manage Connect internal config topic");
- try (TopicAdmin admin = new TopicAdmin(adminProps)) {
- // Create the topic if it doesn't exist
- Set newTopics = admin.createTopics(topicDescription);
- if (!newTopics.contains(topic)) {
- // It already existed, so check that the topic cleanup policy is compact only and not delete
- log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
- admin.verifyTopicCleanupPolicyOnlyCompact(topic,
- DistributedConfig.CONFIG_TOPIC_CONFIG, "connector configurations");
- }
- }
+ final NewTopic topicDescription, Supplier adminSupplier) {
+ java.util.function.Consumer createTopics = admin -> {
+ log.debug("Creating admin client to manage Connect internal config topic");
+ // Create the topic if it doesn't exist
+ Set newTopics = admin.createTopics(topicDescription);
+ if (!newTopics.contains(topic)) {
+ // It already existed, so check that the topic cleanup policy is compact only and not delete
+ log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+ admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+ DistributedConfig.CONFIG_TOPIC_CONFIG, "connector configurations");
}
};
- return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
+ return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
}
@SuppressWarnings("unchecked")
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 8408f99c10..26b47f996b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -41,11 +41,13 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
/**
*
@@ -62,6 +64,16 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
private KafkaBasedLog offsetLog;
private HashMap data;
+ private final Supplier topicAdminSupplier;
+
+ @Deprecated
+ public KafkaOffsetBackingStore() {
+ this.topicAdminSupplier = null;
+ }
+
+ public KafkaOffsetBackingStore(Supplier topicAdmin) {
+ this.topicAdminSupplier = Objects.requireNonNull(topicAdmin);
+ }
@Override
public void configure(final WorkerConfig config) {
@@ -86,6 +98,7 @@ public void configure(final WorkerConfig config) {
Map adminProps = new HashMap<>(originals);
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
+ Supplier adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
Map topicSettings = config instanceof DistributedConfig
? ((DistributedConfig) config).offsetStorageTopicSettings()
: Collections.emptyMap();
@@ -96,30 +109,25 @@ public void configure(final WorkerConfig config) {
.replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG))
.build();
- offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminProps);
+ offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminSupplier);
}
private KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps,
Map consumerProps,
Callback> consumedCallback,
- final NewTopic topicDescription, final Map adminProps) {
- Runnable createTopics = new Runnable() {
- @Override
- public void run() {
- log.debug("Creating admin client to manage Connect internal offset topic");
- try (TopicAdmin admin = new TopicAdmin(adminProps)) {
- // Create the topic if it doesn't exist
- Set newTopics = admin.createTopics(topicDescription);
- if (!newTopics.contains(topic)) {
- // It already existed, so check that the topic cleanup policy is compact only and not delete
- log.debug("Using admin client to check cleanup policy for '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
- admin.verifyTopicCleanupPolicyOnlyCompact(topic,
- DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets");
- }
- }
+ final NewTopic topicDescription, Supplier adminSupplier) {
+ java.util.function.Consumer createTopics = admin -> {
+ log.debug("Creating admin client to manage Connect internal offset topic");
+ // Create the topic if it doesn't exist
+ Set newTopics = admin.createTopics(topicDescription);
+ if (!newTopics.contains(topic)) {
+ // It already existed, so check that the topic cleanup policy is compact only and not delete
+ log.debug("Using admin client to check cleanup policy for '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+ admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+ DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets");
}
};
- return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
+ return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 5d6057d799..efa405f3a4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -61,6 +61,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
/**
* StatusBackingStore implementation which uses a compacted topic for storage
@@ -128,17 +129,24 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
protected final Table> tasks;
protected final Map> connectors;
protected final ConcurrentMap> topics;
+ private final Supplier topicAdminSupplier;
private String statusTopic;
private KafkaBasedLog kafkaLog;
private int generation;
+ @Deprecated
public KafkaStatusBackingStore(Time time, Converter converter) {
+ this(time, converter, null);
+ }
+
+ public KafkaStatusBackingStore(Time time, Converter converter, Supplier topicAdminSupplier) {
this.time = time;
this.converter = converter;
this.tasks = new Table<>();
this.connectors = new HashMap<>();
this.topics = new ConcurrentHashMap<>();
+ this.topicAdminSupplier = topicAdminSupplier;
}
// visible for testing
@@ -169,6 +177,7 @@ public void configure(final WorkerConfig config) {
Map adminProps = new HashMap<>(originals);
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
+ Supplier adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
Map topicSettings = config instanceof DistributedConfig
? ((DistributedConfig) config).statusStorageTopicSettings()
@@ -180,36 +189,26 @@ public void configure(final WorkerConfig config) {
.replicationFactor(config.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG))
.build();
- Callback> readCallback = new Callback>() {
- @Override
- public void onCompletion(Throwable error, ConsumerRecord record) {
- read(record);
- }
- };
- this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminProps);
+ Callback> readCallback = (error, record) -> read(record);
+ this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminSupplier);
}
private KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps,
Map consumerProps,
Callback> consumedCallback,
- final NewTopic topicDescription, final Map adminProps) {
- Runnable createTopics = new Runnable() {
- @Override
- public void run() {
- log.debug("Creating admin client to manage Connect internal status topic");
- try (TopicAdmin admin = new TopicAdmin(adminProps)) {
- // Create the topic if it doesn't exist
- Set newTopics = admin.createTopics(topicDescription);
- if (!newTopics.contains(topic)) {
- // It already existed, so check that the topic cleanup policy is compact only and not delete
- log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
- admin.verifyTopicCleanupPolicyOnlyCompact(topic,
- DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "connector and task statuses");
- }
- }
+ final NewTopic topicDescription, Supplier adminSupplier) {
+ java.util.function.Consumer createTopics = admin -> {
+ log.debug("Creating admin client to manage Connect internal status topic");
+ // Create the topic if it doesn't exist
+ Set newTopics = admin.createTopics(topicDescription);
+ if (!newTopics.contains(topic)) {
+ // It already existed, so check that the topic cleanup policy is compact only and not delete
+ log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+ admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+ DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "connector and task statuses");
}
};
- return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, time, createTopics);
+ return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, time, createTopics);
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index 90039b9c01..b89efa432f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -74,7 +74,7 @@ public synchronized ClusterConfigState snapshot() {
connectorConfigs,
connectorTargetStates,
taskConfigs,
- Collections.emptySet(),
+ Collections.emptySet(),
configTransformer);
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
index ccec12a864..ceefd137d6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
@@ -27,7 +27,6 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -78,33 +77,26 @@ public void stop() {
@Override
public Future