Skip to content

Commit

Permalink
sample store replication-fixes
Browse files Browse the repository at this point in the history
- fix sample.store.replication.factor not being set
- add maybeUpdateReplicationFactor to ensure sample.store.replication.factor is honored
  • Loading branch information
mavemuri committed Jul 20, 2021
1 parent 07aadfc commit 32cbb5b
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 13 deletions.
2 changes: 1 addition & 1 deletion config/cruisecontrol.properties
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

# The Kafka cluster to control.

bootstrap.servers=localhost:9091,localhost:9092,localhost:9093
bootstrap.servers=localhost:9094,localhost:9092,localhost:9093

# The maximum interval in milliseconds between two metadata refreshes.
#metadata.max.age.ms=300000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.linkedin.kafka.cruisecontrol.exception.SamplingException;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.concurrent.ExecutionException;
Expand All @@ -24,11 +25,13 @@
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
Expand Down Expand Up @@ -61,6 +64,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -259,6 +263,56 @@ public static boolean maybeUpdateTopicConfig(AdminClient adminClient, NewTopic t
return true;
}

/**
 * Alter the replication factor of an existing topic if any of its partitions does not already have the desired
 * number of replicas.
 *
 * <p>Every partition whose replica count differs from the desired replication factor is reassigned onto the first
 * {@code n} brokers returned by the cluster description, where {@code n} is the desired replication factor.
 * Partitions that already have the desired replica count are left untouched. This method only waits for the
 * reassignment request to be answered by the cluster; it does not wait for the replica movement itself to finish.
 *
 * @param adminClient The adminClient to send describeTopics, describeCluster and alterPartitionReassignments requests to.
 * @param topicToChangeReplicationFactor Existing topic whose RF is to be modified -- cannot be {@code null}.
 * @return {@code true} if no change was needed or the reassignment request was submitted successfully,
 *         {@code false} if the desired replication factor cannot be applied or any request failed.
 */
