Skip to content

Commit

Permalink
Update latency & tunnel failure monitors implementation (#613)
Browse files Browse the repository at this point in the history
* Rewrite tunnel failure monitor using Task instead of Timer
* Rewrite latency monitor using Task instead of Timer
* Remove ignoreThreshold
* Update logging categories
  • Loading branch information
quanganhdo authored Dec 22, 2023
1 parent 2dc2db9 commit 18043cb
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 328 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import Network
import Common
import Combine

final public class NetworkProtectionLatencyMonitor {
public actor NetworkProtectionLatencyMonitor {
public enum ConnectionQuality: String {
case terrible
case poor
Expand Down Expand Up @@ -55,196 +55,103 @@ final public class NetworkProtectionLatencyMonitor {

private static let reportThreshold: TimeInterval = .minutes(10)
private static let measurementInterval: TimeInterval = .seconds(5)
private static let pingTimeout: TimeInterval = 0.3
private static let pingTimeout: TimeInterval = .seconds(1)

private static let unknownLatency: TimeInterval = -1

public var publisher: AnyPublisher<Result, Never> {
subject.eraseToAnyPublisher()
}
private let subject = PassthroughSubject<Result, Never>()

private let latencySubject = PassthroughSubject<TimeInterval, Never>()
private var latencyCancellable: AnyCancellable?

private actor TimerRunCoordinator {
private(set) var isRunning = false

func start() {
isRunning = true
}
private var latencyCancellable: AnyCancellable?

func stop() {
isRunning = false
private var task: Task<Never, Error>? {
willSet {
task?.cancel()
}
}

private var timer: DispatchSourceTimer?
private let timerRunCoordinator = TimerRunCoordinator()
private let timerQueue: DispatchQueue

private let lock = NSLock()

private var _lastLatencyReported: Date = .distantPast
private(set) var lastLatencyReported: Date {
get {
lock.lock(); defer { lock.unlock() }
return _lastLatencyReported
}
set {
lock.lock()
self._lastLatencyReported = newValue
lock.unlock()
}
var isStarted: Bool {
task?.isCancelled == false
}

private let serverIP: () -> IPv4Address?

private let log: OSLog

private var _ignoreThreshold = false
private(set) var ignoreThreshold: Bool {
get {
lock.lock(); defer { lock.unlock() }
return _ignoreThreshold
}
set {
lock.lock()
self._ignoreThreshold = newValue
lock.unlock()
}
}
private var lastLatencyReported: Date = .distantPast

// MARK: - Init & deinit

init(serverIP: @escaping () -> IPv4Address?, timerQueue: DispatchQueue, log: OSLog) {
self.serverIP = serverIP
self.timerQueue = timerQueue
self.log = log

init() {
os_log("[+] %{public}@", log: .networkProtectionMemoryLog, type: .debug, String(describing: self))
}

deinit {
os_log("[-] %{public}@", log: .networkProtectionMemoryLog, type: .debug, String(describing: self))
task?.cancel()

cancelTimerImmediately()
os_log("[-] %{public}@", log: .networkProtectionMemoryLog, type: .debug, String(describing: self))
}

// MARK: - Start/Stop monitoring

public func start() async throws {
guard await !timerRunCoordinator.isRunning else {
os_log("Will not start the latency monitor as it's already running", log: log)
return
}

os_log("⚫️ Starting latency monitor", log: log)
public func start(serverIP: IPv4Address, callback: @escaping (Result) -> Void) {
os_log("⚫️ Starting latency monitor", log: .networkProtectionLatencyMonitorLog)

latencyCancellable = latencySubject.eraseToAnyPublisher()
.scan(ExponentialGeometricAverage()) { [weak self] measurements, latency in
.receive(on: DispatchQueue.main)
.scan(ExponentialGeometricAverage()) { measurements, latency in
if latency >= 0 {
measurements.addMeasurement(latency)
os_log("⚫️ Latency: %{public}f milliseconds", log: .networkProtectionPixel, type: .debug, latency)
os_log("⚫️ Latency: %{public}f milliseconds", log: .networkProtectionLatencyMonitorLog, type: .debug, latency)
} else {
self?.subject.send(.error)
callback(.error)
}

os_log("⚫️ Average: %{public}f milliseconds", log: .networkProtectionPixel, type: .debug, measurements.average)
os_log("⚫️ Average: %{public}f milliseconds", log: .networkProtectionLatencyMonitorLog, type: .debug, measurements.average)

return measurements
}
.map { ConnectionQuality(average: $0.average) }
.sink { [weak self] quality in
let now = Date()
if let self,
(now.timeIntervalSince1970 - self.lastLatencyReported.timeIntervalSince1970 >= Self.reportThreshold) || ignoreThreshold {
self.subject.send(.quality(quality))
self.lastLatencyReported = now
.sink { quality in
Task { [weak self] in
let now = Date()
if let self,
await now.timeIntervalSince1970 - self.lastLatencyReported.timeIntervalSince1970 >= Self.reportThreshold {
callback(.quality(quality))
await self.updateLastLatencyReported(date: now)
}
}
}

do {
try await scheduleTimer()
} catch {
os_log("⚫️ Stopping latency monitor prematurely", log: log)
throw error
task = Task.periodic(interval: Self.measurementInterval) { [weak self] in
await self?.measureLatency(to: serverIP)
}
}

public func stop() async {
os_log("⚫️ Stopping latency monitor", log: log)
await stopScheduledTimer()
}

// MARK: - Timer scheduling

private func scheduleTimer() async throws {
await stopScheduledTimer()

await timerRunCoordinator.start()

let timer = DispatchSource.makeTimerSource(queue: timerQueue)
self.timer = timer

timer.schedule(deadline: .now() + Self.measurementInterval, repeating: Self.measurementInterval)
timer.setEventHandler { [weak self] in
guard let self else { return }

Task {
await self.measureLatency()
}
}

timer.setCancelHandler { [weak self] in
self?.timer = nil
}
public func stop() {
os_log("⚫️ Stopping latency monitor", log: .networkProtectionLatencyMonitorLog)

timer.resume()
latencyCancellable = nil
task = nil
}

private func stopScheduledTimer() async {
await timerRunCoordinator.stop()

cancelTimerImmediately()
}

private func cancelTimerImmediately() {
guard let timer else { return }

if !timer.isCancelled {
timer.cancel()
}

self.timer = nil
private func updateLastLatencyReported(date: Date) {
lastLatencyReported = date
}

// MARK: - Latency monitor

@MainActor
public func measureLatency() async {
guard let serverIP = serverIP() else {
latencySubject.send(Self.unknownLatency)
return
}

os_log("⚫️ Pinging %{public}s", log: .networkProtectionPixel, type: .debug, serverIP.debugDescription)
private func measureLatency(to ip: IPv4Address) async {
os_log("⚫️ Pinging %{public}s", log: .networkProtectionLatencyMonitorLog, type: .debug, ip.debugDescription)

let result = await Pinger(ip: serverIP, timeout: Self.pingTimeout, log: .networkProtectionPixel).ping()
let result = await Pinger(ip: ip, timeout: Self.pingTimeout, log: .networkProtectionLatencyMonitorLog).ping()

switch result {
case .success(let pingResult):
latencySubject.send(pingResult.time * 1000)
case .failure(let error):
os_log("⚫️ Ping error: %{public}s", log: .networkProtectionPixel, type: .debug, error.localizedDescription)
os_log("⚫️ Ping error: %{public}s", log: .networkProtectionLatencyMonitorLog, type: .debug, error.localizedDescription)
latencySubject.send(Self.unknownLatency)
}
}

public func simulateLatency(_ timeInterval: TimeInterval) {
ignoreThreshold = true
latencySubject.send(timeInterval)
ignoreThreshold = false
}
}

Expand Down
Loading

0 comments on commit 18043cb

Please sign in to comment.