You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Is your feature request related to a problem? Please describe.
At present there is (as far as I know) no load balancing supported for the spring abstraction for RSockets.
Describe the solution you'd like
A perfect final state would (in my imagination) uses cloud discovery provided by spring cloud to allow for auto-configured client side load balancing for RSockets (like @LoadBalanced does at present for the WebClient.Builder).
Describe alternatives you've considered none
Additional context
I made the following work (code pasted below): I use the standard spring classes to configure the RSocketRequester, and a mock implementation of ReactiveDiscoveryClient and combined both with the io.rsocket.client.LoadBalancedRSocketMono.
Some comments in advance:
I think the RSocketRequester.Builder should be split into two classes: (1) configuration of e.g. MimeType, DefaultPayload, ... (2) the actual connection, injecting the configuration. In such a situation, one could have two implementations of the connecting class, i.e. Default... and LoadBalanced... .
My solution at present has a problem that is not really clear to me: The LoadBalancedRSocketMono seems to have problems with reactively providing and especially updating the available RSockets. Thus on startup sometimes one possible connection is ignored and updating the sockets doesn't work at all. However, this seems to be related to io.rsocket.client library. (There are even several similar issues raised in the RSocket project, but apparently not really resolved.)
My implementation at present only supports websocket, since I am not sure how to easily auto-configure other transport implementations. But this is just a matter of doing.
Here is the implementation:
Server:
@Controller
@RequiredArgsConstructor
public class RSocketController {
private final Environment environment;
@MessageMapping("/hello")
public String hello(){
return "hello from rsocket port: " + environment.getProperty("local.server.port");
}
}
The server can be started several times. I use --server.port=8081 and --server.port=8082 command line options.
Client (the actually interesting part):
I defined a simple factory class with the issues mentioned above acknowledged in comments:
@RequiredArgsConstructor
public class LoadBalancedRSocketRequesterFactory {
private final RSocketRequester.Builder builder;
private final RSocketStrategies rSocketStrategies;
private final ReactiveDiscoveryClient reactiveDiscoveryClient;
private final Map<String, ConfigData> configDataMap = new ConcurrentHashMap<>();
public Mono<RSocketRequester> create(String serviceId) {
indirectlyCreateConfig(serviceId);
return loadBalancedRSocketMono(serviceId).map(balancedRSocket -> RSocketRequester.wrap(
balancedRSocket,
configDataMap.get(serviceId).getMimeType(),
configDataMap.get(serviceId).getMetaMimeType(),
rSocketStrategies
));
}
// only needed to get the Requester meta data
private void indirectlyCreateConfig(String serviceId) {
final RSocketRequester rSocketRequester = reactiveDiscoveryClient
.getInstances(serviceId)
.next()
.map(ServiceInstance::getUri)
.flatMap(builder::connectWebSocket)
.block();
Assert.notNull(rSocketRequester, "Default requester can't be null.");
configDataMap.put(
serviceId,
new ConfigData(rSocketRequester.dataMimeType(), rSocketRequester.metadataMimeType())
);
}
private LoadBalancedRSocketMono loadBalancedRSocketMono(String serviceId) {
return
// this would update the load balancer frequently, however doesn't work at present
// Flux
// .interval(Duration.ZERO, Duration.ofSeconds(15))
// .flatMap(i ->
reactiveDiscoveryClient
.getInstances(serviceId)
.map(serviceInstance -> new RSocketSupplier(() -> builder
.connectWebSocket(webSocketUri(serviceInstance))
.map(RSocketRequester::rsocket)
))
.collectList()
// )
.as(LoadBalancedRSocketMono::create);
}
private URI webSocketUri(ServiceInstance serviceInstance) {
return UriComponentsBuilder
.fromUri(serviceInstance.getUri())
.scheme("ws")
.build()
.toUri();
}
@Value
private static class ConfigData {
MimeType mimeType;
MimeType metaMimeType;
}
}
A simple configuration class (especially implementing the ReactiveDiscoveryClient):
@Configuration
public class LoadBalancerConfiguration {
@Bean
public LoadBalancedRSocketRequesterFactory loadBalancedRSocketRequesterFactory(
RSocketRequester.Builder builder,
RSocketStrategies rSocketStrategies
) {
return new LoadBalancedRSocketRequesterFactory(builder, rSocketStrategies, reactiveDiscoveryClient());
}
@Bean
public ReactiveDiscoveryClient reactiveDiscoveryClient() {
return new ReactiveDiscoveryClientMock();
}
@NoArgsConstructor
static class ReactiveDiscoveryClientMock implements ReactiveDiscoveryClient {
@Override
public String description() {
return null;
}
@Override
public Flux<ServiceInstance> getInstances(String serviceId) {
return Flux.fromIterable(Arrays.asList(
serviceInstanceMock(8081),
serviceInstanceMock(8082)
));
}
@Override
public Flux<String> getServices() {
return null;
}
}
private static ServiceInstance serviceInstanceMock(int port) {
return new ServiceInstance() {
@Override
public String getServiceId() {
return null;
}
@Override
public String getHost() {
return null;
}
@Override
public int getPort() {
return 0;
}
@Override
public boolean isSecure() {
return false;
}
@Override
public URI getUri() {
return URI.create("http://localhost:" + port + "/rsocket");
}
@Override
public Map<String, String> getMetadata() {
return null;
}
};
}
And finally a service doing the actual requests:
@Service
public class RSocketRequestService {
private final Mono<RSocketRequester> loadBalancedRSocketRequester;
public RSocketRequestService(LoadBalancedRSocketRequesterFactory loadBalancedRSocketRequesterFactory) {
this.loadBalancedRSocketRequester = loadBalancedRSocketRequesterFactory.create("any");
}
@EventListener(ApplicationReadyEvent.class)
public void start() {
Flux
.interval(Duration.ofSeconds(1), Duration.ofMillis(10))
.flatMap(i -> request())
.subscribe(m -> System.out.println(Instant.now() + m));
}
private Mono<String> request() {
return loadBalancedRSocketRequester.flatMap(
rSocketRequester -> rSocketRequester
.route("/hello")
.retrieveMono(String.class)
);
}
}
bootstrap.yml :
spring:
cloud:
discovery:
enabled: false
I know this is no final solution but could be a starting point to enhance the awesome spring cloud project to make RSockets easily available in cloud native applications. If you think this is interesting, I would of course help implementing a really useful, spring style solution.
The text was updated successfully, but these errors were encountered:
Is your feature request related to a problem? Please describe.
At present there is (as far as I know) no load balancing supported for the spring abstraction for RSockets.
Describe the solution you'd like
A perfect final state would (in my imagination) uses cloud discovery provided by spring cloud to allow for auto-configured client side load balancing for RSockets (like @LoadBalanced does at present for the WebClient.Builder).
Describe alternatives you've considered
none
Additional context
I made the following work (code pasted below): I use the standard spring classes to configure the RSocketRequester, and a mock implementation of ReactiveDiscoveryClient and combined both with the io.rsocket.client.LoadBalancedRSocketMono.
Some comments in advance:
Here is the implementation:
Server:
with application.properties:
The server can be started several times. I use --server.port=8081 and --server.port=8082 command line options.
Client (the actually interesting part):
I defined a simple factory class with the issues mentioned above acknowledged in comments:
A simple configuration class (especially implementing the ReactiveDiscoveryClient):
And finally a service doing the actual requests:
bootstrap.yml :
I know this is no final solution but could be a starting point to enhance the awesome spring cloud project to make RSockets easily available in cloud native applications. If you think this is interesting, I would of course help implementing a really useful, spring style solution.
The text was updated successfully, but these errors were encountered: