Skip to content

Commit

Permalink
Tidy up our synchronization code (#413)
Browse files Browse the repository at this point in the history
* Confine all interop signatures into NativeMutexNode to simplify further working and maintenance
* Get rid of mutex_node_t
  • Loading branch information
qwwdfsad authored Mar 25, 2024
1 parent 387c3db commit fe708c2
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 40 deletions.
12 changes: 0 additions & 12 deletions atomicfu/src/nativeInterop/cinterop/interop.def
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,13 @@ typedef struct lock_support {
pthread_cond_t cond;
} lock_support_t;

typedef struct mutex_node {
lock_support_t* mutex;
struct mutex_node* next;
} mutex_node_t;

lock_support_t* lock_support_init() {
lock_support_t * ls = (lock_support_t *) malloc(sizeof(lock_support_t));
ls->locked = 0;
pthread_mutex_init(&ls->mutex, NULL);
pthread_cond_init(&ls->cond, NULL);
return ls;
}

mutex_node_t* mutex_node_init(mutex_node_t* mutexNode) {
mutexNode->mutex = lock_support_init();
mutexNode->next = NULL;
return mutexNode;
}

void lock(lock_support_t* ls) {
pthread_mutex_lock(&ls->mutex);
while (ls->locked == 1) { // wait till locked are available
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package kotlinx.atomicfu.locks

import interop.*
import kotlinx.cinterop.*
import platform.posix.*
import kotlin.concurrent.*

public class NativeMutexNode {
private val mutex: CPointer<lock_support_t> = lock_support_init()!!

internal var next: NativeMutexNode? = null

fun lock() {
interop.lock(mutex)
}

fun unlock() {
interop.unlock(mutex)
}
}

/**
* This is a trivial counter-part of NativeMutexNode that does not rely on interop.def
* The problem is, commonizer cannot commonize pthreads, thus this declaration should be duplicated
* over multiple Native source-sets to work properly
*/
//public class NativeMutexNode {
//
// @Volatile
// private var isLocked = false
// private val pMutex = nativeHeap.alloc<pthread_mutex_t>().apply { pthread_mutex_init(ptr, null) }
// private val pCond = nativeHeap.alloc<pthread_cond_t>().apply { pthread_cond_init(ptr, null) }
//
// internal var next: NativeMutexNode? = null
//
// fun lock() {
// pthread_mutex_lock(pMutex.ptr)
// while (isLocked) { // wait till locked are available
// pthread_cond_wait(pCond.ptr, pMutex.ptr)
// }
// isLocked = true
// pthread_mutex_unlock(pMutex.ptr)
// }
//
// fun unlock() {
// pthread_mutex_lock(pMutex.ptr)
// isLocked = false
// pthread_cond_broadcast(pCond.ptr)
// pthread_mutex_unlock(pMutex.ptr)
// }
//}
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
package kotlinx.atomicfu.locks

import platform.posix.*
import interop.*
import kotlinx.cinterop.*
import kotlin.native.internal.NativePtr
import kotlinx.atomicfu.locks.SynchronizedObject.Status.*
import kotlin.concurrent.AtomicNativePtr
import kotlin.concurrent.AtomicReference
import kotlin.native.concurrent.*

public actual open class SynchronizedObject {

Expand All @@ -23,6 +18,7 @@ public actual open class SynchronizedObject {
if (lock.compareAndSet(state, thinLock))
return
}

THIN -> {
if (currentThreadId == state.ownerThreadId) {
// reentrant lock
Expand All @@ -46,13 +42,16 @@ public actual open class SynchronizedObject {
}
}
}

FAT -> {
if (currentThreadId == state.ownerThreadId) {
// reentrant lock
val nestedFatLock = LockState(FAT, state.nestedLocks + 1, state.waiters, state.ownerThreadId, state.mutex)
val nestedFatLock =
LockState(FAT, state.nestedLocks + 1, state.waiters, state.ownerThreadId, state.mutex)
if (lock.compareAndSet(state, nestedFatLock)) return
} else if (state.ownerThreadId != null) {
val fatLock = LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, state.mutex)
val fatLock =
LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, state.mutex)
if (lock.compareAndSet(state, fatLock)) {
fatLock.mutex!!.lock()
tryLockAfterResume(currentThreadId)
Expand All @@ -74,7 +73,8 @@ public actual open class SynchronizedObject {
return true
} else {
if (currentThreadId == state.ownerThreadId) {
val nestedLock = LockState(state.status, state.nestedLocks + 1, state.waiters, currentThreadId, state.mutex)
val nestedLock =
LockState(state.status, state.nestedLocks + 1, state.waiters, currentThreadId, state.mutex)
if (lock.compareAndSet(state, nestedLock))
return true
} else {
Expand Down Expand Up @@ -103,6 +103,7 @@ public actual open class SynchronizedObject {
return
}
}

FAT -> {
if (state.nestedLocks == 1) {
// last nested unlock -> release completely, resume some waiter
Expand All @@ -119,6 +120,7 @@ public actual open class SynchronizedObject {
return
}
}

else -> error("It is not possible to unlock the mutex that is not obtained")
}
}
Expand Down Expand Up @@ -146,14 +148,10 @@ public actual open class SynchronizedObject {
val nestedLocks: Int,
val waiters: Int,
val ownerThreadId: pthread_t? = null,
val mutex: CPointer<mutex_node_t>? = null
val mutex: NativeMutexNode? = null
)

protected enum class Status { UNLOCKED, THIN, FAT }

private fun CPointer<mutex_node_t>.lock() = lock(this.pointed.mutex)

private fun CPointer<mutex_node_t>.unlock() = unlock(this.pointed.mutex)
}

public actual fun reentrantLock() = ReentrantLock()
Expand Down Expand Up @@ -183,37 +181,40 @@ private const val INITIAL_POOL_CAPACITY = 64
private val mutexPool by lazy { MutexPool(INITIAL_POOL_CAPACITY) }

class MutexPool(capacity: Int) {
private val top = AtomicNativePtr(NativePtr.NULL)
private val top = AtomicReference<NativeMutexNode?>(null)

private val mutexes = nativeHeap.allocArray<mutex_node_t>(capacity) { mutex_node_init(ptr) }
private val mutexes = Array<NativeMutexNode>(capacity) { NativeMutexNode() }

init {
for (i in 0 until capacity) {
release(interpretCPointer<mutex_node_t>(mutexes.rawValue.plus(i * sizeOf<mutex_node_t>()))!!)
// Immediately form a stack
for (mutex in mutexes) {
release(mutex)
}
}

private fun allocMutexNode() = nativeHeap.alloc<mutex_node_t> { mutex_node_init(ptr) }.ptr
private fun allocMutexNode() = NativeMutexNode()

fun allocate(): CPointer<mutex_node_t> = pop() ?: allocMutexNode()
fun allocate(): NativeMutexNode = pop() ?: allocMutexNode()

fun release(mutexNode: CPointer<mutex_node_t>) {
fun release(mutexNode: NativeMutexNode) {
while (true) {
val oldTop = interpretCPointer<mutex_node_t>(top.value)
mutexNode.pointed.next = oldTop
if (top.compareAndSet(oldTop.rawValue, mutexNode.rawValue))
val oldTop = top.value
mutexNode.next = oldTop
if (top.compareAndSet(oldTop, mutexNode)) {
return
}
}
}

private fun pop(): CPointer<mutex_node_t>? {
private fun pop(): NativeMutexNode? {
while (true) {
val oldTop = interpretCPointer<mutex_node_t>(top.value)
if (oldTop.rawValue === NativePtr.NULL)
val oldTop = top.value
if (oldTop == null)
return null
val newHead = oldTop!!.pointed.next
if (top.compareAndSet(oldTop.rawValue, newHead.rawValue))
val newHead = oldTop.next
if (top.compareAndSet(oldTop, newHead)) {
return oldTop
}
}
}
}

0 comments on commit fe708c2

Please sign in to comment.