diff --git a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java index ec4ecbc..bab6f69 100644 --- a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java +++ b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java @@ -1,5 +1,7 @@ package io.scalecube.services.gateway.transport.http; +import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED; + import io.netty.buffer.ByteBuf; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.api.ServiceMessage.Builder; @@ -12,7 +14,10 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitFailureHandler; +import reactor.core.publisher.Sinks.EmitResult; import reactor.netty.NettyOutbound; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClientRequest; @@ -59,7 +64,7 @@ public HttpGatewayClient(GatewayClientSettings settings, GatewayClientCodec onClose.tryEmitEmpty()) + .doFinally(s -> onClose.emitEmpty(RetryEmitFailureHandler.INSTANCE)) .doOnTerminate(() -> LOGGER.info("Closed HttpGatewayClient resources")) .subscribe(null, ex -> LOGGER.warn("Exception occurred on HttpGatewayClient close: " + ex)); } @@ -100,7 +105,7 @@ public Flux requestChannel(Flux requests) { @Override public void close() { - close.tryEmitEmpty(); + close.emitEmpty(RetryEmitFailureHandler.INSTANCE); } @Override @@ -134,4 +139,14 @@ private ServiceMessage toMessage(HttpClientResponse httpResponse, ByteBuf conten private boolean isError(int httpCode) { return httpCode >= 400 && httpCode <= 599; } + + private static class RetryEmitFailureHandler implements EmitFailureHandler { + + private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler(); + + @Override + public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) { + return emitResult == FAIL_NON_SERIALIZED; + } + } } diff --git a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/rsocket/RSocketGatewayClient.java b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/rsocket/RSocketGatewayClient.java index 469992f..d8c7f29 100644 --- a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/rsocket/RSocketGatewayClient.java +++ b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/rsocket/RSocketGatewayClient.java @@ -1,5 +1,7 @@ package io.scalecube.services.gateway.transport.rsocket; +import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED; + import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.core.RSocketConnector; @@ -15,7 +17,10 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitFailureHandler; +import reactor.core.publisher.Sinks.EmitResult; import reactor.netty.http.client.HttpClient; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.LoopResources; @@ -53,7 +58,7 @@ public RSocketGatewayClient(GatewayClientSettings settings, GatewayClientCodec

onClose.tryEmitEmpty()) + .doFinally(s -> onClose.emitEmpty(RetryEmitFailureHandler.INSTANCE)) .doOnTerminate(() -> LOGGER.info("Closed RSocketGatewayClient resources")) .subscribe( null, ex -> LOGGER.warn("Exception occurred on RSocketGatewayClient close: " + ex)); @@ -95,7 +100,7 @@ public Flux requestChannel(Flux requests) { @Override public void close() { - close.tryEmitEmpty(); + close.emitEmpty(RetryEmitFailureHandler.INSTANCE); } @Override @@ -181,4 +186,14 @@ private ServiceMessage toMessage(Payload payload) { LOGGER.debug("Received response {}", message); return message; } + + private static class RetryEmitFailureHandler implements EmitFailureHandler { + + private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler(); + + @Override + public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) { + return emitResult == FAIL_NON_SERIALIZED; + } + } } diff --git a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java index bcda9b9..6c321de 100644 --- a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java +++ b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java @@ -1,5 +1,7 @@ package io.scalecube.services.gateway.transport.websocket; +import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED; + import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.scalecube.services.api.ServiceMessage; @@ -13,7 +15,10 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitFailureHandler; +import reactor.core.publisher.Sinks.EmitResult; import reactor.netty.Connection; import reactor.netty.http.client.HttpClient; import reactor.netty.resources.ConnectionProvider; @@ -74,7 +79,7 @@ public WebsocketGatewayClient(GatewayClientSettings settings, GatewayClientCodec close .asMono() .then(doClose()) - .doFinally(s -> onClose.tryEmitEmpty()) + .doFinally(s -> onClose.emitEmpty(RetryEmitFailureHandler.INSTANCE)) .doOnTerminate(() -> LOGGER.info("Closed client")) .subscribe(null, ex -> LOGGER.warn("Failed to close client, cause: " + ex)); } @@ -120,7 +125,7 @@ public Flux requestChannel(Flux requests) { @Override public void close() { - close.tryEmitEmpty(); + close.emitEmpty(RetryEmitFailureHandler.INSTANCE); } @Override @@ -208,4 +213,14 @@ private void onReadIdle(Connection connection) { private ByteBuf encodeRequest(ServiceMessage message, long sid) { return codec.encode(ServiceMessage.from(message).header(STREAM_ID, sid).build()); } + + private static class RetryEmitFailureHandler implements EmitFailureHandler { + + private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler(); + + @Override + public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) { + return emitResult == FAIL_NON_SERIALIZED; + } + } } diff --git a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java index aed2942..f56b3e1 100644 --- a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java +++ b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java @@ -1,5 +1,7 @@ package io.scalecube.services.gateway.transport.websocket; +import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED; + import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.scalecube.services.api.ErrorData; @@ -14,7 +16,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitFailureHandler; +import reactor.core.publisher.Sinks.EmitResult; import reactor.netty.Connection; import reactor.netty.http.websocket.WebsocketInbound; import reactor.netty.http.websocket.WebsocketOutbound; @@ -81,7 +86,7 @@ public final class WebsocketGatewayClientSession { }); connection.onDispose( - () -> inboundProcessors.forEach((k, o) -> tryEmitError(o, CLOSED_CHANNEL_EXCEPTION))); + () -> inboundProcessors.forEach((k, o) -> emitError(o, CLOSED_CHANNEL_EXCEPTION))); } @SuppressWarnings({"rawtypes", "unchecked"}) @@ -162,54 +167,52 @@ private void handleResponse(ServiceMessage response, Object processor) { Optional signalOptional = Optional.ofNullable(response.header(SIGNAL)).map(Signal::from); - if (signalOptional.isPresent()) { - + if (!signalOptional.isPresent()) { + // handle normal response + emitNext(processor, response); + } else { // handle completion signal Signal signal = signalOptional.get(); if (signal == Signal.COMPLETE) { - tryEmitComplete(processor); + emitComplete(processor); } - if (signal == Signal.ERROR) { // decode error data to retrieve real error cause ServiceMessage errorMessage = codec.decodeData(response, ErrorData.class); - tryEmitValue(processor, errorMessage); + emitNext(processor, errorMessage); } - } else { - // handle normal response - tryEmitValue(processor, response); } } catch (Exception e) { - tryEmitError(processor, e); + emitError(processor, e); } } - private static void tryEmitValue(Object processor, ServiceMessage message) { + private static void emitNext(Object processor, ServiceMessage message) { if (processor instanceof Sinks.One) { //noinspection unchecked - ((Sinks.One) processor).tryEmitValue(message); + ((Sinks.One) processor).emitValue(message, RetryEmitFailureHandler.INSTANCE); } if (processor instanceof Sinks.Many) { //noinspection unchecked - ((Sinks.Many) processor).tryEmitNext(message); + ((Sinks.Many) processor).emitNext(message, RetryEmitFailureHandler.INSTANCE); } } - private static void tryEmitComplete(Object processor) { + private static void emitComplete(Object processor) { if (processor instanceof Sinks.One) { - ((Sinks.One) processor).tryEmitEmpty(); + ((Sinks.One) processor).emitEmpty(RetryEmitFailureHandler.INSTANCE); } if (processor instanceof Sinks.Many) { - ((Sinks.Many) processor).tryEmitComplete(); + ((Sinks.Many) processor).emitComplete(RetryEmitFailureHandler.INSTANCE); } } - private static void tryEmitError(Object processor, Exception e) { + private static void emitError(Object processor, Exception e) { if (processor instanceof Sinks.One) { - ((Sinks.One) processor).tryEmitError(e); + ((Sinks.One) processor).emitError(e, RetryEmitFailureHandler.INSTANCE); } if (processor instanceof Sinks.Many) { - ((Sinks.Many) processor).tryEmitError(e); + ((Sinks.Many) processor).emitError(e, RetryEmitFailureHandler.INSTANCE); } } @@ -219,4 +222,14 @@ public String toString() { .add("id=" + id) .toString(); } + + private static class RetryEmitFailureHandler implements EmitFailureHandler { + + private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler(); + + @Override + public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) { + return emitResult == FAIL_NON_SERIALIZED; + } + } }