From 40d08ddc88c284bb6f93b4a9ebf9b4adc7e6a5e0 Mon Sep 17 00:00:00 2001 From: Davide D'Alto Date: Thu, 30 Nov 2023 15:21:08 +0100 Subject: [PATCH] [#1436] Test behaviour on cancel signal --- hibernate-reactive-core/build.gradle | 3 + .../hibernate/reactive/BaseReactiveTest.java | 5 + .../hibernate/reactive/CancelSignalTest.java | 177 ++++++++++++++++++ 3 files changed, 185 insertions(+) create mode 100644 hibernate-reactive-core/src/test/java/org/hibernate/reactive/CancelSignalTest.java diff --git a/hibernate-reactive-core/build.gradle b/hibernate-reactive-core/build.gradle index 97088a502..574086aab 100644 --- a/hibernate-reactive-core/build.gradle +++ b/hibernate-reactive-core/build.gradle @@ -34,6 +34,9 @@ dependencies { testImplementation "io.vertx:vertx-mssql-client:${vertxSqlClientVersion}" testImplementation "io.vertx:vertx-oracle-client:${vertxSqlClientVersion}" + // Metrics + testImplementation "io.vertx:vertx-micrometer-metrics:${vertxSqlClientVersion}" + // Optional dependency of vertx-pg-client, essential when connecting via SASL SCRAM testImplementation 'com.ongres.scram:client:2.1' diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/BaseReactiveTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/BaseReactiveTest.java index 07eb16d65..c3178a2c3 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/BaseReactiveTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/BaseReactiveTest.java @@ -36,6 +36,8 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.smallrye.mutiny.Uni; import io.vertx.core.Promise; import io.vertx.core.VertxOptions; @@ -43,6 +45,7 @@ import io.vertx.junit5.Timeout; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; +import io.vertx.micrometer.MicrometerMetricsOptions; import jakarta.persistence.criteria.CriteriaQuery; import static java.util.concurrent.TimeUnit.MINUTES; @@ -73,7 +76,9 @@ public abstract class BaseReactiveTest { static RunTestOnContext testOnContext = new RunTestOnContext( vertxOptions() ); private static VertxOptions vertxOptions() { + Metrics.addRegistry( new SimpleMeterRegistry() ); return new VertxOptions() + .setMetricsOptions( new MicrometerMetricsOptions().setEnabled( true ) ) .setBlockedThreadCheckInterval( 10 ) .setBlockedThreadCheckIntervalUnit( MINUTES ); } diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/CancelSignalTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/CancelSignalTest.java new file mode 100644 index 000000000..d1ae51355 --- /dev/null +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/CancelSignalTest.java @@ -0,0 +1,177 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive; + +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.IntStream; + +import org.junit.jupiter.api.Test; + +import org.jboss.logging.Logger; + +import io.micrometer.core.instrument.Metrics; +import io.smallrye.mutiny.subscription.Cancellable; +import io.vertx.junit5.VertxTestContext; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.Table; + +import static java.util.Arrays.stream; +import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.stream.Stream.concat; +import static org.assertj.core.api.Assertions.assertThat; + +public class CancelSignalTest extends BaseReactiveTest { + private static final Logger LOG = Logger.getLogger( CancelSignalTest.class ); + + @Override + protected Collection> annotatedEntities() { + return List.of( GuineaPig.class ); + } + + @Test + public void cleanupConnectionWhenCancelSignal(VertxTestContext context) { + // larger than 'sql pool size' to check entering the 'pool waiting queue' + int executeSize = 10; + CountDownLatch firstSessionWaiter = new CountDownLatch( 1 ); + Queue cancellableQueue = new ConcurrentLinkedQueue<>(); + + ExecutorService withSessionExecutor = Executors.newFixedThreadPool( executeSize ); + // Create some jobs that are going to be cancelled asynchronously + CompletableFuture[] withSessionFutures = IntStream + .range( 0, executeSize ) + .mapToObj( i -> runAsync( + () -> { + CountDownLatch countDownLatch = new CountDownLatch( 1 ); + Cancellable cancellable = getMutinySessionFactory() + .withSession( s -> { + LOG.debug( "start withSession: " + i ); + sleep( 100 ); + firstSessionWaiter.countDown(); + return s.find( GuineaPig.class, 1 ); + } ) + .onTermination().invoke( () -> { + countDownLatch.countDown(); + LOG.debug( "future " + i + " terminated" ); + } ) + .subscribe().with( item -> LOG.debug( "end withSession: " + i ) ); + cancellableQueue.add( cancellable ); + await( countDownLatch ); + }, + withSessionExecutor + ) ) + .toArray( CompletableFuture[]::new ); + + // Create jobs that are going to cancel the previous ones + ExecutorService cancelExecutor = Executors.newFixedThreadPool( executeSize ); + CompletableFuture[] cancelFutures = IntStream + .range( 0, executeSize ) + .mapToObj( i -> runAsync( + () -> { + await( firstSessionWaiter ); + cancellableQueue.poll().cancel(); + sleep( 500 ); + }, + cancelExecutor + ) ) + .toArray( CompletableFuture[]::new ); + + CompletableFuture allFutures = allOf( concat( stream( withSessionFutures ), stream( cancelFutures ) ) + .toArray( CompletableFuture[]::new ) + ); + + // Test that there shouldn't be any pending process + test( context, allFutures.thenAccept( x -> assertThat( sqlPendingMetric() ).isEqualTo( 0.0 ) ) ); + } + + private static double sqlPendingMetric() { + return Metrics.globalRegistry.find( "vertx.sql.processing.pending" ) + .gauge() + .value(); + } + + private static void await(CountDownLatch latch) { + try { + latch.await(); + } + catch (InterruptedException e) { + throw new RuntimeException( e ); + } + } + + private static void sleep(int millis) { + try { + // Add sleep to create a test that delays processing + Thread.sleep( millis ); + } + catch (InterruptedException e) { + throw new RuntimeException( e ); + } + } + + @Entity(name = "GuineaPig") + @Table(name = "Pig") + public static class GuineaPig { + @Id + private Integer id; + private String name; + + public GuineaPig() { + } + + public GuineaPig(Integer id, String name) { + this.id = id; + this.name = name; + } + + public Integer getId() { + return id; + } + + public void setId(Integer id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public String toString() { + return id + ": " + name; + } + + @Override + public boolean equals(Object o) { + if ( this == o ) { + return true; + } + if ( o == null || getClass() != o.getClass() ) { + return false; + } + GuineaPig guineaPig = (GuineaPig) o; + return Objects.equals( name, guineaPig.name ); + } + + @Override + public int hashCode() { + return Objects.hash( name ); + } + } +}