Skip to content

Commit

Permalink
Added annotation Subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Oct 2, 2024
1 parent e4f5bd7 commit ed607a5
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.scalecube.services.annotations;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Subscriber {

/**
* Event type to subscribe on.
*
* @return event type
*/
Class<?> value() default Object.class;
}
16 changes: 16 additions & 0 deletions services/src/main/java/io/scalecube/services/Injector.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import io.scalecube.services.annotations.AfterConstruct;
import io.scalecube.services.annotations.BeforeDestroy;
import io.scalecube.services.annotations.Inject;
import io.scalecube.services.annotations.Subscriber;
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
import io.scalecube.services.routing.Router;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;

/** Service Injector scan and injects beans to a given Microservices instance. */
Expand All @@ -32,9 +35,22 @@ public static Microservices inject(Microservices microservices, Collection<Objec
Arrays.stream(service.getClass().getDeclaredFields())
.forEach(field -> injectField(microservices, field, service)));
services.forEach(service -> processAfterConstruct(microservices, service));
services.forEach(service -> processServiceDiscoverySubscriber(microservices, service));
return microservices;
}

private static void processServiceDiscoverySubscriber(
Microservices microservices, Object service) {
if (service instanceof CoreSubscriber) {
final Subscriber subscriberAnnotation = service.getClass().getAnnotation(Subscriber.class);
if (subscriberAnnotation != null
&& ServiceDiscoveryEvent.class.isAssignableFrom(subscriberAnnotation.value())) {
//noinspection unchecked,rawtypes
microservices.listenDiscovery().subscribe((CoreSubscriber) service);
}
}
}

private static void injectField(Microservices microservices, Field field, Object service) {
if (field.isAnnotationPresent(Inject.class) && field.getType().equals(Microservices.class)) {
setField(field, service, microservices);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,18 @@ private static class ServiceDiscoveryBootstrap implements AutoCloseable {
private Scheduler scheduler;
private Microservices microservices;

private ServiceDiscoveryBootstrap() {}

private ServiceDiscoveryBootstrap operator(UnaryOperator<ServiceDiscoveryOptions> op) {
operator = op;
return this;
}

private ServiceDiscoveryBootstrap conclude(
Microservices microservices, ServiceDiscoveryOptions options) {
this.microservices = microservices;
this.scheduler = Schedulers.newSingle("discovery", true);

if (operator == null) {
return this;
}
Expand All @@ -542,8 +547,6 @@ private ServiceDiscoveryBootstrap conclude(
}

serviceDiscovery = discoveryFactory.createServiceDiscovery(serviceEndpoint);
this.microservices = microservices;
this.scheduler = Schedulers.newSingle("discovery", true);

return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.scalecube.services;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verifyNoInteractions;

import io.scalecube.services.annotations.Subscriber;
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

public class ServiceDiscoverySubscriberTest extends BaseTest {

@Test
void testRegisterNonDiscoveryCoreSubscriber() {
final NonDiscoverySubscriber1 discoverySubscriber1 = spy(new NonDiscoverySubscriber1());
final NonDiscoverySubscriber2 discoverySubscriber2 = spy(new NonDiscoverySubscriber2());

Microservices.builder().services(discoverySubscriber1, discoverySubscriber2).startAwait();

verifyNoInteractions(discoverySubscriber1, discoverySubscriber2);
}

@Test
void testRegisterNotMatchingTypeDiscoveryCoreSubscriber() {
final NotMatchingTypeDiscoverySubscriber discoverySubscriber =
spy(new NotMatchingTypeDiscoverySubscriber());

Microservices.builder().services(discoverySubscriber).startAwait();

verifyNoInteractions(discoverySubscriber);
}

@Test
void testRegisterDiscoveryCoreSubscriber() {
final AtomicReference<Subscription> subscriptionReference = new AtomicReference<>();
final NormalDiscoverySubscriber normalDiscoverySubscriber =
new NormalDiscoverySubscriber(subscriptionReference);

Microservices.builder().services(normalDiscoverySubscriber).startAwait();

assertNotNull(subscriptionReference.get(), "subscription");
}

private static class SomeType {}

private static class NonDiscoverySubscriber1 extends BaseSubscriber {}

private static class NonDiscoverySubscriber2 extends BaseSubscriber<SomeType> {}

@Subscriber
private static class NotMatchingTypeDiscoverySubscriber
extends BaseSubscriber<ServiceDiscoveryEvent> {}

@Subscriber(ServiceDiscoveryEvent.class)
private static class NormalDiscoverySubscriber extends BaseSubscriber<ServiceDiscoveryEvent> {

private final AtomicReference<Subscription> subscriptionReference;

private NormalDiscoverySubscriber(AtomicReference<Subscription> subscriptionReference) {
this.subscriptionReference = subscriptionReference;
}

@Override
protected void hookOnSubscribe(Subscription subscription) {
subscriptionReference.set(subscription);
}
}
}

0 comments on commit ed607a5

Please sign in to comment.