Having the queue in Node.js memory makes cqrs-saga not amenable to clustering #47
Comments
Any progress here? We ran into the same issue and can't find a quick solution.
Does this mean that revisionGuard cannot handle multiple instances of the saga at all? Or did we just misunderstand the purpose of revisionGuard? If so, could you briefly explain it?
I missed this issue completely... sorry. cqrs-saga IS able to run as multiple instances; that should not be a problem (that is exactly what the revisionGuard is for). About the memory leak: is this a huge issue? Normally, when working with microservices, these processes are not that long-lived anyway... but contributions are always welcome.
Hi @adrai,
What if the same event (with the same revision value) is received from the bus by several instances of the same saga at the same time? It seems it would be handled as two different events, in parallel. In my opinion, an event should be handled only once, by the instance that "first managed to grab" it, but in the current `proceed` function, here is what we check:

```javascript
function proceed (revInStore) {
  // First revision of this stream: nothing in the store and nothing being
  // handled yet. currentHandlingRevisions is an in-memory object on this
  // guard, so other saga instances cannot see it.
  if (!revInStore && !self.currentHandlingRevisions[concatenatedId] && (self.options.startRevisionNumber === undefined || self.options.startRevisionNumber === null)) {
    self.currentHandlingRevisions[concatenatedId] = revInEvt;
    debug('first revision to store [concatenatedId]=' + concatenatedId + ', [revInStore]=' + (revInStore || 'null') + ', [revInEvt]=' + revInEvt);
    callback(null, function (clb) {
      self.finishGuard(evt, revInStore, clb);
    });
    return;
  }

  // Revision lower than the one in the store: already handled.
  if (revInStore && revInEvt < revInStore) {
    debug('event already handled [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
    callback(new AlreadyHandledError('Event: [id]=' + dotty.get(evt, self.definition.id) + ', [revision]=' + revInEvt + ', [concatenatedId]=' + concatenatedId + ' already handled!'), function (clb) {
      clb(null);
    });
    return;
  }

  // Revision higher than the one in the store: park it in the in-memory queue.
  if (revInStore && revInEvt > revInStore) {
    debug('queue event [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
    self.queueEvent(evt, callback);
    return;
  }

  // No store entry yet, but startRevisionNumber is set and this event is ahead of it.
  if (!revInStore && self.options.startRevisionNumber >= 0 && revInEvt > self.options.startRevisionNumber) {
    debug('queue event (startRevisionNumber is set) [concatenatedId]=' + concatenatedId + ', [startRevisionNumber]=' + self.options.startRevisionNumber + ', [revInEvt]=' + revInEvt);
    self.queueEvent(evt, callback);
    return;
  }

  // An earlier revision of this stream is being handled in this process: queue.
  if (!revInStore && self.currentHandlingRevisions[concatenatedId] >= 0 && revInEvt > self.currentHandlingRevisions[concatenatedId]) {
    debug('queue event [concatenatedId]=' + concatenatedId + ', [currentlyHandling]=' + self.currentHandlingRevisions[concatenatedId] + ', [revInEvt]=' + revInEvt);
    self.queueEvent(evt, callback);
    return;
  }

  // This revision (or a later one) is already being handled in this process.
  if (self.currentHandlingRevisions[concatenatedId] >= revInEvt) {
    debug('event already handling [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
    callback(new AlreadyHandlingError('Event: [id]=' + dotty.get(evt, self.definition.id) + ', [revision]=' + revInEvt + ', [concatenatedId]=' + concatenatedId + ' already handling!'), function (clb) {
      clb(null);
    });
    return;
  }

  // Event is in the correct order: mark it as being handled and go on.
  self.currentHandlingRevisions[concatenatedId] = revInEvt;
  debug('event is in correct order => go for it! [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
  callback(null, function (clb) {
    self.finishGuard(evt, revInStore, clb);
  });
}
```
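A minimal sketch of the "first instance to grab the event wins" idea, assuming a store shared by all saga instances that offers an atomic set-if-absent operation (in Redis, `SET key value NX`). A plain `Map` stands in for that store so the example runs on its own; `SharedClaimStore`, `tryClaim`, `handleOnce`, and the example id are hypothetical, not part of cqrs-saga.

```javascript
// Hypothetical stand-in for a store shared by all saga instances.
// With Redis, tryClaim would map to `SET key value NX`, which is atomic.
// (Inside one Node.js process this Map check-then-set is safe only because
// the code is synchronous; across processes the store must do it atomically.)
class SharedClaimStore {
  constructor() {
    this.claims = new Map();
  }

  // Returns true only for the first caller that claims this key.
  tryClaim(key, owner) {
    if (this.claims.has(key)) return false;
    this.claims.set(key, owner);
    return true;
  }
}

// The claim key combines the stream id and the revision, so each
// (stream, revision) pair can be claimed exactly once.
function claimKey(concatenatedId, revision) {
  return concatenatedId + ':' + revision;
}

function handleOnce(store, instanceName, concatenatedId, revision, handler) {
  if (!store.tryClaim(claimKey(concatenatedId, revision), instanceName)) {
    return false; // another instance already grabbed this event
  }
  handler();
  return true;
}

// Usage: two instances receive the same event (same revision) from the bus.
const store = new SharedClaimStore();
const handledBy = [];
const a = handleOnce(store, 'node-01', 'example-order-123', 1, () => handledBy.push('node-01'));
const b = handleOnce(store, 'node-02', 'example-order-123', 1, () => handledBy.push('node-02'));
console.log(a, b, handledBy); // true false [ 'node-01' ]
```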
In all my CQRS systems so far, messages were "normally" sent just once; additionally, message distribution was always sharded by context+aggregate+aggregateId, so an event stream was always sent to the same process... Does your "double handling" case happen often in your system? But in general, yes, feel free to contribute. Btw: in theory, https://github.com/adrai/node-cqrs-eventdenormalizer has the same problem.
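Sharding by context+aggregate+aggregateId can be sketched as a deterministic hash of the stream key that selects a bus channel, so every event of one stream reaches the same process. This is an illustration under assumptions: the `SHARD_NN` names mirror the Redis channels used in the test described in this issue, while `shardChannel` and the flat `context`/`aggregate`/`aggregateId` event fields are hypothetical (real event layouts are configurable).

```javascript
const crypto = require('crypto');

// Hypothetical routing helper: every event of one stream
// (context + aggregate + aggregateId) maps to the same channel.
function shardChannel(evt, shardCount) {
  const streamKey = [evt.context, evt.aggregate, evt.aggregateId].join('.');
  const digest = crypto.createHash('sha1').update(streamKey).digest();
  // Use the first 4 bytes of the hash as an unsigned integer.
  const shard = digest.readUInt32BE(0) % shardCount;
  return 'SHARD_' + String(shard + 1).padStart(2, '0');
}

const created = { context: 'sale', aggregate: 'order', aggregateId: '42', name: 'orderCreated', revision: 1 };
const updated = { context: 'sale', aggregate: 'order', aggregateId: '42', name: 'orderUpdated', revision: 2 };

// Both events of order 42 land on the same channel, so a single process
// sees the whole stream.
console.log(shardChannel(created, 2) === shardChannel(updated, 2)); // true
```

Events of other aggregates may go to other channels; only the per-stream ordering needs a single process.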
Yes, now I see that we just need to solve this at the distribution level (the bus): as you said, the channel should be sharded by context+aggregate+aggregateId.
We're always ready to contribute, especially now that we've decided to use your libraries in our new big project...
Btw, how did you create such a complex and "academic" event sourcing/CQRS system? I reviewed a lot of similar systems, and only yours looks like a full implementation of the theory from Chris Richardson's book (https://www.amazon.com/Microservices-Patterns-examples-Chris-Richardson/dp/1617294543). Who inspired you? Your results deserve respect.
Thank you very much for these nice words...
The fact that the event queue is maintained in memory made me suspect that this would not work in a clustered environment. Ref.: https://github.com/adrai/node-cqrs-saga/blob/9770440df0e50b5a3897607a07e0252315b25edf/lib/orderQueue.js#L11
Some tests confirmed it. My test was simple:
I run two instances of my "cqrs-saga + cqrs-domain" node. One is subscribed to a Redis channel named "SHARD_01", and the other to a Redis channel named "SHARD_02" (each channel is used both for receiving events and for publishing commands).
I have two events: orderUpdated and orderCreated (this simulates an action coming from a REST interface and is supposed to move a saga forward). orderCreated must be processed before orderUpdated.
Thanks to revisionGuard, this processing order is respected even if the events arrive at the process manager out of order: if orderUpdated arrives first, revisionGuard keeps it in the queue. Once orderCreated arrives, revisionGuard processes it, followed by orderUpdated (pulled out of the queue). For this mechanism to work, I set the revision number of orderCreated to 1 and that of orderUpdated to 2.
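To make the ordering mechanism concrete, here is a simplified single-process model of it. It is a sketch, not cqrs-saga's actual revisionGuard or orderQueue code: `InMemoryOrderGuard`, the `streamId` field, and the assumption that each stream's revisions start at 1 are hypothetical. An event whose revision is ahead of the next expected one is parked in an in-memory queue and released once the gap is filled.

```javascript
// Simplified model of revision-ordered handling in ONE process.
// The queue lives in this object's memory (hypothetical class).
class InMemoryOrderGuard {
  constructor(handle) {
    this.handle = handle;          // called for each event, in revision order
    this.lastHandled = new Map();  // streamId -> last handled revision
    this.queue = new Map();        // streamId -> Map(revision -> event)
  }

  receive(evt) {
    const last = this.lastHandled.get(evt.streamId) || 0;
    if (evt.revision <= last) return;          // already handled: ignore
    if (evt.revision > last + 1) {             // too early: park it
      if (!this.queue.has(evt.streamId)) this.queue.set(evt.streamId, new Map());
      this.queue.get(evt.streamId).set(evt.revision, evt);
      return;
    }
    this.handle(evt);
    this.lastHandled.set(evt.streamId, evt.revision);
    this.drain(evt.streamId);
  }

  // Release queued events whose predecessor has now been handled.
  drain(streamId) {
    const pending = this.queue.get(streamId);
    if (!pending) return;
    let next = this.lastHandled.get(streamId) + 1;
    while (pending.has(next)) {
      const evt = pending.get(next);
      pending.delete(next);
      this.handle(evt);
      this.lastHandled.set(streamId, next);
      next += 1;
    }
  }
}

const order = [];
const guard = new InMemoryOrderGuard((evt) => order.push(evt.name));
// orderUpdated (revision 2) arrives first and is queued...
guard.receive({ streamId: 'order-42', revision: 2, name: 'orderUpdated' });
// ...then orderCreated (revision 1) arrives and releases it.
guard.receive({ streamId: 'order-42', revision: 1, name: 'orderCreated' });
console.log(order); // [ 'orderCreated', 'orderUpdated' ]
```

Because `queue` and `lastHandled` live in this object's memory, a second process running its own `InMemoryOrderGuard` never learns that revision 1 was handled elsewhere.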
It works fine as long as both events arrive at the same node running cqrs-saga.
The problem is that the queue is maintained in memory. So when I run two instances of cqrs-saga (and, deliberately for this test, the events are sent to different instances), things break.
Only orderCreated is processed (on node 01). Node 02, where orderUpdated is queued, is not aware that orderCreated has arrived and been processed. So orderUpdated is stuck there, and so is the saga instance.
I think the quick fix would be to move the queue to Redis (as a list).
But a more general (and more proper) fix would be to change the way revisionGuard stores and checks information in the revisionGuard store, and act accordingly. For example, once node 01 has finished processing orderCreated, it sets an entry in the revisionGuard store to orderCreated's revision number. Node 02, checking the revisionGuard store in a loop, eventually detects this, pulls orderUpdated out of its queue, and starts handling it.
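A rough sketch of the "move the queue to Redis" idea, assuming both the last handled revision and the list of early events live in a store every node can reach. To keep it runnable without a Redis server, `SharedStore` is a `Map`-backed stand-in whose comments name the Redis command each method would correspond to (it is not a Redis client); `SharedQueueGuard` and all method names are hypothetical, not a cqrs-saga API.

```javascript
// Stand-in for Redis, shared by all nodes in this sketch.
class SharedStore {
  constructor() {
    this.values = new Map();
    this.lists = new Map();
  }
  get(key) { return this.values.get(key); }                     // GET
  set(key, value) { this.values.set(key, value); }              // SET
  pushToList(key, value) {                                       // RPUSH
    if (!this.lists.has(key)) this.lists.set(key, []);
    this.lists.get(key).push(value);
  }
  readList(key) { return (this.lists.get(key) || []).slice(); }  // LRANGE key 0 -1
  deleteList(key) { this.lists.delete(key); }                    // DEL
}

// Hypothetical guard whose last handled revision AND queue live in the
// shared store, so any node can release events another node queued.
// NOTE: read-check-push is not atomic across processes here; a real Redis
// version would need MULTI/WATCH or a Lua script.
class SharedQueueGuard {
  constructor(store, nodeName, handle) {
    this.store = store;
    this.nodeName = nodeName;
    this.handle = handle;
  }

  receive(evt) {
    const last = this.store.get('rev:' + evt.streamId) || 0;
    if (evt.revision <= last) return;                 // already handled
    if (evt.revision > last + 1) {                    // too early: shared queue
      this.store.pushToList('queue:' + evt.streamId, JSON.stringify(evt));
      return;
    }
    this.process(evt);
    this.drain(evt.streamId);
  }

  process(evt) {
    this.handle(this.nodeName, evt);
    this.store.set('rev:' + evt.streamId, evt.revision);
  }

  // Handle every queued event that is now next in line; requeue the rest.
  drain(streamId) {
    const key = 'queue:' + streamId;
    const queued = this.store.readList(key).map((s) => JSON.parse(s));
    queued.sort((x, y) => x.revision - y.revision);
    const remaining = [];
    for (const evt of queued) {
      const last = this.store.get('rev:' + streamId);
      if (evt.revision === last + 1) this.process(evt);
      else if (evt.revision > last + 1) remaining.push(evt);
    }
    this.store.deleteList(key);
    remaining.forEach((evt) => this.store.pushToList(key, JSON.stringify(evt)));
  }
}

const store = new SharedStore();
const log = [];
const handle = (node, evt) => log.push(node + ':' + evt.name);
const node01 = new SharedQueueGuard(store, 'node01', handle);
const node02 = new SharedQueueGuard(store, 'node02', handle);

node02.receive({ streamId: 'order-42', revision: 2, name: 'orderUpdated' }); // goes to the shared list
node01.receive({ streamId: 'order-42', revision: 1, name: 'orderCreated' });
console.log(log); // [ 'node01:orderCreated', 'node01:orderUpdated' ]
```

In this model the node that fills the gap (node 01) also handles the released event, even though node 02 originally received it.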
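The more general fix can be sketched under the assumption that each node publishes its progress to a shared revision store and a periodic check re-reads that store before releasing waiting events. `SharedRevisionStore` (a `Map` standing in for the shared store), `PollingGuard`, `tryHandle`, and `poll` are hypothetical names; in a real deployment `poll` would run on a timer such as `setInterval`.

```javascript
// Stand-in for the revisionGuard store shared by all nodes (e.g. Redis).
class SharedRevisionStore {
  constructor() { this.revisions = new Map(); }
  get(streamId) { return this.revisions.get(streamId) || 0; }
  set(streamId, revision) { this.revisions.set(streamId, revision); }
}

// Hypothetical guard: each node keeps its early events in local memory,
// but decides when to release them by re-reading the SHARED store.
class PollingGuard {
  constructor(store, nodeName, handle) {
    this.store = store;
    this.nodeName = nodeName;
    this.handle = handle;
    this.waiting = []; // events this node received too early
  }

  receive(evt) {
    if (this.tryHandle(evt)) return;
    if (evt.revision > this.store.get(evt.streamId)) this.waiting.push(evt);
  }

  // Handles evt only if it is the next revision according to the shared store.
  // Read-then-write: a real version needs an atomic compare-and-set.
  tryHandle(evt) {
    if (evt.revision !== this.store.get(evt.streamId) + 1) return false;
    this.handle(this.nodeName, evt);
    this.store.set(evt.streamId, evt.revision); // publish progress to other nodes
    return true;
  }

  // Meant to be called periodically, e.g. setInterval(() => guard.poll(), 100).
  poll() {
    this.waiting.sort((x, y) => x.revision - y.revision);
    const stillWaiting = [];
    for (const evt of this.waiting) {
      if (this.tryHandle(evt)) continue;
      if (evt.revision > this.store.get(evt.streamId)) stillWaiting.push(evt);
    }
    this.waiting = stillWaiting;
  }
}

const store = new SharedRevisionStore();
const log = [];
const handle = (node, evt) => log.push(node + ':' + evt.name);
const node01 = new PollingGuard(store, 'node01', handle);
const node02 = new PollingGuard(store, 'node02', handle);

node02.receive({ streamId: 'order-42', revision: 2, name: 'orderUpdated' }); // waits on node 02
node01.receive({ streamId: 'order-42', revision: 1, name: 'orderCreated' }); // shared store now says 1
node02.poll(); // node 02 sees revision 1 and releases orderUpdated
console.log(log); // [ 'node01:orderCreated', 'node02:orderUpdated' ]
```

Here node 02 handles orderUpdated itself once it sees revision 1 in the shared store; the price is the polling delay between node 01 finishing and node 02 noticing.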