Skip to content

Commit

Permalink
KAFKA-12193: Re-resolve IPs after a client disconnects (#9902)
Browse files Browse the repository at this point in the history
This patch changes the NetworkClient behavior to resolve the target node's hostname after disconnecting from an established connection, rather than waiting until the previously-resolved addresses are exhausted. This is to handle the scenario when the node's IP addresses have changed during the lifetime of the connection, and means that the client does not have to try to connect to invalid IP addresses until it has tried each address.

Reviewers: Mickael Maison <[email protected]>, Satish Duggana <[email protected]>, David Jacot <[email protected]>
  • Loading branch information
a0x8o committed Feb 4, 2021
1 parent 4b511a7 commit 88ad7d1
Show file tree
Hide file tree
Showing 43 changed files with 1,360 additions and 380 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,7 @@ project(':raft') {

dependencies {
compile project(':clients')
compile project(':metadata')
compile libs.slf4jApi
compile libs.jacksonDatabind

Expand Down
2 changes: 2 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,12 @@

<subpackage name="raft">
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time ti
clientSaslMechanism, time, true, logContext);
}

static List<InetAddress> resolve(String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
InetAddress[] addresses = InetAddress.getAllByName(host);
static List<InetAddress> resolve(String host, ClientDnsLookup clientDnsLookup,
HostResolver hostResolver) throws UnknownHostException {
InetAddress[] addresses = hostResolver.resolve(host);

switch (clientDnsLookup) {
case DEFAULT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ final class ClusterConnectionStates {
final static double CONNECTION_SETUP_TIMEOUT_JITTER = 0.2;
private final Map<String, NodeConnectionState> nodeState;
private final Logger log;
private final HostResolver hostResolver;
private Set<String> connectingNodes;
private ExponentialBackoff reconnectBackoff;
private ExponentialBackoff connectionSetupTimeout;

public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs,
long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs,
LogContext logContext) {
LogContext logContext, HostResolver hostResolver) {
this.log = logContext.logger(ClusterConnectionStates.class);
this.reconnectBackoff = new ExponentialBackoff(
reconnectBackoffMs,
Expand All @@ -63,6 +64,7 @@ public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMax
CONNECTION_SETUP_TIMEOUT_JITTER);
this.nodeState = new HashMap<>();
this.connectingNodes = new HashSet<>();
this.hostResolver = hostResolver;
}

/**
Expand Down Expand Up @@ -156,7 +158,8 @@ public void connecting(String id, long now, String host, ClientDnsLookup clientD
// Create a new NodeConnectionState if nodeState does not already contain one
// for the specified id or if the hostname associated with the node id changed.
nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
reconnectBackoff.backoff(0), connectionSetupTimeout.backoff(0), host, clientDnsLookup));
reconnectBackoff.backoff(0), connectionSetupTimeout.backoff(0), host,
clientDnsLookup, hostResolver));
connectingNodes.add(id);
}

Expand All @@ -183,6 +186,11 @@ public void disconnected(String id, long now) {
connectingNodes.remove(id);
} else {
resetConnectionSetupTimeout(nodeState);
if (nodeState.state.isConnected()) {
// If a connection had previously been established, clear the addresses to trigger a new DNS resolution
// because the node IPs may have changed
nodeState.clearAddresses();
}
}
nodeState.state = ConnectionState.DISCONNECTED;
}
Expand Down Expand Up @@ -470,9 +478,11 @@ private static class NodeConnectionState {
private int addressIndex;
private final String host;
private final ClientDnsLookup clientDnsLookup;
private final HostResolver hostResolver;

private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs,
long connectionSetupTimeoutMs, String host, ClientDnsLookup clientDnsLookup) {
long connectionSetupTimeoutMs, String host, ClientDnsLookup clientDnsLookup,
HostResolver hostResolver) {
this.state = state;
this.addresses = Collections.emptyList();
this.addressIndex = -1;
Expand All @@ -484,6 +494,7 @@ private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long
this.throttleUntilTimeMs = 0;
this.host = host;
this.clientDnsLookup = clientDnsLookup;
this.hostResolver = hostResolver;
}

public String host() {
Expand All @@ -498,7 +509,7 @@ public String host() {
private InetAddress currentAddress() throws UnknownHostException {
if (addresses.isEmpty()) {
// (Re-)initialize list
addresses = ClientUtils.resolve(host, clientDnsLookup);
addresses = ClientUtils.resolve(host, clientDnsLookup, hostResolver);
addressIndex = 0;
}

Expand All @@ -518,6 +529,13 @@ private void moveToNextAddress() {
addresses = Collections.emptyList(); // Exhausted list. Re-resolve on next currentAddress() call
}

/**
* Clears the resolved addresses in order to trigger re-resolving on the next {@link #currentAddress()} call.
*/
private void clearAddresses() {
addresses = Collections.emptyList();
}

public String toString() {
return "NodeState(" + state + ", " + lastConnectAttemptMs + ", " + failedAttempts + ", " + throttleUntilTimeMs + ")";
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class DefaultHostResolver implements HostResolver {

@Override
public InetAddress[] resolve(String host) throws UnknownHostException {
return InetAddress.getAllByName(host);
}
}
26 changes: 26 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/HostResolver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients;

import java.net.InetAddress;
import java.net.UnknownHostException;

public interface HostResolver {

InetAddress[] resolve(String host) throws UnknownHostException;
}
80 changes: 41 additions & 39 deletions clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,8 @@ public NetworkClient(Selectable selector,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
LogContext logContext) {
this(null,
this(selector,
metadata,
selector,
clientId,
maxInFlightRequestsPerConnection,
reconnectBackoffMs,
Expand All @@ -167,22 +166,22 @@ public NetworkClient(Selectable selector,
}

public NetworkClient(Selectable selector,
Metadata metadata,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
long connectionSetupTimeoutMs,
long connectionSetupTimeoutMaxMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext) {
Metadata metadata,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
long connectionSetupTimeoutMs,
long connectionSetupTimeoutMaxMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext) {
this(null,
metadata,
selector,
Expand All @@ -200,7 +199,8 @@ public NetworkClient(Selectable selector,
discoverBrokerVersions,
apiVersions,
throttleTimeSensor,
logContext);
logContext,
new DefaultHostResolver());
}

public NetworkClient(Selectable selector,
Expand Down Expand Up @@ -236,27 +236,29 @@ public NetworkClient(Selectable selector,
discoverBrokerVersions,
apiVersions,
null,
logContext);
logContext,
new DefaultHostResolver());
}

private NetworkClient(MetadataUpdater metadataUpdater,
Metadata metadata,
Selectable selector,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
long connectionSetupTimeoutMs,
long connectionSetupTimeoutMaxMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext) {
public NetworkClient(MetadataUpdater metadataUpdater,
Metadata metadata,
Selectable selector,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
long connectionSetupTimeoutMs,
long connectionSetupTimeoutMaxMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext,
HostResolver hostResolver) {
/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
* possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
* super constructor is invoked.
Expand All @@ -273,7 +275,7 @@ private NetworkClient(MetadataUpdater metadataUpdater,
this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
this.connectionStates = new ClusterConnectionStates(
reconnectBackoffMs, reconnectBackoffMax,
connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, logContext);
connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, logContext, hostResolver);
this.socketSendBuffer = socketSendBuffer;
this.socketReceiveBuffer = socketReceiveBuffer;
this.correlation = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;

import java.net.InetAddress;

class AddressChangeHostResolver implements HostResolver {
private boolean useNewAddresses;
private InetAddress[] initialAddresses;
private InetAddress[] newAddresses;
private int resolutionCount = 0;

public AddressChangeHostResolver(InetAddress[] initialAddresses, InetAddress[] newAddresses) {
this.initialAddresses = initialAddresses;
this.newAddresses = newAddresses;
}

@Override
public InetAddress[] resolve(String host) {
++resolutionCount;
return useNewAddresses ? newAddresses : initialAddresses;
}

public void changeAddresses() {
useNewAddresses = true;
}

public boolean useNewAddresses() {
return useNewAddresses;
}

public int resolutionCount() {
return resolutionCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

public class ClientUtilsTest {

private HostResolver hostResolver = new DefaultHostResolver();

@Test
public void testParseAndValidateAddresses() throws UnknownHostException {
Expand Down Expand Up @@ -102,25 +103,25 @@ public void testFilterPreferredAddresses() throws UnknownHostException {
@Test
public void testResolveUnknownHostException() {
assertThrows(UnknownHostException.class,
() -> ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.USE_ALL_DNS_IPS));
() -> ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.USE_ALL_DNS_IPS, hostResolver));
}

@Test
public void testResolveDnsLookup() throws UnknownHostException {
// Note that kafka.apache.org resolves to at least 2 IP addresses
assertEquals(1, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.DEFAULT).size());
assertEquals(1, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.DEFAULT, hostResolver).size());
}

@Test
public void testResolveDnsLookupAllIps() throws UnknownHostException {
// Note that kafka.apache.org resolves to at least 2 IP addresses
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1);
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS, hostResolver).size() > 1);
}

@Test
public void testResolveDnsLookupResolveCanonicalBootstrapServers() throws UnknownHostException {
// Note that kafka.apache.org resolves to at least 2 IP addresses
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY).size() > 1);
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, hostResolver).size() > 1);
}

private List<InetSocketAddress> checkWithoutLookup(String... url) {
Expand Down
Loading

0 comments on commit 88ad7d1

Please sign in to comment.