Skip to content

Commit

Permalink
Execute async controller methods without Reactor (#10212)
Browse files Browse the repository at this point in the history
  • Loading branch information
dstepanov authored Dec 6, 2023
1 parent 0b293c5 commit 4d93f38
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/**
* The completable future execution flow.
Expand All @@ -38,8 +38,8 @@ public interface CompletableFutureExecutionFlow<T> extends ExecutionFlow<T> {
* @return a new flow
*/
@NonNull
static <K> ExecutionFlow<K> just(@NonNull CompletableFuture<K> value) {
return (ExecutionFlow<K>) new CompletableFutureExecutionFlowImpl((CompletableFuture<Object>) value);
static <K> ExecutionFlow<K> just(@NonNull CompletionStage<K> value) {
return (ExecutionFlow<K>) new CompletableFutureExecutionFlowImpl((CompletionStage<Object>) value);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
@Internal
final class CompletableFutureExecutionFlowImpl implements CompletableFutureExecutionFlow<Object> {

private CompletableFuture<Object> stage;
private CompletionStage<Object> stage;

CompletableFutureExecutionFlowImpl(CompletableFuture<Object> stage) {
CompletableFutureExecutionFlowImpl(CompletionStage<Object> stage) {
this.stage = stage;
}

Expand Down Expand Up @@ -106,9 +106,10 @@ public void onComplete(BiConsumer<? super Object, Throwable> fn) {
@Nullable
@Override
public ImperativeExecutionFlow<Object> tryComplete() {
if (stage.isDone()) {
CompletableFuture<Object> completableFuture = stage.toCompletableFuture();
if (completableFuture.isDone()) {
try {
return new ImperativeExecutionFlowImpl(stage.getNow(null), null);
return new ImperativeExecutionFlowImpl(completableFuture.getNow(null), null);
} catch (Throwable throwable) {
if (throwable instanceof CompletionException completionException) {
throwable = completionException.getCause();
Expand All @@ -122,7 +123,7 @@ public ImperativeExecutionFlow<Object> tryComplete() {

@Override
public CompletableFuture<Object> toCompletableFuture() {
return stage;
return stage.toCompletableFuture();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Error
import io.micronaut.http.annotation.Filter
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.ResponseFilter
import io.micronaut.http.annotation.ServerFilter
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.filter.HttpServerFilter
import io.micronaut.http.filter.ServerFilterChain
Expand All @@ -21,6 +23,8 @@ import org.reactivestreams.Publisher
import spock.lang.Specification
import spock.lang.Unroll

import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.atomic.AtomicBoolean

class InvocationStackSpec extends Specification {
Expand All @@ -38,15 +42,56 @@ class InvocationStackSpec extends Specification {
embeddedServer.close()

where:
method << ["blocking", "nonblocking", "executeOn",
method << ["async", "asyncIO", "blocking", "nonblocking", "executeOn",
"withOneReactiveFilter", "withOneReactiveFilterExecuteOn",
"withTwoReactiveFilters", "withTwoReactiveFiltersExecuteOn",
"exception", "throwsExecuteOnEx"]
}

static void checkInvocationStack(boolean allowExecutor = false, boolean allowReactor = false) {
for (StackTraceElement s in new RuntimeException().getStackTrace()) {
if (!isKnownStack(s.className, allowExecutor, allowExecutor)) {
throw new RuntimeException("Unknown stack member: " + s.className);
}
}
}

static boolean isKnownStack(String className, boolean allowExecutor, boolean allowReactor) {
if (allowExecutor) {
if (className.startsWith("java.util.concurrent")) {
return true
}
}
if (className.startsWith("io.netty")) {
return true
}
if (className.startsWith("io.micronaut")) {
return true
}
if (className.startsWith("jdk.internal") || className.startsWith("java.lang")) {
return true // Java
}
if (className.startsWith("org.codehaus.groovy") || className.startsWith("org.apache.groovy") || className.startsWith("groovy.lang")) {
return true // Groovy
}
if (allowReactor) {
if (className == "reactor.core.publisher.Mono" || className == "reactor.core.publisher.MonoFromPublisher") {
return false
}
}
return false
}

@Requires(property = "spec", value = "InvocationStackSpec")
@Client("/stack-check")
static interface StackCheckClient {

@Get("/async")
String async()

@Get("/async-io")
String asyncIO()

@Get("/blocking")
String blocking()

Expand Down Expand Up @@ -87,16 +132,29 @@ class InvocationStackSpec extends Specification {
@Inject
MyTwoFilter2 twoFilters2

@Get("/async")
CompletionStage<String> async() {
checkInvocationStack()
return CompletableFuture.completedFuture("OK")
}

@ExecuteOn(TaskExecutors.IO)
@Get("/async-io")
CompletionStage<String> asyncIO() {
checkInvocationStack(true)
return CompletableFuture.completedFuture("OK")
}

@Get("/blocking")
String blocking() {
checkInvocationStack(false)
checkInvocationStack()
return "OK"
}

@Get("/nonblocking")
@NonBlocking
String nonblocking() {
checkInvocationStack(false)
checkInvocationStack()
return "OK"
}

Expand All @@ -105,7 +163,7 @@ class InvocationStackSpec extends Specification {
if (!oneFilter.getExecuted().get()) {
throw new IllegalStateException()
}
checkInvocationStack(false)
checkInvocationStack()
return "OK"
}

Expand All @@ -124,7 +182,7 @@ class InvocationStackSpec extends Specification {
if (!twoFilters1.getExecuted().get() || !twoFilters2.getExecuted().get()) {
throw new IllegalStateException()
}
checkInvocationStack(false)
checkInvocationStack()
return "OK"
}

Expand All @@ -133,7 +191,7 @@ class InvocationStackSpec extends Specification {
if (!twoFilters1.getExecuted().get() || !twoFilters2.getExecuted().get()) {
throw new IllegalStateException()
}
checkInvocationStack(true)
checkInvocationStack()
return "OK"
}

Expand All @@ -146,19 +204,19 @@ class InvocationStackSpec extends Specification {

@Get("/exception")
String throwsEx() {
checkInvocationStack(false)
checkInvocationStack()
throw new MyException()
}

@Error(MyException)
HttpResponse<?> onException(MyException e) {
checkInvocationStack(false)
checkInvocationStack()
return HttpResponse.ok("OK")
}

@Get("/exception-execute-on")
String throwsExecuteOnEx() {
checkInvocationStack(false)
checkInvocationStack()
throw new MyException2()
}

Expand All @@ -169,36 +227,6 @@ class InvocationStackSpec extends Specification {
return HttpResponse.ok("OK")
}

void checkInvocationStack(boolean allowExecutor) {
for (StackTraceElement s in new RuntimeException().getStackTrace()) {
if (!isKnownStack(s.className, allowExecutor)) {
throw new RuntimeException("Unknown stack member: " + s.className);
}
}
}

boolean isKnownStack(String className, boolean allowExecutor) {
if (className.startsWith("java.util.concurrent")) {
return true
}
if (className.startsWith("io.netty")) {
return true
}
if (className.startsWith("io.micronaut")) {
return true
}
if (className.startsWith("jdk.internal") || className.startsWith("java.lang")) {
return true // Java
}
if (className.startsWith("org.codehaus.groovy") || className.startsWith("org.apache.groovy")) {
return true // Spock
}
if (className == "reactor.core.publisher.Mono" || className == "reactor.core.publisher.MonoFromPublisher") {
return true // added for kotlin context filters
}
return false
}

}

static class MyException extends RuntimeException {
Expand All @@ -217,37 +245,68 @@ class InvocationStackSpec extends Specification {

}

@Requires(property = "spec", value = "InvocationStackSpec")
@ServerFilter("/stack-check/async")
static class AsyncFilter {

@ResponseFilter
HttpResponse<?> filterResponse(HttpResponse<?> httpResponse) {
checkInvocationStack()
return httpResponse
}
}

@Requires(property = "spec", value = "InvocationStackSpec")
@ServerFilter("/stack-check/async-io")
static class AsyncIoFilter {

@ResponseFilter
HttpResponse<?> filterResponse(HttpResponse<?> httpResponse) {
if (!Thread.currentThread().name.startsWith("io-executor")) {
throw new IllegalAccessException()
}
checkInvocationStack(true)
return httpResponse
}
}

@Requires(property = "spec", value = "InvocationStackSpec")
@Filter("/stack-check/with-one-reactive-filter*")
static class MyOneFilter implements HttpServerFilter {

final AtomicBoolean executed = new AtomicBoolean(false)

@Override
Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request, ServerFilterChain chain) {
checkInvocationStack(false, true)
executed.set(true)
return chain.proceed(request)
}
}

@Requires(property = "spec", value = "InvocationStackSpec")
@Filter("/stack-check/with-two-reactive-filters*")
static class MyTwoFilter1 implements HttpServerFilter {

final AtomicBoolean executed = new AtomicBoolean(false)

@Override
Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request, ServerFilterChain chain) {
checkInvocationStack(false, true)
executed.set(true)
return chain.proceed(request)
}
}

@Requires(property = "spec", value = "InvocationStackSpec")
@Filter("/stack-check/with-two-reactive-filters*")
static class MyTwoFilter2 implements HttpServerFilter {

final AtomicBoolean executed = new AtomicBoolean(false)

@Override
Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request, ServerFilterChain chain) {
checkInvocationStack(false, true)
executed.set(true)
return chain.proceed(request)
}
Expand Down
Loading

0 comments on commit 4d93f38

Please sign in to comment.