From 34c9a01f14716359d39d6dfcb31ce48175d83fe0 Mon Sep 17 00:00:00 2001 From: Harshdeep Singh <6162866+harsh62@users.noreply.github.com> Date: Tue, 23 Apr 2024 14:13:56 -0400 Subject: [PATCH] fix(Auth): Fixing memory leaks happening because of the state machine --- .../CancellableAsyncStream.swift | 43 ++++++++++++++ .../StateAsyncSequence.swift | 36 ----------- .../StateMachine.swift | 59 +++++-------------- 3 files changed, 58 insertions(+), 80 deletions(-) create mode 100644 AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/StateMachine/hierarchical-state-machine-swift/CancellableAsyncStream.swift delete mode 100644 AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/StateMachine/hierarchical-state-machine-swift/StateAsyncSequence.swift diff --git a/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/StateMachine/hierarchical-state-machine-swift/CancellableAsyncStream.swift b/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/StateMachine/hierarchical-state-machine-swift/CancellableAsyncStream.swift new file mode 100644 index 0000000000..35ce4b65c1 --- /dev/null +++ b/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/StateMachine/hierarchical-state-machine-swift/CancellableAsyncStream.swift @@ -0,0 +1,43 @@ +// +// Copyright Amazon.com Inc. or its affiliates. +// All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + +import Combine + +class CancellableAsyncStream: AsyncSequence { + + typealias AsyncIterator = AsyncStream.AsyncIterator + private let asyncStream: AsyncStream + private let cancellable: AnyCancellable? + + deinit { + cancel() + } + + init(asyncStream: AsyncStream, cancellable: AnyCancellable?) { + self.asyncStream = asyncStream + self.cancellable = cancellable + } + + convenience init(with publisher: AnyPublisher) { + var cancellable: AnyCancellable? + self.init(asyncStream: AsyncStream { continuation in + cancellable = publisher.sink { _ in + continuation.finish() + } receiveValue: { + continuation.yield($0) + } + }, cancellable: cancellable) + } + + func makeAsyncIterator() -> AsyncStream.AsyncIterator { + asyncStream.makeAsyncIterator() + } + + func cancel() { + cancellable?.cancel() + } +} diff --git a/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/StateMachine/hierarchical-state-machine-swift/StateAsyncSequence.swift b/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/StateMachine/hierarchical-state-machine-swift/StateAsyncSequence.swift deleted file mode 100644 index 04cf788b1f..0000000000 --- a/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/StateMachine/hierarchical-state-machine-swift/StateAsyncSequence.swift +++ /dev/null @@ -1,36 +0,0 @@ -// -// Copyright Amazon.com Inc. or its affiliates. -// All Rights Reserved. -// -// SPDX-License-Identifier: Apache-2.0 -// - -import Foundation - -class StateAsyncSequence: AsyncSequence { - - typealias Iterator = AsyncStream.Iterator - private var continuation: AsyncStream.Continuation! = nil - - private var asyncStream: AsyncStream! = nil - - init(bufferingPolicy: AsyncStream.Continuation.BufferingPolicy = .unbounded) { - asyncStream = AsyncStream( - Element.self, - bufferingPolicy: bufferingPolicy) { continuation in - self.continuation = continuation - } - } - - func makeAsyncIterator() -> Iterator { - asyncStream.makeAsyncIterator() - } - - func send(_ element: Element) { - continuation.yield(element) - } - - func cancel() { - continuation.finish() - } -} diff --git a/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/StateMachine/hierarchical-state-machine-swift/StateMachine.swift b/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/StateMachine/hierarchical-state-machine-swift/StateMachine.swift index 7b69423411..33fbf46ce8 100644 --- a/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/StateMachine/hierarchical-state-machine-swift/StateMachine.swift +++ b/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/StateMachine/hierarchical-state-machine-swift/StateMachine.swift @@ -5,15 +5,7 @@ // SPDX-License-Identifier: Apache-2.0 // -import Foundation - -/// Captures a weak reference to the value -class WeakWrapper where T: AnyObject { - private(set) weak var value: T? - init(_ value: T) { - self.value = value - } -} +import Combine /// Models, evolves, and processes effects for a system. /// @@ -33,13 +25,20 @@ actor StateMachine< /// AsyncSequences are invoked a minimum of one time: Each sequence receives the current /// state as soon as `listen()` is invoked, and will receive each subsequent state change. - typealias StateChangeSequence = StateAsyncSequence + typealias StateChangeSequence = CancellableAsyncStream private let environment: EnvironmentType private let resolver: AnyResolver - private(set) var currentState: StateType - private var subscribers: [WeakWrapper>] + public var currentState: StateType { + currentStateSubject.value + } + + private(set) var currentStateSubject: CurrentValueSubject + + deinit { + currentStateSubject.send(completion: .finished) + } init( resolver: ResolverType, @@ -48,22 +47,16 @@ actor StateMachine< ) where ResolverType: StateMachineResolver, ResolverType.StateType == StateType { self.resolver = resolver.eraseToAnyResolver() self.environment = environment - self.currentState = initialState ?? resolver.defaultState - - self.subscribers = [] + self.currentStateSubject = CurrentValueSubject(initialState ?? resolver.defaultState) } /// Start listening to state change updates. The current state and all subsequent state changes will be sent to the sequence. /// /// - Returns: An async sequence that get states asynchronously func listen() -> StateChangeSequence { - let sequence = StateAsyncSequence() - let currentState = self.currentState - let wrappedSequence = WeakWrapper(sequence) - subscribers.append(wrappedSequence) - sequence.send(currentState) - return sequence + CancellableAsyncStream(with: currentStateSubject.eraseToAnyPublisher()) } + } extension StateMachine: EventDispatcher { @@ -88,33 +81,11 @@ extension StateMachine: EventDispatcher { ) if currentState != resolution.newState { - currentState = resolution.newState - subscribers.removeAll { item in - !notify(subscriberElement: item, about: resolution.newState) - } + currentStateSubject.send(resolution.newState) } execute(resolution.actions) } - /// - Parameters: - /// - subscriberElement: A weak wrapped async sequence - /// - newState: The new state to be sent - /// - Returns: true if the subscriber was notified, false if the wrapper reference was nil or a cancellation was pending - private func notify( - subscriberElement: WeakWrapper, - about newState: StateType - ) -> Bool { - - // If weak reference has become nil, do not process, and return false so caller can remove - // the subscription from the subscribers list - guard let sequence = subscriberElement.value else { - return false - } - - sequence.send(newState) - return true - } - private func execute(_ actions: [Action]) { guard !actions.isEmpty else { return