Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new annotation - Subscriber #853

Merged
merged 4 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.scalecube.services.annotations;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Subscriber {

/**
* Event type to subscribe on.
*
* @return event type
*/
Class<?> value() default Object.class;
}
47 changes: 31 additions & 16 deletions services/src/main/java/io/scalecube/services/Injector.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import io.scalecube.services.annotations.AfterConstruct;
import io.scalecube.services.annotations.BeforeDestroy;
import io.scalecube.services.annotations.Inject;
import io.scalecube.services.annotations.Subscriber;
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
import io.scalecube.services.routing.Router;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;

/** Service Injector scan and injects beans to a given Microservices instance. */
Expand All @@ -32,9 +35,22 @@ public static Microservices inject(Microservices microservices, Collection<Objec
Arrays.stream(service.getClass().getDeclaredFields())
.forEach(field -> injectField(microservices, field, service)));
services.forEach(service -> processAfterConstruct(microservices, service));
services.forEach(service -> processServiceDiscoverySubscriber(microservices, service));
return microservices;
}

private static void processServiceDiscoverySubscriber(
Microservices microservices, Object service) {
if (service instanceof CoreSubscriber) {
final Subscriber subscriberAnnotation = service.getClass().getAnnotation(Subscriber.class);
if (subscriberAnnotation != null
&& ServiceDiscoveryEvent.class.isAssignableFrom(subscriberAnnotation.value())) {
//noinspection unchecked,rawtypes
microservices.listenDiscovery().subscribe((CoreSubscriber) service);
}
}
}

