From 18043cbc24e5bb79aa1f44e01a367e0bccd2fa6e Mon Sep 17 00:00:00 2001 From: Anh Do <18567+quanganhdo@users.noreply.github.com> Date: Fri, 22 Dec 2023 18:10:19 -0500 Subject: [PATCH] Update latency & tunnel failure monitors implementation (#613) * Rewrite tunnel failure monitor using Task instead of Timer * Rewrite latency monitor using Task instead of Timer * Remove ignoreThreshold * Update logging categories --- .../NetworkProtectionLatencyMonitor.swift | 171 ++++-------------- ...etworkProtectionTunnelFailureMonitor.swift | 137 +++----------- .../NetworkProtection/Logging/Logging.swift | 14 ++ .../PacketTunnelProvider.swift | 82 ++++----- ...NetworkProtectionLatencyMonitorTests.swift | 60 ++---- 5 files changed, 136 insertions(+), 328 deletions(-) diff --git a/Sources/NetworkProtection/Diagnostics/NetworkProtectionLatencyMonitor.swift b/Sources/NetworkProtection/Diagnostics/NetworkProtectionLatencyMonitor.swift index 3271c447f..287fabf43 100644 --- a/Sources/NetworkProtection/Diagnostics/NetworkProtectionLatencyMonitor.swift +++ b/Sources/NetworkProtection/Diagnostics/NetworkProtectionLatencyMonitor.swift @@ -21,7 +21,7 @@ import Network import Common import Combine -final public class NetworkProtectionLatencyMonitor { +public actor NetworkProtectionLatencyMonitor { public enum ConnectionQuality: String { case terrible case poor @@ -55,196 +55,103 @@ final public class NetworkProtectionLatencyMonitor { private static let reportThreshold: TimeInterval = .minutes(10) private static let measurementInterval: TimeInterval = .seconds(5) - private static let pingTimeout: TimeInterval = 0.3 + private static let pingTimeout: TimeInterval = .seconds(1) private static let unknownLatency: TimeInterval = -1 - public var publisher: AnyPublisher { - subject.eraseToAnyPublisher() - } - private let subject = PassthroughSubject() - private let latencySubject = PassthroughSubject() - private var latencyCancellable: AnyCancellable? - private actor TimerRunCoordinator { - private(set) var isRunning = false - - func start() { - isRunning = true - } + private var latencyCancellable: AnyCancellable? - func stop() { - isRunning = false + private var task: Task? { + willSet { + task?.cancel() } } - private var timer: DispatchSourceTimer? - private let timerRunCoordinator = TimerRunCoordinator() - private let timerQueue: DispatchQueue - - private let lock = NSLock() - - private var _lastLatencyReported: Date = .distantPast - private(set) var lastLatencyReported: Date { - get { - lock.lock(); defer { lock.unlock() } - return _lastLatencyReported - } - set { - lock.lock() - self._lastLatencyReported = newValue - lock.unlock() - } + var isStarted: Bool { + task?.isCancelled == false } - private let serverIP: () -> IPv4Address? - - private let log: OSLog - - private var _ignoreThreshold = false - private(set) var ignoreThreshold: Bool { - get { - lock.lock(); defer { lock.unlock() } - return _ignoreThreshold - } - set { - lock.lock() - self._ignoreThreshold = newValue - lock.unlock() - } - } + private var lastLatencyReported: Date = .distantPast // MARK: - Init & deinit - init(serverIP: @escaping () -> IPv4Address?, timerQueue: DispatchQueue, log: OSLog) { - self.serverIP = serverIP - self.timerQueue = timerQueue - self.log = log - + init() { os_log("[+] %{public}@", log: .networkProtectionMemoryLog, type: .debug, String(describing: self)) } deinit { - os_log("[-] %{public}@", log: .networkProtectionMemoryLog, type: .debug, String(describing: self)) + task?.cancel() - cancelTimerImmediately() + os_log("[-] %{public}@", log: .networkProtectionMemoryLog, type: .debug, String(describing: self)) } // MARK: - Start/Stop monitoring - public func start() async throws { - guard await !timerRunCoordinator.isRunning else { - os_log("Will not start the latency monitor as it's already running", log: log) - return - } - - os_log("⚫️ Starting latency monitor", log: log) + public func start(serverIP: IPv4Address, callback: @escaping (Result) -> Void) { + os_log("⚫️ Starting latency monitor", log: .networkProtectionLatencyMonitorLog) latencyCancellable = latencySubject.eraseToAnyPublisher() - .scan(ExponentialGeometricAverage()) { [weak self] measurements, latency in + .receive(on: DispatchQueue.main) + .scan(ExponentialGeometricAverage()) { measurements, latency in if latency >= 0 { measurements.addMeasurement(latency) - os_log("⚫️ Latency: %{public}f milliseconds", log: .networkProtectionPixel, type: .debug, latency) + os_log("⚫️ Latency: %{public}f milliseconds", log: .networkProtectionLatencyMonitorLog, type: .debug, latency) } else { - self?.subject.send(.error) + callback(.error) } - os_log("⚫️ Average: %{public}f milliseconds", log: .networkProtectionPixel, type: .debug, measurements.average) + os_log("⚫️ Average: %{public}f milliseconds", log: .networkProtectionLatencyMonitorLog, type: .debug, measurements.average) return measurements } .map { ConnectionQuality(average: $0.average) } - .sink { [weak self] quality in - let now = Date() - if let self, - (now.timeIntervalSince1970 - self.lastLatencyReported.timeIntervalSince1970 >= Self.reportThreshold) || ignoreThreshold { - self.subject.send(.quality(quality)) - self.lastLatencyReported = now + .sink { quality in + Task { [weak self] in + let now = Date() + if let self, + await now.timeIntervalSince1970 - self.lastLatencyReported.timeIntervalSince1970 >= Self.reportThreshold { + callback(.quality(quality)) + await self.updateLastLatencyReported(date: now) + } } } - do { - try await scheduleTimer() - } catch { - os_log("⚫️ Stopping latency monitor prematurely", log: log) - throw error + task = Task.periodic(interval: Self.measurementInterval) { [weak self] in + await self?.measureLatency(to: serverIP) } } - public func stop() async { - os_log("⚫️ Stopping latency monitor", log: log) - await stopScheduledTimer() - } - - // MARK: - Timer scheduling - - private func scheduleTimer() async throws { - await stopScheduledTimer() - - await timerRunCoordinator.start() - - let timer = DispatchSource.makeTimerSource(queue: timerQueue) - self.timer = timer - - timer.schedule(deadline: .now() + Self.measurementInterval, repeating: Self.measurementInterval) - timer.setEventHandler { [weak self] in - guard let self else { return } - - Task { - await self.measureLatency() - } - } - - timer.setCancelHandler { [weak self] in - self?.timer = nil - } + public func stop() { + os_log("⚫️ Stopping latency monitor", log: .networkProtectionLatencyMonitorLog) - timer.resume() + latencyCancellable = nil + task = nil } - private func stopScheduledTimer() async { - await timerRunCoordinator.stop() - - cancelTimerImmediately() - } - - private func cancelTimerImmediately() { - guard let timer else { return } - - if !timer.isCancelled { - timer.cancel() - } - - self.timer = nil + private func updateLastLatencyReported(date: Date) { + lastLatencyReported = date } // MARK: - Latency monitor - @MainActor - public func measureLatency() async { - guard let serverIP = serverIP() else { - latencySubject.send(Self.unknownLatency) - return - } - - os_log("⚫️ Pinging %{public}s", log: .networkProtectionPixel, type: .debug, serverIP.debugDescription) + private func measureLatency(to ip: IPv4Address) async { + os_log("⚫️ Pinging %{public}s", log: .networkProtectionLatencyMonitorLog, type: .debug, ip.debugDescription) - let result = await Pinger(ip: serverIP, timeout: Self.pingTimeout, log: .networkProtectionPixel).ping() + let result = await Pinger(ip: ip, timeout: Self.pingTimeout, log: .networkProtectionLatencyMonitorLog).ping() switch result { case .success(let pingResult): latencySubject.send(pingResult.time * 1000) case .failure(let error): - os_log("⚫️ Ping error: %{public}s", log: .networkProtectionPixel, type: .debug, error.localizedDescription) + os_log("⚫️ Ping error: %{public}s", log: .networkProtectionLatencyMonitorLog, type: .debug, error.localizedDescription) latencySubject.send(Self.unknownLatency) } } public func simulateLatency(_ timeInterval: TimeInterval) { - ignoreThreshold = true latencySubject.send(timeInterval) - ignoreThreshold = false } } diff --git a/Sources/NetworkProtection/Diagnostics/NetworkProtectionTunnelFailureMonitor.swift b/Sources/NetworkProtection/Diagnostics/NetworkProtectionTunnelFailureMonitor.swift index c801cf2e2..06a219c39 100644 --- a/Sources/NetworkProtection/Diagnostics/NetworkProtectionTunnelFailureMonitor.swift +++ b/Sources/NetworkProtection/Diagnostics/NetworkProtectionTunnelFailureMonitor.swift @@ -22,7 +22,7 @@ import NetworkExtension import Common import Combine -final public class NetworkProtectionTunnelFailureMonitor { +public actor NetworkProtectionTunnelFailureMonitor { public enum Result { case failureDetected case failureRecovered @@ -39,156 +39,77 @@ final public class NetworkProtectionTunnelFailureMonitor { private static let monitoringInterval: TimeInterval = .seconds(10) - public var publisher: AnyPublisher { - failureSubject.eraseToAnyPublisher() - } - private let failureSubject = PassthroughSubject() - - private actor TimerRunCoordinator { - private(set) var isRunning = false - - func start() { - isRunning = true - } - - func stop() { - isRunning = false + private var task: Task? { + willSet { + task?.cancel() } } - private var timer: DispatchSourceTimer? - private let timerRunCoordinator = TimerRunCoordinator() - private let timerQueue: DispatchQueue + var isStarted: Bool { + task?.isCancelled == false + } - private let tunnelProvider: PacketTunnelProvider + private weak var tunnelProvider: PacketTunnelProvider? private let networkMonitor = NWPathMonitor() - private let log: OSLog - - private let lock = NSLock() - - private var _failureReported = false - private(set) var failureReported: Bool { - get { - lock.lock(); defer { lock.unlock() } - return _failureReported - } - set { - lock.lock() - self._failureReported = newValue - lock.unlock() - } - } + private var failureReported = false // MARK: - Init & deinit - init(tunnelProvider: PacketTunnelProvider, timerQueue: DispatchQueue, log: OSLog) { + init(tunnelProvider: PacketTunnelProvider) { self.tunnelProvider = tunnelProvider - self.timerQueue = timerQueue - self.log = log + self.networkMonitor.start(queue: .global()) os_log("[+] %{public}@", log: .networkProtectionMemoryLog, type: .debug, String(describing: self)) } deinit { - os_log("[-] %{public}@", log: .networkProtectionMemoryLog, type: .debug, String(describing: self)) + task?.cancel() + networkMonitor.cancel() - cancelTimerImmediately() + os_log("[-] %{public}@", log: .networkProtectionMemoryLog, type: .debug, String(describing: self)) } // MARK: - Start/Stop monitoring - func start() async throws { - guard await !timerRunCoordinator.isRunning else { - os_log("Will not start the tunnel failure monitor as it's already running", log: log) - return - } - - os_log("⚫️ Starting tunnel failure monitor", log: log) - - do { - networkMonitor.start(queue: .global()) - - failureReported = false - try await scheduleTimer() - } catch { - os_log("⚫️ Stopping tunnel failure monitor prematurely", log: log) - throw error - } - } - - func stop() async { - os_log("⚫️ Stopping tunnel failure monitor", log: log) - await stopScheduledTimer() - - networkMonitor.cancel() - } - - // MARK: - Timer scheduling - - private func scheduleTimer() async throws { - await stopScheduledTimer() - - await timerRunCoordinator.start() - - let timer = DispatchSource.makeTimerSource(queue: timerQueue) - self.timer = timer + func start(callback: @escaping (Result) -> Void) { + os_log("⚫️ Starting tunnel failure monitor", log: .networkProtectionTunnelFailureMonitorLog) - timer.schedule(deadline: .now() + Self.monitoringInterval, repeating: Self.monitoringInterval) - timer.setEventHandler { [weak self] in - guard let self else { return } + failureReported = false - Task { - try? await self.monitorHandshakes() - } - } - - timer.setCancelHandler { [weak self] in - self?.timer = nil + task = Task.periodic(interval: Self.monitoringInterval) { [weak self] in + await self?.monitorHandshakes(callback: callback) } - - timer.resume() } - private func stopScheduledTimer() async { - await timerRunCoordinator.stop() - - cancelTimerImmediately() - } - - private func cancelTimerImmediately() { - guard let timer else { return } - - if !timer.isCancelled { - timer.cancel() - } + func stop() { + os_log("⚫️ Stopping tunnel failure monitor", log: .networkProtectionTunnelFailureMonitorLog) - self.timer = nil + task = nil } // MARK: - Handshake monitor - @MainActor - func monitorHandshakes() async throws { - let mostRecentHandshake = await tunnelProvider.mostRecentHandshake() ?? 0 + private func monitorHandshakes(callback: @escaping (Result) -> Void) async { + let mostRecentHandshake = await tunnelProvider?.mostRecentHandshake() ?? 0 let difference = Date().timeIntervalSince1970 - mostRecentHandshake - os_log("⚫️ Last handshake: %{public}f seconds ago", log: .networkProtectionPixel, type: .debug, difference) + os_log("⚫️ Last handshake: %{public}f seconds ago", log: .networkProtectionTunnelFailureMonitorLog, type: .debug, difference) if difference > Result.failureDetected.threshold, isConnected { if failureReported { - os_log("⚫️ Tunnel failure already reported", log: .networkProtectionPixel, type: .debug) + os_log("⚫️ Tunnel failure already reported", log: .networkProtectionTunnelFailureMonitorLog, type: .debug) } else { - failureSubject.send(.failureDetected) + callback(.failureDetected) failureReported = true } } else if difference <= Result.failureRecovered.threshold, failureReported { - failureSubject.send(.failureRecovered) + callback(.failureRecovered) failureReported = false } } - var isConnected: Bool { + private var isConnected: Bool { let path = networkMonitor.currentPath let connectionType = NetworkConnectionType(nwPath: path) diff --git a/Sources/NetworkProtection/Logging/Logging.swift b/Sources/NetworkProtection/Logging/Logging.swift index 6f604e606..33aad17b9 100644 --- a/Sources/NetworkProtection/Logging/Logging.swift +++ b/Sources/NetworkProtection/Logging/Logging.swift @@ -29,6 +29,14 @@ extension OSLog { Logging.networkProtectionBandwidthAnalysisLoggingEnabled ? Logging.networkProtectionBandwidthAnalysis : .disabled } + public static var networkProtectionLatencyMonitorLog: OSLog { + Logging.networkProtectionLatencyMonitorLoggingEnabled ? Logging.networkProtectionLatencyMonitor : .disabled + } + + public static var networkProtectionTunnelFailureMonitorLog: OSLog { + Logging.networkProtectionTunnelFailureMonitorLoggingEnabled ? Logging.networkProtectionTunnelFailureMonitor : .disabled + } + public static var networkProtectionConnectionTesterLog: OSLog { Logging.networkProtectionConnectionTesterLoggingEnabled ? Logging.networkProtectionConnectionTesterLog : .disabled } @@ -73,6 +81,12 @@ struct Logging { fileprivate static let networkProtectionBandwidthAnalysisLoggingEnabled = true fileprivate static let networkProtectionBandwidthAnalysis: OSLog = OSLog(subsystem: subsystem, category: "Network Protection: Bandwidth Analysis") + fileprivate static let networkProtectionLatencyMonitorLoggingEnabled = true + fileprivate static let networkProtectionLatencyMonitor: OSLog = OSLog(subsystem: subsystem, category: "Network Protection: Latency Monitor") + + fileprivate static let networkProtectionTunnelFailureMonitorLoggingEnabled = true + fileprivate static let networkProtectionTunnelFailureMonitor: OSLog = OSLog(subsystem: subsystem, category: "Network Protection: Tunnel Failure Monitor") + fileprivate static let networkProtectionConnectionTesterLoggingEnabled = true fileprivate static let networkProtectionConnectionTesterLog: OSLog = OSLog(subsystem: subsystem, category: "Network Protection: Connection Tester") diff --git a/Sources/NetworkProtection/PacketTunnelProvider.swift b/Sources/NetworkProtection/PacketTunnelProvider.swift index 03ac8b0d5..9572b51bc 100644 --- a/Sources/NetworkProtection/PacketTunnelProvider.swift +++ b/Sources/NetworkProtection/PacketTunnelProvider.swift @@ -255,13 +255,8 @@ open class PacketTunnelProvider: NEPacketTunnelProvider { } }() - public lazy var tunnelFailureMonitor = NetworkProtectionTunnelFailureMonitor(tunnelProvider: self, - timerQueue: timerQueue, - log: .networkProtectionPixel) - - public lazy var latencyMonitor = NetworkProtectionLatencyMonitor(serverIP: { [weak self] in self?.lastSelectedServerInfo?.ipv4 }, - timerQueue: timerQueue, - log: .networkProtectionPixel) + public lazy var tunnelFailureMonitor = NetworkProtectionTunnelFailureMonitor(tunnelProvider: self) + public lazy var latencyMonitor = NetworkProtectionLatencyMonitor() private var lastTestFailed = false private let bandwidthAnalyzer = NetworkProtectionConnectionBandwidthAnalyzer() @@ -301,8 +296,6 @@ open class PacketTunnelProvider: NEPacketTunnelProvider { observeSettingChanges() observeConnectionStatusChanges() - observeTunnelFailures() - observeConnectionQuality() } deinit { @@ -449,31 +442,6 @@ open class PacketTunnelProvider: NEPacketTunnelProvider { .store(in: &cancellables) } - private func observeTunnelFailures() { - tunnelFailureMonitor.publisher - .sink { [weak self] result in - self?.providerEvents.fire(.reportTunnelFailure(result: result)) - } - .store(in: &cancellables) - } - - private func observeConnectionQuality() { - latencyMonitor.publisher - .flatMap { [weak self] result in - switch result { - case .error: - self?.providerEvents.fire(.reportLatency(result: .error)) - return Empty().eraseToAnyPublisher() - case .quality(let quality): - return Just(quality).eraseToAnyPublisher() - } - } - .sink { [weak self] quality in - self?.providerEvents.fire(.reportLatency(result: .quality(quality))) - } - .store(in: &cancellables) - } - // MARK: - Tunnel Start open override func startTunnel(options: [String: NSObject]?, completionHandler: @escaping (Error?) -> Void) { @@ -1033,19 +1001,8 @@ open class PacketTunnelProvider: NEPacketTunnelProvider { os_log("🔵 Tunnel interface is %{public}@", log: .networkProtection, type: .info, adapter.interfaceName ?? "unknown") - do { - try await tunnelFailureMonitor.start() - } catch { - os_log("⚫️ Tunnel failure monitor error: %{public}@", log: .networkProtectionPixel, type: .error, String(reflecting: error)) - throw error - } - - do { - try await latencyMonitor.start() - } catch { - os_log("⚫️ Latency monitor error: %{public}@", log: .networkProtectionPixel, type: .error, String(reflecting: error)) - throw error - } + await startTunnelFailureMonitor() + await startLatencyMonitor() do { // These cases only make sense in the context of a connection that had trouble @@ -1066,6 +1023,37 @@ open class PacketTunnelProvider: NEPacketTunnelProvider { await self.latencyMonitor.stop() } + // MARK: - Monitors + + private func startTunnelFailureMonitor() async { + if await tunnelFailureMonitor.isStarted { + await tunnelFailureMonitor.stop() + } + + await tunnelFailureMonitor.start { [weak self] result in + self?.providerEvents.fire(.reportTunnelFailure(result: result)) + } + } + + private func startLatencyMonitor() async { + guard let ip = lastSelectedServerInfo?.ipv4 else { + await latencyMonitor.stop() + return + } + if await latencyMonitor.isStarted { + await latencyMonitor.stop() + } + + await latencyMonitor.start(serverIP: ip) { [weak self] result in + switch result { + case .error: + self?.providerEvents.fire(.reportLatency(result: .error)) + case .quality(let quality): + self?.providerEvents.fire(.reportLatency(result: .quality(quality))) + } + } + } + // MARK: - Connection Tester private enum ConnectionTesterError: Error { diff --git a/Tests/NetworkProtectionTests/NetworkProtectionLatencyMonitorTests.swift b/Tests/NetworkProtectionTests/NetworkProtectionLatencyMonitorTests.swift index 89f9b305e..12e9e0008 100644 --- a/Tests/NetworkProtectionTests/NetworkProtectionLatencyMonitorTests.swift +++ b/Tests/NetworkProtectionTests/NetworkProtectionLatencyMonitorTests.swift @@ -27,42 +27,24 @@ final class NetworkProtectionLatencyMonitorTests: XCTestCase { override func setUp() async throws { try await super.setUp() - monitor = NetworkProtectionLatencyMonitor(serverIP: { nil }, timerQueue: DispatchQueue.main, log: .networkProtectionPixel) - - try await monitor?.start() + monitor = NetworkProtectionLatencyMonitor() } override func tearDown() async throws { await monitor?.stop() } - func testInvalidIP() async { - let expectation = XCTestExpectation(description: "Invalid IP reported") - cancellable = monitor?.publisher - .sink { result in - switch result { - case .error: - expectation.fulfill() - case .quality: - break - } - } - await monitor?.measureLatency() - await fulfillment(of: [expectation], timeout: 1) - } - func testPingFailure() async { let expectation = XCTestExpectation(description: "Ping failure reported") - cancellable = monitor?.publisher - .sink { result in - switch result { - case .error: - expectation.fulfill() - case .quality: - break - } + await monitor?.start(serverIP: .init("127.0.0.1")!) { result in + switch result { + case .error: + expectation.fulfill() + case .quality: + break } - monitor?.simulateLatency(-1) + } + await monitor?.simulateLatency(-1) await fulfillment(of: [expectation], timeout: 1) } @@ -80,23 +62,19 @@ final class NetworkProtectionLatencyMonitorTests: XCTestCase { } private func testConnectionLatency(_ timeInterval: TimeInterval, expecting expectedQuality: NetworkProtectionLatencyMonitor.ConnectionQuality) async { - let monitor = NetworkProtectionLatencyMonitor(serverIP: { nil }, timerQueue: DispatchQueue.main, log: .networkProtectionPixel) + let monitor = NetworkProtectionLatencyMonitor() var reportedQuality = NetworkProtectionLatencyMonitor.ConnectionQuality.unknown - cancellable = monitor.publisher - .sink { result in - switch result { - case .quality(let quality): - reportedQuality = quality - case .error: - XCTFail("Unexpected result") - } + await monitor.start(serverIP: .init("127.0.0.1")!) { result in + switch result { + case .quality(let quality): + reportedQuality = quality + XCTAssertEqual(expectedQuality, reportedQuality) + case .error: + XCTFail("Unexpected result") } - - try? await monitor.start() - monitor.simulateLatency(timeInterval) + } + await monitor.simulateLatency(timeInterval) await monitor.stop() - - XCTAssertEqual(expectedQuality, reportedQuality) } }