Skip to content

Commit

Permalink
PingPong handler (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
Henryforce authored Jan 13, 2022
1 parent 4eeca92 commit eeb365b
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 23 deletions.
16 changes: 16 additions & 0 deletions Package.resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"object": {
"pins": [
{
"package": "AsyncTimeSequences",
"repositoryURL": "https://github.com/Henryforce/AsyncTimeSequences",
"state": {
"branch": null,
"revision": "1dbdc4b6c888bd26ec15d5c279d7d0284fae422a",
"version": "0.0.7"
}
}
]
},
"version": 1
}
12 changes: 9 additions & 3 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@ let package = Package(
],
dependencies: [
// Dependencies declare other packages that this package depends on.
// .package(url: /* package url */, from: "1.0.0"),
.package(url: "https://github.com/Henryforce/AsyncTimeSequences", from: "0.0.7")
],
targets: [
// Targets are the basic building blocks of a package. A target can define a module or a test suite.
// Targets can depend on other targets in this package, and on products in packages this package depends on.
.target(
name: "AsyncWebSocketClient",
dependencies: []),
dependencies: [
"AsyncTimeSequences",
]),
.testTarget(
name: "AsyncWebSocketClientTests",
dependencies: ["AsyncWebSocketClient"]),
dependencies: [
"AsyncWebSocketClient",
"AsyncTimeSequences",
.product(name: "AsyncTimeSequencesSupport", package: "AsyncTimeSequences"),
]),
]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//
// AsyncWebSocketClient+URLSessionWebSocketDelegate.swift
// AsyncWebSocketClient
//
// Created by Henry Javier Serrano Echeverria on 13/1/22.
//

import Foundation

extension AsyncWebSocketClient: URLSessionWebSocketDelegate {
nonisolated public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) {
Task {
await socketWasOpened()
}
}

nonisolated public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) {
Task {
await updateStream(with: .socketClosed(nil))
}
}

nonisolated public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
Task {
await socketFailedToOpen()
}
}
}
78 changes: 60 additions & 18 deletions Sources/AsyncWebSocketClient/AsyncWebSocketClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,25 @@
//

import Foundation

// TODO: handle ping pong
import AsyncTimeSequences

