Skip to content

Commit

Permalink
Exclude high replication topics from partition movement
Browse files Browse the repository at this point in the history
  • Loading branch information
mavemuri committed Nov 10, 2020
1 parent 4389c75 commit fbd208a
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 21 deletions.
49 changes: 29 additions & 20 deletions config/cruisecontrol.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
# =======================================

# The Kafka cluster to control.
bootstrap.servers=localhost:9092

bootstrap.servers=localhost:9091,localhost:9092,localhost:9093

# The maximum interval in milliseconds between two metadata refreshes.
#metadata.max.age.ms=300000
Expand Down Expand Up @@ -90,22 +91,30 @@ min.samples.per.broker.metrics.window=1

# The configuration for the BrokerCapacityConfigFileResolver (supports JBOD, non-JBOD, and heterogeneous CPU core capacities)
#capacity.config.file=config/capacity.json
capacity.config.file=config/capacityJBOD.json
capacity.config.file=config/capacity.json

# Configurations for the analyzer
# =======================================

# The list of goals to optimize the Kafka cluster for with pre-computed proposals -- consider using RackAwareDistributionGoal instead of RackAwareGoal in clusters with partitions whose replication factor > number of racks
default.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal
# The list of goals to optimize the Kafka cluster for with pre-computed proposals
default.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal

# The list of supported goals
goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerEvenRackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal
goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerEvenRackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal
#com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal
#

# The list of supported intra-broker goals
intra.broker.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskUsageDistributionGoal

# The list of supported hard goals -- consider using RackAwareDistributionGoal instead of RackAwareGoal in clusters with partitions whose replication factor > number of racks
hard.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal
# The list of supported hard goals
hard.goals=
#com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal
#com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal

anomaly.detection.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal

self.healing.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal

# The minimum percentage of well monitored partitions out of all the partitions
min.valid.partition.ratio=0.95
Expand Down Expand Up @@ -140,9 +149,13 @@ network.outbound.capacity.threshold=0.8
# The threshold to define the cluster to be in a low CPU utilization state
cpu.low.utilization.threshold=0.0

broker.failure.alert.threshold.ms=60000
broker.failure.self.healing.threshold.ms=60000
# The threshold to define the cluster to be in a low disk utilization state
disk.low.utilization.threshold=0.0

# exclude.highreplication.topics.from.partition.movement=false

# The threshold to define the cluster to be in a low network inbound utilization state
network.inbound.low.utilization.threshold=0.0

Expand Down Expand Up @@ -210,23 +223,17 @@ metric.anomaly.finder.class=com.linkedin.kafka.cruisecontrol.detector.KafkaMetri
# The anomaly detection interval
#anomaly.detection.interval.ms=10000

# The goal violation to detect -- consider using RackAwareDistributionGoal instead of RackAwareGoal in clusters with partitions whose replication factor > number of racks
anomaly.detection.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal
# The goal violation to detect.
#anomaly.detection.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal

# The interested metrics for metric anomaly analyzer.
metric.anomaly.analyzer.metrics=BROKER_PRODUCE_LOCAL_TIME_MS_50TH,BROKER_PRODUCE_LOCAL_TIME_MS_999TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_50TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_999TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_50TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_999TH,BROKER_LOG_FLUSH_TIME_MS_50TH,BROKER_LOG_FLUSH_TIME_MS_999TH

# True if recently demoted brokers are excluded from optimizations during broker failure self healing, false otherwise
broker.failure.exclude.recently.demoted.brokers=true

# True if recently removed brokers are excluded from optimizations during broker failure self healing, false otherwise
broker.failure.exclude.recently.removed.brokers=true

# True if recently demoted brokers are excluded from optimizations during goal violation self healing, false otherwise
goal.violation.exclude.recently.demoted.brokers=true
broker.failure.exclude.recently.demoted.brokers=false

# True if recently removed brokers are excluded from optimizations during goal violation self healing, false otherwise
goal.violation.exclude.recently.removed.brokers=true
goal.violation.exclude.recently.removed.brokers=false

# The zk path to store failed broker information.
failed.brokers.zk.path=/CruiseControlBrokerList
Expand Down Expand Up @@ -277,13 +284,15 @@ max.cached.completed.user.tasks=25
max.active.user.tasks=5

# Enable self healing for all anomaly detectors, unless the particular anomaly detector is explicitly disabled
self.healing.enabled=false
#self.healing.enabled=true

# Enable self healing for broker failure detector
#self.healing.broker.failure.enabled=true
self.healing.broker.failure.enabled=true

self.healing.exclude.recently.removed.brokers=false

# Enable self healing for goal violation detector
#self.healing.goal.violation.enabled=true
self.healing.goal.violation.enabled=true

# Enable self healing for metric anomaly detector
#self.healing.metric.anomaly.enabled=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,16 @@ private AnalyzerConfig() {
public static final String TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_DOC = "The topics that should be excluded from the "
+ "partition movement. It is a regex. Notice that this regex will be ignored when decommission a broker is invoked.";

