Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update latency & tunnel failure monitors implementation #613

Merged
merged 13 commits into from
Dec 22, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import Network
import Common
import Combine

final public class NetworkProtectionLatencyMonitor {
public actor NetworkProtectionLatencyMonitor {
quanganhdo marked this conversation as resolved.
Show resolved Hide resolved
public enum ConnectionQuality: String {
case terrible
case poor
Expand Down Expand Up @@ -59,192 +59,108 @@ final public class NetworkProtectionLatencyMonitor {

private static let unknownLatency: TimeInterval = -1

public var publisher: AnyPublisher<Result, Never> {
subject.eraseToAnyPublisher()
}
private let subject = PassthroughSubject<Result, Never>()

private let latencySubject = PassthroughSubject<TimeInterval, Never>()
private var latencyCancellable: AnyCancellable?

private actor TimerRunCoordinator {
private(set) var isRunning = false

func start() {
isRunning = true
}
private var latencyCancellable: AnyCancellable?

func stop() {
isRunning = false
private var task: Task<Never, Error>? {
willSet {
task?.cancel()
}
}

private var timer: DispatchSourceTimer?
private let timerRunCoordinator = TimerRunCoordinator()
private let timerQueue: DispatchQueue

private let lock = NSLock()

private var _lastLatencyReported: Date = .distantPast
private(set) var lastLatencyReported: Date {
get {
lock.lock(); defer { lock.unlock() }
return _lastLatencyReported
}
set {
lock.lock()
self._lastLatencyReported = newValue
lock.unlock()
}
var isStarted: Bool {
task?.isCancelled == false
}

private let serverIP: () -> IPv4Address?
private var lastLatencyReported: Date = .distantPast

private let log: OSLog

private var _ignoreThreshold = false
private(set) var ignoreThreshold: Bool {
get {
lock.lock(); defer { lock.unlock() }
return _ignoreThreshold
}
set {
lock.lock()
self._ignoreThreshold = newValue
lock.unlock()
}
}
private(set) var serverIP: IPv4Address?
quanganhdo marked this conversation as resolved.
Show resolved Hide resolved

// MARK: - Init & deinit

init(serverIP: @escaping () -> IPv4Address?, timerQueue: DispatchQueue, log: OSLog) {
self.serverIP = serverIP
self.timerQueue = timerQueue
self.log = log

init() {
os_log("[+] %{public}@", log: .networkProtectionMemoryLog, type: .debug, String(describing: self))
}

deinit {
os_log("[-] %{public}@", log: .networkProtectionMemoryLog, type: .debug, String(describing: self))
task?.cancel()

cancelTimerImmediately()
os_log("[-] %{public}@", log: .networkProtectionMemoryLog, type: .debug, String(describing: self))
}

// MARK: - Start/Stop monitoring

public func start() async throws {
guard await !timerRunCoordinator.isRunning else {
os_log("Will not start the latency monitor as it's already running", log: log)
return
}
public func start(serverIP: IPv4Address, callback: @escaping (Result) -> Void) {
os_log("⚫️ Starting latency monitor", log: .networkProtectionLatencyMonitorLog)

os_log("⚫️ Starting latency monitor", log: log)
self.serverIP = serverIP

latencyCancellable = latencySubject.eraseToAnyPublisher()
.scan(ExponentialGeometricAverage()) { [weak self] measurements, latency in
.receive(on: DispatchQueue.main)
.scan(ExponentialGeometricAverage()) { measurements, latency in
if latency >= 0 {
measurements.addMeasurement(latency)
os_log("⚫️ Latency: %{public}f milliseconds", log: .networkProtectionPixel, type: .debug, latency)
os_log("⚫️ Latency: %{public}f milliseconds", log: .networkProtectionLatencyMonitorLog, type: .debug, latency)
} else {
self?.subject.send(.error)
callback(.error)
}

os_log("⚫️ Average: %{public}f milliseconds", log: .networkProtectionPixel, type: .debug, measurements.average)
os_log("⚫️ Average: %{public}f milliseconds", log: .networkProtectionLatencyMonitorLog, type: .debug, measurements.average)

return measurements
}
.map { ConnectionQuality(average: $0.average) }
.sink { [weak self] quality in
let now = Date()
if let self,
(now.timeIntervalSince1970 - self.lastLatencyReported.timeIntervalSince1970 >= Self.reportThreshold) || ignoreThreshold {
self.subject.send(.quality(quality))
self.lastLatencyReported = now
.sink { quality in
Task { [weak self] in
let now = Date()
if let self,
await now.timeIntervalSince1970 - self.lastLatencyReported.timeIntervalSince1970 >= Self.reportThreshold {
callback(.quality(quality))
await self.updateLastLatencyReported(date: now)
}
}
}

do {
try await scheduleTimer()
} catch {
os_log("⚫️ Stopping latency monitor prematurely", log: log)
throw error
task = Task.periodic(interval: Self.measurementInterval) { [weak self] in
await self?.measureLatency()
}
}

public func stop() async {
os_log("⚫️ Stopping latency monitor", log: log)
await stopScheduledTimer()
}

// MARK: - Timer scheduling

private func scheduleTimer() async throws {
await stopScheduledTimer()
public func stop() {
os_log("⚫️ Stopping latency monitor", log: .networkProtectionLatencyMonitorLog)

await timerRunCoordinator.start()

let timer = DispatchSource.makeTimerSource(queue: timerQueue)
self.timer = timer

timer.schedule(deadline: .now() + Self.measurementInterval, repeating: Self.measurementInterval)
timer.setEventHandler { [weak self] in
guard let self else { return }

Task {
await self.measureLatency()
}
}

timer.setCancelHandler { [weak self] in
self?.timer = nil
}

timer.resume()
latencyCancellable = nil
task = nil
}

private func stopScheduledTimer() async {
await timerRunCoordinator.stop()

cancelTimerImmediately()
}

private func cancelTimerImmediately() {
guard let timer else { return }

if !timer.isCancelled {
timer.cancel()
}

self.timer = nil
private func updateLastLatencyReported(date: Date) {
lastLatencyReported = date
}

// MARK: - Latency monitor

@MainActor
public func measureLatency() async {
guard let serverIP = serverIP() else {
guard let serverIP else {
latencySubject.send(Self.unknownLatency)
quanganhdo marked this conversation as resolved.
Show resolved Hide resolved
return
}

os_log("⚫️ Pinging %{public}s", log: .networkProtectionPixel, type: .debug, serverIP.debugDescription)
os_log("⚫️ Pinging %{public}s", log: .networkProtectionLatencyMonitorLog, type: .debug, serverIP.debugDescription)

let result = await Pinger(ip: serverIP, timeout: Self.pingTimeout, log: .networkProtectionPixel).ping()
let result = await Pinger(ip: serverIP, timeout: Self.pingTimeout, log: .networkProtectionLatencyMonitorLog).ping()

switch result {
case .success(let pingResult):
latencySubject.send(pingResult.time * 1000)
case .failure(let error):
os_log("⚫️ Ping error: %{public}s", log: .networkProtectionPixel, type: .debug, error.localizedDescription)
os_log("⚫️ Ping error: %{public}s", log: .networkProtectionLatencyMonitorLog, type: .debug, error.localizedDescription)
latencySubject.send(Self.unknownLatency)
}
}

public func simulateLatency(_ timeInterval: TimeInterval) {
ignoreThreshold = true
latencySubject.send(timeInterval)
ignoreThreshold = false
}
}

Expand Down
Loading
Loading