diff --git a/Example/Sources/ViewController.swift b/Example/Sources/ViewController.swift index 71676e1..7d3e642 100644 --- a/Example/Sources/ViewController.swift +++ b/Example/Sources/ViewController.swift @@ -117,7 +117,7 @@ extension ViewController: NDT7TestInteraction { } } - if origin == .client, + if origin == .client && kind == .download, enableAppData, let elapsedTime = measurement.appInfo?.elapsedTime, let numBytes = measurement.appInfo?.numBytes, @@ -125,40 +125,20 @@ extension ViewController: NDT7TestInteraction { let seconds = elapsedTime / 1000000 let mbit = numBytes / 125000 let rounded = Double(Float64(mbit)/Float64(seconds)).rounded(toPlaces: 1) - switch kind { - case .download: - downloadSpeed = rounded - DispatchQueue.main.async { [weak self] in - self?.downloadSpeedLabel.text = "\(rounded) Mbit/s" - } - case .upload: - uploadSpeed = rounded - DispatchQueue.main.async { [weak self] in - self?.uploadSpeedLabel.text = "\(rounded) Mbit/s" - } + downloadSpeed = rounded + DispatchQueue.main.async { [weak self] in + self?.downloadSpeedLabel.text = "\(rounded) Mbit/s" } - } else if origin == .server, + } else if origin == .server && kind == .upload, let elapsedTime = measurement.tcpInfo?.elapsedTime, elapsedTime >= 1000000 { let seconds = elapsedTime / 1000000 - switch kind { - case .download: - if let numBytes = measurement.tcpInfo?.bytesSent { - let mbit = numBytes / 125000 - let rounded = Double(Float64(mbit)/Float64(seconds)).rounded(toPlaces: 1) - downloadSpeed = rounded - DispatchQueue.main.async { [weak self] in - self?.downloadSpeedLabel.text = "\(rounded) Mbit/s" - } - } - case .upload: - if let numBytes = measurement.tcpInfo?.bytesReceived { - let mbit = numBytes / 125000 - let rounded = Double(Float64(mbit)/Float64(seconds)).rounded(toPlaces: 1) - uploadSpeed = rounded - DispatchQueue.main.async { [weak self] in - self?.uploadSpeedLabel.text = "\(rounded) Mbit/s" - } + if let numBytes = measurement.tcpInfo?.bytesReceived { + let mbit = numBytes / 125000 + let rounded = Double(Float64(mbit)/Float64(seconds)).rounded(toPlaces: 1) + uploadSpeed = rounded + DispatchQueue.main.async { [weak self] in + self?.uploadSpeedLabel.text = "\(rounded) Mbit/s" } } } diff --git a/Sources/Constants.swift b/Sources/Constants.swift index 9340448..6fed887 100644 --- a/Sources/Constants.swift +++ b/Sources/Constants.swift @@ -94,13 +94,19 @@ public struct NDT7WebSocketConstants { /// updateInterval is the interval between client side upload measurements. public static let updateInterval: TimeInterval = 0.25 - /// bulkMessageSize is the size of uploaded messages - public static let bulkMessageSize = 1 << 13 + /// initialMessageSize is the starting size of uploaded messages + public static let initialMessageSize = 1 << 13 - /// maxConcurrentMessages is the max concurrent messages for upload - public static let maxConcurrentMessages: UInt = 100 + /// maxMessageSize is the maximum accepted message size + public static let maxMessageSize = 1 << 20 - /// uploadRequestDelay is delay for upload messages - public static let uploadRequestDelay: Double = Double(maxConcurrentMessages) * Double(bulkMessageSize) / (4 * 125000) / 100 + // scalingFraction sets the threshold for scaling binary messages. When + // the current binary message size is <= than 1/scalingFactor of the + // amount of bytes sent so far, we scale the message. This is documented + // in the appendix of the ndt7 specification. + public static let scalingFraction = 16 + + /// uploadRequestDelay is delay for upload messages when output is buffered + public static let initialUploadRequestDelay: Double = Double(initialMessageSize) / (4 * 125000) } } diff --git a/Sources/DataExtension.swift b/Sources/DataExtension.swift index 7c1c899..04da194 100644 --- a/Sources/DataExtension.swift +++ b/Sources/DataExtension.swift @@ -14,7 +14,7 @@ extension Data { /// Provides a random 8192 bytes data object. /// - returns: 8192 bytes data object based in random UInt8 objects. static func randomDataNetworkElement() -> Data { - let dataArray: [UInt8] = (0..<(NDT7WebSocketConstants.Request.bulkMessageSize)).map { _ in + let dataArray: [UInt8] = (0..<(NDT7WebSocketConstants.Request.initialMessageSize)).map { _ in UInt8.random(in: 1...255) } return dataArray.withUnsafeBufferPointer { Data(buffer: $0) } diff --git a/Sources/NDT7Test.swift b/Sources/NDT7Test.swift index a069d96..0af4f62 100644 --- a/Sources/NDT7Test.swift +++ b/Sources/NDT7Test.swift @@ -289,6 +289,8 @@ extension NDT7Test { var t1 = Date() var tlast = tlast ?? Date() var count = count + var message = message + var yield = false let duration: TimeInterval = 10.0 guard t1.timeIntervalSince1970 - t0.timeIntervalSince1970 < duration && uploadTestRunning == true else { uploadMessage(socket: socket, t0: t0, t1: t1, count: webSocketUpload?.outputBytesLengthAccumulated ?? 0) @@ -297,27 +299,39 @@ extension NDT7Test { return } - let underbuffered = 7 * message.count + let underbuffered = 2 * message.count var buffered: Int? = 0 if t1.timeIntervalSince1970 - tlast.timeIntervalSince1970 > 0.25, let outputBytesAccumulated = webSocketUpload?.outputBytesLengthAccumulated { tlast = t1 uploadMessage(socket: socket, t0: t0, t1: t1, count: outputBytesAccumulated) } - while buffered != nil && buffered! < underbuffered && t1.timeIntervalSince1970 - t0.timeIntervalSince1970 < duration && uploadTestRunning == true, + while buffered != nil && buffered! < underbuffered && t1.timeIntervalSince1970 - t0.timeIntervalSince1970 < duration && uploadTestRunning == true && yield == false, let outputBytesAccumulated = webSocketUpload?.outputBytesLengthAccumulated, count < outputBytesAccumulated + underbuffered { buffered = socket.send(message, maxBuffer: underbuffered) if buffered != nil { - count += message.count * Int(NDT7WebSocketConstants.Request.maxConcurrentMessages) + count += message.count + if message.count < NDT7WebSocketConstants.Request.maxMessageSize, + message.count <= outputBytesAccumulated/NDT7WebSocketConstants.Request.scalingFraction { + message = message + message + } } t1 = Date() if t1.timeIntervalSince1970 - tlast.timeIntervalSince1970 > 0.25 { tlast = t1 uploadMessage(socket: socket, t0: t0, t1: t1, count: outputBytesAccumulated) + yield = true } } - queue.asyncAfter(deadline: .now() + NDT7WebSocketConstants.Request.uploadRequestDelay) { [weak self] in + + let elapsedTime = (t1.timeIntervalSince1970) - (t0.timeIntervalSince1970) + let numBytes = webSocketUpload?.outputBytesLengthAccumulated ?? 0 + var delay = NDT7WebSocketConstants.Request.initialUploadRequestDelay + if numBytes > 0 { + delay = Float64(buffered ?? underbuffered) / (Float64(numBytes) / Float64(elapsedTime)) + } + queue.asyncAfter(deadline: .now() + delay) { [weak self] in self?.uploader(socket: socket, message: message, t0: t0, tlast: tlast, count: count, queue: queue) } } @@ -329,18 +343,20 @@ extension NDT7Test { /// - parameter count: Number of transmitted bytes. func uploadMessage(socket: WebSocketWrapper, t0: Date, t1: Date, count: Int) { guard socket === webSocketUpload else { return } - let message = "{ }" - if var measurement = handleMessage(message) { - measurement.origin = .client - measurement.direction = .upload - measurement.appInfo = NDT7APPInfo(elapsedTime: Int64((t1.timeIntervalSince1970 * 1000000.0) - (t0.timeIntervalSince1970 * 1000000.0)), numBytes: Int64(count)) - if let jsonData = try? JSONEncoder().encode(measurement) { - measurement.rawData = String(data: jsonData, encoding: .utf8) - } - logNDT7("Upload test from client: \(measurement.rawData ?? "")") - mainThread { [weak self] in - self?.delegate?.measurement(origin: .client, kind: .upload, measurement: measurement) - } + let appInfo = NDT7APPInfo(elapsedTime: Int64((t1.timeIntervalSince1970 * 1000000.0) - (t0.timeIntervalSince1970 * 1000000.0)), numBytes: Int64(count)) + var measurement = NDT7Measurement(appInfo: appInfo, + bbrInfo: nil, + connectionInfo: nil, + origin: .client, + direction: .upload, + tcpInfo: nil, + rawData: nil) + if let jsonData = try? JSONEncoder().encode(measurement) { + measurement.rawData = String(data: jsonData, encoding: .utf8) + } + logNDT7("Upload test from client: \(measurement.rawData ?? "")") + mainThread { [weak self] in + self?.delegate?.measurement(origin: .client, kind: .upload, measurement: measurement) } } diff --git a/Sources/WebSocket.swift b/Sources/WebSocket.swift index fb14cc1..43101c5 100755 --- a/Sources/WebSocket.swift +++ b/Sources/WebSocket.swift @@ -498,7 +498,7 @@ class Deflater { /// WebSocket objects are bidirectional network streams that communicate over HTTP. RFC 6455. class InnerWebSocket: Hashable { var id : Int - var mutex = pthread_mutex_t() + var mutex = UnsafeMutablePointer.allocate(capacity: 1) let request : URLRequest! let subProtocols : [String]! var frames : [Frame] = [] @@ -595,7 +595,8 @@ class InnerWebSocket: Hashable { } init(request: URLRequest, subProtocols : [String] = [], stub : Bool = false){ - pthread_mutex_init(&mutex, nil) + mutex.initialize(to: pthread_mutex_t()) + pthread_mutex_init(mutex, nil) self.id = manager.nextId() self.request = request self.subProtocols = subProtocols @@ -621,13 +622,14 @@ class InnerWebSocket: Hashable { if inputBytes != nil { free(inputBytes) } - pthread_mutex_init(&mutex, nil) + pthread_mutex_init(mutex, nil) + mutex.deallocate() } @inline(__always) fileprivate func lock(){ - pthread_mutex_lock(&mutex) + pthread_mutex_lock(mutex) } @inline(__always) fileprivate func unlock(){ - pthread_mutex_unlock(&mutex) + pthread_mutex_unlock(mutex) } fileprivate var dirty : Bool { @@ -677,7 +679,6 @@ class InnerWebSocket: Hashable { var finalError : Error? var exit = false var more = true - var timesToSend: UInt = 0 func step(){ if exit { return @@ -1487,11 +1488,6 @@ class InnerWebSocket: Hashable { } try write(head, length: hlen) try write(payloadBytes, length: payloadBytes.count) - while timesToSend > 0 { - try write(head, length: hlen) - try write(payloadBytes, length: payloadBytes.count) - timesToSend -= 1 - } } func close(_ code : Int = 1000, reason : String = "Normal Closure") { let f = Frame() @@ -1571,38 +1567,43 @@ private class Manager { var queue = DispatchQueue(label: "SwiftWebSocketInstance", attributes: []) let dispatchQueue = DispatchQueue.init(label: "SwiftWebSocket", qos: .userInteractive, attributes: []) var once = Int() - var mutex = pthread_mutex_t() + var mutex = UnsafeMutablePointer.allocate(capacity: 1) var cond = pthread_cond_t() var websockets = Set() var _nextId = 0 init(){ - pthread_mutex_init(&mutex, nil) + mutex.initialize(to: pthread_mutex_t()) + pthread_mutex_init(mutex, nil) pthread_cond_init(&cond, nil) dispatchQueue.async { var wss : [InnerWebSocket] = [] while true { var wait = true wss.removeAll() - pthread_mutex_lock(&self.mutex) + pthread_mutex_lock(self.mutex) for ws in self.websockets { wss.append(ws) } for ws in wss { self.checkForConnectionTimeout(ws) if ws.dirty { - pthread_mutex_unlock(&self.mutex) + pthread_mutex_unlock(self.mutex) ws.step() - pthread_mutex_lock(&self.mutex) + pthread_mutex_lock(self.mutex) wait = false } } if wait { _ = self.wait(250) } - pthread_mutex_unlock(&self.mutex) + pthread_mutex_unlock(self.mutex) } } } + deinit{ + pthread_mutex_init(mutex, nil) + mutex.deallocate() + } func checkForConnectionTimeout(_ ws : InnerWebSocket) { if ws.rd != nil && ws.wr != nil && (ws.rd.streamStatus == .opening || ws.wr.streamStatus == .opening) { let age = CFAbsoluteTimeGetCurrent() - ws.createdAt @@ -1621,28 +1622,28 @@ private class Manager { ts.tv_nsec = v1 + v2; ts.tv_sec += ts.tv_nsec / (1000 * 1000 * 1000); ts.tv_nsec %= (1000 * 1000 * 1000); - return pthread_cond_timedwait(&self.cond, &self.mutex, &ts) + return pthread_cond_timedwait(&self.cond, self.mutex, &ts) } func signal(){ - pthread_mutex_lock(&mutex) + pthread_mutex_lock(mutex) pthread_cond_signal(&cond) - pthread_mutex_unlock(&mutex) + pthread_mutex_unlock(mutex) } func add(_ websocket: InnerWebSocket) { - pthread_mutex_lock(&mutex) + pthread_mutex_lock(mutex) websockets.insert(websocket) pthread_cond_signal(&cond) - pthread_mutex_unlock(&mutex) + pthread_mutex_unlock(mutex) } func remove(_ websocket: InnerWebSocket) { - pthread_mutex_lock(&mutex) + pthread_mutex_lock(mutex) websockets.remove(websocket) pthread_cond_signal(&cond) - pthread_mutex_unlock(&mutex) + pthread_mutex_unlock(mutex) } func nextId() -> Int { - pthread_mutex_lock(&mutex) - defer { pthread_mutex_unlock(&mutex) } + pthread_mutex_lock(mutex) + defer { pthread_mutex_unlock(mutex) } _nextId += 1 return _nextId } @@ -1784,11 +1785,10 @@ class WebSocket: NSObject { :param: message The message to be sent to the server. */ - func send(_ message : Any, _ times: UInt = 0){ + func send(_ message : Any){ if !opened{ return } - ws.timesToSend = times ws.send(message) } /** diff --git a/Sources/WebSocketWrapper.swift b/Sources/WebSocketWrapper.swift index 4f90ce5..e79d5ca 100644 --- a/Sources/WebSocketWrapper.swift +++ b/Sources/WebSocketWrapper.swift @@ -78,7 +78,7 @@ extension WebSocketWrapper { func send(_ message: Any, maxBuffer: Int) -> Int? { guard let buffer = webSocket?.ws.outputBytesLength, buffer < maxBuffer else { return nil } if open { - webSocket?.send(message, NDT7WebSocketConstants.Request.maxConcurrentMessages) + webSocket?.send(message) return buffer } else { logNDT7("WebSocket \(url.absoluteString) did not send message. WebSocket not connected") diff --git a/Tests/ConstantsTests.swift b/Tests/ConstantsTests.swift index 7570e90..850d939 100644 --- a/Tests/ConstantsTests.swift +++ b/Tests/ConstantsTests.swift @@ -32,6 +32,8 @@ class ConstantsTests: XCTestCase { XCTAssertEqual(NDT7WebSocketConstants.Request.uploadTimeout, 15) XCTAssertEqual(NDT7WebSocketConstants.Request.ioTimeout, 7) XCTAssertEqual(NDT7WebSocketConstants.Request.updateInterval, 0.25) - XCTAssertEqual(NDT7WebSocketConstants.Request.bulkMessageSize, 1 << 13) + XCTAssertEqual(NDT7WebSocketConstants.Request.initialMessageSize, 1 << 13) + XCTAssertEqual(NDT7WebSocketConstants.Request.maxMessageSize, 1 << 20) + XCTAssertEqual(NDT7WebSocketConstants.Request.scalingFraction, 16) } }