From 88ad7d1b7f816ddce65c3b4fa188c4781fe75b67 Mon Sep 17 00:00:00 2001
From: Alex Barreto
Date: Thu, 4 Feb 2021 10:34:00 -0600
Subject: [PATCH] KAFKA-12193: Re-resolve IPs after a client disconnects (#9902)

This patch changes the NetworkClient behavior to re-resolve the target node's
hostname after disconnecting from an established connection, rather than
waiting until the previously-resolved addresses are exhausted. This handles
the scenario where the node's IP addresses have changed during the lifetime of
the connection, and means that the client does not have to try each of the
stale (potentially invalid) addresses before a new DNS resolution is triggered.

Reviewers: Mickael Maison, Satish Duggana, David Jacot
---
 build.gradle                                  |   1 +
 checkstyle/import-control.xml                 |   2 +
 .../org/apache/kafka/clients/ClientUtils.java |   5 +-
 .../clients/ClusterConnectionStates.java      |  26 +-
 .../kafka/clients/DefaultHostResolver.java    |  29 ++
 .../apache/kafka/clients/HostResolver.java    |  26 ++
 .../apache/kafka/clients/NetworkClient.java   |  80 ++---
 .../clients/AddressChangeHostResolver.java    |  49 ++++
 .../apache/kafka/clients/ClientUtilsTest.java |   9 +-
 .../clients/ClusterConnectionStatesTest.java  |  71 +++--
 .../kafka/clients/NetworkClientTest.java      | 163 +++++++++++
 .../org/apache/kafka/test/MockSelector.java   |  11 +-
 .../kafka/connect/runtime/AbstractHerder.java |   6 +-
 .../connect/runtime/WorkerSourceTask.java     |   7 +-
 .../distributed/DistributedHerder.java        |   4 +-
 .../runtime/standalone/StandaloneHerder.java  |   4 +-
 .../apache/kafka/connect/util/TopicAdmin.java | 103 ++++++-
 .../connect/runtime/AbstractHerderTest.java   |   6 +-
 ...rrorHandlingTaskWithTopicCreationTest.java |   6 +-
 ...WorkerSourceTaskWithTopicCreationTest.java |  69 ++++-
 .../kafka/connect/util/TopicAdminTest.java    |  31 +-
 .../scala/kafka/tools/TestRaftServer.scala    |   6 +-
 .../kafka/server/MetadataRequestTest.scala    |   5 +-
 ...opicIdWithOldInterBrokerProtocolTest.scala |   1 -
 .../message/MetadataRecordTypeGenerator.java  |  14 +-
 .../kafka/metadata/ApiMessageAndVersion.java  |  62 ++++
 .../org/apache/kafka/raft/RecordSerde.java    |  33 +--
 .../raft/internals/BatchAccumulator.java      |   7 +-
 .../kafka/raft/internals/BatchBuilder.java    |  15 +-
 .../kafka/raft/internals/StringSerde.java     |   5 +-
 .../raft/metadata/MetadataRecordSerde.java    |  66 +++++
 .../kafka/raft/RaftEventSimulationTest.java   |   5 +-
 .../raft/internals/BatchAccumulatorTest.java  |   8 +-
 .../metadata/MetadataRecordSerdeTest.java     |  72 +++++
 .../apache/kafka/streams/KafkaStreams.java    | 160 +++++-----
 .../processor/internals/StateDirectory.java   | 122 +++++++-
 .../kafka/streams/KafkaStreamsTest.java       |   3 +
 .../KStreamRepartitionIntegrationTest.java    |  19 +-
 ...reignKeyInnerJoinMultiIntegrationTest.java |  45 ++-
 .../StandbyTaskEOSIntegrationTest.java        | 273 +++++++++---------
 .../StoreUpgradeIntegrationTest.java          |  18 +-
 .../utils/IntegrationTestUtils.java           |  11 +
 .../internals/StateDirectoryTest.java         |  82 +++++-
 43 files changed, 1360 insertions(+), 380 deletions(-)
 create mode 100644 clients/src/main/java/org/apache/kafka/clients/DefaultHostResolver.java
 create mode 100644 clients/src/main/java/org/apache/kafka/clients/HostResolver.java
 create mode 100644 clients/src/test/java/org/apache/kafka/clients/AddressChangeHostResolver.java
 create mode 100644 metadata/src/main/java/org/apache/kafka/metadata/ApiMessageAndVersion.java
 create mode 100644 raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java
 create mode 100644 raft/src/test/java/org/apache/kafka/raft/metadata/MetadataRecordSerdeTest.java

diff --git a/build.gradle
b/build.gradle index 5b628b7ca8..d9b185e1bd 100644 --- a/build.gradle +++ b/build.gradle @@ -1228,6 +1228,7 @@ project(':raft') { dependencies { compile project(':clients') + compile project(':metadata') compile libs.slf4jApi compile libs.jacksonDatabind diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index ad6e885d49..9cc432efd1 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -337,10 +337,12 @@ + + diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index 5e5286e329..5adb7e3361 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -106,8 +106,9 @@ public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time ti clientSaslMechanism, time, true, logContext); } - static List resolve(String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException { - InetAddress[] addresses = InetAddress.getAllByName(host); + static List resolve(String host, ClientDnsLookup clientDnsLookup, + HostResolver hostResolver) throws UnknownHostException { + InetAddress[] addresses = hostResolver.resolve(host); switch (clientDnsLookup) { case DEFAULT: diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java index 1b20dcc70d..e00494ce39 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -43,13 +43,14 @@ final class ClusterConnectionStates { final static double CONNECTION_SETUP_TIMEOUT_JITTER = 0.2; private final Map nodeState; private final Logger log; + private final HostResolver hostResolver; private Set connectingNodes; private ExponentialBackoff reconnectBackoff; private ExponentialBackoff connectionSetupTimeout; public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs, long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs, - LogContext logContext) { + LogContext logContext, HostResolver hostResolver) { this.log = logContext.logger(ClusterConnectionStates.class); this.reconnectBackoff = new ExponentialBackoff( reconnectBackoffMs, @@ -63,6 +64,7 @@ public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMax CONNECTION_SETUP_TIMEOUT_JITTER); this.nodeState = new HashMap<>(); this.connectingNodes = new HashSet<>(); + this.hostResolver = hostResolver; } /** @@ -156,7 +158,8 @@ public void connecting(String id, long now, String host, ClientDnsLookup clientD // Create a new NodeConnectionState if nodeState does not already contain one // for the specified id or if the hostname associated with the node id changed. 
nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now, - reconnectBackoff.backoff(0), connectionSetupTimeout.backoff(0), host, clientDnsLookup)); + reconnectBackoff.backoff(0), connectionSetupTimeout.backoff(0), host, + clientDnsLookup, hostResolver)); connectingNodes.add(id); } @@ -183,6 +186,11 @@ public void disconnected(String id, long now) { connectingNodes.remove(id); } else { resetConnectionSetupTimeout(nodeState); + if (nodeState.state.isConnected()) { + // If a connection had previously been established, clear the addresses to trigger a new DNS resolution + // because the node IPs may have changed + nodeState.clearAddresses(); + } } nodeState.state = ConnectionState.DISCONNECTED; } @@ -470,9 +478,11 @@ private static class NodeConnectionState { private int addressIndex; private final String host; private final ClientDnsLookup clientDnsLookup; + private final HostResolver hostResolver; private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs, - long connectionSetupTimeoutMs, String host, ClientDnsLookup clientDnsLookup) { + long connectionSetupTimeoutMs, String host, ClientDnsLookup clientDnsLookup, + HostResolver hostResolver) { this.state = state; this.addresses = Collections.emptyList(); this.addressIndex = -1; @@ -484,6 +494,7 @@ private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long this.throttleUntilTimeMs = 0; this.host = host; this.clientDnsLookup = clientDnsLookup; + this.hostResolver = hostResolver; } public String host() { @@ -498,7 +509,7 @@ public String host() { private InetAddress currentAddress() throws UnknownHostException { if (addresses.isEmpty()) { // (Re-)initialize list - addresses = ClientUtils.resolve(host, clientDnsLookup); + addresses = ClientUtils.resolve(host, clientDnsLookup, hostResolver); addressIndex = 0; } @@ -518,6 +529,13 @@ private void moveToNextAddress() { addresses = Collections.emptyList(); // Exhausted list. Re-resolve on next currentAddress() call } + /** + * Clears the resolved addresses in order to trigger re-resolving on the next {@link #currentAddress()} call. + */ + private void clearAddresses() { + addresses = Collections.emptyList(); + } + public String toString() { return "NodeState(" + state + ", " + lastConnectAttemptMs + ", " + failedAttempts + ", " + throttleUntilTimeMs + ")"; } diff --git a/clients/src/main/java/org/apache/kafka/clients/DefaultHostResolver.java b/clients/src/main/java/org/apache/kafka/clients/DefaultHostResolver.java new file mode 100644 index 0000000000..786173e3a2 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/DefaultHostResolver.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class DefaultHostResolver implements HostResolver { + + @Override + public InetAddress[] resolve(String host) throws UnknownHostException { + return InetAddress.getAllByName(host); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/HostResolver.java b/clients/src/main/java/org/apache/kafka/clients/HostResolver.java new file mode 100644 index 0000000000..80209ca8a9 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/HostResolver.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +public interface HostResolver { + + InetAddress[] resolve(String host) throws UnknownHostException; +} diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 54ba206349..fba6306688 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -146,9 +146,8 @@ public NetworkClient(Selectable selector, boolean discoverBrokerVersions, ApiVersions apiVersions, LogContext logContext) { - this(null, + this(selector, metadata, - selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs, @@ -167,22 +166,22 @@ public NetworkClient(Selectable selector, } public NetworkClient(Selectable selector, - Metadata metadata, - String clientId, - int maxInFlightRequestsPerConnection, - long reconnectBackoffMs, - long reconnectBackoffMax, - int socketSendBuffer, - int socketReceiveBuffer, - int defaultRequestTimeoutMs, - long connectionSetupTimeoutMs, - long connectionSetupTimeoutMaxMs, - ClientDnsLookup clientDnsLookup, - Time time, - boolean discoverBrokerVersions, - ApiVersions apiVersions, - Sensor throttleTimeSensor, - LogContext logContext) { + Metadata metadata, + String clientId, + int maxInFlightRequestsPerConnection, + long reconnectBackoffMs, + long reconnectBackoffMax, + int socketSendBuffer, + int socketReceiveBuffer, + int defaultRequestTimeoutMs, + long connectionSetupTimeoutMs, + long connectionSetupTimeoutMaxMs, + ClientDnsLookup clientDnsLookup, + Time time, + boolean discoverBrokerVersions, + ApiVersions apiVersions, + Sensor throttleTimeSensor, + LogContext logContext) { this(null, metadata, selector, @@ -200,7 +199,8 @@ public NetworkClient(Selectable selector, discoverBrokerVersions, apiVersions, throttleTimeSensor, - logContext); + logContext, + new DefaultHostResolver()); } public NetworkClient(Selectable selector, @@ -236,27 +236,29 @@ public NetworkClient(Selectable selector, 
discoverBrokerVersions, apiVersions, null, - logContext); + logContext, + new DefaultHostResolver()); } - private NetworkClient(MetadataUpdater metadataUpdater, - Metadata metadata, - Selectable selector, - String clientId, - int maxInFlightRequestsPerConnection, - long reconnectBackoffMs, - long reconnectBackoffMax, - int socketSendBuffer, - int socketReceiveBuffer, - int defaultRequestTimeoutMs, - long connectionSetupTimeoutMs, - long connectionSetupTimeoutMaxMs, - ClientDnsLookup clientDnsLookup, - Time time, - boolean discoverBrokerVersions, - ApiVersions apiVersions, - Sensor throttleTimeSensor, - LogContext logContext) { + public NetworkClient(MetadataUpdater metadataUpdater, + Metadata metadata, + Selectable selector, + String clientId, + int maxInFlightRequestsPerConnection, + long reconnectBackoffMs, + long reconnectBackoffMax, + int socketSendBuffer, + int socketReceiveBuffer, + int defaultRequestTimeoutMs, + long connectionSetupTimeoutMs, + long connectionSetupTimeoutMaxMs, + ClientDnsLookup clientDnsLookup, + Time time, + boolean discoverBrokerVersions, + ApiVersions apiVersions, + Sensor throttleTimeSensor, + LogContext logContext, + HostResolver hostResolver) { /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the * super constructor is invoked. @@ -273,7 +275,7 @@ private NetworkClient(MetadataUpdater metadataUpdater, this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection); this.connectionStates = new ClusterConnectionStates( reconnectBackoffMs, reconnectBackoffMax, - connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, logContext); + connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, logContext, hostResolver); this.socketSendBuffer = socketSendBuffer; this.socketReceiveBuffer = socketReceiveBuffer; this.correlation = 0; diff --git a/clients/src/test/java/org/apache/kafka/clients/AddressChangeHostResolver.java b/clients/src/test/java/org/apache/kafka/clients/AddressChangeHostResolver.java new file mode 100644 index 0000000000..28f9c88af2 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/AddressChangeHostResolver.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import java.net.InetAddress; + +class AddressChangeHostResolver implements HostResolver { + private boolean useNewAddresses; + private InetAddress[] initialAddresses; + private InetAddress[] newAddresses; + private int resolutionCount = 0; + + public AddressChangeHostResolver(InetAddress[] initialAddresses, InetAddress[] newAddresses) { + this.initialAddresses = initialAddresses; + this.newAddresses = newAddresses; + } + + @Override + public InetAddress[] resolve(String host) { + ++resolutionCount; + return useNewAddresses ? newAddresses : initialAddresses; + } + + public void changeAddresses() { + useNewAddresses = true; + } + + public boolean useNewAddresses() { + return useNewAddresses; + } + + public int resolutionCount() { + return resolutionCount; + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java index d1f12d267b..2f0a5903a3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java @@ -33,6 +33,7 @@ public class ClientUtilsTest { + private HostResolver hostResolver = new DefaultHostResolver(); @Test public void testParseAndValidateAddresses() throws UnknownHostException { @@ -102,25 +103,25 @@ public void testFilterPreferredAddresses() throws UnknownHostException { @Test public void testResolveUnknownHostException() { assertThrows(UnknownHostException.class, - () -> ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.USE_ALL_DNS_IPS)); + () -> ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.USE_ALL_DNS_IPS, hostResolver)); } @Test public void testResolveDnsLookup() throws UnknownHostException { // Note that kafka.apache.org resolves to at least 2 IP addresses - assertEquals(1, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.DEFAULT).size()); + assertEquals(1, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.DEFAULT, hostResolver).size()); } @Test public void testResolveDnsLookupAllIps() throws UnknownHostException { // Note that kafka.apache.org resolves to at least 2 IP addresses - assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1); + assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS, hostResolver).size() > 1); } @Test public void testResolveDnsLookupResolveCanonicalBootstrapServers() throws UnknownHostException { // Note that kafka.apache.org resolves to at least 2 IP addresses - assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY).size() > 1); + assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, hostResolver).size() > 1); } private List checkWithoutLookup(String... 
url) { diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java index b75d9de4db..d13f2f6c9e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java @@ -23,12 +23,13 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; -import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.utils.LogContext; @@ -38,6 +39,26 @@ public class ClusterConnectionStatesTest { + private static ArrayList initialAddresses; + private static ArrayList newAddresses; + + static { + try { + initialAddresses = new ArrayList<>(Arrays.asList( + InetAddress.getByName("10.200.20.100"), + InetAddress.getByName("10.200.20.101"), + InetAddress.getByName("10.200.20.102") + )); + newAddresses = new ArrayList<>(Arrays.asList( + InetAddress.getByName("10.200.20.103"), + InetAddress.getByName("10.200.20.104"), + InetAddress.getByName("10.200.20.105") + )); + } catch (UnknownHostException e) { + fail("Attempted to create an invalid InetAddress, this should not happen"); + } + } + private final MockTime time = new MockTime(); private final long reconnectBackoffMs = 10 * 1000; private final long reconnectBackoffMax = 60 * 1000; @@ -50,15 +71,20 @@ public class ClusterConnectionStatesTest { private final String nodeId1 = "1001"; private final String nodeId2 = "2002"; private final String nodeId3 = "3003"; - private final String hostTwoIps = "kafka.apache.org"; - + private final String hostTwoIps = "multiple.ip.address"; private ClusterConnectionStates connectionStates; + // For testing nodes with a single IP address, use localhost and default DNS resolution + private DefaultHostResolver singleIPHostResolver = new DefaultHostResolver(); + + // For testing nodes with multiple IP addresses, mock DNS resolution to get consistent results + private AddressChangeHostResolver multipleIPHostResolver = new AddressChangeHostResolver( + initialAddresses.toArray(new InetAddress[0]), newAddresses.toArray(new InetAddress[0]));; + @BeforeEach public void setup() { - this.connectionStates = new ClusterConnectionStates( - reconnectBackoffMs, reconnectBackoffMax, - connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, new LogContext()); + this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax, + connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, new LogContext(), this.singleIPHostResolver); } @Test @@ -253,7 +279,7 @@ public void testSingleIPWithDefault() throws UnknownHostException { @Test public void testSingleIPWithUseAll() throws UnknownHostException { - assertEquals(1, ClientUtils.resolve("localhost", ClientDnsLookup.USE_ALL_DNS_IPS).size()); + assertEquals(1, ClientUtils.resolve("localhost", ClientDnsLookup.USE_ALL_DNS_IPS, singleIPHostResolver).size()); connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.USE_ALL_DNS_IPS); InetAddress currAddress = connectionStates.currentAddress(nodeId1); @@ -263,7 +289,9 @@ public 
void testSingleIPWithUseAll() throws UnknownHostException { @Test public void testMultipleIPsWithDefault() throws UnknownHostException { - assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1); + setupMultipleIPs(); + + assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS, multipleIPHostResolver).size() > 1); connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT); InetAddress currAddress = connectionStates.currentAddress(nodeId1); @@ -273,7 +301,9 @@ public void testMultipleIPsWithDefault() throws UnknownHostException { @Test public void testMultipleIPsWithUseAll() throws UnknownHostException { - assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1); + setupMultipleIPs(); + + assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS, multipleIPHostResolver).size() > 1); connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS); InetAddress addr1 = connectionStates.currentAddress(nodeId1); @@ -287,19 +317,14 @@ public void testMultipleIPsWithUseAll() throws UnknownHostException { @Test public void testHostResolveChange() throws UnknownHostException, ReflectiveOperationException { - assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1); + setupMultipleIPs(); + + assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS, multipleIPHostResolver).size() > 1); connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT); InetAddress addr1 = connectionStates.currentAddress(nodeId1); - // reflection to simulate host change in DNS lookup - Method nodeStateMethod = connectionStates.getClass().getDeclaredMethod("nodeState", String.class); - nodeStateMethod.setAccessible(true); - Object nodeState = nodeStateMethod.invoke(connectionStates, nodeId1); - Field hostField = nodeState.getClass().getDeclaredField("host"); - hostField.setAccessible(true); - hostField.set(nodeState, "localhost"); - + multipleIPHostResolver.changeAddresses(); connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); InetAddress addr2 = connectionStates.currentAddress(nodeId1); @@ -308,9 +333,12 @@ public void testHostResolveChange() throws UnknownHostException, ReflectiveOpera @Test public void testNodeWithNewHostname() throws UnknownHostException { + setupMultipleIPs(); + connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); InetAddress addr1 = connectionStates.currentAddress(nodeId1); + this.multipleIPHostResolver.changeAddresses(); connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT); InetAddress addr2 = connectionStates.currentAddress(nodeId1); @@ -409,4 +437,9 @@ public void testTimedOutConnections() { // Expect no timed out connections assertEquals(0, connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).size()); } + + private void setupMultipleIPs() { + this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax, + connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, new LogContext(), this.multipleIPHostResolver); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 52e74d4b86..eb130ff934 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -47,13 +47,17 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -65,6 +69,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class NetworkClientTest { @@ -82,6 +87,26 @@ public class NetworkClientTest { private final NetworkClient clientWithStaticNodes = createNetworkClientWithStaticNodes(); private final NetworkClient clientWithNoVersionDiscovery = createNetworkClientWithNoVersionDiscovery(); + private static ArrayList initialAddresses; + private static ArrayList newAddresses; + + static { + try { + initialAddresses = new ArrayList<>(Arrays.asList( + InetAddress.getByName("10.200.20.100"), + InetAddress.getByName("10.200.20.101"), + InetAddress.getByName("10.200.20.102") + )); + newAddresses = new ArrayList<>(Arrays.asList( + InetAddress.getByName("10.200.20.103"), + InetAddress.getByName("10.200.20.104"), + InetAddress.getByName("10.200.20.105") + )); + } catch (UnknownHostException e) { + fail("Attempted to create an invalid InetAddress, this should not happen"); + } + } + private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) { return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024, @@ -902,6 +927,144 @@ public void testCorrelationId() { ids.forEach(id -> assertTrue(id < SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID)); } + @Test + public void testReconnectAfterAddressChange() { + AddressChangeHostResolver mockHostResolver = new AddressChangeHostResolver( + initialAddresses.toArray(new InetAddress[0]), newAddresses.toArray(new InetAddress[0])); + AtomicInteger initialAddressConns = new AtomicInteger(); + AtomicInteger newAddressConns = new AtomicInteger(); + MockSelector selector = new MockSelector(this.time, inetSocketAddress -> { + InetAddress inetAddress = inetSocketAddress.getAddress(); + if (initialAddresses.contains(inetAddress)) { + initialAddressConns.incrementAndGet(); + } else if (newAddresses.contains(inetAddress)) { + newAddressConns.incrementAndGet(); + } + return (mockHostResolver.useNewAddresses() && newAddresses.contains(inetAddress)) || + (!mockHostResolver.useNewAddresses() && initialAddresses.contains(inetAddress)); + }); + NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE, + reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, + defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, + ClientDnsLookup.USE_ALL_DNS_IPS, time, false, new ApiVersions(), null, new LogContext(), mockHostResolver); + + // Connect to one the initial addresses, then change the addresses and disconnect + client.ready(node, time.milliseconds()); + time.sleep(connectionSetupTimeoutMaxMsTest); + client.poll(0, 
time.milliseconds()); + assertTrue(client.isReady(node, time.milliseconds())); + + mockHostResolver.changeAddresses(); + selector.serverDisconnect(node.idString()); + client.poll(0, time.milliseconds()); + assertFalse(client.isReady(node, time.milliseconds())); + + time.sleep(reconnectBackoffMaxMsTest); + client.ready(node, time.milliseconds()); + time.sleep(connectionSetupTimeoutMaxMsTest); + client.poll(0, time.milliseconds()); + assertTrue(client.isReady(node, time.milliseconds())); + + // We should have tried to connect to one initial address and one new address, and resolved DNS twice + assertEquals(1, initialAddressConns.get()); + assertEquals(1, newAddressConns.get()); + assertEquals(2, mockHostResolver.resolutionCount()); + } + + @Test + public void testFailedConnectionToFirstAddress() { + AddressChangeHostResolver mockHostResolver = new AddressChangeHostResolver( + initialAddresses.toArray(new InetAddress[0]), newAddresses.toArray(new InetAddress[0])); + AtomicInteger initialAddressConns = new AtomicInteger(); + AtomicInteger newAddressConns = new AtomicInteger(); + MockSelector selector = new MockSelector(this.time, inetSocketAddress -> { + InetAddress inetAddress = inetSocketAddress.getAddress(); + if (initialAddresses.contains(inetAddress)) { + initialAddressConns.incrementAndGet(); + } else if (newAddresses.contains(inetAddress)) { + newAddressConns.incrementAndGet(); + } + // Refuse first connection attempt + return initialAddressConns.get() > 1; + }); + NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE, + reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, + defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, + ClientDnsLookup.USE_ALL_DNS_IPS, time, false, new ApiVersions(), null, new LogContext(), mockHostResolver); + + // First connection attempt should fail + client.ready(node, time.milliseconds()); + time.sleep(connectionSetupTimeoutMaxMsTest); + client.poll(0, time.milliseconds()); + assertFalse(client.isReady(node, time.milliseconds())); + + // Second connection attempt should succeed + time.sleep(reconnectBackoffMaxMsTest); + client.ready(node, time.milliseconds()); + time.sleep(connectionSetupTimeoutMaxMsTest); + client.poll(0, time.milliseconds()); + assertTrue(client.isReady(node, time.milliseconds())); + + // We should have tried to connect to two of the initial addresses, none of the new address, and should + // only have resolved DNS once + assertEquals(2, initialAddressConns.get()); + assertEquals(0, newAddressConns.get()); + assertEquals(1, mockHostResolver.resolutionCount()); + } + + @Test + public void testFailedConnectionToFirstAddressAfterReconnect() { + AddressChangeHostResolver mockHostResolver = new AddressChangeHostResolver( + initialAddresses.toArray(new InetAddress[0]), newAddresses.toArray(new InetAddress[0])); + AtomicInteger initialAddressConns = new AtomicInteger(); + AtomicInteger newAddressConns = new AtomicInteger(); + MockSelector selector = new MockSelector(this.time, inetSocketAddress -> { + InetAddress inetAddress = inetSocketAddress.getAddress(); + if (initialAddresses.contains(inetAddress)) { + initialAddressConns.incrementAndGet(); + } else if (newAddresses.contains(inetAddress)) { + newAddressConns.incrementAndGet(); + } + // Refuse first connection attempt to the new addresses + return initialAddresses.contains(inetAddress) || newAddressConns.get() > 1; + }); + NetworkClient client = new NetworkClient(metadataUpdater, null, 
selector, "mock", Integer.MAX_VALUE, + reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, + defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, + ClientDnsLookup.USE_ALL_DNS_IPS, time, false, new ApiVersions(), null, new LogContext(), mockHostResolver); + + // Connect to one the initial addresses, then change the addresses and disconnect + client.ready(node, time.milliseconds()); + time.sleep(connectionSetupTimeoutMaxMsTest); + client.poll(0, time.milliseconds()); + assertTrue(client.isReady(node, time.milliseconds())); + + mockHostResolver.changeAddresses(); + selector.serverDisconnect(node.idString()); + client.poll(0, time.milliseconds()); + assertFalse(client.isReady(node, time.milliseconds())); + + // First connection attempt to new addresses should fail + time.sleep(reconnectBackoffMaxMsTest); + client.ready(node, time.milliseconds()); + time.sleep(connectionSetupTimeoutMaxMsTest); + client.poll(0, time.milliseconds()); + assertFalse(client.isReady(node, time.milliseconds())); + + // Second connection attempt to new addresses should succeed + time.sleep(reconnectBackoffMaxMsTest); + client.ready(node, time.milliseconds()); + time.sleep(connectionSetupTimeoutMaxMsTest); + client.poll(0, time.milliseconds()); + assertTrue(client.isReady(node, time.milliseconds())); + + // We should have tried to connect to one of the initial addresses and two of the new addresses (the first one + // failed), and resolved DNS twice, once for each set of addresses + assertEquals(1, initialAddressConns.get()); + assertEquals(2, newAddressConns.get()); + assertEquals(2, mockHostResolver.resolutionCount()); + } + private RequestHeader parseHeader(ByteBuffer buffer) { buffer.getInt(); // skip size return RequestHeader.parse(buffer.slice()); diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java index f98c80e73a..d1d79dced5 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java +++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java @@ -32,6 +32,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.Predicate; /** * A fake selector to use for testing @@ -46,14 +47,22 @@ public class MockSelector implements Selectable { private final Map disconnected = new HashMap<>(); private final List connected = new ArrayList<>(); private final List delayedReceives = new ArrayList<>(); + private final Predicate canConnect; public MockSelector(Time time) { + this(time, null); + } + + public MockSelector(Time time, Predicate canConnect) { this.time = time; + this.canConnect = canConnect; } @Override public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { - this.connected.add(id); + if (canConnect == null || canConnect.test(address)) { + this.connected.add(id); + } } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 704259858e..a68a0d5649 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -229,9 +229,9 @@ public Plugins plugins() { } /* - * Retrieves config map by connector name + * Retrieves raw config map by connector name. 
*/ - protected abstract Map config(String connName); + protected abstract Map rawConfig(String connName); @Override public void connectorConfig(String connName, Callback> callback) { @@ -284,7 +284,7 @@ public ConnectorStateInfo connectorStatus(String connName) { Collections.sort(taskStates); - Map conf = config(connName); + Map conf = rawConfig(connName); return new ConnectorStateInfo(connName, connectorState, taskStates, conf == null ? ConnectorType.UNKNOWN : connectorTypeForClass(conf.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index bc7df97476..342fa73d87 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -412,10 +412,15 @@ private void maybeCreateTopic(String topic) { log.debug("Topic '{}' matched topic creation group: {}", topic, topicGroup); NewTopic newTopic = topicGroup.newTopic(topic); - if (admin.createTopic(newTopic)) { + TopicAdmin.TopicCreationResponse response = admin.createOrFindTopics(newTopic); + if (response.isCreated(newTopic.name())) { topicCreation.addTopic(topic); log.info("Created topic '{}' using creation group {}", newTopic, topicGroup); + } else if (response.isExisting(newTopic.name())) { + topicCreation.addTopic(topic); + log.info("Found existing topic '{}'", newTopic); } else { + // The topic still does not exist and could not be created, so treat it as a task failure log.warn("Request to create new topic '{}' failed", topic); throw new ConnectException("Task failed to create new topic " + newTopic + ". 
Ensure " + "that the task is authorized to create topics or that the topic exists and " diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 0187938160..41b2b4afc9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -746,8 +746,8 @@ public Void call() throws Exception { } @Override - protected Map config(String connName) { - return configState.connectorConfig(connName); + protected Map rawConfig(String connName) { + return configState.rawConnectorConfig(connName); } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index fc351f708d..8da8c6e93a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -149,8 +149,8 @@ private synchronized ConnectorInfo createConnectorInfo(String connector) { } @Override - protected synchronized Map config(String connName) { - return configState.connectorConfig(connName); + protected synchronized Map rawConfig(String connName) { + return configState.rawConnectorConfig(connName); } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index 615e7a35d7..428343b099 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -60,6 +60,60 @@ */ public class TopicAdmin implements AutoCloseable { + public static final TopicCreationResponse EMPTY_CREATION = new TopicCreationResponse(Collections.emptySet(), Collections.emptySet()); + + public static class TopicCreationResponse { + + private final Set created; + private final Set existing; + + public TopicCreationResponse(Set createdTopicNames, Set existingTopicNames) { + this.created = Collections.unmodifiableSet(createdTopicNames); + this.existing = Collections.unmodifiableSet(existingTopicNames); + } + + public Set createdTopics() { + return created; + } + + public Set existingTopics() { + return existing; + } + + public boolean isCreated(String topicName) { + return created.contains(topicName); + } + + public boolean isExisting(String topicName) { + return existing.contains(topicName); + } + + public boolean isCreatedOrExisting(String topicName) { + return isCreated(topicName) || isExisting(topicName); + } + + public int createdTopicsCount() { + return created.size(); + } + + public int existingTopicsCount() { + return existing.size(); + } + + public int createdOrExistingTopicsCount() { + return createdTopicsCount() + existingTopicsCount(); + } + + public boolean isEmpty() { + return createdOrExistingTopicsCount() == 0; + } + + @Override + public String toString() { + return "TopicCreationResponse{" + "created=" + created + ", existing=" + existing + '}'; + } + } + public static final int NO_PARTITIONS = -1; public static final short NO_REPLICATION_FACTOR = -1; @@ -228,7 +282,7 @@ public TopicAdmin(Map adminConfig) { this.logCreation = 
logCreation; } - /** + /** * Attempt to create the topic described by the given definition, returning true if the topic was created or false * if the topic already existed. * @@ -260,13 +314,48 @@ public boolean createTopic(NewTopic topic) { * attempting to perform this operation */ public Set createTopics(NewTopic... topics) { + return createOrFindTopics(topics).createdTopics(); + } + + /** + * Attempt to find or create the topic described by the given definition, returning true if the topic was created or had + * already existed, or false if the topic did not exist and could not be created. + * + * @param topic the specification of the topic + * @return true if the topic was created or already existed, or false if the topic did not exist and could not be created. + * @throws ConnectException if an error occurs, the operation takes too long, or the thread is interrupted while + * attempting to perform this operation + * @throws UnsupportedVersionException if the broker does not support the necessary APIs to perform this request + */ + public boolean createOrFindTopic(NewTopic topic) { + if (topic == null) return false; + return createOrFindTopics(topic).isCreatedOrExisting(topic.name()); + } + + /** + * Attempt to create the topics described by the given definitions, returning all of the names of those topics that + * were created by this request. Any existing topics with the same name are unchanged, and the names of such topics + * are excluded from the result. + *
<p>
+ * If multiple topic definitions have the same topic name, the last one with that name will be used. + *
<p>
+ * Apache Kafka added support for creating topics in 0.10.1.0, so this method works as expected with that and later versions. + * With brokers older than 0.10.1.0, this method is unable to create topics and always returns an empty set. + * + * @param topics the specifications of the topics + * @return the {@link TopicCreationResponse} with the names of the newly created and existing topics; + * never null but possibly empty + * @throws ConnectException if an error occurs, the operation takes too long, or the thread is interrupted while + * attempting to perform this operation + */ + public TopicCreationResponse createOrFindTopics(NewTopic... topics) { Map topicsByName = new HashMap<>(); if (topics != null) { for (NewTopic topic : topics) { if (topic != null) topicsByName.put(topic.name(), topic); } } - if (topicsByName.isEmpty()) return Collections.emptySet(); + if (topicsByName.isEmpty()) return EMPTY_CREATION; String bootstrapServers = bootstrapServers(); String topicNameList = Utils.join(topicsByName.keySet(), "', '"); @@ -276,6 +365,7 @@ public Set createTopics(NewTopic... topics) { // Iterate over each future so that we can handle individual failures like when some topics already exist Set newlyCreatedTopicNames = new HashSet<>(); + Set existingTopicNames = new HashSet<>(); for (Map.Entry> entry : newResults.entrySet()) { String topic = entry.getKey(); try { @@ -288,25 +378,26 @@ public Set createTopics(NewTopic... topics) { Throwable cause = e.getCause(); if (cause instanceof TopicExistsException) { log.debug("Found existing topic '{}' on the brokers at {}", topic, bootstrapServers); + existingTopicNames.add(topic); continue; } if (cause instanceof UnsupportedVersionException) { log.debug("Unable to create topic(s) '{}' since the brokers at {} do not support the CreateTopics API." + " Falling back to assume topic(s) exist or will be auto-created by the broker.", topicNameList, bootstrapServers); - return Collections.emptySet(); + return EMPTY_CREATION; } if (cause instanceof ClusterAuthorizationException) { log.debug("Not authorized to create topic(s) '{}' upon the brokers {}." + " Falling back to assume topic(s) exist or will be auto-created by the broker.", topicNameList, bootstrapServers); - return Collections.emptySet(); + return EMPTY_CREATION; } if (cause instanceof TopicAuthorizationException) { log.debug("Not authorized to create topic(s) '{}' upon the brokers {}." + " Falling back to assume topic(s) exist or will be auto-created by the broker.", topicNameList, bootstrapServers); - return Collections.emptySet(); + return EMPTY_CREATION; } if (cause instanceof InvalidConfigurationException) { throw new ConnectException("Unable to create topic(s) '" + topicNameList + "': " + cause.getMessage(), @@ -324,7 +415,7 @@ public Set createTopics(NewTopic... 
topics) { throw new ConnectException("Interrupted while attempting to create/find topic(s) '" + topicNameList + "'", e); } } - return newlyCreatedTopicNames; + return new TopicCreationResponse(newlyCreatedTopicNames, existingTopicNames); } /** diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index fb5a4a6158..d2b7913731 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -159,7 +159,7 @@ public void testConnectors() { .createMock(); EasyMock.expect(herder.generation()).andStubReturn(generation); - EasyMock.expect(herder.config(connector)).andReturn(null); + EasyMock.expect(herder.rawConfig(connector)).andReturn(null); EasyMock.expect(configStore.snapshot()).andReturn(SNAPSHOT); replayAll(); assertEquals(Collections.singleton(CONN1), new HashSet<>(herder.connectors())); @@ -183,7 +183,7 @@ public void testConnectorStatus() { .createMock(); EasyMock.expect(herder.generation()).andStubReturn(generation); - EasyMock.expect(herder.config(connector)).andReturn(null); + EasyMock.expect(herder.rawConfig(connector)).andReturn(null); EasyMock.expect(statusStore.get(connector)) .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation)); EasyMock.expect(statusStore.getAll(connector)) @@ -207,7 +207,7 @@ public void connectorStatus() { .createMock(); EasyMock.expect(herder.generation()).andStubReturn(generation); - EasyMock.expect(herder.config(connector)).andReturn(null); + EasyMock.expect(herder.rawConfig(connector)).andReturn(null); EasyMock.expect(statusStore.get(connector)) .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation)); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java index 9c115ac517..692f4d24b1 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java @@ -79,6 +79,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; @@ -529,7 +530,10 @@ private void expectTopicDoesNotExist(String topic) { EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap()); Capture newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(true); + Set created = Collections.singleton(topic); + Set existing = Collections.emptySet(); + TopicAdmin.TopicCreationResponse response = new TopicAdmin.TopicCreationResponse(created, existing); + EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(response); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskWithTopicCreationTest.java index 222f35d46a..9784bd5588 100644 --- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskWithTopicCreationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskWithTopicCreationTest.java @@ -80,6 +80,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -956,7 +957,7 @@ public void testSendRecordsTopicCreateRetries() throws Exception { expectPreliminaryCalls(); EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); Capture newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))) + EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))) .andThrow(new RetriableException(new TimeoutException("timeout"))); // Second round @@ -1034,7 +1035,7 @@ public void testSendRecordsTopicCreateRetriesMidway() throws Exception { EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap()); // First call to create the topic times out Capture newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))) + EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))) .andThrow(new RetriableException(new TimeoutException("timeout"))); // Second round @@ -1085,7 +1086,7 @@ public void testTopicCreateFails() throws Exception { EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); Capture newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))) + EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))) .andThrow(new ConnectException(new TopicAuthorizationException("unauthorized"))); PowerMock.replayAll(); @@ -1096,7 +1097,7 @@ public void testTopicCreateFails() throws Exception { } @Test - public void testTopicCreateFailsWithExceptionWhenCreateReturnsFalse() throws Exception { + public void testTopicCreateFailsWithExceptionWhenCreateReturnsTopicNotCreatedOrFound() throws Exception { createWorkerTask(); SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); @@ -1106,7 +1107,7 @@ public void testTopicCreateFailsWithExceptionWhenCreateReturnsFalse() throws Exc EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); Capture newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(false); + EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(TopicAdmin.EMPTY_CREATION); PowerMock.replayAll(); @@ -1115,6 +1116,62 @@ public void testTopicCreateFailsWithExceptionWhenCreateReturnsFalse() throws Exc assertTrue(newTopicCapture.hasCaptured()); } + @Test + public void testTopicCreateSucceedsWhenCreateReturnsExistingTopicFound() throws Exception { + createWorkerTask(); + + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + expectPreliminaryCalls(); + EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); + + Capture newTopicCapture = EasyMock.newCapture(); + EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(foundTopic(TOPIC)); 
+ + expectSendRecordTaskCommitRecordSucceed(false, false); + expectSendRecordTaskCommitRecordSucceed(false, false); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + } + + @Test + public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() throws Exception { + createWorkerTask(); + + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + expectPreliminaryCalls(); + EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); + + Capture newTopicCapture = EasyMock.newCapture(); + EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC)); + + expectSendRecordTaskCommitRecordSucceed(false, false); + expectSendRecordTaskCommitRecordSucceed(false, false); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + } + + private TopicAdmin.TopicCreationResponse createdTopic(String topic) { + Set created = Collections.singleton(topic); + Set existing = Collections.emptySet(); + return new TopicAdmin.TopicCreationResponse(created, existing); + } + + private TopicAdmin.TopicCreationResponse foundTopic(String topic) { + Set created = Collections.emptySet(); + Set existing = Collections.singleton(topic); + return new TopicAdmin.TopicCreationResponse(created, existing); + } + private void expectPreliminaryCalls() { expectPreliminaryCalls(TOPIC); } @@ -1431,7 +1488,7 @@ private void expectTopicCreation(String topic) { if (config.topicCreationEnable()) { EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap()); Capture newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(true); + EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic)); } } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index e4b456df61..fa18b44fb8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -74,15 +74,14 @@ public class TopicAdminTest { * create no topics, and return false. 
*/ @Test - public void returnNullWithApiVersionMismatchOnCreate() { + public void returnEmptyWithApiVersionMismatchOnCreate() { final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); Cluster cluster = createCluster(1); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareResponse(createTopicResponseWithUnsupportedVersion(newTopic)); TopicAdmin admin = new TopicAdmin(null, env.adminClient()); - boolean created = admin.createTopic(newTopic); - assertFalse(created); + assertTrue(admin.createOrFindTopics(newTopic).isEmpty()); } } @@ -105,14 +104,16 @@ public void throwsWithApiVersionMismatchOnDescribe() { } @Test - public void returnNullWithClusterAuthorizationFailureOnCreate() { + public void returnEmptyWithClusterAuthorizationFailureOnCreate() { final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); Cluster cluster = createCluster(1); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic)); TopicAdmin admin = new TopicAdmin(null, env.adminClient()); - boolean created = admin.createTopic(newTopic); - assertFalse(created); + assertFalse(admin.createTopic(newTopic)); + + env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic)); + assertTrue(admin.createOrFindTopics(newTopic).isEmpty()); } } @@ -129,14 +130,16 @@ public void throwsWithClusterAuthorizationFailureOnDescribe() { } @Test - public void returnNullWithTopicAuthorizationFailureOnCreate() { + public void returnEmptyWithTopicAuthorizationFailureOnCreate() { final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); Cluster cluster = createCluster(1); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { env.kafkaClient().prepareResponse(createTopicResponseWithTopicAuthorizationException(newTopic)); TopicAdmin admin = new TopicAdmin(null, env.adminClient()); - boolean created = admin.createTopic(newTopic); - assertFalse(created); + assertFalse(admin.createTopic(newTopic)); + + env.kafkaClient().prepareResponse(createTopicResponseWithTopicAuthorizationException(newTopic)); + assertTrue(admin.createOrFindTopics(newTopic).isEmpty()); } } @@ -161,6 +164,12 @@ public void shouldNotCreateTopicWhenItAlreadyExists() { mockAdminClient.addTopic(false, "myTopic", Collections.singletonList(topicPartitionInfo), null); TopicAdmin admin = new TopicAdmin(null, mockAdminClient); assertFalse(admin.createTopic(newTopic)); + assertTrue(admin.createTopics(newTopic).isEmpty()); + assertTrue(admin.createOrFindTopic(newTopic)); + TopicAdmin.TopicCreationResponse response = admin.createOrFindTopics(newTopic); + assertTrue(response.isCreatedOrExisting(newTopic.name())); + assertTrue(response.isExisting(newTopic.name())); + assertFalse(response.isCreated(newTopic.name())); } } @@ -509,7 +518,9 @@ protected void assertTopicCreation( clientBuilder.controller(0); try (MockAdminClient admin = clientBuilder.build()) { TopicAdmin topicClient = new TopicAdmin(null, admin, false); - assertTrue(topicClient.createTopic(newTopic)); + TopicAdmin.TopicCreationResponse response = topicClient.createOrFindTopics(newTopic); + assertTrue(response.isCreated(newTopic.name())); + assertFalse(response.isExisting(newTopic.name())); 
assertTopic(admin, newTopic.name(), expectedPartitions, expectedReplicas); } } diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index 8ee60c4a3a..0a83fd9453 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -29,7 +29,7 @@ import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit, Lo import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing import org.apache.kafka.common.metrics.stats.{Meter, Percentile, Percentiles} -import org.apache.kafka.common.protocol.Writable +import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable} import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.utils.{Time, Utils} @@ -268,11 +268,11 @@ object TestRaftServer extends Logging { } private class ByteArraySerde extends RecordSerde[Array[Byte]] { - override def recordSize(data: Array[Byte], context: Any): Int = { + override def recordSize(data: Array[Byte], serializationCache: ObjectSerializationCache): Int = { data.length } - override def write(data: Array[Byte], context: Any, out: Writable): Unit = { + override def write(data: Array[Byte], serializationCache: ObjectSerializationCache, out: Writable): Unit = { out.writeByteArray(data) } diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala index d04866ebe7..7b7078288e 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala @@ -308,12 +308,11 @@ class MetadataRequestTest extends BaseRequestTest { !response.brokers.asScala.exists(_.id == downNode.dataPlaneRequestProcessor.brokerId) }, "Replica was not found down", 5000) - // Validate version 0 still filters unavailable replicas and contains error val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(requestData(List(replicaDownTopic), true), 0.toShort)) val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq assertTrue(v0MetadataResponse.errors.isEmpty, "Response should have no errors") - assertFalse(v0BrokerIds.contains(downNode), s"The downed broker should not be in the brokers list") + assertFalse(v0BrokerIds.contains(downNode.config.brokerId), s"The downed broker should not be in the brokers list") assertTrue(v0MetadataResponse.topicMetadata.size == 1, "Response should have one topic") val v0PartitionMetadata = v0MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head assertTrue(v0PartitionMetadata.error == Errors.REPLICA_NOT_AVAILABLE, "PartitionMetadata should have an error") @@ -323,7 +322,7 @@ class MetadataRequestTest extends BaseRequestTest { val v1MetadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build(1)) val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq assertTrue(v1MetadataResponse.errors.isEmpty, "Response should have no errors") - assertFalse(v1BrokerIds.contains(downNode), s"The downed broker should not be in the brokers list") + assertFalse(v1BrokerIds.contains(downNode.config.brokerId), s"The downed broker should not be in the brokers list") assertEquals(1, v1MetadataResponse.topicMetadata.size, "Response should have one 
topic") val v1PartitionMetadata = v1MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head assertEquals(Errors.NONE, v1PartitionMetadata.error, "PartitionMetadata should have no errors") diff --git a/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala b/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala index 4d425c011c..e8f87bb036 100644 --- a/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala +++ b/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala @@ -21,7 +21,6 @@ import java.util.{Arrays, Properties} import kafka.api.KAFKA_2_7_IV0 import kafka.network.SocketServer -import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.utils.TestUtils import org.apache.kafka.common.Uuid import org.apache.kafka.common.message.DeleteTopicsRequestData diff --git a/generator/src/main/java/org/apache/kafka/message/MetadataRecordTypeGenerator.java b/generator/src/main/java/org/apache/kafka/message/MetadataRecordTypeGenerator.java index 6d70631f29..cb3db0c64a 100644 --- a/generator/src/main/java/org/apache/kafka/message/MetadataRecordTypeGenerator.java +++ b/generator/src/main/java/org/apache/kafka/message/MetadataRecordTypeGenerator.java @@ -73,6 +73,10 @@ private void generate() { buffer.printf("%n"); generateAccessor("id", "short"); buffer.printf("%n"); + generateAccessor("lowestSupportedVersion", "short"); + buffer.printf("%n"); + generateAccessor("highestSupportedVersion", "short"); + buffer.printf("%n"); generateToString(); buffer.decrementIndent(); buffer.printf("}%n"); @@ -85,10 +89,12 @@ private void generateEnumValues() { MessageSpec spec = entry.getValue(); String name = spec.name(); numProcessed++; - buffer.printf("%s(\"%s\", (short) %d)%s%n", + buffer.printf("%s(\"%s\", (short) %d, (short) %d, (short) %d)%s%n", MessageGenerator.toSnakeCase(name).toUpperCase(Locale.ROOT), MessageGenerator.capitalizeFirst(name), entry.getKey(), + entry.getValue().validVersions().lowest(), + entry.getValue().validVersions().highest(), (numProcessed == apis.size()) ? ";" : ","); } } @@ -96,13 +102,17 @@ private void generateEnumValues() { private void generateInstanceVariables() { buffer.printf("private final String name;%n"); buffer.printf("private final short id;%n"); + buffer.printf("private final short lowestSupportedVersion;%n"); + buffer.printf("private final short highestSupportedVersion;%n"); } private void generateEnumConstructor() { - buffer.printf("MetadataRecordType(String name, short id) {%n"); + buffer.printf("MetadataRecordType(String name, short id, short lowestSupportedVersion, short highestSupportedVersion) {%n"); buffer.incrementIndent(); buffer.printf("this.name = name;%n"); buffer.printf("this.id = id;%n"); + buffer.printf("this.lowestSupportedVersion = lowestSupportedVersion;%n"); + buffer.printf("this.highestSupportedVersion = highestSupportedVersion;%n"); buffer.decrementIndent(); buffer.printf("}%n"); } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/ApiMessageAndVersion.java b/metadata/src/main/java/org/apache/kafka/metadata/ApiMessageAndVersion.java new file mode 100644 index 0000000000..75ccb4807b --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/ApiMessageAndVersion.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
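The generator change above threads each record type's valid version range into the generated enum. A hedged sketch of what a generated constant looks like after this change; the record name, id, and version bounds are illustrative rather than copied from the real generated file, and the generated class has additional members not shown here.

    public enum MetadataRecordType {
        // name, id, lowestSupportedVersion, highestSupportedVersion
        TOPIC_RECORD("TopicRecord", (short) 2, (short) 0, (short) 0);

        private final String name;
        private final short id;
        private final short lowestSupportedVersion;
        private final short highestSupportedVersion;

        MetadataRecordType(String name, short id, short lowestSupportedVersion, short highestSupportedVersion) {
            this.name = name;
            this.id = id;
            this.lowestSupportedVersion = lowestSupportedVersion;
            this.highestSupportedVersion = highestSupportedVersion;
        }

        public short id() {
            return id;
        }

        public short lowestSupportedVersion() {
            return lowestSupportedVersion;
        }

        public short highestSupportedVersion() {
            return highestSupportedVersion;
        }
    }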
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import org.apache.kafka.common.protocol.ApiMessage; + +import java.util.Objects; + +/** + * An ApiMessage and an associated version. + */ +public class ApiMessageAndVersion { + private final ApiMessage message; + private final short version; + + public ApiMessageAndVersion(ApiMessage message, short version) { + this.message = message; + this.version = version; + } + + public ApiMessage message() { + return message; + } + + public short version() { + return version; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ApiMessageAndVersion that = (ApiMessageAndVersion) o; + return version == that.version && + Objects.equals(message, that.message); + } + + @Override + public int hashCode() { + return Objects.hash(message, version); + } + + @Override + public String toString() { + return "ApiMessageAndVersion(" + message + " at version " + version + ")"; + } +} diff --git a/raft/src/main/java/org/apache/kafka/raft/RecordSerde.java b/raft/src/main/java/org/apache/kafka/raft/RecordSerde.java index 2921314b54..9581297de3 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RecordSerde.java +++ b/raft/src/main/java/org/apache/kafka/raft/RecordSerde.java @@ -16,39 +16,36 @@ */ package org.apache.kafka.raft; +import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.protocol.Readable; import org.apache.kafka.common.protocol.Writable; +/** + * Serde interface for records written to the Raft log. This class assumes + * a two-pass serialization, with the first pass used to compute the size of the + * serialized record, and the second pass to write the object. + */ public interface RecordSerde { /** - * Create a new context object for to be used when serializing a batch of records. - * This allows for state to be shared between {@link #recordSize(Object, Object)} - * and {@link #write(Object, Object, Writable)}, which is useful in order to avoid - * redundant work (see e.g. {@link org.apache.kafka.common.protocol.ObjectSerializationCache}). - * - * @return context object or null if none is needed - */ - default Object newWriteContext() { - return null; - } - - /** - * Get the size of a record. + * Get the size of a record. This must be called first before writing + * the data through {@link #write(Object, ObjectSerializationCache, Writable)}. * * @param data the record that will be serialized - * @param context context object created by {@link #newWriteContext()} + * @param serializationCache serialization cache * @return the size in bytes of the serialized record */ - int recordSize(T data, Object context); + int recordSize(T data, ObjectSerializationCache serializationCache); /** - * Write the record to the output stream. + * Write the record to the output stream. 
This must be called after + * computing the size with {@link #recordSize(Object, ObjectSerializationCache)}. + * The same {@link ObjectSerializationCache} instance must be used in both calls. * * @param data the record to serialize and write - * @param context context object created by {@link #newWriteContext()} + * @param serializationCache serialization cache * @param out the output stream to write the record to */ - void write(T data, Object context, Writable out); + void write(T data, ObjectSerializationCache serializationCache, Writable out); /** * Read a record from a {@link Readable} input. diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java index 1e41743016..5331e4dd14 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java @@ -17,6 +17,7 @@ package org.apache.kafka.raft.internals; import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.utils.Time; @@ -99,10 +100,10 @@ public Long append(int epoch, List records) { return Long.MAX_VALUE; } - Object serdeContext = serde.newWriteContext(); + ObjectSerializationCache serializationCache = new ObjectSerializationCache(); int batchSize = 0; for (T record : records) { - batchSize += serde.recordSize(record, serdeContext); + batchSize += serde.recordSize(record, serializationCache); } if (batchSize > maxBatchSize) { @@ -125,7 +126,7 @@ public Long append(int epoch, List records) { } for (T record : records) { - batch.appendRecord(record, serdeContext); + batch.appendRecord(record, serializationCache); nextOffset += 1; } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java index a264f7bfeb..542bb5197c 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java @@ -17,6 +17,7 @@ package org.apache.kafka.raft.internals; import org.apache.kafka.common.protocol.DataOutputStreamWritable; +import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.protocol.Writable; import org.apache.kafka.common.record.AbstractRecords; import org.apache.kafka.common.record.CompressionType; @@ -36,7 +37,7 @@ /** * Collect a set of records into a single batch. New records are added - * through {@link #appendRecord(Object, Object)}, but the caller must first + * through {@link #appendRecord(Object, ObjectSerializationCache)}, but the caller must first * check whether there is room using {@link #hasRoomFor(int)}. Once the * batch is ready, then {@link #build()} should be used to get the resulting * {@link MemoryRecords} instance. @@ -97,10 +98,10 @@ public BatchBuilder( * using {@link #hasRoomFor(int)}. 
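The reworked RecordSerde contract above is a two-pass protocol: recordSize() is called first and write() second, with one ObjectSerializationCache shared across both passes so sizes computed while sizing can be reused while writing. A minimal sketch using the StringSerde from this patch; the ByteBuffer plumbing is an assumption for illustration and is not how BatchBuilder manages its buffers.

    import org.apache.kafka.common.protocol.ByteBufferAccessor;
    import org.apache.kafka.common.protocol.ObjectSerializationCache;
    import org.apache.kafka.raft.internals.StringSerde;

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;

    class TwoPassSerdeSketch {
        static ByteBuffer serialize(List<String> records) {
            StringSerde serde = new StringSerde();
            ObjectSerializationCache cache = new ObjectSerializationCache();

            // Pass 1: size every record, populating the shared cache.
            int totalSize = 0;
            for (String record : records) {
                totalSize += serde.recordSize(record, cache);
            }

            // Pass 2: write every record, reusing the same cache instance.
            ByteBuffer buffer = ByteBuffer.allocate(totalSize);
            ByteBufferAccessor out = new ByteBufferAccessor(buffer);
            for (String record : records) {
                serde.write(record, cache, out);
            }
            buffer.flip();
            return buffer;
        }

        public static void main(String[] args) {
            // "a" and "bb" serialize to 3 bytes of raw UTF-8 with this serde
            System.out.println(serialize(Arrays.asList("a", "bb")).remaining());
        }
    }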
* * @param record the record to append - * @param serdeContext serialization context for use in {@link RecordSerde#write(Object, Object, Writable)} + * @param serializationCache serialization cache for use in {@link RecordSerde#write(Object, ObjectSerializationCache, Writable)} * @return the offset of the appended batch */ - public long appendRecord(T record, Object serdeContext) { + public long appendRecord(T record, ObjectSerializationCache serializationCache) { if (!isOpenForAppends) { throw new IllegalArgumentException("Cannot append new records after the batch has been built"); } @@ -114,7 +115,7 @@ public long appendRecord(T record, Object serdeContext) { int recordSizeInBytes = writeRecord( offset, record, - serdeContext + serializationCache ); unflushedBytes += recordSizeInBytes; records.add(record); @@ -273,12 +274,12 @@ public MemoryRecords build() { public int writeRecord( long offset, T payload, - Object serdeContext + ObjectSerializationCache serializationCache ) { int offsetDelta = (int) (offset - baseOffset); long timestampDelta = 0; - int payloadSize = serde.recordSize(payload, serdeContext); + int payloadSize = serde.recordSize(payload, serializationCache); int sizeInBytes = DefaultRecord.sizeOfBodyInBytes( offsetDelta, timestampDelta, @@ -300,7 +301,7 @@ public int writeRecord( // Write value recordOutput.writeVarint(payloadSize); - serde.write(payload, serdeContext, recordOutput); + serde.write(payload, serializationCache, recordOutput); // Write headers (currently unused) recordOutput.writeVarint(0); diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java b/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java index c3f9244d5a..cf096dfe69 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.raft.internals; +import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.protocol.Readable; import org.apache.kafka.common.protocol.Writable; import org.apache.kafka.common.utils.Utils; @@ -24,7 +25,7 @@ public class StringSerde implements RecordSerde { @Override - public int recordSize(String data, Object context) { + public int recordSize(String data, ObjectSerializationCache serializationCache) { return recordSize(data); } @@ -33,7 +34,7 @@ public int recordSize(String data) { } @Override - public void write(String data, Object context, Writable out) { + public void write(String data, ObjectSerializationCache serializationCache, Writable out) { out.writeByteArray(Utils.utf8(data)); } diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java b/raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java new file mode 100644 index 0000000000..c740497fb3 --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.metadata; + +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.protocol.Writable; +import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.raft.RecordSerde; + +public class MetadataRecordSerde implements RecordSerde { + private static final short DEFAULT_FRAME_VERSION = 0; + private static final int DEFAULT_FRAME_VERSION_SIZE = ByteUtils.sizeOfUnsignedVarint(DEFAULT_FRAME_VERSION); + + @Override + public int recordSize(ApiMessageAndVersion data, ObjectSerializationCache serializationCache) { + int size = DEFAULT_FRAME_VERSION_SIZE; + size += ByteUtils.sizeOfUnsignedVarint(data.message().apiKey()); + size += ByteUtils.sizeOfUnsignedVarint(data.version()); + size += data.message().size(serializationCache, data.version()); + return size; + } + + @Override + public void write(ApiMessageAndVersion data, ObjectSerializationCache serializationCache, Writable out) { + out.writeUnsignedVarint(DEFAULT_FRAME_VERSION); + out.writeUnsignedVarint(data.message().apiKey()); + out.writeUnsignedVarint(data.version()); + data.message().write(out, serializationCache, data.version()); + } + + @Override + public ApiMessageAndVersion read(Readable input, int size) { + short frameVersion = (short) input.readUnsignedVarint(); + if (frameVersion != DEFAULT_FRAME_VERSION) { + throw new SerializationException("Could not deserialize metadata record due to unknown frame version " + + frameVersion + "(only frame version " + DEFAULT_FRAME_VERSION + " is supported)"); + } + + short apiKey = (short) input.readUnsignedVarint(); + short version = (short) input.readUnsignedVarint(); + MetadataRecordType recordType = MetadataRecordType.fromId(apiKey); + ApiMessage record = recordType.newMetadataRecord(); + record.read(input, version); + return new ApiMessageAndVersion(record, version); + } + +} diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index 2b548d5a60..f94b85a8e8 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.protocol.Readable; import org.apache.kafka.common.protocol.Writable; import org.apache.kafka.common.protocol.types.Type; @@ -1124,12 +1125,12 @@ void deliverAll() { private static class IntSerde implements RecordSerde { @Override - public int recordSize(Integer data, Object context) { + public int recordSize(Integer data, ObjectSerializationCache 
serializationCache) { return Type.INT32.sizeOf(data); } @Override - public void write(Integer data, Object context, Writable out) { + public void write(Integer data, ObjectSerializationCache serializationCache, Writable out) { out.writeInt(data); } diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java index 7d2aae5612..24e289db26 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.raft.internals; import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.protocol.Writable; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.utils.MockTime; @@ -277,8 +278,11 @@ public void testDrainDoesNotBlockWithConcurrentAppend() throws Exception { releaseLockLatch.await(); writable.writeByteArray(Utils.utf8("b")); return null; - }).when(serde) - .write(Mockito.eq("b"), Mockito.eq(null), Mockito.any(Writable.class)); + }).when(serde).write( + Mockito.eq("b"), + Mockito.any(ObjectSerializationCache.class), + Mockito.any(Writable.class) + ); Thread appendThread = new Thread(() -> acc.append(leaderEpoch, singletonList("b"))); appendThread.start(); diff --git a/raft/src/test/java/org/apache/kafka/raft/metadata/MetadataRecordSerdeTest.java b/raft/src/test/java/org/apache/kafka/raft/metadata/MetadataRecordSerdeTest.java new file mode 100644 index 0000000000..2071814ed9 --- /dev/null +++ b/raft/src/test/java/org/apache/kafka/raft/metadata/MetadataRecordSerdeTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
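For reference, a hedged sketch of the frame that the MetadataRecordSerde above writes, decoded by hand the same way its read() method does: three unsigned varints (frame version, api key, record version) followed by the record body. `buffer` is assumed to contain exactly one record produced by that serde.

    import org.apache.kafka.common.metadata.MetadataRecordType;
    import org.apache.kafka.common.protocol.ApiMessage;
    import org.apache.kafka.common.protocol.ByteBufferAccessor;

    import java.nio.ByteBuffer;

    class MetadataFrameSketch {
        // Frame layout: [frame version (currently 0)] [api key] [record version] [message body]
        static ApiMessage decode(ByteBuffer buffer) {
            ByteBufferAccessor in = new ByteBufferAccessor(buffer);
            short frameVersion = (short) in.readUnsignedVarint();   // only frame version 0 is accepted today
            if (frameVersion != 0) {
                throw new IllegalArgumentException("Unknown frame version " + frameVersion);
            }
            short apiKey = (short) in.readUnsignedVarint();          // maps to a MetadataRecordType id
            short version = (short) in.readUnsignedVarint();         // schema version of the record
            ApiMessage record = MetadataRecordType.fromId(apiKey).newMetadataRecord();
            record.read(in, version);
            return record;
        }
    }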
+ */ +package org.apache.kafka.raft.metadata; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class MetadataRecordSerdeTest { + + @Test + public void testSerde() { + TopicRecord topicRecord = new TopicRecord() + .setName("foo") + .setTopicId(Uuid.randomUuid()); + + MetadataRecordSerde serde = new MetadataRecordSerde(); + + for (short version = TopicRecord.LOWEST_SUPPORTED_VERSION; version <= TopicRecord.HIGHEST_SUPPORTED_VERSION; version++) { + ApiMessageAndVersion messageAndVersion = new ApiMessageAndVersion(topicRecord, version); + + ObjectSerializationCache cache = new ObjectSerializationCache(); + int size = serde.recordSize(messageAndVersion, cache); + + ByteBuffer buffer = ByteBuffer.allocate(size); + ByteBufferAccessor bufferAccessor = new ByteBufferAccessor(buffer); + + serde.write(messageAndVersion, cache, bufferAccessor); + buffer.flip(); + + assertEquals(size, buffer.remaining()); + ApiMessageAndVersion readMessageAndVersion = serde.read(bufferAccessor, size); + assertEquals(messageAndVersion, readMessageAndVersion); + } + } + + @Test + public void testDeserializeWithUnhandledFrameVersion() { + ByteBuffer buffer = ByteBuffer.allocate(16); + ByteUtils.writeUnsignedVarint(15, buffer); + buffer.flip(); + + MetadataRecordSerde serde = new MetadataRecordSerde(); + assertThrows(SerializationException.class, + () -> serde.read(new ByteBufferAccessor(buffer), 16)); + } + +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 3c6cbab47b..de048d1994 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -787,8 +787,27 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, final Time time) throws StreamsException { this.config = config; this.time = time; + + this.internalTopologyBuilder = internalTopologyBuilder; + internalTopologyBuilder.rewriteTopology(config); + + // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception + taskTopology = internalTopologyBuilder.buildTopology(); + globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology(); + + final boolean hasGlobalTopology = globalTaskTopology != null; + final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() || + (hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore()); + + try { + stateDirectory = new StateDirectory(config, time, hasPersistentStores); + processId = stateDirectory.initializeProcessId(); + } catch (final ProcessorStateException fatal) { + throw new StreamsException(fatal); + } + + // The application ID is a required config and hence should always have value - processId = UUID.randomUUID(); final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG); final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); if 
(userClientId.length() <= 0) { @@ -798,73 +817,42 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, } final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId)); this.log = logContext.logger(getClass()); + + // use client id instead of thread client id since this admin client may be shared among threads this.clientSupplier = clientSupplier; - final MetricConfig metricConfig = new MetricConfig() - .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) - .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))) - .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); - final List reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class, - Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId)); - final JmxReporter jmxReporter = new JmxReporter(); - jmxReporter.configure(config.originals()); - reporters.add(jmxReporter); - final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, - config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); - metrics = new Metrics(metricConfig, reporters, time, metricsContext); + adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId))); + + log.info("Kafka Streams version: {}", ClientMetrics.version()); + log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId()); + + metrics = getMetrics(config, time, clientId); streamsMetrics = new StreamsMetricsImpl( metrics, clientId, config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), time ); + ClientMetrics.addVersionMetric(streamsMetrics); ClientMetrics.addCommitIdMetric(streamsMetrics); ClientMetrics.addApplicationIdMetric(streamsMetrics, config.getString(StreamsConfig.APPLICATION_ID_CONFIG)); ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, internalTopologyBuilder.describe().toString()); ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state); - log.info("Kafka Streams version: {}", ClientMetrics.version()); - log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId()); - this.internalTopologyBuilder = internalTopologyBuilder; - // re-write the physical topology according to the config - internalTopologyBuilder.rewriteTopology(config); + ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> + Math.toIntExact(countStreamThread(thread -> thread.state().isAlive()))); - // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception - taskTopology = internalTopologyBuilder.buildTopology(); streamsMetadataState = new StreamsMetadataState( - internalTopologyBuilder, - parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG))); - - final int numStreamThreads; - if (internalTopologyBuilder.hasNoNonGlobalTopology()) { - log.info("Overriding number of StreamThreads to zero for global-only topology"); - numStreamThreads = 0; - } else { - numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); - } - - // create the stream thread, global update thread, and cleanup thread - threads = Collections.synchronizedList(new LinkedList<>()); - globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology(); - final boolean hasGlobalTopology = globalTaskTopology != null; + internalTopologyBuilder, + parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG))); - if 
(numStreamThreads == 0 && !hasGlobalTopology) { - log.error("Topology with no input topics will create no stream threads and no global thread."); - throw new TopologyException("Topology has no stream threads and no global threads, " + - "must subscribe to at least one source topic or global table."); - } oldHandler = false; - totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG); - final long cacheSizePerThread = getCacheSizePerThread(numStreamThreads); - final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() || - (hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore()); streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler; - try { - stateDirectory = new StateDirectory(config, time, hasPersistentStores); - } catch (final ProcessorStateException fatal) { - throw new StreamsException(fatal); - } delegatingStateRestoreListener = new DelegatingStateRestoreListener(); + + totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG); + final int numStreamThreads = getNumStreamThreads(hasGlobalTopology); + final long cacheSizePerThread = getCacheSizePerThread(numStreamThreads); + GlobalStreamThread.State globalThreadState = null; if (hasGlobalTopology) { final String globalThreadId = clientId + "-GlobalStreamThread"; @@ -883,23 +871,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, globalThreadState = globalStreamThread.state(); } - // use client id instead of thread client id since this admin client may be shared among threads - adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId))); - + threads = Collections.synchronizedList(new LinkedList<>()); threadState = new HashMap<>(numStreamThreads); - storeProviders = new ArrayList<>(); streamStateListener = new StreamStateListener(threadState, globalThreadState); + + final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores()); + if (hasGlobalTopology) { globalStreamThread.setStateListener(streamStateListener); } + + storeProviders = new ArrayList<>(); for (int i = 1; i <= numStreamThreads; i++) { createAndAddStreamThread(cacheSizePerThread, i); } - - ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> - Math.toIntExact(countStreamThread(thread -> thread.state().isAlive()))); - - final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores()); queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); @@ -932,6 +917,39 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin return streamThread; } + private static Metrics getMetrics(final StreamsConfig config, final Time time, final String clientId) { + final MetricConfig metricConfig = new MetricConfig() + .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) + .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))) + .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); + final List reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class, + Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId)); + final JmxReporter jmxReporter = new JmxReporter(); + 
jmxReporter.configure(config.originals()); + reporters.add(jmxReporter); + final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, + config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); + return new Metrics(metricConfig, reporters, time, metricsContext); + } + + private int getNumStreamThreads(final boolean hasGlobalTopology) { + final int numStreamThreads; + if (internalTopologyBuilder.hasNoNonGlobalTopology()) { + log.info("Overriding number of StreamThreads to zero for global-only topology"); + numStreamThreads = 0; + } else { + numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); + } + + if (numStreamThreads == 0 && !hasGlobalTopology) { + log.error("Topology with no input topics will create no stream threads and no global thread."); + throw new TopologyException("Topology has no stream threads and no global threads, " + + "must subscribe to at least one source topic or global table."); + } + return numStreamThreads; + } + /** * Adds and starts a stream thread in addition to the stream threads that are already running in this * Kafka Streams client. @@ -1263,6 +1281,7 @@ private Thread shutdownHelper(final boolean error) { globalStreamThread = null; } + stateDirectory.close(); adminClient.close(); streamsMetrics.removeAllClientLevelSensorsAndMetrics(); @@ -1276,24 +1295,29 @@ private Thread shutdownHelper(final boolean error) { } private boolean close(final long timeoutMs) { - if (state == State.ERROR) { - log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped."); + if (state == State.ERROR || state == State.NOT_RUNNING) { + log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state); return true; } - if (state == State.PENDING_ERROR) { - log.info("Streams client is in PENDING_ERROR, all resources are being closed and the client will be stopped."); - if (waitOnState(State.ERROR, timeoutMs)) { + if (state == State.PENDING_ERROR || state == State.PENDING_SHUTDOWN) { + log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state); + if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) { log.info("Streams client stopped to ERROR completely"); return true; + } else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) { + log.info("Streams client stopped to NOT_RUNNING completely"); + return true; } else { - log.info("Streams client cannot transition to ERROR completely within the timeout"); + log.warn("Streams client cannot transition to {}} completely within the timeout", + state == State.PENDING_SHUTDOWN ? 
State.NOT_RUNNING : State.ERROR); return false; } } + if (!setState(State.PENDING_SHUTDOWN)) { - // if transition failed, it means it was either in PENDING_SHUTDOWN - // or NOT_RUNNING already; just check that all threads have been stopped - log.info("Already in the pending shutdown state, wait to complete shutdown"); + // if we can't transition to PENDING_SHUTDOWN but not because we're already shutting down, then it must be fatal + log.error("Failed to transition to PENDING_SHUTDOWN, current state is {}", state); + throw new StreamsException("Failed to shut down while in state " + state); } else { final Thread shutdownThread = shutdownHelper(false); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index e7e8e0bcff..01f62e72a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; + import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; @@ -25,6 +26,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.File; import java.io.IOException; import java.nio.channels.FileChannel; @@ -39,6 +43,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; @@ -54,14 +60,37 @@ public class StateDirectory { private static final Logger log = LoggerFactory.getLogger(StateDirectory.class); static final String LOCK_FILE_NAME = ".lock"; + /* The process file is used to persist the process id across restarts. 
+ * For compatibility reasons you should only ever add fields to the json schema + */ + static final String PROCESS_FILE_NAME = "kafka-streams-process-metadata"; + + @JsonIgnoreProperties(ignoreUnknown = true) + static class StateDirectoryProcessFile { + @JsonProperty + private final UUID processId; + + public StateDirectoryProcessFile() { + this.processId = null; + } + + StateDirectoryProcessFile(final UUID processId) { + this.processId = processId; + } + } + private final Object taskDirCreationLock = new Object(); private final Time time; private final String appId; private final File stateDir; private final boolean hasPersistentStores; + private final HashMap channels = new HashMap<>(); private final HashMap locks = new HashMap<>(); + private FileChannel stateDirLockChannel; + private FileLock stateDirLock; + private FileChannel globalStateChannel; private FileLock globalStateLock; @@ -104,17 +133,19 @@ public StateDirectory(final StreamsConfig config, final Time time, final boolean throw new ProcessorStateException( String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath())); } + if (stateDirName.startsWith(System.getProperty("java.io.tmpdir"))) { log.warn("Using an OS temp directory in the state.dir property can cause failures with writing" + " the checkpoint file due to the fact that this directory can be cleared by the OS." + " Resolved state.dir: [" + stateDirName + "]"); + } // change the dir permission to "rwxr-x---" to avoid world readable configurePermissions(baseDir); configurePermissions(stateDir); } } - + private void configurePermissions(final File file) { final Path path = file.toPath(); if (path.getFileSystem().supportedFileAttributeViews().contains("posix")) { @@ -134,6 +165,60 @@ private void configurePermissions(final File file) { } } + /** + * @return true if the state directory was successfully locked + */ + private boolean lockStateDirectory() { + final File lockFile = new File(stateDir, LOCK_FILE_NAME); + try { + stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); + stateDirLock = tryLock(stateDirLockChannel); + } catch (final IOException e) { + log.error("Unable to lock the state directory due to unexpected exception", e); + throw new ProcessorStateException("Failed to lock the state directory during startup", e); + } + + return stateDirLock != null; + } + + public UUID initializeProcessId() { + if (!hasPersistentStores) { + return UUID.randomUUID(); + } + + if (!lockStateDirectory()) { + log.error("Unable to obtain lock as state directory is already locked by another process"); + throw new StreamsException("Unable to initialize state, this can happen if multiple instances of " + + "Kafka Streams are running in the same state directory"); + } + + final File processFile = new File(stateDir, PROCESS_FILE_NAME); + final ObjectMapper mapper = new ObjectMapper(); + + try { + if (processFile.exists()) { + try { + final StateDirectoryProcessFile processFileData = mapper.readValue(processFile, StateDirectoryProcessFile.class); + log.info("Reading UUID from process file: {}", processFileData.processId); + if (processFileData.processId != null) { + return processFileData.processId; + } + } catch (final Exception e) { + log.warn("Failed to read json process file", e); + } + } + + final StateDirectoryProcessFile processFileData = new StateDirectoryProcessFile(UUID.randomUUID()); + log.info("No process id found on disk, got fresh process id {}", processFileData.processId); + + 
mapper.writeValue(processFile, processFileData); + return processFileData.processId; + } catch (final IOException e) { + log.error("Unable to read/write process file due to unexpected exception", e); + throw new ProcessorStateException(e); + } + } + /** * Get or create the directory for the provided {@link TaskId}. * @return directory for the {@link TaskId} @@ -311,14 +396,36 @@ synchronized void unlock(final TaskId taskId) throws IOException { } } + public void close() { + if (hasPersistentStores) { + try { + stateDirLock.release(); + stateDirLockChannel.close(); + + stateDirLock = null; + stateDirLockChannel = null; + } catch (final IOException e) { + log.error("Unexpected exception while unlocking the state dir", e); + throw new StreamsException("Failed to release the lock on the state directory", e); + } + + // all threads should be stopped and cleaned up by now, so none should remain holding a lock + if (locks.isEmpty()) { + log.error("Some task directories still locked while closing state, this indicates unclean shutdown: {}", locks); + } + if (globalStateLock != null) { + log.error("Global state lock is present while closing the state, this indicates unclean shutdown"); + } + } + } + public synchronized void clean() { - // remove task dirs try { cleanRemovedTasksCalledByUser(); } catch (final Exception e) { throw new StreamsException(e); } - // remove global dir + try { if (stateDir.exists()) { Utils.delete(globalStateDir().getAbsoluteFile()); @@ -385,6 +492,7 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { } private void cleanRemovedTasksCalledByUser() throws Exception { + final AtomicReference firstException = new AtomicReference<>(); for (final File taskDir : listAllTaskDirectories()) { final String dirName = taskDir.getName(); final TaskId id = TaskId.parse(dirName); @@ -404,7 +512,7 @@ private void cleanRemovedTasksCalledByUser() throws Exception { logPrefix(), dirName, id), exception ); - throw exception; + firstException.compareAndSet(null, exception); } finally { try { unlock(id); @@ -417,11 +525,15 @@ private void cleanRemovedTasksCalledByUser() throws Exception { logPrefix(), dirName, id), exception ); - throw exception; + firstException.compareAndSet(null, exception); } } } } + final Exception exception = firstException.get(); + if (exception != null) { + throw exception; + } } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 0fb36836fd..a4cd8bf22c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -880,6 +880,8 @@ public void shouldCleanupOldStateDirs() throws Exception { anyObject(Time.class), EasyMock.eq(true) ).andReturn(stateDirectory); + EasyMock.expect(stateDirectory.initializeProcessId()).andReturn(UUID.randomUUID()); + stateDirectory.close(); PowerMock.replayAll(Executors.class, cleanupSchedule, stateDirectory); props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1"); @@ -1049,6 +1051,7 @@ private void startStreamsAndCheckDirExists(final Topology topology, anyObject(Time.class), EasyMock.eq(shouldFilesExist) ).andReturn(stateDirectory); + EasyMock.expect(stateDirectory.initializeProcessId()).andReturn(UUID.randomUUID()); PowerMock.replayAll(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java 
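A hedged sketch of the lifecycle the StateDirectory changes above introduce for clients with persistent stores: initializeProcessId() takes a file lock on the application's state directory (.lock) and reads, or creates, the adjacent kafka-streams-process-metadata file, and close() releases that lock. This is also why the integration tests below give each concurrently running KafkaStreams instance its own state.dir. The standalone usage here is illustrative; KafkaStreams performs these steps itself in its constructor and during shutdown.

    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.internals.StateDirectory;

    import java.util.UUID;

    class ProcessIdSketch {
        // `config` must point state.dir at a directory this process owns.
        static UUID acquireProcessId(StreamsConfig config) {
            StateDirectory stateDirectory = new StateDirectory(config, Time.SYSTEM, true /* hasPersistentStores */);
            // Reads the process file (plain JSON, e.g. {"processId":"<uuid>"}) if present,
            // otherwise generates a fresh UUID and persists it for the next restart.
            UUID processId = stateDirectory.initializeProcessId();
            stateDirectory.close();   // releases the state-directory lock taken above
            return processId;
        }
    }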
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java index 0f1c0e4d20..0366ff91e4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java @@ -621,7 +621,10 @@ public void shouldGoThroughRebalancingCorrectly() throws Exception { .to(outputTopic); startStreams(builder); - final KafkaStreams kafkaStreamsToClose = startStreams(builder); + final Properties streamsToCloseConfigs = new Properties(); + streamsToCloseConfigs.putAll(streamsConfiguration); + streamsToCloseConfigs.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + "-2"); + final KafkaStreams kafkaStreamsToClose = startStreams(builder, streamsToCloseConfigs); validateReceivedMessages( new StringDeserializer(), @@ -723,12 +726,24 @@ private void sendEvents(final String topic, } private KafkaStreams startStreams(final StreamsBuilder builder) throws InterruptedException { - return startStreams(builder, REBALANCING, RUNNING, null); + return startStreams(builder, REBALANCING, RUNNING, streamsConfiguration, null); + } + + private KafkaStreams startStreams(final StreamsBuilder builder, final Properties streamsConfiguration) throws InterruptedException { + return startStreams(builder, REBALANCING, RUNNING, streamsConfiguration, null); + } + + private KafkaStreams startStreams(final StreamsBuilder builder, + final State expectedOldState, + final State expectedNewState, + final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) throws InterruptedException { + return startStreams(builder, expectedOldState, expectedNewState, streamsConfiguration, uncaughtExceptionHandler); } private KafkaStreams startStreams(final StreamsBuilder builder, final State expectedOldState, final State expectedNewState, + final Properties streamsConfiguration, final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) throws InterruptedException { final CountDownLatch latch; final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java index eeac500eba..b52b638167 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java @@ -50,7 +50,6 @@ import org.junit.experimental.categories.Category; import java.io.IOException; -import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -58,6 +57,7 @@ import java.util.Set; import java.util.function.Function; +import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; @Category({IntegrationTest.class}) @@ -71,7 +71,9 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest { private final static String TABLE_2 = "table2"; private final static String TABLE_3 = "table3"; private final static String OUTPUT = "output-"; - private static Properties streamsConfig; + private final Properties streamsConfig = getStreamsConfig(); + private final Properties streamsConfigTwo = getStreamsConfig(); + private final Properties streamsConfigThree = 
getStreamsConfig(); private KafkaStreams streams; private KafkaStreams streamsTwo; private KafkaStreams streamsThree; @@ -105,14 +107,8 @@ public static void beforeTest() throws Exception { PRODUCER_CONFIG_3.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); PRODUCER_CONFIG_3.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - streamsConfig = new Properties(); - streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); - final List> table1 = Arrays.asList( + final List> table1 = asList( new KeyValue<>(1, 1.33f), new KeyValue<>(2, 2.22f), new KeyValue<>(3, -1.22f), //Won't be joined in yet. @@ -120,7 +116,7 @@ public static void beforeTest() throws Exception { ); //Partitions pre-computed using the default Murmur2 hash, just to ensure that all 3 partitions will be exercised. - final List> table2 = Arrays.asList( + final List> table2 = asList( new KeyValue<>("0", 0L), //partition 2 new KeyValue<>("1", 10L), //partition 0 new KeyValue<>("2", 20L), //partition 2 @@ -150,7 +146,12 @@ public static void beforeTest() throws Exception { @Before public void before() throws IOException { - IntegrationTestUtils.purgeLocalStreamsState(streamsConfig); + final String stateDirBasePath = TestUtils.tempDirectory().getPath(); + streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-1"); + streamsConfigTwo.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-2"); + streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-3"); + + IntegrationTestUtils.purgeLocalStreamsState(asList(streamsConfig, streamsConfigTwo, streamsConfigThree)); } @After @@ -187,11 +188,10 @@ private void verifyKTableKTableJoin(final JoinType joinType, final boolean verifyQueryableState) throws Exception { final String queryableName = verifyQueryableState ? joinType + "-store1" : null; final String queryableNameTwo = verifyQueryableState ? 
joinType + "-store2" : null; - streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType + queryableName); - streams = prepareTopology(queryableName, queryableNameTwo); - streamsTwo = prepareTopology(queryableName, queryableNameTwo); - streamsThree = prepareTopology(queryableName, queryableNameTwo); + streams = prepareTopology(queryableName, queryableNameTwo, streamsConfig); + streamsTwo = prepareTopology(queryableName, queryableNameTwo, streamsConfigTwo); + streamsThree = prepareTopology(queryableName, queryableNameTwo, streamsConfigThree); streams.start(); streamsTwo.start(); streamsThree.start(); @@ -204,7 +204,20 @@ private void verifyKTableKTableJoin(final JoinType joinType, assertEquals(expectedResult, result); } - private KafkaStreams prepareTopology(final String queryableName, final String queryableNameTwo) { + private static Properties getStreamsConfig() { + final Properties streamsConfig = new Properties(); + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Multi"); + streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + return streamsConfig; + } + + private static KafkaStreams prepareTopology(final String queryableName, + final String queryableNameTwo, + final Properties streamsConfig) { + final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope(); final StreamsBuilder builder = new StreamsBuilder(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index 2506ed7f03..799b135ef6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -40,6 +40,7 @@ import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; +import org.junit.After; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; @@ -94,6 +95,11 @@ public static Collection data() { private String storeName; private String outputTopic; + private KafkaStreams streamInstanceOne; + private KafkaStreams streamInstanceTwo; + private KafkaStreams streamInstanceOneRecovery; + + @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3); @@ -112,6 +118,19 @@ public void createTopics() throws Exception { CLUSTER.createTopic(outputTopic, 1, 3); } + @After + public void cleanUp() { + if (streamInstanceOne != null) { + streamInstanceOne.close(); + } + if (streamInstanceTwo != null) { + streamInstanceTwo.close(); + } + if (streamInstanceOneRecovery != null) { + streamInstanceOneRecovery.close(); + } + } + @Test public void shouldSurviveWithOneTaskAsStandby() throws Exception { IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( @@ -132,21 +151,19 @@ public void shouldSurviveWithOneTaskAsStandby() throws Exception { final CountDownLatch instanceLatch = new CountDownLatch(1); - try ( - final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-1/", instanceLatch); - final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-2/", 
instanceLatch) - ) { - startApplicationAndWaitUntilRunning(asList(streamInstanceOne, streamInstanceTwo), Duration.ofSeconds(60)); + streamInstanceOne = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-1/", instanceLatch); + streamInstanceTwo = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-2/", instanceLatch); - // Wait for the record to be processed - assertTrue(instanceLatch.await(15, TimeUnit.SECONDS)); + startApplicationAndWaitUntilRunning(asList(streamInstanceOne, streamInstanceTwo), Duration.ofSeconds(60)); - streamInstanceOne.close(Duration.ZERO); - streamInstanceTwo.close(Duration.ZERO); + // Wait for the record to be processed + assertTrue(instanceLatch.await(15, TimeUnit.SECONDS)); - streamInstanceOne.cleanUp(); - streamInstanceTwo.cleanUp(); - } + streamInstanceOne.close(Duration.ZERO); + streamInstanceTwo.close(Duration.ZERO); + + streamInstanceOne.cleanUp(); + streamInstanceTwo.cleanUp(); } private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath, @@ -195,125 +212,123 @@ public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exc 10L + time ); - try ( - final KafkaStreams streamInstanceOne = buildWithDeduplicationTopology(base + "-1"); - final KafkaStreams streamInstanceTwo = buildWithDeduplicationTopology(base + "-2"); - final KafkaStreams streamInstanceOneRecovery = buildWithDeduplicationTopology(base + "-1") - ) { - // start first instance and wait for processing - - startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOne), Duration.ofSeconds(30)); - IntegrationTestUtils.waitUntilMinRecordsReceived( - TestUtils.consumerConfig( - CLUSTER.bootstrapServers(), - IntegerDeserializer.class, - IntegerDeserializer.class - ), - outputTopic, - 1 - ); - - // start second instance and wait for standby replication - startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceTwo), Duration.ofSeconds(30)); - waitForCondition( - () -> streamInstanceTwo.store( - StoreQueryParameters.fromNameAndType( - storeName, - QueryableStoreTypes.keyValueStore() - ).enableStaleStores() - ).get(KEY_0) != null, - REBALANCE_TIMEOUT, - "Could not get key from standby store" - ); - // sanity check that first instance is still active - waitForCondition( - () -> streamInstanceOne.store( - StoreQueryParameters.fromNameAndType( - storeName, - QueryableStoreTypes.keyValueStore() - ) - ).get(KEY_0) != null, - "Could not get key from main store" - ); - - // inject poison pill and wait for crash of first instance and recovery on second instance - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( - inputTopic, - Collections.singletonList( - new KeyValue<>(KEY_1, 0) - ), - TestUtils.producerConfig( - CLUSTER.bootstrapServers(), - IntegerSerializer.class, - IntegerSerializer.class, - new Properties() - ), - 10L + time - ); - waitForCondition( - () -> streamInstanceOne.state() == KafkaStreams.State.ERROR, - "Stream instance 1 did not go into error state" - ); - streamInstanceOne.close(); + streamInstanceOne = buildWithDeduplicationTopology(base + "-1"); + streamInstanceTwo = buildWithDeduplicationTopology(base + "-2"); - IntegrationTestUtils.waitUntilMinRecordsReceived( - TestUtils.consumerConfig( - CLUSTER.bootstrapServers(), - IntegerDeserializer.class, - IntegerDeserializer.class - ), - outputTopic, - 2 - ); - - // "restart" first client and wait for standby recovery - // (could actually also be active, but it does not matter as long as we enable "state stores" - startApplicationAndWaitUntilRunning( 
- Collections.singletonList(streamInstanceOneRecovery), - Duration.ofSeconds(30) - ); - waitForCondition( - () -> streamInstanceOneRecovery.store( - StoreQueryParameters.fromNameAndType( - storeName, - QueryableStoreTypes.keyValueStore() - ).enableStaleStores() - ).get(KEY_0) != null, - "Could not get key from recovered standby store" - ); + // start first instance and wait for processing + startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOne), Duration.ofSeconds(30)); + IntegrationTestUtils.waitUntilMinRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class + ), + outputTopic, + 1 + ); - streamInstanceTwo.close(); - waitForCondition( - () -> streamInstanceOneRecovery.store( - StoreQueryParameters.fromNameAndType( - storeName, - QueryableStoreTypes.keyValueStore() - ) - ).get(KEY_0) != null, - REBALANCE_TIMEOUT, - "Could not get key from recovered main store" - ); - - // re-inject poison pill and wait for crash of first instance - skipRecord.set(false); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( - inputTopic, - Collections.singletonList( - new KeyValue<>(KEY_1, 0) - ), - TestUtils.producerConfig( - CLUSTER.bootstrapServers(), - IntegerSerializer.class, - IntegerSerializer.class, - new Properties() - ), - 10L + time - ); - waitForCondition( - () -> streamInstanceOneRecovery.state() == KafkaStreams.State.ERROR, - "Stream instance 1 did not go into error state. Is in " + streamInstanceOneRecovery.state() + " state." - ); - } + // start second instance and wait for standby replication + startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceTwo), Duration.ofSeconds(30)); + waitForCondition( + () -> streamInstanceTwo.store( + StoreQueryParameters.fromNameAndType( + storeName, + QueryableStoreTypes.keyValueStore() + ).enableStaleStores() + ).get(KEY_0) != null, + REBALANCE_TIMEOUT, + "Could not get key from standby store" + ); + // sanity check that first instance is still active + waitForCondition( + () -> streamInstanceOne.store( + StoreQueryParameters.fromNameAndType( + storeName, + QueryableStoreTypes.keyValueStore() + ) + ).get(KEY_0) != null, + "Could not get key from main store" + ); + + // inject poison pill and wait for crash of first instance and recovery on second instance + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + inputTopic, + Collections.singletonList( + new KeyValue<>(KEY_1, 0) + ), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + IntegerSerializer.class, + IntegerSerializer.class, + new Properties() + ), + 10L + time + ); + waitForCondition( + () -> streamInstanceOne.state() == KafkaStreams.State.ERROR, + "Stream instance 1 did not go into error state" + ); + streamInstanceOne.close(); + + IntegrationTestUtils.waitUntilMinRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class + ), + outputTopic, + 2 + ); + + streamInstanceOneRecovery = buildWithDeduplicationTopology(base + "-1"); + + // "restart" first client and wait for standby recovery + // (could actually also be active, but it does not matter as long as we enable "state stores" + startApplicationAndWaitUntilRunning( + Collections.singletonList(streamInstanceOneRecovery), + Duration.ofSeconds(30) + ); + waitForCondition( + () -> streamInstanceOneRecovery.store( + StoreQueryParameters.fromNameAndType( + storeName, + QueryableStoreTypes.keyValueStore() + 
).enableStaleStores() + ).get(KEY_0) != null, + "Could not get key from recovered standby store" + ); + + streamInstanceTwo.close(); + waitForCondition( + () -> streamInstanceOneRecovery.store( + StoreQueryParameters.fromNameAndType( + storeName, + QueryableStoreTypes.keyValueStore() + ) + ).get(KEY_0) != null, + REBALANCE_TIMEOUT, + "Could not get key from recovered main store" + ); + + // re-inject poison pill and wait for crash of first instance + skipRecord.set(false); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + inputTopic, + Collections.singletonList( + new KeyValue<>(KEY_1, 0) + ), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + IntegerSerializer.class, + IntegerSerializer.class, + new Properties() + ), + 10L + time + ); + waitForCondition( + () -> streamInstanceOneRecovery.state() == KafkaStreams.State.ERROR, + "Stream instance 1 did not go into error state. Is in " + streamInstanceOneRecovery.state() + " state." + ); } private KafkaStreams buildWithDeduplicationTopology(final String stateDirPath) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java index 153f434b64..70082a0a17 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java @@ -518,8 +518,8 @@ public void shouldMigrateInMemoryWindowStoreToTimestampedWindowStoreUsingPapi() shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi( - new KafkaStreams(streamsBuilderForOldStore.build(), props()), - new KafkaStreams(streamsBuilderForNewStore.build(), props()), + streamsBuilderForOldStore, + streamsBuilderForNewStore, false); } @@ -554,17 +554,17 @@ public void shouldMigratePersistentWindowStoreToTimestampedWindowStoreUsingPapi( .stream(inputStream) .process(TimestampedWindowedProcessor::new, STORE_NAME); - final Properties props = props(); shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi( - new KafkaStreams(streamsBuilderForOldStore.build(), props), - new KafkaStreams(streamsBuilderForNewStore.build(), props), + streamsBuilderForOldStore, + streamsBuilderForNewStore, true); } - private void shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(final KafkaStreams kafkaStreamsOld, - final KafkaStreams kafkaStreamsNew, + private void shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(final StreamsBuilder streamsBuilderForOldStore, + final StreamsBuilder streamsBuilderForNewStore, final boolean persistentStore) throws Exception { - kafkaStreams = kafkaStreamsOld; + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props); kafkaStreams.start(); processWindowedKeyValueAndVerifyPlainCount(1, singletonList( @@ -608,7 +608,7 @@ private void shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(final Kaf kafkaStreams = null; - kafkaStreams = kafkaStreamsNew; + kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), props); kafkaStreams.start(); verifyWindowedCountWithTimestamp(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L, lastUpdateKeyOne); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 4d2ca1fb9d..4afa6a05f2 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -153,6 +153,17 @@ public static void purgeLocalStreamsState(final Properties streamsConfiguration) } } + /** + * Removes local state stores. Useful to reset state in-between integration test runs. + * + * @param streamsConfigurations Streams configuration settings + */ + public static void purgeLocalStreamsState(final Collection<Properties> streamsConfigurations) throws IOException { + for (final Properties streamsConfig : streamsConfigurations) { + purgeLocalStreamsState(streamsConfig); + } + } + public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, final String... topics) { cleanStateBeforeTest(cluster, 1, topics); }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index fe53e1259c..807a734cf2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -16,6 +16,13 @@ */ package org.apache.kafka.streams.processor.internals; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.BufferedWriter; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.UUID; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils;
@@ -54,6 +61,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.streams.processor.internals.StateDirectory.LOCK_FILE_NAME; +import static org.apache.kafka.streams.processor.internals.StateDirectory.PROCESS_FILE_NAME; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.endsWith;
@@ -70,7 +78,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - public class StateDirectoryTest { private final MockTime time = new MockTime();
@@ -324,7 +331,6 @@ public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws } } - @Test public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay() { final File dir = directory.directoryForTask(new TaskId(2, 0));
@@ -642,7 +648,7 @@ public void shouldLogStateDirCleanerMessage() { final long cleanupDelayMs = 0; time.sleep(5000); directory.cleanRemovedTasks(cleanupDelayMs); - assertThat(appender.getMessages(), hasItem(endsWith("ms has elapsed (cleanup delay is " + cleanupDelayMs + "ms)."))); + assertThat(appender.getMessages(), hasItem(endsWith("ms has elapsed (cleanup delay is " + cleanupDelayMs + "ms)."))); } }
@@ -662,12 +668,78 @@ public void shouldLogTempDirMessage() { assertThat( appender.getMessages(), hasItem("Using an OS temp directory in the state.dir property can cause failures with writing the" + - " checkpoint file due to the fact that this directory can be cleared by the OS." + - " Resolved state.dir: [/tmp/kafka-streams]") + " checkpoint file due to the fact that this directory can be cleared by the OS."
+ + " Resolved state.dir: [" + System.getProperty("java.io.tmpdir") + "/kafka-streams]") ); } } + @Test + public void shouldPersistProcessIdAcrossRestart() { + final UUID processId = directory.initializeProcessId(); + directory.close(); + assertThat(directory.initializeProcessId(), equalTo(processId)); + } + + @Test + public void shouldGetFreshProcessIdIfProcessFileDeleted() { + final UUID processId = directory.initializeProcessId(); + directory.close(); + + final File processFile = new File(appDir, PROCESS_FILE_NAME); + assertThat(processFile.exists(), is(true)); + assertThat(processFile.delete(), is(true)); + + assertThat(directory.initializeProcessId(), not(processId)); + } + + @Test + public void shouldGetFreshProcessIdIfJsonUnreadable() throws Exception { + final File processFile = new File(appDir, PROCESS_FILE_NAME); + assertThat(processFile.createNewFile(), is(true)); + final UUID processId = UUID.randomUUID(); + + final FileOutputStream fileOutputStream = new FileOutputStream(processFile); + try (final BufferedWriter writer = new BufferedWriter( + new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) { + writer.write(processId.toString()); + writer.flush(); + fileOutputStream.getFD().sync(); + } + + assertThat(directory.initializeProcessId(), not(processId)); + } + + @Test + public void shouldReadFutureProcessFileFormat() throws Exception { + final File processFile = new File(appDir, PROCESS_FILE_NAME); + final ObjectMapper mapper = new ObjectMapper(); + final UUID processId = UUID.randomUUID(); + mapper.writeValue(processFile, new FutureStateDirectoryProcessFile(processId, "some random junk")); + + assertThat(directory.initializeProcessId(), equalTo(processId)); + } + + private static class FutureStateDirectoryProcessFile { + + @JsonProperty + private final UUID processId; + + @JsonProperty + private final String newField; + + public FutureStateDirectoryProcessFile() { + this.processId = null; + this.newField = null; + } + + FutureStateDirectoryProcessFile(final UUID processId, final String newField) { + this.processId = processId; + this.newField = newField; + + } + } + private static class CreateTaskDirRunner implements Runnable { private final StateDirectory directory; private final TaskId taskId;