diff --git a/services-api/src/main/java/io/scalecube/services/annotations/Subscriber.java b/services-api/src/main/java/io/scalecube/services/annotations/Subscriber.java new file mode 100644 index 000000000..b72985e1a --- /dev/null +++ b/services-api/src/main/java/io/scalecube/services/annotations/Subscriber.java @@ -0,0 +1,18 @@ +package io.scalecube.services.annotations; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface Subscriber { + + /** + * Event type to subscribe on. + * + * @return event type + */ + Class value() default Object.class; +} diff --git a/services/src/main/java/io/scalecube/services/Injector.java b/services/src/main/java/io/scalecube/services/Injector.java index c7433dbfe..0dd264383 100644 --- a/services/src/main/java/io/scalecube/services/Injector.java +++ b/services/src/main/java/io/scalecube/services/Injector.java @@ -3,12 +3,15 @@ import io.scalecube.services.annotations.AfterConstruct; import io.scalecube.services.annotations.BeforeDestroy; import io.scalecube.services.annotations.Inject; +import io.scalecube.services.annotations.Subscriber; +import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; import io.scalecube.services.routing.Router; import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.Arrays; import java.util.Collection; +import reactor.core.CoreSubscriber; import reactor.core.Exceptions; /** Service Injector scan and injects beans to a given Microservices instance. */ @@ -32,9 +35,22 @@ public static Microservices inject(Microservices microservices, Collection injectField(microservices, field, service))); services.forEach(service -> processAfterConstruct(microservices, service)); + services.forEach(service -> processServiceDiscoverySubscriber(microservices, service)); return microservices; } + private static void processServiceDiscoverySubscriber( + Microservices microservices, Object service) { + if (service instanceof CoreSubscriber) { + final Subscriber subscriberAnnotation = service.getClass().getAnnotation(Subscriber.class); + if (subscriberAnnotation != null + && ServiceDiscoveryEvent.class.isAssignableFrom(subscriberAnnotation.value())) { + //noinspection unchecked,rawtypes + microservices.listenDiscovery().subscribe((CoreSubscriber) service); + } + } + } + private static void injectField(Microservices microservices, Field field, Object service) { if (field.isAnnotationPresent(Inject.class) && field.getType().equals(Microservices.class)) { setField(field, service, microservices); @@ -45,8 +61,7 @@ private static void injectField(Microservices microservices, Field field, Object if (!routerClass.isInterface()) { call.router(routerClass); } - final Object targetProxy = call.api(field.getType()); - setField(field, service, targetProxy); + setField(field, service, call.api(field.getType())); } } @@ -74,21 +89,21 @@ private static void processMethodWithAnnotation( .filter(method -> method.isAnnotationPresent(annotation)) .forEach( targetMethod -> { + targetMethod.setAccessible(true); + Object[] parameters = + Arrays.stream(targetMethod.getParameters()) + .map( + mapper -> { + if (mapper.getType().equals(Microservices.class)) { + return microservices; + } else if (Reflect.isService(mapper.getType())) { + return microservices.call().api(mapper.getType()); + } else { + return null; + } + }) + .toArray(); try { - targetMethod.setAccessible(true); - Object[] parameters = - Arrays.stream(targetMethod.getParameters()) - .map( - mapper -> { - if (mapper.getType().equals(Microservices.class)) { - return microservices; - } else if (Reflect.isService(mapper.getType())) { - return microservices.call().api(mapper.getType()); - } else { - return null; - } - }) - .toArray(); targetMethod.invoke(targetInstance, parameters); } catch (Exception ex) { throw Exceptions.propagate(ex); diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index 4ed4a959d..edf1ab6cf 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -188,7 +188,7 @@ private Mono start() { .tags(tags); // invoke service providers and register services - List serviceInstances = + final List serviceInstances = serviceProviders.stream() .flatMap(serviceProvider -> serviceProvider.provide(serviceCall).stream()) .peek(this::registerService) @@ -214,17 +214,14 @@ this, new ServiceDiscoveryOptions().serviceEndpoint(serviceEndpoint)) } private ServiceEndpoint newServiceEndpoint(ServiceEndpoint serviceEndpoint) { - ServiceEndpoint.Builder builder = ServiceEndpoint.from(serviceEndpoint); + final ServiceEndpoint.Builder builder = ServiceEndpoint.from(serviceEndpoint); - int port = Optional.ofNullable(externalPort).orElse(serviceEndpoint.address().port()); + final String finalHost = + Optional.ofNullable(externalHost).orElse(serviceEndpoint.address().host()); + final int finalPort = + Optional.ofNullable(externalPort).orElse(serviceEndpoint.address().port()); - // calculate local service endpoint address - Address newAddress = - Optional.ofNullable(externalHost) - .map(host -> Address.create(host, port)) - .orElseGet(() -> Address.create(serviceEndpoint.address().host(), port)); - - return builder.address(newAddress).build(); + return builder.address(Address.create(finalHost, finalPort)).build(); } private Mono startGateway(GatewayOptions options) { @@ -525,6 +522,8 @@ private static class ServiceDiscoveryBootstrap implements AutoCloseable { private Scheduler scheduler; private Microservices microservices; + private ServiceDiscoveryBootstrap() {} + private ServiceDiscoveryBootstrap operator(UnaryOperator op) { operator = op; return this; @@ -532,6 +531,9 @@ private ServiceDiscoveryBootstrap operator(UnaryOperator subscriptionReference = new AtomicReference<>(); + final NormalDiscoverySubscriber normalDiscoverySubscriber = + new NormalDiscoverySubscriber(subscriptionReference); + + Microservices.builder().services(normalDiscoverySubscriber).startAwait(); + + assertNotNull(subscriptionReference.get(), "subscription"); + } + + private static class SomeType {} + + private static class NonDiscoverySubscriber1 extends BaseSubscriber {} + + private static class NonDiscoverySubscriber2 extends BaseSubscriber {} + + @Subscriber + private static class NotMatchingTypeDiscoverySubscriber + extends BaseSubscriber {} + + @Subscriber(ServiceDiscoveryEvent.class) + private static class NormalDiscoverySubscriber extends BaseSubscriber { + + private final AtomicReference subscriptionReference; + + private NormalDiscoverySubscriber(AtomicReference subscriptionReference) { + this.subscriptionReference = subscriptionReference; + } + + @Override + protected void hookOnSubscribe(Subscription subscription) { + subscriptionReference.set(subscription); + } + } +}