Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

withTransaction does not keep the same transaction associated with the current Vert.x local context #1121

Closed
guy1699 opened this issue Jan 12, 2022 · 31 comments
Milestone

Comments

@guy1699
Copy link

guy1699 commented Jan 12, 2022

I'm counting on the documentation explaining one can work with the same reactive-stream and keep calling withTransaction to keep adding DB operations to the same transaction, but it does not work.
https://hibernate.org/reactive/documentation/1.0/reference/html_single/#:~:text=When%20you%20create,see%20this%20error%3A

I'm running on Spring boot using Uni all the way.
I'm testing it by inserting to the DB and then throwing an exception, expecting the insert will be rolled back.

I'm calling the endpoint below to test it and the exception being thrown while the new entity is stored in the DB:

public Uni<Entity> save(Entity entity) {
        return reactiveSessionFactory.withTransaction((session, transaction) ->  session.persist(entity).replaceWith(entity));
}

@PutMapping("/create-entity")
 public Uni<Entity> createTenant(@PathVariable String appId) {

      return save(new entity())
              .map(entity -> entity.getName().equals("name") ? throwE() : entity);
}

private Entity throwE(){
     throw new RuntimeException();
}

reactiveSessionFactory created by:

EntityManagerFactory emf = Persistence.createEntityManagerFactory(PERSISTENCE_UNIT_NAME, hibernateReactiveProperties());
      Mutiny.SessionFactory reactiveSessionFactory = emf.unwrap(Mutiny.SessionFactory.class);
@gavinking
Copy link
Member

Tx propagation works if you stay on the same Vert.x local context. I'm not sure how you're implementing that in Spring boot, and I imagine you don't get it for free.

@guy1699
Copy link
Author

guy1699 commented Jan 12, 2022

@gavinking so that means that your comment on same Transaction per Vert.x context (the link I referred to above) is only relevant to Quarkus ?

Can you elaborate more please,
I'm using Mutiny and Spring-boot, does it means that Spring should somehow attach Vert.x context for each reactive stream? in which point that can happen, where Quarkus is doing that?

@gavinking
Copy link
Member

Well not just in Quarkus, in anywhere you have Vert.x driving the calls into your code.

@guy1699
Copy link
Author

guy1699 commented Jan 13, 2022

I still don't get it, how can I roll back a transaction when an error is thrown?

It would not make sense to wrap 100 lines of code with the withTransaction method in order to rollback

 reactiveSessionFactory.withTransaction((session, transaction) -> {
   try{
      doManyDBCalls(session);
   } catch (e){
     transaction.markForRollback();
  }
}

@gavinking
Copy link
Member

@guy1699 it's really hard to discuss this stuff in the abstract, without knowing more about how your system works. It's made even harder by the fact that we've just woken up to the fact that for the last 4 months there's been a bug that rendered tx/session propagation pretty much completely broken (and so what you're experiencing could even just stem from that, I'm not sure).

Basically, tx/session propagation works, by default, by storing the thing in the current Vert.x local context, meaning Vert.x is really the one responsible for propagating these things along the reactive stream.

But this mechanism is completely pluggable: it's implemented by the (currently broken) VertxContext service, but you can in-principle give us your own org.hibernate.reactive.context.Context implementation which uses some other (Spring, for example) way of associating contextual objects with reactive streams.

Or, on the other hand, you always have the option of falling back to openSession() and doing things your own way.

HTH at least a little.

@guy1699
Copy link
Author

guy1699 commented Jan 14, 2022

Thanks for the detailed answer @gavinking (not typical for you :))

So maybe the VertxContext is what I need, not sure if it always comes with Mutiny, regardless of not using Vert.x for managing deployments and verticals/instances.
Vertex only used through smallrye-mutiny-vertx .

Can you share an example how to use it?

@DavideD
Copy link
Member

DavideD commented Jan 14, 2022

I've just released HR 1.1.2.Final. @guy1699 Could you check if this version solves your issue?

@guy1699
Copy link
Author

guy1699 commented Jan 14, 2022

Thanks, I'll check.
Is there a way to print the current reactive-stream (pipeline), some unique identifier? (using Mutiny of course)
What PR did fix the issue?

@DavideD
Copy link
Member

DavideD commented Jan 14, 2022

Can you share an example how to use it?

Maybe the way we use it in the project can help?

