diff --git a/src/Rxnet/Operator/OnBackPressureBuffer.php b/src/Rxnet/Operator/OnBackPressureBuffer.php index 2a4f290..d3e5d16 100644 --- a/src/Rxnet/Operator/OnBackPressureBuffer.php +++ b/src/Rxnet/Operator/OnBackPressureBuffer.php @@ -80,8 +80,7 @@ function ($next) { // Add to queue $this->queue->push($next); }, - [$this->subject, 'onError'], - [$this->subject, 'onCompleted'] + [$this->subject, 'onError'] ), $scheduler ); @@ -100,9 +99,10 @@ public function request() // Queue is finished we can return to live stream if ($this->queue->isEmpty()) { $this->pending = false; + $this->subject->onCompleted(); return; } // Take element in order they have been inserted $this->subject->onNext($this->queue->shift()); } -} \ No newline at end of file +}