Skip to content

Commit

Permalink
add desired replication factor option (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
klorand authored and kanakb committed Nov 3, 2016
1 parent 3103ad4 commit 50ba868
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 12 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<groupId>siftscience</groupId>
<artifactId>kafka-assigner</artifactId>
<packaging>jar</packaging>
<version>1.0</version>
<version>1.1</version>

<name>kafka-assigner</name>
<description>Tools for reassigning Kafka partitions with minimal movement</description>
Expand Down Expand Up @@ -130,4 +130,4 @@
</plugins>
</build>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public class KafkaAssignmentGenerator {
usage = "comma-separated list of topics")
private String topics = null;

@Option(name = "--desired_replication_factor",
usage = "used for changing replication factor for topics, if not present it will use the existing number")
private int desiredReplicationFactor = -1;

@Option(name = "--disable_rack_awareness",
usage = "set to true to ignore rack configurations")
private boolean disableRackAwareness = false;
Expand Down Expand Up @@ -126,7 +130,7 @@ private static void printCurrentBrokers(ZkUtils zkUtils) throws JSONException {

private static void printLeastDisruptiveReassignment(
ZkUtils zkUtils, List<String> specifiedTopics, Set<Integer> specifiedBrokers,
Set<Integer> excludedBrokers, Map<Integer, String> rackAssignment)
Set<Integer> excludedBrokers, Map<Integer, String> rackAssignment, int desiredReplicationFactor)
throws JSONException {
// We need three inputs for rebalacing: the brokers, the topics, and the current assignment
// of topics to brokers.
Expand Down Expand Up @@ -169,7 +173,7 @@ public Integer apply(Broker broker) {
for (String topic : JavaConversions.seqAsJavaList(topics)) {
Map<Integer, List<Integer>> partitionAssignment = initialAssignments.get(topic);
Map<Integer, List<Integer>> finalAssignment = assigner.generateAssignment(
topic, partitionAssignment, brokers, rackAssignment);
topic, partitionAssignment, brokers, rackAssignment, desiredReplicationFactor);
for (Map.Entry<Integer, List<Integer>> e : finalAssignment.entrySet()) {
JSONObject partitionJson = new JSONObject();
partitionJson.put("topic", topic);
Expand Down Expand Up @@ -284,7 +288,7 @@ private void runTool(String[] args) throws JSONException {
break;
case PRINT_REASSIGNMENT:
printLeastDisruptiveReassignment(zkUtils, topics, brokerIdSet,
excludedBrokerIdSet, rackAssignment);
excludedBrokerIdSet, rackAssignment, desiredReplicationFactor);
break;
default:
throw new UnsupportedOperationException("Invalid mode: " + mode);
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/siftscience/kafka/tools/KafkaTopicAssigner.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,26 @@ public KafkaTopicAssigner() {
* in each list is the "leader" replica for a partition.
* @param brokers a list of broker IDs as strings
* @param rackAssignment a map from broker ID to rack ID if a rack is defined for that broker
* @param desiredReplicationFactor used to change replication factor, use -1 to keep the same as
* the original topic
* @return the new assignment: a map from partition ID to ordered list of broker IDs
*/
public Map<Integer, List<Integer>> generateAssignment(
String topic, Map<Integer, List<Integer>> currentAssignment, Set<Integer> brokers,
Map<Integer, String> rackAssignment) {
Map<Integer, String> rackAssignment, int desiredReplicationFactor) {
// We need to do 2 things:
// - Get the set of partitions as integers
// - Figure out the replication factor (which should be the same for each partition)
int replicationFactor = -1;
// if desiredReplicationFactor is negative
int replicationFactor = desiredReplicationFactor;
Set<Integer> partitions = Sets.newTreeSet();
for (Map.Entry<Integer, List<Integer>> entry : currentAssignment.entrySet()) {
int partition = entry.getKey();
List<Integer> replicas = entry.getValue();
partitions.add(partition);
if (replicationFactor < 0) {
replicationFactor = replicas.size();
} else {
} else if (desiredReplicationFactor < 0) {
Preconditions.checkState(replicationFactor == replicas.size(),
"Topic " + topic + " has partition " + partition +
" with unexpected replication factor " + replicas.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void testRackAwareExpansion() {
);
KafkaTopicAssigner assigner = new KafkaTopicAssigner();
Map<Integer, List<Integer>> newAssignment = assigner.generateAssignment(
topic, currentAssignment, newBrokers, rackAssignments);
topic, currentAssignment, newBrokers, rackAssignments, -1);

Map<Integer, Integer> brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts(
currentAssignment, newAssignment, 1);
Expand Down Expand Up @@ -68,7 +68,7 @@ public void testClusterExpansion() {
Set<Integer> newBrokers = ImmutableSet.of(10, 11, 12, 13);
KafkaTopicAssigner assigner = new KafkaTopicAssigner();
Map<Integer, List<Integer>> newAssignment = assigner.generateAssignment(
topic, currentAssignment, newBrokers, Collections.<Integer, String>emptyMap());
topic, currentAssignment, newBrokers, Collections.<Integer, String>emptyMap(), -1);

Map<Integer, Integer> brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts(
currentAssignment, newAssignment, 1);
Expand All @@ -93,7 +93,7 @@ public void testDecommission() {
Set<Integer> newBrokers = ImmutableSet.of(10, 11, 13);
KafkaTopicAssigner assigner = new KafkaTopicAssigner();
Map<Integer, List<Integer>> newAssignment = assigner.generateAssignment(
topic, currentAssignment, newBrokers, Collections.<Integer, String>emptyMap());
topic, currentAssignment, newBrokers, Collections.<Integer, String>emptyMap(), -1);

Map<Integer, Integer> brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts(
currentAssignment, newAssignment, 1);
Expand Down Expand Up @@ -133,7 +133,7 @@ public void testReplacement() {
Set<Integer> newBrokers = ImmutableSet.of(10, 11, 13);
KafkaTopicAssigner assigner = new KafkaTopicAssigner();
Map<Integer, List<Integer>> newAssignment = assigner.generateAssignment(
topic, currentAssignment, newBrokers, Collections.<Integer, String>emptyMap());
topic, currentAssignment, newBrokers, Collections.<Integer, String>emptyMap(),-1);

// run basic sanity checks
Map<Integer, Integer> brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts(
Expand Down

0 comments on commit 50ba868

Please sign in to comment.