diff --git a/Amplify/Core/Support/TaskQueue.swift b/Amplify/Core/Support/TaskQueue.swift index f281c57e1c..a09bfcc4d8 100644 --- a/Amplify/Core/Support/TaskQueue.swift +++ b/Amplify/Core/Support/TaskQueue.swift @@ -8,10 +8,25 @@ import Foundation /// A helper for executing asynchronous work serially. -public actor TaskQueue { - private var previousTask: Task? +public class TaskQueue { + typealias Block = @Sendable () async -> Void + private var streamContinuation: AsyncStream.Continuation! - public init() {} + public init() { + let stream = AsyncStream.init { continuation in + streamContinuation = continuation + } + + Task { + for await block in stream { + _ = await block() + } + } + } + + deinit { + streamContinuation.finish() + } /// Serializes asynchronous requests made from an async context /// @@ -25,17 +40,31 @@ public actor TaskQueue { /// TaskQueue serializes this work so that `doAsync1` is performed before `doAsync2`, /// which is performed before `doAsync3`. public func sync(block: @Sendable @escaping () async throws -> Success) async throws -> Success { - let currentTask: Task = Task { [previousTask] in - _ = await previousTask?.result - return try await block() + try await withCheckedThrowingContinuation { continuation in + streamContinuation.yield { + do { + let value = try await block() + continuation.resume(returning: value) + } catch { + continuation.resume(throwing: error) + } + } } - previousTask = currentTask - return try await currentTask.value } - public nonisolated func async(block: @Sendable @escaping () async throws -> Success) rethrows { - Task { - try await sync(block: block) + public func async(block: @Sendable @escaping () async throws -> Success) { + streamContinuation.yield { + do { + _ = try await block() + } catch { + Self.log.warn("Failed to handle async task in TaskQueue<\(Success.self)> with error: \(error)") + } } } } + +extension TaskQueue { + public static var log: Logger { + Amplify.Logging.logger(forNamespace: String(describing: self)) + } +} diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift index 3f3889566a..13cba6b888 100644 --- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift @@ -46,10 +46,7 @@ public class AWSGraphQLSubscriptionTaskRunner: InternalTaskRunner, public func cancel() { self.send(GraphQLSubscriptionEvent.connection(.disconnected)) - Task { [weak self] in - guard let self else { - return - } + Task { guard let appSyncClient = self.appSyncClient else { return } diff --git a/AmplifyTests/CoreTests/AmplifyTaskQueueTests.swift b/AmplifyTests/CoreTests/AmplifyTaskQueueTests.swift index a3c92f8dcc..48c19c9182 100644 --- a/AmplifyTests/CoreTests/AmplifyTaskQueueTests.swift +++ b/AmplifyTests/CoreTests/AmplifyTaskQueueTests.swift @@ -66,4 +66,20 @@ final class AmplifyTaskQueueTests: XCTestCase { await fulfillment(of: [expectation1, expectation2, expectation3], enforceOrder: true) } + func testAsync() async throws { + let taskCount = 1_000 + let expectations: [XCTestExpectation] = (0..() + + for i in 0..