diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionFactoryImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionFactoryImpl.java index 659b7bdc3..aecc99e37 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionFactoryImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionFactoryImpl.java @@ -5,9 +5,13 @@ */ package org.hibernate.reactive.mutiny.impl; -import io.smallrye.mutiny.Uni; -import jakarta.persistence.criteria.CriteriaBuilder; -import jakarta.persistence.metamodel.Metamodel; +import java.lang.invoke.MethodHandles; +import java.util.Objects; +import java.util.concurrent.CompletionStage; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Supplier; + import org.hibernate.Cache; import org.hibernate.internal.SessionCreationOptions; import org.hibernate.internal.SessionFactoryImpl; @@ -25,12 +29,9 @@ import org.hibernate.service.ServiceRegistry; import org.hibernate.stat.Statistics; -import java.lang.invoke.MethodHandles; -import java.util.Objects; -import java.util.concurrent.CompletionStage; -import java.util.function.BiFunction; -import java.util.function.Function; -import java.util.function.Supplier; +import io.smallrye.mutiny.Uni; +import jakarta.persistence.criteria.CriteriaBuilder; +import jakarta.persistence.metamodel.Metamodel; import static org.hibernate.reactive.common.InternalStateAssertions.assertUseOnEventLoop; @@ -106,7 +107,12 @@ public Uni openSession(String tenantId) { */ private Uni create(ReactiveConnection connection, Supplier supplier) { return Uni.createFrom().item( supplier ) - .onFailure().call( () -> Uni.createFrom().completionStage( connection.close() ) ); + .onCancellation().call( () -> close( connection ) ) + .onFailure().call( () -> close( connection ) ); + } + + private static Uni close(ReactiveConnection connection) { + return Uni.createFrom().completionStage( connection.close() ); } @Override @@ -209,8 +215,8 @@ private Uni withSession( return sessionUni.chain( session -> Uni.createFrom().voidItem() .invoke( () -> context.put( contextKey, session ) ) .chain( () -> work.apply( session ) ) - .eventually( () -> context.remove( contextKey ) ) - .eventually(session::close) + .onTermination().invoke( () -> context.remove( contextKey ) ) + .onTermination().call( session::close ) ); } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java index 0ba5b8e8d..4ffc8b3d0 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java @@ -5,13 +5,16 @@ */ package org.hibernate.reactive.pool.impl; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; import org.hibernate.engine.jdbc.spi.SqlExceptionHelper; import org.hibernate.engine.jdbc.spi.SqlStatementLogger; import org.hibernate.reactive.pool.ReactiveConnection; import org.hibernate.reactive.pool.ReactiveConnectionPool; +import io.vertx.core.Future; import io.vertx.sqlclient.Pool; import io.vertx.sqlclient.SqlConnection; @@ -41,13 +44,13 @@ public abstract class SqlClientPool implements ReactiveConnectionPool { /** * @return a Hibernate {@link SqlStatementLogger} for logging SQL - * statements as they are executed + * statements as they are executed */ protected abstract SqlStatementLogger getSqlStatementLogger(); /** * @return a Hibernate {@link SqlExceptionHelper} for converting - * exceptions + * exceptions */ protected abstract SqlExceptionHelper getSqlExceptionHelper(); @@ -58,9 +61,7 @@ public abstract class SqlClientPool implements ReactiveConnectionPool { * subclasses which support multitenancy. * * @param tenantId the id of the tenant - * * @throws UnsupportedOperationException if multitenancy is not supported - * * @see ReactiveConnectionPool#getConnection(String) */ protected Pool getTenantPool(String tenantId) { @@ -88,13 +89,33 @@ public CompletionStage getConnection(String tenantId, SqlExc } private CompletionStage getConnectionFromPool(Pool pool) { - return pool.getConnection() - .toCompletionStage().thenApply( this::newConnection ); + return completionStage( pool.getConnection().map( this::newConnection ), ReactiveConnection::close ); } private CompletionStage getConnectionFromPool(Pool pool, SqlExceptionHelper sqlExceptionHelper) { - return pool.getConnection() - .toCompletionStage().thenApply( sqlConnection -> newConnection( sqlConnection, sqlExceptionHelper ) ); + return completionStage( + pool.getConnection().map( sqlConnection -> newConnection( sqlConnection, sqlExceptionHelper ) ), + ReactiveConnection::close + ); + } + + /** + * @param onCancellation invoke when converted {@link java.util.concurrent.CompletionStage} cancellation. + */ + private CompletionStage completionStage(Future future, Consumer onCancellation) { + CompletableFuture completableFuture = new CompletableFuture<>(); + future.onComplete( ar -> { + if ( ar.succeeded() ) { + if ( completableFuture.isCancelled() ) { + onCancellation.accept( ar.result() ); + } + completableFuture.complete( ar.result() ); + } + else { + completableFuture.completeExceptionally( ar.cause() ); + } + } ); + return completableFuture; } private SqlClientConnection newConnection(SqlConnection connection) {