
Having queue in nodejs' memory makes cqrs-saga not amenable for clustering #47

Open
rakamoviz opened this issue Mar 28, 2019 · 8 comments


rakamoviz commented Mar 28, 2019

The fact that the queue of events is maintained in memory made me suspect that this is not going to work in a clustered environment. Ref.: https://github.com/adrai/node-cqrs-saga/blob/9770440df0e50b5a3897607a07e0252315b25edf/lib/orderQueue.js#L11

Some tests confirmed it. My test was simple:

I ran two instances of my "cqrs-saga + cqrs-domain" node. One is subscribed to a redis channel named "SHARD_01", and the other one is subscribed to the redis channel "SHARD_02" (used both for receiving events and for publishing commands).

I have two events: orderCreated and orderUpdated (this simulates an action from a REST interface, and is supposed to move a saga forward). The event orderCreated should be processed before orderUpdated.

Thanks to revisionGuard, even if the events arrive out of order at the process manager, the event-processing order stated above can be respected. revisionGuard keeps orderUpdated in the queue (in case it arrives first). Once orderCreated arrives (later), it processes orderCreated, followed by orderUpdated (pulled out of the queue). For that mechanism to work, I set the revision number of orderCreated to 1 and that of orderUpdated to 2.
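The per-process ordering described above can be sketched roughly like this (all names here are hypothetical, not the library's actual API; it just illustrates the queue-until-in-order idea):

```javascript
// Minimal sketch of the in-memory ordering a revision guard performs:
// an event is handled only when its revision is the next expected one;
// events with later revisions wait in a queue until the gap is filled.
function makeGuard(handle) {
  let expected = 1;          // next revision we are willing to process
  const queue = new Map();   // revision -> queued event

  return function receive(evt) {
    if (evt.revision !== expected) {
      // e.g. orderUpdated (rev 2) arriving before orderCreated (rev 1)
      queue.set(evt.revision, evt);
      return;
    }
    handle(evt);
    expected += 1;
    // drain any queued events that are now in order
    while (queue.has(expected)) {
      handle(queue.get(expected));
      queue.delete(expected);
      expected += 1;
    }
  };
}

// Usage: orderUpdated (rev 2) arrives before orderCreated (rev 1)
const processed = [];
const receive = makeGuard((e) => processed.push(e.name));
receive({ name: 'orderUpdated', revision: 2 });
receive({ name: 'orderCreated', revision: 1 });
// processed is now ['orderCreated', 'orderUpdated']
```

This works because both events pass through the same in-process queue; the breakage described below starts exactly when that queue is split across two processes.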

It works just fine, as long as the events arrive on the same node running cqrs-saga.

The problem is that the queue is maintained in memory. So, when I have two instances of cqrs-saga (and, deliberately for this test, the events are sent to different instances), things break.

Only the event orderCreated is processed (at node 01). Node 02 (where orderUpdated is queued) is not aware that orderCreated has arrived (and has been processed). So it gets stuck there, and so does the instance of the saga.

I think the quick fix would be to move the queue to Redis (keep it as a list).

But a more general (and more proper) fix would be to modify the way revisionGuard keeps / checks information in the revisionGuard store, and act accordingly (e.g.: once node 01 has finished processing orderCreated, it sets something in the revisionGuard store to the revision number of orderCreated. ... Node 02, checking the revisionGuard store in a loop, will eventually detect that, start pulling orderUpdated out of the queue, and commence handling it).
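The proposed fix could look roughly like this. A plain Map stands in for the shared revisionGuard store (in production this would be Redis, e.g. a key per aggregate); all function and variable names are hypothetical:

```javascript
// Sketch of the proposed shared-store fix: node 01 records the last
// processed revision in a shared store; node 02 periodically drains its
// local queue of events whose revision directly follows that record.
const sharedStore = new Map(); // aggregateId -> last processed revision

// Node 01 calls this after finishing an event (e.g. orderCreated, rev 1).
function markProcessed(aggregateId, revision) {
  sharedStore.set(aggregateId, revision);
}

// Node 02 runs this in a loop: pull queued events whose revision is the
// immediate successor of what the shared store says was last processed.
function drainQueue(aggregateId, queue, handle) {
  let last = sharedStore.get(aggregateId) || 0;
  let next;
  while ((next = queue.get(last + 1))) {
    handle(next);
    queue.delete(last + 1);
    last += 1;
    sharedStore.set(aggregateId, last);
  }
}

// Usage: node 01 processed orderCreated (rev 1); node 02 holds orderUpdated (rev 2)
const queueOnNode02 = new Map([[2, { name: 'orderUpdated', revision: 2 }]]);
const handled = [];
markProcessed('order-123', 1);
drainQueue('order-123', queueOnNode02, (e) => handled.push(e.name));
// handled is now ['orderUpdated'] and node 02 is unstuck
```

With Redis as the store, the polling loop and the update would need to be made safe against concurrent writers (e.g. via an atomic compare-and-set), which this single-process sketch glosses over.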


imissyouso commented Jan 23, 2020

Any progress here? We hit the same issue and can't find a quick solution.
It seems currentHandlingRevisions must be shared across different processes of the same saga.

@dmitry-textmagic

Does this mean that revisionGuard cannot handle multiple instances of the saga at all? Or did we just misunderstand revisionGuard's purpose? If so, could you briefly explain it, please?


adrai commented Jan 23, 2020

Have missed this issue completely... sorry.

cqrs-saga IS able to exist as multiple instances... should not be any problem... (exactly there is the revisionGuard for this...)

About the memory leak... is this a huge issue? normally, when working with microservices, these are not that long-living anyway... but contributions to help are always welcome


imissyouso commented Jan 23, 2020

Hi @adrai,

cqrs-saga IS able to exist as multiple instances... should not be any problem... (exactly there is the revisionGuard for this...)

What if the same event (with the same revision value) is received by several instances of the same saga at the same time (from the bus)? It seems it will be handled as 2 different events in parallel. In my opinion an event should be handled only once, by the instance which "first managed to grab" it, but the current revisionGuard implementation doesn't help us reach that result.
We put the concatenatedId key with revInEvt + 1 into the store only when the first saga step is done (in the done callback, actually), so before that, another instance of the saga can start processing the same event in parallel too. I found a mechanism which should protect us from such cases, but it seems it works only for a single saga process. The checks that determine whether an event should be handled are listed below (revisionGuard.js):

    function proceed (revInStore) {
      if (!revInStore && !self.currentHandlingRevisions[concatenatedId] && (self.options.startRevisionNumber === undefined || self.options.startRevisionNumber === null)) {
        self.currentHandlingRevisions[concatenatedId] = revInEvt;
        debug('first revision to store [concatenatedId]=' + concatenatedId + ', [revInStore]=' + (revInStore || 'null') + ', [revInEvt]=' + revInEvt);
        callback(null, function (clb) {
          self.finishGuard(evt, revInStore, clb);
        });
        return;
      }

      if (revInStore && revInEvt < revInStore) {
        debug('event already handled [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
        callback(new AlreadyHandledError('Event: [id]=' + dotty.get(evt, self.definition.id) + ', [revision]=' + revInEvt + ', [concatenatedId]=' + concatenatedId + ' already handled!'), function (clb) {
          clb(null);
        });
        return;
      }

      if (revInStore && revInEvt > revInStore) {
        debug('queue event [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
        self.queueEvent(evt, callback);
        return;
      }

      if (!revInStore && self.options.startRevisionNumber >= 0 && revInEvt > self.options.startRevisionNumber) {
        debug('queue event (startRevisionNumber is set) [concatenatedId]=' + concatenatedId + ', [startRevisionNumber]=' + self.options.startRevisionNumber + ', [revInEvt]=' + revInEvt);
        self.queueEvent(evt, callback);
        return;
      }

      if (!revInStore && self.currentHandlingRevisions[concatenatedId] >= 0 && revInEvt > self.currentHandlingRevisions[concatenatedId]) {
        debug('queue event [concatenatedId]=' + concatenatedId + ', [currentlyHandling]=' + self.currentHandlingRevisions[concatenatedId] + ', [revInEvt]=' + revInEvt);
        self.queueEvent(evt, callback);
        return;
      }

      if (self.currentHandlingRevisions[concatenatedId] >= revInEvt) {
        debug('event already handling [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
        callback(new AlreadyHandlingError('Event: [id]=' + dotty.get(evt, self.definition.id) + ', [revision]=' + revInEvt + ', [concatenatedId]=' + concatenatedId + ' already handling!'), function (clb) {
          clb(null);
        });
        return;
      }

      self.currentHandlingRevisions[concatenatedId] = revInEvt;

      debug('event is in correct order => go for it! [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
      callback(null, function (clb) {
        self.finishGuard(evt, revInStore, clb);
      });
    }

Here we check currentHandlingRevisions and, if it already contains the concatenatedId of the event, we drop it. But this object is stored in memory and is visible only inside the current saga instance. Therefore, if currentHandlingRevisions were shared across all saga instances (for example by storing it in Redis), it would solve the issue. What do you think?
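The sharing idea boils down to an atomic "claim" on each (concatenatedId, revision) pair. In Redis this could be done with an atomic set-if-not-exists (the `SET key value NX` command); the Map below only illustrates the claim semantics within one process, and all names are hypothetical:

```javascript
// Sketch of currentHandlingRevisions shared across instances: the first
// instance to claim an event revision handles it, later claims are dropped.
const handlingRevisions = new Map(); // 'concatenatedId:revision' -> revision

function tryClaim(concatenatedId, revision) {
  const key = concatenatedId + ':' + revision;
  if (handlingRevisions.has(key)) {
    return false; // another instance already grabbed this event
  }
  // With Redis this whole check-and-set must be one atomic SET ... NX,
  // otherwise two instances can still pass the check simultaneously.
  handlingRevisions.set(key, revision);
  return true;
}

function release(concatenatedId, revision) {
  // called from the done callback, once the saga step has finished
  handlingRevisions.delete(concatenatedId + ':' + revision);
}

// Usage: two instances receive the same event; only the first claim wins
const first = tryClaim('order-123', 2);  // true  -> this instance handles it
const second = tryClaim('order-123', 2); // false -> duplicate, drop it
```

A real implementation would also need an expiry on the claim (e.g. Redis key TTL) so a crashed instance does not hold an event revision forever.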


adrai commented Jan 23, 2020

in all my cqrs systems so far the messages were sent “normally” just once, additionally the message distribution was always sharded by context+aggregate+aggregateId so an event-stream was always sent to the same process...

Is your “double handling” case something that happens often in your system?

But in general yes, feel free to contribute.

Btw: in theory https://github.com/adrai/node-cqrs-eventdenormalizer has the same problem


imissyouso commented Jan 23, 2020

Yes, now I see that we just need to solve this at the distribution level (bus); as you said, the channel should be sharded by context+aggregate+aggregateId.

But in general yes, feel free to contribute.

Always ready to do that. Especially since we decided to use your libraries in our new big project...


imissyouso commented Jan 23, 2020

btw, how did you create such a complex and "academical" event sourcing/cqrs system? I reviewed a lot of similar systems and only yours looks like a full implementation of the theory from Chris Richardson's book (https://www.amazon.com/Microservices-Patterns-examples-Chris-Richardson/dp/1617294543). Who inspired you? Your results are impressive.


adrai commented Jan 23, 2020

Thank you very much for these nice words...
the awesome guy who pushed me in this world was @jamuhl 😊
and then I read the classics... https://github.com/adrai/cqrs-sample#why-should-i-care