public actor AsyncWebSocketClient: NSObject, AsyncWebSocketClientProtocol {

private var webSocketTask: URLSessionWebSocketTaskWrapper?
private var urlSession: URLSession!
private var connectContinuation: CheckedContinuation<Void, Error>?
private var streamContinuation: AsyncStream<AsyncWebSocketEvent>.Continuation?
private let streamGenerator = StreamGenerator(value: Int.zero)
private let scheduler: AsyncScheduler

enum Constants {
static let debounceTime: TimeInterval = 20.0
}

// TODO: add a dequeue to keep track of events that could be missed by a subscriber if the stream is not requested

public init(url: URL) {
public init(url: URL, scheduler: AsyncScheduler = MainAsyncScheduler.default) {
self.scheduler = scheduler
super.init()
self.urlSession = URLSession(
configuration: .default,
Expand All @@ -28,9 +34,10 @@ public actor AsyncWebSocketClient: NSObject, AsyncWebSocketClientProtocol {
webSocketTask = urlSession.webSocketTask(with: url)
}

init(webSocketTask: URLSessionWebSocketTaskWrapper) {
super.init()
init(webSocketTask: URLSessionWebSocketTaskWrapper, scheduler: AsyncScheduler) {
self.webSocketTask = webSocketTask
self.scheduler = scheduler
super.init()
}

public func connect() async throws {
Expand All @@ -41,9 +48,17 @@ public actor AsyncWebSocketClient: NSObject, AsyncWebSocketClientProtocol {
}
}

/// Disconnects but keeps the stream open
public func disconnect() async throws {
guard let webSocketTask = webSocketTask else { throw AsyncWebSocketError.invalidSocket }
webSocketTask.wrappedCancel(with: .goingAway, reason: nil)
self.webSocketTask = nil
}

/// Disconnects and closes the stream
public func close() async throws {
try await disconnect()
finishStream()
}

public func send(_ data: AsyncWebSocketData) async throws {
Expand All @@ -60,6 +75,7 @@ public actor AsyncWebSocketClient: NSObject, AsyncWebSocketClientProtocol {
})
}

/// Starts to listen to events. Only one active stream is allowed at all times.
public func listenStream() async -> AsyncStream<AsyncWebSocketEvent> {
return AsyncStream { continuation in
if let savedContinuation = streamContinuation { // If there is an open stream, close it
Expand All @@ -86,6 +102,7 @@ public actor AsyncWebSocketClient: NSObject, AsyncWebSocketClientProtocol {
fatalError()
}

resetPingPong()
updateStream(with: socketData)
listen()
}
Expand All @@ -96,6 +113,7 @@ public actor AsyncWebSocketClient: NSObject, AsyncWebSocketClientProtocol {
connectContinuation.resume()
self.connectContinuation = nil
}
startPingPongHandler()
updateStream(with: .socketOpened)
listen()
}
Expand All @@ -114,6 +132,24 @@ public actor AsyncWebSocketClient: NSObject, AsyncWebSocketClientProtocol {
}
}

/// A debouncing behavior is implemented to send a ping-pong every time after 20 seconds
/// have elapsed since the last time an event happened or this function was initially called.
func startPingPongHandler() {
Task { [weak self] in
guard let stream = await self?.streamGenerator.subscribe() else { return }

let debouncedStream = stream.debounce(for: Constants.debounceTime, scheduler: scheduler)

// Start debouncing now
await streamGenerator.updateValue(.zero)

for await _ in debouncedStream {
guard let self = self else { break }
await self.performPingPong()
}
}
}

func updateStream(with event: AsyncWebSocketEvent) {
streamContinuation?.yield(event)
}
Expand All @@ -126,24 +162,30 @@ public actor AsyncWebSocketClient: NSObject, AsyncWebSocketClientProtocol {
streamContinuation?.yield(.socketClosed(error))
}

}

extension AsyncWebSocketClient: URLSessionWebSocketDelegate {
nonisolated public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) {
Task {
await socketWasOpened()
}
private func performPingPong() {
webSocketTask?.wrappedSendPing(pongReceiveHandler: { error in
guard let error = error else { return }
Task { [weak self] in
await self?.terminate(with: error)
}
})
}

nonisolated public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) {
private func resetPingPong() {
Task {
await updateStream(with: .socketClosed(nil))
await streamGenerator.updateValue(.zero)
}
}

nonisolated public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
Task {
await socketFailedToOpen()
}
private func terminate(with error: Error) {
webSocketTask?.wrappedCancel(with: .goingAway, reason: nil)
webSocketTask = nil
updateStream(with: .socketClosed(error))
}

private func finishStream() {
streamContinuation?.finish()
streamContinuation = nil
}

}
65 changes: 65 additions & 0 deletions Sources/AsyncWebSocketClient/StreamGenerator/StreamGenerator.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//
// StreamGenerator.swift
//
//
// Created by Henry Javier Serrano Echeverria on 13/1/22.
//

import Foundation

// OPTIONAL TODO: move to a different package

actor StreamGenerator<T> {
var subscribers = [UUID: AsyncStream<T>.Continuation]()

var value: T { _value }
var _value: T {
didSet {
subscribers.values.forEach { $0.yield(value) }
}
}

init(value: T) {
self._value = value
}

func updateValue(_ value: T) {
self._value = value
}

func subscribe() -> AsyncStream<T> {
return AsyncStream { continuation in
let uuid = UUID()
subscribers[uuid] = continuation

continuation.onTermination = { @Sendable _ in
Task { [weak self] in
await self?.removeSubscriber(with: uuid)
}
}
}
}

private func removeSubscriber(with uuid: UUID) {
subscribers.removeValue(forKey: uuid)
}

deinit {
for key in subscribers.keys {
guard let subscriber = subscribers[key] else { continue }
subscriber.finish()
removeSubscriber(with: key)
}
}
}

//public protocol WriteStreamGenerator: Actor {
// associatedtype Value
// func updateValue(_ value: Value)
//}
//
//public protocol ReadStreamGenerator: Actor {
// associatedtype Value
// var value: Value { get }
// func subscribe() -> AsyncStream<Value>
//}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ protocol URLSessionWebSocketTaskWrapper {
func wrappedCancel(with closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?)
func wrappedSend(_ message: URLSessionWebSocketTask.Message, completionHandler: @escaping (Error?) -> Void)
func wrappedReceive(completionHandler: @escaping (Result<URLSessionWebSocketTask.Message, Error>) -> Void)
func wrappedSendPing(pongReceiveHandler: @escaping (Error?) -> Void)
}

extension URLSessionWebSocketTask: URLSessionWebSocketTaskWrapper {
Expand All @@ -30,4 +31,11 @@ extension URLSessionWebSocketTask: URLSessionWebSocketTaskWrapper {
func wrappedReceive(completionHandler: @escaping (Result<Message, Error>) -> Void) {
receive(completionHandler: completionHandler)
}

func wrappedSendPing(pongReceiveHandler: @escaping (Error?) -> Void) {

sendPing { error in
pongReceiveHandler(error)
}
}
}
Loading

0 comments on commit eeb365b

Please sign in to comment.