From 50ba86802cbc9a4bd72dbc43a914a843e3738222 Mon Sep 17 00:00:00 2001 From: Lorand Kasler Date: Thu, 3 Nov 2016 17:47:58 +0100 Subject: [PATCH] add desired replication factor option (#4) --- pom.xml | 4 ++-- .../kafka/tools/KafkaAssignmentGenerator.java | 10 +++++++--- .../siftscience/kafka/tools/KafkaTopicAssigner.java | 9 ++++++--- .../kafka/tools/KafkaTopicAssignerTest.java | 8 ++++---- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/pom.xml b/pom.xml index c495f0a..399cd43 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ siftscience kafka-assigner jar - 1.0 + 1.1 kafka-assigner Tools for reassigning Kafka partitions with minimal movement @@ -130,4 +130,4 @@ - \ No newline at end of file + diff --git a/src/main/java/siftscience/kafka/tools/KafkaAssignmentGenerator.java b/src/main/java/siftscience/kafka/tools/KafkaAssignmentGenerator.java index 05b9abf..9375154 100644 --- a/src/main/java/siftscience/kafka/tools/KafkaAssignmentGenerator.java +++ b/src/main/java/siftscience/kafka/tools/KafkaAssignmentGenerator.java @@ -75,6 +75,10 @@ public class KafkaAssignmentGenerator { usage = "comma-separated list of topics") private String topics = null; + @Option(name = "--desired_replication_factor", + usage = "used for changing replication factor for topics, if not present it will use the existing number") + private int desiredReplicationFactor = -1; + @Option(name = "--disable_rack_awareness", usage = "set to true to ignore rack configurations") private boolean disableRackAwareness = false; @@ -126,7 +130,7 @@ private static void printCurrentBrokers(ZkUtils zkUtils) throws JSONException { private static void printLeastDisruptiveReassignment( ZkUtils zkUtils, List specifiedTopics, Set specifiedBrokers, - Set excludedBrokers, Map rackAssignment) + Set excludedBrokers, Map rackAssignment, int desiredReplicationFactor) throws JSONException { // We need three inputs for rebalacing: the brokers, the topics, and the current assignment // of topics to brokers. @@ -169,7 +173,7 @@ public Integer apply(Broker broker) { for (String topic : JavaConversions.seqAsJavaList(topics)) { Map> partitionAssignment = initialAssignments.get(topic); Map> finalAssignment = assigner.generateAssignment( - topic, partitionAssignment, brokers, rackAssignment); + topic, partitionAssignment, brokers, rackAssignment, desiredReplicationFactor); for (Map.Entry> e : finalAssignment.entrySet()) { JSONObject partitionJson = new JSONObject(); partitionJson.put("topic", topic); @@ -284,7 +288,7 @@ private void runTool(String[] args) throws JSONException { break; case PRINT_REASSIGNMENT: printLeastDisruptiveReassignment(zkUtils, topics, brokerIdSet, - excludedBrokerIdSet, rackAssignment); + excludedBrokerIdSet, rackAssignment, desiredReplicationFactor); break; default: throw new UnsupportedOperationException("Invalid mode: " + mode); diff --git a/src/main/java/siftscience/kafka/tools/KafkaTopicAssigner.java b/src/main/java/siftscience/kafka/tools/KafkaTopicAssigner.java index f34dd50..5b5211e 100644 --- a/src/main/java/siftscience/kafka/tools/KafkaTopicAssigner.java +++ b/src/main/java/siftscience/kafka/tools/KafkaTopicAssigner.java @@ -35,15 +35,18 @@ public KafkaTopicAssigner() { * in each list is the "leader" replica for a partition. * @param brokers a list of broker IDs as strings * @param rackAssignment a map from broker ID to rack ID if a rack is defined for that broker + * @param desiredReplicationFactor used to change replication factor, use -1 to keep the same as + * the original topic * @return the new assignment: a map from partition ID to ordered list of broker IDs */ public Map> generateAssignment( String topic, Map> currentAssignment, Set brokers, - Map rackAssignment) { + Map rackAssignment, int desiredReplicationFactor) { // We need to do 2 things: // - Get the set of partitions as integers // - Figure out the replication factor (which should be the same for each partition) - int replicationFactor = -1; + // if desiredReplicationFactor is negative + int replicationFactor = desiredReplicationFactor; Set partitions = Sets.newTreeSet(); for (Map.Entry> entry : currentAssignment.entrySet()) { int partition = entry.getKey(); @@ -51,7 +54,7 @@ public Map> generateAssignment( partitions.add(partition); if (replicationFactor < 0) { replicationFactor = replicas.size(); - } else { + } else if (desiredReplicationFactor < 0) { Preconditions.checkState(replicationFactor == replicas.size(), "Topic " + topic + " has partition " + partition + " with unexpected replication factor " + replicas.size()); diff --git a/src/test/java/siftscience/kafka/tools/KafkaTopicAssignerTest.java b/src/test/java/siftscience/kafka/tools/KafkaTopicAssignerTest.java index 8c63d68..1d56448 100644 --- a/src/test/java/siftscience/kafka/tools/KafkaTopicAssignerTest.java +++ b/src/test/java/siftscience/kafka/tools/KafkaTopicAssignerTest.java @@ -35,7 +35,7 @@ public void testRackAwareExpansion() { ); KafkaTopicAssigner assigner = new KafkaTopicAssigner(); Map> newAssignment = assigner.generateAssignment( - topic, currentAssignment, newBrokers, rackAssignments); + topic, currentAssignment, newBrokers, rackAssignments, -1); Map brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts( currentAssignment, newAssignment, 1); @@ -68,7 +68,7 @@ public void testClusterExpansion() { Set newBrokers = ImmutableSet.of(10, 11, 12, 13); KafkaTopicAssigner assigner = new KafkaTopicAssigner(); Map> newAssignment = assigner.generateAssignment( - topic, currentAssignment, newBrokers, Collections.emptyMap()); + topic, currentAssignment, newBrokers, Collections.emptyMap(), -1); Map brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts( currentAssignment, newAssignment, 1); @@ -93,7 +93,7 @@ public void testDecommission() { Set newBrokers = ImmutableSet.of(10, 11, 13); KafkaTopicAssigner assigner = new KafkaTopicAssigner(); Map> newAssignment = assigner.generateAssignment( - topic, currentAssignment, newBrokers, Collections.emptyMap()); + topic, currentAssignment, newBrokers, Collections.emptyMap(), -1); Map brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts( currentAssignment, newAssignment, 1); @@ -133,7 +133,7 @@ public void testReplacement() { Set newBrokers = ImmutableSet.of(10, 11, 13); KafkaTopicAssigner assigner = new KafkaTopicAssigner(); Map> newAssignment = assigner.generateAssignment( - topic, currentAssignment, newBrokers, Collections.emptyMap()); + topic, currentAssignment, newBrokers, Collections.emptyMap(),-1); // run basic sanity checks Map brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts(