Skip to content

Commit

Permalink
stash
Browse files Browse the repository at this point in the history
  • Loading branch information
5d committed Apr 18, 2024
1 parent f7488e3 commit fffaf08
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//


import Combine

class CancellableAsyncStream<T>: AsyncSequence {

typealias AsyncIterator = AsyncStream<T>.AsyncIterator

typealias Element = T

private let asyncStream: AsyncStream<T>

private let cancellable: AnyCancellable?

deinit {
cancel()
}

init(asyncStream: AsyncStream<T>, canellable: AnyCancellable?) {
self.asyncStream = asyncStream
self.cancellable = canellable
}

func makeAsyncIterator() -> AsyncStream<T>.AsyncIterator {
asyncStream.makeAsyncIterator()
}

func cancel() {
cancellable?.cancel()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// SPDX-License-Identifier: Apache-2.0
//

import Foundation
import Combine

/// Captures a weak reference to the value
class WeakWrapper<T> where T: AnyObject {
Expand Down Expand Up @@ -38,8 +38,14 @@ actor StateMachine<
private let environment: EnvironmentType
private let resolver: AnyResolver<StateType>

private(set) var currentState: StateType
private var subscribers: [WeakWrapper<StateAsyncSequence<StateType>>]
public var currentState: StateType {
currentStateSubject.value
}
private(set) var currentStateSubject: CurrentValueSubject<StateType, Never>

deinit {
currentStateSubject.send(completion: .finished)
}

init<ResolverType>(
resolver: ResolverType,
Expand All @@ -48,21 +54,24 @@ actor StateMachine<
) where ResolverType: StateMachineResolver, ResolverType.StateType == StateType {
self.resolver = resolver.eraseToAnyResolver()
self.environment = environment
self.currentState = initialState ?? resolver.defaultState

self.subscribers = []
self.currentStateSubject = CurrentValueSubject(initialState ?? resolver.defaultState)
}

/// Start listening to state change updates. The current state and all subsequent state changes will be sent to the sequence.
///
/// - Returns: An async sequence that get states asynchronously
func listen() -> StateChangeSequence {
let sequence = StateAsyncSequence<StateType>()
let currentState = self.currentState
let wrappedSequence = WeakWrapper(sequence)
subscribers.append(wrappedSequence)
sequence.send(currentState)
return sequence
func listen() -> CancellableAsyncStream<StateType> {
var cancellable: AnyCancellable?
return CancellableAsyncStream(
asyncStream: AsyncStream { continuation in
cancellable = currentStateSubject.sink { _ in
continuation.finish()
} receiveValue: {
continuation.yield($0)
}
},
canellable: cancellable
)
}
}

Expand All @@ -83,38 +92,16 @@ extension StateMachine: EventDispatcher {

private func process(event: StateMachineEvent) {
let resolution = resolver.resolve(
oldState: currentState,
oldState: currentStateSubject.value,
byApplying: event
)

if currentState != resolution.newState {
currentState = resolution.newState
subscribers.removeAll { item in
!notify(subscriberElement: item, about: resolution.newState)
}
if currentStateSubject.value != resolution.newState {
currentStateSubject.send(resolution.newState)
}
execute(resolution.actions)
}

/// - Parameters:
/// - subscriberElement: A weak wrapped async sequence
/// - newState: The new state to be sent
/// - Returns: true if the subscriber was notified, false if the wrapper reference was nil or a cancellation was pending
private func notify(
subscriberElement: WeakWrapper<StateChangeSequence>,
about newState: StateType
) -> Bool {

// If weak reference has become nil, do not process, and return false so caller can remove
// the subscription from the subscribers list
guard let sequence = subscriberElement.value else {
return false
}

sequence.send(newState)
return true
}

private func execute(_ actions: [Action]) {
guard !actions.isEmpty else {
return
Expand Down

0 comments on commit fffaf08

Please sign in to comment.