diff --git a/services-examples/src/main/java/io/scalecube/services/examples/auth/CompositeProfileAuthExample.java b/services-examples/src/main/java/io/scalecube/services/examples/auth/CompositeProfileAuthExample.java index b67d63fee..95d3f39d0 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/auth/CompositeProfileAuthExample.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/auth/CompositeProfileAuthExample.java @@ -3,6 +3,7 @@ import static io.scalecube.services.auth.Authenticator.deferSecured; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.ServiceInfo; import io.scalecube.services.api.ServiceMessage; @@ -25,27 +26,29 @@ public class CompositeProfileAuthExample { */ public static void main(String[] args) { Microservices service = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(() -> new RSocketServiceTransport().authenticator(authenticator())) - .services( - call -> - Collections.singletonList( - ServiceInfo.fromServiceInstance(new SecuredServiceByCompositeProfileImpl()) - .authenticator(compositeAuthenticator()) - .build())) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(() -> new RSocketServiceTransport().authenticator(authenticator())) + .services( + call -> + Collections.singletonList( + ServiceInfo.fromServiceInstance( + new SecuredServiceByCompositeProfileImpl()) + .authenticator(compositeAuthenticator()) + .build()))); Microservices caller = - Microservices.builder() - .discovery(endpoint -> discovery(service, endpoint)) - .transport( - () -> new RSocketServiceTransport().credentialsSupplier(credentialsSupplier())) - .startAwait(); + Microservices.start( + new Context() + .discovery(endpoint -> discovery(service, endpoint)) + .transport( + () -> + new RSocketServiceTransport().credentialsSupplier(credentialsSupplier()))); ServiceMessage response = caller diff --git a/services-examples/src/main/java/io/scalecube/services/examples/auth/PrincipalMapperAuthExample.java b/services-examples/src/main/java/io/scalecube/services/examples/auth/PrincipalMapperAuthExample.java index abbf515c9..923ed3134 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/auth/PrincipalMapperAuthExample.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/auth/PrincipalMapperAuthExample.java @@ -1,6 +1,7 @@ package io.scalecube.services.examples.auth; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.ServiceInfo; import io.scalecube.services.auth.Authenticator; @@ -24,29 +25,30 @@ public class PrincipalMapperAuthExample { */ public static void main(String[] args) { Microservices service = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(() -> new RSocketServiceTransport().authenticator(authenticator())) - .services( - ServiceInfo.fromServiceInstance(new SecuredServiceByApiKeyImpl()) - .principalMapper(PrincipalMapperAuthExample::apiKeyPrincipalMapper) - .build()) - .services( - ServiceInfo.fromServiceInstance(new SecuredServiceByUserProfileImpl()) - .principalMapper(PrincipalMapperAuthExample::userProfilePrincipalMapper) - .build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(() -> new RSocketServiceTransport().authenticator(authenticator())) + .services( + ServiceInfo.fromServiceInstance(new SecuredServiceByApiKeyImpl()) + .principalMapper(PrincipalMapperAuthExample::apiKeyPrincipalMapper) + .build()) + .services( + ServiceInfo.fromServiceInstance(new SecuredServiceByUserProfileImpl()) + .principalMapper(PrincipalMapperAuthExample::userProfilePrincipalMapper) + .build())); Microservices userProfileCaller = - Microservices.builder() - .discovery(endpoint -> discovery(service, endpoint)) - .transport( - () -> new RSocketServiceTransport().credentialsSupplier(credentialsSupplier())) - .startAwait(); + Microservices.start( + new Context() + .discovery(endpoint -> discovery(service, endpoint)) + .transport( + () -> + new RSocketServiceTransport().credentialsSupplier(credentialsSupplier()))); String responseByUserProfile = userProfileCaller @@ -58,11 +60,12 @@ public static void main(String[] args) { System.err.println("### Received 'userProfileCaller' response: " + responseByUserProfile); Microservices apiKeyCaller = - Microservices.builder() - .discovery(endpoint -> discovery(service, endpoint)) - .transport( - () -> new RSocketServiceTransport().credentialsSupplier(credentialsSupplier())) - .startAwait(); + Microservices.start( + new Context() + .discovery(endpoint -> discovery(service, endpoint)) + .transport( + () -> + new RSocketServiceTransport().credentialsSupplier(credentialsSupplier()))); String responseByApiKey = apiKeyCaller diff --git a/services-examples/src/main/java/io/scalecube/services/examples/auth/ServiceTransportAuthExample.java b/services-examples/src/main/java/io/scalecube/services/examples/auth/ServiceTransportAuthExample.java index 52ea9b101..ec057f03c 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/auth/ServiceTransportAuthExample.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/auth/ServiceTransportAuthExample.java @@ -1,6 +1,7 @@ package io.scalecube.services.examples.auth; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.auth.Authenticator; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; @@ -22,22 +23,23 @@ public class ServiceTransportAuthExample { */ public static void main(String[] args) { Microservices service = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(() -> new RSocketServiceTransport().authenticator(authenticator())) - .services(new SecuredServiceByUserProfileImpl()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(() -> new RSocketServiceTransport().authenticator(authenticator())) + .services(new SecuredServiceByUserProfileImpl())); Microservices caller = - Microservices.builder() - .discovery(endpoint -> discovery(service, endpoint)) - .transport( - () -> new RSocketServiceTransport().credentialsSupplier(credentialsSupplier())) - .startAwait(); + Microservices.start( + new Context() + .discovery(endpoint -> discovery(service, endpoint)) + .transport( + () -> + new RSocketServiceTransport().credentialsSupplier(credentialsSupplier()))); String response = caller diff --git a/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example1.java b/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example1.java index 0c03f973a..48867d4db 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example1.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example1.java @@ -2,6 +2,7 @@ import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl; import io.scalecube.services.examples.helloworld.service.api.GreetingsService; @@ -22,30 +23,31 @@ public class Example1 { public static void main(String[] args) { // ScaleCube Node with no members Microservices seed = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .defaultContentType(PROTOSTUFF) // set explicit default data format - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new) + // .defaultContentType(PROTOSTUFF) // set explicit default data format + ); final Address seedAddress = seed.discoveryAddress(); // Construct a ScaleCube node which joins the cluster hosting the Greeting Service Microservices ms = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) - .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) + .transport(RSocketServiceTransport::new) + .services(new GreetingServiceImpl())); seed.call() .api(GreetingsService.class) @@ -64,7 +66,7 @@ public static void main(String[] args) { .sayHello("bob (on java native Serializable/Externalizable dataFormat)") .subscribe(consumer -> System.out.println(consumer.message())); - seed.onShutdown().block(); - ms.onShutdown().block(); + seed.close(); + ms.close(); } } diff --git a/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java b/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java index 37f03aeb1..f8912c453 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java @@ -2,6 +2,7 @@ import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.ServiceInfo; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; @@ -18,45 +19,48 @@ public class ExceptionMapperExample { */ public static void main(String[] args) throws InterruptedException { Microservices ms1 = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .defaultErrorMapper(new ServiceAProviderErrorMapper()) // default mapper for whole node - .services( - ServiceInfo.fromServiceInstance(new ServiceAImpl()) - .errorMapper(new ServiceAProviderErrorMapper()) // mapper per service instance - .build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new) + .defaultErrorMapper( + new ServiceAProviderErrorMapper()) // default mapper for whole node + .services( + ServiceInfo.fromServiceInstance(new ServiceAImpl()) + .errorMapper( + new ServiceAProviderErrorMapper()) // mapper per service instance + .build())); System.err.println("ms1 started: " + ms1.serviceAddress()); final Address address1 = ms1.discoveryAddress(); Microservices ms2 = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(address1.toString()))) - .transport(RSocketServiceTransport::new) - .services( - call -> { - ServiceA serviceA = - call.errorMapper( - new ServiceAClientErrorMapper()) // service client error mapper - .api(ServiceA.class); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(address1.toString()))) + .transport(RSocketServiceTransport::new) + .services( + call -> { + ServiceA serviceA = + call.errorMapper( + new ServiceAClientErrorMapper()) // service client error mapper + .api(ServiceA.class); - ServiceB serviceB = new ServiceBImpl(serviceA); + ServiceB serviceB = new ServiceBImpl(serviceA); - return Collections.singleton(ServiceInfo.fromServiceInstance(serviceB).build()); - }) - .startAwait(); + return Collections.singleton( + ServiceInfo.fromServiceInstance(serviceB).build()); + })); System.err.println("ms2 started: " + ms2.serviceAddress()); diff --git a/services-examples/src/main/java/io/scalecube/services/examples/gateway/HttpGatewayExample.java b/services-examples/src/main/java/io/scalecube/services/examples/gateway/HttpGatewayExample.java deleted file mode 100644 index 3002dd941..000000000 --- a/services-examples/src/main/java/io/scalecube/services/examples/gateway/HttpGatewayExample.java +++ /dev/null @@ -1,51 +0,0 @@ -package io.scalecube.services.examples.gateway; - -import io.scalecube.services.Address; -import io.scalecube.services.gateway.Gateway; -import io.scalecube.services.gateway.GatewayOptions; -import java.net.InetSocketAddress; -import java.time.Duration; -import java.util.concurrent.ThreadLocalRandom; -import reactor.core.publisher.Mono; - -public class HttpGatewayExample implements Gateway { - - private final GatewayOptions options; - private final InetSocketAddress address; - - public HttpGatewayExample(GatewayOptions options) { - this.options = options; - this.address = new InetSocketAddress(options.port()); - } - - @Override - public String id() { - return options.id(); - } - - @Override - public Address address() { - return Address.create(address.getHostString(), address.getPort()); - } - - @Override - public Mono start() { - return Mono.defer( - () -> { - System.out.println("Starting HTTP gateway..."); - - return Mono.delay(Duration.ofMillis(ThreadLocalRandom.current().nextInt(100, 500))) - .map(tick -> this) - .doOnSuccess(gw -> System.out.println("HTTP gateway is started on " + gw.address)); - }); - } - - @Override - public Mono stop() { - return Mono.defer( - () -> { - System.out.println("Stopping HTTP gateway..."); - return Mono.empty(); - }); - } -} diff --git a/services-examples/src/main/java/io/scalecube/services/examples/gateway/WebsocketGatewayExample.java b/services-examples/src/main/java/io/scalecube/services/examples/gateway/WebsocketGatewayExample.java deleted file mode 100644 index e067af37f..000000000 --- a/services-examples/src/main/java/io/scalecube/services/examples/gateway/WebsocketGatewayExample.java +++ /dev/null @@ -1,51 +0,0 @@ -package io.scalecube.services.examples.gateway; - -import io.scalecube.services.Address; -import io.scalecube.services.gateway.Gateway; -import io.scalecube.services.gateway.GatewayOptions; -import java.net.InetSocketAddress; -import java.time.Duration; -import java.util.concurrent.ThreadLocalRandom; -import reactor.core.publisher.Mono; - -public class WebsocketGatewayExample implements Gateway { - - private final GatewayOptions options; - private final InetSocketAddress address; - - public WebsocketGatewayExample(GatewayOptions options) { - this.options = options; - this.address = new InetSocketAddress(options.port()); - } - - @Override - public String id() { - return options.id(); - } - - @Override - public Address address() { - return Address.create(address.getHostString(), address.getPort()); - } - - @Override - public Mono start() { - return Mono.defer( - () -> { - System.out.println("Starting WS gateway..."); - - return Mono.delay(Duration.ofMillis(ThreadLocalRandom.current().nextInt(100, 500))) - .map(tick -> this) - .doOnSuccess(gw -> System.out.println("WS gateway is started on " + gw.address)); - }); - } - - @Override - public Mono stop() { - return Mono.defer( - () -> { - System.out.println("Stopping WS gateway..."); - return Mono.empty(); - }); - } -} diff --git a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java index e20249ff9..3064704a9 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java @@ -2,12 +2,12 @@ import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl; import io.scalecube.services.examples.helloworld.service.api.GreetingsService; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; -import reactor.core.publisher.Mono; /** * The Hello World project is a time-honored tradition in computer programming. It is a simple @@ -27,29 +27,29 @@ public class Example1 { public static void main(String[] args) { // ScaleCube Node with no members Microservices seed = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new)); final Address seedAddress = seed.discoveryAddress(); // Construct a ScaleCube node which joins the cluster hosting the Greeting Service Microservices ms = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) - .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) + .transport(RSocketServiceTransport::new) + .services(new GreetingServiceImpl())); // Create service proxy GreetingsService service = seed.call().api(GreetingsService.class); @@ -57,6 +57,7 @@ public static void main(String[] args) { // Execute the services and subscribe to service events service.sayHello("joe").subscribe(consumer -> System.out.println(consumer.message())); - Mono.whenDelayError(seed.shutdown(), ms.shutdown()).block(); + seed.close(); + ms.close(); } } diff --git a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java index 7bd9aed60..a77c8761f 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java @@ -2,6 +2,7 @@ import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.ServiceCall; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; @@ -34,29 +35,29 @@ public class Example2 { public static void main(String[] args) { // ScaleCube Node node with no members Microservices seed = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new)); // Construct a ScaleCube node which joins the cluster hosting the Greeting Service final Address seedAddress = seed.discoveryAddress(); Microservices ms = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) - .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) + .transport(RSocketServiceTransport::new) + .services(new GreetingServiceImpl())); // Create a proxy to the seed service node ServiceCall service = seed.call(); @@ -76,6 +77,7 @@ public static void main(String[] args) { System.out.println(greeting.message()); }); - Mono.whenDelayError(seed.shutdown(), ms.shutdown()).block(); + seed.close(); + ms.close(); } } diff --git a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java index 7fc0b7a8a..89dc74949 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java @@ -2,13 +2,13 @@ import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.examples.helloworld.service.BidiGreetingImpl; import io.scalecube.services.examples.helloworld.service.api.BidiGreetingService; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; /** * The Hello World project is a time-honored tradition in computer programming. It is a simple @@ -28,29 +28,29 @@ public class Example3 { public static void main(String[] args) { // ScaleCube Node with no members Microservices seed = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new)); final Address seedAddress = seed.discoveryAddress(); // Construct a ScaleCube node which joins the cluster hosting the Greeting Service Microservices ms = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) - .transport(RSocketServiceTransport::new) - .services(new BidiGreetingImpl()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) + .transport(RSocketServiceTransport::new) + .services(new BidiGreetingImpl())); // Create service proxy BidiGreetingService service = seed.call().api(BidiGreetingService.class); @@ -61,6 +61,7 @@ public static void main(String[] args) { .doOnNext(System.out::println) .blockLast(); - Mono.whenDelayError(seed.shutdown(), ms.shutdown()).block(); + seed.close(); + ms.close(); } } diff --git a/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java b/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java index 1d61f73e5..d2aa8dba9 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java @@ -2,10 +2,10 @@ import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; -import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public class Example1 { @@ -17,40 +17,40 @@ public class Example1 { */ public static void main(String[] args) { Microservices gateway = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new)); final Address gatewayAddress = gateway.discoveryAddress(); Microservices service2Node = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) - .transport(RSocketServiceTransport::new) - .services(new Service2Impl()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) + .transport(RSocketServiceTransport::new) + .services(new Service2Impl())); Microservices service1Node = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) - .transport(RSocketServiceTransport::new) - .services(new Service1Impl()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) + .transport(RSocketServiceTransport::new) + .services(new Service1Impl())); gateway .call() @@ -63,7 +63,8 @@ public static void main(String[] args) { .log("complete |") .block(); - Mono.whenDelayError(gateway.shutdown(), service1Node.shutdown(), service2Node.shutdown()) - .block(); + gateway.close(); + service1Node.close(); + service2Node.close(); } } diff --git a/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java b/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java index 670372245..643e43fd2 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java @@ -2,10 +2,10 @@ import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; -import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public class Example2 { @@ -17,40 +17,40 @@ public class Example2 { */ public static void main(String[] args) { Microservices gateway = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new)); final Address gatewayAddress = gateway.discoveryAddress(); Microservices service2Node = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) - .transport(RSocketServiceTransport::new) - .services(new Service2Impl()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) + .transport(RSocketServiceTransport::new) + .services(new Service2Impl())); Microservices service1Node = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) - .transport(RSocketServiceTransport::new) - .services(new Service1Impl()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) + .transport(RSocketServiceTransport::new) + .services(new Service1Impl())); gateway .call() @@ -63,7 +63,8 @@ public static void main(String[] args) { .log("complete |") .block(); - Mono.whenDelayError(gateway.shutdown(), service1Node.shutdown(), service2Node.shutdown()) - .block(); + gateway.close(); + service1Node.close(); + service2Node.close(); } } diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java b/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java index 272232b5c..b992ee153 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java @@ -11,8 +11,6 @@ import java.net.InetSocketAddress; import java.util.StringJoiner; import java.util.function.Consumer; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.netty.DisposableServer; import reactor.netty.http.server.HttpServer; import reactor.netty.resources.LoopResources; @@ -40,20 +38,23 @@ public String id() { } @Override - public Mono start() { - return Mono.defer( - () -> { - HttpGatewayAcceptor gatewayAcceptor = - new HttpGatewayAcceptor(options.call(), errorMapper); - - loopResources = LoopResources.create(options.id() + ":" + options.port()); - - return prepareHttpServer(loopResources, options.port()) - .handle(gatewayAcceptor) - .bind() - .doOnSuccess(server -> this.server = server) - .thenReturn(this); - }); + public Gateway start() { + HttpGatewayAcceptor gatewayAcceptor = new HttpGatewayAcceptor(options.call(), errorMapper); + + loopResources = LoopResources.create(options.id() + ":" + options.port()); + + try { + prepareHttpServer(loopResources, options.port()) + .handle(gatewayAcceptor) + .bind() + .doOnSuccess(server -> this.server = server) + .toFuture() + .get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return this; } private HttpServer prepareHttpServer(LoopResources loopResources, int port) { @@ -80,24 +81,30 @@ public Address address() { } @Override - public Mono stop() { - return Flux.concatDelayError(shutdownServer(server), shutdownLoopResources(loopResources)) - .then(); + public void stop() { + shutdownServer(server); + shutdownLoopResources(loopResources); } - private Mono shutdownServer(DisposableServer server) { - return Mono.defer( - () -> { - if (server != null) { - server.dispose(); - return server.onDispose(); - } - return Mono.empty(); - }); + private void shutdownServer(DisposableServer server) { + if (server != null) { + server.dispose(); + try { + server.onDispose().toFuture().get(); + } catch (Exception e) { + // TODO: log it + } + } } - private Mono shutdownLoopResources(LoopResources loopResources) { - return loopResources != null ? loopResources.disposeLater() : Mono.empty(); + private void shutdownLoopResources(LoopResources loopResources) { + if (loopResources != null) { + try { + loopResources.disposeLater().toFuture().get(); + } catch (Exception e) { + // TODO: log it + } + } } @Override diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/WebsocketGateway.java b/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/WebsocketGateway.java index d3beb6ef8..8a226c5cb 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/WebsocketGateway.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/WebsocketGateway.java @@ -11,8 +11,6 @@ import java.time.Duration; import java.util.StringJoiner; import java.util.function.UnaryOperator; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.DisposableServer; import reactor.netty.http.server.HttpServer; @@ -45,21 +43,26 @@ public String id() { } @Override - public Mono start() { - return Mono.defer( - () -> { - WebsocketGatewayAcceptor gatewayAcceptor = - new WebsocketGatewayAcceptor(options.call(), gatewayHandler, errorMapper); - - loopResources = LoopResources.create(options.id() + ":" + options.port()); - - return prepareHttpServer(loopResources, options.port()) - .doOnConnection(this::setupKeepAlive) - .handle(gatewayAcceptor) - .bind() - .doOnSuccess(server -> this.server = server) - .thenReturn(this); - }); + public Gateway start() { + WebsocketGatewayAcceptor gatewayAcceptor = + new WebsocketGatewayAcceptor(options.call(), gatewayHandler, errorMapper); + + loopResources = LoopResources.create(options.id() + ":" + options.port()); + + try { + prepareHttpServer(loopResources, options.port()) + .doOnConnection(this::setupKeepAlive) + .handle(gatewayAcceptor) + .bind() + .doOnSuccess(server -> this.server = server) + .thenReturn(this) + .toFuture() + .get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return this; } private HttpServer prepareHttpServer(LoopResources loopResources, int port) { @@ -80,24 +83,30 @@ public Address address() { } @Override - public Mono stop() { - return Flux.concatDelayError(shutdownServer(server), shutdownLoopResources(loopResources)) - .then(); + public void stop() { + shutdownServer(server); + shutdownLoopResources(loopResources); } - private Mono shutdownServer(DisposableServer server) { - return Mono.defer( - () -> { - if (server != null) { - server.dispose(); - return server.onDispose(); - } - return Mono.empty(); - }); + private void shutdownServer(DisposableServer server) { + if (server != null) { + server.dispose(); + try { + server.onDispose().toFuture().get(); + } catch (Exception e) { + // TODO: log it + } + } } - private Mono shutdownLoopResources(LoopResources loopResources) { - return loopResources != null ? loopResources.disposeLater() : Mono.empty(); + private void shutdownLoopResources(LoopResources loopResources) { + if (loopResources != null) { + try { + loopResources.disposeLater().toFuture().get(); + } catch (Exception e) { + // TODO: log it + } + } } private void setupKeepAlive(Connection connection) { diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/http/CorsTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/http/CorsTest.java index b22da4be8..e8e6a654d 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/http/CorsTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/http/CorsTest.java @@ -9,6 +9,7 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.examples.GreetingService; import io.scalecube.services.examples.GreetingServiceImpl; import io.scalecube.services.gateway.BaseTest; @@ -37,7 +38,7 @@ void setUp() { @AfterEach void afterEach() { if (gateway != null) { - gateway.shutdown().block(); + gateway.close(); } if (connectionProvider != null) { connectionProvider.dispose(); @@ -47,19 +48,19 @@ void afterEach() { @Test void testCrossOriginRequest() { gateway = - Microservices.builder() - .gateway( - opts -> - new HttpGateway.Builder() - .options(opts.id("http")) - .corsEnabled(true) - .corsConfigBuilder( - builder -> - builder.allowedRequestHeaders("Content-Type", "X-Correlation-ID")) - .build()) - .services(new GreetingServiceImpl()) - .start() - .block(TIMEOUT); + Microservices.start( + new Context() + .gateway( + opts -> + new HttpGateway.Builder() + .options(opts.id("http")) + .corsEnabled(true) + .corsConfigBuilder( + builder -> + builder.allowedRequestHeaders( + "Content-Type", "X-Correlation-ID")) + .build()) + .services(new GreetingServiceImpl())); final HttpClient client = newClient(gateway.gateway("http").address()); @@ -111,11 +112,10 @@ private HttpClient newClient(final Address address) { @Test void testOptionRequestCorsDisabled() { gateway = - Microservices.builder() - .gateway(opts -> new HttpGateway.Builder().options(opts.id("http")).build()) - .services(new GreetingServiceImpl()) - .start() - .block(TIMEOUT); + Microservices.start( + new Context() + .gateway(opts -> new HttpGateway.Builder().options(opts.id("http")).build()) + .services(new GreetingServiceImpl())); final HttpClient client = newClient(gateway.gateway("http").address()); diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java index 77a5ae0fd..faabdfd44 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java @@ -4,6 +4,7 @@ import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.ServiceCall; import io.scalecube.services.annotations.Service; import io.scalecube.services.annotations.ServiceMethod; @@ -19,7 +20,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -36,39 +36,36 @@ class HttpClientConnectionTest extends BaseTest { @BeforeEach void beforEach() { gateway = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .gateway(options -> new HttpGateway.Builder().options(options.id("HTTP")).build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new) + .gateway(options -> new HttpGateway.Builder().options(options.id("HTTP")).build())); gatewayAddress = gateway.gateway("HTTP").address(); microservices = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint)) - .membership( - opts -> opts.seedMembers(gateway.discoveryAddress().toString()))) - .transport(RSocketServiceTransport::new) - .services(new TestServiceImpl(onCancelCounter)) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint)) + .membership( + opts -> opts.seedMembers(gateway.discoveryAddress().toString()))) + .transport(RSocketServiceTransport::new) + .services(new TestServiceImpl(onCancelCounter))); } @AfterEach void afterEach() { - Flux.concat( - Mono.justOrEmpty(gateway).map(Microservices::shutdown), - Mono.justOrEmpty(microservices).map(Microservices::shutdown)) - .then() - .block(); + gateway.close(); + microservices.close(); } @Test diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpGatewayTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpGatewayTest.java index c0d90ef15..09076a57b 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpGatewayTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpGatewayTest.java @@ -5,6 +5,7 @@ import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.ServiceCall; import io.scalecube.services.ServiceInfo; import io.scalecube.services.api.Qualifier; @@ -49,40 +50,40 @@ class HttpGatewayTest extends BaseTest { @BeforeAll static void beforeAll() { gateway = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .gateway( - options -> - new HttpGateway.Builder() - .options(options.id("HTTP")) - .errorMapper(ERROR_MAPPER) - .build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new) + .gateway( + options -> + new HttpGateway.Builder() + .options(options.id("HTTP")) + .errorMapper(ERROR_MAPPER) + .build())); gatewayAddress = gateway.gateway("HTTP").address(); router = new StaticAddressRouter(gatewayAddress); microservices = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint)) - .membership( - opts -> opts.seedMembers(gateway.discoveryAddress().toString()))) - .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) - .services( - ServiceInfo.fromServiceInstance(new ErrorServiceImpl()) - .errorMapper(ERROR_MAPPER) - .build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint)) + .membership( + opts -> opts.seedMembers(gateway.discoveryAddress().toString()))) + .transport(RSocketServiceTransport::new) + .services(new GreetingServiceImpl()) + .services( + ServiceInfo.fromServiceInstance(new ErrorServiceImpl()) + .errorMapper(ERROR_MAPPER) + .build())); } @BeforeEach diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayTest.java index 382b01b34..211b9a00d 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayTest.java @@ -5,6 +5,7 @@ import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.ServiceCall; import io.scalecube.services.ServiceInfo; import io.scalecube.services.api.Qualifier; @@ -45,14 +46,14 @@ class HttpLocalGatewayTest extends BaseTest { @BeforeAll static void beforeAll() { gateway = - Microservices.builder() - .gateway(options -> new HttpGateway.Builder().options(options.id("HTTP")).build()) - .services(new GreetingServiceImpl()) - .services( - ServiceInfo.fromServiceInstance(new ErrorServiceImpl()) - .errorMapper(ERROR_MAPPER) - .build()) - .startAwait(); + Microservices.start( + new Context() + .gateway(options -> new HttpGateway.Builder().options(options.id("HTTP")).build()) + .services(new GreetingServiceImpl()) + .services( + ServiceInfo.fromServiceInstance(new ErrorServiceImpl()) + .errorMapper(ERROR_MAPPER) + .build())); gatewayAddress = gateway.gateway("HTTP").address(); router = new StaticAddressRouter(gatewayAddress); } diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java index e3544c5fe..7a57d5280 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java @@ -7,6 +7,7 @@ import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.ServiceCall; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.gateway.BaseTest; @@ -26,7 +27,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -44,46 +44,43 @@ class WebsocketClientConnectionTest extends BaseTest { @BeforeEach void beforEach() { gateway = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .gateway( - options -> - new WebsocketGateway.Builder() - .options(options.id("WS")) - .gatewayHandler(sessionEventHandler) - .build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new) + .gateway( + options -> + new WebsocketGateway.Builder() + .options(options.id("WS")) + .gatewayHandler(sessionEventHandler) + .build())); gatewayAddress = gateway.gateway("WS").address(); microservices = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint)) - .membership( - opts -> opts.seedMembers(gateway.discoveryAddress().toString()))) - .transport(RSocketServiceTransport::new) - .services(new TestServiceImpl(onCloseCounter::incrementAndGet)) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint)) + .membership( + opts -> opts.seedMembers(gateway.discoveryAddress().toString()))) + .transport(RSocketServiceTransport::new) + .services(new TestServiceImpl(onCloseCounter::incrementAndGet))); onCloseCounter.set(0); } @AfterEach void afterEach() { - Flux.concat( - Mono.justOrEmpty(gateway).map(Microservices::shutdown), - Mono.justOrEmpty(microservices).map(Microservices::shutdown)) - .then() - .block(); + gateway.close(); + microservices.close(); } @Test diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java index 9abc137b5..f841adee6 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java @@ -4,6 +4,7 @@ import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.ServiceCall; import io.scalecube.services.ServiceInfo; import io.scalecube.services.annotations.Service; @@ -25,7 +26,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.test.StepVerifier; class WebsocketClientTest extends BaseTest { @@ -39,48 +39,45 @@ class WebsocketClientTest extends BaseTest { @BeforeAll static void beforeAll() { gateway = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .gateway( - options -> - new WebsocketGateway.Builder() - .options(options.id("WS")) - .gatewayHandler(new TestGatewaySessionHandler()) - .build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new) + .gateway( + options -> + new WebsocketGateway.Builder() + .options(options.id("WS")) + .gatewayHandler(new TestGatewaySessionHandler()) + .build())); gatewayAddress = gateway.gateway("WS").address(); microservices = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint)) - .membership( - opts -> opts.seedMembers(gateway.discoveryAddress().toString()))) - .transport(RSocketServiceTransport::new) - .services(new TestServiceImpl()) - .services( - ServiceInfo.fromServiceInstance(new ErrorServiceImpl()) - .errorMapper(ERROR_MAPPER) - .build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint)) + .membership( + opts -> opts.seedMembers(gateway.discoveryAddress().toString()))) + .transport(RSocketServiceTransport::new) + .services(new TestServiceImpl()) + .services( + ServiceInfo.fromServiceInstance(new ErrorServiceImpl()) + .errorMapper(ERROR_MAPPER) + .build())); } @AfterAll static void afterAll() { - Flux.concat( - Mono.justOrEmpty(gateway).map(Microservices::shutdown), - Mono.justOrEmpty(microservices).map(Microservices::shutdown)) - .then() - .block(); + gateway.close(); + microservices.close(); } @Test diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayAuthTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayAuthTest.java index eae73b47a..623dd7d99 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayAuthTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayAuthTest.java @@ -6,6 +6,7 @@ import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.ServiceCall; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.exceptions.ForbiddenException; @@ -44,15 +45,15 @@ public class WebsocketGatewayAuthTest { @BeforeAll static void beforeAll() { gateway = - Microservices.builder() - .gateway( - options -> - new WebsocketGateway.Builder() - .options(options.id("WS")) - .gatewayHandler(new GatewaySessionHandlerImpl(AUTH_REGISTRY)) - .build()) - .services(new SecuredServiceImpl(AUTH_REGISTRY)) - .startAwait(); + Microservices.start( + new Context() + .gateway( + options -> + new WebsocketGateway.Builder() + .options(options.id("WS")) + .gatewayHandler(new GatewaySessionHandlerImpl(AUTH_REGISTRY)) + .build()) + .services(new SecuredServiceImpl(AUTH_REGISTRY))); } @BeforeEach diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayTest.java index 1e10f9148..4b2bae311 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayTest.java @@ -4,6 +4,7 @@ import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.ServiceCall; import io.scalecube.services.ServiceInfo; import io.scalecube.services.api.Qualifier; @@ -51,35 +52,36 @@ class WebsocketGatewayTest extends BaseTest { @BeforeAll static void beforeAll() { gateway = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .gateway(options -> new WebsocketGateway.Builder().options(options.id("WS")).build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new) + .gateway( + options -> new WebsocketGateway.Builder().options(options.id("WS")).build())); gatewayAddress = gateway.gateway("WS").address(); router = new StaticAddressRouter(gatewayAddress); microservices = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint)) - .membership( - opts -> opts.seedMembers(gateway.discoveryAddress().toString()))) - .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) - .services( - ServiceInfo.fromServiceInstance(new ErrorServiceImpl()) - .errorMapper(ERROR_MAPPER) - .build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint)) + .membership( + opts -> opts.seedMembers(gateway.discoveryAddress().toString()))) + .transport(RSocketServiceTransport::new) + .services(new GreetingServiceImpl()) + .services( + ServiceInfo.fromServiceInstance(new ErrorServiceImpl()) + .errorMapper(ERROR_MAPPER) + .build())); } @BeforeEach diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayTest.java index 52cce13b4..b6ca3331f 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayTest.java @@ -4,6 +4,7 @@ import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.ServiceCall; import io.scalecube.services.ServiceInfo; import io.scalecube.services.api.Qualifier; @@ -47,19 +48,21 @@ class WebsocketLocalGatewayTest extends BaseTest { @BeforeAll static void beforeAll() { gateway = - Microservices.builder() - .gateway( - options -> - new WebsocketGateway.Builder() - .options(options.id("WS").call(options.call().errorMapper(ERROR_MAPPER))) + Microservices.start( + new Context() + .gateway( + options -> + new WebsocketGateway.Builder() + .options( + options.id("WS").call(options.call().errorMapper(ERROR_MAPPER))) + .errorMapper(ERROR_MAPPER) + .build()) + .services(new GreetingServiceImpl()) + .services( + ServiceInfo.fromServiceInstance(new ErrorServiceImpl()) .errorMapper(ERROR_MAPPER) - .build()) - .services(new GreetingServiceImpl()) - .services( - ServiceInfo.fromServiceInstance(new ErrorServiceImpl()) - .errorMapper(ERROR_MAPPER) - .build()) - .startAwait(); + .build())); + gatewayAddress = gateway.gateway("WS").address(); router = new StaticAddressRouter(gatewayAddress); } diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java index cbbabbaca..ea87db877 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java @@ -2,6 +2,7 @@ import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.ServiceCall; import io.scalecube.services.annotations.Service; import io.scalecube.services.annotations.ServiceMethod; @@ -26,15 +27,16 @@ class WebsocketServerTest extends BaseTest { @BeforeAll static void beforeAll() { gateway = - Microservices.builder() - .gateway( - options -> - new WebsocketGateway.Builder() - .options(options.id("WS")) - .gatewayHandler(new TestGatewaySessionHandler()) - .build()) - .services(new TestServiceImpl()) - .startAwait(); + Microservices.start( + new Context() + .gateway( + options -> + new WebsocketGateway.Builder() + .options(options.id("WS")) + .gatewayHandler(new TestGatewaySessionHandler()) + .build()) + .services(new TestServiceImpl())); + gatewayAddress = gateway.gateway("WS").address(); } diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index 47771ab04..8fe291278 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -29,15 +29,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.Disposables; import reactor.core.Exceptions; @@ -111,7 +108,7 @@ */ public class Microservices implements AutoCloseable { - private static final Logger LOGGER = LoggerFactory.getLogger(Microservices.class); + // private static final Logger LOGGER = LoggerFactory.getLogger(Microservices.class); private final Microservices.Context context; private final String id = UUID.randomUUID().toString(); @@ -120,7 +117,6 @@ public class Microservices implements AutoCloseable { private ClientTransport clientTransport; private ServerTransport serverTransport; private Address serviceAddress = Address.NULL_ADDRESS; - private ServiceEndpoint serviceEndpoint; private ServiceCall serviceCall; private List serviceInstances; @@ -145,13 +141,6 @@ public static Microservices start(Microservices.Context context) { microservices.createDiscovery(); microservices.doInject(); microservices.startListen(); - - // concludeDiscovery(this, new ServiceDiscoveryOptions().serviceEndpoint(serviceEndpoint)) - // .then(startGateways(new GatewayOptions().call(serviceCall))) - // .then(Mono.fromCallable(() -> Injector.inject(this, serviceInstances))) - // .then(discoveryBootstrap.startListen()) - // .thenReturn(this); - } catch (Exception ex) { microservices.close(); throw Exceptions.propagate(ex); @@ -244,7 +233,6 @@ private void createDiscovery() { } serviceDiscovery = discoveryFactory.createServiceDiscovery(serviceEndpoint); - discoveryAddress = serviceDiscovery.address(); } private void doInject() { @@ -268,6 +256,7 @@ private void startListen() { .subscribe()); serviceDiscovery.start(); + discoveryAddress = serviceDiscovery.address(); } private void onDiscoveryEvent(ServiceDiscoveryEvent event) { @@ -425,13 +414,11 @@ public static final class Context { private Map tags = new HashMap<>(); private final List serviceProviders = new ArrayList<>(); - private ServiceRegistry serviceRegistry = new ServiceRegistryImpl(); - private Authenticator defaultAuthenticator = null; - private PrincipalMapper defaultPrincipalMapper = null; - private ServiceProviderErrorMapper defaultErrorMapper = DefaultErrorMapper.INSTANCE; - private ServiceMessageDataDecoder defaultDataDecoder = - Optional.ofNullable(ServiceMessageDataDecoder.INSTANCE) - .orElse((message, dataType) -> message); + private ServiceRegistry serviceRegistry; + private Authenticator defaultAuthenticator; + private PrincipalMapper defaultPrincipalMapper; + private ServiceProviderErrorMapper defaultErrorMapper; + private ServiceMessageDataDecoder defaultDataDecoder; private String externalHost; private Integer externalPort; private ServiceDiscoveryFactory discoveryFactory; @@ -506,7 +493,7 @@ public Context gateway(Function factory) { * @return this builder with applied parameter */ public Context defaultErrorMapper(ServiceProviderErrorMapper errorMapper) { - this.defaultErrorMapper = Objects.requireNonNull(errorMapper, "default errorMapper"); + this.defaultErrorMapper = errorMapper; return this; } @@ -519,7 +506,7 @@ public Context defaultErrorMapper(ServiceProviderErrorMapper errorMapper) { * @return this builder with applied parameter */ public Context defaultDataDecoder(ServiceMessageDataDecoder dataDecoder) { - this.defaultDataDecoder = Objects.requireNonNull(dataDecoder, "default dataDecoder"); + this.defaultDataDecoder = dataDecoder; return this; } @@ -556,7 +543,19 @@ private Context conclude() { throw new IllegalStateException("Context is already concluded"); } - // TODO: add validations + if (defaultErrorMapper == null) { + defaultErrorMapper = DefaultErrorMapper.INSTANCE; + } + + if (defaultDataDecoder == null) { + defaultDataDecoder = + Optional.ofNullable(ServiceMessageDataDecoder.INSTANCE) + .orElse((message, dataType) -> message); + } + + if (serviceRegistry == null) { + serviceRegistry = new ServiceRegistryImpl(); + } return this; } diff --git a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java index 7189e2174..272248d8d 100644 --- a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java +++ b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static reactor.core.publisher.Mono.from; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.exceptions.BadRequestException; @@ -30,28 +31,29 @@ public class ErrorFlowTest extends BaseTest { @BeforeAll public static void initNodes() throws InterruptedException { provider = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .transport(cfg -> cfg.port(PORT.incrementAndGet())) - .options(opts -> opts.metadata(endpoint))) - .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .transport(cfg -> cfg.port(PORT.incrementAndGet())) + .options(opts -> opts.metadata(endpoint))) + .transport(RSocketServiceTransport::new) + .services(new GreetingServiceImpl())); consumer = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .membership(cfg -> cfg.seedMembers(provider.discoveryAddress().toString())) - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .transport(cfg -> cfg.port(PORT.incrementAndGet())) - .options(opts -> opts.metadata(endpoint))) - .transport(RSocketServiceTransport::new) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .membership( + cfg -> cfg.seedMembers(provider.discoveryAddress().toString())) + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .transport(cfg -> cfg.port(PORT.incrementAndGet())) + .options(opts -> opts.metadata(endpoint))) + .transport(RSocketServiceTransport::new)); while (consumer.serviceEndpoints().size() != 1) { //noinspection BusyWait @@ -61,8 +63,8 @@ public static void initNodes() throws InterruptedException { @AfterAll public static void shutdownNodes() { - consumer.shutdown().block(); - provider.shutdown().block(); + consumer.close(); + provider.close(); } @Test diff --git a/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java index 3bcae7948..d7db334b3 100644 --- a/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import io.rsocket.exceptions.RejectedSetupException; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.auth.Authenticator; import io.scalecube.services.auth.PrincipalMapper; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; @@ -75,60 +76,60 @@ static void beforeAll() { principalMapper = authData -> new UserProfile(authData.get("name"), authData.get("role")); service = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(() -> new RSocketServiceTransport().authenticator(authenticator)) - .services( - ServiceInfo.fromServiceInstance(new SecuredServiceImpl()) - .principalMapper(principalMapper) - .build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(() -> new RSocketServiceTransport().authenticator(authenticator)) + .services( + ServiceInfo.fromServiceInstance(new SecuredServiceImpl()) + .principalMapper(principalMapper) + .build())); serviceWithoutAuthenticator = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .services( - ServiceInfo.fromServiceInstance(new AnotherSecuredServiceImpl()) - .principalMapper(principalMapper) - .build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new) + .services( + ServiceInfo.fromServiceInstance(new AnotherSecuredServiceImpl()) + .principalMapper(principalMapper) + .build())); partiallySecuredService = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(() -> new RSocketServiceTransport().authenticator(authenticator)) - .services( - ServiceInfo.fromServiceInstance(new PartiallySecuredServiceImpl()) - .principalMapper(principalMapper) - .build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(() -> new RSocketServiceTransport().authenticator(authenticator)) + .services( + ServiceInfo.fromServiceInstance(new PartiallySecuredServiceImpl()) + .principalMapper(principalMapper) + .build())); } @AfterAll static void afterAll() { if (service != null) { - service.shutdown().block(TIMEOUT); + service.close(); } if (serviceWithoutAuthenticator != null) { - serviceWithoutAuthenticator.shutdown().block(TIMEOUT); + serviceWithoutAuthenticator.close(); } if (partiallySecuredService != null) { - partiallySecuredService.shutdown().block(TIMEOUT); + partiallySecuredService.close(); } } @@ -263,30 +264,30 @@ void successfulCallOfPublicMethodWithoutAuthentication() { } private static Microservices newCaller() { - return Microservices.builder() - .discovery(ServiceAuthRemoteTest::serviceDiscovery) - .transport( - () -> - new RSocketServiceTransport() - .credentialsSupplier(ServiceAuthRemoteTest::credentialsSupplier)) - .startAwait(); + return Microservices.start( + new Context() + .discovery(ServiceAuthRemoteTest::serviceDiscovery) + .transport( + () -> + new RSocketServiceTransport() + .credentialsSupplier(ServiceAuthRemoteTest::credentialsSupplier))); } private static Microservices newEmptyCredentialsCaller() { - return Microservices.builder() - .discovery(ServiceAuthRemoteTest::serviceDiscovery) - .transport(RSocketServiceTransport::new) - .startAwait(); + return Microservices.start( + new Context() + .discovery(ServiceAuthRemoteTest::serviceDiscovery) + .transport(RSocketServiceTransport::new)); } private static Microservices newInvalidCredentialsCaller() { - return Microservices.builder() - .discovery(ServiceAuthRemoteTest::serviceDiscovery) - .transport( - () -> - new RSocketServiceTransport() - .credentialsSupplier(ServiceAuthRemoteTest::invalidCredentialsSupplier)) - .startAwait(); + return Microservices.start( + new Context() + .discovery(ServiceAuthRemoteTest::serviceDiscovery) + .transport( + () -> + new RSocketServiceTransport() + .credentialsSupplier(ServiceAuthRemoteTest::invalidCredentialsSupplier))); } private static Mono> credentialsSupplier(ServiceReference serviceReference) { diff --git a/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java b/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java index be400b5c6..d1ccf9255 100644 --- a/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java @@ -16,6 +16,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.exceptions.ServiceException; @@ -55,7 +56,7 @@ public static void setup() { @AfterAll public static void tearDown() { - provider.shutdown().block(); + provider.close(); } @Test @@ -73,15 +74,15 @@ public void test_local_async_no_params() { } private static Microservices serviceProvider() { - return Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) - .startAwait(); + return Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new) + .services(new GreetingServiceImpl())); } @Test diff --git a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java index 55563b1af..1fc679b19 100644 --- a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java @@ -17,6 +17,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.exceptions.ServiceException; @@ -60,29 +61,29 @@ public static void setup() { @AfterAll public static void tearDown() { try { - gateway.shutdown().block(); + gateway.close(); } catch (Exception ignore) { // no-op } try { - provider.shutdown().block(); + provider.close(); } catch (Exception ignore) { // no-op } } private static Microservices serviceProvider(Object service) { - return Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gateway.discoveryAddress().toString()))) - .transport(RSocketServiceTransport::new) - .services(service) - .startAwait(); + return Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(gateway.discoveryAddress().toString()))) + .transport(RSocketServiceTransport::new) + .services(service)); } @Test @@ -241,7 +242,7 @@ public void test_service_address_lookup_occur_only_after_subscription() { StepVerifier.create(quotes.take(1)).expectNextCount(1).expectComplete().verify(TIMEOUT); try { - quotesService.shutdown(); + quotesService.close(); } catch (Exception ignored) { // no-op } @@ -299,13 +300,13 @@ private static Optional route( } private static Microservices gateway() { - return Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .startAwait(); + return Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new)); } } diff --git a/services/src/test/java/io/scalecube/services/ServiceDiscoverySubscriberTest.java b/services/src/test/java/io/scalecube/services/ServiceDiscoverySubscriberTest.java index 92e02b898..2c834af3b 100644 --- a/services/src/test/java/io/scalecube/services/ServiceDiscoverySubscriberTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceDiscoverySubscriberTest.java @@ -4,6 +4,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verifyNoInteractions; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.annotations.Subscriber; import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; import java.util.concurrent.atomic.AtomicReference; @@ -18,7 +19,7 @@ void testRegisterNonDiscoveryCoreSubscriber() { final NonDiscoverySubscriber1 discoverySubscriber1 = spy(new NonDiscoverySubscriber1()); final NonDiscoverySubscriber2 discoverySubscriber2 = spy(new NonDiscoverySubscriber2()); - Microservices.builder().services(discoverySubscriber1, discoverySubscriber2).startAwait(); + Microservices.start(new Context().services(discoverySubscriber1, discoverySubscriber2)); verifyNoInteractions(discoverySubscriber1, discoverySubscriber2); } @@ -28,7 +29,7 @@ void testRegisterNotMatchingTypeDiscoveryCoreSubscriber() { final NotMatchingTypeDiscoverySubscriber discoverySubscriber = spy(new NotMatchingTypeDiscoverySubscriber()); - Microservices.builder().services(discoverySubscriber).startAwait(); + Microservices.start(new Context().services(discoverySubscriber)); verifyNoInteractions(discoverySubscriber); } @@ -39,7 +40,7 @@ void testRegisterDiscoveryCoreSubscriber() { final NormalDiscoverySubscriber normalDiscoverySubscriber = new NormalDiscoverySubscriber(subscriptionReference); - Microservices.builder().services(normalDiscoverySubscriber).startAwait(); + Microservices.start(new Context().services(normalDiscoverySubscriber)); assertNotNull(subscriptionReference.get(), "subscription"); } diff --git a/services/src/test/java/io/scalecube/services/ServiceLifecycleAnnotationsTest.java b/services/src/test/java/io/scalecube/services/ServiceLifecycleAnnotationsTest.java index 37fbf875f..bd4aaa67a 100644 --- a/services/src/test/java/io/scalecube/services/ServiceLifecycleAnnotationsTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceLifecycleAnnotationsTest.java @@ -5,6 +5,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.annotations.AfterConstruct; import io.scalecube.services.annotations.BeforeDestroy; import io.scalecube.services.annotations.Service; @@ -21,25 +22,25 @@ public class ServiceLifecycleAnnotationsTest extends BaseTest { void testAfterConstructThenBeforeDestroy() { //noinspection EmptyTryBlock,unused try (Microservices microservices = - Microservices.builder() - .services( - ServiceInfo.fromServiceInstance( - new TestService() { - @AfterConstruct - void init() { - afterConstruct.invoke(); - } - }) - .build(), - ServiceInfo.fromServiceInstance( - new TestService() { - @BeforeDestroy - void cleanup() { - beforeDestroy.invoke(); - } - }) - .build()) - .startAwait()) {} + Microservices.start( + new Context() + .services( + ServiceInfo.fromServiceInstance( + new TestService() { + @AfterConstruct + void init() { + afterConstruct.invoke(); + } + }) + .build(), + ServiceInfo.fromServiceInstance( + new TestService() { + @BeforeDestroy + void cleanup() { + beforeDestroy.invoke(); + } + }) + .build()))) {} verify(afterConstruct, times(1)).invoke(); verify(beforeDestroy, times(1)).invoke(); @@ -52,25 +53,25 @@ void testAfterConstructFailsThenBeforeDestroy() { //noinspection EmptyTryBlock,unused try (Microservices microservices = - Microservices.builder() - .services( - ServiceInfo.fromServiceInstance( - new TestService() { - @AfterConstruct - void init() { - afterConstruct.invoke(); - } - }) - .build(), - ServiceInfo.fromServiceInstance( - new TestService() { - @BeforeDestroy - void cleanup() { - beforeDestroy.invoke(); - } - }) - .build()) - .startAwait()) { + Microservices.start( + new Context() + .services( + ServiceInfo.fromServiceInstance( + new TestService() { + @AfterConstruct + void init() { + afterConstruct.invoke(); + } + }) + .build(), + ServiceInfo.fromServiceInstance( + new TestService() { + @BeforeDestroy + void cleanup() { + beforeDestroy.invoke(); + } + }) + .build()))) { } catch (Exception ex) { assertSame(exception, Throwables.getRootCause(ex)); } diff --git a/services/src/test/java/io/scalecube/services/ServiceLocalTest.java b/services/src/test/java/io/scalecube/services/ServiceLocalTest.java index f44a61d79..eb3cd4bbe 100644 --- a/services/src/test/java/io/scalecube/services/ServiceLocalTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceLocalTest.java @@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.sut.GreetingRequest; import io.scalecube.services.sut.GreetingResponse; @@ -28,13 +29,13 @@ public class ServiceLocalTest extends BaseTest { @BeforeEach public void setUp() { - microservices = Microservices.builder().services(new GreetingServiceImpl()).startAwait(); + microservices = Microservices.start(new Context().services(new GreetingServiceImpl())); } @AfterEach public void cleanUp() { if (microservices != null) { - microservices.shutdown().block(timeout); + microservices.close(); } } diff --git a/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java b/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java index d4da76f70..923cc72b1 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java @@ -8,6 +8,7 @@ import io.scalecube.cluster.codec.jackson.JacksonMetadataCodec; import io.scalecube.cluster.metadata.JdkMetadataCodec; import io.scalecube.cluster.metadata.MetadataCodec; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; import io.scalecube.services.discovery.api.ServiceDiscoveryFactory; @@ -23,7 +24,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.test.StepVerifier; @@ -42,10 +42,10 @@ public void test_added_removed_registration_events(MetadataCodec metadataCodec) Sinks.Many events = Sinks.many().replay().all(); Microservices seed = - Microservices.builder() - .discovery(defServiceDiscovery(metadataCodec)) - .transport(RSocketServiceTransport::new) - .startAwait(); + Microservices.start( + new Context() + .discovery(defServiceDiscovery(metadataCodec)) + .transport(RSocketServiceTransport::new)); seed.listenDiscovery() .subscribe(events::tryEmitNext, events::tryEmitError, events::tryEmitComplete); @@ -53,28 +53,32 @@ public void test_added_removed_registration_events(MetadataCodec metadataCodec) Address seedAddress = seed.discoveryAddress(); Microservices ms1 = - Microservices.builder() - .discovery(defServiceDiscovery(seedAddress, metadataCodec)) - .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) - .startAwait(); + Microservices.start( + new Context() + .discovery(defServiceDiscovery(seedAddress, metadataCodec)) + .transport(RSocketServiceTransport::new) + .services(new GreetingServiceImpl())); Microservices ms2 = - Microservices.builder() - .discovery(defServiceDiscovery(seedAddress, metadataCodec)) - .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) - .startAwait(); + Microservices.start( + new Context() + .discovery(defServiceDiscovery(seedAddress, metadataCodec)) + .transport(RSocketServiceTransport::new) + .services(new GreetingServiceImpl())); StepVerifier.create(events.asFlux().onBackpressureBuffer()) .assertNext(event -> assertEquals(ENDPOINT_ADDED, event.type())) .assertNext(event -> assertEquals(ENDPOINT_ADDED, event.type())) - .then(() -> Mono.whenDelayError(ms1.shutdown(), ms2.shutdown()).block(TIMEOUT)) + .then( + () -> { + ms1.close(); + ms2.close(); + }) .assertNext(event -> assertEquals(ENDPOINT_LEAVING, event.type())) .assertNext(event -> assertEquals(ENDPOINT_LEAVING, event.type())) .assertNext(event -> assertEquals(ENDPOINT_REMOVED, event.type())) .assertNext(event -> assertEquals(ENDPOINT_REMOVED, event.type())) - .then(() -> seed.shutdown().block(TIMEOUT)) + .then(seed::close) .thenCancel() .verify(TIMEOUT); } @@ -87,11 +91,12 @@ public void test_listen_to_discovery_events(MetadataCodec metadataCodec) { List cluster = new CopyOnWriteArrayList<>(); Microservices seed = - Microservices.builder() - .discovery(defServiceDiscovery(metadataCodec)) - .transport(RSocketServiceTransport::new) - .services(new AnnotationServiceImpl()) - .startAwait(); + Microservices.start( + new Context() + .discovery(defServiceDiscovery(metadataCodec)) + .transport(RSocketServiceTransport::new) + .services(new AnnotationServiceImpl())); + cluster.add(seed); seed.listenDiscovery() @@ -103,29 +108,31 @@ public void test_listen_to_discovery_events(MetadataCodec metadataCodec) { .then( () -> { Microservices ms1 = - Microservices.builder() - .discovery(defServiceDiscovery(seedAddress, metadataCodec)) - .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) - .startAwait(); + Microservices.start( + new Context() + .discovery(defServiceDiscovery(seedAddress, metadataCodec)) + .transport(RSocketServiceTransport::new) + .services(new GreetingServiceImpl())); + cluster.add(ms1); }) .assertNext(event -> assertEquals(ENDPOINT_ADDED, event.type())) .then( () -> { Microservices ms2 = - Microservices.builder() - .discovery(defServiceDiscovery(seedAddress, metadataCodec)) - .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) - .startAwait(); + Microservices.start( + new Context() + .discovery(defServiceDiscovery(seedAddress, metadataCodec)) + .transport(RSocketServiceTransport::new) + .services(new GreetingServiceImpl())); + cluster.add(ms2); }) .assertNext(event -> assertEquals(ENDPOINT_ADDED, event.type())) - .then(() -> cluster.remove(2).shutdown().block(TIMEOUT)) + .then(() -> cluster.remove(2).close()) .assertNext(event -> assertEquals(ENDPOINT_LEAVING, event.type())) .assertNext(event -> assertEquals(ENDPOINT_REMOVED, event.type())) - .then(() -> cluster.remove(1).shutdown().block(TIMEOUT)) + .then(() -> cluster.remove(1).close()) .assertNext(event -> assertEquals(ENDPOINT_LEAVING, event.type())) .assertNext(event -> assertEquals(ENDPOINT_REMOVED, event.type())) .thenCancel() @@ -141,8 +148,7 @@ public void test_listen_to_discovery_events(MetadataCodec metadataCodec) { .thenCancel() .verify(TIMEOUT); - Mono.whenDelayError(cluster.stream().map(Microservices::shutdown).toArray(Mono[]::new)) - .block(TIMEOUT); + cluster.forEach(Microservices::close); } @ParameterizedTest @@ -153,11 +159,11 @@ public void test_delayed_listen_to_discovery_events(MetadataCodec metadataCodec) List cluster = new CopyOnWriteArrayList<>(); Microservices seed = - Microservices.builder() - .discovery(defServiceDiscovery(metadataCodec)) - .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) - .startAwait(); + Microservices.start( + new Context() + .discovery(defServiceDiscovery(metadataCodec)) + .transport(RSocketServiceTransport::new) + .services(new GreetingServiceImpl())); cluster.add(seed); seed.listenDiscovery() @@ -169,22 +175,23 @@ public void test_delayed_listen_to_discovery_events(MetadataCodec metadataCodec) .then( () -> { Microservices ms1 = - Microservices.builder() - .discovery(defServiceDiscovery(seedAddress, metadataCodec)) - .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl(), new AnnotationServiceImpl()) - .startAwait(); + Microservices.start( + new Context() + .discovery(defServiceDiscovery(seedAddress, metadataCodec)) + .transport(RSocketServiceTransport::new) + .services(new GreetingServiceImpl(), new AnnotationServiceImpl())); cluster.add(ms1); }) .assertNext(event -> assertEquals(ENDPOINT_ADDED, event.type())) .then( () -> { Microservices ms2 = - Microservices.builder() - .discovery(defServiceDiscovery(seedAddress, metadataCodec)) - .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) - .startAwait(); + Microservices.start( + new Context() + .discovery(defServiceDiscovery(seedAddress, metadataCodec)) + .transport(RSocketServiceTransport::new) + .services(new GreetingServiceImpl())); + cluster.add(ms2); }) .assertNext(event -> assertEquals(ENDPOINT_ADDED, event.type())) @@ -197,8 +204,7 @@ public void test_delayed_listen_to_discovery_events(MetadataCodec metadataCodec) .thenCancel() .verify(TIMEOUT); - Mono.whenDelayError(cluster.stream().map(Microservices::shutdown).toArray(Mono[]::new)) - .block(TIMEOUT); + cluster.forEach(Microservices::close); } private ServiceDiscoveryFactory defServiceDiscovery(MetadataCodec metadataCodec) { diff --git a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java index 630ac3b52..e32915728 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java @@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.discovery.api.ServiceDiscovery; @@ -54,35 +55,35 @@ public static void setup() { @AfterAll public static void tearDown() { try { - gateway.shutdown().block(); + gateway.close(); } catch (Exception ignore) { // no-op } try { - provider.shutdown().block(); + provider.close(); } catch (Exception ignore) { // no-op } } private static Microservices gateway() { - return Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .startAwait(); + return Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new)); } private static Microservices serviceProvider() { - return Microservices.builder() - .discovery(ServiceRemoteTest::serviceDiscovery) - .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) - .startAwait(); + return Microservices.start( + new Context() + .discovery(ServiceRemoteTest::serviceDiscovery) + .transport(RSocketServiceTransport::new) + .services(new GreetingServiceImpl())); } @Test @@ -274,11 +275,12 @@ public void test_remote_serviceA_calls_serviceB_using_setter() { // Create microservices instance cluster. // noinspection unused Microservices provider = - Microservices.builder() - .discovery(ServiceRemoteTest::serviceDiscovery) - .transport(RSocketServiceTransport::new) - .services(new CoarseGrainedServiceImpl()) // add service a and b - .startAwait(); + Microservices.start( + new Context() + .discovery(ServiceRemoteTest::serviceDiscovery) + .transport(RSocketServiceTransport::new) + .services(new CoarseGrainedServiceImpl()) // add service a and b + ); // Get a proxy to the service api. CoarseGrainedService service = gateway.call().api(CoarseGrainedService.class); @@ -286,7 +288,7 @@ public void test_remote_serviceA_calls_serviceB_using_setter() { Publisher future = service.callGreeting("joe"); assertEquals(" hello to: joe", Mono.from(future).block(Duration.ofSeconds(1))); - provider.shutdown().then(Mono.delay(TIMEOUT2)).block(); + provider.close(); } @Test @@ -297,17 +299,17 @@ public void test_remote_serviceA_calls_serviceB() { // Create microservices instance cluster. // noinspection unused Microservices provider = - Microservices.builder() - .discovery(ServiceRemoteTest::serviceDiscovery) - .transport(RSocketServiceTransport::new) - .services(another) - .startAwait(); + Microservices.start( + new Context() + .discovery(ServiceRemoteTest::serviceDiscovery) + .transport(RSocketServiceTransport::new) + .services(another)); // Get a proxy to the service api. CoarseGrainedService service = gateway.call().api(CoarseGrainedService.class); Publisher future = service.callGreeting("joe"); assertEquals(" hello to: joe", Mono.from(future).block(Duration.ofSeconds(1))); - provider.shutdown().then(Mono.delay(TIMEOUT2)).block(); + provider.close(); } @Test @@ -317,11 +319,12 @@ public void test_remote_serviceA_calls_serviceB_with_timeout() { // Create microservices instance cluster. Microservices ms = - Microservices.builder() - .discovery(ServiceRemoteTest::serviceDiscovery) - .transport(RSocketServiceTransport::new) - .services(another) // add service a and b - .startAwait(); + Microservices.start( + new Context() + .discovery(ServiceRemoteTest::serviceDiscovery) + .transport(RSocketServiceTransport::new) + .services(another) // add service a and b + ); // Get a proxy to the service api. CoarseGrainedService service = gateway.call().api(CoarseGrainedService.class); @@ -331,7 +334,7 @@ public void test_remote_serviceA_calls_serviceB_with_timeout() { () -> Mono.from(service.callGreetingTimeout("joe")).block()); assertTrue(exception.getMessage().contains("Did not observe any item or terminal signal")); System.out.println("done"); - ms.shutdown().then(Mono.delay(TIMEOUT2)).block(); + ms.close(); } @Test @@ -342,11 +345,12 @@ public void test_remote_serviceA_calls_serviceB_with_dispatcher() { // Create microservices instance cluster. Microservices provider = - Microservices.builder() - .discovery(ServiceRemoteTest::serviceDiscovery) - .transport(RSocketServiceTransport::new) - .services(another) // add service a and b - .startAwait(); + Microservices.start( + new Context() + .discovery(ServiceRemoteTest::serviceDiscovery) + .transport(RSocketServiceTransport::new) + .services(another) // add service a and b + ); // Get a proxy to the service api. CoarseGrainedService service = gateway.call().api(CoarseGrainedService.class); @@ -354,7 +358,7 @@ public void test_remote_serviceA_calls_serviceB_with_dispatcher() { String response = service.callGreetingWithDispatcher("joe").block(Duration.ofSeconds(5)); assertEquals(response, " hello to: joe"); - provider.shutdown().then(Mono.delay(TIMEOUT2)).block(); + provider.close(); } @Test @@ -504,16 +508,16 @@ public void test_services_contribute_to_cluster_metadata() { tags.put("HOSTNAME", "host1"); Microservices ms = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .tags(tags) - .services(new GreetingServiceImpl()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new) + .tags(tags) + .services(new GreetingServiceImpl())); assertTrue(ms.serviceEndpoint().tags().containsKey("HOSTNAME")); } diff --git a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java index 025053d1a..87c8c4673 100644 --- a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java +++ b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java @@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.sut.QuoteService; @@ -28,30 +29,30 @@ public class StreamingServiceTest extends BaseTest { @BeforeAll public static void setup() { gateway = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .defaultDataDecoder(ServiceMessageCodec::decodeData) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new) + .defaultDataDecoder(ServiceMessageCodec::decodeData)); final Address gatewayAddress = gateway.discoveryAddress(); node = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) - .transport(RSocketServiceTransport::new) - .defaultDataDecoder(ServiceMessageCodec::decodeData) - .services(new SimpleQuoteService()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) + .transport(RSocketServiceTransport::new) + .defaultDataDecoder(ServiceMessageCodec::decodeData) + .services(new SimpleQuoteService())); } @Test diff --git a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java index 05ed4d079..073ce51c8 100644 --- a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java +++ b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java @@ -12,6 +12,7 @@ import io.scalecube.services.Address; import io.scalecube.services.BaseTest; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.Reflect; import io.scalecube.services.ServiceCall; import io.scalecube.services.ServiceInfo; @@ -56,81 +57,81 @@ public class RoutersTest extends BaseTest { @BeforeAll public static void setup() { gateway = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new)); gatewayAddress = gateway.discoveryAddress(); // Create microservices instance cluster. provider1 = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) - .transport(RSocketServiceTransport::new) - .services( - ServiceInfo.fromServiceInstance(new GreetingServiceImpl(1)) - .tag("ONLYFOR", "joe") - .tag("SENDER", "1") - .build(), - ServiceInfo.fromServiceInstance(new GreetingServiceImplA()) - .tag("Weight", "0.1") - .build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) + .transport(RSocketServiceTransport::new) + .services( + ServiceInfo.fromServiceInstance(new GreetingServiceImpl(1)) + .tag("ONLYFOR", "joe") + .tag("SENDER", "1") + .build(), + ServiceInfo.fromServiceInstance(new GreetingServiceImplA()) + .tag("Weight", "0.1") + .build())); // Create microservices instance cluster. provider2 = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) - .transport(RSocketServiceTransport::new) - .services( - ServiceInfo.fromServiceInstance(new GreetingServiceImpl(2)) - .tag("ONLYFOR", "fransin") - .tag("SENDER", "2") - .build(), - ServiceInfo.fromServiceInstance(new GreetingServiceImplB()) - .tag("Weight", "0.9") - .build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) + .transport(RSocketServiceTransport::new) + .services( + ServiceInfo.fromServiceInstance(new GreetingServiceImpl(2)) + .tag("ONLYFOR", "fransin") + .tag("SENDER", "2") + .build(), + ServiceInfo.fromServiceInstance(new GreetingServiceImplB()) + .tag("Weight", "0.9") + .build())); TagService tagService = input -> input.map(String::toUpperCase); provider3 = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) - .transport(RSocketServiceTransport::new) - .services( - ServiceInfo.fromServiceInstance(tagService) - .tag("tagB", "bb") - .tag("tagC", "c") - .build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) + .transport(RSocketServiceTransport::new) + .services( + ServiceInfo.fromServiceInstance(tagService) + .tag("tagB", "bb") + .tag("tagC", "c") + .build())); } @AfterAll public static void tearDown() { - gateway.shutdown().block(); - provider1.shutdown().block(); - provider2.shutdown().block(); - provider3.shutdown().block(); + gateway.close(); + provider1.close(); + provider2.close(); + provider3.close(); } @Test diff --git a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java index d11f07e7a..d99386c84 100644 --- a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java +++ b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java @@ -2,6 +2,7 @@ import io.scalecube.services.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.ServiceInfo; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.routings.sut.CanaryService; @@ -22,46 +23,46 @@ public class ServiceTagsExample { */ public static void main(String[] args) { Microservices gateway = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new)); Address seedAddress = gateway.discoveryAddress(); Microservices services1 = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) - .transport(RSocketServiceTransport::new) - .services( - ServiceInfo.fromServiceInstance(new GreetingServiceImplA()) - .tag("Weight", "0.3") - .build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) + .transport(RSocketServiceTransport::new) + .services( + ServiceInfo.fromServiceInstance(new GreetingServiceImplA()) + .tag("Weight", "0.3") + .build())); Microservices services2 = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) - .transport(RSocketServiceTransport::new) - .services( - ServiceInfo.fromServiceInstance(new GreetingServiceImplB()) - .tag("Weight", "0.7") - .build()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) + .transport(RSocketServiceTransport::new) + .services( + ServiceInfo.fromServiceInstance(new GreetingServiceImplB()) + .tag("Weight", "0.7") + .build())); CanaryService service = gateway.call().router(WeightedRandomRouter.class).api(CanaryService.class); diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java index 71aee1a36..175bef1b7 100644 --- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java +++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java @@ -5,6 +5,7 @@ import io.scalecube.services.Address; import io.scalecube.services.BaseTest; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.ServiceCall; import io.scalecube.services.annotations.Inject; import io.scalecube.services.annotations.Service; @@ -28,54 +29,54 @@ public class RSocketNettyColocatedEventLoopGroupTest extends BaseTest { @BeforeEach public void setUp() { this.gateway = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new)); final Address gatewayAddress = this.gateway.discoveryAddress(); Microservices facade = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) - .transport(RSocketServiceTransport::new) - .services(new Facade()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) + .transport(RSocketServiceTransport::new) + .services(new Facade())); final Address facadeAddress = facade.discoveryAddress(); this.ping = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(facadeAddress.toString()))) - .transport(RSocketServiceTransport::new) - .services((PingService) () -> Mono.just(Thread.currentThread().getName())) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(facadeAddress.toString()))) + .transport(RSocketServiceTransport::new) + .services((PingService) () -> Mono.just(Thread.currentThread().getName()))); this.pong = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(facadeAddress.toString()))) - .transport(RSocketServiceTransport::new) - .services((PongService) () -> Mono.just(Thread.currentThread().getName())) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .membership(cfg -> cfg.seedMembers(facadeAddress.toString()))) + .transport(RSocketServiceTransport::new) + .services((PongService) () -> Mono.just(Thread.currentThread().getName()))); } @Test @@ -94,15 +95,9 @@ public void testColocatedEventLoopGroup() { @AfterEach public void tearDown() { - try { - Mono.whenDelayError( - Optional.ofNullable(gateway).map(Microservices::shutdown).orElse(Mono.empty()), - Optional.ofNullable(ping).map(Microservices::shutdown).orElse(Mono.empty()), - Optional.ofNullable(pong).map(Microservices::shutdown).orElse(Mono.empty())) - .block(); - } catch (Throwable ignore) { - // no-op - } + Optional.ofNullable(gateway).ifPresent(Microservices::close); + Optional.ofNullable(ping).ifPresent(Microservices::close); + Optional.ofNullable(pong).ifPresent(Microservices::close); } @Service("facade") diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java index 0c41f4ab6..8c2419212 100644 --- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java +++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java @@ -7,6 +7,7 @@ import io.scalecube.services.Address; import io.scalecube.services.BaseTest; import io.scalecube.services.Microservices; +import io.scalecube.services.Microservices.Context; import io.scalecube.services.ServiceCall; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; @@ -25,7 +26,6 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import reactor.core.Disposable; -import reactor.core.publisher.Mono; public class RSocketServiceTransportTest extends BaseTest { @@ -43,40 +43,34 @@ public class RSocketServiceTransportTest extends BaseTest { @BeforeEach public void setUp() { gateway = - Microservices.builder() - .discovery( - endpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(endpoint))) - .transport(RSocketServiceTransport::new) - .startAwait(); + Microservices.start( + new Context() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint))) + .transport(RSocketServiceTransport::new)); final Address gatewayAddress = this.gateway.discoveryAddress(); serviceNode = - Microservices.builder() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) - .transport(RSocketServiceTransport::new) - .services(new SimpleQuoteService()) - .startAwait(); + Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint)) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) + .transport(RSocketServiceTransport::new) + .services(new SimpleQuoteService())); } @AfterEach public void cleanUp() { - try { - Mono.whenDelayError( - Optional.ofNullable(gateway).map(Microservices::shutdown).orElse(Mono.empty()), - Optional.ofNullable(serviceNode).map(Microservices::shutdown).orElse(Mono.empty())) - .block(); - } catch (Throwable ignore) { - // no-op - } + Optional.ofNullable(gateway).ifPresent(Microservices::close); + Optional.ofNullable(serviceNode).ifPresent(Microservices::close); } @Disabled @@ -98,7 +92,7 @@ public void test_remote_node_died_mono_never() throws Exception { // service node goes down TimeUnit.SECONDS.sleep(3); - serviceNode.shutdown().block(TIMEOUT); + serviceNode.close(); if (!latch.await(20, TimeUnit.SECONDS)) { fail("latch.await"); @@ -129,7 +123,7 @@ public void test_remote_node_died_many_never() throws Exception { // service node goes down TimeUnit.SECONDS.sleep(3); - serviceNode.shutdown().block(TIMEOUT); + serviceNode.close(); if (!latch.await(20, TimeUnit.SECONDS)) { fail("latch.await"); @@ -164,7 +158,7 @@ public void test_remote_node_died_many_then_never() throws Exception { // service node goes down TimeUnit.SECONDS.sleep(3); - serviceNode.shutdown().block(TIMEOUT); + serviceNode.close(); if (!latch.await(20, TimeUnit.SECONDS)) { fail("latch.await");