Skip to content

Commit

Permalink
Added ExecuteOn parsing to Reflect. Tests passing.
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Oct 13, 2024
1 parent 37e9119 commit 134e36c
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 36 deletions.
80 changes: 79 additions & 1 deletion services-api/src/main/java/io/scalecube/services/Reflect.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static io.scalecube.services.CommunicationMode.REQUEST_RESPONSE;
import static io.scalecube.services.CommunicationMode.REQUEST_STREAM;

import io.scalecube.services.annotations.ExecuteOn;
import io.scalecube.services.annotations.RequestType;
import io.scalecube.services.annotations.ResponseType;
import io.scalecube.services.annotations.Service;
Expand All @@ -21,13 +22,16 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class Reflect {

Expand Down Expand Up @@ -172,7 +176,8 @@ public static Map<Method, MethodInfo> methodsInfo(Class<?> serviceInterface) {
method.getParameterCount(),
requestType(method),
isRequestTypeServiceMessage(method),
isSecured(method)))));
isSecured(method),
null))));
}

/**
Expand Down Expand Up @@ -379,4 +384,77 @@ public static boolean isSecured(Method method) {
return method.isAnnotationPresent(Secured.class)
|| method.getDeclaringClass().isAnnotationPresent(Secured.class);
}

public static Scheduler executeOnScheduler(Method method, Map<String, Scheduler> schedulers) {
final Class<?> declaringClass = method.getDeclaringClass();

if (method.isAnnotationPresent(ExecuteOn.class)) {
final var executeOn = method.getAnnotation(ExecuteOn.class);
final var name = executeOn.value();
if (name == null || name.isEmpty()) {
throw new IllegalArgumentException(
"Wrong @ExecuteOn definition on "
+ declaringClass.getName()
+ "."
+ method.getName()
+ " -- value is missing");
}
final var scheduler = schedulers.get(name);
if (scheduler == null) {
throw new IllegalArgumentException(
"Wrong @ExecuteOn definition on "
+ declaringClass.getName()
+ "."
+ method.getName()
+ " -- scheduler with name="
+ name
+ " cannot be found");
}
return scheduler;
}

// If @ExecuteOn annotation is not present on method, then find it on @Service interface

var clazz = declaringClass;

// Get all interfaces, including those inherited from superclasses
Set<Class<?>> allInterfaces = new HashSet<>();
while (clazz != null) {
Collections.addAll(allInterfaces, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}

final var optional =
allInterfaces.stream()
.map(aClass -> aClass.getAnnotation(ExecuteOn.class))
.filter(Objects::nonNull)
.findFirst();

if (optional.isPresent()) {
final var executeOn = optional.get();
final var name = executeOn.value();
if (name == null || name.isEmpty()) {
throw new IllegalArgumentException(
"Wrong @ExecuteOn definition on "
+ declaringClass.getName()
+ "."
+ method.getName()
+ " -- value is missing");
}
final var scheduler = schedulers.get(name);
if (scheduler == null) {
throw new IllegalArgumentException(
"Wrong @ExecuteOn definition on "
+ declaringClass.getName()
+ "."
+ method.getName()
+ " -- scheduler with name="
+ name
+ " cannot be found");
}
return scheduler;
}

return Schedulers.immediate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.scalecube.services.api.Qualifier;
import java.lang.reflect.Type;
import java.util.StringJoiner;
import reactor.core.scheduler.Scheduler;

public final class MethodInfo {

Expand All @@ -17,6 +18,7 @@ public final class MethodInfo {
private final Class<?> requestType;
private final boolean isRequestTypeServiceMessage;
private final boolean isSecured;
private final Scheduler scheduler;

/**
* Create a new service info.
Expand All @@ -30,6 +32,7 @@ public final class MethodInfo {
* @param requestType the type of the request
* @param isRequestTypeServiceMessage is request service message
* @param isSecured is method protected by authentication
* @param scheduler scheduler
*/
public MethodInfo(
String serviceName,
Expand All @@ -40,7 +43,8 @@ public MethodInfo(
int parameterCount,
Class<?> requestType,
boolean isRequestTypeServiceMessage,
boolean isSecured) {
boolean isSecured,
Scheduler scheduler) {
this.parameterizedReturnType = parameterizedReturnType;
this.isReturnTypeServiceMessage = isReturnTypeServiceMessage;
this.communicationMode = communicationMode;
Expand All @@ -51,6 +55,7 @@ public MethodInfo(
this.requestType = requestType;
this.isRequestTypeServiceMessage = isRequestTypeServiceMessage;
this.isSecured = isSecured;
this.scheduler = scheduler;
}

public String serviceName() {
Expand Down Expand Up @@ -101,19 +106,24 @@ public boolean isSecured() {
return isSecured;
}

public Scheduler scheduler() {
return scheduler;
}

@Override
public String toString() {
return new StringJoiner(", ", MethodInfo.class.getSimpleName() + "[", "]")
.add("serviceName=" + serviceName)
.add("methodName=" + methodName)
.add("qualifier=" + qualifier)
.add("serviceName='" + serviceName + "'")
.add("methodName='" + methodName + "'")
.add("qualifier='" + qualifier + "'")
.add("parameterizedReturnType=" + parameterizedReturnType)
.add("isReturnTypeServiceMessage=" + isReturnTypeServiceMessage)
.add("communicationMode=" + communicationMode)
.add("parameterCount=" + parameterCount)
.add("requestType=" + requestType)
.add("isRequestTypeServiceMessage=" + isRequestTypeServiceMessage)
.add("isSecured=" + isSecured)
.add("scheduler=" + scheduler)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public Mono<ServiceMessage> invokeOne(ServiceMessage message) {
.flatMap(authData -> deferWithContextOne(message, authData))
.map(response -> toResponse(response, message.qualifier(), message.dataFormat()))
.onErrorResume(
throwable -> Mono.just(errorMapper.toMessage(message.qualifier(), throwable)));
throwable -> Mono.just(errorMapper.toMessage(message.qualifier(), throwable)))
.subscribeOn(methodInfo.scheduler());
}

/**
Expand All @@ -84,7 +85,8 @@ public Flux<ServiceMessage> invokeMany(ServiceMessage message) {
.flatMapMany(authData -> deferWithContextMany(message, authData))
.map(response -> toResponse(response, message.qualifier(), message.dataFormat()))
.onErrorResume(
throwable -> Flux.just(errorMapper.toMessage(message.qualifier(), throwable)));
throwable -> Flux.just(errorMapper.toMessage(message.qualifier(), throwable)))
.subscribeOn(methodInfo.scheduler());
}

/**
Expand All @@ -104,7 +106,8 @@ public Flux<ServiceMessage> invokeBidirectional(Publisher<ServiceMessage> publis
toResponse(response, first.get().qualifier(), first.get().dataFormat()))
.onErrorResume(
throwable ->
Flux.just(errorMapper.toMessage(first.get().qualifier(), throwable))));
Flux.just(errorMapper.toMessage(first.get().qualifier(), throwable)))
.subscribeOn(methodInfo.scheduler()));
}

private Mono<?> deferWithContextOne(ServiceMessage message, Object authData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.mockito.Mockito;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

class ServiceMethodInvokerTest {
Expand Down Expand Up @@ -62,7 +63,8 @@ void testInvokeOneWhenReturnNull() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

serviceMethodInvoker =
new ServiceMethodInvoker(
Expand Down Expand Up @@ -100,7 +102,8 @@ void testInvokeManyWhenReturnNull() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

serviceMethodInvoker =
new ServiceMethodInvoker(
Expand Down Expand Up @@ -138,7 +141,8 @@ void testInvokeBidirectionalWhenReturnNull() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

serviceMethodInvoker =
new ServiceMethodInvoker(
Expand Down Expand Up @@ -177,7 +181,8 @@ void testInvokeOneWhenThrowException() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

serviceMethodInvoker =
new ServiceMethodInvoker(
Expand Down Expand Up @@ -219,7 +224,8 @@ void testInvokeManyWhenThrowException() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

serviceMethodInvoker =
new ServiceMethodInvoker(
Expand Down Expand Up @@ -260,7 +266,8 @@ void testInvokeBidirectionalWhenThrowException() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

serviceMethodInvoker =
new ServiceMethodInvoker(
Expand Down Expand Up @@ -305,7 +312,8 @@ void testAuthMethodWhenNoContextAndNoAuthenticator() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

serviceMethodInvoker =
new ServiceMethodInvoker(
Expand Down Expand Up @@ -347,7 +355,8 @@ void testAuthMethodWhenThereIsContextAndNoAuthenticator() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

serviceMethodInvoker =
new ServiceMethodInvoker(
Expand Down Expand Up @@ -387,7 +396,8 @@ void testAuthMethodWhenNoContextButThereIsAuthenticator() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

//noinspection unchecked,rawtypes
Authenticator<Map> mockedAuthenticator = Mockito.mock(Authenticator.class);
Expand Down
17 changes: 2 additions & 15 deletions services/src/main/java/io/scalecube/services/Microservices.java
Original file line number Diff line number Diff line change
Expand Up @@ -610,17 +610,6 @@ public Context tags(Map<String, String> tags) {
return this;
}

/**
* Setter for {@link ServiceRegistry}.
*
* @param serviceRegistry serviceRegistry
* @return this
*/
public Context serviceRegistry(ServiceRegistry serviceRegistry) {
this.serviceRegistry = serviceRegistry;
return this;
}

/**
* Setter for {@link ServiceDiscoveryFactory}.
*
Expand Down Expand Up @@ -734,16 +723,14 @@ private Context conclude() {
.orElse((message, dataType) -> message);
}

if (serviceRegistry == null) {
serviceRegistry = new ServiceRegistryImpl();
}

if (tags == null) {
tags = new HashMap<>();
}

schedulerSuppliers.forEach((s, supplier) -> schedulers.put(s, supplier.get()));

serviceRegistry = new ServiceRegistryImpl(schedulers);

return this;
}

Expand Down
Loading

0 comments on commit 134e36c

Please sign in to comment.