Skip to content

Commit

Permalink
Fixed Gateway and Microservices, added test o njHearbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Oct 13, 2024
1 parent bdbccf5 commit 5a68e6f
Show file tree
Hide file tree
Showing 16 changed files with 84 additions and 111 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.scalecube.services.gateway;

import io.scalecube.services.Address;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.registry.api.ServiceRegistry;

public interface Gateway {

Expand All @@ -21,9 +23,11 @@ public interface Gateway {
/**
* Starts gateway.
*
* @param call {@link ServiceCall} instance
* @param serviceRegistry {@link ServiceRegistry} instance
* @return gateway instance
*/
Gateway start();
Gateway start(ServiceCall call, ServiceRegistry serviceRegistry);

/** Stops gateway. */
void stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public interface ServiceTransport {
/**
* Provider for {@link ServerTransport}.
*
* @param serviceRegistry serviceRegistry
* @param serviceRegistry {@link ServiceRegistry} instance
* @return {@code ServerTransport} instance
*/
ServerTransport serverTransport(ServiceRegistry serviceRegistry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
import io.scalecube.services.exceptions.DefaultErrorMapper;
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
import io.scalecube.services.gateway.Gateway;
import io.scalecube.services.registry.api.ServiceRegistry;
import java.net.InetSocketAddress;
import java.util.StringJoiner;
import java.util.function.Consumer;
import java.util.function.Function;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.LoopResources;
Expand All @@ -19,7 +20,7 @@ public class HttpGateway implements Gateway {

private final String id;
private final int port;
private final ServiceCall serviceCall;
private final Function<ServiceCall, ServiceCall> callFactory;
private final ServiceProviderErrorMapper errorMapper;
private final boolean corsEnabled;
private final CorsConfigBuilder corsConfigBuilder;
Expand All @@ -30,7 +31,7 @@ public class HttpGateway implements Gateway {
private HttpGateway(Builder builder) {
this.id = builder.id;
this.port = builder.port;
this.serviceCall = builder.serviceCall;
this.callFactory = builder.callFactory;
this.errorMapper = builder.errorMapper;
this.corsEnabled = builder.corsEnabled;
this.corsConfigBuilder = builder.corsConfigBuilder;
Expand All @@ -42,9 +43,7 @@ public String id() {
}

@Override
public Gateway start() {
HttpGatewayAcceptor gatewayAcceptor = new HttpGatewayAcceptor(serviceCall, errorMapper);

public Gateway start(ServiceCall call, ServiceRegistry serviceRegistry) {
loopResources =
LoopResources.create(id + ":" + port, LoopResources.DEFAULT_IO_WORKER_COUNT, true);

Expand All @@ -58,7 +57,7 @@ public Gateway start() {
connection.addHandlerLast(new CorsHandler(corsConfigBuilder.build()));
}
})
.handle(gatewayAcceptor)
.handle(new HttpGatewayAcceptor(callFactory.apply(call), errorMapper))
.bind()
.doOnSuccess(server -> this.server = server)
.toFuture()
Expand Down Expand Up @@ -94,25 +93,11 @@ private void shutdownLoopResources(LoopResources loopResources) {
}
}

@Override
public String toString() {
return new StringJoiner(", ", HttpGateway.class.getSimpleName() + "[", "]")
.add("id='" + id + "'")
.add("port=" + port)
.add("serviceCall=" + serviceCall)
.add("errorMapper=" + errorMapper)
.add("corsEnabled=" + corsEnabled)
.add("corsConfigBuilder=" + corsConfigBuilder)
.add("server=" + server)
.add("loopResources=" + loopResources)
.toString();
}

public static class Builder {

private String id = "http@" + Integer.toHexString(hashCode());
private int port;
private ServiceCall serviceCall;
private Function<ServiceCall, ServiceCall> callFactory = call -> call;
private ServiceProviderErrorMapper errorMapper = DefaultErrorMapper.INSTANCE;
private boolean corsEnabled = false;
private CorsConfigBuilder corsConfigBuilder =
Expand Down Expand Up @@ -141,12 +126,8 @@ public Builder port(int port) {
return this;
}

public ServiceCall serviceCall() {
return serviceCall;
}

public Builder serviceCall(ServiceCall serviceCall) {
this.serviceCall = serviceCall;
public Builder serviceCall(Function<ServiceCall, ServiceCall> operator) {
callFactory = callFactory.andThen(operator);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
@Service(HeartbeatService.NAMESPACE)
public interface HeartbeatService {

String NAMESPACE = "v1/scalecube.websocket";
String NAMESPACE = "v1/scalecube.websocket.heartbeat";

@ServiceMethod
Mono<Long> ping(long value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.scalecube.services.Address;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.ServiceInfo;
import io.scalecube.services.exceptions.DefaultErrorMapper;
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
import io.scalecube.services.gateway.Gateway;
import io.scalecube.services.gateway.GatewaySessionHandler;
import io.scalecube.services.registry.api.ServiceRegistry;
import io.scalecube.services.transport.api.ServiceMessageDataDecoder;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.StringJoiner;
import java.util.function.Function;
import reactor.netty.Connection;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
Expand All @@ -19,9 +22,10 @@ public class WebsocketGateway implements Gateway {

private final String id;
private final int port;
private final ServiceCall serviceCall;
private final Function<ServiceCall, ServiceCall> callFactory;
private final GatewaySessionHandler gatewayHandler;
private final Duration keepAliveInterval;
private final boolean heartbeatEnabled;
private final ServiceProviderErrorMapper errorMapper;

private DisposableServer server;
Expand All @@ -30,9 +34,10 @@ public class WebsocketGateway implements Gateway {
private WebsocketGateway(Builder builder) {
this.id = builder.id;
this.port = builder.port;
this.serviceCall = builder.serviceCall;
this.callFactory = builder.callFactory;
this.gatewayHandler = builder.gatewayHandler;
this.keepAliveInterval = builder.keepAliveInterval;
this.heartbeatEnabled = builder.heartbeatEnabled;
this.errorMapper = builder.errorMapper;
}

Expand All @@ -42,19 +47,25 @@ public String id() {
}

@Override
public Gateway start() {
WebsocketGatewayAcceptor gatewayAcceptor =
new WebsocketGatewayAcceptor(serviceCall, gatewayHandler, errorMapper);

public Gateway start(ServiceCall call, ServiceRegistry serviceRegistry) {
loopResources =
LoopResources.create(id + ":" + port, LoopResources.DEFAULT_IO_WORKER_COUNT, true);

if (heartbeatEnabled) {
serviceRegistry.registerService(
ServiceInfo.fromServiceInstance(new HeartbeatServiceImpl())
.errorMapper(DefaultErrorMapper.INSTANCE)
.dataDecoder(ServiceMessageDataDecoder.INSTANCE)
.build());
}

try {
HttpServer.create()
.runOn(loopResources)
.bindAddress(() -> new InetSocketAddress(port))
.doOnConnection(this::setupKeepAlive)
.handle(gatewayAcceptor)
.handle(
new WebsocketGatewayAcceptor(callFactory.apply(call), gatewayHandler, errorMapper))
.bind()
.doOnSuccess(server -> this.server = server)
.thenReturn(this)
Expand Down Expand Up @@ -123,27 +134,14 @@ private void onReadIdle(Connection connection) {
});
}

@Override
public String toString() {
return new StringJoiner(", ", WebsocketGateway.class.getSimpleName() + "[", "]")
.add("id='" + id + "'")
.add("port=" + port)
.add("serviceCall=" + serviceCall)
.add("gatewayHandler=" + gatewayHandler)
.add("keepAliveInterval=" + keepAliveInterval)
.add("errorMapper=" + errorMapper)
.add("server=" + server)
.add("loopResources=" + loopResources)
.toString();
}

public static class Builder {

private String id = "websocket@" + Integer.toHexString(hashCode());
private int port;
private ServiceCall serviceCall;
private Function<ServiceCall, ServiceCall> callFactory = call -> call;
private GatewaySessionHandler gatewayHandler = GatewaySessionHandler.DEFAULT_INSTANCE;
private Duration keepAliveInterval = Duration.ZERO;
private boolean heartbeatEnabled = false;
private ServiceProviderErrorMapper errorMapper = DefaultErrorMapper.INSTANCE;

public Builder() {}
Expand All @@ -166,12 +164,8 @@ public Builder port(int port) {
return this;
}

public ServiceCall serviceCall() {
return serviceCall;
}

public Builder serviceCall(ServiceCall serviceCall) {
this.serviceCall = serviceCall;
public Builder serviceCall(Function<ServiceCall, ServiceCall> operator) {
callFactory = callFactory.andThen(operator);
return this;
}

Expand All @@ -193,6 +187,15 @@ public Builder keepAliveInterval(Duration keepAliveInterval) {
return this;
}

public boolean heartbeatEnabled() {
return heartbeatEnabled;
}

public Builder heartbeatEnabled(boolean heartbeatEnabled) {
this.heartbeatEnabled = heartbeatEnabled;
return this;
}

public ServiceProviderErrorMapper errorMapper() {
return errorMapper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ void testCrossOriginRequest() {
Microservices.start(
new Context()
.gateway(
(context, call) ->
() ->
new HttpGateway.Builder()
.id("http")
.serviceCall(call)
.corsEnabled(true)
.corsConfigBuilder(
builder ->
Expand Down Expand Up @@ -115,9 +114,7 @@ void testOptionRequestCorsDisabled() {
gateway =
Microservices.start(
new Context()
.gateway(
(context, call) ->
new HttpGateway.Builder().id("http").serviceCall(call).build())
.gateway(() -> new HttpGateway.Builder().id("http").build())
.services(new GreetingServiceImpl()));

final HttpClient client = newClient(gateway.gateway("http").address());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ void beforEach() {
.transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory()))
.options(opts -> opts.metadata(serviceEndpoint)))
.transport(RSocketServiceTransport::new)
.gateway(
(context, call) ->
new HttpGateway.Builder().id("HTTP").serviceCall(call).build()));
.gateway(() -> new HttpGateway.Builder().id("HTTP").build()));

gatewayAddress = gateway.gateway("HTTP").address();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,7 @@ static void beforeAll() {
.options(opts -> opts.metadata(serviceEndpoint)))
.transport(RSocketServiceTransport::new)
.gateway(
(context, call) ->
new HttpGateway.Builder()
.id("HTTP")
.serviceCall(call)
.errorMapper(ERROR_MAPPER)
.build()));
() -> new HttpGateway.Builder().id("HTTP").errorMapper(ERROR_MAPPER).build()));

gatewayAddress = gateway.gateway("HTTP").address();
router = new StaticAddressRouter(gatewayAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ static void beforeAll() {
gateway =
Microservices.start(
new Context()
.gateway(
(context, call) ->
new HttpGateway.Builder().id("HTTP").serviceCall(call).build())
.gateway(() -> new HttpGateway.Builder().id("HTTP").build())
.services(new GreetingServiceImpl())
.services(
ServiceInfo.fromServiceInstance(new ErrorServiceImpl())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ void beforEach() {
.options(opts -> opts.metadata(serviceEndpoint)))
.transport(RSocketServiceTransport::new)
.gateway(
(context, call) ->
() ->
new WebsocketGateway.Builder()
.id("WS")
.serviceCall(call)
.gatewayHandler(sessionEventHandler)
.build()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ static void beforeAll() {
.options(opts -> opts.metadata(serviceEndpoint)))
.transport(RSocketServiceTransport::new)
.gateway(
(context, call) ->
() ->
new WebsocketGateway.Builder()
.id("WS")
.serviceCall(call)
.gatewayHandler(new TestGatewaySessionHandler())
.build()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ static void beforeAll() {
Microservices.start(
new Context()
.gateway(
(context, call) ->
() ->
new WebsocketGateway.Builder()
.id("WS")
.serviceCall(call)
.gatewayHandler(new GatewaySessionHandlerImpl(AUTH_REGISTRY))
.build())
.services(new SecuredServiceImpl(AUTH_REGISTRY)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.stream.IntStream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -61,8 +62,7 @@ static void beforeAll() {
.options(opts -> opts.metadata(serviceEndpoint)))
.transport(RSocketServiceTransport::new)
.gateway(
(context, call) ->
new WebsocketGateway.Builder().id("WS").serviceCall(call).build()));
() -> new WebsocketGateway.Builder().id("WS").heartbeatEnabled(true).build()));

gatewayAddress = gateway.gateway("WS").address();
router = new StaticAddressRouter(gatewayAddress);
Expand Down Expand Up @@ -231,4 +231,12 @@ void shouldReturnSomeExceptionOnFlux() {
void shouldReturnSomeExceptionOnMono() {
StepVerifier.create(errorService.oneError()).expectError(SomeException.class).verify(TIMEOUT);
}

@Test
void shouldHeartbeat() {
final var value = System.nanoTime();
StepVerifier.create(serviceCall.api(HeartbeatService.class).ping(value))
.assertNext(pongValue -> Assertions.assertEquals(value, pongValue))
.verifyComplete();
}
}
Loading

0 comments on commit 5a68e6f

Please sign in to comment.