From fae84da20728529b878fce8906cca6794812159e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sr=C4=91an=20Ras=CC=8Cic=CC=81?= Date: Mon, 12 Mar 2018 15:08:03 +0100 Subject: [PATCH] Add `replayValues` and `shareReplayValues` operators to `LoadingSignal`. --- Sources/Connectable.swift | 18 +++++++++++++ Sources/Subjects.swift | 56 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/Sources/Connectable.swift b/Sources/Connectable.swift index 0204912..75f456f 100644 --- a/Sources/Connectable.swift +++ b/Sources/Connectable.swift @@ -111,3 +111,21 @@ extension SignalProtocol { return replay(limit: limit).refCount() } } + +extension SignalProtocol where Element: LoadingStateProtocol { + + /// Ensure that all observers see the same sequence of elements. Connectable. + public func replayValues(limit: Int = Int.max) -> ConnectableSignal, Error>> { + if limit == 0 { + return ConnectableSignal(source: map { $0.asLoadingState }, subject: PublishSubject()) + } else { + return ConnectableSignal(source: map { $0.asLoadingState }, subject: ReplayLoadingValueSubject(bufferSize: limit)) + } + } + + /// Ensure that all observers see the same sequence of elements. + /// Shorthand for `replay(limit).refCount()`. + public func shareReplayValues(limit: Int = Int.max) -> Signal, Error> { + return replayValues(limit: limit).refCount() + } +} diff --git a/Sources/Subjects.swift b/Sources/Subjects.swift index 9637b24..57dd701 100644 --- a/Sources/Subjects.swift +++ b/Sources/Subjects.swift @@ -150,6 +150,62 @@ public final class ReplayOneSubject: Subject: Subject, Error> { + + private enum State { + case notStarted + case loading + case loadedOrFailedAtLeastOnce + } + + private var state: State = .notStarted + private var buffer: ArraySlice> = [] + private var terminalEvent: Event, Error>? = nil + + public let bufferSize: Int + + public init(bufferSize: Int = Int.max) { + self.bufferSize = bufferSize + } + + public override func send(_ event: Event, Error>) { + switch event { + case .next(let loadingState): + switch loadingState { + case .loading: + if state == .notStarted { + state = .loading + } + case .loaded: + state = .loadedOrFailedAtLeastOnce + buffer.append(loadingState) + buffer = buffer.suffix(bufferSize) + case .failed: + state = .loadedOrFailedAtLeastOnce + buffer = [loadingState] + } + case .failed, .completed: + terminalEvent = event + } + super.send(event) + } + + public override func willAdd(observer: @escaping Observer, Error>) { + switch state { + case .notStarted: + break + case .loading: + observer(.next(.loading)) + case .loadedOrFailedAtLeastOnce: + buffer.forEach { observer(.next($0)) } + } + if let event = terminalEvent { + observer(event) + } + } +} + @available(*, deprecated, message: "All subjects now inherit 'Subject' that can be used in place of 'AnySubject'.") public final class AnySubject: SubjectProtocol { private let baseObserve: (@escaping Observer) -> Disposable