public static boolean maybeUpdateReplicationFactor(AdminClient adminClient, NewTopic topicToChangeReplicationFactor) {
  String topicName = topicToChangeReplicationFactor.name();
  short desiredReplicationFactor = topicToChangeReplicationFactor.replicationFactor();
  if (desiredReplicationFactor < 1) {
    // A non-positive replication factor cannot be turned into a target replica list.
    LOG.warn("Skipping replication factor update for topic {} due to invalid desired replication factor {}.",
             topicName, desiredReplicationFactor);
    return false;
  }

  LOG.info("Attempting alter of replication factor for topic {}", topicName);
  // Retrieve the current replica assignment of the topic to check if it needs an update.
  TopicDescription topicDescription;
  try {
    topicDescription = adminClient.describeTopics(Collections.singletonList(topicName)).values()
                                  .get(topicName).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
  } catch (InterruptedException | ExecutionException | TimeoutException e) {
    if (e instanceof InterruptedException) {
      // Preserve the interrupt status for callers further up the stack.
      Thread.currentThread().interrupt();
    }
    LOG.warn("Modify RF for topic {} failed due to failure to describe topic.", topicName, e);
    return false;
  }

  // Use the partitions that actually exist on the cluster (not the desired partition count of the NewTopic),
  // and only touch the ones whose replica count is off.
  List<Integer> partitionsToReassign = topicDescription.partitions().stream()
                                                       .filter(partition -> partition.replicas().size() != desiredReplicationFactor)
                                                       .map(partition -> partition.partition())
                                                       .collect(Collectors.toList());
  if (partitionsToReassign.isEmpty()) {
    return true;
  }

  // Alter Replication Factor of topic.
  try {
    List<Node> brokers = new ArrayList<>(adminClient.describeCluster().nodes()
                                                    .get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
    if (brokers.size() < desiredReplicationFactor) {
      LOG.warn("Unable to increase replication due to insufficient brokers- requested {} replicas but only {} brokers in cluster",
               desiredReplicationFactor, brokers.size());
      return false;
    }
    // Move replicas onto the 1st-n brokers; the placement will be balanced by CruiseControl later on.
    // TODO: make smarter move by not requiring all replicas of a partition to be moved
    // i.e, incremental changes + increase/decrease RF separately
    List<Integer> targetReplicasList = brokers.stream().limit(desiredReplicationFactor).map(Node::id).collect(Collectors.toList());
    Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
    for (Integer partitionId : partitionsToReassign) {
      reassignments.put(new TopicPartition(topicName, partitionId), Optional.of(new NewPartitionReassignment(targetReplicasList)));
    }
    // Wait for the cluster's answer so that a rejected reassignment is reported instead of silently dropped.
    adminClient.alterPartitionReassignments(reassignments).all().get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
  } catch (InterruptedException | ExecutionException | TimeoutException e) {
    if (e instanceof InterruptedException) {
      Thread.currentThread().interrupt();
    }
    LOG.warn("Unable to change replication factor of topic {} to {}", topicName, desiredReplicationFactor, e);
    return false;
  }
  return true;
}

/**
* Increase the partition count of the given existing topic to the desired partition count (if needed).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.CLIENT_REQUEST_TIMEOUT_MS;
import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.createTopic;
import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.maybeUpdateTopicConfig;
import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.maybeUpdateReplicationFactor;
import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.maybeIncreasePartitionCount;
import static com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingUtils.bootstrapServers;

Expand All @@ -33,6 +34,7 @@ public abstract class AbstractKafkaSampleStore implements SampleStore {
protected static final Duration PRODUCER_CLOSE_TIMEOUT = Duration.ofMinutes(3);
protected static final short DEFAULT_SAMPLE_STORE_TOPIC_REPLICATION_FACTOR = 3;
protected static final int DEFAULT_PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT = 32;
public static final String SAMPLE_STORE_TOPIC_REPLICATION_FACTOR_CONFIG = "sample.store.topic.replication.factor";

protected volatile boolean _shutdown = false;
protected Short _sampleStoreTopicReplicationFactor;
Expand Down Expand Up @@ -64,19 +66,22 @@ protected void createProducer(Map<String, ?> config, String producerClientId) {
*/
protected short sampleStoreTopicReplicationFactor(Map<String, ?> config, AdminClient adminClient) {
if (_sampleStoreTopicReplicationFactor == null) {
short numberOfBrokersInCluster;
try {
numberOfBrokersInCluster = (short) adminClient.describeCluster().nodes().get(CLIENT_REQUEST_TIMEOUT_MS,
TimeUnit.MILLISECONDS).size();
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new IllegalStateException("Auto creation of sample store topics failed due to failure to describe cluster.", e);
if (config.get(SAMPLE_STORE_TOPIC_REPLICATION_FACTOR_CONFIG) != null) {
_sampleStoreTopicReplicationFactor = Short.parseShort((String) config.get(SAMPLE_STORE_TOPIC_REPLICATION_FACTOR_CONFIG));
} else {
short numberOfBrokersInCluster;
try {
numberOfBrokersInCluster = (short) adminClient.describeCluster().nodes().get(CLIENT_REQUEST_TIMEOUT_MS,
TimeUnit.MILLISECONDS).size();
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new IllegalStateException("Auto creation of sample store topics failed due to failure to describe cluster.", e);
}
if (numberOfBrokersInCluster <= 1) {
throw new IllegalStateException(String.format("Kafka cluster has less than 2 brokers (brokers in cluster=%d, zookeeper.connect=%s)",
numberOfBrokersInCluster, config.get(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG)));
}
_sampleStoreTopicReplicationFactor = (short) Math.min(DEFAULT_SAMPLE_STORE_TOPIC_REPLICATION_FACTOR, numberOfBrokersInCluster);
}
if (numberOfBrokersInCluster <= 1) {
throw new IllegalStateException(String.format("Kafka cluster has less than 2 brokers (brokers in cluster=%d, zookeeper.connect=%s)",
numberOfBrokersInCluster, config.get(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG)));
}

return (short) Math.min(DEFAULT_SAMPLE_STORE_TOPIC_REPLICATION_FACTOR, numberOfBrokersInCluster);
}

return _sampleStoreTopicReplicationFactor;
Expand All @@ -86,6 +91,7 @@ protected void ensureTopicCreated(AdminClient adminClient, NewTopic sampleStoreT
if (!createTopic(adminClient, sampleStoreTopic)) {
// Update topic config, replication factor, and partition count to ensure desired properties.
maybeUpdateTopicConfig(adminClient, sampleStoreTopic);
maybeUpdateReplicationFactor(adminClient, sampleStoreTopic);
maybeIncreasePartitionCount(adminClient, sampleStoreTopic);
}
}
Expand Down

0 comments on commit 32cbb5b

Please sign in to comment.