And in case you need it, this is how you can access the Hibernate Reactive Vert.x context:

import org.hibernate.reactive.common.spi.Implementor

Mutiny.SessionFactory factory = ... // Same for Stage.SessionFactory
Context vertxContext = ((Implementor) factory).getServiceRegistry().getService( VertxContext.class );

@DavideD
Copy link
Member

DavideD commented Jan 14, 2022

This is the PR that fixes the error @gavinking was talking about: #1118

@guy1699
Copy link
Author

guy1699 commented Jan 14, 2022

Great @DavideD , I feel it's getting there.
So if you can help with testing,

  1. Where can I see a test of this promise to keep the same session/tx along the pipeline, I wonder if I can uniTest it without actual DB?
  2. Is there a way to print the current reactive-stream (pipeline), some unique identifier? (using Mutiny of course)

@guy1699
Copy link
Author

guy1699 commented Jan 14, 2022

Yes, it is working now, I tested it with the code below.
The LOGGER print the same session and also attempt to defy a DB constrain like two entities with the same unique key will throw an error and will roll back the first insert.

  return tenantService.getReactiveSessionFactory().withTransaction((session, transaction) -> {

            Entity entity1 = createNewEntity();
         
            LOGGER.info("------------------------111--------------------------" + session);
            LOGGER.info("------------------------111--------------------------" + entity1);

            return session.persist(entity1)
                    .flatMap(unused -> tenantService.getReactiveSessionFactory().withTransaction((session1, transaction1) -> {
                        Entity entity2 = createNewEntity();
                        LOGGER.info("------------------------2222--------------------------" + session);
                        LOGGER.info("------------------------2222--------------------------" + entity2);
                        return session.persist(entity2);
                    }));


        });

Still appreciate ideas on how to uni test without a real DB.
Can Postgresql containers help?

@DavideD
Copy link
Member

DavideD commented Jan 14, 2022

Awesome!

Where can I see a test of this promise to keep the same session/tx along the pipeline, I wonder if I can uniTest it without actual DB?

We test everything using real database for several reasons:

  • Docker + testcontainers makes it really easy to setup up and start a database when you need one and some databases are really quick (PostgreSQL for example)
  • Even without docker, having a script that starts a db is not too hard
  • I'm personally not a big fun of mocking classes and don't trust tests working using mocks (generally speaking)
  • We don't have a reactive Vert.x SQL client to connect to a in-memory database at the moment (but we have an issue open about it)

In our project you can see several examples on how we unit tests.
We have a super class that we use for all our test cases:

This class is responsible for preparing a SessionFactory that the tests can use and makes sure that a container is started before running the testsuite (if docker is selected as an option).
This is an example of how we use testcontainers with Postgres:

public static final PostgreSQLContainer<?> postgresql = new PostgreSQLContainer<>( IMAGE_NAME )

Is there a way to print the current reactive-stream (pipeline), some unique identifier? (using Mutiny of course)

No idea.

@DavideD
Copy link
Member

DavideD commented Jan 14, 2022

There is also now an integration test added to the project to check for #1073: #1123

There are some caveats explained in the PR. In the future I want to improve the test but for now it should be good enough.

I think that if you want to test this type of errors, you need to create a situation where you run different operations in parallel and then check that everything has been executed in the right order.

In this paragraph of the documentation we touch briefly about the session and the contexts with an example of the kind of coding that doesn't work when using an hibernate session: I don't think there is an easy: http://hibernate.org/reactive/documentation/1.1/reference/html_single/#_sessions_and_vert_x_contexts

By the way, our project is not so big, so if you want a working example, you should be able to clone the repository and run the tests from command line and the IDE and see working example of the tests in action. The README should contain enough infor to start (hopefully).

@DavideD
Copy link
Member

DavideD commented Jan 14, 2022

Closing this issue because it's been solved by the latest release

@DavideD DavideD closed this as completed Jan 14, 2022
@DavideD DavideD added this to the 1.1.2.Final milestone Jan 14, 2022
@acasanova99
Copy link