/**
 * <code>exclude.highreplication.topics.from.partition.movement</code>
 *
 * Boolean setting (default <code>false</code>) controlling whether high replication topics are excluded
 * from partition movement.
 *
 * NOTE: the "MOVEMEMENT" spelling in these constant names is kept as-is because other code refers to
 * the constants by these exact names.
 */
public static final String EXCLUDE_HIGHREPLICATION_TOPICS_FROM_PARTITION_MOVEMEMENT_CONFIG =
    "exclude.highreplication.topics.from.partition.movement";
public static final boolean DEFAULT_EXCLUDE_HIGHREPLICATION_TOPICS_FROM_PARTITION_MOVEMEMENT = false;
// Each fragment ends with a trailing space so the concatenated sentence keeps its word breaks.
public static final String EXCLUDE_HIGHREPLICATION_TOPICS_FROM_PARTITION_MOVEMEMENT_DOC = "Exclude high replication topics(RF=#brokers) from "
    + "partition movement. This is to allow lower RF topics in the cluster to not be blocked by unfixable topics allowing "
    + "for partial replica distributions for fixable topics";

/**
* <code>goal.violation.distribution.threshold.multiplier</code>
*/
Expand Down Expand Up @@ -488,6 +498,11 @@ public static ConfigDef define(ConfigDef configDef) {
DEFAULT_TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT,
ConfigDef.Importance.LOW,
TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_DOC)
.define(EXCLUDE_HIGHREPLICATION_TOPICS_FROM_PARTITION_MOVEMEMENT_CONFIG,
ConfigDef.Type.BOOLEAN,
DEFAULT_EXCLUDE_HIGHREPLICATION_TOPICS_FROM_PARTITION_MOVEMEMENT,
ConfigDef.Importance.LOW,
EXCLUDE_HIGHREPLICATION_TOPICS_FROM_PARTITION_MOVEMEMENT_DOC)
.define(GOAL_VIOLATION_DISTRIBUTION_THRESHOLD_MULTIPLIER_CONFIG,
ConfigDef.Type.DOUBLE,
DEFAULT_GOAL_VIOLATION_DISTRIBUTION_THRESHOLD_MULTIPLIER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ public void setBrokerState(int brokerId, Broker.State newState) {
// We need to go through rack so all the cached capacity will be updated.
broker.rack().setBrokerState(brokerId, newState);
_selfHealingEligibleReplicas.addAll(broker.currentOfflineReplicas());
refreshCapacity();
switch (newState) {
case DEAD:
_aliveBrokers.remove(broker);
Expand Down Expand Up @@ -299,6 +298,17 @@ public void setBrokerState(int brokerId, Broker.State newState) {
default:
throw new IllegalArgumentException("Illegal broker state " + newState + " is provided.");
}
if (ModelUtils.excludeHighReplicationFactorTopics()) {
Set<String> topicsWithHighReplicationFactor = _replicationFactorByTopic.entrySet().stream()
.filter(map -> map.getValue() > _aliveBrokers.size())
.map(map -> map.getKey()).collect(Collectors.toSet());
for (Replica r: broker.currentOfflineReplicas()) {
if (topicsWithHighReplicationFactor.contains(r.getJsonStructure().get("topic"))) {
_selfHealingEligibleReplicas.remove(r);
}
}
}
refreshCapacity();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package com.linkedin.kafka.cruisecontrol.model;

import com.linkedin.kafka.cruisecontrol.config.constants.AnalyzerConfig;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.MonitorConfig;
import org.slf4j.Logger;
Expand Down Expand Up @@ -33,15 +34,25 @@ public class ModelUtils {
private static final double ALLOWED_METRIC_ERROR_FACTOR = 1.05;
private static final int UNSTABLE_METRIC_THROUGHPUT_THRESHOLD = 10;
private static boolean _useLinearRegressionModel = false;
private static boolean _excludeHighReplicationFactorTopics = false;

private ModelUtils() {
  // Intentionally empty: the constructor is private so the class cannot be instantiated from outside.
}

/**
 * Load the static model settings from the given configuration:
 * excludeHighReplicationFactorTopics (default false) and useLinearRegressionModel.
 *
 * @param config the Cruise Control configuration from which both boolean settings are read.
 */
public static void init(KafkaCruiseControlConfig config) {
  // Assignment order is kept: high-replication exclusion flag first, then the regression-model flag.
  ModelUtils._excludeHighReplicationFactorTopics =
      config.getBoolean(AnalyzerConfig.EXCLUDE_HIGHREPLICATION_TOPICS_FROM_PARTITION_MOVEMEMENT_CONFIG);
  ModelUtils._useLinearRegressionModel =
      config.getBoolean(MonitorConfig.USE_LINEAR_REGRESSION_MODEL_CONFIG);
}

/**
 * @return the current value of the excludeHighReplicationFactorTopics flag, boxed as a {@link Boolean}.
 */
public static Boolean excludeHighReplicationFactorTopics() {
  return Boolean.valueOf(_excludeHighReplicationFactorTopics);
}

/**
* Get the CPU utilization of follower using the load of the leader replica.
*
Expand Down

0 comments on commit fbd208a

Please sign in to comment.