Skip to content
This repository has been archived by the owner on May 13, 2024. It is now read-only.

Fix non-thread-safe mutex access #141

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions Source/WebSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ private class Deflater {
/// WebSocket objects are bidirectional network streams that communicate over HTTP. RFC 6455.
private class InnerWebSocket: Hashable {
var id : Int
var mutex = pthread_mutex_t()
var mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
let request : URLRequest!
let subProtocols : [String]!
var frames : [Frame] = []
Expand Down Expand Up @@ -607,7 +607,8 @@ private class InnerWebSocket: Hashable {
}

init(request: URLRequest, subProtocols : [String] = [], stub : Bool = false){
pthread_mutex_init(&mutex, nil)
mutex.initialize(to: pthread_mutex_t())
pthread_mutex_init(mutex, nil)
self.id = manager.nextId()
self.request = request
self.subProtocols = subProtocols
Expand All @@ -633,13 +634,14 @@ private class InnerWebSocket: Hashable {
if inputBytes != nil {
free(inputBytes)
}
pthread_mutex_init(&mutex, nil)
pthread_mutex_init(mutex, nil)
mutex.deallocate()
}
@inline(__always) fileprivate func lock(){
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
}
@inline(__always) fileprivate func unlock(){
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}

fileprivate var dirty : Bool {
Expand Down Expand Up @@ -1570,38 +1572,43 @@ private enum TCPConnSecurity {
private class Manager {
var queue = DispatchQueue(label: "SwiftWebSocketInstance", attributes: [])
var once = Int()
var mutex = pthread_mutex_t()
var mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
var cond = pthread_cond_t()
var websockets = Set<InnerWebSocket>()
var _nextId = 0
init(){
pthread_mutex_init(&mutex, nil)
mutex.initialize(to: pthread_mutex_t())
pthread_mutex_init(mutex, nil)
IanHoar marked this conversation as resolved.
Show resolved Hide resolved
pthread_cond_init(&cond, nil)
DispatchQueue(label: "SwiftWebSocket", attributes: []).async {
var wss : [InnerWebSocket] = []
while true {
var wait = true
wss.removeAll()
pthread_mutex_lock(&self.mutex)
pthread_mutex_lock(self.mutex)
for ws in self.websockets {
wss.append(ws)
}
for ws in wss {
self.checkForConnectionTimeout(ws)
if ws.dirty {
pthread_mutex_unlock(&self.mutex)
pthread_mutex_unlock(self.mutex)
ws.step()
pthread_mutex_lock(&self.mutex)
pthread_mutex_lock(self.mutex)
wait = false
}
}
if wait {
_ = self.wait(250)
}
pthread_mutex_unlock(&self.mutex)
pthread_mutex_unlock(self.mutex)
}
}
}
deinit{
pthread_mutex_init(mutex, nil)
mutex.deallocate()
}
func checkForConnectionTimeout(_ ws : InnerWebSocket) {
if ws.rd != nil && ws.wr != nil && (ws.rd.streamStatus == .opening || ws.wr.streamStatus == .opening) {
let age = CFAbsoluteTimeGetCurrent() - ws.createdAt
Expand All @@ -1620,28 +1627,28 @@ private class Manager {
ts.tv_nsec = v1 + v2;
ts.tv_sec += ts.tv_nsec / (1000 * 1000 * 1000);
ts.tv_nsec %= (1000 * 1000 * 1000);
return pthread_cond_timedwait(&self.cond, &self.mutex, &ts)
return pthread_cond_timedwait(&self.cond, self.mutex, &ts)
}
func signal(){
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}
func add(_ websocket: InnerWebSocket) {
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
websockets.insert(websocket)
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}
func remove(_ websocket: InnerWebSocket) {
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
websockets.remove(websocket)
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}
func nextId() -> Int {
pthread_mutex_lock(&mutex)
defer { pthread_mutex_unlock(&mutex) }
pthread_mutex_lock(mutex)
defer { pthread_mutex_unlock(mutex) }
_nextId += 1
return _nextId
}
Expand Down