Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support of ping/pong for websocket gatewa #864

Merged
merged 3 commits into from
Oct 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.scalecube.services.gateway;

import io.scalecube.services.Address;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.registry.api.ServiceRegistry;

public interface Gateway {

Expand All @@ -21,9 +23,11 @@ public interface Gateway {
/**
* Starts gateway.
*
* @param call {@link ServiceCall} instance
* @param serviceRegistry {@link ServiceRegistry} instance
* @return gateway instance
*/
Gateway start();
Gateway start(ServiceCall call, ServiceRegistry serviceRegistry);

/** Stops gateway. */
void stop();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public interface ServiceTransport {
/**
* Provider for {@link ServerTransport}.
*
* @param serviceRegistry serviceRegistry
* @param serviceRegistry {@link ServiceRegistry} instance
* @return {@code ServerTransport} instance
*/
ServerTransport serverTransport(ServiceRegistry serviceRegistry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,23 @@
import io.netty.handler.codec.http.cors.CorsConfigBuilder;
import io.netty.handler.codec.http.cors.CorsHandler;
import io.scalecube.services.Address;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.exceptions.DefaultErrorMapper;
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
import io.scalecube.services.gateway.Gateway;
import io.scalecube.services.gateway.GatewayOptions;
import io.scalecube.services.registry.api.ServiceRegistry;
import java.net.InetSocketAddress;
import java.util.StringJoiner;
import java.util.function.Consumer;
import java.util.function.Function;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.LoopResources;

public class HttpGateway implements Gateway {

private final GatewayOptions options;
private final String id;
private final int port;
private final Function<ServiceCall, ServiceCall> callFactory;
private final ServiceProviderErrorMapper errorMapper;
private final boolean corsEnabled;
private final CorsConfigBuilder corsConfigBuilder;
Expand All @@ -26,28 +29,35 @@ public class HttpGateway implements Gateway {
private LoopResources loopResources;

private HttpGateway(Builder builder) {
this.options = builder.options;
this.id = builder.id;
this.port = builder.port;
this.callFactory = builder.callFactory;
this.errorMapper = builder.errorMapper;
this.corsEnabled = builder.corsEnabled;
this.corsConfigBuilder = builder.corsConfigBuilder;
}

@Override
public String id() {
return options.id();
return id;
}

@Override
public Gateway start() {
HttpGatewayAcceptor gatewayAcceptor = new HttpGatewayAcceptor(options.call(), errorMapper);

public Gateway start(ServiceCall call, ServiceRegistry serviceRegistry) {
loopResources =
LoopResources.create(
options.id() + ":" + options.port(), LoopResources.DEFAULT_IO_WORKER_COUNT, true);
LoopResources.create(id + ":" + port, LoopResources.DEFAULT_IO_WORKER_COUNT, true);

try {
prepareHttpServer(loopResources, options.port())
.handle(gatewayAcceptor)
HttpServer.create()
.runOn(loopResources)
.bindAddress(() -> new InetSocketAddress(port))
.doOnConnection(
connection -> {
if (corsEnabled) {
connection.addHandlerLast(new CorsHandler(corsConfigBuilder.build()));
}
})
.handle(new HttpGatewayAcceptor(callFactory.apply(call), errorMapper))
.bind()
.doOnSuccess(server -> this.server = server)
.toFuture()
Expand All @@ -59,23 +69,6 @@ public Gateway start() {
return this;
}

private HttpServer prepareHttpServer(LoopResources loopResources, int port) {
HttpServer httpServer = HttpServer.create();

if (loopResources != null) {
httpServer = httpServer.runOn(loopResources);
}

return httpServer
.bindAddress(() -> new InetSocketAddress(port))
.doOnConnection(
connection -> {
if (corsEnabled) {
connection.addHandlerLast(new CorsHandler(corsConfigBuilder.build()));
}
});
}

@Override
public Address address() {
InetSocketAddress address = (InetSocketAddress) server.address();
Expand All @@ -100,21 +93,11 @@ private void shutdownLoopResources(LoopResources loopResources) {
}
}

@Override
public String toString() {
return new StringJoiner(", ", HttpGateway.class.getSimpleName() + "[", "]")
.add("options=" + options)
.add("errorMapper=" + errorMapper)
.add("corsEnabled=" + corsEnabled)
.add("corsConfigBuilder=" + corsConfigBuilder)
.add("server=" + server)
.add("loopResources=" + loopResources)
.toString();
}

public static class Builder {

private GatewayOptions options;
private String id = "http@" + Integer.toHexString(hashCode());
private int port;
private Function<ServiceCall, ServiceCall> callFactory = call -> call;
private ServiceProviderErrorMapper errorMapper = DefaultErrorMapper.INSTANCE;
private boolean corsEnabled = false;
private CorsConfigBuilder corsConfigBuilder =
Expand All @@ -125,12 +108,26 @@ public static class Builder {

public Builder() {}

public GatewayOptions options() {
return options;
public String id() {
return id;
}

public Builder id(String id) {
this.id = id;
return this;
}

public int port() {
return port;
}

public Builder port(int port) {
this.port = port;
return this;
}

public Builder options(GatewayOptions options) {
this.options = options;
public Builder serviceCall(Function<ServiceCall, ServiceCall> operator) {
callFactory = callFactory.andThen(operator);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.scalecube.services.ServiceCall;
import io.scalecube.services.api.ErrorData;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.DefaultErrorMapper;
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
import io.scalecube.services.gateway.ReferenceCountUtil;
import io.scalecube.services.transport.api.DataCodec;
Expand All @@ -37,10 +36,6 @@ public class HttpGatewayAcceptor
private final ServiceCall serviceCall;
private final ServiceProviderErrorMapper errorMapper;

HttpGatewayAcceptor(ServiceCall serviceCall) {
this(serviceCall, DefaultErrorMapper.INSTANCE);
}

HttpGatewayAcceptor(ServiceCall serviceCall, ServiceProviderErrorMapper errorMapper) {
this.serviceCall = serviceCall;
this.errorMapper = errorMapper;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.scalecube.services.gateway.websocket;

import io.scalecube.services.annotations.Service;
import io.scalecube.services.annotations.ServiceMethod;
import reactor.core.publisher.Mono;

@Service(HeartbeatService.NAMESPACE)
public interface HeartbeatService {

String NAMESPACE = "v1/scalecube.websocket.heartbeat";

@ServiceMethod
Mono<Long> ping(long value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.scalecube.services.gateway.websocket;

import reactor.core.publisher.Mono;

public class HeartbeatServiceImpl implements HeartbeatService {

@Override
public Mono<Long> ping(long value) {
return Mono.just(value);
}
}
Loading
Loading