From d5726a52dda4898845ef8f9c5a0226102047c273 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Thu, 21 Nov 2024 08:35:19 +0100 Subject: [PATCH] Propagate handshake failures to Handshake future We now complete the handshake future exceptionally if handshake settings do not match our expectations. This can happen if e.g. the connection id is being sent as String instead of an integer. --- .../java/io/lettuce/core/RedisHandshake.java | 8 +++- .../lettuce/core/RedisHandshakeUnitTests.java | 41 +++++++++++++++++-- 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/lettuce/core/RedisHandshake.java b/src/main/java/io/lettuce/core/RedisHandshake.java index 80dc62096..bf5ad7e4b 100644 --- a/src/main/java/io/lettuce/core/RedisHandshake.java +++ b/src/main/java/io/lettuce/core/RedisHandshake.java @@ -134,8 +134,12 @@ private CompletionStage tryHandshakeResp3(Channel channel) { handshake.completeExceptionally(throwable); } } else { - onHelloResponse(settings); - handshake.complete(null); + try { + onHelloResponse(settings); + handshake.complete(null); + } catch (RuntimeException e) { + handshake.completeExceptionally(e); + } } }); diff --git a/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java b/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java index b01fd810f..7bfd18760 100644 --- a/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java +++ b/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java @@ -1,7 +1,7 @@ package io.lettuce.core; -import static io.lettuce.TestTags.UNIT_TEST; -import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static io.lettuce.TestTags.*; +import static java.util.concurrent.TimeUnit.*; import static org.assertj.core.api.Assertions.*; import java.nio.ByteBuffer; @@ -13,12 +13,12 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; import io.lettuce.core.output.CommandOutput; import io.lettuce.core.protocol.AsyncCommand; import io.lettuce.core.protocol.ProtocolVersion; import io.netty.channel.embedded.EmbeddedChannel; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; /** * Unit tests for {@link RedisHandshake}. @@ -110,6 +110,23 @@ void handshakeFireAndForgetPostHandshake() { assertThat(handshakeInit.toCompletableFuture().isCompletedExceptionally()).isFalse(); } + @Test + void handshakeWithInvalidResponseShouldPropagateException() { + + EmbeddedChannel channel = new EmbeddedChannel(true, false); + + ConnectionState state = new ConnectionState(); + state.setCredentialsProvider(new StaticCredentialsProvider(null, null)); + RedisHandshake handshake = new RedisHandshake(null, false, state); + CompletionStage handshakeInit = handshake.initialize(channel); + + AsyncCommand> hello = channel.readOutbound(); + helloStringIdResponse(hello.getOutput()); + hello.complete(); + + assertThat(handshakeInit.toCompletableFuture().isCompletedExceptionally()).isTrue(); + } + @Test void handshakeDelayedCredentialProvider() { @@ -176,6 +193,22 @@ private static void helloResponse(CommandOutput> output) { + + output.multiMap(8); + output.set(ByteBuffer.wrap("id".getBytes())); + output.set(ByteBuffer.wrap("1".getBytes())); + + output.set(ByteBuffer.wrap("mode".getBytes())); + output.set(ByteBuffer.wrap("master".getBytes())); + + output.set(ByteBuffer.wrap("role".getBytes())); + output.set(ByteBuffer.wrap("master".getBytes())); + + output.set(ByteBuffer.wrap("version".getBytes())); + output.set(ByteBuffer.wrap("1.2.3".getBytes())); + } + static class DelayedRedisCredentialsProvider implements RedisCredentialsProvider { private final Sinks.One credentialsSink = Sinks.one();