From e16bd15fe0cb73c3db34bd7b9ef26b62b3106176 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Tue, 6 Apr 2021 15:46:29 +0300 Subject: [PATCH] Renamings, fixed WebsocketGatewayAcceptor (added filtering of reactor.netty's AbortedException) --- .../gateway/ws/WebsocketGatewayAcceptor.java | 13 ++++++++++++- ...onTest.java => RSocketClientConnectionTest.java} | 2 +- ...rTest.java => RSocketClientErrorMapperTest.java} | 6 +++--- ...yExtension.java => RSocketGatewayExtension.java} | 6 +++--- .../gateway/rsocket/RSocketGatewayTest.java | 2 +- ...java => RSocketLocalGatewayErrorMapperTest.java} | 6 +++--- ...nsion.java => RSocketLocalGatewayExtension.java} | 8 ++++---- .../gateway/rsocket/RSocketLocalGatewayTest.java | 4 ++-- 8 files changed, 29 insertions(+), 18 deletions(-) rename services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/{RsocketClientConnectionTest.java => RSocketClientConnectionTest.java} (99%) rename services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/{RsocketClientErrorMapperTest.java => RSocketClientErrorMapperTest.java} (90%) rename services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/{RsocketGatewayExtension.java => RSocketGatewayExtension.java} (75%) rename services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/{RsocketLocalGatewayErrorMapperTest.java => RSocketLocalGatewayErrorMapperTest.java} (90%) rename services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/{RsocketLocalGatewayExtension.java => RSocketLocalGatewayExtension.java} (78%) diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java index 271d5397..e03ee696 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java @@ -10,6 +10,7 @@ import static io.scalecube.services.gateway.ws.GatewayMessages.validateSidOnSession; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.HttpHeaders; import io.scalecube.services.ServiceCall; import io.scalecube.services.api.ServiceMessage; @@ -35,6 +36,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.DisposableChannel; +import reactor.netty.channel.AbortedException; import reactor.netty.http.server.HttpServerRequest; import reactor.netty.http.server.HttpServerResponse; import reactor.netty.http.websocket.WebsocketInbound; @@ -127,16 +129,25 @@ private Mono onConnect(WebsocketGatewaySession session) { session .receive() - .doOnError(th -> gatewayHandler.onSessionError(session, th)) .subscribe( byteBuf -> { + if (byteBuf == Unpooled.EMPTY_BUFFER) { + return; + } + if (!byteBuf.isReadable()) { ReferenceCountUtil.safestRelease(byteBuf); return; } + Mono.deferContextual(context -> onRequest(session, byteBuf, (Context) context)) .contextWrite(context -> gatewayHandler.onRequest(session, byteBuf, context)) .subscribe(); + }, + th -> { + if (!(th instanceof AbortedException)) { + gatewayHandler.onSessionError(session, th); + } }); return session.onClose(() -> gatewayHandler.onSessionClose(session)); diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientConnectionTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketClientConnectionTest.java similarity index 99% rename from services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientConnectionTest.java rename to services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketClientConnectionTest.java index bb1d6124..4dc5fdf5 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientConnectionTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketClientConnectionTest.java @@ -35,7 +35,7 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -class RsocketClientConnectionTest extends BaseTest { +class RSocketClientConnectionTest extends BaseTest { public static final GatewayClientCodec CLIENT_CODEC = GatewayClientTransports.RSOCKET_CLIENT_CODEC; diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientErrorMapperTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketClientErrorMapperTest.java similarity index 90% rename from services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientErrorMapperTest.java rename to services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketClientErrorMapperTest.java index 7d671d89..48446998 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientErrorMapperTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketClientErrorMapperTest.java @@ -13,11 +13,11 @@ import org.junit.jupiter.api.extension.RegisterExtension; import reactor.test.StepVerifier; -class RsocketClientErrorMapperTest extends BaseTest { +class RSocketClientErrorMapperTest extends BaseTest { @RegisterExtension - static RsocketGatewayExtension extension = - new RsocketGatewayExtension( + static RSocketGatewayExtension extension = + new RSocketGatewayExtension( ServiceInfo.fromServiceInstance(new ErrorServiceImpl()) .errorMapper(ERROR_MAPPER) .build()); diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketGatewayExtension.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketGatewayExtension.java similarity index 75% rename from services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketGatewayExtension.java rename to services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketGatewayExtension.java index 1a881951..1bedb294 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketGatewayExtension.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketGatewayExtension.java @@ -4,15 +4,15 @@ import io.scalecube.services.gateway.AbstractGatewayExtension; import io.scalecube.services.gateway.transport.GatewayClientTransports; -class RsocketGatewayExtension extends AbstractGatewayExtension { +class RSocketGatewayExtension extends AbstractGatewayExtension { private static final String GATEWAY_ALIAS_NAME = "rsws"; - RsocketGatewayExtension(Object serviceInstance) { + RSocketGatewayExtension(Object serviceInstance) { this(ServiceInfo.fromServiceInstance(serviceInstance).build()); } - RsocketGatewayExtension(ServiceInfo serviceInfo) { + RSocketGatewayExtension(ServiceInfo serviceInfo) { super( serviceInfo, opts -> new RSocketGateway(opts.id(GATEWAY_ALIAS_NAME)), diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketGatewayTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketGatewayTest.java index cf6a9fe6..e851b88e 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketGatewayTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketGatewayTest.java @@ -26,7 +26,7 @@ class RSocketGatewayTest extends BaseTest { private static final Duration TIMEOUT = Duration.ofSeconds(3); @RegisterExtension - static RsocketGatewayExtension extension = new RsocketGatewayExtension(new GreetingServiceImpl()); + static RSocketGatewayExtension extension = new RSocketGatewayExtension(new GreetingServiceImpl()); private GreetingService service; diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketLocalGatewayErrorMapperTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketLocalGatewayErrorMapperTest.java similarity index 90% rename from services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketLocalGatewayErrorMapperTest.java rename to services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketLocalGatewayErrorMapperTest.java index 89e8213f..a1ddb9a9 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketLocalGatewayErrorMapperTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketLocalGatewayErrorMapperTest.java @@ -13,11 +13,11 @@ import org.junit.jupiter.api.extension.RegisterExtension; import reactor.test.StepVerifier; -class RsocketLocalGatewayErrorMapperTest extends BaseTest { +class RSocketLocalGatewayErrorMapperTest extends BaseTest { @RegisterExtension - static RsocketLocalGatewayExtension extension = - new RsocketLocalGatewayExtension( + static RSocketLocalGatewayExtension extension = + new RSocketLocalGatewayExtension( ServiceInfo.fromServiceInstance(new ErrorServiceImpl()).errorMapper(ERROR_MAPPER).build(), opts -> new RSocketGateway(opts.call(opts.call().errorMapper(ERROR_MAPPER)), ERROR_MAPPER)); diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketLocalGatewayExtension.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketLocalGatewayExtension.java similarity index 78% rename from services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketLocalGatewayExtension.java rename to services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketLocalGatewayExtension.java index 61879421..c38ef39e 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketLocalGatewayExtension.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketLocalGatewayExtension.java @@ -6,19 +6,19 @@ import io.scalecube.services.gateway.transport.GatewayClientTransports; import java.util.function.Function; -class RsocketLocalGatewayExtension extends AbstractLocalGatewayExtension { +class RSocketLocalGatewayExtension extends AbstractLocalGatewayExtension { private static final String GATEWAY_ALIAS_NAME = "rsws"; - RsocketLocalGatewayExtension(Object serviceInstance) { + RSocketLocalGatewayExtension(Object serviceInstance) { this(ServiceInfo.fromServiceInstance(serviceInstance).build()); } - RsocketLocalGatewayExtension(ServiceInfo serviceInfo) { + RSocketLocalGatewayExtension(ServiceInfo serviceInfo) { this(serviceInfo, RSocketGateway::new); } - RsocketLocalGatewayExtension( + RSocketLocalGatewayExtension( ServiceInfo serviceInfo, Function gatewaySupplier) { super( serviceInfo, diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketLocalGatewayTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketLocalGatewayTest.java index 9c3bf95a..3b6449eb 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketLocalGatewayTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketLocalGatewayTest.java @@ -25,8 +25,8 @@ class RSocketLocalGatewayTest extends BaseTest { private static final Duration TIMEOUT = Duration.ofSeconds(3); @RegisterExtension - static RsocketLocalGatewayExtension extension = - new RsocketLocalGatewayExtension(new GreetingServiceImpl()); + static RSocketLocalGatewayExtension extension = + new RSocketLocalGatewayExtension(new GreetingServiceImpl()); private GreetingService service;