Skip to content

Commit

Permalink
Uses RSocketRequesterBuilder.transports() for loadbalancing.
Browse files Browse the repository at this point in the history
Initially, it just has the configured broker.

See gh-8
  • Loading branch information
spencergibb committed Nov 18, 2021
1 parent 704a4ce commit ac32456
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@
package io.rsocket.broker.client.spring;

import java.net.URI;
import java.util.Collections;
import java.util.List;

import io.rsocket.RSocket;
import io.rsocket.broker.common.spring.ClientTransportFactory;
import io.rsocket.broker.common.spring.DefaultClientTransportFactory;
import io.rsocket.broker.common.spring.MimeTypes;
import io.rsocket.broker.frames.RouteSetup;
import io.rsocket.loadbalance.LoadbalanceTarget;
import io.rsocket.loadbalance.RoundRobinLoadbalanceStrategy;
import io.rsocket.transport.ClientTransport;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.One;

Expand Down Expand Up @@ -131,7 +136,10 @@ public BrokerRSocketRequester brokerClientRSocketRequester(BrokerRSocketRequeste
.map(factory -> factory.create(broker))
.orElseThrow(() -> new IllegalStateException("Unknown transport " + properties));

BrokerRSocketRequester requester = builder.transport(clientTransport);
// TODO: targets and strategy as beans
Flux<List<LoadbalanceTarget>> loadbalanceTargets = Flux.just(Collections.singletonList(LoadbalanceTarget.from("config", clientTransport)));
RoundRobinLoadbalanceStrategy loadbalanceStrategy = new RoundRobinLoadbalanceStrategy();
BrokerRSocketRequester requester = builder.transports(loadbalanceTargets, loadbalanceStrategy);

// if we don't subscribe, there won't be a connection to the broker.
requester.rsocketClient().source().subscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ public RSocketRequester.Builder apply(Consumer<RSocketRequester.Builder> configu
}

@Override
public RSocketRequester transports(Publisher<List<LoadbalanceTarget>> targetPublisher, LoadbalanceStrategy loadbalanceStrategy) {
return delegate.transports(targetPublisher, loadbalanceStrategy);
public BrokerRSocketRequester transports(Publisher<List<LoadbalanceTarget>> targetPublisher, LoadbalanceStrategy loadbalanceStrategy) {
return wrap(delegate.transports(targetPublisher, loadbalanceStrategy));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@
package io.rsocket.broker.client.spring;

import java.net.URI;
import java.util.List;

import io.rsocket.core.RSocketServer;
import io.rsocket.broker.common.Id;
import io.rsocket.broker.common.spring.ClientTransportFactory;
import io.rsocket.loadbalance.LoadbalanceStrategy;
import io.rsocket.loadbalance.LoadbalanceTarget;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.local.LocalClientTransport;
import io.rsocket.transport.local.LocalServerTransport;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringBootConfiguration;
Expand Down Expand Up @@ -74,6 +79,19 @@ public BrokerRSocketRequester transport(ClientTransport transport) {
}
return super.transport(transport);
}

@Override
public BrokerRSocketRequester transports(Publisher<List<LoadbalanceTarget>> targetPublisher, LoadbalanceStrategy loadbalanceStrategy) {
Flux<List<LoadbalanceTarget>> targets = Flux.from(targetPublisher)
.map(loadbalanceTargets -> {
assertThat(loadbalanceTargets).hasSize(1);
if (loadbalanceTargets.get(0).getTransport() instanceof LocalClientTransport) {
localTransportSet = true;
}
return loadbalanceTargets;
});
return super.transports(targets, loadbalanceStrategy);
}
}

@SpringBootConfiguration
Expand Down

0 comments on commit ac32456

Please sign in to comment.