diff --git a/services-benchmarks/src/main/java/io/scalecube/services/benchmarks/transport/BenchmarkServiceState.java b/services-benchmarks/src/main/java/io/scalecube/services/benchmarks/transport/BenchmarkServiceState.java index a1dfaf721..29722131a 100644 --- a/services-benchmarks/src/main/java/io/scalecube/services/benchmarks/transport/BenchmarkServiceState.java +++ b/services-benchmarks/src/main/java/io/scalecube/services/benchmarks/transport/BenchmarkServiceState.java @@ -37,15 +37,15 @@ public void beforeAll() { .transport(RSocketServiceTransport::new) .startAwait(); - Address seedAddress = seed.discovery().address(); + final Address seedAddress = seed.discovery().address(); node = Microservices.builder() .metrics(registry()) .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery(serviceEndpoint) - .options(opts -> opts.membership(cfg -> cfg.seedMembers(seedAddress)))) + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(seedAddress))) .transport(RSocketServiceTransport::new) .services(services) .startAwait(); diff --git a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java index ff7f4c76c..9f8cb3d7f 100644 --- a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java +++ b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java @@ -4,7 +4,11 @@ import io.scalecube.cluster.ClusterConfig; import io.scalecube.cluster.ClusterImpl; import io.scalecube.cluster.ClusterMessageHandler; +import io.scalecube.cluster.fdetector.FailureDetectorConfig; +import io.scalecube.cluster.gossip.GossipConfig; +import io.scalecube.cluster.membership.MembershipConfig; import io.scalecube.cluster.membership.MembershipEvent; +import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.net.Address; import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.discovery.api.ServiceDiscovery; @@ -32,10 +36,8 @@ public final class ScalecubeServiceDiscovery implements ServiceDiscovery { private static final Logger LOGGER = LoggerFactory.getLogger("io.scalecube.services.discovery.ServiceDiscovery"); - private final ServiceEndpoint serviceEndpoint; - + private ServiceEndpoint serviceEndpoint; private ClusterConfig clusterConfig; - private Cluster cluster; private final DirectProcessor subject = DirectProcessor.create(); @@ -63,9 +65,9 @@ private ScalecubeServiceDiscovery(ScalecubeServiceDiscovery other) { } /** - * Setter for {@code ClusterConfig.Builder} options. + * Setter for {@code ClusterConfig} options. * - * @param opts ClusterConfig options builder + * @param opts options operator * @return new instance of {@code ScalecubeServiceDiscovery} */ public ScalecubeServiceDiscovery options(UnaryOperator opts) { @@ -74,6 +76,46 @@ public ScalecubeServiceDiscovery options(UnaryOperator opts) { return d; } + /** + * Setter for {@code TransportConfig} options. + * + * @param opts options operator + * @return new instance of {@code ScalecubeServiceDiscovery} + */ + public ScalecubeServiceDiscovery transport(UnaryOperator opts) { + return options(cfg -> cfg.transport(opts)); + } + + /** + * Setter for {@code MembershipConfig} options. + * + * @param opts options operator + * @return new instance of {@code ScalecubeServiceDiscovery} + */ + public ScalecubeServiceDiscovery membership(UnaryOperator opts) { + return options(cfg -> cfg.membership(opts)); + } + + /** + * Setter for {@code GossipConfig} options. + * + * @param opts options operator + * @return new instance of {@code ScalecubeServiceDiscovery} + */ + public ScalecubeServiceDiscovery gossip(UnaryOperator opts) { + return options(cfg -> cfg.gossip(opts)); + } + + /** + * Setter for {@code FailureDetectorConfig} options. + * + * @param opts options operator + * @return new instance of {@code ScalecubeServiceDiscovery} + */ + public ScalecubeServiceDiscovery failureDetector(UnaryOperator opts) { + return options(cfg -> cfg.failureDetector(opts)); + } + @Override public Address address() { return cluster.address(); diff --git a/services-discovery/src/test/java/io/scalecube/services/discovery/ScalecubeServiceDiscoveryTest.java b/services-discovery/src/test/java/io/scalecube/services/discovery/ScalecubeServiceDiscoveryTest.java index 8305ac2f3..7701479ee 100644 --- a/services-discovery/src/test/java/io/scalecube/services/discovery/ScalecubeServiceDiscoveryTest.java +++ b/services-discovery/src/test/java/io/scalecube/services/discovery/ScalecubeServiceDiscoveryTest.java @@ -219,21 +219,19 @@ public static ServiceEndpoint newServiceEndpoint() { private Mono newServiceDiscovery( Address seedAddress, MetadataCodec metadataCodec) { return Mono.fromCallable( - () -> { - ServiceEndpoint serviceEndpoint = newServiceEndpoint(); - return new ScalecubeServiceDiscovery(serviceEndpoint) - .options(opts -> opts.metadataCodec(metadataCodec)) - .options(opts -> opts.gossip(cfg -> GOSSIP_CONFIG)) - .options(opts -> opts.membership(cfg -> MEMBERSHIP_CONFIG)) - .options(opts -> opts.membership(cfg -> cfg.seedMembers(seedAddress))); - }); + () -> + new ScalecubeServiceDiscovery(newServiceEndpoint()) + .options(opts -> opts.metadataCodec(metadataCodec)) + .gossip(cfg -> GOSSIP_CONFIG) + .membership(cfg -> MEMBERSHIP_CONFIG) + .membership(cfg -> cfg.seedMembers(seedAddress))); } private Address startSeed(MetadataCodec metadataCodec) { return new ScalecubeServiceDiscovery(newServiceEndpoint()) .options(opts -> opts.metadataCodec(metadataCodec)) - .options(opts -> opts.gossip(cfg -> GOSSIP_CONFIG)) - .options(opts -> opts.membership(cfg -> MEMBERSHIP_CONFIG)) + .gossip(cfg -> GOSSIP_CONFIG) + .membership(cfg -> MEMBERSHIP_CONFIG) .start() .block() .address(); diff --git a/services-examples-parent/services-examples-runner/src/main/java/io/scalecube/services/examples/ExamplesRunner.java b/services-examples-parent/services-examples-runner/src/main/java/io/scalecube/services/examples/ExamplesRunner.java index f4b9697ca..631e505fe 100644 --- a/services-examples-parent/services-examples-runner/src/main/java/io/scalecube/services/examples/ExamplesRunner.java +++ b/services-examples-parent/services-examples-runner/src/main/java/io/scalecube/services/examples/ExamplesRunner.java @@ -32,9 +32,8 @@ public class ExamplesRunner { * Main method of gateway runner. * * @param args program arguments - * @throws Exception exception thrown */ - public static void main(String[] args) throws Exception { + public static void main(String[] args) { ConfigRegistry configRegistry = ConfigBootstrap.configRegistry(); Config config = @@ -53,7 +52,7 @@ public static void main(String[] args) throws Exception { LOGGER.info("Number of worker threads: " + numOfThreads); Microservices.builder() - .discovery(serviceEndpoint -> serviceDiscovery(serviceEndpoint, config)) + .discovery(endpoint -> serviceDiscovery(endpoint, config)) .transport( () -> new RSocketServiceTransport() @@ -77,14 +76,10 @@ public static void main(String[] args) throws Exception { .block(); } - private static ServiceDiscovery serviceDiscovery(ServiceEndpoint serviceEndpoint, Config config) { - return new ScalecubeServiceDiscovery(serviceEndpoint) - .options( - opts -> - opts.membership(cfg1 -> cfg1.seedMembers(config.seedAddresses())) - .transport(cfg1 -> cfg1.port(config.discoveryPort())) - .memberHost(config.memberHost()) - .memberPort(config.memberPort())); + private static ServiceDiscovery serviceDiscovery(ServiceEndpoint endpoint, Config config) { + return new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(config.seedAddresses())) + .transport(cfg -> cfg.port(config.discoveryPort())); } public static class Config { diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java index 15aaab988..bafd07b87 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java @@ -1,10 +1,9 @@ package io.scalecube.services.examples.exceptions; +import io.scalecube.net.Address; import io.scalecube.services.Microservices; -import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.ServiceInfo; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; -import io.scalecube.services.discovery.api.ServiceDiscovery; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; import java.util.Collections; @@ -30,9 +29,14 @@ public static void main(String[] args) throws InterruptedException { System.err.println("ms1 started: " + ms1.serviceAddress()); + final Address address1 = ms1.discovery().address(); + Microservices ms2 = Microservices.builder() - .discovery(serviceEndpoint -> serviceDiscovery(serviceEndpoint, ms1)) + .discovery( + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(address1))) .transport(RSocketServiceTransport::new) .services( call -> { @@ -63,10 +67,4 @@ public static void main(String[] args) throws InterruptedException { Thread.currentThread().join(); } - - private static ServiceDiscovery serviceDiscovery( - ServiceEndpoint serviceEndpoint, Microservices ms1) { - return new ScalecubeServiceDiscovery(serviceEndpoint) - .options(opts -> opts.membership(cfg -> cfg.seedMembers(ms1.discovery().address()))); - } } diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java index bb24349d9..1ca40f956 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java @@ -1,5 +1,6 @@ package io.scalecube.services.examples.helloworld; +import io.scalecube.net.Address; import io.scalecube.services.Microservices; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl; @@ -29,16 +30,15 @@ public static void main(String[] args) { .transport(RSocketServiceTransport::new) .startAwait(); + final Address seedAddress = seed.discovery().address(); + // Construct a ScaleCube node which joins the cluster hosting the Greeting Service Microservices ms = Microservices.builder() .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery(serviceEndpoint) - .options( - opts -> - opts.membership( - cfg -> cfg.seedMembers(seed.discovery().address())))) + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(seedAddress))) .transport(RSocketServiceTransport::new) .services(new GreetingServiceImpl()) .startAwait(); diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java index 388ce3d1d..fc20274db 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java @@ -1,11 +1,10 @@ package io.scalecube.services.examples.helloworld; +import io.scalecube.net.Address; import io.scalecube.services.Microservices; import io.scalecube.services.ServiceCall; -import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; -import io.scalecube.services.discovery.api.ServiceDiscovery; import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl; import io.scalecube.services.examples.helloworld.service.api.Greeting; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; @@ -40,9 +39,14 @@ public static void main(String[] args) { .startAwait(); // Construct a ScaleCube node which joins the cluster hosting the Greeting Service + final Address seedAddress = seed.discovery().address(); + Microservices ms = Microservices.builder() - .discovery(serviceEndpoint -> serviceDiscovery(serviceEndpoint, seed)) + .discovery( + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(seedAddress))) .transport(RSocketServiceTransport::new) .services(new GreetingServiceImpl()) .startAwait(); @@ -68,10 +72,4 @@ public static void main(String[] args) { seed.onShutdown().block(); ms.onShutdown().block(); } - - private static ServiceDiscovery serviceDiscovery( - ServiceEndpoint serviceEndpoint, Microservices seed) { - return new ScalecubeServiceDiscovery(serviceEndpoint) - .options(opts -> opts.membership(cfg -> cfg.seedMembers(seed.discovery().address()))); - } } diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java index c8ca62985..8891ffa1e 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java @@ -1,5 +1,6 @@ package io.scalecube.services.examples.helloworld; +import io.scalecube.net.Address; import io.scalecube.services.Microservices; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.examples.helloworld.service.BidiGreetingImpl; @@ -30,16 +31,15 @@ public static void main(String[] args) { .transport(RSocketServiceTransport::new) .startAwait(); + final Address seedAddress = seed.discovery().address(); + // Construct a ScaleCube node which joins the cluster hosting the Greeting Service Microservices ms = Microservices.builder() .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery(serviceEndpoint) - .options( - opts -> - opts.membership( - cfg -> cfg.seedMembers(seed.discovery().address())))) + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(seedAddress))) .transport(RSocketServiceTransport::new) .services(new BidiGreetingImpl()) .startAwait(); diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/orderbook/Example1.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/orderbook/Example1.java index 4c0cffb13..b2bd1b737 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/orderbook/Example1.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/orderbook/Example1.java @@ -2,9 +2,7 @@ import io.scalecube.net.Address; import io.scalecube.services.Microservices; -import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; -import io.scalecube.services.discovery.api.ServiceDiscovery; import io.scalecube.services.examples.orderbook.service.DefaultMarketDataService; import io.scalecube.services.examples.orderbook.service.OrderBookSnapshoot; import io.scalecube.services.examples.orderbook.service.OrderRequest; @@ -40,10 +38,14 @@ public static void main(String[] args) throws InterruptedException { .transport(RSocketServiceTransport::new) .startAwait(); + final Address gatewayAddress = gateway.discovery().address(); + Microservices ms = Microservices.builder() .discovery( - serviceEndpoint -> serviceDiscovery(serviceEndpoint, gateway.discovery().address())) + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .services(new DefaultMarketDataService()) .startAwait(); @@ -64,7 +66,7 @@ public static void main(String[] args) throws InterruptedException { new Order( new PriceLevel(Side.BUY, RANDOM.nextInt(10) + 1), // prices System.currentTimeMillis(), - Long.valueOf(RANDOM.nextInt(110) + 1 + "")), // units + Long.parseLong(RANDOM.nextInt(110) + 1 + "")), // units INSTRUMENT)) .block(); } else { @@ -74,7 +76,7 @@ public static void main(String[] args) throws InterruptedException { new Order( new PriceLevel(Side.SELL, RANDOM.nextInt(10) + 1), // prices System.currentTimeMillis(), - Long.valueOf(RANDOM.nextInt(70) + 1 + "")), // units + Long.parseLong(RANDOM.nextInt(70) + 1 + "")), // units INSTRUMENT)) .block(); } @@ -89,12 +91,6 @@ public static void main(String[] args) throws InterruptedException { Thread.currentThread().join(); } - private static ServiceDiscovery serviceDiscovery( - ServiceEndpoint serviceEndpoint, Address address) { - return new ScalecubeServiceDiscovery(serviceEndpoint) - .options(opts -> opts.membership(cfg -> cfg.seedMembers(address))); - } - private static void print(OrderBookSnapshoot snapshot) { System.out.println("====== Asks ========"); diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java index 657eeb41b..b6b039151 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java @@ -1,5 +1,6 @@ package io.scalecube.services.examples.services; +import io.scalecube.net.Address; import io.scalecube.services.Microservices; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; @@ -20,15 +21,14 @@ public static void main(String[] args) { .transport(RSocketServiceTransport::new) .startAwait(); + final Address gatewayAddress = gateway.discovery().address(); + Microservices service2Node = Microservices.builder() .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery(serviceEndpoint) - .options( - opts -> - opts.membership( - cfg -> cfg.seedMembers(gateway.discovery().address())))) + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .services(new Service2Impl()) .startAwait(); @@ -36,12 +36,9 @@ public static void main(String[] args) { Microservices service1Node = Microservices.builder() .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery(serviceEndpoint) - .options( - opts -> - opts.membership( - cfg -> cfg.seedMembers(gateway.discovery().address())))) + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .services(new Service1Impl()) .startAwait(); diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java index 7716a1b29..a5c528d20 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java @@ -1,5 +1,6 @@ package io.scalecube.services.examples.services; +import io.scalecube.net.Address; import io.scalecube.services.Microservices; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; @@ -20,15 +21,14 @@ public static void main(String[] args) { .transport(RSocketServiceTransport::new) .startAwait(); + final Address gatewayAddress = gateway.discovery().address(); + Microservices service2Node = Microservices.builder() .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery(serviceEndpoint) - .options( - opts -> - opts.membership( - cfg -> cfg.seedMembers(gateway.discovery().address())))) + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .services(new Service2Impl()) .startAwait(); @@ -36,12 +36,9 @@ public static void main(String[] args) { Microservices service1Node = Microservices.builder() .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery(serviceEndpoint) - .options( - opts -> - opts.membership( - cfg -> cfg.seedMembers(gateway.discovery().address())))) + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .services(new Service1Impl()) .startAwait(); diff --git a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java index 8118e8ea8..bd6f56ef0 100644 --- a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java +++ b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java @@ -31,24 +31,22 @@ public static void initNodes() { provider = Microservices.builder() .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery(serviceEndpoint) - .options(opts -> opts.transport(cfg -> cfg.port(port.incrementAndGet())))) + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .transport(cfg -> cfg.port(port.incrementAndGet()))) .transport(RSocketServiceTransport::new) .services(new GreetingServiceImpl()) .startAwait(); - Address seedAddress = provider.discovery().address(); + final Address seedAddress = provider.discovery().address(); consumer = Microservices.builder() .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery(serviceEndpoint) - .options( - opts -> - opts.membership(cfg -> cfg.seedMembers(seedAddress)) - .transport(cfg -> cfg.port(port.incrementAndGet())))) + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(seedAddress)) + .transport(cfg -> cfg.port(port.incrementAndGet()))) .transport(RSocketServiceTransport::new) .startAwait(); } diff --git a/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java index 0a89cec00..78fb41d7f 100644 --- a/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java @@ -198,9 +198,8 @@ void successfulCallOfPublicMethodWithoutAuthentication() { service.shutdown().block(TIMEOUT); } - private static ServiceDiscovery serviceDiscovery(ServiceEndpoint serviceEndpoint) { - return new ScalecubeServiceDiscovery(serviceEndpoint) - .options( - config -> config.membership(opts -> opts.seedMembers(caller.discovery().address()))); + private static ServiceDiscovery serviceDiscovery(ServiceEndpoint endpoint) { + return new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(caller.discovery().address())); } } diff --git a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java index a2bb1b824..4143c90db 100644 --- a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java @@ -19,7 +19,6 @@ import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; -import io.scalecube.services.discovery.api.ServiceDiscovery; import io.scalecube.services.exceptions.ServiceException; import io.scalecube.services.sut.EmptyGreetingResponse; import io.scalecube.services.sut.GreetingResponse; @@ -67,7 +66,10 @@ public static void tearDown() { private static Microservices serviceProvider(Object service) { return Microservices.builder() - .discovery(ServiceCallRemoteTest::serviceDiscovery) + .discovery( + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(gateway.discovery().address()))) .transport(RSocketServiceTransport::new) .services(service) .startAwait(); @@ -259,9 +261,4 @@ private static Microservices gateway() { .transport(RSocketServiceTransport::new) .startAwait(); } - - private static ServiceDiscovery serviceDiscovery(ServiceEndpoint serviceEndpoint) { - return new ScalecubeServiceDiscovery(serviceEndpoint) - .options(opts -> opts.membership(cfg -> cfg.seedMembers(gateway.discovery().address()))); - } } diff --git a/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java b/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java index dccd233b2..fad756c56 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java @@ -207,14 +207,15 @@ public void test_delayed_listen_to_discovery_events(MetadataCodec metadataCodec) private Function defServiceDiscovery( MetadataCodec metadataCodec) { - return se -> new ScalecubeServiceDiscovery(se).options(cfg -> cfg.metadataCodec(metadataCodec)); + return endpoint -> + new ScalecubeServiceDiscovery(endpoint).options(cfg -> cfg.metadataCodec(metadataCodec)); } private static Function defServiceDiscovery( Address address, MetadataCodec metadataCodec) { - return se -> - new ScalecubeServiceDiscovery(se) + return endpoint -> + new ScalecubeServiceDiscovery(endpoint) .options(cfg -> cfg.metadataCodec(metadataCodec)) - .options(opts -> opts.membership(cfg -> cfg.seedMembers(address))); + .membership(cfg -> cfg.seedMembers(address)); } } diff --git a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java index 8944de51f..03ddfb7ca 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java @@ -473,8 +473,8 @@ private GreetingService createProxy() { return gateway.call().api(GreetingService.class); // create proxy for GreetingService API } - private static ServiceDiscovery serviceDiscovery(ServiceEndpoint serviceEndpoint) { - return new ScalecubeServiceDiscovery(serviceEndpoint) - .options(opts -> opts.membership(cfg -> cfg.seedMembers(gateway.discovery().address()))); + private static ServiceDiscovery serviceDiscovery(ServiceEndpoint endpoint) { + return new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(gateway.discovery().address())); } } diff --git a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java index 9fdc8d17e..db4c56a8b 100644 --- a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java +++ b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java @@ -7,7 +7,6 @@ import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; -import io.scalecube.services.discovery.api.ServiceDiscovery; import io.scalecube.services.sut.QuoteService; import io.scalecube.services.sut.SimpleQuoteService; import io.scalecube.services.transport.api.ServiceMessageCodec; @@ -36,7 +35,10 @@ public static void setup() { node = Microservices.builder() - .discovery(StreamingServiceTest::serviceDiscovery) + .discovery( + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(gateway.discovery().address()))) .transport(RSocketServiceTransport::new) .defaultDataDecoder(ServiceMessageCodec::decodeData) .services(new SimpleQuoteService()) @@ -184,9 +186,4 @@ public void test_snapshot_completes() { assertEquals(batchSize, serviceMessages.size()); } - - private static ServiceDiscovery serviceDiscovery(ServiceEndpoint serviceEndpoint) { - return new ScalecubeServiceDiscovery(serviceEndpoint) - .options(opts -> opts.membership(cfg -> cfg.seedMembers(gateway.discovery().address()))); - } } diff --git a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java index 7b372b11c..8e588e62c 100644 --- a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java +++ b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java @@ -9,16 +9,15 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import io.scalecube.net.Address; import io.scalecube.services.BaseTest; import io.scalecube.services.Microservices; import io.scalecube.services.Reflect; import io.scalecube.services.ServiceCall; -import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.ServiceInfo; import io.scalecube.services.ServiceReference; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; -import io.scalecube.services.discovery.api.ServiceDiscovery; import io.scalecube.services.routing.RandomServiceRouter; import io.scalecube.services.routing.Routers; import io.scalecube.services.routings.sut.CanaryService; @@ -49,6 +48,7 @@ public class RoutersTest extends BaseTest { private Duration timeout = Duration.ofSeconds(TIMEOUT); private static Microservices gateway; + private static Address gatewayAddress; private static Microservices provider1; private static Microservices provider2; private static Microservices provider3; @@ -60,10 +60,16 @@ public static void setup() { .discovery(ScalecubeServiceDiscovery::new) .transport(RSocketServiceTransport::new) .startAwait(); + + gatewayAddress = gateway.discovery().address(); + // Create microservices instance cluster. provider1 = Microservices.builder() - .discovery(RoutersTest::serviceDiscovery) + .discovery( + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImpl(1)) @@ -78,7 +84,10 @@ public static void setup() { // Create microservices instance cluster. provider2 = Microservices.builder() - .discovery(RoutersTest::serviceDiscovery) + .discovery( + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImpl(2)) @@ -93,7 +102,10 @@ public static void setup() { TagService tagService = input -> input.map(String::toUpperCase); provider3 = Microservices.builder() - .discovery(RoutersTest::serviceDiscovery) + .discovery( + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .services( ServiceInfo.fromServiceInstance(tagService) @@ -267,9 +279,4 @@ public void test_service_tags() throws Exception { "Service B's Weight=0.9; at least more than half " + "of invocations have to be routed to Service B"); } - - private static ServiceDiscovery serviceDiscovery(ServiceEndpoint serviceEndpoint) { - return new ScalecubeServiceDiscovery(serviceEndpoint) - .options(opts -> opts.membership(cfg -> cfg.seedMembers(gateway.discovery().address()))); - } } diff --git a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java index 06fc5ed26..1151880c8 100644 --- a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java +++ b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java @@ -2,10 +2,8 @@ import io.scalecube.net.Address; import io.scalecube.services.Microservices; -import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.ServiceInfo; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; -import io.scalecube.services.discovery.api.ServiceDiscovery; import io.scalecube.services.routings.sut.CanaryService; import io.scalecube.services.routings.sut.GreetingServiceImplA; import io.scalecube.services.routings.sut.GreetingServiceImplB; @@ -32,7 +30,10 @@ public static void main(String[] args) { Microservices services1 = Microservices.builder() - .discovery(serviceEndpoint -> serviceDiscovery(serviceEndpoint, seedAddress)) + .discovery( + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(seedAddress))) .transport(RSocketServiceTransport::new) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImplA()) @@ -42,7 +43,10 @@ public static void main(String[] args) { Microservices services2 = Microservices.builder() - .discovery(serviceEndpoint -> serviceDiscovery(serviceEndpoint, seedAddress)) + .discovery( + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(seedAddress))) .transport(RSocketServiceTransport::new) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImplB()) @@ -62,10 +66,4 @@ public static void main(String[] args) { }); } } - - private static ServiceDiscovery serviceDiscovery( - ServiceEndpoint serviceEndpoint, Address address) { - return new ScalecubeServiceDiscovery(serviceEndpoint) - .options(opts -> opts.membership(cfg -> cfg.seedMembers(address))); - } } diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java index 4c885bc9c..d333a1b93 100644 --- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java +++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java @@ -9,10 +9,8 @@ import io.scalecube.services.annotations.Service; import io.scalecube.services.annotations.ServiceMethod; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; -import io.scalecube.services.transport.api.ServiceTransport; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -22,59 +20,44 @@ public class RSocketNettyColocatedEventLoopGroupTest extends BaseTest { private Microservices ping; - private Microservices pong; - - private Microservices facade; - private Microservices gateway; @BeforeEach public void setUp() { - - Supplier transport = RSocketServiceTransport::new; this.gateway = Microservices.builder() .discovery(ScalecubeServiceDiscovery::new) - .transport(transport) + .transport(RSocketServiceTransport::new) .startAwait(); - this.facade = + Microservices facade = Microservices.builder() .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery(serviceEndpoint) - .options( - opt -> - opt.membership( - cfg -> cfg.seedMembers(gateway.discovery().address())))) - .transport(transport) + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(gateway.discovery().address()))) + .transport(RSocketServiceTransport::new) .services(new Facade()) .startAwait(); this.ping = Microservices.builder() .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery(serviceEndpoint) - .options( - opt -> - opt.membership( - cfg -> cfg.seedMembers(facade.discovery().address())))) - .transport(transport) + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(facade.discovery().address()))) + .transport(RSocketServiceTransport::new) .services((PingService) () -> Mono.just(Thread.currentThread().getName())) .startAwait(); this.pong = Microservices.builder() .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery(serviceEndpoint) - .options( - opt -> - opt.membership( - cfg -> cfg.seedMembers(facade.discovery().address())))) - .transport(transport) + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(facade.discovery().address()))) + .transport(RSocketServiceTransport::new) .services((PongService) () -> Mono.just(Thread.currentThread().getName())) .startAwait(); } diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java index 44bc9c9b5..ac042a89d 100644 --- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java +++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java @@ -48,10 +48,7 @@ public void setUp() { .discovery( serviceEndpoint -> new ScalecubeServiceDiscovery(serviceEndpoint) - .options( - opts -> - opts.membership( - cfg -> cfg.seedMembers(gateway.discovery().address())))) + .membership(cfg -> cfg.seedMembers(gateway.discovery().address()))) .transport(RSocketServiceTransport::new) .services(new SimpleQuoteService()) .startAwait();