Skip to content

Commit

Permalink
Removed ReplaySubject Subscriber accounting
Browse files Browse the repository at this point in the history
The behaviour of Apple's stock Subjects suggests that Subscriber accounting (to prevent duplicate subscriptions of the same Subscriber) is the responsibility of the Subscriber itself – not the Subjects they're subscribed to.

Thanks for the heads-up @jasdev!
  • Loading branch information
tcldr committed May 8, 2020
1 parent 9b3514f commit fc3bc4d
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 25 deletions.
27 changes: 3 additions & 24 deletions Sources/Entwine/Operators/ReplaySubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ public final class ReplaySubject<Output, Failure: Error> {
typealias Sink = AnySubscriber<Output, Failure>

private var subscriptions = [ReplaySubjectSubscription<Sink>]()
private var subscriberIdentifiers = Set<CombineIdentifier>()

private var replayValues: ReplaySubjectValueBuffer<Output>
private var completion: Subscribers.Completion<Failure>?

Expand All @@ -55,32 +53,13 @@ public final class ReplaySubject<Output, Failure: Error> {
extension ReplaySubject: Publisher {

public func receive<S : Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {

guard !subscriberIdentifiers.contains(subscriber.combineIdentifier) else {
subscriber.receive(subscription: Subscriptions.empty)
return
}

let subscriberIdentifier = subscriber.combineIdentifier

let subscription = ReplaySubjectSubscription(sink: AnySubscriber(subscriber))

// we use seperate collections for identifiers and subscriptions
// to improve performance of identifier lookups and to keep the
// order in which subscribers are signalled to be in the order that
// they intially subscribed.

subscriberIdentifiers.insert(subscriberIdentifier)
subscriptions.append(subscription)

subscription.cleanupHandler = { [weak self] in

guard let self = self else { return }

if let index = self.subscriptions.firstIndex(where: { subscriberIdentifier == $0.subscriberIdentifier }) {
self.subscriberIdentifiers.remove(subscriberIdentifier)
self.subscriptions.remove(at: index)
}
let firstIndex = self?.subscriptions.firstIndex { subscriberIdentifier == $0.subscriberIdentifier }
guard let index = firstIndex else { return }
self?.subscriptions.remove(at: index)
}
subscriber.receive(subscription: subscription)
subscription.replayInputs(replayValues.buffer, completion: completion)
Expand Down
1 change: 0 additions & 1 deletion Tests/EntwineTests/ReplaySubjectTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ final class ReplaySubjectTests: XCTestCase {
scheduler.schedule(after: 600) { subject.send(0) }
scheduler.schedule(after: 700) { subject.send(completion: .finished) }


scheduler.resume()

let expected1: TestSequence<Int, Never> = [
Expand Down

0 comments on commit fc3bc4d

Please sign in to comment.