Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConsumerObservable doesn't propagate the cancellation #186

Open
cakper opened this issue Apr 13, 2020 · 0 comments
Open

ConsumerObservable doesn't propagate the cancellation #186

cakper opened this issue Apr 13, 2020 · 0 comments

Comments

@cakper
Copy link
Contributor

cakper commented Apr 13, 2020

During the upgrade to 1.0.0-RC5 we've noticed that the consumer keeps pooling messaged after it was terminated. A minimal reproduction case was added to this PR:
https://github.com/monix/monix-kafka/compare/master...cakper:cancelable-repro?expand=1

I've noticed that if the doAfterSubscribe is set before bufferTimedWithPressure then the consumer is not terminated properly, but if I change it to be after then it is.

Upon investigation @Avasil has found in the Observer implementation:

https://github.com/monix/monix-kafka/blob/v1.0.0-RC5/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala#L77

Observer.feed always returns Continue if iterator is empty so downstream doesn't get the chance to propagate cancelation and it keeps polling until there is anything available in the topic

I'm not sure about the fix though, on the user side you could probably use takeUntil or takeUntilEval + Deferred which will cancel the subscription (along with the Task) or just cancel the observableTask itself
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant