Skip to content

Commit

Permalink
Add new annotation - Subscriber (#853)
Browse files Browse the repository at this point in the history
* Added annotation Subscriber
* Enhanced .newServiceEndpoint()
  • Loading branch information
artem-v authored Oct 2, 2024
1 parent 17f5213 commit a93fe0f
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.scalecube.services.annotations;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Subscriber {

/**
* Event type to subscribe on.
*
* @return event type
*/
Class<?> value() default Object.class;
}
47 changes: 31 additions & 16 deletions services/src/main/java/io/scalecube/services/Injector.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import io.scalecube.services.annotations.AfterConstruct;
import io.scalecube.services.annotations.BeforeDestroy;
import io.scalecube.services.annotations.Inject;
import io.scalecube.services.annotations.Subscriber;
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
import io.scalecube.services.routing.Router;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;

/** Service Injector scan and injects beans to a given Microservices instance. */
Expand All @@ -32,9 +35,22 @@ public static Microservices inject(Microservices microservices, Collection<Objec
Arrays.stream(service.getClass().getDeclaredFields())
.forEach(field -> injectField(microservices, field, service)));
services.forEach(service -> processAfterConstruct(microservices, service));
services.forEach(service -> processServiceDiscoverySubscriber(microservices, service));
return microservices;
}

private static void processServiceDiscoverySubscriber(
Microservices microservices, Object service) {
if (service instanceof CoreSubscriber) {
final Subscriber subscriberAnnotation = service.getClass().getAnnotation(Subscriber.class);
if (subscriberAnnotation != null
&& ServiceDiscoveryEvent.class.isAssignableFrom(subscriberAnnotation.value())) {
//noinspection unchecked,rawtypes
microservices.listenDiscovery().subscribe((CoreSubscriber) service);
}
}
}

