From 07c9b057870211bdd31af7595184cd0b1f175645 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Sun, 13 Oct 2024 19:01:34 +0300 Subject: [PATCH] Support of heartbeat for websocket gateway (#864) --- .../scalecube/services/gateway/Gateway.java | 6 +- .../services/gateway/GatewayOptions.java | 74 ------------ .../transport/api/ServiceTransport.java | 2 +- .../services/gateway/http/HttpGateway.java | 89 +++++++-------- .../gateway/http/HttpGatewayAcceptor.java | 5 - .../gateway/websocket/HeartbeatService.java | 14 +++ .../websocket/HeartbeatServiceImpl.java | 11 ++ .../gateway/websocket/WebsocketGateway.java | 106 ++++++++++-------- .../services/gateway/http/CorsTest.java | 6 +- .../http/HttpClientConnectionTest.java | 2 +- .../gateway/http/HttpGatewayTest.java | 6 +- .../gateway/http/HttpLocalGatewayTest.java | 2 +- .../WebsocketClientConnectionTest.java | 4 +- .../websocket/WebsocketClientTest.java | 4 +- .../websocket/WebsocketGatewayAuthTest.java | 4 +- .../websocket/WebsocketGatewayTest.java | 11 +- .../websocket/WebsocketLocalGatewayTest.java | 6 +- .../websocket/WebsocketServerTest.java | 4 +- .../io/scalecube/services/Microservices.java | 34 +++--- 19 files changed, 176 insertions(+), 214 deletions(-) delete mode 100644 services-api/src/main/java/io/scalecube/services/gateway/GatewayOptions.java create mode 100644 services-gateway/src/main/java/io/scalecube/services/gateway/websocket/HeartbeatService.java create mode 100644 services-gateway/src/main/java/io/scalecube/services/gateway/websocket/HeartbeatServiceImpl.java diff --git a/services-api/src/main/java/io/scalecube/services/gateway/Gateway.java b/services-api/src/main/java/io/scalecube/services/gateway/Gateway.java index 5cc4d5a5b..480be9440 100644 --- a/services-api/src/main/java/io/scalecube/services/gateway/Gateway.java +++ b/services-api/src/main/java/io/scalecube/services/gateway/Gateway.java @@ -1,6 +1,8 @@ package io.scalecube.services.gateway; import io.scalecube.services.Address; +import io.scalecube.services.ServiceCall; +import io.scalecube.services.registry.api.ServiceRegistry; public interface Gateway { @@ -21,9 +23,11 @@ public interface Gateway { /** * Starts gateway. * + * @param call {@link ServiceCall} instance + * @param serviceRegistry {@link ServiceRegistry} instance * @return gateway instance */ - Gateway start(); + Gateway start(ServiceCall call, ServiceRegistry serviceRegistry); /** Stops gateway. */ void stop(); diff --git a/services-api/src/main/java/io/scalecube/services/gateway/GatewayOptions.java b/services-api/src/main/java/io/scalecube/services/gateway/GatewayOptions.java deleted file mode 100644 index 126406df9..000000000 --- a/services-api/src/main/java/io/scalecube/services/gateway/GatewayOptions.java +++ /dev/null @@ -1,74 +0,0 @@ -package io.scalecube.services.gateway; - -import io.scalecube.services.ServiceCall; -import java.util.StringJoiner; -import java.util.concurrent.Executor; - -public class GatewayOptions implements Cloneable { - - private String id; - private int port = 0; - private Executor workerPool; - private ServiceCall call; - - public GatewayOptions() {} - - public String id() { - return id; - } - - public GatewayOptions id(String id) { - final GatewayOptions c = clone(); - c.id = id; - return c; - } - - public int port() { - return port; - } - - public GatewayOptions port(int port) { - final GatewayOptions c = clone(); - c.port = port; - return c; - } - - public Executor workerPool() { - return workerPool; - } - - public GatewayOptions workerPool(Executor workerPool) { - final GatewayOptions c = clone(); - c.workerPool = workerPool; - return c; - } - - public ServiceCall call() { - return call; - } - - public GatewayOptions call(ServiceCall call) { - final GatewayOptions c = clone(); - c.call = call; - return c; - } - - @Override - public GatewayOptions clone() { - try { - return (GatewayOptions) super.clone(); - } catch (CloneNotSupportedException e) { - throw new RuntimeException(e); - } - } - - @Override - public String toString() { - return new StringJoiner(", ", GatewayOptions.class.getSimpleName() + "[", "]") - .add("id='" + id + "'") - .add("port=" + port) - .add("workerPool=" + workerPool) - .add("call=" + call) - .toString(); - } -} diff --git a/services-api/src/main/java/io/scalecube/services/transport/api/ServiceTransport.java b/services-api/src/main/java/io/scalecube/services/transport/api/ServiceTransport.java index c4007a590..ef1e0ae48 100644 --- a/services-api/src/main/java/io/scalecube/services/transport/api/ServiceTransport.java +++ b/services-api/src/main/java/io/scalecube/services/transport/api/ServiceTransport.java @@ -18,7 +18,7 @@ public interface ServiceTransport { /** * Provider for {@link ServerTransport}. * - * @param serviceRegistry serviceRegistry + * @param serviceRegistry {@link ServiceRegistry} instance * @return {@code ServerTransport} instance */ ServerTransport serverTransport(ServiceRegistry serviceRegistry); diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java b/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java index d7c196da3..fd580452a 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java @@ -4,20 +4,23 @@ import io.netty.handler.codec.http.cors.CorsConfigBuilder; import io.netty.handler.codec.http.cors.CorsHandler; import io.scalecube.services.Address; +import io.scalecube.services.ServiceCall; import io.scalecube.services.exceptions.DefaultErrorMapper; import io.scalecube.services.exceptions.ServiceProviderErrorMapper; import io.scalecube.services.gateway.Gateway; -import io.scalecube.services.gateway.GatewayOptions; +import io.scalecube.services.registry.api.ServiceRegistry; import java.net.InetSocketAddress; -import java.util.StringJoiner; import java.util.function.Consumer; +import java.util.function.Function; import reactor.netty.DisposableServer; import reactor.netty.http.server.HttpServer; import reactor.netty.resources.LoopResources; public class HttpGateway implements Gateway { - private final GatewayOptions options; + private final String id; + private final int port; + private final Function callFactory; private final ServiceProviderErrorMapper errorMapper; private final boolean corsEnabled; private final CorsConfigBuilder corsConfigBuilder; @@ -26,7 +29,9 @@ public class HttpGateway implements Gateway { private LoopResources loopResources; private HttpGateway(Builder builder) { - this.options = builder.options; + this.id = builder.id; + this.port = builder.port; + this.callFactory = builder.callFactory; this.errorMapper = builder.errorMapper; this.corsEnabled = builder.corsEnabled; this.corsConfigBuilder = builder.corsConfigBuilder; @@ -34,20 +39,25 @@ private HttpGateway(Builder builder) { @Override public String id() { - return options.id(); + return id; } @Override - public Gateway start() { - HttpGatewayAcceptor gatewayAcceptor = new HttpGatewayAcceptor(options.call(), errorMapper); - + public Gateway start(ServiceCall call, ServiceRegistry serviceRegistry) { loopResources = - LoopResources.create( - options.id() + ":" + options.port(), LoopResources.DEFAULT_IO_WORKER_COUNT, true); + LoopResources.create(id + ":" + port, LoopResources.DEFAULT_IO_WORKER_COUNT, true); try { - prepareHttpServer(loopResources, options.port()) - .handle(gatewayAcceptor) + HttpServer.create() + .runOn(loopResources) + .bindAddress(() -> new InetSocketAddress(port)) + .doOnConnection( + connection -> { + if (corsEnabled) { + connection.addHandlerLast(new CorsHandler(corsConfigBuilder.build())); + } + }) + .handle(new HttpGatewayAcceptor(callFactory.apply(call), errorMapper)) .bind() .doOnSuccess(server -> this.server = server) .toFuture() @@ -59,23 +69,6 @@ public Gateway start() { return this; } - private HttpServer prepareHttpServer(LoopResources loopResources, int port) { - HttpServer httpServer = HttpServer.create(); - - if (loopResources != null) { - httpServer = httpServer.runOn(loopResources); - } - - return httpServer - .bindAddress(() -> new InetSocketAddress(port)) - .doOnConnection( - connection -> { - if (corsEnabled) { - connection.addHandlerLast(new CorsHandler(corsConfigBuilder.build())); - } - }); - } - @Override public Address address() { InetSocketAddress address = (InetSocketAddress) server.address(); @@ -100,21 +93,11 @@ private void shutdownLoopResources(LoopResources loopResources) { } } - @Override - public String toString() { - return new StringJoiner(", ", HttpGateway.class.getSimpleName() + "[", "]") - .add("options=" + options) - .add("errorMapper=" + errorMapper) - .add("corsEnabled=" + corsEnabled) - .add("corsConfigBuilder=" + corsConfigBuilder) - .add("server=" + server) - .add("loopResources=" + loopResources) - .toString(); - } - public static class Builder { - private GatewayOptions options; + private String id = "http@" + Integer.toHexString(hashCode()); + private int port; + private Function callFactory = call -> call; private ServiceProviderErrorMapper errorMapper = DefaultErrorMapper.INSTANCE; private boolean corsEnabled = false; private CorsConfigBuilder corsConfigBuilder = @@ -125,12 +108,26 @@ public static class Builder { public Builder() {} - public GatewayOptions options() { - return options; + public String id() { + return id; + } + + public Builder id(String id) { + this.id = id; + return this; + } + + public int port() { + return port; + } + + public Builder port(int port) { + this.port = port; + return this; } - public Builder options(GatewayOptions options) { - this.options = options; + public Builder serviceCall(Function operator) { + callFactory = callFactory.andThen(operator); return this; } diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGatewayAcceptor.java b/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGatewayAcceptor.java index 8ba85b7fb..bc8b6f800 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGatewayAcceptor.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGatewayAcceptor.java @@ -14,7 +14,6 @@ import io.scalecube.services.ServiceCall; import io.scalecube.services.api.ErrorData; import io.scalecube.services.api.ServiceMessage; -import io.scalecube.services.exceptions.DefaultErrorMapper; import io.scalecube.services.exceptions.ServiceProviderErrorMapper; import io.scalecube.services.gateway.ReferenceCountUtil; import io.scalecube.services.transport.api.DataCodec; @@ -37,10 +36,6 @@ public class HttpGatewayAcceptor private final ServiceCall serviceCall; private final ServiceProviderErrorMapper errorMapper; - HttpGatewayAcceptor(ServiceCall serviceCall) { - this(serviceCall, DefaultErrorMapper.INSTANCE); - } - HttpGatewayAcceptor(ServiceCall serviceCall, ServiceProviderErrorMapper errorMapper) { this.serviceCall = serviceCall; this.errorMapper = errorMapper; diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/HeartbeatService.java b/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/HeartbeatService.java new file mode 100644 index 000000000..1f3709168 --- /dev/null +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/HeartbeatService.java @@ -0,0 +1,14 @@ +package io.scalecube.services.gateway.websocket; + +import io.scalecube.services.annotations.Service; +import io.scalecube.services.annotations.ServiceMethod; +import reactor.core.publisher.Mono; + +@Service(HeartbeatService.NAMESPACE) +public interface HeartbeatService { + + String NAMESPACE = "v1/scalecube.websocket.heartbeat"; + + @ServiceMethod + Mono ping(long value); +} diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/HeartbeatServiceImpl.java b/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/HeartbeatServiceImpl.java new file mode 100644 index 000000000..6b5d024bd --- /dev/null +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/HeartbeatServiceImpl.java @@ -0,0 +1,11 @@ +package io.scalecube.services.gateway.websocket; + +import reactor.core.publisher.Mono; + +public class HeartbeatServiceImpl implements HeartbeatService { + + @Override + public Mono ping(long value) { + return Mono.just(value); + } +} diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/WebsocketGateway.java b/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/WebsocketGateway.java index 07e449dfa..fdb180176 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/WebsocketGateway.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/WebsocketGateway.java @@ -2,15 +2,17 @@ import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.scalecube.services.Address; +import io.scalecube.services.ServiceCall; +import io.scalecube.services.ServiceInfo; import io.scalecube.services.exceptions.DefaultErrorMapper; import io.scalecube.services.exceptions.ServiceProviderErrorMapper; import io.scalecube.services.gateway.Gateway; -import io.scalecube.services.gateway.GatewayOptions; import io.scalecube.services.gateway.GatewaySessionHandler; +import io.scalecube.services.registry.api.ServiceRegistry; +import io.scalecube.services.transport.api.ServiceMessageDataDecoder; import java.net.InetSocketAddress; import java.time.Duration; -import java.util.StringJoiner; -import java.util.function.UnaryOperator; +import java.util.function.Function; import reactor.netty.Connection; import reactor.netty.DisposableServer; import reactor.netty.http.server.HttpServer; @@ -18,43 +20,52 @@ public class WebsocketGateway implements Gateway { - private final GatewayOptions options; + private final String id; + private final int port; + private final Function callFactory; private final GatewaySessionHandler gatewayHandler; private final Duration keepAliveInterval; + private final boolean heartbeatEnabled; private final ServiceProviderErrorMapper errorMapper; private DisposableServer server; private LoopResources loopResources; private WebsocketGateway(Builder builder) { - this.options = builder.options; + this.id = builder.id; + this.port = builder.port; + this.callFactory = builder.callFactory; this.gatewayHandler = builder.gatewayHandler; this.keepAliveInterval = builder.keepAliveInterval; + this.heartbeatEnabled = builder.heartbeatEnabled; this.errorMapper = builder.errorMapper; } - public WebsocketGateway(UnaryOperator operator) { - this(operator.apply(new Builder())); - } - @Override public String id() { - return options.id(); + return id; } @Override - public Gateway start() { - WebsocketGatewayAcceptor gatewayAcceptor = - new WebsocketGatewayAcceptor(options.call(), gatewayHandler, errorMapper); - + public Gateway start(ServiceCall call, ServiceRegistry serviceRegistry) { loopResources = - LoopResources.create( - options.id() + ":" + options.port(), LoopResources.DEFAULT_IO_WORKER_COUNT, true); + LoopResources.create(id + ":" + port, LoopResources.DEFAULT_IO_WORKER_COUNT, true); + + if (heartbeatEnabled) { + serviceRegistry.registerService( + ServiceInfo.fromServiceInstance(new HeartbeatServiceImpl()) + .errorMapper(DefaultErrorMapper.INSTANCE) + .dataDecoder(ServiceMessageDataDecoder.INSTANCE) + .build()); + } try { - prepareHttpServer(loopResources, options.port()) + HttpServer.create() + .runOn(loopResources) + .bindAddress(() -> new InetSocketAddress(port)) .doOnConnection(this::setupKeepAlive) - .handle(gatewayAcceptor) + .handle( + new WebsocketGatewayAcceptor(callFactory.apply(call), gatewayHandler, errorMapper)) .bind() .doOnSuccess(server -> this.server = server) .thenReturn(this) @@ -67,17 +78,6 @@ public Gateway start() { return this; } - private HttpServer prepareHttpServer(LoopResources loopResources, int port) { - return HttpServer.create() - .tcpConfiguration( - tcpServer -> { - if (loopResources != null) { - tcpServer = tcpServer.runOn(loopResources); - } - return tcpServer.bindAddress(() -> new InetSocketAddress(port)); - }); - } - @Override public Address address() { InetSocketAddress address = (InetSocketAddress) server.address(); @@ -134,33 +134,38 @@ private void onReadIdle(Connection connection) { }); } - @Override - public String toString() { - return new StringJoiner(", ", WebsocketGateway.class.getSimpleName() + "[", "]") - .add("options=" + options) - .add("gatewayHandler=" + gatewayHandler) - .add("keepAliveInterval=" + keepAliveInterval) - .add("errorMapper=" + errorMapper) - .add("server=" + server) - .add("loopResources=" + loopResources) - .toString(); - } - public static class Builder { - private GatewayOptions options; + private String id = "websocket@" + Integer.toHexString(hashCode()); + private int port; + private Function callFactory = call -> call; private GatewaySessionHandler gatewayHandler = GatewaySessionHandler.DEFAULT_INSTANCE; private Duration keepAliveInterval = Duration.ZERO; + private boolean heartbeatEnabled = false; private ServiceProviderErrorMapper errorMapper = DefaultErrorMapper.INSTANCE; public Builder() {} - public GatewayOptions options() { - return options; + public String id() { + return id; } - public Builder options(GatewayOptions options) { - this.options = options; + public Builder id(String id) { + this.id = id; + return this; + } + + public int port() { + return port; + } + + public Builder port(int port) { + this.port = port; + return this; + } + + public Builder serviceCall(Function operator) { + callFactory = callFactory.andThen(operator); return this; } @@ -182,6 +187,15 @@ public Builder keepAliveInterval(Duration keepAliveInterval) { return this; } + public boolean heartbeatEnabled() { + return heartbeatEnabled; + } + + public Builder heartbeatEnabled(boolean heartbeatEnabled) { + this.heartbeatEnabled = heartbeatEnabled; + return this; + } + public ServiceProviderErrorMapper errorMapper() { return errorMapper; } diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/http/CorsTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/http/CorsTest.java index e8e6a654d..7a4bec47b 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/http/CorsTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/http/CorsTest.java @@ -51,9 +51,9 @@ void testCrossOriginRequest() { Microservices.start( new Context() .gateway( - opts -> + () -> new HttpGateway.Builder() - .options(opts.id("http")) + .id("http") .corsEnabled(true) .corsConfigBuilder( builder -> @@ -114,7 +114,7 @@ void testOptionRequestCorsDisabled() { gateway = Microservices.start( new Context() - .gateway(opts -> new HttpGateway.Builder().options(opts.id("http")).build()) + .gateway(() -> new HttpGateway.Builder().id("http").build()) .services(new GreetingServiceImpl())); final HttpClient client = newClient(gateway.gateway("http").address()); diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java index faabdfd44..72129e817 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java @@ -44,7 +44,7 @@ void beforEach() { .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) - .gateway(options -> new HttpGateway.Builder().options(options.id("HTTP")).build())); + .gateway(() -> new HttpGateway.Builder().id("HTTP").build())); gatewayAddress = gateway.gateway("HTTP").address(); diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpGatewayTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpGatewayTest.java index 09076a57b..c4d8b0730 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpGatewayTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpGatewayTest.java @@ -59,11 +59,7 @@ static void beforeAll() { .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .gateway( - options -> - new HttpGateway.Builder() - .options(options.id("HTTP")) - .errorMapper(ERROR_MAPPER) - .build())); + () -> new HttpGateway.Builder().id("HTTP").errorMapper(ERROR_MAPPER).build())); gatewayAddress = gateway.gateway("HTTP").address(); router = new StaticAddressRouter(gatewayAddress); diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayTest.java index 211b9a00d..146445853 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayTest.java @@ -48,7 +48,7 @@ static void beforeAll() { gateway = Microservices.start( new Context() - .gateway(options -> new HttpGateway.Builder().options(options.id("HTTP")).build()) + .gateway(() -> new HttpGateway.Builder().id("HTTP").build()) .services(new GreetingServiceImpl()) .services( ServiceInfo.fromServiceInstance(new ErrorServiceImpl()) diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java index 7a57d5280..281903fcf 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java @@ -53,9 +53,9 @@ void beforEach() { .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .gateway( - options -> + () -> new WebsocketGateway.Builder() - .options(options.id("WS")) + .id("WS") .gatewayHandler(sessionEventHandler) .build())); diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java index f841adee6..7f21a5ad9 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java @@ -48,9 +48,9 @@ static void beforeAll() { .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .gateway( - options -> + () -> new WebsocketGateway.Builder() - .options(options.id("WS")) + .id("WS") .gatewayHandler(new TestGatewaySessionHandler()) .build())); diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayAuthTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayAuthTest.java index 623dd7d99..ca6dbc487 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayAuthTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayAuthTest.java @@ -48,9 +48,9 @@ static void beforeAll() { Microservices.start( new Context() .gateway( - options -> + () -> new WebsocketGateway.Builder() - .options(options.id("WS")) + .id("WS") .gatewayHandler(new GatewaySessionHandlerImpl(AUTH_REGISTRY)) .build()) .services(new SecuredServiceImpl(AUTH_REGISTRY))); diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayTest.java index 4b2bae311..c1d094de8 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayTest.java @@ -31,6 +31,7 @@ import java.util.stream.IntStream; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -61,7 +62,7 @@ static void beforeAll() { .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .gateway( - options -> new WebsocketGateway.Builder().options(options.id("WS")).build())); + () -> new WebsocketGateway.Builder().id("WS").heartbeatEnabled(true).build())); gatewayAddress = gateway.gateway("WS").address(); router = new StaticAddressRouter(gatewayAddress); @@ -230,4 +231,12 @@ void shouldReturnSomeExceptionOnFlux() { void shouldReturnSomeExceptionOnMono() { StepVerifier.create(errorService.oneError()).expectError(SomeException.class).verify(TIMEOUT); } + + @Test + void shouldHeartbeat() { + final var value = System.nanoTime(); + StepVerifier.create(serviceCall.api(HeartbeatService.class).ping(value)) + .assertNext(pongValue -> Assertions.assertEquals(value, pongValue)) + .verifyComplete(); + } } diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayTest.java index b6ca3331f..9139da008 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayTest.java @@ -51,10 +51,10 @@ static void beforeAll() { Microservices.start( new Context() .gateway( - options -> + () -> new WebsocketGateway.Builder() - .options( - options.id("WS").call(options.call().errorMapper(ERROR_MAPPER))) + .id("WS") + .serviceCall(call -> call.errorMapper(ERROR_MAPPER)) .errorMapper(ERROR_MAPPER) .build()) .services(new GreetingServiceImpl()) diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java index ea87db877..739eccf58 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java @@ -30,9 +30,9 @@ static void beforeAll() { Microservices.start( new Context() .gateway( - options -> + () -> new WebsocketGateway.Builder() - .options(options.id("WS")) + .id("WS") .gatewayHandler(new TestGatewaySessionHandler()) .build()) .services(new TestServiceImpl())); diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index 844390fd6..99f08b6f6 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -10,7 +10,6 @@ import io.scalecube.services.exceptions.DefaultErrorMapper; import io.scalecube.services.exceptions.ServiceProviderErrorMapper; import io.scalecube.services.gateway.Gateway; -import io.scalecube.services.gateway.GatewayOptions; import io.scalecube.services.registry.ServiceRegistryImpl; import io.scalecube.services.registry.api.ServiceRegistry; import io.scalecube.services.routing.RoundRobinServiceRouter; @@ -33,7 +32,6 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import reactor.core.Disposable; @@ -237,18 +235,16 @@ private void registerService(ServiceInfo serviceInfo) { } private void startGateways() { - final GatewayOptions options = new GatewayOptions().call(serviceCall); - for (Function factory : context.gatewayFactories) { - final var gateway = factory.apply(options); - final var finalGateway = gateway.start(); - gateways.add(finalGateway); + for (var factory : context.gatewaySuppliers) { + final var gateway = factory.get().start(serviceCall, context.serviceRegistry); + gateways.add(gateway); LOGGER.log( Level.INFO, "[{0}] Started {1}, gateway: {2}@{3}", instanceId, - finalGateway, - finalGateway.id(), - finalGateway.address()); + gateway, + gateway.id(), + gateway.address()); } } @@ -525,7 +521,7 @@ public static final class Context { private Integer externalPort; private ServiceDiscoveryFactory discoveryFactory; private Supplier transportSupplier; - private List> gatewayFactories = new ArrayList<>(); + private List> gatewaySuppliers = new ArrayList<>(); public Context() {} @@ -629,9 +625,9 @@ public Context discovery(ServiceDiscoveryFactory discoveryFactory) { } /** - * Setter for supplier of {@link ServiceTransport} instance. + * Setter for {@link ServiceTransport} supplier. * - * @param transportSupplier supplier of {@link ServiceTransport} instance + * @param transportSupplier {@link ServiceTransport} supplier * @return this */ public Context transport(Supplier transportSupplier) { @@ -640,13 +636,13 @@ public Context transport(Supplier transportSupplier) { } /** - * Setter for gateway. + * Adds {@link Gateway} supplier to the list of gateway suppliers. * - * @param factory gateway factory + * @param gatewaySupplier gatewaySupplier * @return this */ - public Context gateway(Function factory) { - gatewayFactories.add(factory); + public Context gateway(Supplier gatewaySupplier) { + gatewaySuppliers.add(gatewaySupplier); return this; } @@ -730,8 +726,8 @@ private Context conclude() { serviceProviders = new ArrayList<>(); } - if (gatewayFactories == null) { - gatewayFactories = new ArrayList<>(); + if (gatewaySuppliers == null) { + gatewaySuppliers = new ArrayList<>(); } return this;