I've noticed that if the doAfterSubscribe is set before bufferTimedWithPressure then the consumer is not terminated properly, but if I change it to be after then it is.
Upon investigation @Avasil has found in the Observer implementation:
https://github.com/monix/monix-kafka/blob/v1.0.0-RC5/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala#L77
Observer.feed always returns Continue if the iterator is empty, so downstream doesn't get the chance to propagate cancelation, and the consumer keeps polling until something is available in the topic.
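To make the mechanism concrete, here is a small dependency-free model of the behaviour described above. It is only a sketch: `FeedLoopModel`, `feed` and `pollLoop` are hypothetical names made up for illustration, and this is not the actual Monix or monix-kafka code. It only shows why a downstream that wants to stop is never asked while the polled batches are empty.

```scala
object FeedLoopModel {
  sealed trait Ack
  case object Continue extends Ack
  case object Stop extends Ack

  // Simplified stand-in for a feed operation: push each element downstream
  // and return the last acknowledgement. With an empty batch, onNext is never
  // called, so the result is Continue no matter what downstream would answer.
  def feed[A](batch: Iterator[A])(onNext: A => Ack): Ack = {
    var ack: Ack = Continue
    while (ack == Continue && batch.hasNext) ack = onNext(batch.next())
    ack
  }

  // Simplified poll loop: keep polling while feed reports Continue.
  // Returns how many polls happened before the loop stopped.
  def pollLoop[A](batches: List[List[A]])(onNext: A => Ack): Int = {
    var polls = 0
    var ack: Ack = Continue
    val it = batches.iterator
    while (ack == Continue && it.hasNext) {
      polls += 1
      ack = feed(it.next().iterator)(onNext)
    }
    polls
  }
}
```

In this model a downstream that always answers `Stop` still cannot end the loop until a non-empty batch arrives, which matches the "keeps polling until something is available in the topic" symptom.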
I'm not sure about the fix, though. On the user side you could probably use takeUntil, or takeUntilEval + Deferred, which will cancel the subscription (along with the Task), or just cancel the observableTask itself.
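A rough sketch of the takeUntilEval + Deferred workaround, assuming Monix 3.x with cats-effect 2 on the classpath. The `source` observable is a hypothetical stand-in (a plain interval) for the Kafka consumer observable, and `consumeUntilStopped` is a name made up for this example; whether this fully works around the polling behaviour with a real consumer has not been verified here.

```scala
import cats.effect.concurrent.Deferred
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

import scala.concurrent.Await
import scala.concurrent.duration._

object TakeUntilEvalSketch {
  // Hypothetical stand-in for the Kafka consumer observable.
  def source: Observable[Long] = Observable.intervalAtFixedRate(50.millis)

  // Runs the stream in the background and completes the Deferred after
  // `stopAfter`; takeUntilEval then ends the stream and cancels upstream.
  def consumeUntilStopped(stopAfter: FiniteDuration): Task[List[Long]] =
    for {
      stop  <- Deferred[Task, Unit]
      fiber <- source.takeUntilEval(stop.get).toListL.start
      _     <- Task.sleep(stopAfter)
      _     <- stop.complete(())
      items <- fiber.join
    } yield items

  def main(args: Array[String]): Unit = {
    val items = Await.result(consumeUntilStopped(300.millis).runToFuture, 5.seconds)
    println(s"received ${items.size} items before the stop signal")
  }
}
```

The stop signal lives outside the stream, so it does not depend on the source emitting an element before cancelation can be observed.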
During the upgrade to 1.0.0-RC5 we've noticed that the consumer keeps polling messages after it was terminated. A minimal reproduction case was added to this PR: https://github.com/monix/monix-kafka/compare/master...cakper:cancelable-repro?expand=1