diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 270dc3cb3cb3c..d4ec6368c7a22 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -4565,13 +4565,6 @@ org.apache.kafka kafka-clients ${kafka.version} - - - - org.lz4 - lz4-java - - at.yawk.lz4 diff --git a/docs/src/main/asciidoc/hibernate-reactive.adoc b/docs/src/main/asciidoc/hibernate-reactive.adoc index 0fecffe0b8928..a0ed910ebd487 100644 --- a/docs/src/main/asciidoc/hibernate-reactive.adoc +++ b/docs/src/main/asciidoc/hibernate-reactive.adoc @@ -492,7 +492,7 @@ Mixing both transaction management styles in the same reactive pipeline is not s In the future, we'll deprecate the previous annotations provided by Panache and and support only `@Transactional`. You can inject either `Mutiny.Session` or `Mutiny.StatelessSession`. -Be careful of injecting both session types in the same bean. Attempting to use both session types within the same transaction will throw an `IllegalStateException`. +Mixing both session types in the same transaction should work, but should be reserved for exotic use cases implemented by advanced users, as the (stateful) session will not be aware of changes operated through the stateless session, which could thus conflict or be silently erased by (stateful) session writes. [[transactional-different-pipelines]] === Using Declarative Transaction Management in different reactive pipelines diff --git a/extensions/hibernate-orm/deployment/src/test/java/io/quarkus/hibernate/orm/stateless/MixStatelessStatefulSessionTest.java b/extensions/hibernate-orm/deployment/src/test/java/io/quarkus/hibernate/orm/stateless/MixStatelessStatefulSessionTest.java new file mode 100644 index 0000000000000..58a643c063a03 --- /dev/null +++ b/extensions/hibernate-orm/deployment/src/test/java/io/quarkus/hibernate/orm/stateless/MixStatelessStatefulSessionTest.java @@ -0,0 +1,136 @@ +package io.quarkus.hibernate.orm.stateless; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.List; + +import jakarta.inject.Inject; + +import org.hibernate.Session; +import org.hibernate.StatelessSession; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.hibernate.orm.MyEntity; +import io.quarkus.narayana.jta.QuarkusTransaction; +import io.quarkus.test.QuarkusExtensionTest; + +/** + * Verifies that Hibernate ORM allows mixing regular Session and StatelessSession + * within the same transaction. + */ +public class MixStatelessStatefulSessionTest { + + @RegisterExtension + static QuarkusExtensionTest runner = new QuarkusExtensionTest() + .withApplicationRoot((jar) -> jar + .addClass(MyEntity.class)) + .withConfigurationResource("application.properties"); + + @Inject + Session session; + + @Inject + StatelessSession statelessSession; + + @Test + public void testRegularSessionThenStatelessSessionInSameTransaction() { + // Use regular Session, then StatelessSession in same transaction + QuarkusTransaction.requiringNew().run(() -> { + MyEntity entity = new MyEntity("testRegular"); + session.persist(entity); + session.flush(); + // Now use StatelessSession in the same transaction - should work without error + List list = statelessSession + .createSelectionQuery("SELECT e.name from MyEntity e WHERE e.name = 'testRegular'", String.class) + .getResultList(); + assertThat(list).containsOnly("testRegular"); + }); + + // Verify it was persisted + QuarkusTransaction.requiringNew().run(() -> { + long count = session + .createSelectionQuery("SELECT count(e) from MyEntity e WHERE e.name = 'testRegular'", Long.class) + .getSingleResult(); + assertThat(count).isEqualTo(1); + }); + } + + @Test + public void testStatelessSessionThenRegularSessionInSameTransaction() { + // Use StatelessSession, then regular Session in same transaction + QuarkusTransaction.requiringNew().run(() -> { + MyEntity entity = new MyEntity("testStateless"); + statelessSession.insert(entity); + // Now use regular Session in the same transaction - should work without error + List list = session + .createSelectionQuery("SELECT e.name from MyEntity e WHERE e.name = 'testStateless'", String.class) + .getResultList(); + assertThat(list).containsOnly("testStateless"); + }); + + // Verify it was persisted + QuarkusTransaction.requiringNew().run(() -> { + long count = session + .createSelectionQuery("SELECT count(e) from MyEntity e WHERE e.name = 'testStateless'", Long.class) + .getSingleResult(); + assertThat(count).isEqualTo(1); + }); + } + + @Test + public void testBothSessionsCommitTogether() { + // Make changes via both sessions, verify both commit together + QuarkusTransaction.requiringNew().run(() -> { + MyEntity entity1 = new MyEntity("commitViaRegular"); + session.persist(entity1); + session.flush(); + + MyEntity entity2 = new MyEntity("commitViaStateless"); + statelessSession.insert(entity2); + }); + + // Verify both were persisted + QuarkusTransaction.requiringNew().run(() -> { + long count1 = session + .createSelectionQuery("SELECT count(e) from MyEntity e WHERE e.name = 'commitViaRegular'", Long.class) + .getSingleResult(); + long count2 = session + .createSelectionQuery("SELECT count(e) from MyEntity e WHERE e.name = 'commitViaStateless'", Long.class) + .getSingleResult(); + assertThat(count1).isEqualTo(1); + assertThat(count2).isEqualTo(1); + }); + } + + @Test + public void testBothSessionsRollbackTogether() { + // Make changes via both sessions, then rollback - verify both rolled back + assertThatThrownBy(() -> { + QuarkusTransaction.requiringNew().run(() -> { + MyEntity entity1 = new MyEntity("rollbackViaRegular"); + session.persist(entity1); + session.flush(); + + MyEntity entity2 = new MyEntity("rollbackViaStateless"); + statelessSession.insert(entity2); + + throw new RuntimeException("Force rollback"); + }); + }).isInstanceOf(RuntimeException.class) + .hasMessage("Force rollback"); + + // Verify neither was persisted + QuarkusTransaction.requiringNew().run(() -> { + long count1 = session + .createSelectionQuery("SELECT count(e) from MyEntity e WHERE e.name = 'rollbackViaRegular'", Long.class) + .getSingleResult(); + long count2 = session + .createSelectionQuery("SELECT count(e) from MyEntity e WHERE e.name = 'rollbackViaStateless'", Long.class) + .getSingleResult(); + assertThat(count1).isEqualTo(0); + assertThat(count2).isEqualTo(0); + }); + } +} diff --git a/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/HibernateReactiveStatelessTransactionsTest.java b/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/HibernateReactiveStatelessTransactionsTest.java index 54a61c0a0e690..371020333e8e0 100644 --- a/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/HibernateReactiveStatelessTransactionsTest.java +++ b/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/HibernateReactiveStatelessTransactionsTest.java @@ -9,7 +9,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import io.quarkus.reactive.transaction.TransactionalInterceptorRequired; +import io.quarkus.reactive.transaction.runtime.TransactionalInterceptorRequired; import io.quarkus.test.QuarkusUnitTest; import io.quarkus.test.vertx.RunOnVertxContext; import io.quarkus.test.vertx.UniAsserter; diff --git a/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/HibernateReactiveTransactionsTest.java b/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/HibernateReactiveTransactionsTest.java index 3550b7fbf1956..60a9b1a854fc2 100644 --- a/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/HibernateReactiveTransactionsTest.java +++ b/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/HibernateReactiveTransactionsTest.java @@ -9,7 +9,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import io.quarkus.reactive.transaction.TransactionalInterceptorRequired; +import io.quarkus.reactive.transaction.runtime.TransactionalInterceptorRequired; import io.quarkus.test.QuarkusUnitTest; import io.quarkus.test.vertx.RunOnVertxContext; import io.quarkus.test.vertx.UniAsserter; diff --git a/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/InterleaveTest.java b/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/InterleaveTest.java index b41304a2467e7..4b22e0a4ba603 100644 --- a/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/InterleaveTest.java +++ b/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/InterleaveTest.java @@ -15,7 +15,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import io.quarkus.hibernate.reactive.runtime.OpenedSessionsState; -import io.quarkus.reactive.transaction.TransactionalInterceptorRequired; +import io.quarkus.reactive.transaction.runtime.TransactionalInterceptorRequired; import io.quarkus.test.QuarkusUnitTest; import io.quarkus.test.vertx.RunOnVertxContext; import io.quarkus.test.vertx.UniAsserter; diff --git a/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/SupportOnlyRequiredTransactionTypeTest.java b/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/SupportOnlyRequiredTransactionTypeTest.java index 7f3458ea74fd9..9c2af4d8bc204 100644 --- a/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/SupportOnlyRequiredTransactionTypeTest.java +++ b/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/SupportOnlyRequiredTransactionTypeTest.java @@ -7,11 +7,11 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import io.quarkus.reactive.transaction.TransactionalInterceptorMandatory; -import io.quarkus.reactive.transaction.TransactionalInterceptorNever; -import io.quarkus.reactive.transaction.TransactionalInterceptorNotSupported; -import io.quarkus.reactive.transaction.TransactionalInterceptorRequiresNew; -import io.quarkus.reactive.transaction.TransactionalInterceptorSupports; +import io.quarkus.reactive.transaction.runtime.TransactionalInterceptorMandatory; +import io.quarkus.reactive.transaction.runtime.TransactionalInterceptorNever; +import io.quarkus.reactive.transaction.runtime.TransactionalInterceptorNotSupported; +import io.quarkus.reactive.transaction.runtime.TransactionalInterceptorRequiresNew; +import io.quarkus.reactive.transaction.runtime.TransactionalInterceptorSupports; import io.quarkus.test.QuarkusUnitTest; import io.quarkus.test.vertx.RunOnVertxContext; import io.smallrye.mutiny.Uni; diff --git a/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/mixing/MixStatelessStatefulSessionTest.java b/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/mixing/MixStatelessStatefulSessionTest.java index 4bc37b9f41784..b4e1b70144b1e 100644 --- a/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/mixing/MixStatelessStatefulSessionTest.java +++ b/extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/mixing/MixStatelessStatefulSessionTest.java @@ -14,13 +14,18 @@ import io.quarkus.test.vertx.RunOnVertxContext; import io.quarkus.test.vertx.UniAsserter; import io.smallrye.mutiny.Uni; +import io.vertx.mutiny.sqlclient.Pool; +/** + * Verifies that Hibernate Reactive allows mixing regular Session and StatelessSession + * within the same transaction. + */ public class MixStatelessStatefulSessionTest { @RegisterExtension static final QuarkusUnitTest config = new QuarkusUnitTest() .withApplicationRoot(jar -> jar.addClasses(Hero.class)) - .withConfigurationResource("application.properties"); + .withConfigurationResource("application-reactive-transaction.properties"); @Inject Mutiny.Session session; @@ -28,42 +33,99 @@ public class MixStatelessStatefulSessionTest { @Inject Mutiny.StatelessSession statelessSession; + @Inject + Pool pool; + + @Test + @RunOnVertxContext + public void testRegularSessionThenStatelessSessionInTransactional(UniAsserter asserter) { + // Use regular Session, then StatelessSession in same transaction - should work without error + asserter.execute(() -> assertThat(pool.size()).isEqualTo(1)); + asserter.assertThat( + () -> transactionalMethodUsingRegularSessionThenStatelessSession(), + count -> assertThat(count).isNotNull()); + // Verify pool size is still 1 (connection was shared and released) + asserter.execute(() -> assertThat(pool.size()).isEqualTo(1)); + } + @Test @RunOnVertxContext public void testStatelessSessionThenRegularSessionInTransactional(UniAsserter asserter) { - asserter.assertFailedWith( + // Use StatelessSession, then regular Session in same transaction - should work without error + asserter.execute(() -> assertThat(pool.size()).isEqualTo(1)); + asserter.assertThat( () -> transactionalMethodUsingStatelessSessionThenRegularSession(), - e -> assertThat(e) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("stateless session for the same Persistence Unit is already opened") - .hasMessageContaining( - "Mixing different kinds of sessions in the same transaction is not supported yet")); + count -> assertThat(count).isNotNull()); + // Verify pool size is still 1 (connection was shared and released) + asserter.execute(() -> assertThat(pool.size()).isEqualTo(1)); } @Test @RunOnVertxContext - public void testRegularSessionThenStatelessSessionInTransactional(UniAsserter asserter) { + public void testBothSessionsCommitTogether(UniAsserter asserter) { + // Make changes via both sessions, verify both commit together + asserter.assertThat( + () -> transactionalMethodCreatingViaBothSessions("CommitHero1", "CommitHero2"), + v -> assertThat(v).isNull()); + + // Verify both were persisted + asserter.assertThat( + () -> transactionalMethodCountingHeroes("CommitHero%"), + count -> assertThat(count).isEqualTo(2L)); + } + + @Test + @RunOnVertxContext + public void testBothSessionsRollbackTogether(UniAsserter asserter) { + // Make changes via both sessions, then throw exception - verify both rolled back asserter.assertFailedWith( - () -> transactionalMethodUsingRegularSessionThenStatelessSession(), + () -> transactionalMethodCreatingViaBothSessionsThenFailing("RollbackHero1", "RollbackHero2"), e -> assertThat(e) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("session for the same Persistence Unit is already opened") - .hasMessageContaining( - "Mixing different kinds of sessions in the same transaction is not supported yet")); + .isInstanceOf(RuntimeException.class) + .hasMessage("Force rollback")); + + // Verify neither was persisted + asserter.assertThat( + () -> transactionalMethodCountingHeroes("RollbackHero%"), + count -> assertThat(count).isEqualTo(0L)); } @Transactional - public Uni transactionalMethodUsingRegularSessionThenStatelessSession() { - return session.createQuery("select count(1) from Hero h", Long.class).getSingleResult() - .chain(count -> statelessSession.createQuery("select count(1) from Hero h", Long.class) + public Uni transactionalMethodUsingRegularSessionThenStatelessSession() { + return session.createSelectionQuery("select count(h) from Hero h", Long.class).getSingleResult() + .chain(count -> statelessSession.createSelectionQuery("select count(h) from Hero h", Long.class) .getSingleResult()); } @Transactional - public Uni transactionalMethodUsingStatelessSessionThenRegularSession() { - return statelessSession.createQuery("select count(1) from Hero h", Long.class).getSingleResult() - .chain(count -> session.createQuery("select count(1) from Hero h", Long.class) + public Uni transactionalMethodUsingStatelessSessionThenRegularSession() { + return statelessSession.createSelectionQuery("select count(h) from Hero h", Long.class).getSingleResult() + .chain(count -> session.createSelectionQuery("select count(h) from Hero h", Long.class) .getSingleResult()); } + @Transactional + public Uni transactionalMethodCreatingViaBothSessions(String name1, String name2) { + Hero hero1 = new Hero(name1); + return session.persist(hero1) + .call(() -> session.flush()) + .chain(() -> { + Hero hero2 = new Hero(name2); + return statelessSession.insert(hero2); + }); + } + + @Transactional + public Uni transactionalMethodCreatingViaBothSessionsThenFailing(String name1, String name2) { + return transactionalMethodCreatingViaBothSessions(name1, name2) + .chain(() -> Uni.createFrom().failure(new RuntimeException("Force rollback"))); + } + + @Transactional + public Uni transactionalMethodCountingHeroes(String namePattern) { + return session.createSelectionQuery("select count(h) from Hero h where h.name like :name", Long.class) + .setParameter("name", namePattern) + .getSingleResult(); + } + } diff --git a/extensions/hibernate-reactive/deployment/src/test/resources/application-reactive-transaction.properties b/extensions/hibernate-reactive/deployment/src/test/resources/application-reactive-transaction.properties index 04cc0f41acdc1..9a04628eba993 100644 --- a/extensions/hibernate-reactive/deployment/src/test/resources/application-reactive-transaction.properties +++ b/extensions/hibernate-reactive/deployment/src/test/resources/application-reactive-transaction.properties @@ -1,5 +1,8 @@ quarkus.datasource.db-kind=postgresql quarkus.datasource.reactive=true +# Using max-size=1 ensures that: +# 1. Connection management is tested properly (connection must be released after transaction) +# 2. Session types sharing the same connection is verified (tests would hang if separate connections were used) quarkus.datasource.reactive.max-size=1 quarkus.hibernate-orm.log.sql=true diff --git a/extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/HibernateReactiveRecorder.java b/extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/HibernateReactiveRecorder.java index 6c08459887981..ab2608949fb2c 100644 --- a/extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/HibernateReactiveRecorder.java +++ b/extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/HibernateReactiveRecorder.java @@ -1,7 +1,7 @@ package io.quarkus.hibernate.reactive.runtime; -import static io.quarkus.reactive.transaction.TransactionalInterceptorBase.PERSISTENCE_UNIT_NAME_KEY; -import static io.quarkus.reactive.transaction.TransactionalInterceptorBase.TRANSACTIONAL_METHOD_KEY; +import static io.quarkus.reactive.transaction.runtime.TransactionalInterceptorBase.PERSISTENCE_UNIT_NAME_KEY; +import static io.quarkus.reactive.transaction.runtime.TransactionalInterceptorBase.TRANSACTIONAL_METHOD_KEY; import java.util.List; import java.util.Locale; @@ -134,17 +134,6 @@ public static Mutiny.Session getSession(String persistenceUnitName) { } else if (context.getLocal(TRANSACTIONAL_METHOD_KEY) == null) { throw new IllegalStateException(noSessionFoundErrorMessage()); } else { - - Optional> openedStatelessSession = OPENED_SESSIONS_STATE_STATELESS - .getOpenedSession( - context, - persistenceUnitName); - - if (openedStatelessSession.isPresent()) { - throw new IllegalStateException("A stateless session for the same Persistence Unit is already opened." - + "\n\t- Mixing different kinds of sessions in the same transaction is not supported yet."); - } - // Store the persistence unit name so that we can close only this session at the end of the interceptor context.putLocal(PERSISTENCE_UNIT_NAME_KEY, persistenceUnitName); @@ -179,17 +168,6 @@ public static Mutiny.StatelessSession getStatelessSession(String persistenceUnit } else if (context.getLocal(TRANSACTIONAL_METHOD_KEY) == null) { throw new IllegalStateException(noSessionFoundErrorMessage()); } else { - - Optional> openedRegularSession = OPENED_SESSIONS_STATE - .getOpenedSession( - context, - persistenceUnitName); - - if (openedRegularSession.isPresent()) { - throw new IllegalStateException("A (non-stateless) session for the same Persistence Unit is already opened." - + "\n\t- Mixing different kinds of sessions in the same transaction is not supported yet."); - } - // Store the persistence unit name so that we can close only this session at the end of the interceptor context.putLocal(PERSISTENCE_UNIT_NAME_KEY, persistenceUnitName); diff --git a/extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/customized/QuarkusReactiveConnectionPoolInitiator.java b/extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/customized/QuarkusReactiveConnectionPoolInitiator.java index 7badd1199c1f6..1c93b2b259766 100644 --- a/extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/customized/QuarkusReactiveConnectionPoolInitiator.java +++ b/extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/customized/QuarkusReactiveConnectionPoolInitiator.java @@ -31,7 +31,7 @@ public ReactiveConnectionPool initiateService(Map configurationValues, ServiceRe // nothing to do, but given the separate hierarchies have to handle this here. return null; } - return new QuarkusSqlClientPool(new TransactionalContextPool(pool)); + return new QuarkusSqlClientPool(new io.quarkus.reactive.transaction.runtime.pool.TransactionalContextPool(pool)); } } diff --git a/extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/transaction/HibernateActionsStrategy.java b/extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/transaction/HibernateActionsStrategy.java index 02cfd09d49a71..f1482881f5bd9 100644 --- a/extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/transaction/HibernateActionsStrategy.java +++ b/extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/transaction/HibernateActionsStrategy.java @@ -1,14 +1,14 @@ package io.quarkus.hibernate.reactive.runtime.transaction; -import static io.quarkus.reactive.transaction.TransactionalInterceptorBase.PERSISTENCE_UNIT_NAME_KEY; -import static io.quarkus.reactive.transaction.TransactionalInterceptorBase.TRANSACTIONAL_METHOD_KEY; +import static io.quarkus.reactive.transaction.runtime.TransactionalInterceptorBase.PERSISTENCE_UNIT_NAME_KEY; +import static io.quarkus.reactive.transaction.runtime.TransactionalInterceptorBase.TRANSACTIONAL_METHOD_KEY; import java.util.Optional; import jakarta.enterprise.context.ApplicationScoped; import io.quarkus.hibernate.reactive.runtime.HibernateReactiveRecorder; -import io.quarkus.reactive.transaction.ReactiveResource; +import io.quarkus.reactive.transaction.runtime.ReactiveResource; import io.smallrye.mutiny.Uni; import io.vertx.core.Context; diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/interceptor/TransactionalInterceptorBase.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/interceptor/TransactionalInterceptorBase.java index 47b2308207e29..b2ec1ef076806 100644 --- a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/interceptor/TransactionalInterceptorBase.java +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/interceptor/TransactionalInterceptorBase.java @@ -58,7 +58,7 @@ public abstract class TransactionalInterceptorBase implements Serializable { private static MethodHandle reactiveInterceptorShouldRun() { try { - Class vertxContext = Class.forName("io.quarkus.reactive.transaction.TransactionalInterceptorBase", true, + Class vertxContext = Class.forName("io.quarkus.reactive.transaction.runtime.TransactionalInterceptorBase", true, Thread.currentThread().getContextClassLoader()); return MethodHandles.publicLookup().findStatic(vertxContext, "reactiveInterceptorShouldRun", MethodType.methodType(boolean.class)); diff --git a/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/ReactiveTransactionalInterceptor.java b/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/ReactiveTransactionalInterceptor.java index 69fcbb9e72888..70765983ffa63 100644 --- a/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/ReactiveTransactionalInterceptor.java +++ b/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/ReactiveTransactionalInterceptor.java @@ -1,8 +1,8 @@ package io.quarkus.hibernate.reactive.panache.common.runtime; -import static io.quarkus.reactive.transaction.TransactionalInterceptorBase.REACTIVE_TRANSACTIONAL_METHOD_KEY; -import static io.quarkus.reactive.transaction.TransactionalInterceptorBase.TRANSACTIONAL_METHOD_KEY; -import static io.quarkus.reactive.transaction.TransactionalInterceptorBase.proceedUni; +import static io.quarkus.reactive.transaction.runtime.TransactionalInterceptorBase.REACTIVE_TRANSACTIONAL_METHOD_KEY; +import static io.quarkus.reactive.transaction.runtime.TransactionalInterceptorBase.TRANSACTIONAL_METHOD_KEY; +import static io.quarkus.reactive.transaction.runtime.TransactionalInterceptorBase.proceedUni; import jakarta.annotation.Priority; import jakarta.interceptor.AroundInvoke; diff --git a/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/SessionOperations.java b/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/SessionOperations.java index 77a0e57f65f30..87dc580d6d962 100644 --- a/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/SessionOperations.java +++ b/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/SessionOperations.java @@ -1,8 +1,8 @@ package io.quarkus.hibernate.reactive.panache.common.runtime; import static io.quarkus.hibernate.orm.runtime.PersistenceUnitUtil.DEFAULT_PERSISTENCE_UNIT_NAME; -import static io.quarkus.reactive.transaction.TransactionalInterceptorBase.SESSION_ON_DEMAND_KEY; -import static io.quarkus.reactive.transaction.TransactionalInterceptorBase.TRANSACTIONAL_METHOD_KEY; +import static io.quarkus.reactive.transaction.runtime.TransactionalInterceptorBase.SESSION_ON_DEMAND_KEY; +import static io.quarkus.reactive.transaction.runtime.TransactionalInterceptorBase.TRANSACTIONAL_METHOD_KEY; import java.util.ArrayList; import java.util.HashSet; diff --git a/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/WithSessionOnDemandInterceptor.java b/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/WithSessionOnDemandInterceptor.java index 1a4aff1197d1b..31838a61d94f7 100644 --- a/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/WithSessionOnDemandInterceptor.java +++ b/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/WithSessionOnDemandInterceptor.java @@ -1,7 +1,7 @@ package io.quarkus.hibernate.reactive.panache.common.runtime; -import static io.quarkus.reactive.transaction.TransactionalInterceptorBase.proceedUni; -import static io.quarkus.reactive.transaction.TransactionalInterceptorBase.reactiveInterceptorShouldRun; +import static io.quarkus.reactive.transaction.runtime.TransactionalInterceptorBase.proceedUni; +import static io.quarkus.reactive.transaction.runtime.TransactionalInterceptorBase.reactiveInterceptorShouldRun; import jakarta.annotation.Priority; import jakarta.interceptor.AroundInvoke; diff --git a/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/WithTransactionInterceptor.java b/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/WithTransactionInterceptor.java index 926c89c6a7cd4..164a8d995baa0 100644 --- a/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/WithTransactionInterceptor.java +++ b/extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/WithTransactionInterceptor.java @@ -1,7 +1,7 @@ package io.quarkus.hibernate.reactive.panache.common.runtime; -import static io.quarkus.reactive.transaction.TransactionalInterceptorBase.TRANSACTIONAL_METHOD_KEY; -import static io.quarkus.reactive.transaction.TransactionalInterceptorBase.WITH_TRANSACTION_METHOD_KEY; +import static io.quarkus.reactive.transaction.runtime.TransactionalInterceptorBase.TRANSACTIONAL_METHOD_KEY; +import static io.quarkus.reactive.transaction.runtime.TransactionalInterceptorBase.WITH_TRANSACTION_METHOD_KEY; import jakarta.annotation.Priority; import jakarta.interceptor.AroundInvoke; diff --git a/extensions/panache/hibernate-reactive-panache/deployment/src/test/java/io/quarkus/hibernate/reactive/panache/test/transaction/MixReactiveTransactionalTest.java b/extensions/panache/hibernate-reactive-panache/deployment/src/test/java/io/quarkus/hibernate/reactive/panache/test/transaction/MixReactiveTransactionalTest.java index a12dd5cb1f324..49bcf24f93389 100644 --- a/extensions/panache/hibernate-reactive-panache/deployment/src/test/java/io/quarkus/hibernate/reactive/panache/test/transaction/MixReactiveTransactionalTest.java +++ b/extensions/panache/hibernate-reactive-panache/deployment/src/test/java/io/quarkus/hibernate/reactive/panache/test/transaction/MixReactiveTransactionalTest.java @@ -9,7 +9,7 @@ import io.quarkus.hibernate.reactive.panache.common.runtime.ReactiveTransactional; import io.quarkus.hibernate.reactive.runtime.transaction.HibernateActionsStrategy; -import io.quarkus.reactive.transaction.TransactionalInterceptorRequired; +import io.quarkus.reactive.transaction.runtime.TransactionalInterceptorRequired; import io.quarkus.test.QuarkusUnitTest; import io.quarkus.test.vertx.RunOnVertxContext; import io.quarkus.test.vertx.UniAsserter; diff --git a/extensions/panache/hibernate-reactive-panache/deployment/src/test/java/io/quarkus/hibernate/reactive/panache/test/transaction/MixWithSessionOnDemandTest.java b/extensions/panache/hibernate-reactive-panache/deployment/src/test/java/io/quarkus/hibernate/reactive/panache/test/transaction/MixWithSessionOnDemandTest.java index ccf1efda88e3d..747353d7d81c9 100644 --- a/extensions/panache/hibernate-reactive-panache/deployment/src/test/java/io/quarkus/hibernate/reactive/panache/test/transaction/MixWithSessionOnDemandTest.java +++ b/extensions/panache/hibernate-reactive-panache/deployment/src/test/java/io/quarkus/hibernate/reactive/panache/test/transaction/MixWithSessionOnDemandTest.java @@ -9,7 +9,7 @@ import io.quarkus.hibernate.reactive.panache.common.WithSessionOnDemand; import io.quarkus.hibernate.reactive.runtime.transaction.HibernateActionsStrategy; -import io.quarkus.reactive.transaction.TransactionalInterceptorRequired; +import io.quarkus.reactive.transaction.runtime.TransactionalInterceptorRequired; import io.quarkus.test.QuarkusUnitTest; import io.quarkus.test.vertx.RunOnVertxContext; import io.quarkus.test.vertx.UniAsserter; diff --git a/extensions/panache/hibernate-reactive-panache/deployment/src/test/java/io/quarkus/hibernate/reactive/panache/test/transaction/MixWithTransactionTest.java b/extensions/panache/hibernate-reactive-panache/deployment/src/test/java/io/quarkus/hibernate/reactive/panache/test/transaction/MixWithTransactionTest.java index 0ac177d40d297..b6546d796d6cf 100644 --- a/extensions/panache/hibernate-reactive-panache/deployment/src/test/java/io/quarkus/hibernate/reactive/panache/test/transaction/MixWithTransactionTest.java +++ b/extensions/panache/hibernate-reactive-panache/deployment/src/test/java/io/quarkus/hibernate/reactive/panache/test/transaction/MixWithTransactionTest.java @@ -9,7 +9,7 @@ import io.quarkus.hibernate.reactive.panache.common.WithTransaction; import io.quarkus.hibernate.reactive.runtime.transaction.HibernateActionsStrategy; -import io.quarkus.reactive.transaction.TransactionalInterceptorRequired; +import io.quarkus.reactive.transaction.runtime.TransactionalInterceptorRequired; import io.quarkus.test.QuarkusUnitTest; import io.quarkus.test.vertx.RunOnVertxContext; import io.quarkus.test.vertx.UniAsserter; diff --git a/extensions/reactive-transactions/deployment/src/main/java/io/quarkus/hibernate/reactive/transactions/deployment/QuarkusReactiveTransactionsProcessor.java b/extensions/reactive-transactions/deployment/src/main/java/io/quarkus/hibernate/reactive/transactions/deployment/QuarkusReactiveTransactionsProcessor.java index 94c06f02ec41e..e1826d02b087e 100644 --- a/extensions/reactive-transactions/deployment/src/main/java/io/quarkus/hibernate/reactive/transactions/deployment/QuarkusReactiveTransactionsProcessor.java +++ b/extensions/reactive-transactions/deployment/src/main/java/io/quarkus/hibernate/reactive/transactions/deployment/QuarkusReactiveTransactionsProcessor.java @@ -2,12 +2,12 @@ import io.quarkus.arc.deployment.AdditionalBeanBuildItem; import io.quarkus.deployment.annotations.BuildStep; -import io.quarkus.reactive.transaction.TransactionalInterceptorMandatory; -import io.quarkus.reactive.transaction.TransactionalInterceptorNever; -import io.quarkus.reactive.transaction.TransactionalInterceptorNotSupported; -import io.quarkus.reactive.transaction.TransactionalInterceptorRequired; -import io.quarkus.reactive.transaction.TransactionalInterceptorRequiresNew; -import io.quarkus.reactive.transaction.TransactionalInterceptorSupports; +import io.quarkus.reactive.transaction.runtime.TransactionalInterceptorMandatory; +import io.quarkus.reactive.transaction.runtime.TransactionalInterceptorNever; +import io.quarkus.reactive.transaction.runtime.TransactionalInterceptorNotSupported; +import io.quarkus.reactive.transaction.runtime.TransactionalInterceptorRequired; +import io.quarkus.reactive.transaction.runtime.TransactionalInterceptorRequiresNew; +import io.quarkus.reactive.transaction.runtime.TransactionalInterceptorSupports; public class QuarkusReactiveTransactionsProcessor { diff --git a/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/ReactiveResource.java b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/ReactiveResource.java similarity index 79% rename from extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/ReactiveResource.java rename to extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/ReactiveResource.java index 0021f4cc22b83..84b16752fb4b5 100644 --- a/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/ReactiveResource.java +++ b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/ReactiveResource.java @@ -1,4 +1,4 @@ -package io.quarkus.reactive.transaction; +package io.quarkus.reactive.transaction.runtime; import io.smallrye.mutiny.Uni; import io.vertx.core.Context; diff --git a/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorBase.java b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorBase.java similarity index 69% rename from extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorBase.java rename to extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorBase.java index 1fe83154e646d..21caaad8f4877 100644 --- a/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorBase.java +++ b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorBase.java @@ -1,4 +1,4 @@ -package io.quarkus.reactive.transaction; +package io.quarkus.reactive.transaction.runtime; import java.lang.annotation.Annotation; import java.lang.reflect.Method; @@ -11,6 +11,7 @@ import org.jboss.logging.Logger; import io.quarkus.arc.runtime.InterceptorBindings; +import io.quarkus.reactive.transaction.runtime.pool.TransactionalContextPool; import io.quarkus.transaction.annotations.Rollback; import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; import io.smallrye.mutiny.Uni; @@ -28,10 +29,6 @@ */ public abstract class TransactionalInterceptorBase { - // Used in this class and in TransactionalContextPool to store and get - // the connection with the lazily created Transaction inside the Vert.x Context - public static final String CURRENT_CONNECTION_KEY = "reactive.transaction.currentConnection"; - // This key is used to indicate the method was annotated with @Transactional // And will open a session and a transaction lazily when the first operation requires a reactive session // Check HibernateReactiveRecorder.sessionSupplier to see where the session is injected @@ -114,12 +111,19 @@ private Uni invokeBeforeCommitAndCommit(Context context) { .onItem().invoke(() -> LOG.tracef("Flushed the session before commit/rollback")) .onItemOrFailure().call((result, exception) -> { if (exception != null) { - SqlConnection connection = connectionFromContext(); - return actualRollback(connection.transaction(), exception).invoke(() -> { - // onItemOrFailure() will still propagate the chain even with an execption - // we need to rethrow it to make sure the reactive chain fails - throw new RuntimeException("Transaction rolled back due to: ", exception); - }); + Uni connectionUni = connectionFromContext(); + if (connectionUni == null) { + LOG.tracef("Transaction doesn't exist, cannot rollback, propagating original exception"); + return Uni.createFrom().failure( + new RuntimeException("Transaction rolled back due to: ", exception)); + } + return connectionUni + .onItem() + .transformToUni(connection -> actualRollback(connection.transaction(), exception).invoke(() -> { + // onItemOrFailure() will still propagate the chain even with an execption + // we need to rethrow it to make sure the reactive chain fails + throw new RuntimeException("Transaction rolled back due to: ", exception); + })); } else { return commit(); } @@ -127,24 +131,28 @@ private Uni invokeBeforeCommitAndCommit(Context context) { } private Uni closeConnection() { - SqlConnection connection = connectionFromContext(); - if (connection == null) { + Future closeFuture = TransactionalContextPool.closeAndClearCurrentConnection(); + if (closeFuture == null) { // io/quarkus/hibernate/reactive/transaction/DisableJTATransactionTest.java:38 LOG.tracef("Connection doesn't exist, nothing to do here"); return Uni.createFrom().nullItem(); } - LOG.tracef("Closing the connection %s", connection); - return toUni(connection.close()); + return toUni(closeFuture) + .invoke(connection -> LOG.tracef("Closing the connection %s", connection)); } - SqlConnection connectionFromContext() { - return Vertx.currentContext().getLocal(CURRENT_CONNECTION_KEY); + Uni connectionFromContext() { + Future future = TransactionalContextPool.getCurrentConnectionFromVertxContext(); + if (future == null) { + return null; + } + return toUni(future); } // Based on org/hibernate/reactive/pool/impl/SqlClientConnection.java:305 Uni commit() { - SqlConnection connection = connectionFromContext(); - if (connection == null || connection.transaction() == null) { + Uni connectionUni = connectionFromContext(); + if (connectionUni == null) { // This might happen if the method is annotated with @Transactional but doesn't flush // i.e. a single persist without an explicit .flush() // We then avoid committing the transaction here, and we rely on Hibernate Reactive @@ -153,28 +161,40 @@ Uni commit() { return Uni.createFrom().nullItem(); } - return toUni(connection.transaction().commit()) - .onFailure().invoke(() -> LOG.tracef("Failed to commit transaction: %s", connection)) - .invoke(() -> LOG.tracef("Transaction committed: %s", connection)); + return connectionUni + .onItem().transformToUni(connection -> { + if (connection.transaction() == null) { + LOG.tracef("Transaction doesn't exist, so won't commit here"); + return Uni.createFrom().nullItem(); + } + return toUni(connection.transaction().commit()) + .onFailure().invoke(() -> LOG.tracef("Failed to commit transaction: %s", connection)) + .invoke(() -> LOG.tracef("Transaction committed: %s", connection)); + }); } - private static Uni toUni(Future future) { + private static Uni toUni(Future future) { return Uni.createFrom() .emitter(emitter -> future.onComplete(emitter::complete, emitter::fail)); } Uni rollbackOnCancel() { - SqlConnection connection = connectionFromContext(); - Transaction transaction = connection.transaction(); - return toUni(transaction.rollback()) - .onFailure().invoke(() -> LOG.tracef("Failed to rollback transaction on cancellation: %s", connection)) - .invoke(() -> LOG.tracef("Transaction rolled back due to cancellation: %s", transaction)); + Uni connectionUni = connectionFromContext(); + if (connectionUni == null) { + LOG.tracef("Transaction doesn't exist, so won't roll back"); + return Uni.createFrom().nullItem(); + } + return connectionUni.onItem().transformToUni(connection -> { + Transaction transaction = connection.transaction(); + return toUni(transaction.rollback()) + .onFailure() + .invoke(() -> LOG.tracef("Failed to rollback transaction on cancellation: %s", connection)) + .invoke(() -> LOG.tracef("Transaction rolled back due to cancellation: %s", transaction)); + }); } // Based on org/hibernate/reactive/pool/impl/SqlClientConnection.java:314 Uni rollbackOrCommitBasedOnException(Context context, Transactional annotation, Throwable exception) { - SqlConnection connection = connectionFromContext(); - for (Class dontRollbackOnClass : annotation.dontRollbackOn()) { if (dontRollbackOnClass.isAssignableFrom(exception.getClass())) { LOG.trace("Avoid rollback due to `dontRollbackOn` on `@Transactional` annotation, committing instead"); @@ -182,32 +202,41 @@ Uni rollbackOrCommitBasedOnException(Context context, Transactional annota } } - for (Class rollbackOnClass : annotation.rollbackOn()) { - if (rollbackOnClass.isAssignableFrom(exception.getClass())) { - LOG.tracef( - "Rollback the transaction due to exception class %s included in `rollbackOn` field on `@Transactional` annotation", - exception.getClass()); - return actualRollback(connection.transaction(), exception); - } + Uni connectionUni = connectionFromContext(); + if (connectionUni == null) { + LOG.tracef("Transaction doesn't exist, so won't commit or roll back"); + return Uni.createFrom().nullItem(); } + return connectionUni + .onItem().transformToUni(connection -> { + for (Class rollbackOnClass : annotation.rollbackOn()) { + if (rollbackOnClass.isAssignableFrom(exception.getClass())) { + LOG.tracef( + "Rollback the transaction due to exception class %s included in `rollbackOn` field on `@Transactional` annotation", + exception.getClass()); + return actualRollback(connection.transaction(), exception); + } + } - Rollback rollbackAnnotation = exception.getClass().getAnnotation(Rollback.class); - if (rollbackAnnotation != null) { - if (rollbackAnnotation.value()) { - LOG.tracef("Rollback the transaction as the exception class %s is annotated with `@Rollback` annotation", - exception.getClass()); - return actualRollback(connection.transaction(), exception); - } else { - LOG.tracef( - "Do not rollback the transaction as the exception class %s is annotated with `@Rollback(false)` annotation", - exception.getClass()); - return invokeBeforeCommitAndCommit(context); - } - } + Rollback rollbackAnnotation = exception.getClass().getAnnotation(Rollback.class); + if (rollbackAnnotation != null) { + if (rollbackAnnotation.value()) { + LOG.tracef( + "Rollback the transaction as the exception class %s is annotated with `@Rollback` annotation", + exception.getClass()); + return actualRollback(connection.transaction(), exception); + } else { + LOG.tracef( + "Do not rollback the transaction as the exception class %s is annotated with `@Rollback(false)` annotation", + exception.getClass()); + return invokeBeforeCommitAndCommit(context); + } + } - // Default behavior: rollback for RuntimeException and Error (unchecked exceptions) - // Note: Mutiny wraps checked exceptions in CompletionException, so they appear as RuntimeException here - return actualRollback(connection.transaction(), exception); + // Default behavior: rollback for RuntimeException and Error (unchecked exceptions) + // Note: Mutiny wraps checked exceptions in CompletionException, so they appear as RuntimeException here + return actualRollback(connection.transaction(), exception); + }); } private Uni actualRollback(Transaction transaction, Throwable exception) { diff --git a/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorMandatory.java b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorMandatory.java similarity index 91% rename from extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorMandatory.java rename to extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorMandatory.java index 0d57fbefc6181..09c94c072dbcd 100644 --- a/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorMandatory.java +++ b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorMandatory.java @@ -1,4 +1,4 @@ -package io.quarkus.reactive.transaction; +package io.quarkus.reactive.transaction.runtime; import jakarta.annotation.Priority; import jakarta.interceptor.AroundInvoke; diff --git a/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorNever.java b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorNever.java similarity index 91% rename from extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorNever.java rename to extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorNever.java index 47b323d3aef0d..861a9bfb5c091 100644 --- a/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorNever.java +++ b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorNever.java @@ -1,4 +1,4 @@ -package io.quarkus.reactive.transaction; +package io.quarkus.reactive.transaction.runtime; import jakarta.annotation.Priority; import jakarta.interceptor.AroundInvoke; diff --git a/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorNotSupported.java b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorNotSupported.java similarity index 91% rename from extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorNotSupported.java rename to extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorNotSupported.java index 1d00946c99477..eb01c2c3e1842 100644 --- a/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorNotSupported.java +++ b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorNotSupported.java @@ -1,4 +1,4 @@ -package io.quarkus.reactive.transaction; +package io.quarkus.reactive.transaction.runtime; import jakarta.annotation.Priority; import jakarta.interceptor.AroundInvoke; diff --git a/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorRequired.java b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorRequired.java similarity index 95% rename from extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorRequired.java rename to extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorRequired.java index 7ada81e473958..5b3d89fcb828a 100644 --- a/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorRequired.java +++ b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorRequired.java @@ -1,4 +1,4 @@ -package io.quarkus.reactive.transaction; +package io.quarkus.reactive.transaction.runtime; import jakarta.annotation.Priority; import jakarta.enterprise.inject.Instance; diff --git a/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorRequiresNew.java b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorRequiresNew.java similarity index 91% rename from extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorRequiresNew.java rename to extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorRequiresNew.java index 1147f9da096d2..cdc5a989da307 100644 --- a/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorRequiresNew.java +++ b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorRequiresNew.java @@ -1,4 +1,4 @@ -package io.quarkus.reactive.transaction; +package io.quarkus.reactive.transaction.runtime; import jakarta.annotation.Priority; import jakarta.interceptor.AroundInvoke; diff --git a/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorSupports.java b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorSupports.java similarity index 91% rename from extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorSupports.java rename to extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorSupports.java index 481b836aacbf6..c82b9f24527eb 100644 --- a/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/TransactionalInterceptorSupports.java +++ b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorSupports.java @@ -1,4 +1,4 @@ -package io.quarkus.reactive.transaction; +package io.quarkus.reactive.transaction.runtime; import jakarta.annotation.Priority; import jakarta.interceptor.AroundInvoke; diff --git a/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/pool/ConnectionHolder.java b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/pool/ConnectionHolder.java new file mode 100644 index 0000000000000..e1fb0079fc37c --- /dev/null +++ b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/pool/ConnectionHolder.java @@ -0,0 +1,61 @@ +package io.quarkus.reactive.transaction.runtime.pool; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; + +import org.jboss.logging.Logger; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.sqlclient.Pool; + +/** + * Holder for the connection future, ensuring only one connection is created per transaction + * even when multiple sessions request it concurrently. + */ +class ConnectionHolder { + + private static final Logger LOG = Logger.getLogger(ConnectionHolder.class); + + private static final VarHandle OPENED_HANDLE; + + static { + try { + MethodHandles.Lookup lookup = MethodHandles.lookup(); + OPENED_HANDLE = lookup.findVarHandle(ConnectionHolder.class, "opened", boolean.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } + + private final Pool delegate; + final Promise connectionPromise = Promise.promise(); + private volatile boolean opened = false; + + ConnectionHolder(Pool delegate) { + this.delegate = delegate; + } + + /** + * Gets the connection, creating it if this is the first call (thread-safe). + * Concurrent calls will wait on the same future. + */ + Future getConnection() { + if (opened) { + return connectionPromise.future(); + } + // Use compareAndSet to ensure only one thread initiates the connection + if (OPENED_HANDLE.compareAndSet(this, false, true)) { + delegate.getConnection() + .compose(connection -> { + LOG.tracef("New connection, about to start transaction: %s", connection); + return connection.begin().map(t -> { + LOG.tracef("Transaction started: %s", connection); + return new TransactionalContextConnection(connection); + }); + }) + .onComplete(connectionPromise); + } + return connectionPromise.future(); + } +} diff --git a/extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/customized/TransactionalContextConnection.java b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/pool/TransactionalContextConnection.java similarity index 91% rename from extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/customized/TransactionalContextConnection.java rename to extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/pool/TransactionalContextConnection.java index 9e864ec28e944..ed2f48f1c799a 100644 --- a/extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/customized/TransactionalContextConnection.java +++ b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/pool/TransactionalContextConnection.java @@ -1,4 +1,4 @@ -package io.quarkus.hibernate.reactive.runtime.customized; +package io.quarkus.reactive.transaction.runtime.pool; import org.jboss.logging.Logger; @@ -86,7 +86,8 @@ public boolean isSSL() { @Override public void close(Handler> handler) { - connection.close(handler); + LOG.tracef("Calling no-op close on TransactionalContextConnection it will close after the closing of session"); + handler.handle(Future.succeededFuture()); } @Override @@ -114,4 +115,8 @@ public Future close() { LOG.tracef("Calling no-op close on TransactionalContextConnection it will close after the closing of session"); return Future.succeededFuture(); } + + SqlConnection getDelegate() { + return connection; + } } diff --git a/extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/customized/TransactionalContextPool.java b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/pool/TransactionalContextPool.java similarity index 55% rename from extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/customized/TransactionalContextPool.java rename to extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/pool/TransactionalContextPool.java index f1055aafd25ce..f743cf1fad396 100644 --- a/extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/customized/TransactionalContextPool.java +++ b/extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/pool/TransactionalContextPool.java @@ -1,7 +1,6 @@ -package io.quarkus.hibernate.reactive.runtime.customized; +package io.quarkus.reactive.transaction.runtime.pool; -import static io.quarkus.reactive.transaction.TransactionalInterceptorBase.CURRENT_CONNECTION_KEY; -import static io.quarkus.reactive.transaction.TransactionalInterceptorBase.TRANSACTIONAL_METHOD_KEY; +import static io.quarkus.reactive.transaction.runtime.TransactionalInterceptorBase.TRANSACTIONAL_METHOD_KEY; import java.util.function.Function; @@ -28,6 +27,9 @@ public class TransactionalContextPool implements Pool { private static final Logger LOG = Logger.getLogger(TransactionalContextPool.class); + // Key to store the connection holder for reuse by multiple sessions + private static final String CURRENT_CONNECTION_KEY = "reactive.transaction.currentConnection"; + private final Pool delegate; public TransactionalContextPool(Pool delegate) { @@ -39,19 +41,10 @@ public void getConnection(Handler> handler) { if (!shouldOpenTransaction()) { delegate.getConnection(handler); } else { - delegate.getConnection(result -> { - if (result.failed()) { - handler.handle(result); - return; - } - var connection = result.result(); - connection.begin() - .map(transaction -> { - storeConnectionInVertxContext(connection); - return (SqlConnection) new TransactionalContextConnection(connection); - }) - .andThen(handler); - }); + getOrCreateConnectionHolder() + .getConnection() + .map(conn -> (SqlConnection) conn) + .onComplete(handler); } } @@ -60,20 +53,60 @@ public Future getConnection() { if (!shouldOpenTransaction()) { return delegate.getConnection(); } else { - return delegate.getConnection() - .compose(connection -> { - LOG.tracef("New connection, about to start transaction: %s", connection); - return connection.begin().map(t -> { - LOG.tracef("Transaction started: %s", connection); - storeConnectionInVertxContext(connection); - return new TransactionalContextConnection(connection); - }); - }); + return getOrCreateConnectionHolder() + .getConnection() + .map(conn -> (SqlConnection) conn); } } - private static void storeConnectionInVertxContext(SqlConnection connection) { - Vertx.currentContext().putLocal(CURRENT_CONNECTION_KEY, connection); + /** + * Gets or creates the ConnectionHolder for the current context. + * The holder ensures only one connection is created even with concurrent access. + */ + private ConnectionHolder getOrCreateConnectionHolder() { + Context context = Vertx.currentContext(); + ConnectionHolder holder = context.getLocal(CURRENT_CONNECTION_KEY); + if (holder == null) { + holder = new ConnectionHolder(delegate); + context.putLocal(CURRENT_CONNECTION_KEY, holder); + } + return holder; + } + + public static Future getCurrentConnectionFromVertxContext() { + Context context = Vertx.currentContext(); + if (context == null) { + return null; + } + ConnectionHolder holder = context.getLocal(CURRENT_CONNECTION_KEY); + if (holder == null) { + return null; + } + return holder.connectionPromise.future(); + } + + /** + * Closes the current connection and clears it from the Vertx context. + * This should be called by TransactionalInterceptorBase at the end of the transaction. + * + * @return a Future that completes when the connection is closed, or null if no connection exists + */ + public static Future closeAndClearCurrentConnection() { + Context context = Vertx.currentContext(); + if (context == null) { + return null; + } + ConnectionHolder holder = context.getLocal(CURRENT_CONNECTION_KEY); + if (holder == null) { + return null; + } + // Wait for the connection to be available, then close it + return holder.connectionPromise.future() + .compose(wrappedConnection -> { + SqlConnection delegateConnection = wrappedConnection.getDelegate(); + return delegateConnection.close(); + }) + .andThen(ar -> context.removeLocal(CURRENT_CONNECTION_KEY)); } private boolean shouldOpenTransaction() { diff --git a/test-framework/junit/src/main/java/io/quarkus/test/junit/launcher/CustomLauncherInterceptor.java b/test-framework/junit/src/main/java/io/quarkus/test/junit/launcher/CustomLauncherInterceptor.java index 319c979717449..8b03396bb9eb4 100644 --- a/test-framework/junit/src/main/java/io/quarkus/test/junit/launcher/CustomLauncherInterceptor.java +++ b/test-framework/junit/src/main/java/io/quarkus/test/junit/launcher/CustomLauncherInterceptor.java @@ -124,7 +124,7 @@ public void launcherDiscoveryFinished(LauncherDiscoveryRequest request) { // On the other hand, if the QuarkusTestExtension is registered by a service loader mechanism, it gets loaded after the discovery phase finishes, // so needs the TCCL to still be the facade classloader. // This compromise does mean you can't use the service loader mechanism to avoid having to use `@QuarkusTest` and also use Quarkus config in your own test extensions, but that combination is very unlikely. - if (!facadeLoader.isServiceLoaderMechanism()) { + if (facadeLoader != null && !facadeLoader.isServiceLoaderMechanism()) { // Do not close the facade loader at this stage, because discovery finished may be called several times within a single run // Ideally we would reset to what the TCCL was when we started discovery, but we can't, // because the intercept method will have set something before the discovery start is triggered. @@ -138,7 +138,7 @@ public void launcherDiscoveryFinished(LauncherDiscoveryRequest request) { if (orderer.isEmpty() || !(orderer.get() .equals(DESIRED_CLASS_ORDERER.getName()) || orderer.get().equals(CONFIG_SETTING_DESIRED_CLASS_ORDERER.getName()))) { - if (facadeLoader.hasMultipleClassLoaders()) { + if (facadeLoader != null && facadeLoader.hasMultipleClassLoaders()) { String message = getFailureMessageForJUnitMisconfiguration(orderer); throw new IllegalStateException(message); } @@ -157,7 +157,7 @@ private static String getFailureMessageForJUnitMisconfiguration(Optional String message; if (orderer.isPresent()) { message = String.format( - "%sTo set a test order while preserving the Quarkus required sorting, please set use the Quarkus configuration to set junit.quarkus.orderer.secondary-orderer=%s, and remove the junit-platform.properties (if any) from the classpath.", + "%sTo set a test order while preserving the Quarkus required sorting, please use the Quarkus configuration to set junit.quarkus.orderer.secondary-orderer=%s, and remove the junit-platform.properties (if any) from the classpath.", generalExplanation, orderer.get()); } else { message = String.format(