Skip to content

Commit

Permalink
Sink Queue updates. Publishers.Factory docs.
Browse files Browse the repository at this point in the history
Ensured after either of a Factory's dispatcher completions are called, further enqueue attempts are no-ops
Fixed `Publishers.Factory` example
Fixed EntwineTest examples
  • Loading branch information
tcldr committed Jun 25, 2019
1 parent 5c26eac commit b461919
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 17 deletions.
10 changes: 5 additions & 5 deletions Assets/EntwineTest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ import EntwineTest

// we'll set the schedulers clock a little forward – at 200

let scheduler1 = TestScheduler(initialClock: 200)
let scheduler = TestScheduler(initialClock: 200)

let relativeTimePublisher: TestablePublisher<String, Never> = scheduler.createRelativeTestablePublisher([
(020, .input("Mi")),
Expand Down Expand Up @@ -120,7 +120,7 @@ import Combine
import EntwineTest

let scheduler = TestScheduler()
let passthroughSubject = PassthroughSubject<String, Void>()
let passthroughSubject = PassthroughSubject<String, Never>()

scheduler.schedule(after: 100) { passthroughSubject.send("yippee") }
scheduler.schedule(after: 200) { passthroughSubject.send("ki") }
Expand All @@ -132,14 +132,14 @@ passthroughSubject.subscribe(subscriber)

scheduler.resume()

let expected = TestSequence<String, Self> = [
let expected: TestSequence<String, Never> = [
(000, .subscription),
(100, .input("yippee")),
(200, .input("ki")),
(300, .input("yay")),
]

print("sequences match: \(expected = subscriber.sequence)")
print("sequences match: \(expected == subscriber.sequence)")

// outputs:
// sequences match: true
Expand Down Expand Up @@ -239,4 +239,4 @@ Copyright 2019 © Tristan Celder

_EntwineTest_ is made available under the [MIT License](http://github.com/tcldr/Entwine/blob/master/LICENSE)

---
---
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,19 @@ _Inline publisher creation for PhotoKit authorization status:_

import Entwine

let photoKitAuthorizationStatus = Publishers.Factory { dispatcher in
let status = PHPhotoLibrary.authorizationStatus()
let photoKitAuthorizationStatus = Publishers.Factory<PHAuthorizationStatus, Never> { dispatcher in
let status = PHPhotoLibrary.authorizationStatus()
dispatcher.forward(status)
switch status {
case .notDetermined:
PHPhotoLibrary.requestAuthorization { newStatus in
dispatcher.forward(newStatus)
dispatcher.forwardCompletion(.finished)
dispatcher.forward(completion: .finished)
}
case .restricted, .denied, .authorized:
dispatcher.forward(.authorized)
dispatcher.forwardCompletion(.finished)
default:
dispatcher.forward(completion: .finished)
}
return AnyCancellable {}
}
```
## Test publisher behavior
Expand Down
3 changes: 3 additions & 0 deletions Sources/Common/Utilities/SinkQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class SinkQueue<Sink: Subscriber> {
private var demandQueued: Subscribers.Demand { .max(buffer.count) }

private var completion: Subscribers.Completion<Sink.Failure>?
private var isActive: Bool { sink != nil && completion == nil }

init(sink: Sink) {
self.sink = sink
Expand All @@ -52,11 +53,13 @@ class SinkQueue<Sink: Subscriber> {
}

func enqueue(_ input: Sink.Input) -> Subscribers.Demand {
guard isActive else { return .none }
buffer.enqueue(input)
return processDemand()
}

func enqueue(completion: Subscribers.Completion<Sink.Failure>) -> Subscribers.Demand {
guard isActive else { return .none }
self.completion = completion
return processDemand()
}
Expand Down
24 changes: 18 additions & 6 deletions Sources/Entwine/Publishers/Factory.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,19 @@ extension Publishers {
///
/// import Entwine
///
/// let photoKitAuthorizationStatus = Publishers.Factory { dispatcher in
/// let photoKitAuthorizationStatus = Publishers.Factory<PHAuthorizationStatus, Never> { dispatcher in
/// let status = PHPhotoLibrary.authorizationStatus()
/// dispatcher.forward(status)
/// switch status {
/// case .notDetermined:
/// PHPhotoLibrary.requestAuthorization { newStatus in
/// dispatcher.forward(newStatus)
/// dispatcher.forwardCompletion(.finished)
/// dispatcher.forward(completion: .finished)
/// }
/// case .restricted, .denied, .authorized:
/// dispatcher.forward(.authorized)
/// dispatcher.forwardCompletion(.finished)
/// default:
/// dispatcher.forward(completion: .finished)
/// }
/// return AnyCancellable {}
/// }
/// ```
///
Expand Down Expand Up @@ -110,19 +111,30 @@ fileprivate class FactorySubscription<Sink: Subscriber>: Subscription {
public class Dispatcher<Input, Failure: Error> {

/// Queues an element to be delivered to the subscriber
///
/// If the subscriber has cancelled the subscription, or either the `forward(completion:)`
/// or the `forwardImmediately(completion:)`method of the dispatcher has already
/// been called, this will be a no-op.
///
/// - Parameter input: a value to be delivered to a downstream subscriber
public func forward(_ input: Input) {
fatalError("Abstract class. Override in subclass.")
}

/// Completes the sequence once any queued elements are delivered to the subscriber
/// - Parameter completion: a completion value to be delivered to the subscriber once
///
/// If the subscriber has cancelled the subscription, or either the `forward(completion:)`
/// or the `forwardImmediately(completion:)`method of the dispatcher has already
/// been called, this will be a no-op.
///
/// the remaining items in the queue have been delivered
public func forward(completion: Subscribers.Completion<Failure>) {
fatalError("Abstract class. Override in subclass.")
}

/// Completes the sequence immediately regardless of any elements that are waiting to be delivered
/// Completes the sequence immediately regardless of any elements that are waiting to be delivered,
/// subsequent calls to the dispatcher will be a no-op
/// - Parameter completion: a completion value to be delivered immediately to the subscriber
public func forwardImmediately(completion: Subscribers.Completion<Failure>) {
fatalError("Abstract class. Override in subclass.")
Expand Down

0 comments on commit b461919

Please sign in to comment.