Skip to content

Commit

Permalink
refactor: use makeStream for creating AsyncStream with continuation
Browse files Browse the repository at this point in the history
  • Loading branch information
5d committed May 17, 2024
1 parent 5798204 commit d5eec5f
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 21 deletions.
8 changes: 3 additions & 5 deletions Amplify/Core/Support/AmplifyAsyncSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,16 @@ public typealias WeakAmplifyAsyncSequenceRef<Element> = WeakRef<AmplifyAsyncSequ

public class AmplifyAsyncSequence<Element: Sendable>: AsyncSequence, Cancellable {
public typealias Iterator = AsyncStream<Element>.Iterator
private var asyncStream: AsyncStream<Element>! = nil
private var continuation: AsyncStream<Element>.Continuation! = nil
private var asyncStream: AsyncStream<Element>
private var continuation: AsyncStream<Element>.Continuation
private var parent: Cancellable?

public private(set) var isCancelled: Bool = false

public init(parent: Cancellable? = nil,
bufferingPolicy: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded) {
self.parent = parent
asyncStream = AsyncStream<Element>(Element.self, bufferingPolicy: bufferingPolicy) { continuation in
self.continuation = continuation
}
(asyncStream, continuation) = AsyncStream.makeStream(of: Element.self, bufferingPolicy: bufferingPolicy)
}

public func makeAsyncIterator() -> Iterator {
Expand Down
8 changes: 3 additions & 5 deletions Amplify/Core/Support/AmplifyAsyncThrowingSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,16 @@ public typealias WeakAmplifyAsyncThrowingSequenceRef<Element> = WeakRef<AmplifyA

public class AmplifyAsyncThrowingSequence<Element: Sendable>: AsyncSequence, Cancellable {
public typealias Iterator = AsyncThrowingStream<Element, Error>.Iterator
private var asyncStream: AsyncThrowingStream<Element, Error>! = nil
private var continuation: AsyncThrowingStream<Element, Error>.Continuation! = nil
private var asyncStream: AsyncThrowingStream<Element, Error>
private var continuation: AsyncThrowingStream<Element, Error>.Continuation
private var parent: Cancellable?

public private(set) var isCancelled: Bool = false

public init(parent: Cancellable? = nil,
bufferingPolicy: AsyncThrowingStream<Element, Error>.Continuation.BufferingPolicy = .unbounded) {
self.parent = parent
asyncStream = AsyncThrowingStream(Element.self, bufferingPolicy: bufferingPolicy, { continuation in
self.continuation = continuation
})
(asyncStream, continuation) = AsyncThrowingStream.makeStream(of: Element.self, bufferingPolicy: bufferingPolicy)
}

public func makeAsyncIterator() -> Iterator {
Expand Down
5 changes: 2 additions & 3 deletions Amplify/Core/Support/TaskQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ public class TaskQueue<Success> {
private var streamContinuation: AsyncStream<Block>.Continuation!

public init() {
let stream = AsyncStream<Block>.init { continuation in
streamContinuation = continuation
}
let (stream, continuation) = AsyncStream.makeStream(of: Block.self)
self.streamContinuation = continuation

Task {
for await block in stream {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ class CancellableAsyncStream<Element>: AsyncSequence {
}

convenience init(with publisher: AnyPublisher<Element, Never>) {
var cancellable: AnyCancellable?
self.init(asyncStream: AsyncStream { continuation in
cancellable = publisher.sink { _ in
continuation.finish()
} receiveValue: {
continuation.yield($0)
}
}, cancellable: cancellable)
let (asyncStream, contiuation) = AsyncStream.makeStream(of: Element.self)
let cancellable = publisher.sink { _ in
contiuation.finish()
} receiveValue: {
contiuation.yield($0)
}

self.init(asyncStream: asyncStream, cancellable: cancellable)
}

func makeAsyncIterator() -> AsyncStream<Element>.AsyncIterator {
Expand Down

0 comments on commit d5eec5f

Please sign in to comment.