Skip to content

Commit

Permalink
chore: kickoff release
Browse files Browse the repository at this point in the history
  • Loading branch information
5d authored Apr 17, 2024
2 parents dc32a9d + f7488e3 commit a5b6ce0
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 15 deletions.
51 changes: 40 additions & 11 deletions Amplify/Core/Support/TaskQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,25 @@
import Foundation

/// A helper for executing asynchronous work serially.
public actor TaskQueue<Success> {
private var previousTask: Task<Success, Error>?
public class TaskQueue<Success> {
typealias Block = @Sendable () async -> Void
private var streamContinuation: AsyncStream<Block>.Continuation!

public init() {}
public init() {
let stream = AsyncStream<Block>.init { continuation in
streamContinuation = continuation
}

Task {
for await block in stream {
_ = await block()
}
}
}

deinit {
streamContinuation.finish()
}

/// Serializes asynchronous requests made from an async context
///
Expand All @@ -25,17 +40,31 @@ public actor TaskQueue<Success> {
/// TaskQueue serializes this work so that `doAsync1` is performed before `doAsync2`,
/// which is performed before `doAsync3`.
public func sync(block: @Sendable @escaping () async throws -> Success) async throws -> Success {
let currentTask: Task<Success, Error> = Task { [previousTask] in
_ = await previousTask?.result
return try await block()
try await withCheckedThrowingContinuation { continuation in
streamContinuation.yield {
do {
let value = try await block()
continuation.resume(returning: value)
} catch {
continuation.resume(throwing: error)
}
}
}
previousTask = currentTask
return try await currentTask.value
}

public nonisolated func async(block: @Sendable @escaping () async throws -> Success) rethrows {
Task {
try await sync(block: block)
public func async(block: @Sendable @escaping () async throws -> Success) {
streamContinuation.yield {
do {
_ = try await block()
} catch {
Self.log.warn("Failed to handle async task in TaskQueue<\(Success.self)> with error: \(error)")
}
}
}
}

extension TaskQueue {
public static var log: Logger {
Amplify.Logging.logger(forNamespace: String(describing: self))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ public class AWSGraphQLSubscriptionTaskRunner<R: Decodable>: InternalTaskRunner,

public func cancel() {
self.send(GraphQLSubscriptionEvent<R>.connection(.disconnected))
Task { [weak self] in
guard let self else {
return
}
Task {
guard let appSyncClient = self.appSyncClient else {
return
}
Expand Down
16 changes: 16 additions & 0 deletions AmplifyTests/CoreTests/AmplifyTaskQueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,20 @@ final class AmplifyTaskQueueTests: XCTestCase {
await fulfillment(of: [expectation1, expectation2, expectation3], enforceOrder: true)
}

func testAsync() async throws {
let taskCount = 1_000
let expectations: [XCTestExpectation] = (0..<taskCount).map {
expectation(description: "Expected execution of a task number \($0)")
}

let taskQueue = TaskQueue<Void>()

for i in 0..<taskCount {
taskQueue.async {
expectations[i].fulfill()
}
}

await fulfillment(of: expectations, enforceOrder: true)
}
}

0 comments on commit a5b6ce0

Please sign in to comment.