-
Notifications
You must be signed in to change notification settings - Fork 284
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Load-balanced message co-ordination with distributed lock #1015
Comments
No, there is no such feature and no plan to add one at this time. With current implementation, even if available, it may not even be enough. Let me explain: the server normally delivers a given message to a single member of the queue group. However, if the message is not acknowledged in the AckWait interval, it is going to be redelivered, possibly to another queue member. But the user callback may not see that message at the same time (lock would not help) but still be processed twice. For instance, say there are 2 members, server sends messages like this: member 1: m1, m3 Suppose that member 1 processes m1 but take a long time (more than AckWait). The server is going to redeliver this, say to member 2. But member 2 is itself processing m2 and is very slow. The message m1 may again be redelivered and is now in 2 members client library internal queue waiting to be dispatched. |
Thanks for your response. For discussion point, We usually design AckTime is always greater than processing time plus maximum lock acquiring time. In that case this won't be an issue. As an alternate way, we have an option to ack before getting lock, if the ack time too long. We can keep control of this, with try lock with timeout parameters also.
In my case, the actually lock processing logic is in the range of (50 ms to 100 ms). So I think it won't be an issue, having AckTime 30s. |
So again, if you make sure that there is no redelivery possible, then I am not sure what you are asking since by nature the server sends the message to ONLY one of the queue member (of the same group, that is queue name). |
Let me put the assumptions:
Before processing any dependent message, it has to creates a lock with unique name( correlation-key as lock name), If other consumer try to create with same lock name, then it has to wait. If no one has created with same lock name, then it can process imediately. So here wait is based on lock name. not on subscriber level. Note: Generic Subscriber for multiple orders. In this case assume, if message 1 lands on first replica and message 2 on 2nd replica and message 3 on 3rd replica with very small time interval. There is a possibility to process the message concurrently,(Ie before completing message 2, Message 3 will be processed) which is an issue. We need to process the message one by one. so in this case we can use orderId as lock name. so with respect to time we can sequence the message in order, with the help of lock, without blocking other orders flow. By this, |
Sorry for the delay, and again, to make sure that I am not wasting your time, please note that there is no plan to implement distributed lock.
But from previous description, you say that the lock would behave this way:
So in the example you describe, suppose that process that received message 2 is actually executing its code before process that received message 1, then when it ask for the lock for this order ID, nobody has already claim it and so it would accept processing of message 2. Then, say that process that received message 1 request the lock (and is blocked until process that has message 2 completes), then gets the lock, it will then process message 1, then finally message 3 is processed. So the order would be 2, 1 and 3. So you are not guaranteeing the order for this OrderID. |
Thanks for your response, I am not expecting this to be in current or near releases. I am trying to fit NATS for most of the uses cases we are trying in Software Defined Networks, instead combining with other solutions inside our architecture. I think, I made a confusion. Here dependent can be easily identified from lock name, If we could create in NATS context. Lock context can be at any level, It can be at NATS connection or at subscriber level. Only Lock capability is expected ( similar to Hazelcast Distributed lock). We can group the subscribers by including the subscriber name in lock name. I believe manualAck and their wait timings shall be handled by the subscribers configurations effectively based on the consumer nature. |
I have a ONAP use-case to block/wait for the dependent messages processing between load balance queue group consumers.
I am trying to build correlation messaging system, with complex business logic in correlation key identification, With NATS current implementation same correlated messages, may land in different load balance consumers(replicas) and processed concurrently, without understanding their message dependencies, In that case I need to process one by one and co-ordinate between consumers.
It is similar to distributed locking use case.
Right now, I am trying to solve with adding Atomix lib on consumer implementation, The problem with Atomix is, it requires complex static cluster configuration, programing language dependent and some cases both share some common functionalities.
I feel NATS can handle this better, with simple approach ( lock based on subjects )
https://atomix.io/docs/latest/user-manual/primitives/DistributedLock/
Do we have an api to lock/unlock on particular subject with lock name, so that we can block other consumers based on the lock name.
I see, this requirement will be applied in many use cases in Software Defined Network device managements & device control and it will avoid lot of complexity on subject naming, correlation subject based consumers and improved performances.( Ie not locking all messages at subject consumer level, only certain messages level, if necessary)
ONAP Reference code : https://gerrit.onap.org/r/gitweb?p=ccsdk/cds.git;a=blob;f=ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt;h=a6963d83f18628fb8c3dc7b9af371b0def6ceef8;hb=refs/heads/master#l46
Sample implementation, with message lock.
`
MessageHandler { message ->
`
The text was updated successfully, but these errors were encountered: