diff --git a/config/cruisecontrol.properties b/config/cruisecontrol.properties
index 7fc5d9da3..4f9329e99 100644
--- a/config/cruisecontrol.properties
+++ b/config/cruisecontrol.properties
@@ -9,7 +9,7 @@
 # The Kafka cluster to control.
-bootstrap.servers=localhost:9091,localhost:9092,localhost:9093
+bootstrap.servers=localhost:9094,localhost:9092,localhost:9093
 
 # The maximum interval in milliseconds between two metadata refreshes.
 #metadata.max.age.ms=300000
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java
index 3f263cfdb..738659b27 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java
@@ -14,6 +14,7 @@
 import com.linkedin.kafka.cruisecontrol.exception.SamplingException;
 import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
 import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Random;
 import java.util.concurrent.ExecutionException;
@@ -24,11 +25,13 @@
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.AlterConfigsResult;
+import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.CreatePartitionsResult;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
@@ -61,6 +64,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -259,6 +263,56 @@ public static boolean maybeUpdateTopicConfig(AdminClient adminClient, NewTopic t
     return true;
   }
 
+  /**
+   * Alter the replication factor of a topic if needed.
+   * @param adminClient The adminClient to send describeTopics and alterPartitionReassignments requests to.
+   * @param topicToChangeReplicationFactor Existing topic whose replication factor is to be modified -- cannot be {@code null}.
+   * @return {@code true} if the request is completed successfully, {@code false} if there are any exceptions.
+   */
+  public static boolean maybeUpdateReplicationFactor(AdminClient adminClient, NewTopic topicToChangeReplicationFactor) {
+    short desiredReplicationFactor = topicToChangeReplicationFactor.replicationFactor();
+    String topicName = topicToChangeReplicationFactor.name();
+    LOG.info("Attempting to alter the replication factor of topic {}.", topicName);
+    // Retrieve the current replication factor of the topic to check whether it needs an update.
+    TopicDescription topicDescription;
+    try {
+      topicDescription = adminClient.describeTopics(Collections.singletonList(topicName)).values()
+                                    .get(topicName).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException | ExecutionException | TimeoutException e) {
+      LOG.warn("Modifying the replication factor of topic {} failed due to failure to describe the topic.", topicName, e);
+      return false;
+    }
+
+    // Alter the replication factor of the topic if needed.
+    try {
+      List<Node> brokers = new ArrayList<>(adminClient.describeCluster().nodes().get());
+      if ((short) topicDescription.partitions().get(0).replicas().size() != desiredReplicationFactor) {
+        if (brokers.size() < desiredReplicationFactor) {
+          LOG.warn("Unable to increase the replication factor due to insufficient brokers: requested {} replicas but only {} brokers in cluster.",
+                   desiredReplicationFactor, brokers.size());
+          return false;
+        }
+        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
+        List<Integer> targetReplicasList = brokers.stream().limit(desiredReplicationFactor).map(Node::id).collect(Collectors.toList());
+        // Add each partition to the reassignments, moving its replicas
+        // onto the first n brokers; the resulting placement will be
+        // rebalanced by Cruise Control later on.
+        // TODO: make a smarter move by not requiring all partition replicas to be moved,
+        //       i.e. apply incremental changes and handle increase/decrease of the RF separately.
+        for (int i = 0; i < topicToChangeReplicationFactor.numPartitions(); i++) {
+          reassignments.put(new TopicPartition(topicName, i), Optional.of(new NewPartitionReassignment(targetReplicasList)));
+        }
+        AlterPartitionReassignmentsResult alterPartitionReassignmentsResult = adminClient.alterPartitionReassignments(reassignments);
+        alterPartitionReassignmentsResult.all().get();
+      }
+    } catch (ExecutionException | InterruptedException e) {
+      LOG.warn("Unable to change the replication factor of topic {} from {} to {}.",
+               topicName, topicDescription.partitions().get(0).replicas().size(), desiredReplicationFactor, e);
+      return false;
+    }
+    return true;
+  }
+
   /**
    * Increase the partition count of the given existing topic to the desired partition count (if needed).
    *
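For reference, a minimal caller sketch of the new helper. The class name, topic name, partition count, and desired replication factor below are hypothetical example values, not part of this patch; the helper only issues alterPartitionReassignments when the topic's current replication factor differs from the desired one.

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

final class ReplicationFactorReconciliationSketch {
  // Describe the desired end state of an existing topic and let the helper reconcile it.
  // The topic name, 32 partitions and replication factor 2 are example values only.
  static boolean reconcile(AdminClient adminClient) {
    NewTopic desired = new NewTopic("__KafkaCruiseControlPartitionMetricSamples", 32, (short) 2);
    return KafkaCruiseControlUtils.maybeUpdateReplicationFactor(adminClient, desired);
  }
}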
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/AbstractKafkaSampleStore.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/AbstractKafkaSampleStore.java
index ecce9a9c2..8c28ec79d 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/AbstractKafkaSampleStore.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/AbstractKafkaSampleStore.java
@@ -25,6 +25,7 @@
 import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.CLIENT_REQUEST_TIMEOUT_MS;
 import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.createTopic;
 import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.maybeUpdateTopicConfig;
+import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.maybeUpdateReplicationFactor;
 import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.maybeIncreasePartitionCount;
 import static com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingUtils.bootstrapServers;
 
@@ -33,6 +34,7 @@ public abstract class AbstractKafkaSampleStore implements SampleStore {
   protected static final Duration PRODUCER_CLOSE_TIMEOUT = Duration.ofMinutes(3);
   protected static final short DEFAULT_SAMPLE_STORE_TOPIC_REPLICATION_FACTOR = 3;
   protected static final int DEFAULT_PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT = 32;
+  public static final String SAMPLE_STORE_TOPIC_REPLICATION_FACTOR_CONFIG = "sample.store.topic.replication.factor";
 
   protected volatile boolean _shutdown = false;
   protected Short _sampleStoreTopicReplicationFactor;
@@ -64,19 +66,22 @@ protected void createProducer(Map<String, ?> config, String producerClientId) {
    */
   protected short sampleStoreTopicReplicationFactor(Map<String, ?> config, AdminClient adminClient) {
     if (_sampleStoreTopicReplicationFactor == null) {
-      short numberOfBrokersInCluster;
-      try {
-        numberOfBrokersInCluster = (short) adminClient.describeCluster().nodes().get(CLIENT_REQUEST_TIMEOUT_MS,
-                                                                                     TimeUnit.MILLISECONDS).size();
-      } catch (InterruptedException | ExecutionException | TimeoutException e) {
-        throw new IllegalStateException("Auto creation of sample store topics failed due to failure to describe cluster.", e);
-      }
-      if (numberOfBrokersInCluster <= 1) {
-        throw new IllegalStateException(String.format("Kafka cluster has less than 2 brokers (brokers in cluster=%d, zookeeper.connect=%s)",
-                                                      numberOfBrokersInCluster, config.get(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG)));
-      }
-
-      return (short) Math.min(DEFAULT_SAMPLE_STORE_TOPIC_REPLICATION_FACTOR, numberOfBrokersInCluster);
+      if (config.get(SAMPLE_STORE_TOPIC_REPLICATION_FACTOR_CONFIG) != null) {
+        _sampleStoreTopicReplicationFactor = Short.parseShort((String) config.get(SAMPLE_STORE_TOPIC_REPLICATION_FACTOR_CONFIG));
+      } else {
+        short numberOfBrokersInCluster;
+        try {
+          numberOfBrokersInCluster = (short) adminClient.describeCluster().nodes().get(CLIENT_REQUEST_TIMEOUT_MS,
+                                                                                       TimeUnit.MILLISECONDS).size();
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+          throw new IllegalStateException("Auto creation of sample store topics failed due to failure to describe cluster.", e);
+        }
+        if (numberOfBrokersInCluster <= 1) {
+          throw new IllegalStateException(String.format("Kafka cluster has less than 2 brokers (brokers in cluster=%d, zookeeper.connect=%s)",
+                                                        numberOfBrokersInCluster, config.get(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG)));
+        }
+        _sampleStoreTopicReplicationFactor = (short) Math.min(DEFAULT_SAMPLE_STORE_TOPIC_REPLICATION_FACTOR, numberOfBrokersInCluster);
+      }
     }
     return _sampleStoreTopicReplicationFactor;
   }
@@ -86,6 +91,7 @@ protected void ensureTopicCreated(AdminClient adminClient, NewTopic sampleStoreT
     if (!createTopic(adminClient, sampleStoreTopic)) {
       // Update topic config and partition count to ensure desired properties.
       maybeUpdateTopicConfig(adminClient, sampleStoreTopic);
+      maybeUpdateReplicationFactor(adminClient, sampleStoreTopic);
       maybeIncreasePartitionCount(adminClient, sampleStoreTopic);
     }
   }
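The new SAMPLE_STORE_TOPIC_REPLICATION_FACTOR_CONFIG key makes the sample store replication factor configurable; a minimal cruisecontrol.properties sketch follows (the value 2 is an example only -- when the property is absent, the factor still falls back to min(3, number of brokers in the cluster)).

# Hypothetical example: pin the sample store topic replication factor explicitly
# instead of deriving it from the number of brokers in the cluster.
sample.store.topic.replication.factor=2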