private static void injectField(Microservices microservices, Field field, Object service) {
if (field.isAnnotationPresent(Inject.class) && field.getType().equals(Microservices.class)) {
setField(field, service, microservices);
Expand All @@ -45,8 +61,7 @@ private static void injectField(Microservices microservices, Field field, Object
if (!routerClass.isInterface()) {
call.router(routerClass);
}
final Object targetProxy = call.api(field.getType());
setField(field, service, targetProxy);
setField(field, service, call.api(field.getType()));
}
}

Expand Down Expand Up @@ -74,21 +89,21 @@ private static <A extends Annotation> void processMethodWithAnnotation(
.filter(method -> method.isAnnotationPresent(annotation))
.forEach(
targetMethod -> {
targetMethod.setAccessible(true);
Object[] parameters =
Arrays.stream(targetMethod.getParameters())
.map(
mapper -> {
if (mapper.getType().equals(Microservices.class)) {
return microservices;
} else if (Reflect.isService(mapper.getType())) {
return microservices.call().api(mapper.getType());
} else {
return null;
}
})
.toArray();
try {
targetMethod.setAccessible(true);
Object[] parameters =
Arrays.stream(targetMethod.getParameters())
.map(
mapper -> {
if (mapper.getType().equals(Microservices.class)) {
return microservices;
} else if (Reflect.isService(mapper.getType())) {
return microservices.call().api(mapper.getType());
} else {
return null;
}
})
.toArray();
targetMethod.invoke(targetInstance, parameters);
} catch (Exception ex) {
throw Exceptions.propagate(ex);
Expand Down
24 changes: 12 additions & 12 deletions services/src/main/java/io/scalecube/services/Microservices.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private Mono<Microservices> start() {
.tags(tags);

// invoke service providers and register services
List<Object> serviceInstances =
final List<Object> serviceInstances =
serviceProviders.stream()
.flatMap(serviceProvider -> serviceProvider.provide(serviceCall).stream())
.peek(this::registerService)
Expand All @@ -214,17 +214,14 @@ this, new ServiceDiscoveryOptions().serviceEndpoint(serviceEndpoint))
}

private ServiceEndpoint newServiceEndpoint(ServiceEndpoint serviceEndpoint) {
ServiceEndpoint.Builder builder = ServiceEndpoint.from(serviceEndpoint);
final ServiceEndpoint.Builder builder = ServiceEndpoint.from(serviceEndpoint);

int port = Optional.ofNullable(externalPort).orElse(serviceEndpoint.address().port());
final String finalHost =
Optional.ofNullable(externalHost).orElse(serviceEndpoint.address().host());
final int finalPort =
Optional.ofNullable(externalPort).orElse(serviceEndpoint.address().port());

// calculate local service endpoint address
Address newAddress =
Optional.ofNullable(externalHost)
.map(host -> Address.create(host, port))
.orElseGet(() -> Address.create(serviceEndpoint.address().host(), port));

return builder.address(newAddress).build();
return builder.address(Address.create(finalHost, finalPort)).build();
}

private Mono<GatewayBootstrap> startGateway(GatewayOptions options) {
Expand Down Expand Up @@ -525,13 +522,18 @@ private static class ServiceDiscoveryBootstrap implements AutoCloseable {
private Scheduler scheduler;
private Microservices microservices;

private ServiceDiscoveryBootstrap() {}

private ServiceDiscoveryBootstrap operator(UnaryOperator<ServiceDiscoveryOptions> op) {
operator = op;
return this;
}

private ServiceDiscoveryBootstrap conclude(
Microservices microservices, ServiceDiscoveryOptions options) {
this.microservices = microservices;
this.scheduler = Schedulers.newSingle("discovery", true);

if (operator == null) {
return this;
}
Expand All @@ -545,8 +547,6 @@ private ServiceDiscoveryBootstrap conclude(
}

serviceDiscovery = discoveryFactory.createServiceDiscovery(serviceEndpoint);
this.microservices = microservices;
this.scheduler = Schedulers.newSingle("discovery", true);

return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.scalecube.services;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verifyNoInteractions;

import io.scalecube.services.annotations.Subscriber;
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

public class ServiceDiscoverySubscriberTest extends BaseTest {

@Test
void testRegisterNonDiscoveryCoreSubscriber() {
final NonDiscoverySubscriber1 discoverySubscriber1 = spy(new NonDiscoverySubscriber1());
final NonDiscoverySubscriber2 discoverySubscriber2 = spy(new NonDiscoverySubscriber2());

Microservices.builder().services(discoverySubscriber1, discoverySubscriber2).startAwait();

verifyNoInteractions(discoverySubscriber1, discoverySubscriber2);
}

@Test
void testRegisterNotMatchingTypeDiscoveryCoreSubscriber() {
final NotMatchingTypeDiscoverySubscriber discoverySubscriber =
spy(new NotMatchingTypeDiscoverySubscriber());

Microservices.builder().services(discoverySubscriber).startAwait();

verifyNoInteractions(discoverySubscriber);
}

@Test
void testRegisterDiscoveryCoreSubscriber() {
final AtomicReference<Subscription> subscriptionReference = new AtomicReference<>();
final NormalDiscoverySubscriber normalDiscoverySubscriber =
new NormalDiscoverySubscriber(subscriptionReference);

Microservices.builder().services(normalDiscoverySubscriber).startAwait();

assertNotNull(subscriptionReference.get(), "subscription");
}

private static class SomeType {}

private static class NonDiscoverySubscriber1 extends BaseSubscriber {}

private static class NonDiscoverySubscriber2 extends BaseSubscriber<SomeType> {}

@Subscriber
private static class NotMatchingTypeDiscoverySubscriber
extends BaseSubscriber<ServiceDiscoveryEvent> {}

@Subscriber(ServiceDiscoveryEvent.class)
private static class NormalDiscoverySubscriber extends BaseSubscriber<ServiceDiscoveryEvent> {

private final AtomicReference<Subscription> subscriptionReference;

private NormalDiscoverySubscriber(AtomicReference<Subscription> subscriptionReference) {
this.subscriptionReference = subscriptionReference;
}

@Override
protected void hookOnSubscribe(Subscription subscription) {
subscriptionReference.set(subscription);
}
}
}
Loading