Skip to content

Commit

Permalink
MINOR: Add mock implementation of BrokerToControllerChannelManager
Browse files Browse the repository at this point in the history
…(#10026)

Tests involving `BrokerToControllerChannelManager` are simplified by being able to leverage `MockClient`. This patch introduces a `MockBrokerToControllerChannelManager` implementation which makes that possible.

The patch updates `ForwardingManagerTest` to use `MockBrokerToControllerChannelManager`. We also add a couple additional timeout cases, which exposed a minor bug. Previously we were using the wrong `TimeoutException`, which meant that expected timeout errors were in fact translated to `UNKNOWN_SERVER_ERROR`.

Reviewers: David Arthur <[email protected]>
  • Loading branch information
a0x8o committed Feb 3, 2021
1 parent 2f06413 commit 9d48552
Show file tree
Hide file tree
Showing 25 changed files with 717 additions and 103 deletions.
22 changes: 22 additions & 0 deletions clients/src/test/java/org/apache/kafka/clients/MockClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.test.TestUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -84,6 +85,10 @@ public FutureResponse(Node node,
private volatile int numBlockingWakeups = 0;
private volatile boolean active = true;

public MockClient(Time time) {
this(time, new NoOpMetadataUpdater());
}

public MockClient(Time time, Metadata metadata) {
this(time, new DefaultMockMetadataUpdater(metadata));
}
Expand Down Expand Up @@ -608,6 +613,23 @@ default void updateWithCurrentMetadata(Time time) {}
default void close() {}
}

private static class NoOpMetadataUpdater implements MockMetadataUpdater {
@Override
public List<Node> fetchNodes() {
return Collections.emptyList();
}

@Override
public boolean isUpdateNeeded() {
return false;
}

@Override
public void update(Time time, MetadataUpdate update) {
throw new UnsupportedOperationException();
}
}

private static class DefaultMockMetadataUpdater implements MockMetadataUpdater {
private final Metadata metadata;
private MetadataUpdate lastUpdate;
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/AlterIsrManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ object AlterIsrManager {
): AlterIsrManager = {
val nodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache)

val channelManager = new BrokerToControllerChannelManager(
val channelManager = BrokerToControllerChannelManager(
controllerNodeProvider = nodeProvider,
time = time,
metrics = metrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,56 @@ class MetadataCacheControllerNodeProvider(
}
}

object BrokerToControllerChannelManager {
def apply(
controllerNodeProvider: ControllerNodeProvider,
time: Time,
metrics: Metrics,
config: KafkaConfig,
channelName: String,
threadNamePrefix: Option[String],
retryTimeoutMs: Long
): BrokerToControllerChannelManager = {
new BrokerToControllerChannelManagerImpl(
controllerNodeProvider,
time,
metrics,
config,
channelName,
threadNamePrefix,
retryTimeoutMs
)
}
}


trait BrokerToControllerChannelManager {
def start(): Unit
def shutdown(): Unit
def controllerApiVersions(): Option[NodeApiVersions]
def sendRequest(
request: AbstractRequest.Builder[_ <: AbstractRequest],
callback: ControllerRequestCompletionHandler
): Unit
}


/**
* This class manages the connection between a broker and the controller. It runs a single
* [[BrokerToControllerRequestThread]] which uses the broker's metadata cache as its own metadata to find
* and connect to the controller. The channel is async and runs the network connection in the background.
* The maximum number of in-flight requests are set to one to ensure orderly response from the controller, therefore
* care must be taken to not block on outstanding requests for too long.
*/
class BrokerToControllerChannelManager(
class BrokerToControllerChannelManagerImpl(
controllerNodeProvider: ControllerNodeProvider,
time: Time,
metrics: Metrics,
config: KafkaConfig,
channelName: String,
threadNamePrefix: Option[String],
retryTimeoutMs: Long
) extends Logging {
) extends BrokerToControllerChannelManager with Logging {
private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ")
private val manualMetadataUpdater = new ManualMetadataUpdater()
private val apiVersions = new ApiVersions()
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/ForwardingManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import java.nio.ByteBuffer
import kafka.network.RequestChannel
import kafka.utils.Logging
import org.apache.kafka.clients.{ClientResponse, NodeApiVersions}
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, EnvelopeResponse, RequestHeader}
import org.apache.kafka.common.utils.Time

import scala.compat.java8.OptionConverters._
import scala.concurrent.TimeoutException

trait ForwardingManager {
def forwardRequest(
Expand All @@ -54,7 +54,7 @@ object ForwardingManager {
): ForwardingManager = {
val nodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache)

val channelManager = new BrokerToControllerChannelManager(
val channelManager = BrokerToControllerChannelManager(
controllerNodeProvider = nodeProvider,
time = time,
metrics = metrics,
Expand Down Expand Up @@ -134,7 +134,7 @@ class ForwardingManagerImpl(

override def onTimeout(): Unit = {
debug(s"Forwarding of the request $request failed due to timeout exception")
val response = request.body[AbstractRequest].getErrorResponse(new TimeoutException)
val response = request.body[AbstractRequest].getErrorResponse(new TimeoutException())
responseCallback(Option(response))
}
}
Expand Down
172 changes: 92 additions & 80 deletions core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,135 +19,139 @@ package kafka.server
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

import kafka.network
import kafka.network.RequestChannel
import kafka.utils.MockTime
import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
import org.apache.kafka.clients.{MockClient, NodeApiVersions}
import org.apache.kafka.clients.MockClient.RequestMatcher
import org.apache.kafka.common.Node
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.AlterConfigsResponseData
import org.apache.kafka.common.message.{AlterConfigsResponseData, ApiVersionsResponseData}
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AlterConfigsRequest, AlterConfigsResponse, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader, RequestTestUtils}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito

import scala.jdk.CollectionConverters._

class ForwardingManagerTest {
private val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
private val time = new MockTime()
private val client = new MockClient(time)
private val controllerNodeProvider = Mockito.mock(classOf[ControllerNodeProvider])
private val brokerToController = new MockBrokerToControllerChannelManager(
client, time, controllerNodeProvider, controllerApiVersions)
private val forwardingManager = new ForwardingManagerImpl(brokerToController)
private val principalBuilder = new DefaultKafkaPrincipalBuilder(null, null)

private def controllerApiVersions: NodeApiVersions = {
// The Envelope API is not yet included in the standard set of APIs
val envelopeApiVersion = new ApiVersionsResponseData.ApiVersion()
.setApiKey(ApiKeys.ENVELOPE.id)
.setMinVersion(ApiKeys.ENVELOPE.oldestVersion)
.setMaxVersion(ApiKeys.ENVELOPE.latestVersion)
NodeApiVersions.create(List(envelopeApiVersion).asJava)
}

@Test
def testResponseCorrelationIdMismatch(): Unit = {
val forwardingManager = new ForwardingManagerImpl(brokerToController)
val requestCorrelationId = 27
val envelopeCorrelationId = 39
val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")

val configResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo")
val configs = List(new AlterConfigsRequest.ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")).asJava
val requestBody = new AlterConfigsRequest.Builder(Map(
configResource -> new AlterConfigsRequest.Config(configs)
).asJava, false).build()
val (requestHeader, requestBuffer) = buildRequest(requestBody, requestCorrelationId)
val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)

val responseBody = new AlterConfigsResponse(new AlterConfigsResponseData())
val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody, requestHeader.apiVersion,
requestCorrelationId + 1)

Mockito.when(brokerToController.sendRequest(
any(classOf[EnvelopeRequest.Builder]),
any(classOf[ControllerRequestCompletionHandler])
)).thenAnswer(invocation => {
val completionHandler = invocation.getArgument[RequestCompletionHandler](1)
val response = buildEnvelopeResponse(responseBuffer, envelopeCorrelationId, completionHandler)
response.onComplete()
})
Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
val isEnvelopeRequest: RequestMatcher = request => request.isInstanceOf[EnvelopeRequest]
client.prepareResponse(isEnvelopeRequest, new EnvelopeResponse(responseBuffer, Errors.NONE));

var response: AbstractResponse = null
forwardingManager.forwardRequest(request, result => response = result.orNull)
val responseOpt = new AtomicReference[Option[AbstractResponse]]()
forwardingManager.forwardRequest(request, responseOpt.set)
brokerToController.poll()
assertTrue(Option(responseOpt.get).isDefined)

assertNotNull(response)
val response = responseOpt.get.get
assertEquals(Map(Errors.UNKNOWN_SERVER_ERROR -> 1).asJava, response.errorCounts())
}

@Test
def testUnsupportedVersions(): Unit = {
val forwardingManager = new ForwardingManagerImpl(brokerToController)
val requestCorrelationId = 27
val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")

val configResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo")
val configs = List(new AlterConfigsRequest.ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")).asJava
val requestBody = new AlterConfigsRequest.Builder(Map(
configResource -> new AlterConfigsRequest.Config(configs)
).asJava, false).build()
val (requestHeader, requestBuffer) = buildRequest(requestBody, requestCorrelationId)
val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)

val responseBody = new AlterConfigsResponse(new AlterConfigsResponseData())

val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody,
requestHeader.apiVersion, requestCorrelationId)

Mockito.when(brokerToController.sendRequest(
any(classOf[EnvelopeRequest.Builder]),
any(classOf[ControllerRequestCompletionHandler])
)).thenAnswer(invocation => {
val completionHandler = invocation.getArgument[RequestCompletionHandler](1)
val response = buildEnvelopeResponse(responseBuffer, 30,
completionHandler, Errors.UNSUPPORTED_VERSION)
response.onComplete()
})

var response: AbstractResponse = null
val connectionClosed = new AtomicBoolean(false)
forwardingManager.forwardRequest(request, res => {
response = res.orNull
connectionClosed.set(true)
})

assertTrue(connectionClosed.get())
assertNull(response)
Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
val isEnvelopeRequest: RequestMatcher = request => request.isInstanceOf[EnvelopeRequest]
client.prepareResponse(isEnvelopeRequest, new EnvelopeResponse(responseBuffer, Errors.UNSUPPORTED_VERSION));

val responseOpt = new AtomicReference[Option[AbstractResponse]]()
forwardingManager.forwardRequest(request, responseOpt.set)
brokerToController.poll()
assertEquals(None, responseOpt.get)
}

private def buildEnvelopeResponse(
responseBuffer: ByteBuffer,
correlationId: Int,
completionHandler: RequestCompletionHandler,
error: Errors = Errors.NONE
): ClientResponse = {
val envelopeRequestHeader = new RequestHeader(
ApiKeys.ENVELOPE,
ApiKeys.ENVELOPE.latestVersion(),
"clientId",
correlationId
)
val envelopeResponse = new EnvelopeResponse(
responseBuffer,
error
)
@Test
def testForwardingTimeoutWaitingForControllerDiscovery(): Unit = {
val requestCorrelationId = 27
val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)

new ClientResponse(
envelopeRequestHeader,
completionHandler,
"1",
time.milliseconds(),
time.milliseconds(),
false,
null,
null,
envelopeResponse
)
Mockito.when(controllerNodeProvider.get()).thenReturn(None)

val response = new AtomicReference[AbstractResponse]()
forwardingManager.forwardRequest(request, res => res.foreach(response.set))
brokerToController.poll()
assertNull(response.get)

// The controller is not discovered before reaching the retry timeout.
// The request should fail with a timeout error.
time.sleep(brokerToController.retryTimeoutMs)
brokerToController.poll()
assertNotNull(response.get)

val alterConfigResponse = response.get.asInstanceOf[AlterConfigsResponse]
assertEquals(Map(Errors.REQUEST_TIMED_OUT -> 1).asJava, alterConfigResponse.errorCounts)
}

@Test
def testForwardingTimeoutAfterRetry(): Unit = {
val requestCorrelationId = 27
val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)

Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))

val response = new AtomicReference[AbstractResponse]()
forwardingManager.forwardRequest(request, res => res.foreach(response.set))
brokerToController.poll()
assertNull(response.get)

// After reaching the retry timeout, we get a disconnect. Instead of retrying,
// we should fail the request with a timeout error.
time.sleep(brokerToController.retryTimeoutMs)
client.respond(testAlterConfigRequest.getErrorResponse(0, Errors.UNKNOWN_SERVER_ERROR.exception), true)
brokerToController.poll()
brokerToController.poll()
assertNotNull(response.get)

val alterConfigResponse = response.get.asInstanceOf[AlterConfigsResponse]
assertEquals(Map(Errors.REQUEST_TIMED_OUT -> 1).asJava, alterConfigResponse.errorCounts)
}

private def buildRequest(
Expand Down Expand Up @@ -196,4 +200,12 @@ class ForwardingManagerTest {
)
}

private def testAlterConfigRequest: AlterConfigsRequest = {
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo")
val configs = List(new AlterConfigsRequest.ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")).asJava
new AlterConfigsRequest.Builder(Map(
configResource -> new AlterConfigsRequest.Config(configs)
).asJava, false).build()
}

}
Loading

0 comments on commit 9d48552

Please sign in to comment.