-
I'm not sure that the way I'm creating a durable push consumer is correct. I'm trying to configure a competing-consumer pattern on a stream.

This is how I create the stream:

_jetStreamManagement.AddStream(StreamConfiguration.Builder()
    .WithName("NewOrders")
    .WithSubjects("Orders.Created.*")
    .WithRetentionPolicy(RetentionPolicy.WorkQueue)
    .Build());

This is how I create the consumer:

_jetStreamManagement.AddOrUpdateConsumer("NewOrders", ConsumerConfiguration.Builder()
    .WithDurable("NewOrderProcessor")
    .WithDeliverSubject("NewOrdersToProcess")
    .WithDeliverGroup("NewOrderProcessors")
    .WithAckPolicy(AckPolicy.Explicit)
    .Build());

I create a subscriber like this:

_subscription = _jetStream.PushSubscribeAsync("Orders.Created.*", HandleOrderCreated, false, PushSubscribeOptions.Builder()
    .WithStream("NewOrders")
    .WithDurable("NewOrderProcessor")
    .WithDeliverSubject("NewOrdersToProcess")
    .WithDeliverGroup("NewOrderProcessors")
    .Build());

The problem is that when I connect the above subscriber, it doesn't receive any messages. If I add another subscription with It appears that the
-
Your current subscribe call tries to create a consumer with the same name whose configuration does not match the consumer you have already created. Bind to the existing consumer instead:

_subscription = _jetStream.PushSubscribeAsync(null, HandleOrderCreated, false,
    PushSubscribeOptions.BindTo("NewOrders", "NewOrderProcessor"));
-
Sorry, I missed putting the queue name in the subscribe call. Here is a full example.
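The example originally attached to this reply did not survive in the thread. What follows is a minimal sketch of what binding with a queue name might look like. It is not the author's original code. It reuses the stream, consumer, and deliver-group names from the question. The server URL, the `Program` class, and the handler body are assumptions made for illustration. The key point is that the queue name passed to `PushSubscribeAsync` should match the consumer's deliver group (`NewOrderProcessors`).

```csharp
using System;
using NATS.Client;
using NATS.Client.JetStream;

class Program
{
    static void Main()
    {
        // Assumed server URL; adjust for your environment.
        using IConnection connection =
            new ConnectionFactory().CreateConnection("nats://localhost:4222");
        IJetStreamManagement jsm = connection.CreateJetStreamManagementContext();
        IJetStream js = connection.CreateJetStreamContext();

        // Work-queue stream, as in the question.
        jsm.AddStream(StreamConfiguration.Builder()
            .WithName("NewOrders")
            .WithSubjects("Orders.Created.*")
            .WithRetentionPolicy(RetentionPolicy.WorkQueue)
            .Build());

        // Durable push consumer with a deliver group, as in the question.
        jsm.AddOrUpdateConsumer("NewOrders", ConsumerConfiguration.Builder()
            .WithDurable("NewOrderProcessor")
            .WithDeliverSubject("NewOrdersToProcess")
            .WithDeliverGroup("NewOrderProcessors")
            .WithAckPolicy(AckPolicy.Explicit)
            .Build());

        // Bind to the existing consumer instead of redefining it, and pass
        // the queue name so it matches the consumer's deliver group.
        IJetStreamPushAsyncSubscription subscription = js.PushSubscribeAsync(
            null,                  // subject is not needed when binding
            "NewOrderProcessors",  // queue name == deliver group
            HandleOrderCreated,
            false,                 // autoAck off; the handler acks explicitly
            PushSubscribeOptions.BindTo("NewOrders", "NewOrderProcessor"));

        Console.WriteLine("Listening; press Enter to exit.");
        Console.ReadLine();
    }

    // Hypothetical handler body: log the subject and acknowledge the message.
    static void HandleOrderCreated(object sender, MsgHandlerEventArgs args)
    {
        Console.WriteLine($"Received message on {args.Message.Subject}");
        args.Message.Ack();
    }
}
```

Running several instances of this program should give competing consumers: each message on the stream is delivered to only one member of the `NewOrderProcessors` group.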