From ac324565640b58627ef1a89ad50cfdb632f5d793 Mon Sep 17 00:00:00 2001 From: spencergibb Date: Thu, 18 Nov 2021 15:11:54 -0500 Subject: [PATCH] Uses RSocketRequesterBuilder.transports() for loadbalancing. Initially, it just has the configured broker. See gh-8 --- .../spring/BrokerClientAutoConfiguration.java | 10 +++++++++- .../spring/BrokerRSocketRequesterBuilder.java | 4 ++-- .../CustomClientTransportFactoryTests.java | 18 ++++++++++++++++++ 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/rsocket-broker-client-spring/src/main/java/io/rsocket/broker/client/spring/BrokerClientAutoConfiguration.java b/rsocket-broker-client-spring/src/main/java/io/rsocket/broker/client/spring/BrokerClientAutoConfiguration.java index c04828f..3c99424 100644 --- a/rsocket-broker-client-spring/src/main/java/io/rsocket/broker/client/spring/BrokerClientAutoConfiguration.java +++ b/rsocket-broker-client-spring/src/main/java/io/rsocket/broker/client/spring/BrokerClientAutoConfiguration.java @@ -17,14 +17,19 @@ package io.rsocket.broker.client.spring; import java.net.URI; +import java.util.Collections; +import java.util.List; import io.rsocket.RSocket; import io.rsocket.broker.common.spring.ClientTransportFactory; import io.rsocket.broker.common.spring.DefaultClientTransportFactory; import io.rsocket.broker.common.spring.MimeTypes; import io.rsocket.broker.frames.RouteSetup; +import io.rsocket.loadbalance.LoadbalanceTarget; +import io.rsocket.loadbalance.RoundRobinLoadbalanceStrategy; import io.rsocket.transport.ClientTransport; import reactor.core.Disposable; +import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.One; @@ -131,7 +136,10 @@ public BrokerRSocketRequester brokerClientRSocketRequester(BrokerRSocketRequeste .map(factory -> factory.create(broker)) .orElseThrow(() -> new IllegalStateException("Unknown transport " + properties)); - BrokerRSocketRequester requester = builder.transport(clientTransport); + // TODO: targets and strategy as beans + Flux> loadbalanceTargets = Flux.just(Collections.singletonList(LoadbalanceTarget.from("config", clientTransport))); + RoundRobinLoadbalanceStrategy loadbalanceStrategy = new RoundRobinLoadbalanceStrategy(); + BrokerRSocketRequester requester = builder.transports(loadbalanceTargets, loadbalanceStrategy); // if we don't subscribe, there won't be a connection to the broker. requester.rsocketClient().source().subscribe(); diff --git a/rsocket-broker-client-spring/src/main/java/io/rsocket/broker/client/spring/BrokerRSocketRequesterBuilder.java b/rsocket-broker-client-spring/src/main/java/io/rsocket/broker/client/spring/BrokerRSocketRequesterBuilder.java index b8f8d62..9c21a78 100644 --- a/rsocket-broker-client-spring/src/main/java/io/rsocket/broker/client/spring/BrokerRSocketRequesterBuilder.java +++ b/rsocket-broker-client-spring/src/main/java/io/rsocket/broker/client/spring/BrokerRSocketRequesterBuilder.java @@ -111,8 +111,8 @@ public RSocketRequester.Builder apply(Consumer configu } @Override - public RSocketRequester transports(Publisher> targetPublisher, LoadbalanceStrategy loadbalanceStrategy) { - return delegate.transports(targetPublisher, loadbalanceStrategy); + public BrokerRSocketRequester transports(Publisher> targetPublisher, LoadbalanceStrategy loadbalanceStrategy) { + return wrap(delegate.transports(targetPublisher, loadbalanceStrategy)); } @Override diff --git a/rsocket-broker-client-spring/src/test/java/io/rsocket/broker/client/spring/CustomClientTransportFactoryTests.java b/rsocket-broker-client-spring/src/test/java/io/rsocket/broker/client/spring/CustomClientTransportFactoryTests.java index 90ae77d..168cbd3 100644 --- a/rsocket-broker-client-spring/src/test/java/io/rsocket/broker/client/spring/CustomClientTransportFactoryTests.java +++ b/rsocket-broker-client-spring/src/test/java/io/rsocket/broker/client/spring/CustomClientTransportFactoryTests.java @@ -17,14 +17,19 @@ package io.rsocket.broker.client.spring; import java.net.URI; +import java.util.List; import io.rsocket.core.RSocketServer; import io.rsocket.broker.common.Id; import io.rsocket.broker.common.spring.ClientTransportFactory; +import io.rsocket.loadbalance.LoadbalanceStrategy; +import io.rsocket.loadbalance.LoadbalanceTarget; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.local.LocalClientTransport; import io.rsocket.transport.local.LocalServerTransport; import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringBootConfiguration; @@ -74,6 +79,19 @@ public BrokerRSocketRequester transport(ClientTransport transport) { } return super.transport(transport); } + + @Override + public BrokerRSocketRequester transports(Publisher> targetPublisher, LoadbalanceStrategy loadbalanceStrategy) { + Flux> targets = Flux.from(targetPublisher) + .map(loadbalanceTargets -> { + assertThat(loadbalanceTargets).hasSize(1); + if (loadbalanceTargets.get(0).getTransport() instanceof LocalClientTransport) { + localTransportSet = true; + } + return loadbalanceTargets; + }); + return super.transports(targets, loadbalanceStrategy); + } } @SpringBootConfiguration