diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java index e03ee69..b266c87 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java @@ -188,20 +188,20 @@ private void onRequest(WebsocketGatewaySession session, ServiceMessage request, final Flux serviceStream = serviceCall.requestMany(request); Disposable disposable = - Optional.ofNullable(request.header(RATE_LIMIT_FIELD)) - .map(Integer::valueOf) - .map(serviceStream::limitRate) - .orElse(serviceStream) - .map( - response -> { - boolean isErrorResponse = false; - if (response.isError()) { - receivedError.set(true); - isErrorResponse = true; - } - return newResponseMessage(sid, response, isErrorResponse); - }) - .flatMap(session::send) + session + .send( + Optional.ofNullable(request.header(RATE_LIMIT_FIELD)) + .map(Integer::valueOf) + .map(serviceStream::limitRate) + .orElse(serviceStream) + .map( + response -> { + boolean isErrorResponse = response.isError(); + if (isErrorResponse) { + receivedError.set(true); + } + return newResponseMessage(sid, response, isErrorResponse); + })) .doOnError(th -> ReferenceCountUtil.safestRelease(request.data())) .doOnError( th -> @@ -209,7 +209,7 @@ private void onRequest(WebsocketGatewaySession session, ServiceMessage request, .send(toErrorResponse(errorMapper, request, th)) .contextWrite(context) .subscribe()) - .doOnComplete( + .doOnTerminate( () -> { if (!receivedError.get()) { session diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java index 7ac6f95..2aaddf0 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java @@ -86,18 +86,38 @@ public Flux receive() { * @return mono void */ public Mono send(ServiceMessage response) { + return Mono.deferContextual( + context -> { + final TextWebSocketFrame frame = new TextWebSocketFrame(codec.encode(response)); + gatewayHandler.onResponse(this, frame.content(), response, (Context) context); + // send with publisher (defer buffer cleanup to netty) + return outbound + .sendObject(frame) + .then() + .doOnError(th -> gatewayHandler.onError(this, th, (Context) context)); + }); + } + + /** + * Method to send normal response. + * + * @param messages messages + * @return mono void + */ + public Mono send(Flux messages) { return Mono.deferContextual( context -> { // send with publisher (defer buffer cleanup to netty) return outbound .sendObject( - Mono.just(response) - .map(codec::encode) - .map(TextWebSocketFrame::new) - .doOnNext( - frame -> - gatewayHandler.onResponse( - this, frame.content(), response, (Context) context)), + messages.map( + response -> { + final TextWebSocketFrame frame = + new TextWebSocketFrame(codec.encode(response)); + gatewayHandler.onResponse( + this, frame.content(), response, (Context) context); + return frame; + }), f -> true) .then() .doOnError(th -> gatewayHandler.onError(this, th, (Context) context)); diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/CancelledSubscriber.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/CancelledSubscriber.java new file mode 100644 index 0000000..2e7cc6b --- /dev/null +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/CancelledSubscriber.java @@ -0,0 +1,36 @@ +package io.scalecube.services.gateway.websocket; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.CoreSubscriber; + +public class CancelledSubscriber implements CoreSubscriber { + + private static final Logger LOGGER = LoggerFactory.getLogger(CancelledSubscriber.class); + + public static final CancelledSubscriber INSTANCE = new CancelledSubscriber(); + + private CancelledSubscriber() { + // Do not instantiate + } + + @Override + public void onSubscribe(org.reactivestreams.Subscription s) { + // no-op + } + + @Override + public void onNext(Object o) { + LOGGER.warn("Received ({}) which will be dropped immediately due cancelled aeron inbound", o); + } + + @Override + public void onError(Throwable t) { + // no-op + } + + @Override + public void onComplete() { + // no-op + } +} diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/ReactiveAdapter.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/ReactiveAdapter.java new file mode 100644 index 0000000..738a81a --- /dev/null +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/ReactiveAdapter.java @@ -0,0 +1,176 @@ +package io.scalecube.services.gateway.websocket; + +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.CoreSubscriber; +import reactor.core.Exceptions; +import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Operators; + +public final class ReactiveAdapter extends BaseSubscriber implements ReactiveOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveAdapter.class); + + private static final AtomicLongFieldUpdater REQUESTED = + AtomicLongFieldUpdater.newUpdater(ReactiveAdapter.class, "requested"); + + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater + DESTINATION_SUBSCRIBER = + AtomicReferenceFieldUpdater.newUpdater( + ReactiveAdapter.class, CoreSubscriber.class, "destinationSubscriber"); + + private final FluxReceive inbound = new FluxReceive(); + + private volatile long requested; + private volatile boolean fastPath; + private long produced; + private volatile CoreSubscriber destinationSubscriber; + private Throwable lastError; + + @Override + public boolean isDisposed() { + return destinationSubscriber == CancelledSubscriber.INSTANCE; + } + + @Override + public void dispose(Throwable throwable) { + Subscription upstream = upstream(); + if (upstream != null) { + upstream.cancel(); + } + CoreSubscriber destination = + DESTINATION_SUBSCRIBER.getAndSet(this, CancelledSubscriber.INSTANCE); + if (destination != null) { + destination.onError(throwable); + } + } + + @Override + public void dispose() { + inbound.cancel(); + } + + public Flux receive() { + return inbound; + } + + @Override + public void lastError(Throwable throwable) { + lastError = throwable; + } + + @Override + public Throwable lastError() { + return lastError; + } + + @Override + public void tryNext(Object Object) { + if (!isDisposed()) { + destinationSubscriber.onNext(Object); + } else { + LOGGER.warn("[tryNext] reactiveAdapter is disposed, dropping : " + Object); + } + } + + @Override + public boolean isFastPath() { + return fastPath; + } + + @Override + public void commitProduced() { + if (produced > 0) { + Operators.produced(REQUESTED, this, produced); + produced = 0; + } + } + + @Override + public long incrementProduced() { + return ++produced; + } + + @Override + public long requested(long limit) { + return Math.min(requested, limit); + } + + @Override + protected void hookOnSubscribe(Subscription subscription) { + subscription.request(requested); + } + + @Override + protected void hookOnNext(Object Object) { + tryNext(Object); + } + + @Override + protected void hookOnComplete() { + dispose(); + } + + @Override + protected void hookOnError(Throwable throwable) { + dispose(throwable); + } + + @Override + protected void hookOnCancel() { + dispose(); + } + + class FluxReceive extends Flux implements Subscription { + + @Override + public void request(long n) { + Subscription upstream = upstream(); + if (upstream != null) { + upstream.request(n); + } + if (fastPath) { + return; + } + if (n == Long.MAX_VALUE) { + fastPath = true; + requested = Long.MAX_VALUE; + return; + } + Operators.addCap(REQUESTED, ReactiveAdapter.this, n); + } + + @Override + public void cancel() { + Subscription upstream = upstream(); + if (upstream != null) { + upstream.cancel(); + } + CoreSubscriber destination = + DESTINATION_SUBSCRIBER.getAndSet(ReactiveAdapter.this, CancelledSubscriber.INSTANCE); + if (destination != null) { + destination.onComplete(); + } + } + + @Override + public void subscribe(CoreSubscriber destinationSubscriber) { + boolean result = + DESTINATION_SUBSCRIBER.compareAndSet(ReactiveAdapter.this, null, destinationSubscriber); + if (result) { + destinationSubscriber.onSubscribe(this); + } else { + Operators.error( + destinationSubscriber, + isDisposed() + ? Exceptions.failWithCancel() + : Exceptions.duplicateOnSubscribeException()); + } + } + } +} diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/ReactiveOperator.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/ReactiveOperator.java new file mode 100644 index 0000000..5007a53 --- /dev/null +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/ReactiveOperator.java @@ -0,0 +1,22 @@ +package io.scalecube.services.gateway.websocket; + +import reactor.core.Disposable; + +public interface ReactiveOperator extends Disposable { + + void dispose(Throwable throwable); + + void lastError(Throwable throwable); + + Throwable lastError(); + + void tryNext(Object fragment); + + boolean isFastPath(); + + void commitProduced(); + + long incrementProduced(); + + long requested(long limit); +} diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java index 9f97e29..389effe 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java @@ -20,7 +20,6 @@ import io.scalecube.services.transport.rsocket.RSocketServiceTransport; import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.time.Duration; -import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.jupiter.api.AfterAll; @@ -29,7 +28,6 @@ import org.junit.jupiter.api.RepeatedTest; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; class WebsocketClientTest extends BaseTest { @@ -106,7 +104,7 @@ void testMessageSequence() { .transport(new GatewayClientTransport(client)) .router(new StaticAddressRouter(gatewayAddress)); - int count = ThreadLocalRandom.current().nextInt(100, 1042) + 24; + int count = (int) 1e3; StepVerifier.create(serviceCall.api(TestService.class).many(count) /*.log("<<< ")*/) .expectNextSequence(IntStream.range(0, count).boxed().collect(Collectors.toList())) @@ -125,9 +123,49 @@ private static class TestServiceImpl implements TestService { @Override public Flux many(int count) { - return Flux.range(0, count) - .subscribeOn(Schedulers.boundedElastic()) - .publishOn(Schedulers.boundedElastic()); + return Flux.using( + ReactiveAdapter::new, + reactiveAdapter -> + reactiveAdapter + .receive() + .take(count) + .cast(Integer.class) + .doOnSubscribe( + s -> + new Thread( + () -> { + for (int i = 0; ; ) { + int r = (int) reactiveAdapter.requested(100); + + if (reactiveAdapter.isFastPath()) { + try { + if (reactiveAdapter.isDisposed()) { + return; + } + reactiveAdapter.tryNext(i++); + reactiveAdapter.incrementProduced(); + } catch (Throwable e) { + reactiveAdapter.lastError(e); + return; + } + } else if (r > 0) { + try { + if (reactiveAdapter.isDisposed()) { + return; + } + reactiveAdapter.tryNext(i++); + reactiveAdapter.incrementProduced(); + } catch (Throwable e) { + reactiveAdapter.lastError(e); + return; + } + + reactiveAdapter.commitProduced(); + } + } + }) + .start()), + ReactiveAdapter::dispose); } } } diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java index c14b8de..a26dbe3 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java @@ -20,7 +20,6 @@ import io.scalecube.services.transport.rsocket.RSocketServiceTransport; import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.time.Duration; -import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.jupiter.api.AfterAll; @@ -29,7 +28,6 @@ import org.junit.jupiter.api.RepeatedTest; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; class WebsocketServerTest extends BaseTest { @@ -89,7 +87,7 @@ void testMessageSequence() { .transport(new GatewayClientTransport(client)) .router(new StaticAddressRouter(gatewayAddress)); - int count = ThreadLocalRandom.current().nextInt(100, 1042) + 24; + int count = (int) 1e3; StepVerifier.create(serviceCall.api(TestService.class).many(count) /*.log("<<< ")*/) .expectNextSequence(IntStream.range(0, count).boxed().collect(Collectors.toList())) @@ -108,9 +106,49 @@ private static class TestServiceImpl implements TestService { @Override public Flux many(int count) { - return Flux.range(0, count) - .subscribeOn(Schedulers.boundedElastic()) - .publishOn(Schedulers.boundedElastic()); + return Flux.using( + ReactiveAdapter::new, + reactiveAdapter -> + reactiveAdapter + .receive() + .take(count) + .cast(Integer.class) + .doOnSubscribe( + s -> + new Thread( + () -> { + for (int i = 0; ; ) { + int r = (int) reactiveAdapter.requested(100); + + if (reactiveAdapter.isFastPath()) { + try { + if (reactiveAdapter.isDisposed()) { + return; + } + reactiveAdapter.tryNext(i++); + reactiveAdapter.incrementProduced(); + } catch (Throwable e) { + reactiveAdapter.lastError(e); + return; + } + } else if (r > 0) { + try { + if (reactiveAdapter.isDisposed()) { + return; + } + reactiveAdapter.tryNext(i++); + reactiveAdapter.incrementProduced(); + } catch (Throwable e) { + reactiveAdapter.lastError(e); + return; + } + + reactiveAdapter.commitProduced(); + } + } + }) + .start()), + ReactiveAdapter::dispose); } } }