Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Miscellaneous fixes #91

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 11 additions & 31 deletions Example/Sources/ViewController.swift
Original file line number Diff line number Diff line change
Expand Up @@ -117,48 +117,28 @@ extension ViewController: NDT7TestInteraction {
}
}

if origin == .client,
if origin == .client && kind == .download,
enableAppData,
let elapsedTime = measurement.appInfo?.elapsedTime,
let numBytes = measurement.appInfo?.numBytes,
elapsedTime >= 1000000 {
let seconds = elapsedTime / 1000000
let mbit = numBytes / 125000
let rounded = Double(Float64(mbit)/Float64(seconds)).rounded(toPlaces: 1)
switch kind {
case .download:
downloadSpeed = rounded
DispatchQueue.main.async { [weak self] in
self?.downloadSpeedLabel.text = "\(rounded) Mbit/s"
}
case .upload:
uploadSpeed = rounded
DispatchQueue.main.async { [weak self] in
self?.uploadSpeedLabel.text = "\(rounded) Mbit/s"
}
downloadSpeed = rounded
DispatchQueue.main.async { [weak self] in
self?.downloadSpeedLabel.text = "\(rounded) Mbit/s"
}
} else if origin == .server,
} else if origin == .server && kind == .upload,
let elapsedTime = measurement.tcpInfo?.elapsedTime,
elapsedTime >= 1000000 {
let seconds = elapsedTime / 1000000
switch kind {
case .download:
if let numBytes = measurement.tcpInfo?.bytesSent {
let mbit = numBytes / 125000
let rounded = Double(Float64(mbit)/Float64(seconds)).rounded(toPlaces: 1)
downloadSpeed = rounded
DispatchQueue.main.async { [weak self] in
self?.downloadSpeedLabel.text = "\(rounded) Mbit/s"
}
}
case .upload:
if let numBytes = measurement.tcpInfo?.bytesReceived {
let mbit = numBytes / 125000
let rounded = Double(Float64(mbit)/Float64(seconds)).rounded(toPlaces: 1)
uploadSpeed = rounded
DispatchQueue.main.async { [weak self] in
self?.uploadSpeedLabel.text = "\(rounded) Mbit/s"
}
if let numBytes = measurement.tcpInfo?.bytesReceived {
let mbit = numBytes / 125000
let rounded = Double(Float64(mbit)/Float64(seconds)).rounded(toPlaces: 1)
uploadSpeed = rounded
DispatchQueue.main.async { [weak self] in
self?.uploadSpeedLabel.text = "\(rounded) Mbit/s"
}
}
}
Expand Down
18 changes: 12 additions & 6 deletions Sources/Constants.swift
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,19 @@ public struct NDT7WebSocketConstants {
/// updateInterval is the interval between client side upload measurements.
public static let updateInterval: TimeInterval = 0.25

/// bulkMessageSize is the size of uploaded messages
public static let bulkMessageSize = 1 << 13
/// initialMessageSize is the starting size of uploaded messages
public static let initialMessageSize = 1 << 13

/// maxConcurrentMessages is the max concurrent messages for upload
public static let maxConcurrentMessages: UInt = 100
/// maxMessageSize is the maximum accepted message size
public static let maxMessageSize = 1 << 20

/// uploadRequestDelay is delay for upload messages
public static let uploadRequestDelay: Double = Double(maxConcurrentMessages) * Double(bulkMessageSize) / (4 * 125000) / 100
// scalingFraction sets the threshold for scaling binary messages. When
// the current binary message size is <= than 1/scalingFactor of the
// amount of bytes sent so far, we scale the message. This is documented
// in the appendix of the ndt7 specification.
public static let scalingFraction = 16

/// uploadRequestDelay is delay for upload messages when output is buffered
public static let initialUploadRequestDelay: Double = Double(initialMessageSize) / (4 * 125000)
}
}
2 changes: 1 addition & 1 deletion Sources/DataExtension.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ extension Data {
/// Provides a random 8192 bytes data object.
/// - returns: 8192 bytes data object based in random UInt8 objects.
static func randomDataNetworkElement() -> Data {
let dataArray: [UInt8] = (0..<(NDT7WebSocketConstants.Request.bulkMessageSize)).map { _ in
let dataArray: [UInt8] = (0..<(NDT7WebSocketConstants.Request.initialMessageSize)).map { _ in
UInt8.random(in: 1...255)
}
return dataArray.withUnsafeBufferPointer { Data(buffer: $0) }
Expand Down
48 changes: 32 additions & 16 deletions Sources/NDT7Test.swift
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ extension NDT7Test {
var t1 = Date()
var tlast = tlast ?? Date()
var count = count
var message = message
var yield = false
let duration: TimeInterval = 10.0
guard t1.timeIntervalSince1970 - t0.timeIntervalSince1970 < duration && uploadTestRunning == true else {
uploadMessage(socket: socket, t0: t0, t1: t1, count: webSocketUpload?.outputBytesLengthAccumulated ?? 0)
Expand All @@ -297,27 +299,39 @@ extension NDT7Test {
return
}

let underbuffered = 7 * message.count
let underbuffered = 2 * message.count
var buffered: Int? = 0
if t1.timeIntervalSince1970 - tlast.timeIntervalSince1970 > 0.25,
let outputBytesAccumulated = webSocketUpload?.outputBytesLengthAccumulated {
tlast = t1
uploadMessage(socket: socket, t0: t0, t1: t1, count: outputBytesAccumulated)
}
while buffered != nil && buffered! < underbuffered && t1.timeIntervalSince1970 - t0.timeIntervalSince1970 < duration && uploadTestRunning == true,
while buffered != nil && buffered! < underbuffered && t1.timeIntervalSince1970 - t0.timeIntervalSince1970 < duration && uploadTestRunning == true && yield == false,
let outputBytesAccumulated = webSocketUpload?.outputBytesLengthAccumulated,
count < outputBytesAccumulated + underbuffered {
buffered = socket.send(message, maxBuffer: underbuffered)
if buffered != nil {
count += message.count * Int(NDT7WebSocketConstants.Request.maxConcurrentMessages)
count += message.count
if message.count < NDT7WebSocketConstants.Request.maxMessageSize,
message.count <= outputBytesAccumulated/NDT7WebSocketConstants.Request.scalingFraction {
message = message + message
}
}
t1 = Date()
if t1.timeIntervalSince1970 - tlast.timeIntervalSince1970 > 0.25 {
tlast = t1
uploadMessage(socket: socket, t0: t0, t1: t1, count: outputBytesAccumulated)
yield = true
}
}
queue.asyncAfter(deadline: .now() + NDT7WebSocketConstants.Request.uploadRequestDelay) { [weak self] in

let elapsedTime = (t1.timeIntervalSince1970) - (t0.timeIntervalSince1970)
let numBytes = webSocketUpload?.outputBytesLengthAccumulated ?? 0
var delay = NDT7WebSocketConstants.Request.initialUploadRequestDelay
if numBytes > 0 {
delay = Float64(buffered ?? underbuffered) / (Float64(numBytes) / Float64(elapsedTime))
}
queue.asyncAfter(deadline: .now() + delay) { [weak self] in
self?.uploader(socket: socket, message: message, t0: t0, tlast: tlast, count: count, queue: queue)
}
}
Expand All @@ -329,18 +343,20 @@ extension NDT7Test {
/// - parameter count: Number of transmitted bytes.
func uploadMessage(socket: WebSocketWrapper, t0: Date, t1: Date, count: Int) {
guard socket === webSocketUpload else { return }
let message = "{ }"
if var measurement = handleMessage(message) {
measurement.origin = .client
measurement.direction = .upload
measurement.appInfo = NDT7APPInfo(elapsedTime: Int64((t1.timeIntervalSince1970 * 1000000.0) - (t0.timeIntervalSince1970 * 1000000.0)), numBytes: Int64(count))
if let jsonData = try? JSONEncoder().encode(measurement) {
measurement.rawData = String(data: jsonData, encoding: .utf8)
}
logNDT7("Upload test from client: \(measurement.rawData ?? "")")
mainThread { [weak self] in
self?.delegate?.measurement(origin: .client, kind: .upload, measurement: measurement)
}
let appInfo = NDT7APPInfo(elapsedTime: Int64((t1.timeIntervalSince1970 * 1000000.0) - (t0.timeIntervalSince1970 * 1000000.0)), numBytes: Int64(count))
var measurement = NDT7Measurement(appInfo: appInfo,
bbrInfo: nil,
connectionInfo: nil,
origin: .client,
direction: .upload,
tcpInfo: nil,
rawData: nil)
if let jsonData = try? JSONEncoder().encode(measurement) {
measurement.rawData = String(data: jsonData, encoding: .utf8)
}
logNDT7("Upload test from client: \(measurement.rawData ?? "")")
mainThread { [weak self] in
self?.delegate?.measurement(origin: .client, kind: .upload, measurement: measurement)
}
}

Expand Down
56 changes: 28 additions & 28 deletions Sources/WebSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ class Deflater {
/// WebSocket objects are bidirectional network streams that communicate over HTTP. RFC 6455.
class InnerWebSocket: Hashable {
var id : Int
var mutex = pthread_mutex_t()
var mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
let request : URLRequest!
let subProtocols : [String]!
var frames : [Frame] = []
Expand Down Expand Up @@ -595,7 +595,8 @@ class InnerWebSocket: Hashable {
}

init(request: URLRequest, subProtocols : [String] = [], stub : Bool = false){
pthread_mutex_init(&mutex, nil)
mutex.initialize(to: pthread_mutex_t())
pthread_mutex_init(mutex, nil)
self.id = manager.nextId()
self.request = request
self.subProtocols = subProtocols
Expand All @@ -621,13 +622,14 @@ class InnerWebSocket: Hashable {
if inputBytes != nil {
free(inputBytes)
}
pthread_mutex_init(&mutex, nil)
pthread_mutex_init(mutex, nil)
mutex.deallocate()
}
@inline(__always) fileprivate func lock(){
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
}
@inline(__always) fileprivate func unlock(){
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}

fileprivate var dirty : Bool {
Expand Down Expand Up @@ -677,7 +679,6 @@ class InnerWebSocket: Hashable {
var finalError : Error?
var exit = false
var more = true
var timesToSend: UInt = 0
func step(){
if exit {
return
Expand Down Expand Up @@ -1487,11 +1488,6 @@ class InnerWebSocket: Hashable {
}
try write(head, length: hlen)
try write(payloadBytes, length: payloadBytes.count)
while timesToSend > 0 {
try write(head, length: hlen)
try write(payloadBytes, length: payloadBytes.count)
timesToSend -= 1
}
}
func close(_ code : Int = 1000, reason : String = "Normal Closure") {
let f = Frame()
Expand Down Expand Up @@ -1571,38 +1567,43 @@ private class Manager {
var queue = DispatchQueue(label: "SwiftWebSocketInstance", attributes: [])
let dispatchQueue = DispatchQueue.init(label: "SwiftWebSocket", qos: .userInteractive, attributes: [])
var once = Int()
var mutex = pthread_mutex_t()
var mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
var cond = pthread_cond_t()
var websockets = Set<InnerWebSocket>()
var _nextId = 0
init(){
pthread_mutex_init(&mutex, nil)
mutex.initialize(to: pthread_mutex_t())
pthread_mutex_init(mutex, nil)
pthread_cond_init(&cond, nil)
dispatchQueue.async {
var wss : [InnerWebSocket] = []
while true {
var wait = true
wss.removeAll()
pthread_mutex_lock(&self.mutex)
pthread_mutex_lock(self.mutex)
for ws in self.websockets {
wss.append(ws)
}
for ws in wss {
self.checkForConnectionTimeout(ws)
if ws.dirty {
pthread_mutex_unlock(&self.mutex)
pthread_mutex_unlock(self.mutex)
ws.step()
pthread_mutex_lock(&self.mutex)
pthread_mutex_lock(self.mutex)
wait = false
}
}
if wait {
_ = self.wait(250)
}
pthread_mutex_unlock(&self.mutex)
pthread_mutex_unlock(self.mutex)
}
}
}
deinit{
pthread_mutex_init(mutex, nil)
mutex.deallocate()
}
func checkForConnectionTimeout(_ ws : InnerWebSocket) {
if ws.rd != nil && ws.wr != nil && (ws.rd.streamStatus == .opening || ws.wr.streamStatus == .opening) {
let age = CFAbsoluteTimeGetCurrent() - ws.createdAt
Expand All @@ -1621,28 +1622,28 @@ private class Manager {
ts.tv_nsec = v1 + v2;
ts.tv_sec += ts.tv_nsec / (1000 * 1000 * 1000);
ts.tv_nsec %= (1000 * 1000 * 1000);
return pthread_cond_timedwait(&self.cond, &self.mutex, &ts)
return pthread_cond_timedwait(&self.cond, self.mutex, &ts)
}
func signal(){
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}
func add(_ websocket: InnerWebSocket) {
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
websockets.insert(websocket)
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}
func remove(_ websocket: InnerWebSocket) {
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
websockets.remove(websocket)
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}
func nextId() -> Int {
pthread_mutex_lock(&mutex)
defer { pthread_mutex_unlock(&mutex) }
pthread_mutex_lock(mutex)
defer { pthread_mutex_unlock(mutex) }
_nextId += 1
return _nextId
}
Expand Down Expand Up @@ -1784,11 +1785,10 @@ class WebSocket: NSObject {

:param: message The message to be sent to the server.
*/
func send(_ message : Any, _ times: UInt = 0){
func send(_ message : Any){
if !opened{
return
}
ws.timesToSend = times
ws.send(message)
}
/**
Expand Down
2 changes: 1 addition & 1 deletion Sources/WebSocketWrapper.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ extension WebSocketWrapper {
func send(_ message: Any, maxBuffer: Int) -> Int? {
guard let buffer = webSocket?.ws.outputBytesLength, buffer < maxBuffer else { return nil }
if open {
webSocket?.send(message, NDT7WebSocketConstants.Request.maxConcurrentMessages)
webSocket?.send(message)
return buffer
} else {
logNDT7("WebSocket \(url.absoluteString) did not send message. WebSocket not connected")
Expand Down
4 changes: 3 additions & 1 deletion Tests/ConstantsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class ConstantsTests: XCTestCase {
XCTAssertEqual(NDT7WebSocketConstants.Request.uploadTimeout, 15)
XCTAssertEqual(NDT7WebSocketConstants.Request.ioTimeout, 7)
XCTAssertEqual(NDT7WebSocketConstants.Request.updateInterval, 0.25)
XCTAssertEqual(NDT7WebSocketConstants.Request.bulkMessageSize, 1 << 13)
XCTAssertEqual(NDT7WebSocketConstants.Request.initialMessageSize, 1 << 13)
XCTAssertEqual(NDT7WebSocketConstants.Request.maxMessageSize, 1 << 20)
XCTAssertEqual(NDT7WebSocketConstants.Request.scalingFraction, 16)
}
}