Skip to content

Commit

Permalink
fix(api): storing cancelablles with actor methods in AppSyncRTC (#3824)
Browse files Browse the repository at this point in the history
* fix(api): storing cancelablles with actor methods in AppSyncRTC

* add unit test cases

* remove internal modifier
  • Loading branch information
5d authored Aug 23, 2024
1 parent 673a075 commit 8189460
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,15 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
// Placing the actual subscription work in a deferred task and
// promptly returning the filtered publisher for downstream consumption of all error messages.
defer {
Task { [weak self] in
let task = Task { [weak self] in
guard let self = self else { return }
if !(await self.isConnected) {
try await connect()
try await waitForState(.connected)
}
await self.bindCancellableToConnection(try await self.startSubscription(id))
}.toAnyCancellable.store(in: &cancellablesBindToConnection)
await self.storeInConnectionCancellables(try await self.startSubscription(id))
}
self.storeInConnectionCancellables(task.toAnyCancellable)
}

return filterAppSyncSubscriptionEvent(with: id)
Expand Down Expand Up @@ -236,24 +237,29 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
}

private func subscribeToWebSocketEvent() async {
await self.webSocketClient.publisher.sink { [weak self] _ in
let cancellable = await self.webSocketClient.publisher.sink { [weak self] _ in
self?.log.debug("[AppSyncRealTimeClient] WebSocketClient terminated")
} receiveValue: { webSocketEvent in
Task { [weak self] in
await self?.onWebSocketEvent(webSocketEvent)
}.toAnyCancellable.store(in: &self.cancellables)
let task = Task { [weak self] in
await self?.onWebSocketEvent(webSocketEvent)
}
await self?.storeInCancellables(task.toAnyCancellable)
}
}
.store(in: &cancellables)
self.storeInCancellables(cancellable)
}

private func resumeExistingSubscriptions() {
log.debug("[AppSyncRealTimeClient] Resuming existing subscriptions")
for (id, _) in self.subscriptions {
Task {
Task { [weak self] in
do {
try await self.startSubscription(id).store(in: &cancellablesBindToConnection)
if let cancellable = try await self?.startSubscription(id) {
await self?.storeInConnectionCancellables(cancellable)
}
} catch {
log.debug("[AppSyncRealTimeClient] Failed to resume existing subscription with id: (\(id))")
Self.log.debug("[AppSyncRealTimeClient] Failed to resume existing subscription with id: (\(id))")
}
}
}
Expand Down Expand Up @@ -286,7 +292,7 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
subject.filter {
switch $0 {
case .success(let response): return response.id == id || response.type == .connectionError
case .failure(let error): return true
case .failure: return true
}
}
.map { result -> AppSyncSubscriptionEvent? in
Expand Down Expand Up @@ -350,10 +356,6 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
return errors.compactMap(AppSyncRealTimeRequest.parseResponseError(error:))
}

private func bindCancellableToConnection(_ cancellable: AnyCancellable) {
cancellable.store(in: &cancellablesBindToConnection)
}

}

// MARK: - On WebSocket Events
Expand All @@ -366,8 +368,11 @@ extension AppSyncRealTimeClient {
if self.state.value == .connectionDropped {
log.debug("[AppSyncRealTimeClient] reconnecting appSyncClient after connection drop")
Task { [weak self] in
try? await self?.connect()
}.toAnyCancellable.store(in: &cancellablesBindToConnection)
let task = Task { [weak self] in
try? await self?.connect()
}
await self?.storeInConnectionCancellables(task.toAnyCancellable)
}
}

case let .disconnected(closeCode, reason): //
Expand Down Expand Up @@ -425,24 +430,37 @@ extension AppSyncRealTimeClient {
}
}

private func monitorHeartBeats(_ connectionAck: JSONValue?) {
func monitorHeartBeats(_ connectionAck: JSONValue?) {
let timeoutMs = connectionAck?.connectionTimeoutMs?.intValue ?? 0
log.debug("[AppSyncRealTimeClient] Starting heart beat monitor with interval \(timeoutMs) ms")
heartBeats.eraseToAnyPublisher()
let cancellable = heartBeats.eraseToAnyPublisher()
.debounce(for: .milliseconds(timeoutMs), scheduler: DispatchQueue.global())
.first()
.sink(receiveValue: {
self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting")
.sink(receiveValue: { [weak self] in
Self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting")
Task { [weak self] in
await self?.reconnect()
}.toAnyCancellable.store(in: &self.cancellables)
let task = Task { [weak self] in
await self?.reconnect()
}
await self?.storeInCancellables(task.toAnyCancellable)
}
})
.store(in: &cancellablesBindToConnection)
self.storeInConnectionCancellables(cancellable)
// start counting down
heartBeats.send(())
}
}

extension AppSyncRealTimeClient {
private func storeInCancellables(_ cancellable: AnyCancellable) {
self.cancellables.insert(cancellable)
}

private func storeInConnectionCancellables(_ cancellable: AnyCancellable) {
self.cancellablesBindToConnection.insert(cancellable)
}
}

extension Publisher where Output == AppSyncRealTimeSubscription.State, Failure == Never {
func toAppSyncSubscriptionEventStream() -> AnyPublisher<AppSyncSubscriptionEvent, Never> {
self.compactMap { subscriptionState -> AppSyncSubscriptionEvent? in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,4 +551,32 @@ class AppSyncRealTimeClientTests: XCTestCase {
await fulfillment(of: [startTriggered, errorReceived], timeout: 2)

}

func testReconnect_whenHeartBeatSignalIsNotReceived() async throws {
var cancellables = Set<AnyCancellable>()
let timeout = 1.0
let mockWebSocketClient = MockWebSocketClient()
let mockAppSyncRequestInterceptor = MockAppSyncRequestInterceptor()
let appSyncClient = AppSyncRealTimeClient(
endpoint: URL(string: "https://example.com")!,
requestInterceptor: mockAppSyncRequestInterceptor,
webSocketClient: mockWebSocketClient
)

// start monitoring
await appSyncClient.monitorHeartBeats(.object([
"connectionTimeoutMs": 100
]))

let reconnect = expectation(description: "webSocket triggers event to connection")
await mockWebSocketClient.actionSubject.sink { action in
switch action {
case .connect:
reconnect.fulfill()
default: break
}
}.store(in: &cancellables)
await fulfillment(of: [reconnect], timeout: 2)
}

}

0 comments on commit 8189460

Please sign in to comment.