private static void injectField(Microservices microservices, Field field, Object service) {
if (field.isAnnotationPresent(Inject.class) && field.getType().equals(Microservices.class)) {
setField(field, service, microservices);
Expand All @@ -45,8 +61,7 @@ private static void injectField(Microservices microservices, Field field, Object
if (!routerClass.isInterface()) {
call.router(routerClass);
}
final Object targetProxy = call.api(field.getType());
setField(field, service, targetProxy);
setField(field, service, call.api(field.getType()));
}
}

Expand Down Expand Up @@ -74,21 +89,21 @@ private static <A extends Annotation> void processMethodWithAnnotation(
.filter(method -> method.isAnnotationPresent(annotation))
.forEach(
targetMethod -> {
targetMethod.setAccessible(true);
Object[] parameters =
Arrays.stream(targetMethod.getParameters())
.map(
mapper -> {
if (mapper.getType().equals(Microservices.class)) {
return microservices;
} else if (Reflect.isService(mapper.getType())) {
return microservices.call().api(mapper.getType());
} else {
return null;
}
})
.toArray();
try {
targetMethod.setAccessible(true);
Object[] parameters =
Arrays.stream(targetMethod.getParameters())
.map(
mapper -> {
if (mapper.getType().equals(Microservices.class)) {
return microservices;
} else if (Reflect.isService(mapper.getType())) {
return microservices.call().api(mapper.getType());
} else {
return null;
}
})
.toArray();
targetMethod.invoke(targetInstance, parameters);
} catch (Exception ex) {
throw Exceptions.propagate(ex);
Expand Down
24 changes: 12 additions & 12 deletions services/src/main/java/io/scalecube/services/Microservices.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private Mono<Microservices> start() {
.tags(tags);

// invoke service providers and register services
List<Object> serviceInstances =
final List<Object> serviceInstances =
serviceProviders.stream()
.flatMap(serviceProvider -> serviceProvider.provide(serviceCall).stream())
.peek(this::registerService)
Expand All @@ -214,17 +214,14 @@ this, new ServiceDiscoveryOptions().serviceEndpoint(serviceEndpoint))
}

private ServiceEndpoint newServiceEndpoint(ServiceEndpoint serviceEndpoint) {
ServiceEndpoint.Builder builder = ServiceEndpoint.from(serviceEndpoint);
final ServiceEndpoint.Builder builder = ServiceEndpoint.from(serviceEndpoint);

int port = Optional.ofNullable(externalPort).orElse(serviceEndpoint.address().port());
final String finalHost =
Optional.ofNullable(externalHost).orElse(serviceEndpoint.address().host());
final int finalPort =
Optional.ofNullable(externalPort).orElse(serviceEndpoint.address().port());

// calculate local service endpoint address
Address newAddress =
Optional.ofNullable(externalHost)
.map(host -> Address.create(host, port))
.orElseGet(() -> Address.create(serviceEndpoint.address().host(), port));

return builder.address(newAddress).build();
return builder.address(Address.create(finalHost, finalPort)).build();
}

private Mono<GatewayBootstrap> startGateway(GatewayOptions options) {
Expand Down Expand Up @@ -525,13 +522,18 @@ private static class ServiceDiscoveryBootstrap implements AutoCloseable {
private Scheduler scheduler;
private Microservices microservices;

private ServiceDiscoveryBootstrap() {}

private ServiceDiscoveryBootstrap operator(UnaryOperator<ServiceDiscoveryOptions> op) {
operator = op;
return this;
}

private ServiceDiscoveryBootstrap conclude(
Microservices microservices, ServiceDiscoveryOptions options) {
this.microservices = microservices;
this.scheduler = Schedulers.newSingle("discovery", true);

if (operator == null) {
return this;
}
Expand All @@ -545,8 +547,6 @@ private ServiceDiscoveryBootstrap conclude(
}

serviceDiscovery = discoveryFactory.createServiceDiscovery(serviceEndpoint);
this.microservices = microservices;
this.scheduler = Schedulers.newSingle("discovery", true);

return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.scalecube.services;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verifyNoInteractions;

import io.scalecube.services.annotations.Subscriber;
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

public class ServiceDiscoverySubscriberTest extends BaseTest {

@Test
void testRegisterNonDiscoveryCoreSubscriber() {
final NonDiscoverySubscriber1 discoverySubscriber1 = spy(new NonDiscoverySubscriber1());
final NonDiscoverySubscriber2 discoverySubscriber2 = spy(new NonDiscoverySubscriber2());

Microservices.builder().services(discoverySubscriber1, discoverySubscriber2).startAwait();

verifyNoInteractions(discoverySubscriber1, discoverySubscriber2);
}

@Test
void testRegisterNotMatchingTypeDiscoveryCoreSubscriber() {
final NotMatchingTypeDiscoverySubscriber discoverySubscriber =
spy(new NotMatchingTypeDiscoverySubscriber());

Microservices.builder().services(discoverySubscriber).startAwait();

verifyNoInteractions(discoverySubscriber);
}

@Test
void testRegisterDiscoveryCoreSubscriber() {
final AtomicReference<Subscription> subscriptionReference = new AtomicReference<>();
final NormalDiscoverySubscriber normalDiscoverySubscriber =
new NormalDiscoverySubscriber(subscriptionReference);

Microservices.builder().services(normalDiscoverySubscriber).startAwait();

assertNotNull(subscriptionReference.get(), "subscription");
}

private static class SomeType {}

private static class NonDiscoverySubscriber1 extends BaseSubscriber {}

private static class NonDiscoverySubscriber2 extends BaseSubscriber<SomeType> {}

@Subscriber
private static class NotMatchingTypeDiscoverySubscriber
extends BaseSubscriber<ServiceDiscoveryEvent> {}

@Subscriber(ServiceDiscoveryEvent.class)
private static class NormalDiscoverySubscriber extends BaseSubscriber<ServiceDiscoveryEvent> {

private final AtomicReference<Subscription> subscriptionReference;

private NormalDiscoverySubscriber(AtomicReference<Subscription> subscriptionReference) {
this.subscriptionReference = subscriptionReference;
}

@Override
protected void hookOnSubscribe(Subscription subscription) {
subscriptionReference.set(subscription);
}
}
}

0 comments on commit a93fe0f

Please sign in to comment.