diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift index 4e374e9fdf..7f3d75abbc 100644 --- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift @@ -165,14 +165,15 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol { // Placing the actual subscription work in a deferred task and // promptly returning the filtered publisher for downstream consumption of all error messages. defer { - Task { [weak self] in + let task = Task { [weak self] in guard let self = self else { return } if !(await self.isConnected) { try await connect() try await waitForState(.connected) } - await self.bindCancellableToConnection(try await self.startSubscription(id)) - }.toAnyCancellable.store(in: &cancellablesBindToConnection) + await self.storeInConnectionCancellables(try await self.startSubscription(id)) + } + self.storeInConnectionCancellables(task.toAnyCancellable) } return filterAppSyncSubscriptionEvent(with: id) @@ -236,24 +237,29 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol { } private func subscribeToWebSocketEvent() async { - await self.webSocketClient.publisher.sink { [weak self] _ in + let cancellable = await self.webSocketClient.publisher.sink { [weak self] _ in self?.log.debug("[AppSyncRealTimeClient] WebSocketClient terminated") } receiveValue: { webSocketEvent in Task { [weak self] in - await self?.onWebSocketEvent(webSocketEvent) - }.toAnyCancellable.store(in: &self.cancellables) + let task = Task { [weak self] in + await self?.onWebSocketEvent(webSocketEvent) + } + await self?.storeInCancellables(task.toAnyCancellable) + } } - .store(in: &cancellables) + self.storeInCancellables(cancellable) } private func resumeExistingSubscriptions() { log.debug("[AppSyncRealTimeClient] Resuming existing subscriptions") for (id, _) in self.subscriptions { - Task { + Task { [weak self] in do { - try await self.startSubscription(id).store(in: &cancellablesBindToConnection) + if let cancellable = try await self?.startSubscription(id) { + await self?.storeInConnectionCancellables(cancellable) + } } catch { - log.debug("[AppSyncRealTimeClient] Failed to resume existing subscription with id: (\(id))") + Self.log.debug("[AppSyncRealTimeClient] Failed to resume existing subscription with id: (\(id))") } } } @@ -286,7 +292,7 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol { subject.filter { switch $0 { case .success(let response): return response.id == id || response.type == .connectionError - case .failure(let error): return true + case .failure: return true } } .map { result -> AppSyncSubscriptionEvent? in @@ -350,10 +356,6 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol { return errors.compactMap(AppSyncRealTimeRequest.parseResponseError(error:)) } - private func bindCancellableToConnection(_ cancellable: AnyCancellable) { - cancellable.store(in: &cancellablesBindToConnection) - } - } // MARK: - On WebSocket Events @@ -366,8 +368,11 @@ extension AppSyncRealTimeClient { if self.state.value == .connectionDropped { log.debug("[AppSyncRealTimeClient] reconnecting appSyncClient after connection drop") Task { [weak self] in - try? await self?.connect() - }.toAnyCancellable.store(in: &cancellablesBindToConnection) + let task = Task { [weak self] in + try? await self?.connect() + } + await self?.storeInConnectionCancellables(task.toAnyCancellable) + } } case let .disconnected(closeCode, reason): // @@ -425,24 +430,37 @@ extension AppSyncRealTimeClient { } } - private func monitorHeartBeats(_ connectionAck: JSONValue?) { + func monitorHeartBeats(_ connectionAck: JSONValue?) { let timeoutMs = connectionAck?.connectionTimeoutMs?.intValue ?? 0 log.debug("[AppSyncRealTimeClient] Starting heart beat monitor with interval \(timeoutMs) ms") - heartBeats.eraseToAnyPublisher() + let cancellable = heartBeats.eraseToAnyPublisher() .debounce(for: .milliseconds(timeoutMs), scheduler: DispatchQueue.global()) .first() - .sink(receiveValue: { - self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting") + .sink(receiveValue: { [weak self] in + Self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting") Task { [weak self] in - await self?.reconnect() - }.toAnyCancellable.store(in: &self.cancellables) + let task = Task { [weak self] in + await self?.reconnect() + } + await self?.storeInCancellables(task.toAnyCancellable) + } }) - .store(in: &cancellablesBindToConnection) + self.storeInConnectionCancellables(cancellable) // start counting down heartBeats.send(()) } } +extension AppSyncRealTimeClient { + private func storeInCancellables(_ cancellable: AnyCancellable) { + self.cancellables.insert(cancellable) + } + + private func storeInConnectionCancellables(_ cancellable: AnyCancellable) { + self.cancellablesBindToConnection.insert(cancellable) + } +} + extension Publisher where Output == AppSyncRealTimeSubscription.State, Failure == Never { func toAppSyncSubscriptionEventStream() -> AnyPublisher { self.compactMap { subscriptionState -> AppSyncSubscriptionEvent? in diff --git a/AmplifyPlugins/API/Tests/AWSAPIPluginTests/AppSyncRealTimeClient/AppSyncRealTimeClientTests.swift b/AmplifyPlugins/API/Tests/AWSAPIPluginTests/AppSyncRealTimeClient/AppSyncRealTimeClientTests.swift index 83c2c58216..99b36943f1 100644 --- a/AmplifyPlugins/API/Tests/AWSAPIPluginTests/AppSyncRealTimeClient/AppSyncRealTimeClientTests.swift +++ b/AmplifyPlugins/API/Tests/AWSAPIPluginTests/AppSyncRealTimeClient/AppSyncRealTimeClientTests.swift @@ -551,4 +551,32 @@ class AppSyncRealTimeClientTests: XCTestCase { await fulfillment(of: [startTriggered, errorReceived], timeout: 2) } + + func testReconnect_whenHeartBeatSignalIsNotReceived() async throws { + var cancellables = Set() + let timeout = 1.0 + let mockWebSocketClient = MockWebSocketClient() + let mockAppSyncRequestInterceptor = MockAppSyncRequestInterceptor() + let appSyncClient = AppSyncRealTimeClient( + endpoint: URL(string: "https://example.com")!, + requestInterceptor: mockAppSyncRequestInterceptor, + webSocketClient: mockWebSocketClient + ) + + // start monitoring + await appSyncClient.monitorHeartBeats(.object([ + "connectionTimeoutMs": 100 + ])) + + let reconnect = expectation(description: "webSocket triggers event to connection") + await mockWebSocketClient.actionSubject.sink { action in + switch action { + case .connect: + reconnect.fulfill() + default: break + } + }.store(in: &cancellables) + await fulfillment(of: [reconnect], timeout: 2) + } + }