From eeb365b1579c447998d0ad3d090aa47c2217c544 Mon Sep 17 00:00:00 2001 From: Henryforce Date: Thu, 13 Jan 2022 14:18:21 +0900 Subject: [PATCH] PingPong handler (#5) --- Package.resolved | 16 ++++ Package.swift | 12 ++- ...etClient+URLSessionWebSocketDelegate.swift | 28 +++++++ .../AsyncWebSocketClient.swift | 78 ++++++++++++++----- .../StreamGenerator/StreamGenerator.swift | 65 ++++++++++++++++ .../URLSessionWebSocketTaskWrapper.swift | 8 ++ .../AsyncWebSocketClientTests.swift | 66 +++++++++++++++- .../MockURLSessionWebSocketTaskWrapper.swift | 10 +++ 8 files changed, 260 insertions(+), 23 deletions(-) create mode 100644 Package.resolved create mode 100644 Sources/AsyncWebSocketClient/AsyncWebSocketClient+URLSessionWebSocketDelegate.swift create mode 100644 Sources/AsyncWebSocketClient/StreamGenerator/StreamGenerator.swift diff --git a/Package.resolved b/Package.resolved new file mode 100644 index 0000000..0aee2af --- /dev/null +++ b/Package.resolved @@ -0,0 +1,16 @@ +{ + "object": { + "pins": [ + { + "package": "AsyncTimeSequences", + "repositoryURL": "https://github.com/Henryforce/AsyncTimeSequences", + "state": { + "branch": null, + "revision": "1dbdc4b6c888bd26ec15d5c279d7d0284fae422a", + "version": "0.0.7" + } + } + ] + }, + "version": 1 +} diff --git a/Package.swift b/Package.swift index 96846cf..ed2d7fd 100644 --- a/Package.swift +++ b/Package.swift @@ -14,16 +14,22 @@ let package = Package( ], dependencies: [ // Dependencies declare other packages that this package depends on. - // .package(url: /* package url */, from: "1.0.0"), + .package(url: "https://github.com/Henryforce/AsyncTimeSequences", from: "0.0.7") ], targets: [ // Targets are the basic building blocks of a package. A target can define a module or a test suite. // Targets can depend on other targets in this package, and on products in packages this package depends on. .target( name: "AsyncWebSocketClient", - dependencies: []), + dependencies: [ + "AsyncTimeSequences", + ]), .testTarget( name: "AsyncWebSocketClientTests", - dependencies: ["AsyncWebSocketClient"]), + dependencies: [ + "AsyncWebSocketClient", + "AsyncTimeSequences", + .product(name: "AsyncTimeSequencesSupport", package: "AsyncTimeSequences"), + ]), ] ) diff --git a/Sources/AsyncWebSocketClient/AsyncWebSocketClient+URLSessionWebSocketDelegate.swift b/Sources/AsyncWebSocketClient/AsyncWebSocketClient+URLSessionWebSocketDelegate.swift new file mode 100644 index 0000000..5ea2bf7 --- /dev/null +++ b/Sources/AsyncWebSocketClient/AsyncWebSocketClient+URLSessionWebSocketDelegate.swift @@ -0,0 +1,28 @@ +// +// AsyncWebSocketClient+URLSessionWebSocketDelegate.swift +// AsyncWebSocketClient +// +// Created by Henry Javier Serrano Echeverria on 13/1/22. +// + +import Foundation + +extension AsyncWebSocketClient: URLSessionWebSocketDelegate { + nonisolated public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) { + Task { + await socketWasOpened() + } + } + + nonisolated public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { + Task { + await updateStream(with: .socketClosed(nil)) + } + } + + nonisolated public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) { + Task { + await socketFailedToOpen() + } + } +} diff --git a/Sources/AsyncWebSocketClient/AsyncWebSocketClient.swift b/Sources/AsyncWebSocketClient/AsyncWebSocketClient.swift index d074508..441ac20 100644 --- a/Sources/AsyncWebSocketClient/AsyncWebSocketClient.swift +++ b/Sources/AsyncWebSocketClient/AsyncWebSocketClient.swift @@ -6,8 +6,7 @@ // import Foundation - -// TODO: handle ping pong +import AsyncTimeSequences public actor AsyncWebSocketClient: NSObject, AsyncWebSocketClientProtocol { @@ -15,10 +14,17 @@ public actor AsyncWebSocketClient: NSObject, AsyncWebSocketClientProtocol { private var urlSession: URLSession! private var connectContinuation: CheckedContinuation? private var streamContinuation: AsyncStream.Continuation? + private let streamGenerator = StreamGenerator(value: Int.zero) + private let scheduler: AsyncScheduler + + enum Constants { + static let debounceTime: TimeInterval = 20.0 + } // TODO: add a dequeue to keep track of events that could be missed by a subscriber if the stream is not requested - public init(url: URL) { + public init(url: URL, scheduler: AsyncScheduler = MainAsyncScheduler.default) { + self.scheduler = scheduler super.init() self.urlSession = URLSession( configuration: .default, @@ -28,9 +34,10 @@ public actor AsyncWebSocketClient: NSObject, AsyncWebSocketClientProtocol { webSocketTask = urlSession.webSocketTask(with: url) } - init(webSocketTask: URLSessionWebSocketTaskWrapper) { - super.init() + init(webSocketTask: URLSessionWebSocketTaskWrapper, scheduler: AsyncScheduler) { self.webSocketTask = webSocketTask + self.scheduler = scheduler + super.init() } public func connect() async throws { @@ -41,9 +48,17 @@ public actor AsyncWebSocketClient: NSObject, AsyncWebSocketClientProtocol { } } + /// Disconnects but keeps the stream open public func disconnect() async throws { guard let webSocketTask = webSocketTask else { throw AsyncWebSocketError.invalidSocket } webSocketTask.wrappedCancel(with: .goingAway, reason: nil) + self.webSocketTask = nil + } + + /// Disconnects and closes the stream + public func close() async throws { + try await disconnect() + finishStream() } public func send(_ data: AsyncWebSocketData) async throws { @@ -60,6 +75,7 @@ public actor AsyncWebSocketClient: NSObject, AsyncWebSocketClientProtocol { }) } + /// Starts to listen to events. Only one active stream is allowed at all times. public func listenStream() async -> AsyncStream { return AsyncStream { continuation in if let savedContinuation = streamContinuation { // If there is an open stream, close it @@ -86,6 +102,7 @@ public actor AsyncWebSocketClient: NSObject, AsyncWebSocketClientProtocol { fatalError() } + resetPingPong() updateStream(with: socketData) listen() } @@ -96,6 +113,7 @@ public actor AsyncWebSocketClient: NSObject, AsyncWebSocketClientProtocol { connectContinuation.resume() self.connectContinuation = nil } + startPingPongHandler() updateStream(with: .socketOpened) listen() } @@ -114,6 +132,24 @@ public actor AsyncWebSocketClient: NSObject, AsyncWebSocketClientProtocol { } } + /// A debouncing behavior is implemented to send a ping-pong every time after 20 seconds + /// have elapsed since the last time an event happened or this function was initially called. + func startPingPongHandler() { + Task { [weak self] in + guard let stream = await self?.streamGenerator.subscribe() else { return } + + let debouncedStream = stream.debounce(for: Constants.debounceTime, scheduler: scheduler) + + // Start debouncing now + await streamGenerator.updateValue(.zero) + + for await _ in debouncedStream { + guard let self = self else { break } + await self.performPingPong() + } + } + } + func updateStream(with event: AsyncWebSocketEvent) { streamContinuation?.yield(event) } @@ -126,24 +162,30 @@ public actor AsyncWebSocketClient: NSObject, AsyncWebSocketClientProtocol { streamContinuation?.yield(.socketClosed(error)) } -} - -extension AsyncWebSocketClient: URLSessionWebSocketDelegate { - nonisolated public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) { - Task { - await socketWasOpened() - } + private func performPingPong() { + webSocketTask?.wrappedSendPing(pongReceiveHandler: { error in + guard let error = error else { return } + Task { [weak self] in + await self?.terminate(with: error) + } + }) } - nonisolated public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { + private func resetPingPong() { Task { - await updateStream(with: .socketClosed(nil)) + await streamGenerator.updateValue(.zero) } } - nonisolated public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) { - Task { - await socketFailedToOpen() - } + private func terminate(with error: Error) { + webSocketTask?.wrappedCancel(with: .goingAway, reason: nil) + webSocketTask = nil + updateStream(with: .socketClosed(error)) } + + private func finishStream() { + streamContinuation?.finish() + streamContinuation = nil + } + } diff --git a/Sources/AsyncWebSocketClient/StreamGenerator/StreamGenerator.swift b/Sources/AsyncWebSocketClient/StreamGenerator/StreamGenerator.swift new file mode 100644 index 0000000..505832a --- /dev/null +++ b/Sources/AsyncWebSocketClient/StreamGenerator/StreamGenerator.swift @@ -0,0 +1,65 @@ +// +// StreamGenerator.swift +// +// +// Created by Henry Javier Serrano Echeverria on 13/1/22. +// + +import Foundation + +// OPTIONAL TODO: move to a different package + +actor StreamGenerator { + var subscribers = [UUID: AsyncStream.Continuation]() + + var value: T { _value } + var _value: T { + didSet { + subscribers.values.forEach { $0.yield(value) } + } + } + + init(value: T) { + self._value = value + } + + func updateValue(_ value: T) { + self._value = value + } + + func subscribe() -> AsyncStream { + return AsyncStream { continuation in + let uuid = UUID() + subscribers[uuid] = continuation + + continuation.onTermination = { @Sendable _ in + Task { [weak self] in + await self?.removeSubscriber(with: uuid) + } + } + } + } + + private func removeSubscriber(with uuid: UUID) { + subscribers.removeValue(forKey: uuid) + } + + deinit { + for key in subscribers.keys { + guard let subscriber = subscribers[key] else { continue } + subscriber.finish() + removeSubscriber(with: key) + } + } +} + +//public protocol WriteStreamGenerator: Actor { +// associatedtype Value +// func updateValue(_ value: Value) +//} +// +//public protocol ReadStreamGenerator: Actor { +// associatedtype Value +// var value: Value { get } +// func subscribe() -> AsyncStream +//} diff --git a/Sources/AsyncWebSocketClient/URLSessionWebSocketTaskWrapper.swift b/Sources/AsyncWebSocketClient/URLSessionWebSocketTaskWrapper.swift index 470227d..fd22464 100644 --- a/Sources/AsyncWebSocketClient/URLSessionWebSocketTaskWrapper.swift +++ b/Sources/AsyncWebSocketClient/URLSessionWebSocketTaskWrapper.swift @@ -12,6 +12,7 @@ protocol URLSessionWebSocketTaskWrapper { func wrappedCancel(with closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) func wrappedSend(_ message: URLSessionWebSocketTask.Message, completionHandler: @escaping (Error?) -> Void) func wrappedReceive(completionHandler: @escaping (Result) -> Void) + func wrappedSendPing(pongReceiveHandler: @escaping (Error?) -> Void) } extension URLSessionWebSocketTask: URLSessionWebSocketTaskWrapper { @@ -30,4 +31,11 @@ extension URLSessionWebSocketTask: URLSessionWebSocketTaskWrapper { func wrappedReceive(completionHandler: @escaping (Result) -> Void) { receive(completionHandler: completionHandler) } + + func wrappedSendPing(pongReceiveHandler: @escaping (Error?) -> Void) { + + sendPing { error in + pongReceiveHandler(error) + } + } } diff --git a/Tests/AsyncWebSocketClientTests/AsyncWebSocketClientTests.swift b/Tests/AsyncWebSocketClientTests/AsyncWebSocketClientTests.swift index 1beaf12..c610895 100644 --- a/Tests/AsyncWebSocketClientTests/AsyncWebSocketClientTests.swift +++ b/Tests/AsyncWebSocketClientTests/AsyncWebSocketClientTests.swift @@ -7,16 +7,20 @@ import XCTest @testable import AsyncWebSocketClient +import AsyncTimeSequences +import AsyncTimeSequencesSupport final class AsyncWebSocketClientTests: XCTestCase { var mockSocketTask: MockURLSessionWebSocketTaskWrapper! var client: AsyncWebSocketClient! + var scheduler: TestAsyncScheduler! override func setUp() { super.setUp() + scheduler = TestAsyncScheduler() mockSocketTask = MockURLSessionWebSocketTaskWrapper() - client = AsyncWebSocketClient(webSocketTask: mockSocketTask) + client = AsyncWebSocketClient(webSocketTask: mockSocketTask, scheduler: scheduler) } override func tearDown() { @@ -24,6 +28,7 @@ final class AsyncWebSocketClientTests: XCTestCase { mockSocketTask.cleanup() mockSocketTask = nil client = nil + scheduler = nil } func testConnectionOpened() async throws { @@ -145,6 +150,61 @@ final class AsyncWebSocketClientTests: XCTestCase { XCTAssertEqual(stringValue, resultStringValue) } + func testPingPongWhenErrorIsReturnedFromTask() async { + // Given + let debounceTime = AsyncWebSocketClient.Constants.debounceTime + mockSocketTask.sendPingErrors = [AsyncWebSocketError.unknownError(nil)] + + // When + var iterator = await client.listenStream().makeAsyncIterator() + + await client.startPingPongHandler() + + await scheduler.waitForScheduledJobs(count: 1) // Wait for the debounce to start capturing the first event + + await scheduler.advance(by: debounceTime) + + let event = await iterator.next() + + // Then + guard case .socketClosed(let error) = event, + case .unknownError = error as? AsyncWebSocketError else { + XCTFail("Invalid event received") + return + } + XCTAssertEqual(mockSocketTask.sendPingWasCalledCount, 1) + XCTAssertEqual(mockSocketTask.cancelWasCalledStack.count, 1) + } + + func testPingPongIsNotTriggeredIfDataIsReceivedBetweenInterval() async { + // Given + let sendString = "Hello" + let halfDebounceTime = AsyncWebSocketClient.Constants.debounceTime / 2 + mockSocketTask.sendPingErrors = [AsyncWebSocketError.unknownError(nil)] + + // When + var iterator = await client.listenStream().makeAsyncIterator() + + await client.startPingPongHandler() + + await scheduler.waitForScheduledJobs(count: 1) // Wait for the debounce to start capturing the first event + + await scheduler.advance(by: halfDebounceTime) + + await client.processReceivedResult(.success(.string(sendString))) // Mock a received string + + let event = await iterator.next() + + // Then + guard case .dataReceived(let data) = event, + case .string(let stringValue) = data else { + XCTFail("Invalid event received") + return + } + XCTAssertEqual(stringValue, sendString) + XCTAssertEqual(mockSocketTask.sendPingWasCalledCount, 0) + } + // TODO: remove temp tests // func testExample() async throws { // let client = AsyncWebSocketClient(url: URL(string: "ws://localhost:8765/")!) @@ -153,10 +213,12 @@ final class AsyncWebSocketClientTests: XCTestCase { // do { // try await client.connect() // -// for index in 0..<10 { +// for index in 0..<5 { // try await client.send(.string("Hello \(index)")) // try await Task.sleep(nanoseconds: 1000000000) // } +// +// try await client.close() // } catch (let error) { // print("\(error)") // } diff --git a/Tests/AsyncWebSocketClientTests/Mocks/MockURLSessionWebSocketTaskWrapper.swift b/Tests/AsyncWebSocketClientTests/Mocks/MockURLSessionWebSocketTaskWrapper.swift index 73d2d35..aa1771c 100644 --- a/Tests/AsyncWebSocketClientTests/Mocks/MockURLSessionWebSocketTaskWrapper.swift +++ b/Tests/AsyncWebSocketClientTests/Mocks/MockURLSessionWebSocketTaskWrapper.swift @@ -57,6 +57,16 @@ final class MockURLSessionWebSocketTaskWrapper: URLSessionWebSocketTaskWrapper { completionHandler(firstValue) } + var sendPingWasCalledCount = 0 + var sendPingErrors = [Error]() + func wrappedSendPing(pongReceiveHandler: @escaping (Error?) -> Void) { + sendPingWasCalledCount += 1 + + guard !sendPingErrors.isEmpty else { return } + let firstError = sendPingErrors.removeFirst() + pongReceiveHandler(firstError) + } + func cleanup() { guard let receiveHandler = receiveHandler else { return } receiveHandler(.failure(AsyncWebSocketError.unknownError(nil)))