diff --git a/build.gradle b/build.gradle
index e67f5a3b2e..5b628b7ca8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -749,6 +749,7 @@ project(':core') {
dependencies {
compile project(':clients')
+ compile project(':metadata')
compile project(':raft')
compile libs.jacksonDatabind
compile libs.jacksonModuleScala
diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
index 15a7940f6e..5b9b4185ff 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -49,6 +49,7 @@
+
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index cd8b4c894d..ad6e885d49 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -469,6 +469,7 @@
+
diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java
index 6ef77f0fb7..8c41762d76 100644
--- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java
@@ -18,6 +18,7 @@
package org.apache.kafka.connect.rest.basic.auth.extension;
import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.slf4j.Logger;
@@ -26,6 +27,7 @@
import javax.security.auth.login.Configuration;
import java.io.IOException;
import java.util.Map;
+import java.util.function.Supplier;
/**
* Provides the ability to authenticate incoming BasicAuth credentials using the configured JAAS {@link
@@ -62,14 +64,38 @@ public class BasicAuthSecurityRestExtension implements ConnectRestExtension {
private static final Logger log = LoggerFactory.getLogger(BasicAuthSecurityRestExtension.class);
+ private static final Supplier CONFIGURATION = initializeConfiguration(Configuration::getConfiguration);
+
// Capture the JVM's global JAAS configuration as soon as possible, as it may be altered later
// by connectors, converters, other REST extensions, etc.
- private static final Configuration CONFIGURATION = Configuration.getConfiguration();
+ static Supplier initializeConfiguration(Supplier configurationSupplier) {
+ try {
+ Configuration configuration = configurationSupplier.get();
+ return () -> configuration;
+ } catch (Exception e) {
+ // We have to be careful not to throw anything here as this static block gets executed during plugin scanning and any exceptions will
+ // cause the worker to fail during startup, even if it's not configured to use the basic auth extension.
+ return () -> {
+ throw new ConnectException("Failed to retrieve JAAS configuration", e);
+ };
+ }
+ }
+
+ private final Supplier configuration;
+
+ public BasicAuthSecurityRestExtension() {
+ this(CONFIGURATION);
+ }
+
+ // For testing
+ BasicAuthSecurityRestExtension(Supplier configuration) {
+ this.configuration = configuration;
+ }
@Override
public void register(ConnectRestExtensionContext restPluginContext) {
log.trace("Registering JAAS basic auth filter");
- restPluginContext.configurable().register(new JaasBasicAuthFilter(CONFIGURATION));
+ restPluginContext.configurable().register(new JaasBasicAuthFilter(configuration.get()));
log.trace("Finished registering JAAS basic auth filter");
}
@@ -80,7 +106,8 @@ public void close() throws IOException {
@Override
public void configure(Map configs) {
-
+ // If we failed to retrieve a JAAS configuration during startup, throw that exception now
+ configuration.get();
}
@Override
diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java
index 2d809de0f8..a0ec4bba55 100644
--- a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java
+++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.rest.basic.auth.extension;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.easymock.Capture;
import org.easymock.EasyMock;
@@ -27,7 +28,15 @@
import javax.security.auth.login.Configuration;
import javax.ws.rs.core.Configurable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class BasicAuthSecurityRestExtensionTest {
@@ -63,4 +72,44 @@ public void testJaasConfigurationNotOverwritten() {
assertNotEquals(overwrittenConfiguration, jaasFilter.getValue().configuration,
"Overwritten JAAS configuration should not be used by basic auth REST extension");
}
+
+ @Test
+ public void testBadJaasConfigInitialization() {
+ SecurityException jaasConfigurationException = new SecurityException(new IOException("Bad JAAS config is bad"));
+ Supplier configuration = BasicAuthSecurityRestExtension.initializeConfiguration(() -> {
+ throw jaasConfigurationException;
+ });
+
+ ConnectException thrownException = assertThrows(ConnectException.class, configuration::get);
+ assertEquals(jaasConfigurationException, thrownException.getCause());
+ }
+
+ @Test
+ public void testGoodJaasConfigInitialization() {
+ AtomicBoolean configurationInitializerEvaluated = new AtomicBoolean(false);
+ Configuration mockConfiguration = EasyMock.mock(Configuration.class);
+ Supplier configuration = BasicAuthSecurityRestExtension.initializeConfiguration(() -> {
+ configurationInitializerEvaluated.set(true);
+ return mockConfiguration;
+ });
+
+ assertTrue(configurationInitializerEvaluated.get());
+ assertEquals(mockConfiguration, configuration.get());
+ }
+
+ @Test
+ public void testBadJaasConfigExtensionSetup() {
+ SecurityException jaasConfigurationException = new SecurityException(new IOException("Bad JAAS config is bad"));
+ Supplier configuration = () -> {
+ throw jaasConfigurationException;
+ };
+
+ BasicAuthSecurityRestExtension extension = new BasicAuthSecurityRestExtension(configuration);
+
+ Exception thrownException = assertThrows(Exception.class, () -> extension.configure(Collections.emptyMap()));
+ assertEquals(jaasConfigurationException, thrownException);
+
+ thrownException = assertThrows(Exception.class, () -> extension.register(EasyMock.mock(ConnectRestExtensionContext.class)));
+ assertEquals(jaasConfigurationException, thrownException);
+ }
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index 8b213f798d..6f287d060d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -16,11 +16,9 @@
*/
package org.apache.kafka.connect.util.clusters;
-import kafka.server.BrokerState;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
-import kafka.server.RunningAsBroker;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
@@ -47,6 +45,7 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.metadata.BrokerState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -241,13 +240,13 @@ public String zKConnectString() {
}
/**
- * Get the brokers that have a {@link RunningAsBroker} state.
+ * Get the brokers that have a {@link BrokerState#RUNNING} state.
*
* @return the list of {@link KafkaServer} instances that are running;
* never null but possibly empty
*/
public Set runningBrokers() {
- return brokersInState(state -> state.currentState() == RunningAsBroker.state());
+ return brokersInState(state -> state == BrokerState.RUNNING);
}
/**
@@ -264,7 +263,7 @@ public Set brokersInState(Predicate desiredState) {
protected boolean hasState(KafkaServer server, Predicate desiredState) {
try {
- return desiredState.test(server.brokerState());
+ return desiredState.test(server.brokerState().get());
} catch (Throwable e) {
// Broker failed to respond.
return false;
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index f5f77e0349..761ba69bfa 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -20,16 +20,17 @@ package kafka.log
import java.io._
import java.nio.file.Files
import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.OffsetCheckpointFile
-import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _}
+import kafka.server._
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.errors.{KafkaStorageException, LogDirNotFoundException}
+import org.apache.kafka.metadata.BrokerState
import scala.jdk.CollectionConverters._
import scala.collection._
@@ -60,7 +61,7 @@ class LogManager(logDirs: Seq[File],
val retentionCheckMs: Long,
val maxPidExpirationMs: Int,
scheduler: Scheduler,
- val brokerState: BrokerState,
+ val brokerState: AtomicReference[BrokerState],
brokerTopicStats: BrokerTopicStats,
logDirFailureChannel: LogDirFailureChannel,
time: Time) extends Logging with KafkaMetricsGroup {
@@ -326,7 +327,7 @@ class LogManager(logDirs: Seq[File],
} else {
// log recovery itself is being performed by `Log` class during initialization
info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
- brokerState.newState(RecoveringFromUncleanShutdown)
+ brokerState.set(BrokerState.RECOVERY)
}
var recoveryPoints = Map[TopicPartition, Long]()
@@ -1182,7 +1183,7 @@ object LogManager {
def apply(config: KafkaConfig,
initialOfflineDirs: Seq[String],
zkClient: KafkaZkClient,
- brokerState: BrokerState,
+ brokerState: AtomicReference[BrokerState],
kafkaScheduler: KafkaScheduler,
time: Time,
brokerTopicStats: BrokerTopicStats,
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 88abe460bb..9eca82ee48 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -20,7 +20,7 @@ package kafka.server
import java.io.{File, IOException}
import java.net.{InetAddress, SocketTimeoutException}
import java.util.concurrent._
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import kafka.api.{KAFKA_0_9_0, KAFKA_2_2_IV0, KAFKA_2_4_IV1}
import kafka.cluster.Broker
@@ -46,6 +46,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Endpoint, Node}
+import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.zookeeper.client.ZKClientConfig
@@ -102,7 +103,7 @@ class KafkaServer(
var kafkaYammerMetrics: KafkaYammerMetrics = null
var metrics: Metrics = null
- val brokerState: BrokerState = new BrokerState
+ val brokerState = new AtomicReference[BrokerState](BrokerState.NOT_RUNNING)
var dataPlaneRequestProcessor: KafkaApis = null
var controlPlaneRequestProcessor: KafkaApis = null
@@ -166,7 +167,7 @@ class KafkaServer(
private[kafka] def featureChangeListener = _featureChangeListener
- newGauge("BrokerState", () => brokerState.currentState)
+ newGauge("BrokerState", () => brokerState.get().value())
newGauge("ClusterId", () => clusterId)
newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size)
@@ -193,7 +194,7 @@ class KafkaServer(
val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) {
- brokerState.newState(Starting)
+ brokerState.set(BrokerState.STARTING)
/* setup zookeeper */
initZkClient(time)
@@ -380,7 +381,7 @@ class KafkaServer(
socketServer.startProcessingRequests(authorizerFutures)
- brokerState.newState(RunningAsBroker)
+ brokerState.set(BrokerState.RUNNING)
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
@@ -632,7 +633,7 @@ class KafkaServer(
// the shutdown.
info("Starting controlled shutdown")
- brokerState.newState(PendingControlledShutdown)
+ brokerState.set(BrokerState.PENDING_CONTROLLED_SHUTDOWN)
val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
@@ -657,7 +658,7 @@ class KafkaServer(
// `true` at the end of this method.
if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
CoreUtils.swallow(controlledShutdown(), this)
- brokerState.newState(BrokerShuttingDown)
+ brokerState.set(BrokerState.SHUTTING_DOWN)
if (dynamicConfigManager != null)
CoreUtils.swallow(dynamicConfigManager.shutdown(), this)
@@ -726,7 +727,7 @@ class KafkaServer(
// Clear all reconfigurable instances stored in DynamicBrokerConfig
config.dynamicConfig.clear()
- brokerState.newState(NotRunning)
+ brokerState.set(BrokerState.NOT_RUNNING)
startupComplete.set(false)
isShuttingDown.set(false)
diff --git a/core/src/main/scala/kafka/server/metadata/MetadataBrokers.scala b/core/src/main/scala/kafka/server/metadata/MetadataBrokers.scala
new file mode 100644
index 0000000000..ad4986e173
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/MetadataBrokers.scala
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.ThreadLocalRandom
+
+import kafka.cluster.BrokerEndPoint
+import kafka.common.BrokerEndPointNotAvailableException
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.metadata.RegisterBrokerRecord
+import org.apache.kafka.common.network.ListenerName
+import org.slf4j.Logger
+
+import scala.jdk.CollectionConverters._
+
+object MetadataBroker {
+ def apply(record: RegisterBrokerRecord): MetadataBroker = {
+ new MetadataBroker(record.brokerId, record.rack,
+ record.endPoints().asScala.map { endPoint =>
+ endPoint.name() ->
+ new Node(record.brokerId, endPoint.host, endPoint.port, record.rack)
+ }.toMap,
+ true)
+ }
+}
+
+case class MetadataBroker(id: Int,
+ rack: String,
+ endpoints: collection.Map[String, Node],
+ fenced: Boolean) {
+ def brokerEndPoint(listenerName: ListenerName): BrokerEndPoint = {
+ endpoints.get(listenerName.value) match {
+ case None => throw new BrokerEndPointNotAvailableException(
+ s"End point with listener name ${listenerName.value} not found for broker $id")
+ case Some(node) => new BrokerEndPoint(node.id, node.host, node.port)
+ }
+ }
+}
+
+class MetadataBrokersBuilder(log: Logger, prevBrokers: MetadataBrokers) {
+ private var newBrokerMap = prevBrokers.cloneBrokerMap()
+
+ def add(broker: MetadataBroker): Unit = {
+ newBrokerMap.put(broker.id, broker)
+ }
+
+ def changeFencing(id: Int, fenced: Boolean): Unit = {
+ val broker = newBrokerMap.get(id)
+ if (broker == null) {
+ throw new RuntimeException(s"Unknown broker id ${id}")
+ }
+ val newBroker = new MetadataBroker(broker.id, broker.rack, broker.endpoints, fenced)
+ newBrokerMap.put(id, newBroker)
+ }
+
+ def remove(id: Int): Unit = {
+ newBrokerMap.remove(id)
+ }
+
+ def get(brokerId: Int): Option[MetadataBroker] = Option(newBrokerMap.get(brokerId))
+
+ def build(): MetadataBrokers = {
+ val result = MetadataBrokers(log, newBrokerMap)
+ newBrokerMap = Collections.unmodifiableMap(newBrokerMap)
+ result
+ }
+}
+
+object MetadataBrokers {
+ def apply(log: Logger,
+ brokerMap: util.Map[Integer, MetadataBroker]): MetadataBrokers = {
+ var listenersIdenticalAcrossBrokers = true
+ var prevListeners: collection.Set[String] = null
+ val _aliveBrokers = new util.ArrayList[MetadataBroker](brokerMap.size())
+ brokerMap.values().iterator().asScala.foreach { broker =>
+ if (!broker.fenced) {
+ if (prevListeners == null) {
+ prevListeners = broker.endpoints.keySet
+ } else if (!prevListeners.equals(broker.endpoints.keySet)) {
+ listenersIdenticalAcrossBrokers = false
+ }
+ _aliveBrokers.add(broker)
+ }
+ }
+ if (!listenersIdenticalAcrossBrokers) {
+ log.error("Listeners are not identical across alive brokers. " +
+ _aliveBrokers.asScala.map(
+ broker => s"${broker.id}: ${broker.endpoints.keySet.mkString(", ")}"))
+ }
+ new MetadataBrokers(_aliveBrokers, brokerMap)
+ }
+}
+
+case class MetadataBrokers(private val aliveBrokersList: util.List[MetadataBroker],
+ private val brokerMap: util.Map[Integer, MetadataBroker]) {
+ def size(): Int = brokerMap.size()
+
+ def iterator(): Iterator[MetadataBroker] = brokerMap.values().iterator().asScala
+
+ def cloneBrokerMap(): util.Map[Integer, MetadataBroker] = {
+ val result = new util.HashMap[Integer, MetadataBroker]
+ result.putAll(brokerMap)
+ result
+ }
+
+ def aliveBroker(id: Int): Option[MetadataBroker] = {
+ get(id).filter(!_.fenced)
+ }
+
+ def randomAliveBrokerId(): Option[Int] = {
+ if (aliveBrokersList.isEmpty) {
+ None
+ } else {
+ Some(aliveBrokersList.get(ThreadLocalRandom.current().nextInt(aliveBrokersList.size())).id)
+ }
+ }
+
+ def aliveBrokers(): collection.Seq[MetadataBroker] = aliveBrokersList.asScala
+
+ def get(id: Int): Option[MetadataBroker] = Option(brokerMap.get(id))
+}
diff --git a/core/src/main/scala/kafka/server/metadata/MetadataImage.scala b/core/src/main/scala/kafka/server/metadata/MetadataImage.scala
new file mode 100755
index 0000000000..7723fb6898
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/MetadataImage.scala
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.util
+import java.util.Collections
+
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.slf4j.Logger
+
+case class MetadataImageBuilder(brokerId: Int,
+ log: Logger,
+ prevImage: MetadataImage) {
+ private var _partitionsBuilder: MetadataPartitionsBuilder = null
+ private var _controllerId = prevImage.controllerId
+ private var _brokersBuilder: MetadataBrokersBuilder = null
+
+ def partitionsBuilder(): MetadataPartitionsBuilder = {
+ if (_partitionsBuilder == null) {
+ _partitionsBuilder = new MetadataPartitionsBuilder(brokerId, prevImage.partitions)
+ }
+ _partitionsBuilder
+ }
+
+ def hasPartitionChanges: Boolean = _partitionsBuilder != null
+
+ def topicIdToName(topicId: Uuid): Option[String] = {
+ if (_partitionsBuilder != null) {
+ _partitionsBuilder.topicIdToName(topicId)
+ } else {
+ prevImage.topicIdToName(topicId)
+ }
+ }
+
+ def controllerId(controllerId: Option[Int]): Unit = {
+ _controllerId = controllerId
+ }
+
+ def brokersBuilder(): MetadataBrokersBuilder = {
+ if (_brokersBuilder == null) {
+ _brokersBuilder = new MetadataBrokersBuilder(log, prevImage.brokers)
+ }
+ _brokersBuilder
+ }
+
+ def broker(brokerId: Int): Option[MetadataBroker] = {
+ if (_brokersBuilder == null) {
+ prevImage.brokers.get(brokerId)
+ } else {
+ _brokersBuilder.get(brokerId)
+ }
+ }
+
+ def partition(topicName: String, partitionId: Int): Option[MetadataPartition] = {
+ if (_partitionsBuilder == null) {
+ prevImage.partitions.topicPartition(topicName, partitionId)
+ } else {
+ _partitionsBuilder.get(topicName, partitionId)
+ }
+ }
+
+ def hasChanges: Boolean = {
+ _partitionsBuilder != null ||
+ !_controllerId.equals(prevImage.controllerId) ||
+ _brokersBuilder != null
+ }
+
+ def build(): MetadataImage = {
+ val nextPartitions = if (_partitionsBuilder == null) {
+ prevImage.partitions
+ } else {
+ _partitionsBuilder.build()
+ }
+ val nextBrokers = if (_brokersBuilder == null) {
+ prevImage.brokers
+ } else {
+ _brokersBuilder.build()
+ }
+ MetadataImage(nextPartitions, _controllerId, nextBrokers)
+ }
+}
+
+case class MetadataImage(partitions: MetadataPartitions,
+ controllerId: Option[Int],
+ brokers: MetadataBrokers) {
+ def this() = {
+ this(MetadataPartitions(Collections.emptyMap(), Collections.emptyMap()),
+ None,
+ new MetadataBrokers(Collections.emptyList(), new util.HashMap[Integer, MetadataBroker]()))
+ }
+
+ def contains(partition: TopicPartition): Boolean =
+ partitions.topicPartition(partition.topic(), partition.partition()).isDefined
+
+ def contains(topic: String): Boolean = partitions.topicPartitions(topic).hasNext
+
+ def aliveBroker(id: Int): Option[MetadataBroker] = brokers.aliveBroker(id)
+
+ def numAliveBrokers(): Int = brokers.aliveBrokers().size
+
+ def controller(): Option[MetadataBroker] = controllerId.flatMap(id => brokers.aliveBroker(id))
+
+ def topicIdToName(uuid: Uuid): Option[String] = {
+ partitions.topicIdToName(uuid)
+ }
+}
+
diff --git a/core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala b/core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
new file mode 100644
index 0000000000..bed5c58ce9
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.util
+import java.util.Collections
+
+import org.apache.kafka.common.message.LeaderAndIsrRequestData
+import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
+import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
+import org.apache.kafka.common.metadata.{IsrChangeRecord, PartitionRecord}
+import org.apache.kafka.common.{TopicPartition, Uuid}
+
+import scala.jdk.CollectionConverters._
+
+
+object MetadataPartition {
+ def apply(name: String, record: PartitionRecord): MetadataPartition = {
+ MetadataPartition(name,
+ record.partitionId(),
+ record.leader(),
+ record.leaderEpoch(),
+ record.replicas(),
+ record.isr(),
+ Collections.emptyList(), // TODO KAFKA-12285 handle offline replicas
+ Collections.emptyList(),
+ Collections.emptyList())
+ }
+
+ def apply(prevPartition: Option[MetadataPartition],
+ partition: UpdateMetadataPartitionState): MetadataPartition = {
+ new MetadataPartition(partition.topicName(),
+ partition.partitionIndex(),
+ partition.leader(),
+ partition.leaderEpoch(),
+ partition.replicas(),
+ partition.isr(),
+ partition.offlineReplicas(),
+ prevPartition.flatMap(p => Some(p.addingReplicas)).getOrElse(Collections.emptyList()),
+ prevPartition.flatMap(p => Some(p.removingReplicas)).getOrElse(Collections.emptyList())
+ )
+ }
+}
+
+case class MetadataPartition(topicName: String,
+ partitionIndex: Int,
+ leaderId: Int,
+ leaderEpoch: Int,
+ replicas: util.List[Integer],
+ isr: util.List[Integer],
+ offlineReplicas: util.List[Integer],
+ addingReplicas: util.List[Integer],
+ removingReplicas: util.List[Integer]) {
+ def toTopicPartition: TopicPartition = new TopicPartition(topicName, partitionIndex)
+
+ def toLeaderAndIsrPartitionState(isNew: Boolean): LeaderAndIsrRequestData.LeaderAndIsrPartitionState = {
+ new LeaderAndIsrPartitionState().setTopicName(topicName).
+ setPartitionIndex(partitionIndex).
+ setLeader(leaderId).
+ setLeaderEpoch(leaderEpoch).
+ setReplicas(replicas).
+ setIsr(isr).
+ setAddingReplicas(addingReplicas).
+ setRemovingReplicas(removingReplicas).
+ setIsNew(isNew)
+ // Note: we don't set ZKVersion here.
+ }
+
+ def isReplicaFor(brokerId: Int): Boolean = replicas.contains(Integer.valueOf(brokerId))
+
+ def copyWithIsrChanges(record: IsrChangeRecord): MetadataPartition = {
+ MetadataPartition(topicName,
+ partitionIndex,
+ record.leader(),
+ record.leaderEpoch(),
+ replicas,
+ record.isr(),
+ offlineReplicas,
+ addingReplicas,
+ removingReplicas)
+ }
+}
+
+class MetadataPartitionsBuilder(val brokerId: Int,
+ val prevPartitions: MetadataPartitions) {
+ private var newNameMap = prevPartitions.copyNameMap()
+ private var newIdMap = prevPartitions.copyIdMap()
+ private val changed = Collections.newSetFromMap[Any](new util.IdentityHashMap())
+ private val _localChanged = new util.HashSet[MetadataPartition]
+ private val _localRemoved = new util.HashSet[MetadataPartition]
+
+ def topicIdToName(id: Uuid): Option[String] = Option(newIdMap.get(id))
+
+ def removeTopicById(id: Uuid): Iterable[MetadataPartition] = {
+ Option(newIdMap.remove(id)) match {
+ case None => throw new RuntimeException(s"Unable to locate topic with ID $id")
+ case Some(name) => newNameMap.remove(name).values().asScala
+ }
+ }
+
+ def handleIsrChange(record: IsrChangeRecord): Unit = {
+ Option(newIdMap.get(record.topicId())) match {
+ case None => throw new RuntimeException(s"Unable to locate topic with ID ${record.topicId()}")
+ case Some(name) => Option(newNameMap.get(name)) match {
+ case None => throw new RuntimeException(s"Unable to locate topic with name $name")
+ case Some(partitionMap) => Option(partitionMap.get(record.partitionId())) match {
+ case None => throw new RuntimeException(s"Unable to locate $name-${record.partitionId}")
+ case Some(partition) => set(partition.copyWithIsrChanges(record))
+ }
+ }
+ }
+ }
+
+ def addUuidMapping(name: String, id: Uuid): Unit = {
+ newIdMap.put(id, name)
+ }
+
+ def removeUuidMapping(id: Uuid): Unit = {
+ newIdMap.remove(id)
+ }
+
+ def get(topicName: String, partitionId: Int): Option[MetadataPartition] = {
+ Option(newNameMap.get(topicName)).flatMap(m => Option(m.get(partitionId)))
+ }
+
+ def set(partition: MetadataPartition): Unit = {
+ val prevPartitionMap = newNameMap.get(partition.topicName)
+ val newPartitionMap = if (prevPartitionMap == null) {
+ val m = new util.HashMap[Int, MetadataPartition](1)
+ changed.add(m)
+ m
+ } else if (changed.contains(prevPartitionMap)) {
+ prevPartitionMap
+ } else {
+ val m = new util.HashMap[Int, MetadataPartition](prevPartitionMap.size() + 1)
+ m.putAll(prevPartitionMap)
+ changed.add(m)
+ m
+ }
+ val prevPartition = newPartitionMap.put(partition.partitionIndex, partition)
+ if (partition.isReplicaFor(brokerId)) {
+ _localChanged.add(partition)
+ } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
+ _localRemoved.add(prevPartition)
+ }
+ newNameMap.put(partition.topicName, newPartitionMap)
+ }
+
+ def remove(topicName: String, partitionId: Int): Unit = {
+ val prevPartitionMap = newNameMap.get(topicName)
+ if (prevPartitionMap != null) {
+ if (changed.contains(prevPartitionMap)) {
+ val prevPartition = prevPartitionMap.remove(partitionId)
+ if (prevPartition.isReplicaFor(brokerId)) {
+ _localRemoved.add(prevPartition)
+ }
+ } else {
+ Option(prevPartitionMap.get(partitionId)).foreach { prevPartition =>
+ if (prevPartition.isReplicaFor(brokerId)) {
+ _localRemoved.add(prevPartition)
+ }
+ val newPartitionMap = new util.HashMap[Int, MetadataPartition](prevPartitionMap.size() - 1)
+ prevPartitionMap.forEach { (prevPartitionId, prevPartition) =>
+ if (!prevPartitionId.equals(partitionId)) {
+ newPartitionMap.put(prevPartitionId, prevPartition)
+ }
+ }
+ changed.add(newPartitionMap)
+ newNameMap.put(topicName, newPartitionMap)
+ }
+ }
+ }
+ }
+
+ def build(): MetadataPartitions = {
+ val result = MetadataPartitions(newNameMap, newIdMap)
+ newNameMap = Collections.unmodifiableMap(newNameMap)
+ newIdMap = Collections.unmodifiableMap(newIdMap)
+ result
+ }
+
+ def localChanged(): collection.Set[MetadataPartition] = _localChanged.asScala
+
+ def localRemoved(): collection.Set[MetadataPartition] = _localRemoved.asScala
+}
+
+case class MetadataPartitions(private val nameMap: util.Map[String, util.Map[Int, MetadataPartition]],
+ private val idMap: util.Map[Uuid, String]) {
+ def topicIdToName(uuid: Uuid): Option[String] = Option(idMap.get(uuid))
+
+ def copyNameMap(): util.Map[String, util.Map[Int, MetadataPartition]] = {
+ val copy = new util.HashMap[String, util.Map[Int, MetadataPartition]](nameMap.size())
+ copy.putAll(nameMap)
+ copy
+ }
+
+ def copyIdMap(): util.Map[Uuid, String] = {
+ val copy = new util.HashMap[Uuid, String](idMap.size())
+ copy.putAll(idMap)
+ copy
+ }
+
+ def allPartitions(): Iterator[MetadataPartition] = new AllPartitionsIterator(nameMap).asScala
+
+ def allTopicNames(): collection.Set[String] = nameMap.keySet().asScala
+
+ def numTopicPartitions(topicName: String): Option[Int] = {
+ val partitionMap = nameMap.get(topicName)
+ if (partitionMap == null) {
+ None
+ } else {
+ Some(partitionMap.size())
+ }
+ }
+
+ def topicPartitions(topicName: String): Iterator[MetadataPartition] = {
+ val partitionMap = nameMap.get(topicName)
+ if (partitionMap == null) {
+ Collections.emptyIterator().asScala
+ } else {
+ partitionMap.values().iterator().asScala
+ }
+ }
+
+ def topicPartition(topicName: String, partitionId: Int): Option[MetadataPartition] = {
+ Option(nameMap.get(topicName)).flatMap(m => Option(m.get(partitionId)))
+ }
+
+ def contains(topicName: String): Boolean = nameMap.containsKey(topicName)
+}
+
+class AllPartitionsIterator(nameMap: util.Map[String, util.Map[Int, MetadataPartition]])
+ extends util.Iterator[MetadataPartition] {
+
+ val outerIterator: util.Iterator[util.Map[Int, MetadataPartition]] = nameMap.values().iterator()
+
+ var innerIterator: util.Iterator[MetadataPartition] = Collections.emptyIterator()
+
+ var _next: MetadataPartition = _
+
+ override def hasNext: Boolean = {
+ if (_next != null) {
+ true
+ } else {
+ while (!innerIterator.hasNext) {
+ if (!outerIterator.hasNext) {
+ return false
+ } else {
+ innerIterator = outerIterator.next().values().iterator()
+ }
+ }
+ _next = innerIterator.next()
+ true
+ }
+ }
+
+ override def next(): MetadataPartition = {
+ if (!hasNext()) {
+ throw new NoSuchElementException()
+ }
+ val result = _next
+ _next = null
+ result
+ }
+}
diff --git a/core/src/test/scala/kafka/server/metadata/MetadataBrokersTest.scala b/core/src/test/scala/kafka/server/metadata/MetadataBrokersTest.scala
new file mode 100644
index 0000000000..dd7316e697
--- /dev/null
+++ b/core/src/test/scala/kafka/server/metadata/MetadataBrokersTest.scala
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.util.Collections
+import kafka.utils.TestUtils
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{Test, Timeout}
+import org.slf4j.LoggerFactory
+
+import java.util.concurrent.TimeUnit
+import scala.collection.mutable
+
+
+@Timeout(value = 120000, unit = TimeUnit.MILLISECONDS)
+class MetadataBrokersTest {
+
+ private val log = LoggerFactory.getLogger(classOf[MetadataBrokersTest])
+
+ val emptyBrokers = new MetadataBrokers(Collections.emptyList(), Collections.emptyMap())
+
+ @Test
+ def testBuildBrokers(): Unit = {
+ val builder = new MetadataBrokersBuilder(log, emptyBrokers)
+ builder.add(TestUtils.createMetadataBroker(0))
+ builder.add(TestUtils.createMetadataBroker(1))
+ builder.add(TestUtils.createMetadataBroker(2))
+ builder.add(TestUtils.createMetadataBroker(3))
+ builder.remove(0)
+ val brokers = builder.build()
+ val found = new mutable.HashSet[MetadataBroker]
+ brokers.iterator().foreach { found += _ }
+ val expected = new mutable.HashSet[MetadataBroker]
+ expected += TestUtils.createMetadataBroker(1)
+ expected += TestUtils.createMetadataBroker(2)
+ expected += TestUtils.createMetadataBroker(3)
+ assertEquals(expected, found)
+ }
+
+ @Test
+ def testChangeFencing(): Unit = {
+ val builder = new MetadataBrokersBuilder(log, emptyBrokers)
+ assertEquals(None, builder.get(0))
+ assertThrows(classOf[RuntimeException], () => builder.changeFencing(0, false))
+ builder.add(TestUtils.createMetadataBroker(0, fenced = true))
+ assertTrue(builder.get(0).get.fenced)
+ builder.changeFencing(0, false)
+ assertFalse(builder.get(0).get.fenced)
+ val brokers = builder.build()
+ assertTrue(brokers.aliveBroker(0).isDefined)
+ }
+
+ @Test
+ def testAliveBrokers(): Unit = {
+ val builder = new MetadataBrokersBuilder(log, emptyBrokers)
+ builder.add(TestUtils.createMetadataBroker(0))
+ builder.add(TestUtils.createMetadataBroker(1))
+ builder.add(TestUtils.createMetadataBroker(2))
+ builder.changeFencing(1, true)
+ val brokers = builder.build()
+ assertEquals(2, brokers.aliveBrokers().size)
+ assertTrue(brokers.aliveBrokers().exists(_.id == 0))
+ assertTrue(!brokers.aliveBrokers().exists(_.id == 1))
+ assertTrue(brokers.aliveBrokers().exists(_.id == 2))
+ while (!brokers.randomAliveBrokerId().contains(0)) { }
+ while (!brokers.randomAliveBrokerId().contains(2)) { }
+ assertEquals(3, brokers.size())
+ assertEquals(Some(TestUtils.createMetadataBroker(0)), brokers.get(0))
+ assertEquals(Some(TestUtils.createMetadataBroker(1, fenced = true)), brokers.get(1))
+ assertEquals(Some(TestUtils.createMetadataBroker(2)), brokers.get(2))
+ assertEquals(None, brokers.get(3))
+ assertEquals(Some(TestUtils.createMetadataBroker(0)), brokers.aliveBroker(0))
+ assertEquals(None, brokers.aliveBroker(1))
+ }
+}
diff --git a/core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala b/core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
new file mode 100644
index 0000000000..1b4cff7156
--- /dev/null
+++ b/core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.util.Collections
+import org.apache.kafka.common.Uuid
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{Test, Timeout}
+
+import java.util.concurrent.TimeUnit
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+
+@Timeout(value = 120000, unit = TimeUnit.MILLISECONDS)
+class MetadataPartitionsTest {
+
+ val emptyPartitions = MetadataPartitions(Collections.emptyMap(), Collections.emptyMap())
+
+ private def newPartition(topicName: String,
+ partitionIndex: Int,
+ replicas: Option[Seq[Int]] = None,
+ isr: Option[Seq[Int]] = None): MetadataPartition = {
+ val effectiveReplicas = replicas
+ .getOrElse(List(partitionIndex, partitionIndex + 1, partitionIndex + 2))
+ .map(Int.box)
+ .toList.asJava
+
+ val effectiveIsr = isr match {
+ case None => effectiveReplicas
+ case Some(s) => s.map(Integer.valueOf).toList.asJava
+ }
+ new MetadataPartition(topicName,
+ partitionIndex,
+ partitionIndex % 3, 100,
+ effectiveReplicas,
+ effectiveIsr,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList())
+ }
+
+ @Test
+ def testBuildPartitions(): Unit = {
+ val builder = new MetadataPartitionsBuilder(0, emptyPartitions)
+ assertEquals(None, builder.get("foo", 0))
+ builder.set(newPartition("foo", 0))
+ assertEquals(Some(newPartition("foo", 0)), builder.get("foo", 0))
+ assertEquals(None, builder.get("foo", 1))
+ builder.set(newPartition("foo", 1))
+ builder.set(newPartition("bar", 0))
+ val partitions = builder.build()
+ assertEquals(Some(newPartition("foo", 0)), partitions.topicPartition("foo", 0))
+ assertEquals(Some(newPartition("foo", 1)), partitions.topicPartition("foo", 1))
+ assertEquals(None, partitions.topicPartition("foo", 2))
+ assertEquals(Some(newPartition("bar", 0)), partitions.topicPartition("bar", 0))
+ }
+
+ @Test
+ def testAllPartitionsIterator(): Unit = {
+ val builder = new MetadataPartitionsBuilder(0, emptyPartitions)
+ val expected = new mutable.HashSet[MetadataPartition]()
+ expected += newPartition("foo", 0)
+ expected += newPartition("foo", 1)
+ expected += newPartition("foo", 2)
+ expected += newPartition("bar", 0)
+ expected += newPartition("bar", 1)
+ expected += newPartition("baz", 0)
+ expected.foreach { builder.set }
+ val partitions = builder.build()
+ val found = new mutable.HashSet[MetadataPartition]()
+ partitions.allPartitions().foreach { found += _ }
+ assertEquals(expected, found)
+ }
+
+ @Test
+ def testLocalChangedAndRemoved(): Unit = {
+ val builder = new MetadataPartitionsBuilder(0, emptyPartitions)
+ builder.set(newPartition("foo", 0))
+ assertTrue(newPartition("foo", 0).isReplicaFor(0))
+ assertFalse(newPartition("foo", 0).isReplicaFor(4))
+ builder.set(newPartition("foo", 1))
+ builder.set(newPartition("foo", 2))
+ builder.set(newPartition("bar", 0))
+ val expectedLocalChanged = new mutable.HashSet[MetadataPartition]()
+ expectedLocalChanged += newPartition("foo", 0)
+ expectedLocalChanged += newPartition("bar", 0)
+ assertEquals(expectedLocalChanged, builder.localChanged())
+ assertEquals(Set(), builder.localRemoved())
+ val image = builder.build()
+ assertEquals(Some(3), image.numTopicPartitions("foo"))
+ assertEquals(None, image.numTopicPartitions("quux"))
+
+ val builder2 = new MetadataPartitionsBuilder(1, image)
+ builder2.set(newPartition("foo", 0, replicas = Some(Seq(2, 3, 4))))
+ builder2.set(newPartition("foo", 1, isr = Some(Seq(0, 1))))
+ builder2.set(newPartition("bar", 2))
+ builder2.remove("bar", 0)
+ builder2.remove("foo", 2)
+ val expectedLocalChanged2 = new mutable.HashSet[MetadataPartition]()
+ expectedLocalChanged2 += newPartition("foo", 1, isr = Some(Seq(0, 1)))
+ assertEquals(expectedLocalChanged2, builder2.localChanged())
+ val expectedLocalRemoved2 = new mutable.HashSet[MetadataPartition]()
+ expectedLocalRemoved2 += newPartition("bar", 0)
+ expectedLocalRemoved2 += newPartition("foo", 0)
+ assertEquals(expectedLocalRemoved2, builder2.localRemoved())
+ }
+
+ @Test
+ def testAllTopicNames(): Unit = {
+ val builder = new MetadataPartitionsBuilder(0, emptyPartitions)
+ builder.set(newPartition("foo", 0))
+ builder.set(newPartition("foo", 1))
+ builder.set(newPartition("foo", 2))
+ builder.set(newPartition("bar", 0))
+ builder.set(newPartition("baz", 0))
+ builder.set(newPartition("baz", 1))
+ val image = builder.build()
+ val expectedTopicNames = new mutable.HashSet[String]()
+ expectedTopicNames += "foo"
+ expectedTopicNames += "bar"
+ expectedTopicNames += "baz"
+ assertEquals(expectedTopicNames, image.allTopicNames())
+ }
+
+ @Test
+ def testUuidMappings(): Unit = {
+ val builder = new MetadataPartitionsBuilder(0, emptyPartitions)
+ builder.addUuidMapping("foo", Uuid.fromString("qbUrhSpXTau_836U7T5ktg"))
+ builder.addUuidMapping("bar", Uuid.fromString("a1I0JF3yRzWFyOuY3F_vHw"))
+ builder.removeUuidMapping(Uuid.fromString("gdMy05W7QWG4ZjWir1DjBw"))
+ val image = builder.build()
+ assertEquals(Some("foo"), image.topicIdToName(Uuid.fromString("qbUrhSpXTau_836U7T5ktg")))
+ assertEquals(Some("bar"), image.topicIdToName(Uuid.fromString("a1I0JF3yRzWFyOuY3F_vHw")))
+ assertEquals(None, image.topicIdToName(Uuid.fromString("gdMy05W7QWG4ZjWir1DjBw")))
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index ee107dc768..e9694103d7 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -20,6 +20,7 @@ package kafka.log
import java.io._
import java.nio.ByteBuffer
import java.nio.file.{Files, Paths}
+import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{Callable, Executors}
import java.util.regex.Pattern
import java.util.{Collections, Optional, Properties}
@@ -30,7 +31,7 @@ import kafka.log.Log.DeleteDirSuffix
import kafka.metrics.KafkaYammerMetrics
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
-import kafka.server.{BrokerState, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata, PartitionMetadataFile}
+import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata, PartitionMetadataFile}
import kafka.utils._
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.common.errors._
@@ -41,6 +42,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.metadata.BrokerState
import org.easymock.EasyMock
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -96,7 +98,8 @@ class LogTest {
new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), initialOfflineDirs = Array.empty[File], topicConfigs = Map(),
initialDefaultConfig = logConfig, cleanerConfig = CleanerConfig(enableCleaner = false), recoveryThreadsPerDataDir = 4,
flushCheckMs = 1000L, flushRecoveryOffsetCheckpointMs = 10000L, flushStartOffsetCheckpointMs = 10000L,
- retentionCheckMs = 1000L, maxPidExpirationMs = 60 * 60 * 1000, scheduler = time.scheduler, time = time, brokerState = BrokerState(),
+ retentionCheckMs = 1000L, maxPidExpirationMs = 60 * 60 * 1000, scheduler = time.scheduler, time = time,
+ brokerState = new AtomicReference[BrokerState](BrokerState.NOT_RUNNING),
brokerTopicStats = new BrokerTopicStats, logDirFailureChannel = new LogDirFailureChannel(logDirs.size)) {
override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: Map[TopicPartition, Long],
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index 13cd74e5a3..720dbaf279 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -21,6 +21,7 @@ import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket
import java.nio.ByteBuffer
import java.util.Properties
+
import kafka.api.IntegrationTestHarness
import kafka.network.SocketServer
import kafka.utils.NotNothing
@@ -28,6 +29,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, RequestTestUtils, ResponseHeader}
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.metadata.BrokerState
import scala.annotation.nowarn
import scala.collection.Seq
@@ -51,8 +53,8 @@ abstract class BaseRequestTest extends IntegrationTestHarness {
def anySocketServer: SocketServer = {
servers.find { server =>
- val state = server.brokerState.currentState
- state != NotRunning.state && state != BrokerShuttingDown.state
+ val state = server.brokerState.get()
+ state != BrokerState.NOT_RUNNING && state != BrokerState.SHUTTING_DOWN
}.map(_.socketServer).getOrElse(throw new IllegalStateException("No live broker is available"))
}
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 1b4508f0e1..d04866ebe7 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.MetadataRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
+import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.test.TestUtils.isValidClusterId
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test}
@@ -332,7 +333,7 @@ class MetadataRequestTest extends BaseRequestTest {
@Test
def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
def checkIsr(servers: Seq[KafkaServer], topic: String): Unit = {
- val activeBrokers = servers.filter(_.brokerState.currentState != NotRunning.state)
+ val activeBrokers = servers.filter(_.brokerState.get() != BrokerState.NOT_RUNNING)
val expectedIsr = activeBrokers.map(_.config.brokerId).toSet
// Assert that topic metadata at new brokers is updated correctly
@@ -378,7 +379,7 @@ class MetadataRequestTest extends BaseRequestTest {
val brokersInController = controllerMetadataResponse.get.brokers.asScala.toSeq.sortBy(_.id)
// Assert that metadata is propagated correctly
- servers.filter(_.brokerState.currentState != NotRunning.state).foreach { broker =>
+ servers.filter(_.brokerState.get() != BrokerState.NOT_RUNNING).foreach { broker =>
TestUtils.waitUntilTrue(() => {
val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build,
Some(brokerSocketServer(broker.config.brokerId)))
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index a93949eade..0239465b64 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.metadata.BrokerState
import org.junit.jupiter.api.{BeforeEach, Test}
import org.junit.jupiter.api.Assertions._
@@ -171,10 +172,10 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
// goes wrong so that awaitShutdown doesn't hang
case e: Exception =>
assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected exception $e")
- assertEquals(NotRunning.state, server.brokerState.currentState)
+ assertEquals(BrokerState.NOT_RUNNING, server.brokerState.get())
}
finally {
- if (server.brokerState.currentState != NotRunning.state)
+ if (server.brokerState.get() != BrokerState.NOT_RUNNING)
server.shutdown()
server.awaitShutdown()
}
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 24130312f6..cc5706e40c 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -20,8 +20,8 @@ package kafka.server
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.KafkaException
+import org.apache.kafka.metadata.BrokerState
import org.apache.zookeeper.KeeperException.NodeExistsException
-import org.easymock.EasyMock
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
@@ -94,24 +94,15 @@ class ServerStartupTest extends ZooKeeperTestHarness {
@Test
def testBrokerStateRunningAfterZK(): Unit = {
val brokerId = 0
- val mockBrokerState: BrokerState = EasyMock.niceMock(classOf[BrokerState])
-
- class BrokerStateInterceptor() extends BrokerState {
- override def newState(newState: BrokerStates): Unit = {
- val brokers = zkClient.getAllBrokersInCluster
- assertEquals(1, brokers.size)
- assertEquals(brokerId, brokers.head.id)
- }
- }
-
- class MockKafkaServer(override val config: KafkaConfig, override val brokerState: BrokerState = mockBrokerState) extends KafkaServer(config) {}
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
- server = new MockKafkaServer(KafkaConfig.fromProps(props))
-
- EasyMock.expect(mockBrokerState.newState(RunningAsBroker)).andDelegateTo(new BrokerStateInterceptor).once()
- EasyMock.replay(mockBrokerState)
+ server = new KafkaServer(KafkaConfig.fromProps(props))
server.startup()
+ TestUtils.waitUntilTrue(() => server.brokerState.get() == BrokerState.RUNNING,
+ "waiting for the broker state to become RUNNING")
+ val brokers = zkClient.getAllBrokersInCluster
+ assertEquals(1, brokers.size)
+ assertEquals(brokerId, brokers.head.id)
}
}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 998df4632a..12cd15c24e 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -23,7 +23,7 @@ import java.nio.charset.{Charset, StandardCharsets}
import java.nio.file.{Files, StandardOpenOption}
import java.security.cert.X509Certificate
import java.time.Duration
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import java.util.{Arrays, Collections, Properties}
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
@@ -37,6 +37,7 @@ import kafka.server.checkpoints.OffsetCheckpointFile
import com.yammer.metrics.core.Meter
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.metrics.KafkaYammerMetrics
+import kafka.server.metadata.MetadataBroker
import kafka.utils.Implicits._
import kafka.zk._
import org.apache.kafka.clients.CommonClientConfigs
@@ -59,7 +60,8 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.utils.Utils._
-import org.apache.kafka.common.{KafkaFuture, TopicPartition}
+import org.apache.kafka.common.{KafkaFuture, Node, TopicPartition}
+import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.apache.zookeeper.KeeperException.SessionExpiredException
@@ -173,6 +175,16 @@ object TestUtils extends Logging {
def createBroker(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Broker =
new Broker(id, host, port, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
+ def createMetadataBroker(id: Int,
+ host: String = "localhost",
+ port: Int = 9092,
+ securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
+ rack: Option[String] = None,
+ fenced: Boolean = false): MetadataBroker = {
+ MetadataBroker(id, rack.getOrElse(null),
+ Map(securityProtocol.name -> new Node(id, host, port, rack.getOrElse(null))), fenced)
+ }
+
def createBrokerAndEpoch(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
epoch: Long = 0): (Broker, Long) = {
(new Broker(id, host, port, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol), epoch)
@@ -1076,7 +1088,7 @@ object TestUtils extends Logging {
maxPidExpirationMs = 60 * 60 * 1000,
scheduler = time.scheduler,
time = time,
- brokerState = BrokerState(),
+ brokerState = new AtomicReference[BrokerState](BrokerState.NOT_RUNNING),
brokerTopicStats = new BrokerTopicStats,
logDirFailureChannel = new LogDirFailureChannel(logDirs.size))
}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 26fb960855..a396054688 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -28,7 +28,6 @@
import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.server.AlterIsrManager;
-import kafka.server.BrokerState;
import kafka.server.BrokerTopicStats;
import kafka.server.FailedPartitions;
import kafka.server.InitialFetchState;
@@ -56,6 +55,7 @@
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -88,6 +88,7 @@
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
@State(Scope.Benchmark)
@Fork(value = 1)
@@ -132,7 +133,7 @@ public void setup() throws IOException {
1000L,
60000,
scheduler,
- new BrokerState(),
+ new AtomicReference<>(BrokerState.NOT_RUNNING),
brokerTopicStats,
logDirFailureChannel,
Time.SYSTEM);
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 9598390b35..dcff8c4eb2 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -26,7 +26,6 @@
import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.server.AlterIsrManager;
-import kafka.server.BrokerState;
import kafka.server.BrokerTopicStats;
import kafka.server.LogDirFailureChannel;
import kafka.server.MetadataCache;
@@ -39,6 +38,7 @@
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -67,6 +67,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
@State(Scope.Benchmark)
@Fork(value = 1)
@@ -108,7 +109,7 @@ public void setup() throws IOException {
1000L,
60000,
scheduler,
- new BrokerState(),
+ new AtomicReference<>(BrokerState.NOT_RUNNING),
brokerTopicStats,
logDirFailureChannel,
Time.SYSTEM);
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index a58bd7dd8c..cb237f4ffe 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -26,7 +26,6 @@
import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.server.AlterIsrManager;
-import kafka.server.BrokerState;
import kafka.server.BrokerTopicStats;
import kafka.server.LogDirFailureChannel;
import kafka.server.LogOffsetMetadata;
@@ -36,6 +35,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.BrokerState;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -59,6 +59,7 @@
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
@State(Scope.Benchmark)
@Fork(value = 1)
@@ -93,7 +94,7 @@ public void setUp() {
1000L,
60000,
scheduler,
- new BrokerState(),
+ new AtomicReference<>(BrokerState.NOT_RUNNING),
brokerTopicStats,
logDirFailureChannel,
Time.SYSTEM);