diff --git a/README.md b/README.md index 4152b8421..f2f12519e 100644 --- a/README.md +++ b/README.md @@ -63,20 +63,24 @@ The example provisions 2 cluster nodes and making a remote interaction. ```java //1. ScaleCube Node node with no members Microservices seed = Microservices.builder().startAwait(); + MicroservicesContext seedContext = seed.context(); - //2. Construct a ScaleCube node which joins the cluster hosting the Greeting Service + //2. Create ServiceFactory + ServiceFactory serviceFactory = ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()); + + //3. Construct a ScaleCube node which joins the cluster hosting the Greeting Service Microservices microservices = Microservices.builder() .discovery( self -> new ScalecubeServiceDiscovery(self) - .options(opts -> opts.seedMembers(toAddress(seed.discovery().address())))) + .options(opts -> opts.seedMembers(seedContext.discovery().address()))) .transport(ServiceTransports::rsocketServiceTransport) - .services(new GreetingServiceImpl()) + .serviceFactory(serviceFactory) .startAwait(); //3. Create service proxy - GreetingsService service = seed.call().api(GreetingsService.class); + GreetingsService service = seedContext.serviceCall().api(GreetingsService.class); // Execute the services and subscribe to service events service.sayHello("joe").subscribe(consumer -> { diff --git a/services-api/src/main/java/io/scalecube/services/MicroservicesContext.java b/services-api/src/main/java/io/scalecube/services/MicroservicesContext.java new file mode 100644 index 000000000..1bfbcaac0 --- /dev/null +++ b/services-api/src/main/java/io/scalecube/services/MicroservicesContext.java @@ -0,0 +1,46 @@ +package io.scalecube.services; + +import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; +import reactor.core.publisher.Flux; + +/** + * Context of Microservices node. Contain all public API of Microservices node, include gateways, + * service discoveries, etc. + * + *

Can be used in user services. + * + * @see ServiceFactory + */ +public interface MicroservicesContext { + + /** + * Service endpoint of current Scalecube node. + * + * @return id + */ + ServiceEndpoint serviceEndpoint(); + + /** + * Used for remote service call. + * + * @return new instance service call + * @see ServiceCall + */ + ServiceCall serviceCall(); + + /** + * Function to subscribe and listen on {@code ServiceDiscoveryEvent} events. + * + * @return stream of {@code ServiceDiscoveryEvent} events + */ + Flux listenDiscovery(); + + /** + * Function to subscribe and listen on {@code ServiceDiscoveryEvent} events by service discovery + * id. + * + * @param id service discovery id + * @return service discovery context + */ + Flux listenDiscovery(String id); +} diff --git a/services-api/src/main/java/io/scalecube/services/Reflect.java b/services-api/src/main/java/io/scalecube/services/Reflect.java index 9f1b64b92..54a6a2674 100644 --- a/services-api/src/main/java/io/scalecube/services/Reflect.java +++ b/services-api/src/main/java/io/scalecube/services/Reflect.java @@ -18,11 +18,11 @@ import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -243,14 +243,26 @@ public static Map serviceMethods(Class serviceInterface) { /** * Util function to get service interfaces collections from service instance. * - * @param serviceObject with extends service interface with @Service annotation. + * @param serviceObject object with extends service interface with @Service annotation. * @return service interface class. */ - public static Collection> serviceInterfaces(Object serviceObject) { - Class[] interfaces = serviceObject.getClass().getInterfaces(); + public static Stream> serviceInterfaces(Object serviceObject) { + return serviceInterfaces(serviceObject.getClass()); + } + + /** + * Util function to get service interfaces collections from service instance. + * + * @param serviceType with extends service interface with @Service annotation. + * @return service interface class. + */ + public static Stream> serviceInterfaces(Class serviceType) { + if (serviceType.isInterface() && serviceType.isAnnotationPresent(Service.class)) { + return Stream.of(serviceType); + } + Class[] interfaces = serviceType.getInterfaces(); return Arrays.stream(interfaces) - .filter(interfaceClass -> interfaceClass.isAnnotationPresent(Service.class)) - .collect(Collectors.toList()); + .filter(interfaceClass -> interfaceClass.isAnnotationPresent(Service.class)); } public static String methodName(Method method) { diff --git a/services-api/src/main/java/io/scalecube/services/ServiceCall.java b/services-api/src/main/java/io/scalecube/services/ServiceCall.java index 6b7f99819..4194ac77a 100644 --- a/services-api/src/main/java/io/scalecube/services/ServiceCall.java +++ b/services-api/src/main/java/io/scalecube/services/ServiceCall.java @@ -367,7 +367,7 @@ public T api(Class serviceInterface) { return (T) Proxy.newProxyInstance( getClass().getClassLoader(), - new Class[] {serviceInterface}, + new Class[]{serviceInterface}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] params) { diff --git a/services-api/src/main/java/io/scalecube/services/ServiceDefinition.java b/services-api/src/main/java/io/scalecube/services/ServiceDefinition.java new file mode 100644 index 000000000..cc8ecd9f8 --- /dev/null +++ b/services-api/src/main/java/io/scalecube/services/ServiceDefinition.java @@ -0,0 +1,28 @@ +package io.scalecube.services; + +import java.util.Collections; +import java.util.Map; + +/** Definition of service - type and tags. */ +public class ServiceDefinition { + + private final Class serviceType; + private final Map tags; + + public ServiceDefinition(Class serviceType, Map tags) { + this.serviceType = serviceType; + this.tags = tags; + } + + public ServiceDefinition(Class serviceType) { + this(serviceType, Collections.emptyMap()); + } + + public Class type() { + return serviceType; + } + + public Map tags() { + return tags; + } +} diff --git a/services-api/src/main/java/io/scalecube/services/ServiceFactory.java b/services-api/src/main/java/io/scalecube/services/ServiceFactory.java new file mode 100644 index 000000000..dc4780994 --- /dev/null +++ b/services-api/src/main/java/io/scalecube/services/ServiceFactory.java @@ -0,0 +1,34 @@ +package io.scalecube.services; + +import java.util.Collection; +import reactor.core.publisher.Mono; + +/** Manages the life cycle of all services registered with Scalecube Services. */ +public interface ServiceFactory { + + /** + * Provide service definitions to be deployed on the current Scalecube node. + * + * @return collection of service definitions - service type and tags. + * @see ServiceDefinition + */ + Collection getServiceDefinitions(); + + /** + * Creates instances of services declared in the method {@link this#getServiceDefinitions()} + * getServiceDefinitions. + * + * @param microservices microservices context + * @return Completed Mono if initialization was successful for all services. + */ + Mono> initializeServices(MicroservicesContext microservices); + + /** + * Finalization of service instances. + * + * @return completed Mono if finalization was successful for all services. + */ + default Mono shutdownServices() { + return Mono.defer(Mono::empty); + } +} diff --git a/services-api/src/main/java/io/scalecube/services/ServiceInfo.java b/services-api/src/main/java/io/scalecube/services/ServiceInfo.java index da4ed3128..dfd394ee6 100644 --- a/services-api/src/main/java/io/scalecube/services/ServiceInfo.java +++ b/services-api/src/main/java/io/scalecube/services/ServiceInfo.java @@ -74,7 +74,7 @@ public String toString() { public static class Builder { - private Object serviceInstance; + private final Object serviceInstance; private Map tags = new HashMap<>(); private ServiceProviderErrorMapper errorMapper; private ServiceMessageDataDecoder dataDecoder; @@ -161,6 +161,12 @@ public Builder principalMapper( return this; } + /** + * Set up {@link ServiceProviderErrorMapper} if it hasn't been set up before. + * + * @param errorMapper error mapper. + * @return current builder's state. + */ Builder errorMapperIfAbsent(ServiceProviderErrorMapper errorMapper) { if (this.errorMapper == null) { this.errorMapper = errorMapper; @@ -175,6 +181,12 @@ Builder dataDecoderIfAbsent(ServiceMessageDataDecoder dataDecoder) { return this; } + /** + * Set up {@link Authenticator} if it hasn't been set up before. + * + * @param authenticator authenticator. + * @return current builder's state. + */ Builder authenticatorIfAbsent(Authenticator authenticator) { if (this.authenticator == null) { this.authenticator = authenticator; @@ -189,6 +201,11 @@ Builder principalMapperIfAbsent(PrincipalMapper principalMapper) return this; } + /** + * Build service info. + * + * @return service info. + */ public ServiceInfo build() { return new ServiceInfo(this); } diff --git a/services-api/src/main/java/io/scalecube/services/ServiceProvider.java b/services-api/src/main/java/io/scalecube/services/ServiceProvider.java index 3c55e23f4..6e7f1ce23 100644 --- a/services-api/src/main/java/io/scalecube/services/ServiceProvider.java +++ b/services-api/src/main/java/io/scalecube/services/ServiceProvider.java @@ -2,7 +2,13 @@ import java.util.Collection; +/** + * Provide service instances. + * + * @deprecated use {@link ServiceFactory} + */ @FunctionalInterface +@Deprecated public interface ServiceProvider { Collection provide(ServiceCall call); diff --git a/services-api/src/main/java/io/scalecube/services/ServiceScanner.java b/services-api/src/main/java/io/scalecube/services/ServiceScanner.java index 45e564528..29f4fbb60 100644 --- a/services-api/src/main/java/io/scalecube/services/ServiceScanner.java +++ b/services-api/src/main/java/io/scalecube/services/ServiceScanner.java @@ -1,6 +1,5 @@ package io.scalecube.services; -import io.scalecube.services.annotations.Service; import io.scalecube.services.annotations.ServiceMethod; import java.util.Arrays; import java.util.Collections; @@ -18,15 +17,15 @@ private ServiceScanner() { /** * Scans {@code ServiceInfo} and builds list of {@code ServiceRegistration}-s. * - * @param serviceInfo service info instance + * @param serviceDefinition service info instance * @return list of {@code ServiceRegistration}-s */ - public static List scanServiceInfo(ServiceInfo serviceInfo) { - return Arrays.stream(serviceInfo.serviceInstance().getClass().getInterfaces()) - .filter(serviceInterface -> serviceInterface.isAnnotationPresent(Service.class)) + public static List scanServiceDefinition( + ServiceDefinition serviceDefinition) { + return Reflect.serviceInterfaces(serviceDefinition.type()) .map( serviceInterface -> { - Map serviceInfoTags = serviceInfo.tags(); + Map serviceInfoTags = serviceDefinition.tags(); Map apiTags = Reflect.serviceTags(serviceInterface); Map buffer = new HashMap<>(apiTags); // service tags override tags from @Service diff --git a/services-benchmarks/src/main/java/io/scalecube/services/benchmarks/transport/BenchmarkServiceState.java b/services-benchmarks/src/main/java/io/scalecube/services/benchmarks/transport/BenchmarkServiceState.java index 4528dc7ea..c3ed90f73 100644 --- a/services-benchmarks/src/main/java/io/scalecube/services/benchmarks/transport/BenchmarkServiceState.java +++ b/services-benchmarks/src/main/java/io/scalecube/services/benchmarks/transport/BenchmarkServiceState.java @@ -4,7 +4,9 @@ import io.scalecube.benchmarks.BenchmarkState; import io.scalecube.net.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.ScalecubeServiceFactory; import io.scalecube.services.ServiceCall; +import io.scalecube.services.ServiceFactory; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; import java.time.Duration; @@ -38,6 +40,8 @@ public void beforeAll() { final Address seedAddress = seed.discovery("seed").address(); + ServiceFactory serviceFactory = ScalecubeServiceFactory.fromInstances(services); + node = Microservices.builder() .discovery( @@ -46,7 +50,7 @@ public void beforeAll() { new ScalecubeServiceDiscovery(serviceEndpoint) .membership(cfg -> cfg.seedMembers(seedAddress))) .transport(RSocketServiceTransport::new) - .services(services) + .serviceFactory(serviceFactory) .startAwait(); LOGGER.info( @@ -74,6 +78,6 @@ public T api(Class c) { } public ServiceCall call() { - return seed.call(); + return seed.serviceCall(); } } diff --git a/services-examples-parent/services-examples-runner/pom.xml b/services-examples-parent/services-examples-runner/pom.xml index ca2d41314..db7590d7e 100644 --- a/services-examples-parent/services-examples-runner/pom.xml +++ b/services-examples-parent/services-examples-runner/pom.xml @@ -10,6 +10,7 @@ scalecube-services-examples-runner + ${env.TRAVIS_COMMIT} io.scalecube.services.examples.ExamplesRunner diff --git a/services-examples-parent/services-examples-runner/src/main/java/io/scalecube/services/examples/ExamplesRunner.java b/services-examples-parent/services-examples-runner/src/main/java/io/scalecube/services/examples/ExamplesRunner.java index 55df64551..899d58e7d 100644 --- a/services-examples-parent/services-examples-runner/src/main/java/io/scalecube/services/examples/ExamplesRunner.java +++ b/services-examples-parent/services-examples-runner/src/main/java/io/scalecube/services/examples/ExamplesRunner.java @@ -9,7 +9,9 @@ import io.scalecube.net.Address; import io.scalecube.runners.Runners; import io.scalecube.services.Microservices; +import io.scalecube.services.ScalecubeServiceFactory; import io.scalecube.services.ServiceEndpoint; +import io.scalecube.services.ServiceFactory; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.discovery.api.ServiceDiscovery; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; @@ -52,6 +54,9 @@ public static void main(String[] args) { .orElse(Runtime.getRuntime().availableProcessors()); LOGGER.info("Number of worker threads: " + numOfThreads); + ServiceFactory serviceFactory = + ScalecubeServiceFactory.fromInstances( + new BenchmarkServiceImpl(), new GreetingServiceImpl()); Microservices microservices = Microservices.builder() .discovery("microservices", endpoint -> serviceDiscovery(endpoint, config)) @@ -72,7 +77,7 @@ public static void main(String[] args) { .port(config.servicePort()) .runOn(loopResources) .noSSL())) - .services(new BenchmarkServiceImpl(), new GreetingServiceImpl()) + .serviceFactory(serviceFactory) .startAwait(); Runners.onShutdown(() -> microservices.shutdown().subscribe()); diff --git a/services-examples-parent/services-examples/pom.xml b/services-examples-parent/services-examples/pom.xml index 00aa0e867..40d0d0df7 100644 --- a/services-examples-parent/services-examples/pom.xml +++ b/services-examples-parent/services-examples/pom.xml @@ -57,6 +57,21 @@ org.apache.logging.log4j log4j-core + + org.springframework + spring-core + 5.2.6.RELEASE + + + org.springframework + spring-context + 5.2.6.RELEASE + + + com.google.inject + guice + 4.1.0 + diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example1.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example1.java index f95898ce4..d2a691ce5 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example1.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example1.java @@ -2,6 +2,8 @@ import io.scalecube.net.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.ScalecubeServiceFactory; +import io.scalecube.services.ServiceFactory; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl; import io.scalecube.services.examples.helloworld.service.api.GreetingsService; @@ -29,6 +31,10 @@ public static void main(String[] args) { final Address seedAddress = seed.discovery("seed").address(); + // Create service factory for GreetingService + ServiceFactory serviceFactory = + ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()); + // Construct a ScaleCube node which joins the cluster hosting the Greeting Service Microservices ms = Microservices.builder() @@ -38,21 +44,21 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery(endpoint) .membership(cfg -> cfg.seedMembers(seedAddress))) .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) + .serviceFactory(serviceFactory) .startAwait(); - seed.call() + seed.serviceCall() .api(GreetingsService.class) .sayHello("joe (on default dataFormat PROTOSTUFF)") .subscribe(consumer -> System.out.println(consumer.message())); - seed.call() + seed.serviceCall() .contentType(JSON) .api(GreetingsService.class) .sayHello("alice (on JSON dataFormat)") .subscribe(consumer -> System.out.println(consumer.message())); - seed.call() + seed.serviceCall() .contentType(OCTET_STREAM) .api(GreetingsService.class) .sayHello("bob (on java native Serializable/Externalizable dataFormat)") diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/discovery/CompositeDiscoveryExample.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/discovery/CompositeDiscoveryExample.java index b39cca8e5..2cced46e1 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/discovery/CompositeDiscoveryExample.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/discovery/CompositeDiscoveryExample.java @@ -2,11 +2,12 @@ import io.scalecube.net.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.MicroservicesContext; +import io.scalecube.services.ScalecubeServiceFactory; import io.scalecube.services.annotations.AfterConstruct; import io.scalecube.services.annotations.Service; import io.scalecube.services.annotations.ServiceMethod; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; -import io.scalecube.services.discovery.api.ServiceDiscoveryContext; import io.scalecube.services.examples.helloworld.service.api.Greeting; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; import reactor.core.publisher.Mono; @@ -51,7 +52,7 @@ public static void main(String[] args) { .options(opts -> opts.memberAlias("ms1")) .membership(cfg -> cfg.seedMembers(seed1Address))) .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl1()) + .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl1())) .startAwait(); Microservices ms2 = @@ -63,7 +64,7 @@ public static void main(String[] args) { .options(opts -> opts.memberAlias("ms2")) .membership(cfg -> cfg.seedMembers(seed2Address))) .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl2()) + .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl2())) .startAwait(); Microservices compositeMs = @@ -84,11 +85,19 @@ public static void main(String[] args) { .startAwait(); Greeting greeting1 = - compositeMs.call().api(GreetingsService1.class).sayHello("hello one").block(); + compositeMs + .serviceCall() + .api(GreetingsService1.class) + .sayHello("hello one") + .block(); System.err.println("This is response from GreetingsService1: " + greeting1.message()); Greeting greeting2 = - compositeMs.call().api(GreetingsService2.class).sayHello("hello two").block(); + compositeMs + .serviceCall() + .api(GreetingsService2.class) + .sayHello("hello two") + .block(); System.err.println("This is response from GreetingsService2: " + greeting2.message()); } @@ -109,11 +118,8 @@ public interface GreetingsService2 { public static class GreetingServiceImpl1 implements GreetingsService1 { @AfterConstruct - void init(Microservices ms) { - ServiceDiscoveryContext discoveryContext = ms.discovery("ms1"); - System.err.println("discovery(\"ms1\"): " + discoveryContext); - discoveryContext - .listen() + void init(MicroservicesContext ms) { + ms.listenDiscovery("ms1") .subscribe( discoveryEvent -> System.err.println("discovery(\"ms1\") event: " + discoveryEvent)); } @@ -131,11 +137,8 @@ public Mono sayHello(String name) { public static class GreetingServiceImpl2 implements GreetingsService2 { @AfterConstruct - void init(Microservices ms) { - ServiceDiscoveryContext discoveryContext = ms.discovery("ms2"); - System.err.println("discovery(\"ms2\"): " + discoveryContext); - discoveryContext - .listen() + void init(MicroservicesContext ms) { + ms.listenDiscovery("ms2") .subscribe( discoveryEvent -> System.err.println("discovery(\"ms2\") event: " + discoveryEvent)); } diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java index 25ba9e813..232daa47a 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java @@ -2,10 +2,16 @@ import io.scalecube.net.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.MicroservicesContext; +import io.scalecube.services.ServiceCall; +import io.scalecube.services.ServiceDefinition; +import io.scalecube.services.ServiceFactory; import io.scalecube.services.ServiceInfo; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; -import java.util.Collections; +import java.util.Collection; +import java.util.List; +import reactor.core.publisher.Mono; public class ExceptionMapperExample { @@ -31,6 +37,25 @@ public static void main(String[] args) throws InterruptedException { final Address address1 = ms1.discovery("ms1").address(); + ServiceFactory serviceFactory = + new ServiceFactory() { + @Override + public Collection getServiceDefinitions() { + ServiceDefinition serviceB = new ServiceDefinition(ServiceB.class); + return List.of(serviceB); + } + + @Override + public Mono> initializeServices( + MicroservicesContext microservices) { + ServiceCall call = microservices.serviceCall(); + ServiceA serviceA = + call.errorMapper(new ServiceAClientErrorMapper()).api(ServiceA.class); + ServiceB serviceB = new ServiceBImpl(serviceA); + ServiceInfo serviceInfoB = ServiceInfo.fromServiceInstance(serviceB).build(); + return Mono.just(List.of(serviceInfoB)); + } + }; Microservices ms2 = Microservices.builder() .discovery( @@ -39,22 +64,12 @@ public static void main(String[] args) throws InterruptedException { new ScalecubeServiceDiscovery(endpoint) .membership(cfg -> cfg.seedMembers(address1))) .transport(RSocketServiceTransport::new) - .services( - call -> { - ServiceA serviceA = - call.errorMapper( - new ServiceAClientErrorMapper()) // service client error mapper - .api(ServiceA.class); - - ServiceB serviceB = new ServiceBImpl(serviceA); - - return Collections.singleton(ServiceInfo.fromServiceInstance(serviceB).build()); - }) + .serviceFactory(serviceFactory) .startAwait(); System.err.println("ms2 started: " + ms2.serviceAddress()); - ms2.call() + ms2.serviceCall() .api(ServiceB.class) .doAnotherStuff(0) .subscribe( diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java index c8815c4c1..123e698b1 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java @@ -2,6 +2,8 @@ import io.scalecube.net.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.ScalecubeServiceFactory; +import io.scalecube.services.ServiceFactory; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl; import io.scalecube.services.examples.helloworld.service.api.GreetingsService; @@ -32,6 +34,10 @@ public static void main(String[] args) { final Address seedAddress = seed.discovery("seed").address(); + // Create service factory for Greeting Service + ServiceFactory serviceFactory = + ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()); + // Construct a ScaleCube node which joins the cluster hosting the Greeting Service Microservices ms = Microservices.builder() @@ -41,11 +47,11 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery(endpoint) .membership(cfg -> cfg.seedMembers(seedAddress))) .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) + .serviceFactory(serviceFactory) .startAwait(); // Create service proxy - GreetingsService service = seed.call().api(GreetingsService.class); + GreetingsService service = seed.serviceCall().api(GreetingsService.class); // Execute the services and subscribe to service events service.sayHello("joe").subscribe(consumer -> System.out.println(consumer.message())); diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java index fc6ada588..c215a866a 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java @@ -2,7 +2,9 @@ import io.scalecube.net.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.ScalecubeServiceFactory; import io.scalecube.services.ServiceCall; +import io.scalecube.services.ServiceFactory; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl; @@ -41,6 +43,10 @@ public static void main(String[] args) { // Construct a ScaleCube node which joins the cluster hosting the Greeting Service final Address seedAddress = seed.discovery("seed").address(); + // Create service factory for Greeting Service + ServiceFactory serviceFactory = + ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()); + Microservices ms = Microservices.builder() .discovery( @@ -49,11 +55,11 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery(endpoint) .membership(cfg -> cfg.seedMembers(seedAddress))) .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) + .serviceFactory(serviceFactory) .startAwait(); // Create a proxy to the seed service node - ServiceCall service = seed.call(); + ServiceCall service = seed.serviceCall(); // Create a ServiceMessage request with service qualifier and data ServiceMessage request = diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java index 4c23ca79b..2508942bc 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java @@ -2,6 +2,8 @@ import io.scalecube.net.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.ScalecubeServiceFactory; +import io.scalecube.services.ServiceFactory; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.examples.helloworld.service.BidiGreetingImpl; import io.scalecube.services.examples.helloworld.service.api.BidiGreetingService; @@ -33,6 +35,9 @@ public static void main(String[] args) { final Address seedAddress = seed.discovery("seed").address(); + // Create service factory for Bid Greeting Service + ServiceFactory serviceFactory = ScalecubeServiceFactory.fromInstances(new BidiGreetingImpl()); + // Construct a ScaleCube node which joins the cluster hosting the Greeting Service Microservices ms = Microservices.builder() @@ -42,11 +47,11 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery(endpoint) .membership(cfg -> cfg.seedMembers(seedAddress))) .transport(RSocketServiceTransport::new) - .services(new BidiGreetingImpl()) + .serviceFactory(serviceFactory) .startAwait(); // Create service proxy - BidiGreetingService service = seed.call().api(BidiGreetingService.class); + BidiGreetingService service = seed.serviceCall().api(BidiGreetingService.class); // Execute the services and subscribe to service events service diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/orderbook/Example1.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/orderbook/Example1.java index d6adb6099..4abad4d8c 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/orderbook/Example1.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/orderbook/Example1.java @@ -2,6 +2,8 @@ import io.scalecube.net.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.ScalecubeServiceFactory; +import io.scalecube.services.ServiceFactory; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.examples.orderbook.service.DefaultMarketDataService; import io.scalecube.services.examples.orderbook.service.OrderBookSnapshoot; @@ -40,6 +42,9 @@ public static void main(String[] args) throws InterruptedException { final Address gatewayAddress = gateway.discovery("gateway").address(); + ServiceFactory serviceFactory = + ScalecubeServiceFactory.fromInstances(new DefaultMarketDataService()); + Microservices ms = Microservices.builder() .discovery( @@ -48,10 +53,10 @@ public static void main(String[] args) throws InterruptedException { new ScalecubeServiceDiscovery(endpoint) .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) - .services(new DefaultMarketDataService()) + .serviceFactory(serviceFactory) .startAwait(); - MarketDataService marketService = ms.call().api(MarketDataService.class); + MarketDataService marketService = ms.serviceCall().api(MarketDataService.class); marketService.orderBook().subscribe(Example1::print); diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java index 0e1b80a54..70d6ac881 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java @@ -2,6 +2,8 @@ import io.scalecube.net.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.ScalecubeServiceFactory; +import io.scalecube.services.ServiceFactory; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; import reactor.core.publisher.Mono; @@ -23,6 +25,8 @@ public static void main(String[] args) { final Address gatewayAddress = gateway.discovery("gateway").address(); + ServiceFactory serviceFactory2 = ScalecubeServiceFactory.fromInstances(new Service2Impl()); + Microservices service2Node = Microservices.builder() .discovery( @@ -31,9 +35,11 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery(endpoint) .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) - .services(new Service2Impl()) + .serviceFactory(serviceFactory2) .startAwait(); + ServiceFactory serviceFactory1 = ScalecubeServiceFactory.fromInstances(new Service1Impl()); + Microservices service1Node = Microservices.builder() .discovery( @@ -42,11 +48,11 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery(endpoint) .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) - .services(new Service1Impl()) + .serviceFactory(serviceFactory1) .startAwait(); gateway - .call() + .serviceCall() .api(Service1.class) .manyDelay(100) .publishOn(Schedulers.parallel()) diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java index 361ac56eb..8aa8f68b3 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java @@ -2,6 +2,8 @@ import io.scalecube.net.Address; import io.scalecube.services.Microservices; +import io.scalecube.services.ScalecubeServiceFactory; +import io.scalecube.services.ServiceFactory; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; import reactor.core.publisher.Mono; @@ -23,6 +25,8 @@ public static void main(String[] args) { final Address gatewayAddress = gateway.discovery("gateway").address(); + ServiceFactory serviceFactory2 = ScalecubeServiceFactory.fromInstances(new Service2Impl()); + Microservices service2Node = Microservices.builder() .discovery( @@ -31,9 +35,11 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery(endpoint) .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) - .services(new Service2Impl()) + .serviceFactory(serviceFactory2) .startAwait(); + ServiceFactory serviceFactory1 = ScalecubeServiceFactory.fromInstances(new Service1Impl()); + Microservices service1Node = Microservices.builder() .discovery( @@ -42,11 +48,11 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery(endpoint) .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) - .services(new Service1Impl()) + .serviceFactory(serviceFactory1) .startAwait(); gateway - .call() + .serviceCall() .api(Service1.class) .remoteCallThenManyDelay(100) .publishOn(Schedulers.parallel()) diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/GuiceServiceFactoryExample.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/GuiceServiceFactoryExample.java new file mode 100644 index 000000000..fa63b29de --- /dev/null +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/GuiceServiceFactoryExample.java @@ -0,0 +1,174 @@ +package io.scalecube.services.examples.services.factory; + +import com.google.inject.AbstractModule; +import com.google.inject.Binding; +import com.google.inject.BindingAnnotation; +import com.google.inject.Guice; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.Provider; +import com.google.inject.TypeLiteral; +import io.scalecube.services.Microservices; +import io.scalecube.services.MicroservicesContext; +import io.scalecube.services.ScalecubeServiceFactory; +import io.scalecube.services.ServiceCall; +import io.scalecube.services.ServiceDefinition; +import io.scalecube.services.ServiceEndpoint; +import io.scalecube.services.ServiceFactory; +import io.scalecube.services.ServiceInfo; +import io.scalecube.services.discovery.ScalecubeServiceDiscovery; +import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl; +import io.scalecube.services.examples.services.factory.service.BidiGreetingImpl; +import io.scalecube.services.examples.services.factory.service.api.BidiGreetingService; +import io.scalecube.services.examples.services.factory.service.api.GreetingsService; +import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import reactor.core.publisher.Mono; + +public class GuiceServiceFactoryExample { + + /** + * Main method. + * + * @param args - program arguments + */ + public static void main(String[] args) { + ServiceFactory serviceFactory2 = + ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()); + + Microservices service2Node = + Microservices.builder() + .serviceFactory(serviceFactory2) + .discovery("s2", ScalecubeServiceDiscovery::new) + .transport(RSocketServiceTransport::new) + .startAwait(); + + ServiceFactory serviceFactory1 = new GuiceServiceFactory(new SampleModule()); + + Microservices service1Node = + Microservices.builder() + .discovery( + "s1", + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership( + cfg -> + cfg.seedMembers(service2Node.discovery("s2").address()))) + .serviceFactory(serviceFactory1) + .transport(RSocketServiceTransport::new) + .startAwait(); + + service1Node + .serviceCall() + .api(BidiGreetingService.class) + .greeting() + .log("receive |") + .log() + .log("complete |") + .block(); + + Mono.whenDelayError(service1Node.shutdown(), service2Node.shutdown()).block(); + } + + @Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE}) + @Retention(RetentionPolicy.RUNTIME) + @BindingAnnotation + @interface ScalecubeBean {} + + public static class SampleModule extends AbstractModule { + + @Override + protected void configure() { + try { + bind(GreetingsService.class) + .toProvider( + new Provider<>() { + + @Inject private Provider serviceCall; + + @Override + public GreetingsService get() { + return serviceCall.get().api(GreetingsService.class); + } + }); + Constructor constructor = + BidiGreetingImpl1.class.getConstructor(GreetingsService.class); + bind(BidiGreetingService.class) + .annotatedWith(ScalecubeBean.class) + .toConstructor(constructor); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + } + + @ScalecubeBean + public static class BidiGreetingImpl1 extends BidiGreetingImpl implements BidiGreetingService { + + @Inject + public BidiGreetingImpl1(GreetingsService greetingsService) { + super(greetingsService); + } + } + + public static class GuiceServiceFactory implements ServiceFactory { + + private final List modules; + private Injector injector; + private final AtomicReference lazyContext; + + private GuiceServiceFactory(Module... modules) { + this.lazyContext = new AtomicReference<>(); + this.modules = new ArrayList<>(Arrays.asList(modules)); + this.modules.add( + new AbstractModule() { + @Override + protected void configure() { + AtomicReference context = GuiceServiceFactory.this.lazyContext; + bind(MicroservicesContext.class).toProvider(context::get); + bind(ServiceCall.class).toProvider(() -> context.get().serviceCall()); + bind(ServiceEndpoint.class).toProvider(() -> context.get().serviceEndpoint()); + } + }); + } + + @Override + public Collection getServiceDefinitions() { + Injector injector = Guice.createInjector(this.modules); + this.injector = injector; + return injector.getAllBindings().keySet().stream() + .filter(key -> key.getAnnotationType() == ScalecubeBean.class) + .map(Key::getTypeLiteral) + .map(TypeLiteral::getRawType) + .map(ServiceDefinition::new) + .collect(Collectors.toList()); + } + + @Override + public Mono> initializeServices( + MicroservicesContext microservices) { + return Mono.fromCallable( + () -> { + this.lazyContext.set(microservices); + return this.injector.getAllBindings().values().stream() + .filter(binding -> binding.getKey().getAnnotationType() == ScalecubeBean.class) + .map(Binding::getProvider) + .map(Provider::get) + .map(bean -> ServiceInfo.fromServiceInstance(bean).build()) + .collect(Collectors.toList()); + }); + } + } +} diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/LightweightSpringServiceFactoryExample.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/LightweightSpringServiceFactoryExample.java new file mode 100644 index 000000000..ae1cf47a7 --- /dev/null +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/LightweightSpringServiceFactoryExample.java @@ -0,0 +1,142 @@ +package io.scalecube.services.examples.services.factory; + +import io.scalecube.services.Microservices; +import io.scalecube.services.MicroservicesContext; +import io.scalecube.services.ScalecubeServiceFactory; +import io.scalecube.services.ServiceCall; +import io.scalecube.services.ServiceDefinition; +import io.scalecube.services.ServiceFactory; +import io.scalecube.services.ServiceInfo; +import io.scalecube.services.discovery.ScalecubeServiceDiscovery; +import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl; +import io.scalecube.services.examples.services.factory.service.BidiGreetingImpl; +import io.scalecube.services.examples.services.factory.service.api.BidiGreetingService; +import io.scalecube.services.examples.services.factory.service.api.GreetingsService; +import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.config.BeanDefinitionCustomizer; +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.core.ResolvableType; +import reactor.core.publisher.Mono; + +public class LightweightSpringServiceFactoryExample { + + /** + * Main method. + * + * @param args - program arguments + */ + public static void main(String[] args) { + ServiceFactory serviceFactory2 = + ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()); + + Microservices service2Node = + Microservices.builder() + .serviceFactory(serviceFactory2) + .discovery("s2", ScalecubeServiceDiscovery::new) + .transport(RSocketServiceTransport::new) + .startAwait(); + + ServiceFactory serviceFactory1 = + LightweightSpringServiceFactory.initialize( + ctx -> { + ctx.registerBean( + GreetingsService.class, + () -> ctx.getBean(ServiceCall.class).api(GreetingsService.class)); + + ctx.registerBean(BidiGreetingImpl.class, Customizers.LOCAL); + }); + + Microservices service1Node = + Microservices.builder() + .discovery( + "s1", + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership( + cfg -> + cfg.seedMembers(service2Node.discovery("s2").address()))) + .serviceFactory(serviceFactory1) + .transport(RSocketServiceTransport::new) + .startAwait(); + + service1Node + .serviceCall() + .api(BidiGreetingService.class) + .greeting() + .log("receive |") + .log() + .log("complete |") + .block(); + + Mono.whenDelayError(service1Node.shutdown(), service2Node.shutdown()).block(); + } + + private static class LightweightSpringServiceFactory implements ServiceFactory { + + private final GenericApplicationContext context; + private final List localServiceBeanNames = new ArrayList<>(); + + public LightweightSpringServiceFactory( + ApplicationContextInitializer initializer) { + this.context = new GenericApplicationContext(); + initializer.initialize(this.context); + } + + public static ServiceFactory initialize( + ApplicationContextInitializer initializer) { + return new LightweightSpringServiceFactory(initializer); + } + + @Override + public Collection getServiceDefinitions() { + return Stream.of(this.context.getBeanDefinitionNames()) + .map( + beanName -> { + BeanDefinition bd = this.context.getBeanDefinition(beanName); + bd.setAttribute("name", beanName); + return bd; + }) + .filter(Customizers.IS_LOCAL) + .peek(bd -> this.localServiceBeanNames.add((String) bd.getAttribute("name"))) + .map(BeanDefinition::getResolvableType) + .map(ResolvableType::resolve) + .map(ServiceDefinition::new) + .collect(Collectors.toList()); + } + + @Override + public Mono> initializeServices( + MicroservicesContext microservices) { + return Mono.fromCallable( + () -> { + this.context.registerBean(ServiceCall.class, microservices::serviceCall); + this.context.refresh(); + this.context.start(); + return this.localServiceBeanNames.stream() + .map(this.context::getBean) + .map(bean -> ServiceInfo.fromServiceInstance(bean).build()) + .collect(Collectors.toList()); + }); + } + + @Override + public Mono shutdownServices() { + return Mono.fromRunnable(this.context::stop); + } + } + + private static class Customizers { + + public static final BeanDefinitionCustomizer LOCAL = bd -> bd.setAttribute("_LOCAL", true); + public static final Predicate IS_LOCAL = + bd -> bd.hasAttribute("_LOCAL") && (boolean) bd.getAttribute("_LOCAL"); + } +} diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/SpringServiceFactoryExample.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/SpringServiceFactoryExample.java new file mode 100644 index 000000000..fb9e43289 --- /dev/null +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/SpringServiceFactoryExample.java @@ -0,0 +1,194 @@ +package io.scalecube.services.examples.services.factory; + +import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_PROTOTYPE; + +import io.scalecube.services.Microservices; +import io.scalecube.services.MicroservicesContext; +import io.scalecube.services.Reflect; +import io.scalecube.services.ScalecubeServiceFactory; +import io.scalecube.services.ServiceCall; +import io.scalecube.services.ServiceDefinition; +import io.scalecube.services.ServiceEndpoint; +import io.scalecube.services.ServiceFactory; +import io.scalecube.services.ServiceInfo; +import io.scalecube.services.discovery.ScalecubeServiceDiscovery; +import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl; +import io.scalecube.services.examples.services.factory.service.BidiGreetingImpl; +import io.scalecube.services.examples.services.factory.service.api.BidiGreetingService; +import io.scalecube.services.examples.services.factory.service.api.GreetingsService; +import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.util.Collection; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.support.AbstractBeanDefinition; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Lazy; +import org.springframework.core.ResolvableType; +import org.springframework.core.annotation.AliasFor; +import reactor.core.publisher.Mono; + +public class SpringServiceFactoryExample { + + /** + * Main method. + * + * @param args - program arguments + */ + public static void main(String[] args) { + ServiceFactory serviceFactory2 = + ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()); + + Microservices service2Node = + Microservices.builder() + .serviceFactory(serviceFactory2) + .discovery("s2", ScalecubeServiceDiscovery::new) + .transport(RSocketServiceTransport::new) + .startAwait(); + + ServiceFactory serviceFactory1 = + AnnotatedSpringServiceFactory.configurations(ExampleConfiguration.class); + + Microservices service1Node = + Microservices.builder() + .discovery( + "s1", + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership( + cfg -> + cfg.seedMembers(service2Node.discovery("s2").address()))) + .serviceFactory(serviceFactory1) + .transport(RSocketServiceTransport::new) + .startAwait(); + + service1Node + .serviceCall() + .api(BidiGreetingService.class) + .greeting() + .log("receive |") + .log() + .log("complete |") + .block(); + + Mono.whenDelayError(service1Node.shutdown(), service2Node.shutdown()).block(); + } + + @Configuration + static class ExampleConfiguration { + + @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") + @Bean + @Lazy + public GreetingsService greetingService(ServiceCall serviceCall) { + return serviceCall.api(GreetingsService.class); + } + + @ScalecubeBean + public BidiGreetingService bidiGreetingService(GreetingsService greetingsService) { + return new BidiGreetingImpl(greetingsService); + } + } + + @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) + @Retention(RetentionPolicy.RUNTIME) + @Bean + @Lazy + @interface ScalecubeBean { + + /** + * {@link Bean#value()}. + * + * @return name + */ + @AliasFor(annotation = Bean.class, attribute = "name") + String[] value() default {}; + + /** + * {@link Bean#name()}. + * + * @return name + */ + @AliasFor(annotation = Bean.class, attribute = "value") + String[] name() default {}; + + /** + * {@link Bean#initMethod()} ()}. + * + * @return init method + */ + @AliasFor(annotation = Bean.class, attribute = "initMethod") + String initMethod() default ""; + + /** + * {@link Bean#destroyMethod()} ()}. + * + * @return destroy method + */ + @AliasFor(annotation = Bean.class, attribute = "destroyMethod") + String destroyMethod() default AbstractBeanDefinition.INFER_METHOD; + } + + private static class AnnotatedSpringServiceFactory implements ServiceFactory { + + private final AnnotationConfigApplicationContext context; + private final Class[] configuration; + + public static ServiceFactory configurations(Class... configurations) { + return new AnnotatedSpringServiceFactory(configurations); + } + + private AnnotatedSpringServiceFactory(Class... configurations) { + this.context = new AnnotationConfigApplicationContext(); + this.configuration = configurations; + } + + @Override + public Collection getServiceDefinitions() { + this.context.register(this.configuration); + this.context.refresh(); + String[] beanNames = this.context.getBeanNamesForAnnotation(ScalecubeBean.class); + return Stream.of(beanNames) + .map(this.context::getBeanDefinition) + .map(BeanDefinition::getResolvableType) + .map(ResolvableType::resolve) + .filter(Objects::nonNull) + .filter(Reflect::isService) + .map(ServiceDefinition::new) + .collect(Collectors.toList()); + } + + @Override + public Mono> initializeServices( + MicroservicesContext microservices) { + return Mono.fromCallable( + () -> { + this.context.registerBean(MicroservicesContext.class, () -> microservices); + this.context.registerBean( + ServiceCall.class, + microservices::serviceCall, + beanDefinition -> beanDefinition.setScope(SCOPE_PROTOTYPE)); + this.context.registerBean( + ServiceEndpoint.class, + microservices::serviceEndpoint, + beanDefinition -> beanDefinition.setScope(SCOPE_PROTOTYPE)); + this.context.start(); + return this.context.getBeansWithAnnotation(ScalecubeBean.class).values().stream() + .map(bean -> ServiceInfo.fromServiceInstance(bean).build()) + .collect(Collectors.toList()); + }); + } + + @Override + public Mono shutdownServices() { + return Mono.fromRunnable(this.context::stop).then(); + } + } +} diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/service/BidiGreetingImpl.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/service/BidiGreetingImpl.java new file mode 100644 index 000000000..ce3d518a8 --- /dev/null +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/service/BidiGreetingImpl.java @@ -0,0 +1,36 @@ +package io.scalecube.services.examples.services.factory.service; + +import io.scalecube.services.Microservices; +import io.scalecube.services.MicroservicesContext; +import io.scalecube.services.discovery.api.ServiceDiscovery; +import io.scalecube.services.examples.services.factory.service.api.BidiGreetingService; +import io.scalecube.services.examples.services.factory.service.api.Greeting; +import io.scalecube.services.examples.services.factory.service.api.GreetingsService; +import org.springframework.beans.factory.annotation.Autowired; +import reactor.core.publisher.Mono; + +/** + * Greeting is an act of communication in which human beings intentionally make their presence known + * to each other, to show attention to, and to suggest a type of relationship (usually cordial) or + * social status (formal or informal) between individuals or groups of people coming in contact with + * each other. + */ +public class BidiGreetingImpl implements BidiGreetingService { + + private final GreetingsService greetingsService; + + public BidiGreetingImpl(GreetingsService greetingsService) { + this.greetingsService = greetingsService; + } + + /** + * Call this method to be greeted by the this ScaleCube service. + * + * @return service greeting + */ + @Override + public Mono greeting() { + return this.greetingsService.sayHello("Jack").map(Greeting::message); + } + +} diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/service/GreetingServiceImpl.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/service/GreetingServiceImpl.java new file mode 100644 index 000000000..e330ba992 --- /dev/null +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/service/GreetingServiceImpl.java @@ -0,0 +1,12 @@ +package io.scalecube.services.examples.services.factory.service; + +import io.scalecube.services.examples.services.factory.service.api.Greeting; +import io.scalecube.services.examples.services.factory.service.api.GreetingsService; +import reactor.core.publisher.Mono; + +public class GreetingServiceImpl implements GreetingsService { + @Override + public Mono sayHello(String name) { + return Mono.just(new Greeting("Nice to meet you " + name + " and welcome to ScaleCube")); + } +} diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/service/api/BidiGreetingService.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/service/api/BidiGreetingService.java new file mode 100644 index 000000000..e9c8f9609 --- /dev/null +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/service/api/BidiGreetingService.java @@ -0,0 +1,13 @@ +package io.scalecube.services.examples.services.factory.service.api; + +import io.scalecube.services.annotations.Service; +import io.scalecube.services.annotations.ServiceMethod; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Service("BidiGreeting") +public interface BidiGreetingService { + + @ServiceMethod() + Mono greeting(); +} diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/service/api/Greeting.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/service/api/Greeting.java new file mode 100644 index 000000000..a873e3155 --- /dev/null +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/service/api/Greeting.java @@ -0,0 +1,18 @@ +package io.scalecube.services.examples.services.factory.service.api; + +import java.io.Serializable; + +public class Greeting implements Serializable { + + String message; + + public Greeting() {} + + public Greeting(String message) { + this.message = message; + } + + public String message() { + return message; + } +} diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/service/api/GreetingsService.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/service/api/GreetingsService.java new file mode 100644 index 000000000..0c46a98e4 --- /dev/null +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/services/factory/service/api/GreetingsService.java @@ -0,0 +1,23 @@ +package io.scalecube.services.examples.services.factory.service.api; + +import io.scalecube.services.annotations.Service; +import io.scalecube.services.annotations.ServiceMethod; +import reactor.core.publisher.Mono; + +/** + * Greeting is an act of communication in which human beings intentionally make their presence known + * to each other, to show attention to, and to suggest a type of relationship (usually cordial) or + * social status (formal or informal) between individuals or groups of people coming in contact with + * each other. + */ +@Service("io.scalecube.Greetings") +public interface GreetingsService { + /** + * Call this method to be greeted by the this ScaleCube service. + * + * @param name name of the caller + * @return service greeting + */ + @ServiceMethod("sayHello") + Mono sayHello(String name); +} diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index 74ffc384c..813579a26 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -31,6 +31,7 @@ import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -45,6 +46,7 @@ import java.util.function.Supplier; import java.util.function.UnaryOperator; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.StandardMBean; @@ -129,39 +131,50 @@ public final class Microservices { private final String id = UUID.randomUUID().toString().replace("-", ""); private final Map tags; - private final List serviceProviders; + private final ServiceFactory serviceFactory; private final ServiceRegistry serviceRegistry; private final ServiceMethodRegistry methodRegistry; - private final Authenticator authenticator; + private final Authenticator defaultAuthenticator; private final ServiceTransportBootstrap transportBootstrap; private final GatewayBootstrap gatewayBootstrap; + private final ServiceProviderErrorMapper defaultErrorMapper; + private final ServiceMessageDataDecoder defaultDataDecoder; private final CompositeServiceDiscovery compositeDiscovery; - private final ServiceProviderErrorMapper errorMapper; - private final ServiceMessageDataDecoder dataDecoder; private final String contentType; private final PrincipalMapper principalMapper; private final MonoProcessor shutdown = MonoProcessor.create(); private final MonoProcessor onShutdown = MonoProcessor.create(); + + // lazy private ServiceEndpoint serviceEndpoint; private Microservices(Builder builder) { this.tags = new HashMap<>(builder.tags); - this.serviceProviders = new ArrayList<>(builder.serviceProviders); + this.serviceRegistry = builder.serviceRegistry; this.methodRegistry = builder.methodRegistry; - this.authenticator = builder.authenticator; + this.defaultAuthenticator = builder.authenticator; this.gatewayBootstrap = builder.gatewayBootstrap; this.compositeDiscovery = builder.compositeDiscovery; this.transportBootstrap = builder.transportBootstrap; - this.errorMapper = builder.errorMapper; - this.dataDecoder = builder.dataDecoder; + this.defaultErrorMapper = builder.errorMapper; + this.defaultDataDecoder = builder.dataDecoder; this.contentType = builder.contentType; this.principalMapper = builder.principalMapper; + this.serviceFactory = + builder.serviceFactory != null + ? builder.serviceFactory + : ScalecubeServiceFactory.create(builder.serviceProviders); + // for initialization services by deprecated ServiceProvider + if (this.serviceFactory instanceof ScalecubeServiceFactory) { + ((ScalecubeServiceFactory) this.serviceFactory).setServiceCall(this::serviceCall); + } + // Setup cleanup - shutdown + this.shutdown .then(doShutdown()) - .doFinally(s -> onShutdown.onComplete()) + .doFinally(s -> this.onShutdown.onComplete()) .subscribe( null, ex -> LOGGER.warn("[{}][doShutdown] Exception occurred: {}", id, ex.toString())); } @@ -170,6 +183,11 @@ public static Builder builder() { return new Builder(); } + /** + * Unique id of this Microservices node. + * + * @return id + */ public String id() { return this.id; } @@ -184,77 +202,77 @@ private Mono start() { // Create bootstrap scheduler Scheduler scheduler = Schedulers.newSingle(toString(), true); - - return transportBootstrap + return this.transportBootstrap .start(this) + .map(ServiceTransportBootstrap::address) + .flatMap(this::initializeServiceEndpoint) + .flatMap(endpoint -> createDiscovery(this)) + .map( + serviceDiscovery -> + new MicroservicesContextImpl( + serviceDiscovery, this::serviceCall, this.serviceEndpoint)) + .flatMap(this.serviceFactory::initializeServices) + .doOnNext(this::registerInMethodRegistry) .publishOn(scheduler) - .flatMap( - transportBootstrap -> { - final ServiceCall call = call(); - final Address serviceAddress = transportBootstrap.transportAddress; - - final ServiceEndpoint.Builder serviceEndpointBuilder = - ServiceEndpoint.builder() - .id(id) - .address(serviceAddress) - .contentTypes(DataCodec.getAllContentTypes()) - .tags(tags); - - // invoke service providers and register services - List serviceInstances = - serviceProviders.stream() - .flatMap(serviceProvider -> serviceProvider.provide(call).stream()) - .peek(this::registerInMethodRegistry) - .peek( - serviceInfo -> - serviceEndpointBuilder.appendServiceRegistrations( - ServiceScanner.scanServiceInfo(serviceInfo))) - .map(ServiceInfo::serviceInstance) - .collect(Collectors.toList()); - - if (transportBootstrap == ServiceTransportBootstrap.NULL_INSTANCE - && !serviceInstances.isEmpty()) { - LOGGER.warn("[{}] ServiceTransport is not set", this.id()); - } - - serviceEndpoint = serviceEndpointBuilder.build(); - - return createDiscovery( - this, new ServiceDiscoveryOptions().serviceEndpoint(serviceEndpoint)) - .publishOn(scheduler) - .then(startGateway(new GatewayOptions().call(call))) - .publishOn(scheduler) - .then(Mono.fromCallable(() -> Injector.inject(this, serviceInstances))) - .then(Mono.fromCallable(() -> JmxMonitorMBean.start(this))) - .then(compositeDiscovery.startListen()) - .publishOn(scheduler) - .thenReturn(this); - }) + .then(startGateway()) + .publishOn(scheduler) + .then(Mono.fromCallable(() -> JmxMonitorMBean.start(this))) + .then(this.compositeDiscovery.startListen()) + .publishOn(scheduler) + .thenReturn(this) .onErrorResume( ex -> Mono.defer(this::shutdown).then(Mono.error(ex)).cast(Microservices.class)) .doOnSuccess(m -> LOGGER.info("[{}][start] Started", id)) .doOnTerminate(scheduler::dispose); } - private Mono startGateway(GatewayOptions options) { - return gatewayBootstrap.start(this, options); + private Mono createDiscovery(Microservices microservices) { + return this.compositeDiscovery.createInstance(microservices); + } + + private Mono initializeServiceEndpoint(Address serviceAddress) { + Mono> serviceDefinitionsMono = + Mono.fromCallable(this.serviceFactory::getServiceDefinitions); + return serviceDefinitionsMono + .map( + serviceDefinitions -> { + final ServiceEndpoint.Builder serviceEndpointBuilder = + ServiceEndpoint.builder() + .id(this.id) + .address(serviceAddress) + .contentTypes(DataCodec.getAllContentTypes()) + .tags(this.tags); + serviceDefinitions.forEach( + serviceDefinition -> + serviceEndpointBuilder.appendServiceRegistrations( + ServiceScanner.scanServiceDefinition(serviceDefinition))); + return serviceEndpointBuilder.build(); + }) + .doOnSuccess(serviceEndpoint -> this.serviceEndpoint = serviceEndpoint); } - private Mono createDiscovery( - Microservices microservices, ServiceDiscoveryOptions options) { - return compositeDiscovery.createInstance(microservices, options); + private void registerInMethodRegistry(Collection services) { + services.stream() + .map( + serviceInfo -> + ServiceInfo.from(serviceInfo) + .errorMapperIfAbsent(this.defaultErrorMapper) + .dataDecoderIfAbsent(this.defaultDataDecoder) + .authenticatorIfAbsent(this.defaultAuthenticator) + .principalMapperIfAbsent(this.principalMapper) + .build()) + .forEach(this.methodRegistry::registerService); } - private void registerInMethodRegistry(ServiceInfo serviceInfo) { - methodRegistry.registerService( - ServiceInfo.from(serviceInfo) - .errorMapperIfAbsent(errorMapper) - .dataDecoderIfAbsent(dataDecoder) - .authenticatorIfAbsent(authenticator) - .principalMapperIfAbsent(principalMapper) - .build()); + private Mono startGateway() { + return this.gatewayBootstrap.start(this, new GatewayOptions().call(this.serviceCall())); } + /** + * Network address of this Microservices node. + * + * @return network address + */ public Address serviceAddress() { return transportBootstrap.transportAddress; } @@ -263,27 +281,53 @@ public Address serviceAddress() { * Creates new instance {@code ServiceCall}. * * @return new {@code ServiceCall} instance. + * @deprecated use {@link this#serviceCall()} */ + @Deprecated public ServiceCall call() { + return this.serviceCall(); + } + + /** + * Creates new instance {@code ServiceCall}. + * + * @return new {@code ServiceCall} instance. + */ + public ServiceCall serviceCall() { return new ServiceCall() - .transport(transportBootstrap.clientTransport) - .serviceRegistry(serviceRegistry) - .methodRegistry(methodRegistry) - .contentType(contentType) + .transport(this.transportBootstrap.clientTransport) + .serviceRegistry(this.serviceRegistry) + .methodRegistry(this.methodRegistry) + .contentType(this.contentType) .errorMapper(DefaultErrorMapper.INSTANCE) .router(Routers.getRouter(RoundRobinServiceRouter.class)); } + /** + * List of all gateway registered in this Microservices node. + * + * @return gateways + */ public List gateways() { - return gatewayBootstrap.gateways(); + return this.gatewayBootstrap.gateways(); } + /** + * Get gateway registered in this Microservices node by id. + * + * @return gateway + */ public Gateway gateway(String id) { - return gatewayBootstrap.gateway(id); + return this.gatewayBootstrap.gateway(id); } + /** + * Service endpoint. + * + * @return service endpoint + */ public ServiceEndpoint serviceEndpoint() { - return serviceEndpoint; + return this.serviceEndpoint; } public List serviceEndpoints() { @@ -315,7 +359,7 @@ public ServiceDiscoveryContext discovery(String id) { * @return stream of {@code ServiceDiscoveryEvent}\s */ public Flux listenDiscovery() { - return compositeDiscovery.listen(); + return this.compositeDiscovery.listen(); } /** @@ -324,7 +368,7 @@ public Flux listenDiscovery() { * @return result of shutdown */ public Mono shutdown() { - return Mono.fromRunnable(shutdown::onComplete).then(onShutdown); + return Mono.fromRunnable(this.shutdown::onComplete).then(this.onShutdown); } /** @@ -333,7 +377,7 @@ public Mono shutdown() { * @return signal of when shutdown completed */ public Mono onShutdown() { - return onShutdown; + return this.onShutdown; } private Mono doShutdown() { @@ -341,26 +385,24 @@ private Mono doShutdown() { () -> { LOGGER.info("[{}][doShutdown] Shutting down", id); return Mono.whenDelayError( - processBeforeDestroy(), - compositeDiscovery.shutdown(), - gatewayBootstrap.shutdown(), - transportBootstrap.shutdown()) + processBeforeDestroy(), + compositeDiscovery.shutdown(), + gatewayBootstrap.shutdown(), + transportBootstrap.shutdown()) .doOnSuccess(s -> LOGGER.info("[{}][doShutdown] Shutdown", id)); }); } private Mono processBeforeDestroy() { - return Mono.whenDelayError( - methodRegistry.listServices().stream() - .map(ServiceInfo::serviceInstance) - .map(s -> Mono.fromRunnable(() -> Injector.processBeforeDestroy(this, s))) - .collect(Collectors.toList())); + return this.serviceFactory.shutdownServices().then(); } public static final class Builder { private Map tags = new HashMap<>(); + private final List services = new ArrayList<>(); private final List serviceProviders = new ArrayList<>(); + private ServiceFactory serviceFactory; private ServiceRegistry serviceRegistry = new ServiceRegistryImpl(); private ServiceMethodRegistry methodRegistry = new ServiceMethodRegistryImpl(); private Authenticator authenticator = new DelegatingAuthenticator(); @@ -374,7 +416,23 @@ public static final class Builder { private String contentType = ServiceMessage.DEFAULT_DATA_FORMAT; private PrincipalMapper principalMapper = authData -> authData; + private void build() { + ServiceProvider serviceProvider = + this.services.stream() + .map( + service -> + ServiceInfo.from(service) + .dataDecoderIfAbsent(this.dataDecoder) + .errorMapperIfAbsent(this.errorMapper) + .build()) + .collect( + Collectors.collectingAndThen( + Collectors.toList(), services -> (ServiceProvider) call -> services)); + this.serviceProviders.add(serviceProvider); + } + public Mono start() { + build(); return Mono.defer(() -> new Microservices(this).start()); } @@ -382,32 +440,61 @@ public Microservices startAwait() { return start().block(); } + /** + * Adds service instance to microservice. + * + *

WARNING This service will be ignored if custom {@link ServiceFactory} is + * installed. This method has been left for backward compatibility only and will be removed in + * future releases. + * + * @param services service info instance. + * @return builder + * @deprecated use {@link this#serviceFactory(ServiceFactory)} + */ + @Deprecated public Builder services(ServiceInfo... services) { - serviceProviders.add(call -> Arrays.stream(services).collect(Collectors.toList())); + this.services.addAll(Arrays.asList(services)); return this; } /** * Adds service instance to microservice. * + *

WARNING This service will be ignored if custom {@link ServiceFactory} is + * installed. This method has been left for backward compatibility only and will be removed in + * future releases. + * * @param services service instance. * @return builder + * @deprecated use {@link this#serviceFactory(ServiceFactory)} */ + @Deprecated public Builder services(Object... services) { - serviceProviders.add( - call -> - Arrays.stream(services) - .map( - s -> - s instanceof ServiceInfo - ? (ServiceInfo) s - : ServiceInfo.fromServiceInstance(s).build()) - .collect(Collectors.toList())); + Stream.of(services) + .map(service -> ServiceInfo.fromServiceInstance(service).build()) + .forEach(this.services::add); return this; } + /** + * Set up service provider. + * + *

WARNING This service will be ignored if custom {@link ServiceFactory} is + * installed. This method has been left for backward compatibility only and will be removed in + * future releases. + * + * @param serviceProvider - old service provider + * @return this + * @deprecated use {@link this#serviceFactory(ServiceFactory)} + */ + @Deprecated public Builder services(ServiceProvider serviceProvider) { - serviceProviders.add(serviceProvider); + this.serviceProviders.add(serviceProvider); + return this; + } + + public Builder serviceFactory(ServiceFactory serviceFactory) { + this.serviceFactory = serviceFactory; return this; } @@ -553,30 +640,28 @@ private CompositeServiceDiscovery addOperator(UnaryOperator createInstance( - Microservices microservices, ServiceDiscoveryOptions options) { + private Mono createInstance(Microservices microservices) { this.microservices = microservices; this.scheduler = Schedulers.newSingle("composite-discovery", true); + ServiceDiscoveryOptions discoveryOptions = new ServiceDiscoveryOptions(); for (UnaryOperator operator : this.optionOperators) { - - final ServiceDiscoveryOptions finalOptions = operator.apply(options); + final ServiceDiscoveryOptions finalOptions = operator.apply(discoveryOptions); final String id = finalOptions.id(); - final ServiceEndpoint serviceEndpoint = finalOptions.serviceEndpoint(); final ServiceDiscovery serviceDiscovery = - finalOptions.discoveryFactory().createServiceDiscovery(serviceEndpoint); + finalOptions.discoveryFactory().createServiceDiscovery(microservices.serviceEndpoint()); - discoveryInstances.put(id, serviceDiscovery); + this.discoveryInstances.put(id, serviceDiscovery); - discoveryContexts.put( + this.discoveryContexts.put( id, ServiceDiscoveryContext.builder() .id(id) .address(Address.NULL_ADDRESS) .discovery(serviceDiscovery) .serviceRegistry(microservices.serviceRegistry) - .scheduler(scheduler) + .scheduler(this.scheduler) .build()); } @@ -585,28 +670,28 @@ private Mono createInstance( private Mono startListen() { return start() // start composite discovery - .doOnSubscribe(s -> LOGGER.info("[{}][startListen] Starting", microservices.id())) - .doOnSuccess(avoid -> LOGGER.info("[{}][startListen] Started", microservices.id())) + .doOnSubscribe(s -> LOGGER.info("[{}][startListen] Starting", this.microservices.id())) + .doOnSuccess(avoid -> LOGGER.info("[{}][startListen] Started", this.microservices.id())) .doOnError( ex -> LOGGER.error( "[{}][startListen] Exception occurred: {}", - microservices.id(), + this.microservices.id(), ex.toString())); } @Override public Flux listen() { - return Flux.fromStream(microservices.serviceRegistry.listServiceEndpoints().stream()) + return Flux.fromStream(this.microservices.serviceRegistry.listServiceEndpoints().stream()) .map(ServiceDiscoveryEvent::newEndpointAdded) - .concatWith(subject) - .subscribeOn(scheduler) - .publishOn(scheduler); + .concatWith(this.subject) + .subscribeOn(this.scheduler) + .publishOn(this.scheduler); } @Override public Mono start() { - return Flux.fromIterable(discoveryInstances.entrySet()) + return Flux.fromIterable(this.discoveryInstances.entrySet()) .flatMap( entry -> { final String id = entry.getKey(); @@ -689,20 +774,20 @@ private Mono start(Microservices microservices, GatewayOptions s -> LOGGER.info( "[{}][gateway][{}][start] Starting", - microservices.id(), + microservices.id, gateway.id())) .doOnSuccess( gateway1 -> LOGGER.info( "[{}][gateway][{}][start] Started, address: {}", - microservices.id(), + microservices.id, gateway1.id(), gateway1.address())) .doOnError( ex -> LOGGER.error( "[{}][gateway][{}][start] Exception occurred: {}", - microservices.id(), + microservices.id, gateway.id(), ex.toString())); }) @@ -746,14 +831,14 @@ public ServiceTransportBootstrap(Supplier transportSupplier) { } private Mono start(Microservices microservices) { - if (transportSupplier == NULL_SUPPLIER - || (serviceTransport = transportSupplier.get()) == null) { + if (this.transportSupplier == NULL_SUPPLIER + || (this.serviceTransport = this.transportSupplier.get()) == null) { return Mono.just(NULL_INSTANCE); } - return serviceTransport + return this.serviceTransport .start() - .doOnSuccess(transport -> serviceTransport = transport) // reset self + .doOnSuccess(transport -> this.serviceTransport = transport) // reset self .flatMap( transport -> serviceTransport.serverTransport().bind(microservices.methodRegistry)) .doOnSuccess(transport -> serverTransport = transport) @@ -767,31 +852,35 @@ private Mono start(Microservices microservices) { return this; }) .doOnSubscribe( - s -> LOGGER.info("[{}][serviceTransport][start] Starting", microservices.id())) + s -> LOGGER.info("[{}][serviceTransport][start] Starting", microservices.id)) .doOnSuccess( transport -> LOGGER.info( "[{}][serviceTransport][start] Started, address: {}", - microservices.id(), + microservices.id, this.transportAddress)) .doOnError( ex -> LOGGER.error( "[{}][serviceTransport][start] Exception occurred: {}", - microservices.id(), + microservices.id, ex.toString())); } + public Address address() { + return this.transportAddress; + } + private Mono shutdown() { return Mono.defer( () -> Flux.concatDelayError( - Optional.ofNullable(serverTransport) - .map(ServerTransport::stop) - .orElse(Mono.empty()), - Optional.ofNullable(serviceTransport) - .map(ServiceTransport::stop) - .orElse(Mono.empty())) + Optional.ofNullable(this.serverTransport) + .map(ServerTransport::stop) + .orElse(Mono.empty()), + Optional.ofNullable(this.serviceTransport) + .map(ServiceTransport::stop) + .orElse(Mono.empty())) .then()); } } @@ -859,7 +948,7 @@ private static String asString(ServiceMethodInvoker invoker) { .add("methodInfo=" + asString(invoker.methodInfo())) .add( "serviceMethod=" - + invoker.service().getClass().getCanonicalName() + + invoker.service() + "." + invoker.methodInfo().methodName() + "(" @@ -882,4 +971,47 @@ private static String asString(ServiceInfo serviceInfo) { .toString(); } } + + private static final class MicroservicesContextImpl implements MicroservicesContext { + + private final CompositeServiceDiscovery serviceDiscovery; + private final Supplier serviceCallSupplier; + private final ServiceEndpoint serviceEndpoint; + + private MicroservicesContextImpl( + CompositeServiceDiscovery serviceDiscovery, + Supplier serviceCallSupplier, + ServiceEndpoint serviceEndpoint) { + this.serviceDiscovery = serviceDiscovery; + this.serviceCallSupplier = serviceCallSupplier; + this.serviceEndpoint = serviceEndpoint; + } + + @Override + public ServiceEndpoint serviceEndpoint() { + return this.serviceEndpoint; + } + + @Override + public ServiceCall serviceCall() { + return this.serviceCallSupplier.get(); + } + + @Override + public Flux listenDiscovery() { + return this.serviceDiscovery.listen(); + } + + /** + * {@inheritDoc} + * + * @see Microservices.Builder#discovery(String, ServiceDiscoveryFactory) + */ + @Override + public Flux listenDiscovery(String id) { + return Optional.ofNullable(this.serviceDiscovery.discoveryInstances.get(id)) + .map(ServiceDiscovery::listen) + .orElseThrow(() -> new NoSuchElementException("[discovery] id: " + id)); + } + } } diff --git a/services/src/main/java/io/scalecube/services/ScalecubeServiceFactory.java b/services/src/main/java/io/scalecube/services/ScalecubeServiceFactory.java new file mode 100644 index 000000000..4f444f835 --- /dev/null +++ b/services/src/main/java/io/scalecube/services/ScalecubeServiceFactory.java @@ -0,0 +1,150 @@ +package io.scalecube.services; + +import io.scalecube.services.inject.Injector; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import reactor.core.publisher.Mono; + +public class ScalecubeServiceFactory implements ServiceFactory { + + private final Supplier> serviceFactory; + + // lazy init + private final AtomicReference> services = new AtomicReference<>(); + private Supplier serviceCallSupplier; + // lazy init + private MicroservicesContext microserviceContext; + + private ScalecubeServiceFactory(Collection serviceProviders) { + this.serviceFactory = + () -> { + final ServiceCall serviceCall = this.serviceCallSupplier.get(); + return serviceProviders.stream() + .map(serviceProvider -> serviceProvider.provide(serviceCall)) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + }; + } + + /** + * Create the instance from {@link ServiceProvider}. + * + * @param serviceProviders old service providers. + * @return default services factory. + * @deprecated use {@link this#fromInstances(Object...)} + */ + public static ServiceFactory create(Collection serviceProviders) { + return new ScalecubeServiceFactory(serviceProviders); + } + + /** + * Create the instance {@link ServiceFactory} with pre-installed services. + * + * @param services user's services + * @return service factory + */ + public static ServiceFactory fromInstances(Object... services) { + ServiceProvider provider = + call -> + Stream.of(services) + .map( + service -> { + ServiceInfo.Builder builder; + if (service instanceof ServiceInfo) { + builder = ServiceInfo.from((ServiceInfo) service); + } else { + builder = ServiceInfo.fromServiceInstance(service); + } + return builder.build(); + }) + .collect(Collectors.toList()); + return new ScalecubeServiceFactory(Collections.singleton(provider)); + } + + /** + * Since the service instance factory ({@link ServiceProvider}) we have to leave behind does not + * provide us with information about the types of services produced, there is nothing left for us + * to do but start creating all the services and then retrieve the type of service, previously + * saving it as a {@link ScalecubeServiceFactory} state. + * + *

{@inheritDoc} + * + *

Use {@link io.scalecube.services.annotations.Inject} for inject {@link Microservices}, + * {@link io.scalecube.services.ServiceCall}. + * + * @see ServiceInfo + * @see ServiceDefinition + * @return + */ + @Override + public Collection getServiceDefinitions() { + return this.services().stream() + .map( + serviceInfo -> + new ServiceDefinition(serviceInfo.serviceInstance().getClass(), serviceInfo.tags())) + .collect(Collectors.toList()); + } + + /** + * {@inheritDoc} + * + *

Use {@link io.scalecube.services.annotations.AfterConstruct} for initialization service's + * instance. + * + * @param microservices microservices context + */ + @Override + public Mono> initializeServices( + MicroservicesContext microservices) { + return Mono.fromCallable( + () -> { + this.microserviceContext = microservices; + return this.services().stream() + .map(service -> Injector.inject(microservices, service)) + .map(service -> Injector.processAfterConstruct(microservices, service)) + .collect(Collectors.toList()); + }); + } + + /** + * {@inheritDoc} + * + *

Use {@link io.scalecube.services.annotations.BeforeDestroy} for finilization service's + * instance. + * + * @return + */ + @Override + public Mono shutdownServices() { + return Mono.fromRunnable(() -> shutdown0(this.microserviceContext)); + } + + private void shutdown0(MicroservicesContext microservices) { + if (this.services.get() != null) { + this.services.get().forEach(service -> Injector.processBeforeDestroy(microservices, service)); + } + } + + private Collection services() { + return this.services.updateAndGet( + currentValue -> currentValue == null ? this.serviceFactory.get() : currentValue); + } + + /** + * Setting serviceCall supplier. + * + * @param serviceCallSupplier lazy serviceCall initialization function + * @return current instance scalecube service factory + * @deprecated see reason in {@link ServiceProvider} + */ + @Deprecated + ScalecubeServiceFactory setServiceCall(Supplier serviceCallSupplier) { + this.serviceCallSupplier = serviceCallSupplier; + return this; + } +} diff --git a/services/src/main/java/io/scalecube/services/Injector.java b/services/src/main/java/io/scalecube/services/inject/Injector.java similarity index 54% rename from services/src/main/java/io/scalecube/services/Injector.java rename to services/src/main/java/io/scalecube/services/inject/Injector.java index c7433dbfe..f4dbc5a8b 100644 --- a/services/src/main/java/io/scalecube/services/Injector.java +++ b/services/src/main/java/io/scalecube/services/inject/Injector.java @@ -1,5 +1,9 @@ -package io.scalecube.services; +package io.scalecube.services.inject; +import io.scalecube.services.MicroservicesContext; +import io.scalecube.services.Reflect; +import io.scalecube.services.ServiceCall; +import io.scalecube.services.ServiceInfo; import io.scalecube.services.annotations.AfterConstruct; import io.scalecube.services.annotations.BeforeDestroy; import io.scalecube.services.annotations.Inject; @@ -8,40 +12,40 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.Arrays; -import java.util.Collection; import reactor.core.Exceptions; /** Service Injector scan and injects beans to a given Microservices instance. */ -final class Injector { +public final class Injector { private Injector() { // Do not instantiate } /** - * Inject instances to the microservices instance. either Microservices or ServiceProxy. Scan all + * Inject instance to the microservices instance. either Microservices or ServiceProxy. Scan all * local service instances and inject a service proxy. * * @param microservices microservices instance - * @param services services set - * @return microservices instance + * @param service service + * @return service instance */ - public static Microservices inject(Microservices microservices, Collection services) { - services.forEach( - service -> - Arrays.stream(service.getClass().getDeclaredFields()) - .forEach(field -> injectField(microservices, field, service))); - services.forEach(service -> processAfterConstruct(microservices, service)); - return microservices; + public static ServiceInfo inject( + MicroservicesContext microservices, ServiceInfo service) { + Object serviceInstance = service.serviceInstance(); + Arrays.stream(serviceInstance.getClass().getDeclaredFields()) + .forEach(field -> injectField(microservices, field, serviceInstance)); + return service; } - private static void injectField(Microservices microservices, Field field, Object service) { - if (field.isAnnotationPresent(Inject.class) && field.getType().equals(Microservices.class)) { + private static void injectField( + MicroservicesContext microservices, Field field, Object service) { + if (field.isAnnotationPresent(Inject.class) + && MicroservicesContext.class.isAssignableFrom(field.getType())) { setField(field, service, microservices); } else if (field.isAnnotationPresent(Inject.class) && Reflect.isService(field.getType())) { Inject injection = field.getAnnotation(Inject.class); Class routerClass = injection.router(); - final ServiceCall call = microservices.call(); + final ServiceCall call = microservices.serviceCall(); if (!routerClass.isInterface()) { call.router(routerClass); } @@ -59,16 +63,26 @@ private static void setField(Field field, Object object, Object value) { } } - private static void processAfterConstruct(Microservices microservices, Object targetInstance) { - processMethodWithAnnotation(microservices, targetInstance, AfterConstruct.class); + /** + * Run methods with {@link AfterConstruct} annotations. + * + * @param microservices scale cube instance. + * @param serviceInfo service info. + * @return service info with modified service's instance. + */ + public static ServiceInfo processAfterConstruct( + MicroservicesContext microservices, ServiceInfo serviceInfo) { + processMethodWithAnnotation(microservices, serviceInfo.serviceInstance(), AfterConstruct.class); + return serviceInfo; } - public static void processBeforeDestroy(Microservices microservices, Object targetInstance) { - processMethodWithAnnotation(microservices, targetInstance, BeforeDestroy.class); + public static void processBeforeDestroy( + MicroservicesContext microservices, ServiceInfo serviceInfo) { + processMethodWithAnnotation(microservices, serviceInfo.serviceInstance(), BeforeDestroy.class); } private static void processMethodWithAnnotation( - Microservices microservices, Object targetInstance, Class annotation) { + MicroservicesContext microservices, Object targetInstance, Class annotation) { Method[] declaredMethods = targetInstance.getClass().getDeclaredMethods(); Arrays.stream(declaredMethods) .filter(method -> method.isAnnotationPresent(annotation)) @@ -80,10 +94,10 @@ private static void processMethodWithAnnotation( Arrays.stream(targetMethod.getParameters()) .map( mapper -> { - if (mapper.getType().equals(Microservices.class)) { + if (MicroservicesContext.class.isAssignableFrom(mapper.getType())) { return microservices; } else if (Reflect.isService(mapper.getType())) { - return microservices.call().api(mapper.getType()); + return microservices.serviceCall().api(mapper.getType()); } else { return null; } diff --git a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java index 91b94a8ba..eb63a578f 100644 --- a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java +++ b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java @@ -37,7 +37,7 @@ public static void initNodes() { new ScalecubeServiceDiscovery(endpoint) .transport(cfg -> cfg.port(PORT.incrementAndGet()))) .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) + .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl())) .startAwait(); final Address seedAddress = provider.discovery("provider").address(); @@ -64,7 +64,7 @@ public static void shutdownNodes() { public void testCorruptedRequest() { Publisher req = consumer - .call() + .serviceCall() .requestOne(TestRequests.GREETING_CORRUPTED_PAYLOAD_REQUEST, GreetingResponse.class); assertThrows(InternalServiceException.class, () -> from(req).block()); } @@ -73,7 +73,7 @@ public void testCorruptedRequest() { public void testNotAuthorized() { Publisher req = consumer - .call() + .serviceCall() .requestOne(TestRequests.GREETING_UNAUTHORIZED_REQUEST, GreetingResponse.class); assertThrows(ForbiddenException.class, () -> from(req).block()); } @@ -81,13 +81,15 @@ public void testNotAuthorized() { @Test public void testNullRequestPayload() { Publisher req = - consumer.call().requestOne(TestRequests.GREETING_NULL_PAYLOAD, GreetingResponse.class); + consumer + .serviceCall() + .requestOne(TestRequests.GREETING_NULL_PAYLOAD, GreetingResponse.class); assertThrows(BadRequestException.class, () -> from(req).block()); } @Test public void testServiceUnavailable() { - StepVerifier.create(consumer.call().requestOne(TestRequests.NOT_FOUND_REQ)) + StepVerifier.create(consumer.serviceCall().requestOne(TestRequests.NOT_FOUND_REQ)) .expectError(ServiceUnavailableException.class) .verify(); } diff --git a/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java index 5376c73d5..37072970a 100644 --- a/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java @@ -98,7 +98,7 @@ static void afterAll() { @DisplayName("Successful authentication") void successfulAuthentication() { SecuredService securedService = - caller.call().credentials(CREDENTIALS).api(SecuredService.class); + caller.serviceCall().credentials(CREDENTIALS).api(SecuredService.class); StepVerifier.create(securedService.helloWithRequest("Bob")) .assertNext(response -> assertEquals("Hello, Bob", response)) @@ -127,7 +127,7 @@ void failedAuthenticationWhenAuthenticatorNotProvided() { .startAwait(); SecuredService securedService = - caller.call().credentials(CREDENTIALS).api(SecuredService.class); + caller.serviceCall().credentials(CREDENTIALS).api(SecuredService.class); Consumer verifyError = th -> { @@ -153,7 +153,7 @@ void failedAuthenticationWhenAuthenticatorNotProvided() { @Test @DisplayName("Authentication failed with invalid or empty credentials") void failedAuthenticationWithInvalidOrEmptyCredentials() { - SecuredService securedService = caller.call().api(SecuredService.class); + SecuredService securedService = caller.serviceCall().api(SecuredService.class); Consumer verifyError = th -> { @@ -189,7 +189,7 @@ void successfulAuthenticationOnPartiallySecuredService() { .startAwait(); PartiallySecuredService proxy = - caller.call().credentials(CREDENTIALS).api(PartiallySecuredService.class); + caller.serviceCall().credentials(CREDENTIALS).api(PartiallySecuredService.class); StepVerifier.create(proxy.securedMethod("Alice")) .assertNext(response -> assertEquals("Hello, Alice", response)) @@ -211,7 +211,8 @@ void successfulCallOfPublicMethodWithoutAuthentication() { .build()) .startAwait(); - PartiallySecuredService proxy = caller.call().api(PartiallySecuredService.class); + PartiallySecuredService proxy = + caller.serviceCall().api(PartiallySecuredService.class); StepVerifier.create(proxy.publicMethod("Alice")) .assertNext(response -> assertEquals("Hello, Alice", response)) diff --git a/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java b/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java index 44f26a202..5c02e16ec 100644 --- a/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java @@ -52,7 +52,7 @@ public static void tearDown() { @Test public void test_local_async_no_params() { - ServiceCall serviceCall = provider.call().router(RoundRobinServiceRouter.class); + ServiceCall serviceCall = provider.serviceCall().router(RoundRobinServiceRouter.class); // call the service. Publisher future = serviceCall.requestOne(GREETING_NO_PARAMS_REQUEST); @@ -67,27 +67,27 @@ private static Microservices serviceProvider() { return Microservices.builder() .discovery("serviceProvider", ScalecubeServiceDiscovery::new) .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) + .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl())) .startAwait(); } @Test public void test_local_void_greeting() { // WHEN - provider.call().oneWay(GREETING_VOID_REQ).block(Duration.ofSeconds(TIMEOUT)); + provider.serviceCall().oneWay(GREETING_VOID_REQ).block(Duration.ofSeconds(TIMEOUT)); } @Test public void test_local_failng_void_greeting() { - StepVerifier.create(provider.call().oneWay(GREETING_FAILING_VOID_REQ)) + StepVerifier.create(provider.serviceCall().oneWay(GREETING_FAILING_VOID_REQ)) .expectErrorMessage(GREETING_FAILING_VOID_REQ.data().toString()) .verify(Duration.ofSeconds(TIMEOUT)); } @Test public void test_local_throwing_void_greeting() { - StepVerifier.create(provider.call().oneWay(GREETING_THROWING_VOID_REQ)) + StepVerifier.create(provider.serviceCall().oneWay(GREETING_THROWING_VOID_REQ)) .expectErrorMessage(GREETING_THROWING_VOID_REQ.data().toString()) .verify(Duration.ofSeconds(TIMEOUT)); } @@ -98,7 +98,7 @@ public void test_local_fail_greeting() { Throwable exception = assertThrows( ServiceException.class, - () -> Mono.from(provider.call().requestOne(GREETING_FAIL_REQ)).block(timeout)); + () -> Mono.from(provider.serviceCall().requestOne(GREETING_FAIL_REQ)).block(timeout)); assertEquals("GreetingRequest{name='joe'}", exception.getMessage()); } @@ -109,14 +109,15 @@ public void test_local_exception_greeting() { Throwable exception = assertThrows( ServiceException.class, - () -> Mono.from(provider.call().requestOne(GREETING_ERROR_REQ)).block(timeout)); + () -> Mono.from(provider.serviceCall().requestOne(GREETING_ERROR_REQ)).block(timeout)); } @Test public void test_local_async_greeting_return_GreetingResponse() { // When - Publisher resultFuture = provider.call().requestOne(GREETING_REQUEST_REQ); + Publisher resultFuture = + provider.serviceCall().requestOne(GREETING_REQUEST_REQ); // Then ServiceMessage result = Mono.from(resultFuture).block(Duration.ofSeconds(TIMEOUT)); @@ -128,7 +129,7 @@ public void test_local_async_greeting_return_GreetingResponse() { @Test public void test_local_greeting_request_timeout_expires() { - ServiceCall service = provider.call(); + ServiceCall service = provider.serviceCall(); // call the service. Publisher future = service.requestOne(GREETING_REQUEST_TIMEOUT_REQ); @@ -141,7 +142,7 @@ public void test_local_greeting_request_timeout_expires() { @Test public void test_local_async_greeting_return_Message() { - ServiceMessage result = provider.call().requestOne(GREETING_REQUEST_REQ).block(timeout); + ServiceMessage result = provider.serviceCall().requestOne(GREETING_REQUEST_REQ).block(timeout); // print the greeting. GreetingResponse responseData = result.data(); @@ -153,9 +154,9 @@ public void test_local_async_greeting_return_Message() { @Test public void test_remote_mono_empty_request_response_greeting_messsage() { StepVerifier.create( - provider - .call() - .requestOne(GREETING_EMPTY_REQUEST_RESPONSE, EmptyGreetingResponse.class)) + provider + .serviceCall() + .requestOne(GREETING_EMPTY_REQUEST_RESPONSE, EmptyGreetingResponse.class)) .expectNextMatches(resp -> resp.data() instanceof EmptyGreetingResponse) .expectComplete() .verify(timeout); @@ -164,7 +165,7 @@ public void test_remote_mono_empty_request_response_greeting_messsage() { @Test public void test_async_greeting_return_string_service_not_found_error_case() { - ServiceCall service = provider.call(); + ServiceCall service = provider.serviceCall(); try { // call the service. diff --git a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java index 09bb1456b..e2ef767bc 100644 --- a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java @@ -72,14 +72,14 @@ private static Microservices serviceProvider(Object service) { new ScalecubeServiceDiscovery(endpoint) .membership(cfg -> cfg.seedMembers(gateway.discovery("gateway").address()))) .transport(RSocketServiceTransport::new) - .services(service) + .serviceFactory(ScalecubeServiceFactory.fromInstances(service)) .startAwait(); } @Test public void test_remote_async_greeting_no_params() { - ServiceCall serviceCall = gateway.call(); + ServiceCall serviceCall = gateway.serviceCall(); // call the service. Publisher future = @@ -93,13 +93,17 @@ public void test_remote_async_greeting_no_params() { @Test public void test_remote_void_greeting() { // When - StepVerifier.create(gateway.call().oneWay(GREETING_VOID_REQ)).expectComplete().verify(TIMEOUT); + StepVerifier.create(gateway.serviceCall().oneWay(GREETING_VOID_REQ)) + .expectComplete() + .verify(TIMEOUT); } @Test public void test_remote_mono_empty_request_response_greeting_messsage() { StepVerifier.create( - gateway.call().requestOne(GREETING_EMPTY_REQUEST_RESPONSE, EmptyGreetingResponse.class)) + gateway + .serviceCall() + .requestOne(GREETING_EMPTY_REQUEST_RESPONSE, EmptyGreetingResponse.class)) .expectNextMatches(resp -> resp.data() instanceof EmptyGreetingResponse) .expectComplete() .verify(TIMEOUT); @@ -109,7 +113,7 @@ public void test_remote_mono_empty_request_response_greeting_messsage() { public void test_remote_failing_void_greeting() { // When - StepVerifier.create(gateway.call().requestOne(GREETING_FAILING_VOID_REQ, Void.class)) + StepVerifier.create(gateway.serviceCall().requestOne(GREETING_FAILING_VOID_REQ, Void.class)) .expectErrorMessage(GREETING_FAILING_VOID_REQ.data().toString()) .verify(TIMEOUT); } @@ -117,7 +121,7 @@ public void test_remote_failing_void_greeting() { @Test public void test_remote_throwing_void_greeting() { // When - StepVerifier.create(gateway.call().oneWay(GREETING_THROWING_VOID_REQ)) + StepVerifier.create(gateway.serviceCall().oneWay(GREETING_THROWING_VOID_REQ)) .expectErrorMessage(GREETING_THROWING_VOID_REQ.data().toString()) .verify(TIMEOUT); } @@ -129,7 +133,8 @@ public void test_remote_fail_greeting() { assertThrows( ServiceException.class, () -> - Mono.from(gateway.call().requestOne(GREETING_FAIL_REQ, GreetingResponse.class)) + Mono.from( + gateway.serviceCall().requestOne(GREETING_FAIL_REQ, GreetingResponse.class)) .block(TIMEOUT)); assertEquals("GreetingRequest{name='joe'}", exception.getMessage()); } @@ -142,7 +147,10 @@ public void test_remote_exception_void() { assertThrows( ServiceException.class, () -> - Mono.from(gateway.call().requestOne(GREETING_ERROR_REQ, GreetingResponse.class)) + Mono.from( + gateway + .serviceCall() + .requestOne(GREETING_ERROR_REQ, GreetingResponse.class)) .block(TIMEOUT)); assertEquals("GreetingRequest{name='joe'}", exception.getMessage()); } @@ -150,7 +158,8 @@ public void test_remote_exception_void() { @Test public void test_remote_async_greeting_return_string() { - Publisher resultFuture = gateway.call().requestOne(GREETING_REQ, String.class); + Publisher resultFuture = + gateway.serviceCall().requestOne(GREETING_REQ, String.class); // Then ServiceMessage result = Mono.from(resultFuture).block(TIMEOUT); @@ -164,7 +173,7 @@ public void test_remote_async_greeting_return_GreetingResponse() { // When Publisher result = - gateway.call().requestOne(GREETING_REQUEST_REQ, GreetingResponse.class); + gateway.serviceCall().requestOne(GREETING_REQUEST_REQ, GreetingResponse.class); // Then GreetingResponse greeting = Mono.from(result).block(TIMEOUT).data(); @@ -174,7 +183,7 @@ public void test_remote_async_greeting_return_GreetingResponse() { @Test public void test_remote_greeting_request_timeout_expires() { - ServiceCall service = gateway.call(); + ServiceCall service = gateway.serviceCall(); // call the service. Publisher future = service.requestOne(GREETING_REQUEST_TIMEOUT_REQ); @@ -186,7 +195,7 @@ public void test_remote_greeting_request_timeout_expires() { // Since here and below tests were not reviewed [sergeyr] @Test public void test_remote_async_greeting_return_Message() { - ServiceCall service = gateway.call(); + ServiceCall service = gateway.serviceCall(); // call the service. Publisher future = service.requestOne(GREETING_REQUEST_REQ); @@ -206,7 +215,7 @@ public void test_remote_async_greeting_return_Message() { public void test_remote_dispatcher_remote_greeting_request_completes_before_timeout() { Publisher result = - gateway.call().requestOne(GREETING_REQUEST_REQ, GreetingResponse.class); + gateway.serviceCall().requestOne(GREETING_REQUEST_REQ, GreetingResponse.class); GreetingResponse greetings = Mono.from(result).block(TIMEOUT).data(); System.out.println("greeting_request_completes_before_timeout : " + greetings.getResult()); @@ -218,7 +227,7 @@ public void test_service_address_lookup_occur_only_after_subscription() { Flux quotes = gateway - .call() + .serviceCall() .requestMany( ServiceMessage.builder() .qualifier(QuoteService.NAME, "onlyOneAndThenNever") @@ -240,7 +249,7 @@ public void test_service_address_lookup_occur_only_after_subscription() { @Disabled("https://github.com/scalecube/scalecube-services/issues/742") public void test_many_stream_block_first() { - ServiceCall call = gateway.call(); + ServiceCall call = gateway.serviceCall(); ServiceMessage request = TestRequests.GREETING_MANY_STREAM_30; diff --git a/services/src/test/java/io/scalecube/services/ServiceLocalTest.java b/services/src/test/java/io/scalecube/services/ServiceLocalTest.java index 4470e08e3..088603128 100644 --- a/services/src/test/java/io/scalecube/services/ServiceLocalTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceLocalTest.java @@ -9,8 +9,10 @@ import io.scalecube.services.sut.GreetingResponse; import io.scalecube.services.sut.GreetingService; import io.scalecube.services.sut.GreetingServiceImpl; + import java.time.Duration; import java.util.concurrent.atomic.AtomicReference; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -27,7 +29,10 @@ public class ServiceLocalTest extends BaseTest { @BeforeEach public void setUp() { - microservices = Microservices.builder().services(new GreetingServiceImpl()).startAwait(); + microservices = + Microservices.builder() + .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl())) + .startAwait(); } @AfterEach @@ -39,7 +44,7 @@ public void cleanUp() { @Test public void test_local_greeting_request_completes_before_timeout() { - GreetingService service = microservices.call().api(GreetingService.class); + GreetingService service = microservices.serviceCall().api(GreetingService.class); // call the service. GreetingResponse result = @@ -209,7 +214,7 @@ void test_local_greeting_message() { .verify(timeout); // using serviceCall directly - ServiceCall serviceCall = microservices.call(); + ServiceCall serviceCall = microservices.serviceCall(); StepVerifier.create( serviceCall.requestOne( @@ -305,6 +310,6 @@ public void test_local_bidi_greeting_expect_GreetingResponse() { } private GreetingService createProxy(Microservices gateway) { - return gateway.call().api(GreetingService.class); // create proxy for GreetingService API + return gateway.serviceCall().api(GreetingService.class); // create proxy for GreetingService API } } diff --git a/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java b/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java index 8a5c044d8..e36ad9d99 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java @@ -58,14 +58,14 @@ public void test_added_removed_registration_events(MetadataCodec metadataCodec) Microservices.builder() .discovery("ms1", defServiceDiscovery(seedAddress, metadataCodec)) .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) + .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl())) .startAwait(); Microservices ms2 = Microservices.builder() .discovery("ms2", defServiceDiscovery(seedAddress, metadataCodec)) .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) + .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl())) .startAwait(); StepVerifier.create(events) @@ -92,7 +92,7 @@ public void test_listen_to_discovery_events(MetadataCodec metadataCodec) { Microservices.builder() .discovery("seed", defServiceDiscovery(metadataCodec)) .transport(RSocketServiceTransport::new) - .services(new AnnotationServiceImpl()) + .serviceFactory(ScalecubeServiceFactory.fromInstances(new AnnotationServiceImpl())) .startAwait(); cluster.add(seed); @@ -107,7 +107,8 @@ public void test_listen_to_discovery_events(MetadataCodec metadataCodec) { Microservices.builder() .discovery("ms1", defServiceDiscovery(seedAddress, metadataCodec)) .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) + .serviceFactory( + ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl())) .startAwait(); cluster.add(ms1); }) @@ -118,21 +119,25 @@ public void test_listen_to_discovery_events(MetadataCodec metadataCodec) { Microservices.builder() .discovery("ms2", defServiceDiscovery(seedAddress, metadataCodec)) .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) + .serviceFactory( + ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl())) .startAwait(); cluster.add(ms2); }) .assertNext(event -> assertEquals(ENDPOINT_ADDED, event.type())) .then(() -> cluster.remove(2).shutdown().block(TIMEOUT)) .assertNext(event -> assertEquals(ENDPOINT_LEAVING, event.type())) + .thenAwait(TIMEOUT) .assertNext(event -> assertEquals(ENDPOINT_REMOVED, event.type())) .then(() -> cluster.remove(1).shutdown().block(TIMEOUT)) .assertNext(event -> assertEquals(ENDPOINT_LEAVING, event.type())) + .thenAwait(TIMEOUT) .assertNext(event -> assertEquals(ENDPOINT_REMOVED, event.type())) .thenCancel() .verify(TIMEOUT); - StepVerifier.create(seed.call().api(AnnotationService.class).serviceDiscoveryEventTypes()) + StepVerifier.create( + seed.serviceCall().api(AnnotationService.class).serviceDiscoveryEventTypes()) .assertNext(type -> assertEquals(ENDPOINT_ADDED, type)) .assertNext(type -> assertEquals(ENDPOINT_ADDED, type)) .assertNext(type -> assertEquals(ENDPOINT_LEAVING, type)) @@ -157,7 +162,7 @@ public void test_delayed_listen_to_discovery_events(MetadataCodec metadataCodec) Microservices.builder() .discovery("seed", defServiceDiscovery(metadataCodec)) .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) + .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl())) .startAwait(); cluster.add(seed); @@ -172,7 +177,9 @@ public void test_delayed_listen_to_discovery_events(MetadataCodec metadataCodec) Microservices.builder() .discovery("ms1", defServiceDiscovery(seedAddress, metadataCodec)) .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl(), new AnnotationServiceImpl()) + .serviceFactory( + ScalecubeServiceFactory.fromInstances( + new GreetingServiceImpl(), new AnnotationServiceImpl())) .startAwait(); cluster.add(ms1); }) @@ -183,7 +190,8 @@ public void test_delayed_listen_to_discovery_events(MetadataCodec metadataCodec) Microservices.builder() .discovery("ms2", defServiceDiscovery(seedAddress, metadataCodec)) .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) + .serviceFactory( + ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl())) .startAwait(); cluster.add(ms2); }) @@ -191,7 +199,8 @@ public void test_delayed_listen_to_discovery_events(MetadataCodec metadataCodec) .thenCancel() .verify(TIMEOUT); - StepVerifier.create(seed.call().api(AnnotationService.class).serviceDiscoveryEventTypes()) + StepVerifier.create( + seed.serviceCall().api(AnnotationService.class).serviceDiscoveryEventTypes()) .assertNext(type -> assertEquals(ENDPOINT_ADDED, type)) .assertNext(type -> assertEquals(ENDPOINT_ADDED, type)) .thenCancel() diff --git a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java index 1a51715f1..c3b4420c2 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java @@ -76,7 +76,7 @@ private static Microservices serviceProvider() { return Microservices.builder() .discovery("serviceProvider", ServiceRemoteTest::serviceDiscovery) .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl()) + .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl())) .startAwait(); } @@ -84,7 +84,7 @@ private static Microservices serviceProvider() { public void test_remote_greeting_request_completes_before_timeout() { Duration duration = Duration.ofSeconds(1); - GreetingService service = gateway.call().api(GreetingService.class); + GreetingService service = gateway.serviceCall().api(GreetingService.class); // call the service. Mono result = @@ -95,7 +95,7 @@ public void test_remote_greeting_request_completes_before_timeout() { @Test public void test_remote_void_greeting() throws Exception { - GreetingService service = gateway.call().api(GreetingService.class); + GreetingService service = gateway.serviceCall().api(GreetingService.class); // call the service. service.greetingVoid(new GreetingRequest("joe")).block(Duration.ofSeconds(3)); @@ -108,7 +108,7 @@ public void test_remote_void_greeting() throws Exception { @Test public void test_remote_failing_void_greeting() { - GreetingService service = gateway.call().api(GreetingService.class); + GreetingService service = gateway.serviceCall().api(GreetingService.class); GreetingRequest request = new GreetingRequest("joe"); // call the service. @@ -119,7 +119,7 @@ public void test_remote_failing_void_greeting() { @Test public void test_remote_throwing_void_greeting() { - GreetingService service = gateway.call().api(GreetingService.class); + GreetingService service = gateway.serviceCall().api(GreetingService.class); GreetingRequest request = new GreetingRequest("joe"); // call the service. @@ -228,7 +228,7 @@ void test_remote_greeting_message() { .verify(TIMEOUT); // using serviceCall directly - ServiceCall serviceCall = gateway.call(); + ServiceCall serviceCall = gateway.serviceCall(); StepVerifier.create( serviceCall.requestOne( @@ -272,11 +272,13 @@ public void test_remote_serviceA_calls_serviceB_using_setter() { Microservices.builder() .discovery("provider", ServiceRemoteTest::serviceDiscovery) .transport(RSocketServiceTransport::new) - .services(new CoarseGrainedServiceImpl()) // add service a and b + .serviceFactory( + ScalecubeServiceFactory.fromInstances( + new CoarseGrainedServiceImpl())) // add service a and b .startAwait(); // Get a proxy to the service api. - CoarseGrainedService service = gateway.call().api(CoarseGrainedService.class); + CoarseGrainedService service = gateway.serviceCall().api(CoarseGrainedService.class); Publisher future = service.callGreeting("joe"); @@ -295,11 +297,11 @@ public void test_remote_serviceA_calls_serviceB() { Microservices.builder() .discovery("provider", ServiceRemoteTest::serviceDiscovery) .transport(RSocketServiceTransport::new) - .services(another) + .serviceFactory(ScalecubeServiceFactory.fromInstances(another)) .startAwait(); // Get a proxy to the service api. - CoarseGrainedService service = gateway.call().api(CoarseGrainedService.class); + CoarseGrainedService service = gateway.serviceCall().api(CoarseGrainedService.class); Publisher future = service.callGreeting("joe"); assertEquals(" hello to: joe", Mono.from(future).block(Duration.ofSeconds(1))); provider.shutdown().then(Mono.delay(TIMEOUT2)).block(); @@ -315,11 +317,11 @@ public void test_remote_serviceA_calls_serviceB_with_timeout() { Microservices.builder() .discovery("ms", ServiceRemoteTest::serviceDiscovery) .transport(RSocketServiceTransport::new) - .services(another) // add service a and b + .serviceFactory(ScalecubeServiceFactory.fromInstances(another)) // add service a and b .startAwait(); // Get a proxy to the service api. - CoarseGrainedService service = gateway.call().api(CoarseGrainedService.class); + CoarseGrainedService service = gateway.serviceCall().api(CoarseGrainedService.class); InternalServiceException exception = assertThrows( InternalServiceException.class, @@ -340,11 +342,11 @@ public void test_remote_serviceA_calls_serviceB_with_dispatcher() { Microservices.builder() .discovery("provider", ServiceRemoteTest::serviceDiscovery) .transport(RSocketServiceTransport::new) - .services(another) // add service a and b + .serviceFactory(ScalecubeServiceFactory.fromInstances(another)) // add service a and b .startAwait(); // Get a proxy to the service api. - CoarseGrainedService service = gateway.call().api(CoarseGrainedService.class); + CoarseGrainedService service = gateway.serviceCall().api(CoarseGrainedService.class); String response = service.callGreetingWithDispatcher("joe").block(Duration.ofSeconds(5)); assertEquals(response, " hello to: joe"); @@ -425,7 +427,7 @@ public void test_services_contribute_to_cluster_metadata() { .discovery("ms", ScalecubeServiceDiscovery::new) .transport(RSocketServiceTransport::new) .tags(tags) - .services(new GreetingServiceImpl()) + .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl())) .startAwait(); assertTrue(ms.serviceEndpoint().tags().containsKey("HOSTNAME")); @@ -433,7 +435,7 @@ public void test_services_contribute_to_cluster_metadata() { @Test public void test_remote_mono_empty_greeting() { - GreetingService service = gateway.call().api(GreetingService.class); + GreetingService service = gateway.serviceCall().api(GreetingService.class); // call the service. StepVerifier.create(service.greetingMonoEmpty(new GreetingRequest("empty"))) @@ -443,7 +445,7 @@ public void test_remote_mono_empty_greeting() { @Test public void test_remote_mono_empty_request_response_greeting() { - GreetingService service = gateway.call().api(GreetingService.class); + GreetingService service = gateway.serviceCall().api(GreetingService.class); // call the service. StepVerifier.create(service.emptyGreeting(new EmptyGreetingRequest())) @@ -454,7 +456,7 @@ public void test_remote_mono_empty_request_response_greeting() { @Test public void test_remote_flux_empty_greeting() { - GreetingService service = gateway.call().api(GreetingService.class); + GreetingService service = gateway.serviceCall().api(GreetingService.class); // call the service. StepVerifier.create(service.greetingFluxEmpty(new GreetingRequest("empty"))) @@ -464,7 +466,7 @@ public void test_remote_flux_empty_greeting() { @Disabled("https://github.com/scalecube/scalecube-services/issues/742") public void test_many_stream_block_first() { - GreetingService service = gateway.call().api(GreetingService.class); + GreetingService service = gateway.serviceCall().api(GreetingService.class); for (int i = 0; i < 100; i++) { //noinspection ConstantConditions @@ -474,7 +476,7 @@ public void test_many_stream_block_first() { } private GreetingService createProxy() { - return gateway.call().api(GreetingService.class); // create proxy for GreetingService API + return gateway.serviceCall().api(GreetingService.class); // create proxy for GreetingService API } private static ServiceDiscovery serviceDiscovery(ServiceEndpoint endpoint) { diff --git a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java index f694374be..e937d26fa 100644 --- a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java +++ b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java @@ -45,7 +45,7 @@ public static void setup() { .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .defaultDataDecoder(ServiceMessageCodec::decodeData) - .services(new SimpleQuoteService()) + .serviceFactory(ScalecubeServiceFactory.fromInstances(new SimpleQuoteService())) .startAwait(); } @@ -54,7 +54,7 @@ public void test_quotes() throws InterruptedException { CountDownLatch latch = new CountDownLatch(3); Disposable sub = - node.call() + node.serviceCall() .api(QuoteService.class) .quotes() .subscribe( @@ -70,7 +70,7 @@ public void test_quotes() throws InterruptedException { @Test public void test_local_quotes_service() { - QuoteService service = node.call().api(QuoteService.class); + QuoteService service = node.serviceCall().api(QuoteService.class); int expected = 3; List list = service.quotes().take(Duration.ofMillis(3500)).collectList().block(); @@ -84,7 +84,7 @@ public void test_remote_quotes_service() throws InterruptedException { CountDownLatch latch1 = new CountDownLatch(3); CountDownLatch latch2 = new CountDownLatch(3); - QuoteService service = gateway.call().api(QuoteService.class); + QuoteService service = gateway.serviceCall().api(QuoteService.class); service.snapshot(3).subscribe(onNext -> latch1.countDown()); service.snapshot(3).subscribe(onNext -> latch2.countDown()); @@ -100,7 +100,7 @@ public void test_remote_quotes_service() throws InterruptedException { public void test_quotes_batch() throws InterruptedException { int streamBound = 1000; - QuoteService service = gateway.call().api(QuoteService.class); + QuoteService service = gateway.serviceCall().api(QuoteService.class); CountDownLatch latch1 = new CountDownLatch(streamBound); final Disposable sub1 = service.snapshot(streamBound).subscribe(onNext -> latch1.countDown()); @@ -115,7 +115,7 @@ public void test_quotes_batch() throws InterruptedException { public void test_call_quotes_snapshot() { int batchSize = 1000; - ServiceCall serviceCall = gateway.call(); + ServiceCall serviceCall = gateway.serviceCall(); ServiceMessage message = ServiceMessage.builder().qualifier(QuoteService.NAME, "snapshot").data(batchSize).build(); @@ -128,14 +128,14 @@ public void test_call_quotes_snapshot() { @Test public void test_just_once() { - QuoteService service = gateway.call().api(QuoteService.class); + QuoteService service = gateway.serviceCall().api(QuoteService.class); assertEquals("1", service.justOne().block(Duration.ofSeconds(2))); } @Test public void test_just_one_message() { - ServiceCall service = gateway.call(); + ServiceCall service = gateway.serviceCall(); ServiceMessage justOne = ServiceMessage.builder().qualifier(QuoteService.NAME, "justOne").build(); @@ -149,7 +149,7 @@ public void test_just_one_message() { @Test public void test_scheduled_messages() { - ServiceCall serviceCall = gateway.call(); + ServiceCall serviceCall = gateway.serviceCall(); ServiceMessage scheduled = ServiceMessage.builder().qualifier(QuoteService.NAME, "scheduled").data(1000).build(); @@ -164,7 +164,7 @@ public void test_scheduled_messages() { @Test public void test_unknown_method() { - ServiceCall service = gateway.call(); + ServiceCall service = gateway.serviceCall(); ServiceMessage scheduled = ServiceMessage.builder().qualifier(QuoteService.NAME, "unknonwn").build(); @@ -180,7 +180,7 @@ public void test_unknown_method() { public void test_snapshot_completes() { int batchSize = 1000; - ServiceCall serviceCall = gateway.call(); + ServiceCall serviceCall = gateway.serviceCall(); ServiceMessage message = ServiceMessage.builder().qualifier(QuoteService.NAME, "snapshot").data(batchSize).build(); diff --git a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java index 6a138d1ee..5fd5d4a42 100644 --- a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java +++ b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java @@ -11,8 +11,8 @@ import io.scalecube.net.Address; import io.scalecube.services.BaseTest; -import io.scalecube.services.Microservices; import io.scalecube.services.Reflect; +import io.scalecube.services.Microservices; import io.scalecube.services.ServiceCall; import io.scalecube.services.ServiceInfo; import io.scalecube.services.ServiceReference; @@ -136,7 +136,7 @@ public void test_router_factory() { @Test public void test_round_robin() { - ServiceCall service = gateway.call(); + ServiceCall service = gateway.serviceCall(); // call the service. GreetingResponse result1 = @@ -158,7 +158,7 @@ public void test_remote_service_tags() throws Exception { CanaryService service = gateway - .call() + .serviceCall() .router(Routers.getRouter(WeightedRandomRouter.class)) .api(CanaryService.class); @@ -185,7 +185,7 @@ public void test_remote_service_tags() throws Exception { public void tesTagsFromAnnotation() { ServiceCall serviceCall = provider3 - .call() + .serviceCall() .router( (req, mes) -> { ServiceReference tagServiceRef = req.listServiceReferences().get(0); @@ -207,7 +207,7 @@ public void test_tag_selection_logic() { ServiceCall service = gateway - .call() + .serviceCall() .router( (reg, msg) -> reg.listServiceReferences().stream() @@ -230,7 +230,7 @@ public void test_tag_request_selection_logic() { ServiceCall service = gateway - .call() + .serviceCall() .router( (reg, msg) -> reg.listServiceReferences().stream() @@ -256,7 +256,7 @@ public void test_tag_request_selection_logic() { public void test_service_tags() throws Exception { TimeUnit.SECONDS.sleep(3); - ServiceCall service = gateway.call().router(WeightedRandomRouter.class); + ServiceCall service = gateway.serviceCall().router(WeightedRandomRouter.class); ServiceMessage req = ServiceMessage.builder() diff --git a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java index 848e3929a..7a8f71c11 100644 --- a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java +++ b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java @@ -57,7 +57,7 @@ public static void main(String[] args) { .startAwait(); CanaryService service = - gateway.call().router(WeightedRandomRouter.class).api(CanaryService.class); + gateway.serviceCall().router(WeightedRandomRouter.class).api(CanaryService.class); for (int i = 0; i < 10; i++) { Mono.from(service.greeting(new GreetingRequest("joe"))) diff --git a/services/src/test/java/io/scalecube/services/sut/AnnotationServiceImpl.java b/services/src/test/java/io/scalecube/services/sut/AnnotationServiceImpl.java index 7de7c8747..a8b84cfe3 100644 --- a/services/src/test/java/io/scalecube/services/sut/AnnotationServiceImpl.java +++ b/services/src/test/java/io/scalecube/services/sut/AnnotationServiceImpl.java @@ -1,6 +1,6 @@ package io.scalecube.services.sut; -import io.scalecube.services.Microservices; +import io.scalecube.services.MicroservicesContext; import io.scalecube.services.annotations.AfterConstruct; import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; import reactor.core.publisher.Flux; @@ -11,7 +11,7 @@ public class AnnotationServiceImpl implements AnnotationService { private ReplayProcessor serviceDiscoveryEvents; @AfterConstruct - void init(Microservices microservices) { + void init(MicroservicesContext microservices) { this.serviceDiscoveryEvents = ReplayProcessor.create(); microservices.listenDiscovery().subscribe(serviceDiscoveryEvents); } diff --git a/services/src/test/java/io/scalecube/services/sut/CoarseGrainedServiceImpl.java b/services/src/test/java/io/scalecube/services/sut/CoarseGrainedServiceImpl.java index be297b306..61bcf05d7 100644 --- a/services/src/test/java/io/scalecube/services/sut/CoarseGrainedServiceImpl.java +++ b/services/src/test/java/io/scalecube/services/sut/CoarseGrainedServiceImpl.java @@ -1,6 +1,6 @@ package io.scalecube.services.sut; -import io.scalecube.services.Microservices; +import io.scalecube.services.MicroservicesContext; import io.scalecube.services.annotations.Inject; import io.scalecube.services.api.ServiceMessage; import java.time.Duration; @@ -12,7 +12,7 @@ public class CoarseGrainedServiceImpl implements CoarseGrainedService { @Inject private GreetingService greetingService; - @Inject private Microservices microservices; + @Inject private MicroservicesContext microservices; @Override public Mono callGreeting(String name) { @@ -32,7 +32,7 @@ public Mono callGreetingTimeout(String request) { @Override public Mono callGreetingWithDispatcher(String request) { return microservices - .call() + .serviceCall() .requestOne( ServiceMessage.builder() .qualifier(GreetingService.SERVICE_NAME, "greeting") diff --git a/services/src/test/java/io/scalecube/services/sut/GreetingServiceImpl.java b/services/src/test/java/io/scalecube/services/sut/GreetingServiceImpl.java index 53425c72d..c990a30f1 100644 --- a/services/src/test/java/io/scalecube/services/sut/GreetingServiceImpl.java +++ b/services/src/test/java/io/scalecube/services/sut/GreetingServiceImpl.java @@ -1,6 +1,7 @@ package io.scalecube.services.sut; import io.scalecube.services.Microservices; +import io.scalecube.services.MicroservicesContext; import io.scalecube.services.annotations.Inject; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.exceptions.ForbiddenException; @@ -12,7 +13,8 @@ public final class GreetingServiceImpl implements GreetingService { - @Inject Microservices ms; + @Inject + MicroservicesContext ms; private int instanceId; diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java index 5bd9872ed..e00238f4d 100644 --- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java +++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java @@ -12,6 +12,8 @@ import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; + +import io.scalecube.services.ScalecubeServiceFactory; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -42,11 +44,12 @@ public void setUp() { new ScalecubeServiceDiscovery(endpoint) .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) - .services(new Facade()) + .serviceFactory(ScalecubeServiceFactory.fromInstances(new Facade())) .startAwait(); final Address facadeAddress = facade.discovery("facade").address(); + PingService pingService = () -> Mono.just(Thread.currentThread().getName()); this.ping = Microservices.builder() .discovery( @@ -55,9 +58,10 @@ public void setUp() { new ScalecubeServiceDiscovery(endpoint) .membership(cfg -> cfg.seedMembers(facadeAddress))) .transport(RSocketServiceTransport::new) - .services((PingService) () -> Mono.just(Thread.currentThread().getName())) + .serviceFactory(ScalecubeServiceFactory.fromInstances(pingService)) .startAwait(); + PongService pongService = () -> Mono.just(Thread.currentThread().getName()); this.pong = Microservices.builder() .discovery( @@ -66,13 +70,13 @@ public void setUp() { new ScalecubeServiceDiscovery(endpoint) .membership(cfg -> cfg.seedMembers(facadeAddress))) .transport(RSocketServiceTransport::new) - .services((PongService) () -> Mono.just(Thread.currentThread().getName())) + .serviceFactory(ScalecubeServiceFactory.fromInstances(pongService)) .startAwait(); } @Test public void testColocatedEventLoopGroup() { - ServiceCall call = gateway.call(); + ServiceCall call = gateway.serviceCall(); FacadeService facade = call.api(FacadeService.class); diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java index e1a31afc2..1f67ebec1 100644 --- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java +++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java @@ -11,6 +11,7 @@ import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; import io.scalecube.services.exceptions.ConnectionClosedException; +import io.scalecube.services.ScalecubeServiceFactory; import io.scalecube.services.sut.QuoteService; import io.scalecube.services.sut.SimpleQuoteService; import java.time.Duration; @@ -54,7 +55,7 @@ public void setUp() { new ScalecubeServiceDiscovery(serviceEndpoint) .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) - .services(new SimpleQuoteService()) + .serviceFactory(ScalecubeServiceFactory.fromInstances(new SimpleQuoteService())) .startAwait(); } @@ -78,7 +79,7 @@ public void test_remote_node_died_mono_never() throws Exception { AtomicReference sub1 = new AtomicReference<>(null); AtomicReference exceptionHolder = new AtomicReference<>(null); - ServiceCall serviceCall = gateway.call(); + ServiceCall serviceCall = gateway.serviceCall(); sub1.set(serviceCall.requestOne(JUST_NEVER).doOnError(exceptionHolder::set).subscribe()); gateway @@ -106,7 +107,7 @@ public void test_remote_node_died_many_never() throws Exception { AtomicReference sub1 = new AtomicReference<>(null); AtomicReference exceptionHolder = new AtomicReference<>(null); - ServiceCall serviceCall = gateway.call(); + ServiceCall serviceCall = gateway.serviceCall(); sub1.set(serviceCall.requestMany(JUST_MANY_NEVER).doOnError(exceptionHolder::set).subscribe()); gateway @@ -134,7 +135,7 @@ public void test_remote_node_died_many_then_never() throws Exception { AtomicReference sub1 = new AtomicReference<>(null); AtomicReference exceptionHolder = new AtomicReference<>(null); - ServiceCall serviceCall = gateway.call(); + ServiceCall serviceCall = gateway.serviceCall(); sub1.set( serviceCall .requestMany(ONLY_ONE_AND_THEN_NEVER)