Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update latency & tunnel failure monitors implementation #613

Merged
merged 13 commits into from
Dec 22, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import Network
import Common
import Combine

final public class NetworkProtectionLatencyMonitor {
public actor NetworkProtectionLatencyMonitor {
quanganhdo marked this conversation as resolved.
Show resolved Hide resolved
public enum ConnectionQuality: String {
case terrible
case poor
Expand Down Expand Up @@ -59,95 +59,60 @@ final public class NetworkProtectionLatencyMonitor {

private static let unknownLatency: TimeInterval = -1

public var publisher: AnyPublisher<Result, Never> {
subject.eraseToAnyPublisher()
}
private let subject = PassthroughSubject<Result, Never>()

@MainActor
private let latencySubject = PassthroughSubject<TimeInterval, Never>()
private var latencyCancellable: AnyCancellable?

private actor TimerRunCoordinator {
private(set) var isRunning = false

func start() {
isRunning = true
}
@MainActor
private var latencyCancellable: AnyCancellable?

func stop() {
isRunning = false
@MainActor
private var task: Task<Never, Error>? {
willSet {
task?.cancel()
}
}

private var timer: DispatchSourceTimer?
private let timerRunCoordinator = TimerRunCoordinator()
private let timerQueue: DispatchQueue

private let lock = NSLock()

private var _lastLatencyReported: Date = .distantPast
private(set) var lastLatencyReported: Date {
get {
lock.lock(); defer { lock.unlock() }
return _lastLatencyReported
}
set {
lock.lock()
self._lastLatencyReported = newValue
lock.unlock()
}
@MainActor
var isStarted: Bool {
task?.isCancelled == false
}

private let serverIP: () -> IPv4Address?
@MainActor
private var lastLatencyReported: Date = .distantPast

private let log: OSLog
@MainActor
private var ignoreThreshold = false

private var _ignoreThreshold = false
private(set) var ignoreThreshold: Bool {
get {
lock.lock(); defer { lock.unlock() }
return _ignoreThreshold
}
set {
lock.lock()
self._ignoreThreshold = newValue
lock.unlock()
}
}
@MainActor
private(set) var serverIP: IPv4Address?
quanganhdo marked this conversation as resolved.
Show resolved Hide resolved

// MARK: - Init & deinit

init(serverIP: @escaping () -> IPv4Address?, timerQueue: DispatchQueue, log: OSLog) {
self.serverIP = serverIP
self.timerQueue = timerQueue
self.log = log

init() {
os_log("[+] %{public}@", log: .networkProtectionMemoryLog, type: .debug, String(describing: self))
}

deinit {
os_log("[-] %{public}@", log: .networkProtectionMemoryLog, type: .debug, String(describing: self))
task?.cancel()

cancelTimerImmediately()
os_log("[-] %{public}@", log: .networkProtectionMemoryLog, type: .debug, String(describing: self))
}

// MARK: - Start/Stop monitoring

public func start() async throws {
guard await !timerRunCoordinator.isRunning else {
os_log("Will not start the latency monitor as it's already running", log: log)
return
}
@MainActor
public func start(serverIP: IPv4Address, callback: @escaping (Result) -> Void) {
os_log("⚫️ Starting latency monitor", log: .networkProtectionPixel)
quanganhdo marked this conversation as resolved.
Show resolved Hide resolved

os_log("⚫️ Starting latency monitor", log: log)
self.serverIP = serverIP

latencyCancellable = latencySubject.eraseToAnyPublisher()
.scan(ExponentialGeometricAverage()) { [weak self] measurements, latency in
.scan(ExponentialGeometricAverage()) { measurements, latency in
if latency >= 0 {
measurements.addMeasurement(latency)
os_log("⚫️ Latency: %{public}f milliseconds", log: .networkProtectionPixel, type: .debug, latency)
} else {
self?.subject.send(.error)
callback(.error)
}

os_log("⚫️ Average: %{public}f milliseconds", log: .networkProtectionPixel, type: .debug, measurements.average)
Expand All @@ -159,71 +124,29 @@ final public class NetworkProtectionLatencyMonitor {
let now = Date()
if let self,
(now.timeIntervalSince1970 - self.lastLatencyReported.timeIntervalSince1970 >= Self.reportThreshold) || ignoreThreshold {
self.subject.send(.quality(quality))
callback(.quality(quality))
self.lastLatencyReported = now
}
}

do {
try await scheduleTimer()
} catch {
os_log("⚫️ Stopping latency monitor prematurely", log: log)
throw error
}
}

public func stop() async {
os_log("⚫️ Stopping latency monitor", log: log)
await stopScheduledTimer()
}

// MARK: - Timer scheduling

private func scheduleTimer() async throws {
await stopScheduledTimer()

await timerRunCoordinator.start()

let timer = DispatchSource.makeTimerSource(queue: timerQueue)
self.timer = timer

timer.schedule(deadline: .now() + Self.measurementInterval, repeating: Self.measurementInterval)
timer.setEventHandler { [weak self] in
guard let self else { return }

Task {
await self.measureLatency()
}
}

timer.setCancelHandler { [weak self] in
self?.timer = nil
task = Task.periodic(interval: Self.measurementInterval) { [weak self] in
await self?.measureLatency()
}

timer.resume()
}

private func stopScheduledTimer() async {
await timerRunCoordinator.stop()

cancelTimerImmediately()
}

private func cancelTimerImmediately() {
guard let timer else { return }

if !timer.isCancelled {
timer.cancel()
}
@MainActor
public func stop() {
os_log("⚫️ Stopping latency monitor", log: .networkProtectionPixel)

self.timer = nil
latencyCancellable = nil
task = nil
}

// MARK: - Latency monitor

@MainActor
public func measureLatency() async {
guard let serverIP = serverIP() else {
guard let serverIP else {
latencySubject.send(Self.unknownLatency)
quanganhdo marked this conversation as resolved.
Show resolved Hide resolved
return
}
Expand All @@ -241,6 +164,7 @@ final public class NetworkProtectionLatencyMonitor {
}
}

@MainActor
public func simulateLatency(_ timeInterval: TimeInterval) {
ignoreThreshold = true
latencySubject.send(timeInterval)
Expand Down
Loading
Loading