Yes, it is working now, I tested it with the code below. The LOGGER print the same session and also attempt to defy a DB constrain like two entities with the same unique key will throw an error and will roll back the first insert.

  return tenantService.getReactiveSessionFactory().withTransaction((session, transaction) -> {

            Entity entity1 = createNewEntity();
         
            LOGGER.info("------------------------111--------------------------" + session);
            LOGGER.info("------------------------111--------------------------" + entity1);

            return session.persist(entity1)
                    .flatMap(unused -> tenantService.getReactiveSessionFactory().withTransaction((session1, transaction1) -> {
                        Entity entity2 = createNewEntity();
                        LOGGER.info("------------------------2222--------------------------" + session);
                        LOGGER.info("------------------------2222--------------------------" + entity2);
                        return session.persist(entity2);
                    }));


        });

Still appreciate ideas on how to uni test without a real DB. Can Postgresql containers help?

Hi,

Could you please hand me how you get your "tenantService" instance?

@DavideD
Copy link
Member

DavideD commented Jun 23, 2022

I think it's a service he created in his project. What are you trying to do?

@acasanova99
Copy link

I am migrating an aplication from imperative to quarkus. The application is quite large, but the problem is localized when loading the demo data:

[STEP 1] First I need to persist some entities of type A (about 1k in the current tests).

[STEP 2] Then, the test pass these instances of A to other application (through REST), and gets some data.

[STEP 3] The application need to update the instances of A with some of the information retrieved in the STEP 2.

My problem is that, after creating the instances, the session is closed. So it is not possible to update the entities. I get HR000057. (If I run the updates in another worker Thread, I get the issue related with this post).

So my question is, how can I keep the session open? I am using my own repository with the Mutiny.SesionFactory.withTransaction() for creating and updating the instances.

I think I should read a bit about vertx context, but any help/documentation is apreciated.

@DavideD
Copy link
Member

DavideD commented Jun 24, 2022

I'm not sure how you can keep the session open (do you really want to do that?). But you can implement step 3 by merging the entities to the database. This will attach the entities to a new session and apply the changes.

@acasanova99
Copy link

How would you update the entities with Mutiny.SesionFactory.withTransaction()?

I will prefer to open and close the session on the first step and then open another one for the third step.

I mean, it is like using two transactions:

BEGIN TRANSACTION
   STEP 1
COMMIT

-- Apply some logic in between --

BEGIN TRANSACTION
   STEP 3
COMMIT

But, all the steps are done in a single function call.

@DavideD
Copy link
Member

DavideD commented Jun 24, 2022

How would you update the entities with Mutiny.SesionFactory.withTransaction()?
This should work:

List<MyEntity> entitiesWithChanges = ... // entities from Step 2
Mutiny.SessionFactory sf = ...
sf.withTransaction(session -> session.merge(entitiesWithChanges.toArray()));

But yes, it means opening two transactions.

Can you show me some of the code?

@DavideD
Copy link
Member

DavideD commented Jun 24, 2022

Maybe, I've misunderstood what you are trying to do. If you can show some code, I might have a better answer.
You should be able to pass the session around, now that I think about it.

@acasanova99
Copy link

Sure, here is the repository method:

@Inject
Mutiny.SessionFactory sf;
  
public Uni<Party> create(final Party party) {
    return sf.withTransaction((session, tx) -> session.persistAll(party,party.partyPersonalInfo))
        .replaceWith(this.findByKeys(party.partyKey));
}

public Uni<Party> update(final Party party) {
    return sf.withTransaction((session, tx) -> session.merge(party))
        .replaceWith(this.findByKeys(party.partyKey));
}

CreateDemoFile:

...

// Create the parties in the app BEGIN STEP 1
lstParty.forEach(element -> blParty.create(element).await().indefinitely());

// COMMIT STEP 1
        
// Create the parties in the KC as users from the given parties BEGIN STEP 2
Map<String, String> kcSummary = blKeycloak.createAllFromPartyDEMO(lstParty)
    .await()
    .indefinitely();
// END STEP 2 (Uses other application)

// Error summary
for (String usr : kcSummary.keySet()) {
  LOGGER.warn("--> PARTY: {}, ERROR : {} .(skipping)", usr, kcSummary.get(usr));
}

// BEGIN STEP 3 (this method call update for each party)
// Enrich all the parties with the username, isUser and isEnable field from KC 
lstParty = blKeycloak.listEnrichParties(TENANT).await().indefinitely();
// COMMIT STEP 3

...

I use await() since this is a file to generate some mock data just for testing purposes, in production, the application does not block the threads.

I need to create some users (parties), then register the user in Keycloak and finally update the users after keycloak retrieve me some information.

