KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics (#9780)

The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently fetch the end offsets of its internal topic to know whether it has caught up. `KafkaBasedLog` uses its consumer both to fetch those end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until either a new record is written (unlikely) or the consumer's `fetch.max.wait.ms` timeout (500ms by default) elapses and the poll returns with no records. In other words, the call to `KafkaBasedLog.readToEnd()` may block for some time even though the log is already caught up to the end.

Instead, we want `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `Kafka*BackingStore` classes use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffsets` broker API, so the functionality is ultimately the same, but we no longer have to wait on any ongoing consumer activity.
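
For illustration only (this is not the code in this commit), here is a minimal sketch of fetching end offsets through the admin client instead of the consumer. The wrapper class and method names are made up for the example; the `Admin#listOffsets` and `OffsetSpec#latest` calls are the standard Admin API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch: not the actual KafkaBasedLog/TopicAdmin code.
public class EndOffsetsSketch {
    // Fetch the latest (end) offset for each partition via the Admin API.
    // Unlike Consumer#endOffsets, this does not wait on an in-flight poll().
    static Map<TopicPartition, Long> endOffsets(Admin admin, Set<TopicPartition> partitions) throws Exception {
        Map<TopicPartition, OffsetSpec> request = new HashMap<>();
        for (TopicPartition tp : partitions) {
            request.put(tp, OffsetSpec.latest());
        }
        ListOffsetsResult result = admin.listOffsets(request);
        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (TopicPartition tp : partitions) {
            offsets.put(tp, result.partitionResult(tp).get().offset());
        }
        return offsets;
    }
}
```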

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client; instead, all three `KafkaBasedLog` instances should share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses, and closes an admin client instance when it checks and initializes that store's internal topic. If we change the `Kafka*BackingStore` classes to share one admin client instance, we can change that initialization logic to reuse the supplied admin client instance as well.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor that accepts a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to one that accepts an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in the deprecated constructor, which is no longer used in AK), the consumer is still used to get the latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters plus an admin supplier, and deprecate the old constructor. When each class instantiates its `KafkaBasedLog` instance, it passes the admin supplier along with an init function that takes an admin instance (a sketch of such an init function follows this list).
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed (a rough sketch of this shape also follows this list).
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (which is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore` objects, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to accept in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.
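
Below is a rough sketch of the shared, lazily created admin described in items 1 and 3. Only the `Supplier`/`AutoCloseable` shape matters here; the class name, fields, and synchronization details are assumptions for the sketch, not the actual `SharedTopicAdmin` implementation.

```java
import java.util.Map;
import java.util.function.Supplier;

import org.apache.kafka.connect.util.TopicAdmin;

// Assumed shape only: the real SharedTopicAdmin in Connect may differ in its details.
public class SharedTopicAdminSketch implements Supplier<TopicAdmin>, AutoCloseable {
    private final Map<String, Object> adminProps;
    private TopicAdmin admin;   // created lazily, shared by all three KafkaBasedLog instances

    public SharedTopicAdminSketch(Map<String, Object> adminProps) {
        this.adminProps = adminProps;
    }

    @Override
    public synchronized TopicAdmin get() {
        if (admin == null) {
            admin = new TopicAdmin(adminProps);   // create the underlying Admin client only when first needed
        }
        return admin;
    }

    @Override
    public synchronized void close() {
        if (admin != null) {
            admin.close();
            admin = null;
        }
    }
}
```

Because all three `Kafka*BackingStore` instances receive the same supplier, their `KafkaBasedLog` instances share one admin client, and the runtime can close it exactly once at shutdown.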
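
Similarly, for the init function in item 2, here is a minimal sketch of ensuring the internal topic exists using the supplied admin instance. It uses the plain `Admin` API and a hypothetical helper name for brevity; the real code goes through `TopicAdmin` and the worker's internal topic settings.

```java
import java.util.Collections;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

// Illustrative only: the actual Connect code creates the topic through TopicAdmin, not Admin directly.
public class InternalTopicInitSketch {
    // Create the compacted internal topic if it does not already exist, reusing the shared admin client.
    static void ensureTopic(Admin admin, String topic, int partitions, short replicationFactor) throws Exception {
        NewTopic newTopic = new NewTopic(topic, partitions, replicationFactor)
                .configs(Collections.singletonMap("cleanup.policy", "compact"));
        try {
            admin.createTopics(Collections.singleton(newTopic)).all().get();
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw e;   // a topic that already exists is fine; anything else is a real failure
            }
        }
    }
}
```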

Author: Randall Hauch <[email protected]>
Reviewer: Konstantine Karantasis <[email protected]>
a0x8o committed Feb 10, 2021
1 parent 88ad7d1 commit 2c9c372
Showing 252 changed files with 10,471 additions and 3,559 deletions.
17 changes: 17 additions & 0 deletions bin/kafka-cluster.sh
@@ -0,0 +1,17 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ClusterTool "$@"
17 changes: 17 additions & 0 deletions bin/kafka-storage.sh
@@ -0,0 +1,17 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

exec $(dirname $0)/kafka-run-class.sh kafka.tools.StorageTool "$@"
6 changes: 5 additions & 1 deletion build.gradle
@@ -751,6 +751,7 @@ project(':core') {
compile project(':clients')
compile project(':metadata')
compile project(':raft')
compile libs.argparse4j
compile libs.jacksonDatabind
compile libs.jacksonModuleScala
compile libs.jacksonDataformatCsv
@@ -1012,7 +1013,10 @@ project(':core') {
}
test {
java {
srcDirs = ["src/generated/java", "src/test/java"]
srcDirs = []
}
scala {
srcDirs = ["src/test/java", "src/test/scala"]
}
}
}
15 changes: 15 additions & 0 deletions checkstyle/import-control-core.xml
@@ -58,4 +58,19 @@
<allow pkg="org.apache.kafka.clients" />
</subpackage>

<subpackage name="test">
<allow pkg="kafka.test.annotation"/>
<allow pkg="kafka.test.junit"/>
<allow pkg="kafka.network"/>
<allow pkg="kafka.api"/>
<allow pkg="kafka.server"/>
<allow pkg="org.apache.kafka.clients.admin"/>
<allow pkg="integration.kafka.server" class="IntegrationTestHelper"/>
<subpackage name="annotation">
<allow pkg="kafka.test"/>
</subpackage>
<subpackage name="junit">
<allow pkg="kafka.test"/>
</subpackage>
</subpackage>
</import-control>
3 changes: 3 additions & 0 deletions checkstyle/suppressions.xml
@@ -23,6 +23,7 @@
<!-- core -->
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="NPathComplexity" files="ClusterTestExtensions.java"/>

<!-- Clients -->
<suppress id="dontUseSystemExit"
@@ -262,6 +263,8 @@
files="RequestResponseTest.java"/>

<!-- metadata -->
<suppress checks="NPathComplexity"
files="KafkaEventQueue.java"/>
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="metadata[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
</suppressions>
@@ -1550,7 +1550,7 @@ public void handleResponse(AbstractResponse response) {
} else if (findCoordinatorResponse.error() == Errors.GROUP_AUTHORIZATION_FAILED) {
abortableError(GroupAuthorizationException.forGroupId(builder.data().key()));
} else {
fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to" +
fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to " +
"unexpected error: %s", coordinatorType, builder.data().key(),
findCoordinatorResponse.data().errorMessage())));
}
@@ -65,6 +65,16 @@ public CreateTopicsRequest build(short version) {
public String toString() {
return data.toString();
}

@Override
public boolean equals(Object other) {
return other instanceof Builder && this.data.equals(((Builder) other).data);
}

@Override
public int hashCode() {
return data.hashCode();
}
}

private final CreateTopicsRequestData data;
@@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
@@ -102,7 +103,7 @@ public static CoordinatorType forId(byte id) {
case 1:
return TRANSACTION;
default:
throw new IllegalArgumentException("Unknown coordinator type received: " + id);
throw new InvalidRequestException("Unknown coordinator type received: " + id);
}
}
}
@@ -30,6 +30,7 @@
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -171,32 +172,23 @@ private void handleCallback(OAuthBearerValidatorCallback callback) {
OAuthBearerValidationUtils.validateTimeConsistency(unsecuredJwt).throwExceptionIfFailed();
OAuthBearerValidationUtils.validateScope(unsecuredJwt, requiredScope).throwExceptionIfFailed();
log.info("Successfully validated token with principal {}: {}", unsecuredJwt.principalName(),
unsecuredJwt.claims().toString());
unsecuredJwt.claims());
callback.token(unsecuredJwt);
}

private String principalClaimName() {
String principalClaimNameValue = option(PRINCIPAL_CLAIM_NAME_OPTION);
String principalClaimName = principalClaimNameValue != null && !principalClaimNameValue.trim().isEmpty()
? principalClaimNameValue.trim()
: "sub";
return principalClaimName;
return Utils.isBlank(principalClaimNameValue) ? "sub" : principalClaimNameValue.trim();
}

private String scopeClaimName() {
String scopeClaimNameValue = option(SCOPE_CLAIM_NAME_OPTION);
String scopeClaimName = scopeClaimNameValue != null && !scopeClaimNameValue.trim().isEmpty()
? scopeClaimNameValue.trim()
: "scope";
return scopeClaimName;
return Utils.isBlank(scopeClaimNameValue) ? "scope" : scopeClaimNameValue.trim();
}

private List<String> requiredScope() {
String requiredSpaceDelimitedScope = option(REQUIRED_SCOPE_OPTION);
List<String> requiredScope = requiredSpaceDelimitedScope == null || requiredSpaceDelimitedScope.trim().isEmpty()
? Collections.emptyList()
: OAuthBearerScopeUtils.parseScope(requiredSpaceDelimitedScope.trim());
return requiredScope;
return Utils.isBlank(requiredSpaceDelimitedScope) ? Collections.emptyList() : OAuthBearerScopeUtils.parseScope(requiredSpaceDelimitedScope.trim());
}

private int allowableClockSkewMs() {
@@ -19,14 +19,15 @@
import org.apache.kafka.common.errors.SerializationException;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
* String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding,
* value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
*/
public class StringDeserializer implements Deserializer<String> {
private String encoding = "UTF8";
private String encoding = StandardCharsets.UTF_8.name();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
@@ -19,14 +19,15 @@
import org.apache.kafka.common.errors.SerializationException;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
* String encoding defaults to UTF8 and can be customized by setting the property key.serializer.encoding,
* value.serializer.encoding or serializer.encoding. The first two take precedence over the last.
*/
public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";
private String encoding = StandardCharsets.UTF_8.name();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
@@ -19,6 +19,7 @@
import org.apache.kafka.common.errors.SerializationException;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;

@@ -27,7 +28,7 @@
* the property key.deserializer.encoding, value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
*/
public class UUIDDeserializer implements Deserializer<UUID> {
private String encoding = "UTF8";
private String encoding = StandardCharsets.UTF_8.name();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
@@ -19,6 +19,7 @@
import org.apache.kafka.common.errors.SerializationException;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;

@@ -27,7 +28,7 @@
* the property key.deserializer.encoding, value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
*/
public class UUIDSerializer implements Serializer<UUID> {
private String encoding = "UTF8";
private String encoding = StandardCharsets.UTF_8.name();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
@@ -1334,4 +1334,13 @@ public static long getDateTime(String timestamp) throws ParseException, IllegalA
public static <S> Iterator<S> covariantCast(Iterator<? extends S> iterator) {
return (Iterator<S>) iterator;
}

/**
* Checks if a string is null, empty or whitespace only.
* @param str a string to be checked
* @return true if the string is null, empty or whitespace only; otherwise, return false.
*/
public static boolean isBlank(String str) {
return str == null || str.trim().isEmpty();
}
}
@@ -38,6 +38,7 @@
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
@@ -69,6 +70,7 @@ public class MockAdminClient extends AdminClient {
new HashMap<>();
private final Map<TopicPartition, Long> beginningOffsets;
private final Map<TopicPartition, Long> endOffsets;
private final boolean usingRaftController;
private final String clusterId;
private final List<List<String>> brokerLogDirs;
private final List<Map<String, String>> brokerConfigs;
@@ -90,6 +92,7 @@ public static class Builder {
private Node controller = null;
private List<List<String>> brokerLogDirs = new ArrayList<>();
private Short defaultPartitions;
private boolean usingRaftController = false;
private Integer defaultReplicationFactor;

public Builder() {
@@ -135,6 +138,11 @@ public Builder defaultReplicationFactor(int defaultReplicationFactor) {
return this;
}

public Builder usingRaftController(boolean usingRaftController) {
this.usingRaftController = usingRaftController;
return this;
}

public Builder defaultPartitions(short numPartitions) {
this.defaultPartitions = numPartitions;
return this;
@@ -146,7 +154,8 @@ public MockAdminClient build() {
clusterId,
defaultPartitions != null ? defaultPartitions.shortValue() : 1,
defaultReplicationFactor != null ? defaultReplicationFactor.shortValue() : Math.min(brokers.size(), 3),
brokerLogDirs);
brokerLogDirs,
usingRaftController);
}
}

@@ -156,15 +165,16 @@ public MockAdminClient() {

public MockAdminClient(List<Node> brokers, Node controller) {
this(brokers, controller, DEFAULT_CLUSTER_ID, 1, brokers.size(),
Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS));
Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS), false);
}

private MockAdminClient(List<Node> brokers,
Node controller,
String clusterId,
int defaultPartitions,
int defaultReplicationFactor,
List<List<String>> brokerLogDirs) {
List<List<String>> brokerLogDirs,
boolean usingRaftController) {
this.brokers = brokers;
controller(controller);
this.clusterId = clusterId;
@@ -177,6 +187,7 @@ private MockAdminClient(List<Node> brokers,
}
this.beginningOffsets = new HashMap<>();
this.endOffsets = new HashMap<>();
this.usingRaftController = usingRaftController;
}

synchronized public void controller(Node controller) {
@@ -889,7 +900,13 @@ public UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpd

@Override
public DecommissionBrokerResult decommissionBroker(int brokerId, DecommissionBrokerOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
if (usingRaftController) {
return new DecommissionBrokerResult(KafkaFuture.completedFuture(null));
} else {
KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
future.completeExceptionally(new UnsupportedVersionException(""));
return new DecommissionBrokerResult(future);
}
}

@Override
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.errors.InvalidRequestException;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertThrows;

class FindCoordinatorRequestTest {

@Test
public void getInvalidCoordinatorTypeId() {
assertThrows(InvalidRequestException.class,
() -> FindCoordinatorRequest.CoordinatorType.forId((byte) 10));
}
}