Skip to content
This repository has been archived by the owner on Jun 21, 2023. It is now read-only.

Commit

Permalink
Renamings, fixed WebsocketGatewayAcceptor (added filtering of reactor…
Browse files Browse the repository at this point in the history
….netty's AbortedException)
  • Loading branch information
artem-v committed Apr 6, 2021
1 parent 1e1e463 commit e16bd15
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.scalecube.services.gateway.ws.GatewayMessages.validateSidOnSession;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaders;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.api.ServiceMessage;
Expand All @@ -35,6 +36,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableChannel;
import reactor.netty.channel.AbortedException;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;
import reactor.netty.http.websocket.WebsocketInbound;
Expand Down Expand Up @@ -127,16 +129,25 @@ private Mono<Void> onConnect(WebsocketGatewaySession session) {

session
.receive()
.doOnError(th -> gatewayHandler.onSessionError(session, th))
.subscribe(
byteBuf -> {
if (byteBuf == Unpooled.EMPTY_BUFFER) {
return;
}

if (!byteBuf.isReadable()) {
ReferenceCountUtil.safestRelease(byteBuf);
return;
}

Mono.deferContextual(context -> onRequest(session, byteBuf, (Context) context))
.contextWrite(context -> gatewayHandler.onRequest(session, byteBuf, context))
.subscribe();
},
th -> {
if (!(th instanceof AbortedException)) {
gatewayHandler.onSessionError(session, th);
}
});

return session.onClose(() -> gatewayHandler.onSessionClose(session));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

class RsocketClientConnectionTest extends BaseTest {
class RSocketClientConnectionTest extends BaseTest {

public static final GatewayClientCodec<Payload> CLIENT_CODEC =
GatewayClientTransports.RSOCKET_CLIENT_CODEC;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.test.StepVerifier;

class RsocketClientErrorMapperTest extends BaseTest {
class RSocketClientErrorMapperTest extends BaseTest {

@RegisterExtension
static RsocketGatewayExtension extension =
new RsocketGatewayExtension(
static RSocketGatewayExtension extension =
new RSocketGatewayExtension(
ServiceInfo.fromServiceInstance(new ErrorServiceImpl())
.errorMapper(ERROR_MAPPER)
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
import io.scalecube.services.gateway.AbstractGatewayExtension;
import io.scalecube.services.gateway.transport.GatewayClientTransports;

class RsocketGatewayExtension extends AbstractGatewayExtension {
class RSocketGatewayExtension extends AbstractGatewayExtension {

private static final String GATEWAY_ALIAS_NAME = "rsws";

RsocketGatewayExtension(Object serviceInstance) {
RSocketGatewayExtension(Object serviceInstance) {
this(ServiceInfo.fromServiceInstance(serviceInstance).build());
}

RsocketGatewayExtension(ServiceInfo serviceInfo) {
RSocketGatewayExtension(ServiceInfo serviceInfo) {
super(
serviceInfo,
opts -> new RSocketGateway(opts.id(GATEWAY_ALIAS_NAME)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class RSocketGatewayTest extends BaseTest {
private static final Duration TIMEOUT = Duration.ofSeconds(3);

@RegisterExtension
static RsocketGatewayExtension extension = new RsocketGatewayExtension(new GreetingServiceImpl());
static RSocketGatewayExtension extension = new RSocketGatewayExtension(new GreetingServiceImpl());

private GreetingService service;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.test.StepVerifier;

class RsocketLocalGatewayErrorMapperTest extends BaseTest {
class RSocketLocalGatewayErrorMapperTest extends BaseTest {

@RegisterExtension
static RsocketLocalGatewayExtension extension =
new RsocketLocalGatewayExtension(
static RSocketLocalGatewayExtension extension =
new RSocketLocalGatewayExtension(
ServiceInfo.fromServiceInstance(new ErrorServiceImpl()).errorMapper(ERROR_MAPPER).build(),
opts ->
new RSocketGateway(opts.call(opts.call().errorMapper(ERROR_MAPPER)), ERROR_MAPPER));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@
import io.scalecube.services.gateway.transport.GatewayClientTransports;
import java.util.function.Function;

class RsocketLocalGatewayExtension extends AbstractLocalGatewayExtension {
class RSocketLocalGatewayExtension extends AbstractLocalGatewayExtension {

private static final String GATEWAY_ALIAS_NAME = "rsws";

RsocketLocalGatewayExtension(Object serviceInstance) {
RSocketLocalGatewayExtension(Object serviceInstance) {
this(ServiceInfo.fromServiceInstance(serviceInstance).build());
}

RsocketLocalGatewayExtension(ServiceInfo serviceInfo) {
RSocketLocalGatewayExtension(ServiceInfo serviceInfo) {
this(serviceInfo, RSocketGateway::new);
}

RsocketLocalGatewayExtension(
RSocketLocalGatewayExtension(
ServiceInfo serviceInfo, Function<GatewayOptions, RSocketGateway> gatewaySupplier) {
super(
serviceInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ class RSocketLocalGatewayTest extends BaseTest {
private static final Duration TIMEOUT = Duration.ofSeconds(3);

@RegisterExtension
static RsocketLocalGatewayExtension extension =
new RsocketLocalGatewayExtension(new GreetingServiceImpl());
static RSocketLocalGatewayExtension extension =
new RSocketLocalGatewayExtension(new GreetingServiceImpl());

private GreetingService service;

Expand Down

0 comments on commit e16bd15

Please sign in to comment.