Skip to content
This repository has been archived by the owner on Jun 21, 2023. It is now read-only.

Commit

Permalink
Added RetryEmitFailureHandler (#179)
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v authored Apr 14, 2021
1 parent 7ead33b commit 87d7bf1
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.scalecube.services.gateway.transport.http;

import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;

import io.netty.buffer.ByteBuf;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.api.ServiceMessage.Builder;
Expand All @@ -12,7 +14,10 @@
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.netty.NettyOutbound;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest;
Expand Down Expand Up @@ -59,7 +64,7 @@ public HttpGatewayClient(GatewayClientSettings settings, GatewayClientCodec<Byte
close
.asMono()
.then(doClose())
.doFinally(s -> onClose.tryEmitEmpty())
.doFinally(s -> onClose.emitEmpty(RetryEmitFailureHandler.INSTANCE))
.doOnTerminate(() -> LOGGER.info("Closed HttpGatewayClient resources"))
.subscribe(null, ex -> LOGGER.warn("Exception occurred on HttpGatewayClient close: " + ex));
}
Expand Down Expand Up @@ -100,7 +105,7 @@ public Flux<ServiceMessage> requestChannel(Flux<ServiceMessage> requests) {

@Override
public void close() {
close.tryEmitEmpty();
close.emitEmpty(RetryEmitFailureHandler.INSTANCE);
}

@Override
Expand Down Expand Up @@ -134,4 +139,14 @@ private ServiceMessage toMessage(HttpClientResponse httpResponse, ByteBuf conten
private boolean isError(int httpCode) {
return httpCode >= 400 && httpCode <= 599;
}

private static class RetryEmitFailureHandler implements EmitFailureHandler {

private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();

@Override
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
return emitResult == FAIL_NON_SERIALIZED;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.scalecube.services.gateway.transport.rsocket;

import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
Expand All @@ -15,7 +17,10 @@
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
Expand Down Expand Up @@ -53,7 +58,7 @@ public RSocketGatewayClient(GatewayClientSettings settings, GatewayClientCodec<P
close
.asMono()
.then(doClose())
.doFinally(s -> onClose.tryEmitEmpty())
.doFinally(s -> onClose.emitEmpty(RetryEmitFailureHandler.INSTANCE))
.doOnTerminate(() -> LOGGER.info("Closed RSocketGatewayClient resources"))
.subscribe(
null, ex -> LOGGER.warn("Exception occurred on RSocketGatewayClient close: " + ex));
Expand Down Expand Up @@ -95,7 +100,7 @@ public Flux<ServiceMessage> requestChannel(Flux<ServiceMessage> requests) {

@Override
public void close() {
close.tryEmitEmpty();
close.emitEmpty(RetryEmitFailureHandler.INSTANCE);
}

@Override
Expand Down Expand Up @@ -181,4 +186,14 @@ private ServiceMessage toMessage(Payload payload) {
LOGGER.debug("Received response {}", message);
return message;
}

private static class RetryEmitFailureHandler implements EmitFailureHandler {

private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();

@Override
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
return emitResult == FAIL_NON_SERIALIZED;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.scalecube.services.gateway.transport.websocket;

import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.scalecube.services.api.ServiceMessage;
Expand All @@ -13,7 +15,10 @@
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
Expand Down Expand Up @@ -74,7 +79,7 @@ public WebsocketGatewayClient(GatewayClientSettings settings, GatewayClientCodec
close
.asMono()
.then(doClose())
.doFinally(s -> onClose.tryEmitEmpty())
.doFinally(s -> onClose.emitEmpty(RetryEmitFailureHandler.INSTANCE))
.doOnTerminate(() -> LOGGER.info("Closed client"))
.subscribe(null, ex -> LOGGER.warn("Failed to close client, cause: " + ex));
}
Expand Down Expand Up @@ -120,7 +125,7 @@ public Flux<ServiceMessage> requestChannel(Flux<ServiceMessage> requests) {

@Override
public void close() {
close.tryEmitEmpty();
close.emitEmpty(RetryEmitFailureHandler.INSTANCE);
}

@Override
Expand Down Expand Up @@ -208,4 +213,14 @@ private void onReadIdle(Connection connection) {
private ByteBuf encodeRequest(ServiceMessage message, long sid) {
return codec.encode(ServiceMessage.from(message).header(STREAM_ID, sid).build());
}

private static class RetryEmitFailureHandler implements EmitFailureHandler {

private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();

@Override
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
return emitResult == FAIL_NON_SERIALIZED;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.scalecube.services.gateway.transport.websocket;

import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.scalecube.services.api.ErrorData;
Expand All @@ -14,7 +16,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.netty.Connection;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;
Expand Down Expand Up @@ -81,7 +86,7 @@ public final class WebsocketGatewayClientSession {
});

connection.onDispose(
() -> inboundProcessors.forEach((k, o) -> tryEmitError(o, CLOSED_CHANNEL_EXCEPTION)));
() -> inboundProcessors.forEach((k, o) -> emitError(o, CLOSED_CHANNEL_EXCEPTION)));
}

@SuppressWarnings({"rawtypes", "unchecked"})
Expand Down Expand Up @@ -162,54 +167,52 @@ private void handleResponse(ServiceMessage response, Object processor) {
Optional<Signal> signalOptional =
Optional.ofNullable(response.header(SIGNAL)).map(Signal::from);

if (signalOptional.isPresent()) {

if (!signalOptional.isPresent()) {
// handle normal response
emitNext(processor, response);
} else {
// handle completion signal
Signal signal = signalOptional.get();
if (signal == Signal.COMPLETE) {
tryEmitComplete(processor);
emitComplete(processor);
}

if (signal == Signal.ERROR) {
// decode error data to retrieve real error cause
ServiceMessage errorMessage = codec.decodeData(response, ErrorData.class);
tryEmitValue(processor, errorMessage);
emitNext(processor, errorMessage);
}
} else {
// handle normal response
tryEmitValue(processor, response);
}
} catch (Exception e) {
tryEmitError(processor, e);
emitError(processor, e);
}
}

private static void tryEmitValue(Object processor, ServiceMessage message) {
private static void emitNext(Object processor, ServiceMessage message) {
if (processor instanceof Sinks.One) {
//noinspection unchecked
((Sinks.One<ServiceMessage>) processor).tryEmitValue(message);
((Sinks.One<ServiceMessage>) processor).emitValue(message, RetryEmitFailureHandler.INSTANCE);
}
if (processor instanceof Sinks.Many) {
//noinspection unchecked
((Sinks.Many<ServiceMessage>) processor).tryEmitNext(message);
((Sinks.Many<ServiceMessage>) processor).emitNext(message, RetryEmitFailureHandler.INSTANCE);
}
}

private static void tryEmitComplete(Object processor) {
private static void emitComplete(Object processor) {
if (processor instanceof Sinks.One) {
((Sinks.One<?>) processor).tryEmitEmpty();
((Sinks.One<?>) processor).emitEmpty(RetryEmitFailureHandler.INSTANCE);
}
if (processor instanceof Sinks.Many) {
((Sinks.Many<?>) processor).tryEmitComplete();
((Sinks.Many<?>) processor).emitComplete(RetryEmitFailureHandler.INSTANCE);
}
}

private static void tryEmitError(Object processor, Exception e) {
private static void emitError(Object processor, Exception e) {
if (processor instanceof Sinks.One) {
((Sinks.One<?>) processor).tryEmitError(e);
((Sinks.One<?>) processor).emitError(e, RetryEmitFailureHandler.INSTANCE);
}
if (processor instanceof Sinks.Many) {
((Sinks.Many<?>) processor).tryEmitError(e);
((Sinks.Many<?>) processor).emitError(e, RetryEmitFailureHandler.INSTANCE);
}
}

Expand All @@ -219,4 +222,14 @@ public String toString() {
.add("id=" + id)
.toString();
}

private static class RetryEmitFailureHandler implements EmitFailureHandler {

private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();

@Override
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
return emitResult == FAIL_NON_SERIALIZED;
}
}
}

0 comments on commit 87d7bf1

Please sign in to comment.