Load balancing support for RSocket #1

Closed
PMacho opened this issue Apr 17, 2020 · 1 comment

Comments

PMacho commented Apr 17, 2020

Is your feature request related to a problem? Please describe.
At present there is (as far as I know) no load balancing support in the Spring abstraction for RSocket.

Describe the solution you'd like
A perfect final state would (in my imagination) use the service discovery provided by Spring Cloud to allow auto-configured client-side load balancing for RSocket (like @LoadBalanced does at present for the WebClient.Builder).

Describe alternatives you've considered
none

Additional context
I got the following to work (code pasted below): I use the standard Spring classes to configure the RSocketRequester together with a mock implementation of ReactiveDiscoveryClient, and combine both with io.rsocket.client.LoadBalancedRSocketMono.

Some comments in advance:

  • I think RSocketRequester.Builder should be split into two classes: (1) one for configuration, e.g. MimeType, DefaultPayload, ..., and (2) one for the actual connection, into which the configuration is injected. One could then have two implementations of the connecting class, i.e. Default... and LoadBalanced... .
  • My current solution has a problem that I do not fully understand: LoadBalancedRSocketMono seems to have trouble reactively providing, and especially updating, the available RSockets. As a result, on startup one of the possible connections is sometimes ignored, and updating the sockets doesn't work at all. However, this seems to be related to the io.rsocket.client library. (Several similar issues have been raised in the RSocket project, but apparently none has really been resolved.)
  • My implementation currently supports only WebSocket, since I am not sure how to easily auto-configure other transport implementations. But that is just a matter of implementation effort.
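
To make the first point more concrete, here is a minimal sketch of what such a split could look like. It uses only the JDK, and every type name in it (RequesterSettings, TargetSelector, DefaultTargetSelector, LoadBalancedTargetSelector) is hypothetical; none of them exists in Spring or RSocket. The load-balanced variant uses plain round-robin purely for illustration, not the selection strategy of LoadBalancedRSocketMono.

```java
import java.net.URI;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical names throughout; this is not an existing Spring or RSocket API.
public class SplitBuilderSketch {

    /** Part (1): connection-independent settings such as the MIME types. */
    public static final class RequesterSettings {
        public final String dataMimeType;
        public final String metadataMimeType;

        public RequesterSettings(String dataMimeType, String metadataMimeType) {
            this.dataMimeType = dataMimeType;
            this.metadataMimeType = metadataMimeType;
        }
    }

    /** Part (2): decides which target the next connection should go to. */
    public interface TargetSelector {
        URI nextTarget();
    }

    /** "Default..." variant: always the same, single target. */
    public static final class DefaultTargetSelector implements TargetSelector {
        private final URI target;

        public DefaultTargetSelector(URI target) {
            this.target = target;
        }

        @Override
        public URI nextTarget() {
            return target;
        }
    }

    /** "LoadBalanced..." variant: plain round-robin over a fixed list. */
    public static final class LoadBalancedTargetSelector implements TargetSelector {
        private final List<URI> targets;
        private final AtomicInteger cursor = new AtomicInteger();

        public LoadBalancedTargetSelector(List<URI> targets) {
            if (targets.isEmpty()) {
                throw new IllegalArgumentException("at least one target is required");
            }
            this.targets = List.copyOf(targets);
        }

        @Override
        public URI nextTarget() {
            // floorMod keeps the index valid even after the counter overflows.
            return targets.get(Math.floorMod(cursor.getAndIncrement(), targets.size()));
        }
    }

    /** The shared settings are injected alongside whichever selector is in use. */
    public static String describeNextConnection(RequesterSettings settings, TargetSelector selector) {
        return selector.nextTarget() + " [" + settings.dataMimeType + "]";
    }
}
```

With this shape, the same settings object can be combined with either selector, so switching from a single target to a load-balanced one would not touch the MIME type configuration.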

Here is the implementation:

Server:

@Controller
@RequiredArgsConstructor
public class RSocketController {

    private final Environment environment;

    @MessageMapping("/hello")
    public String hello(){
        return "hello from rsocket port: " + environment.getProperty("local.server.port");
    }
}

with application.properties:

spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket

The server can be started several times. I use the --server.port=8081 and --server.port=8082 command-line options.

Client (the actually interesting part):

I defined a simple factory class; the issues mentioned above are acknowledged in its comments:

@RequiredArgsConstructor
public class LoadBalancedRSocketRequesterFactory {

    private final RSocketRequester.Builder builder;
    private final RSocketStrategies rSocketStrategies;
    private final ReactiveDiscoveryClient reactiveDiscoveryClient;

    private final Map<String, ConfigData> configDataMap = new ConcurrentHashMap<>();


    public Mono<RSocketRequester> create(String serviceId) {
        indirectlyCreateConfig(serviceId);

        return loadBalancedRSocketMono(serviceId).map(balancedRSocket -> RSocketRequester.wrap(
                balancedRSocket,
                configDataMap.get(serviceId).getMimeType(),
                configDataMap.get(serviceId).getMetaMimeType(),
                rSocketStrategies
        ));
    }

    // only needed to get the Requester meta data
    private void indirectlyCreateConfig(String serviceId) {
        final RSocketRequester rSocketRequester = reactiveDiscoveryClient
                .getInstances(serviceId)
                .next()
                .map(ServiceInstance::getUri)
                .flatMap(builder::connectWebSocket)
                .block();

        Assert.notNull(rSocketRequester, "Default requester can't be null.");
        configDataMap.put(
                serviceId,
                new ConfigData(rSocketRequester.dataMimeType(), rSocketRequester.metadataMimeType())
        );
    }

    private LoadBalancedRSocketMono loadBalancedRSocketMono(String serviceId) {
        return
                // this would update the load balancer frequently, however doesn't work at present
//                Flux
//                        .interval(Duration.ZERO, Duration.ofSeconds(15))
//                        .flatMap(i ->
                reactiveDiscoveryClient
                        .getInstances(serviceId)
                        .map(serviceInstance -> new RSocketSupplier(() -> builder
                                .connectWebSocket(webSocketUri(serviceInstance))
                                .map(RSocketRequester::rsocket)
                        ))
                        .collectList()
//                        )
                        .as(LoadBalancedRSocketMono::create);
    }

    private URI webSocketUri(ServiceInstance serviceInstance) {
        return UriComponentsBuilder
                .fromUri(serviceInstance.getUri())
                .scheme("ws")
                .build()
                .toUri();
    }

    @Value
    private static class ConfigData {
        MimeType mimeType;
        MimeType metaMimeType;
    }

}
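
The commented-out interval in loadBalancedRSocketMono is meant to refresh the instance list periodically. As a rough illustration of that idea only, the following JDK-only sketch keeps a snapshot of the known instances and swaps it whenever refresh() is called. It does not use or fix LoadBalancedRSocketMono; the class name RefreshingInstanceList is hypothetical, the Supplier stands in for a discovery lookup, and selection is plain round-robin.

```java
import java.net.URI;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical helper; not part of Spring Cloud or the RSocket libraries.
public class RefreshingInstanceList {

    private final Supplier<List<URI>> discovery; // stand-in for a discovery lookup
    private final AtomicReference<List<URI>> snapshot = new AtomicReference<>(List.of());
    private final AtomicInteger cursor = new AtomicInteger();

    public RefreshingInstanceList(Supplier<List<URI>> discovery) {
        this.discovery = discovery;
        refresh();
    }

    /** Re-reads the instances; an empty result keeps the previous snapshot. */
    public void refresh() {
        List<URI> latest = List.copyOf(discovery.get());
        if (!latest.isEmpty()) {
            snapshot.set(latest);
        }
    }

    /** Returns the next instance from the current snapshot in round-robin order. */
    public URI next() {
        List<URI> current = snapshot.get();
        if (current.isEmpty()) {
            throw new IllegalStateException("no instances available");
        }
        return current.get(Math.floorMod(cursor.getAndIncrement(), current.size()));
    }
}
```

A caller could invoke refresh() from a scheduler, for example every 15 seconds as in the commented-out code, while request threads keep calling next() against whichever snapshot is current.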

A simple configuration class (which, in particular, provides a mock implementation of ReactiveDiscoveryClient):

@Configuration
public class LoadBalancerConfiguration {

    @Bean
    public LoadBalancedRSocketRequesterFactory loadBalancedRSocketRequesterFactory(
            RSocketRequester.Builder builder,
            RSocketStrategies rSocketStrategies
    ) {
        return new LoadBalancedRSocketRequesterFactory(builder, rSocketStrategies, reactiveDiscoveryClient());
    }

    @Bean
    public ReactiveDiscoveryClient reactiveDiscoveryClient() {
        return new ReactiveDiscoveryClientMock();
    }

    @NoArgsConstructor
    static class ReactiveDiscoveryClientMock implements ReactiveDiscoveryClient {
        @Override
        public String description() {
            return null;
        }

        @Override
        public Flux<ServiceInstance> getInstances(String serviceId) {
            return Flux.fromIterable(Arrays.asList(
                    serviceInstanceMock(8081),
                    serviceInstanceMock(8082)
            ));
        }

        @Override
        public Flux<String> getServices() {
            return null;
        }
    }

    private static ServiceInstance serviceInstanceMock(int port) {
        return new ServiceInstance() {
            @Override
            public String getServiceId() {
                return null;
            }

            @Override
            public String getHost() {
                return null;
            }

            @Override
            public int getPort() {
                return port;
            }

            @Override
            public boolean isSecure() {
                return false;
            }

            @Override
            public URI getUri() {
                return URI.create("http://localhost:" + port + "/rsocket");
            }

            @Override
            public Map<String, String> getMetadata() {
                return null;
            }
        };
    }
}
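
The mock instances report http URIs, and the factory's webSocketUri helper always rewrites the scheme to ws. For instances that report https, a mapping to wss would probably be needed instead. The following is a minimal JDK-only sketch of such a mapping; the class and method names (WebSocketUris, toWebSocketUri) are hypothetical and not part of Spring.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper; shown only to illustrate the scheme mapping.
public class WebSocketUris {

    /** Maps http to ws and https to wss, leaving the rest of the URI unchanged. */
    public static URI toWebSocketUri(URI uri) {
        String scheme = uri.getScheme();
        String wsScheme;
        if ("https".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)) {
            wsScheme = "wss";
        } else if ("http".equalsIgnoreCase(scheme) || "ws".equalsIgnoreCase(scheme)) {
            wsScheme = "ws";
        } else {
            throw new IllegalArgumentException("Unsupported scheme: " + scheme);
        }
        try {
            // Rebuild the URI with the new scheme and all other components as they were.
            return new URI(wsScheme, uri.getUserInfo(), uri.getHost(), uri.getPort(),
                    uri.getPath(), uri.getQuery(), uri.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

For the mock instance on port 8081, toWebSocketUri(URI.create("http://localhost:8081/rsocket")) yields ws://localhost:8081/rsocket.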

And finally a service doing the actual requests:

@Service
public class RSocketRequestService {

    private final Mono<RSocketRequester> loadBalancedRSocketRequester;

    public RSocketRequestService(LoadBalancedRSocketRequesterFactory loadBalancedRSocketRequesterFactory) {
        this.loadBalancedRSocketRequester = loadBalancedRSocketRequesterFactory.create("any");
    }

    @EventListener(ApplicationReadyEvent.class)
    public void start() {
        Flux
                .interval(Duration.ofSeconds(1), Duration.ofMillis(10))
                .flatMap(i -> request())
                .subscribe(m -> System.out.println(Instant.now() + m));
    }

    private Mono<String> request() {
        return loadBalancedRSocketRequester.flatMap(
                rSocketRequester -> rSocketRequester
                        .route("/hello")
                        .retrieveMono(String.class)
        );
    }

}

bootstrap.yml:

spring:
  cloud:
    discovery:
      enabled: false

I know this is not a final solution, but it could be a starting point for enhancing the awesome Spring Cloud project so that RSocket is easily available in cloud-native applications. If you think this is interesting, I would of course help implement a really useful, Spring-style solution.

spencergibb (Member) commented

I think this belongs here: rsocket-broker/rsocket-broker-client#8