PD: I have a bl (basic logic) layer, that translate an entity into a DTO, but blParty.create() calls repoParty.create().

@DavideD
Copy link
Member

DavideD commented Jun 27, 2022

Something like this wouldn't work?
In a service bean

@Inject
Mutiny.SessionFactory factory:

...
// Open the session
factory.withSession( session ->  session
        // Step 1
        .withTransaction( tx -> lstParty.forEach(element -> blParty.create(element) ) )
        // Step 2
        .map( unused -> blKeycloak.createAllFromPartyDEMO(lstParty) )
        .invoke( kcSummary -> kscSummary.forEach( (key, value) -> LOGGER.warn("--> PARTY: {}, ERROR : {} .(skipping)", key, value) )
        .chain( kcSummary -> session.withTransaction( tx -> {
            // STEP 3: The session is still open
           ...
        })
} );

Hibernate Reactive should be able to find the open session or transaction and reuse it in the repo.
As an alternative, you could pass the open session to the repository:

factory.withSession( session ->  session
        // Step 1
        .withTransaction( tx -> lstParty.forEach(element -> blParty.create(session, element) ) )
...

@DavideD
Copy link
Member

DavideD commented Jun 27, 2022

By the way, I explain different ways to test with Hibernate Reactive and Panache using quarkus-test-vertx on this answer on StackOverflow: https://stackoverflow.com/questions/72648062/how-to-chain-2-uni-in-unit-test-using-panache-withtransaction-without-getti/72675643#72675643

We need to update the documentation on this subject.

@acasanova99
Copy link

Thanks for the help @DavideD, just one last question. How can I pass a session through several methods?

@DavideD
Copy link
Member

DavideD commented Jun 29, 2022

If we assume that the caller starts the transaction (like in my previous example), you can change blParty.create to:

public Uni<Party> create(Mutiny.Session session, final Party party) {
    return session.persistAll(party,party.partyPersonalInfo)
        .replaceWith(this.findByKeys(party.partyKey));
}

otherwise:

public Uni<Party> create(Mutiny.Session session, final Party party) {
    return session.withTransaction( tx -> session
        .persistAll(party,party.partyPersonalInfo).replaceWith(this.findByKeys(party.partyKey))
    );
}

It depends on how you want to design your app.
Does it answer your question?

@acasanova99
Copy link

Okay, yes

I was wondering if it possible to keep the session open without chaining the operations. But as this is only for loading data to a test DB, I will try the quarkus-test-vertx you suggested before

Thanks again

@DavideD
Copy link
Member

DavideD commented Jun 29, 2022

I was wondering if it possible to keep the session open without chaining the operations

How would you make sure that things happen in the right order (step 1 -> step 2 -> step 3). But, most important, you cannot use the same session on parallel chains of operations. It will lead to issues that are hard to debug.

@DavideD
Copy link
Member

DavideD commented Jun 29, 2022

And, just as a reminder, nested calls to withSession will return the same session (same for the transaction). So, it shouldn't be necessary to pass the session (or the transaction) around.

@acasanova99
Copy link

I was wondering if it possible to keep the session open without chaining the operations

How would you make sure that things happen in the right order (step 1 -> step 2 -> step 3). But, most important, you cannot use the same session on parallel chains of operations. It will lead to issues that are hard to debug.

I solve the original issue about the (step 1 -> step 2 -> step 3) by chaining the operations, after that I get some troubles with other parts of the demo loader. Sorry, I didn't explain it properly

This was one of the problems, I was running some of the methods on a worker thread, thus I get:

Error occurred while destroying instance of bean [io.quarkus.hibernate.reactive.runtime.ReactiveSessionProducer_ProducerMethod_createMutinySession_1321d110ee9e92bda147899150401e0a136779c7_Bean]: java.util.concurrent.CompletionException: java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread: 'vert.x-eventloop-thread-3' current Thread: 'vert.x-eventloop-thread-1'

Now that I have remove the runSubscriptionOn, the demo is working again.

Yeah, I understand that concurrence can lead to race conditions (that will be translated into DB inconsistencies), but as this is a Demo. I just need to load some bulk (fake) data, so using the same session on parallel chains of operations may be useful to load non related entities at once.

Anyway, thanks to your help, I have a clearest idea about how HR handle the sessions. TY!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants