diff --git a/atomicfu/src/nativeInterop/cinterop/interop.def b/atomicfu/src/nativeInterop/cinterop/interop.def index ea5f59db..7e98de55 100644 --- a/atomicfu/src/nativeInterop/cinterop/interop.def +++ b/atomicfu/src/nativeInterop/cinterop/interop.def @@ -9,11 +9,6 @@ typedef struct lock_support { pthread_cond_t cond; } lock_support_t; -typedef struct mutex_node { - lock_support_t* mutex; - struct mutex_node* next; -} mutex_node_t; - lock_support_t* lock_support_init() { lock_support_t * ls = (lock_support_t *) malloc(sizeof(lock_support_t)); ls->locked = 0; @@ -21,13 +16,6 @@ lock_support_t* lock_support_init() { pthread_cond_init(&ls->cond, NULL); return ls; } - -mutex_node_t* mutex_node_init(mutex_node_t* mutexNode) { - mutexNode->mutex = lock_support_init(); - mutexNode->next = NULL; - return mutexNode; -} - void lock(lock_support_t* ls) { pthread_mutex_lock(&ls->mutex); while (ls->locked == 1) { // wait till locked are available diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/PosixLockSupport.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/PosixLockSupport.kt new file mode 100644 index 00000000..3843948d --- /dev/null +++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/PosixLockSupport.kt @@ -0,0 +1,51 @@ +package kotlinx.atomicfu.locks + +import interop.* +import kotlinx.cinterop.* +import platform.posix.* +import kotlin.concurrent.* + +public class NativeMutexNode { + private val mutex: CPointer<lock_support_t> = lock_support_init()!! + + internal var next: NativeMutexNode? 
= null + + fun lock() { + interop.lock(mutex) + } + + fun unlock() { + interop.unlock(mutex) + } +} + +/** + * This is a trivial counter-part of NativeMutexNode that does not rely on interop.def + * The problem is, commonizer cannot commonize pthreads, thus this declaration should be duplicated + * over multiple Native source-sets to work properly + */ +//public class NativeMutexNode { +// +// @Volatile +// private var isLocked = false +// private val pMutex = nativeHeap.alloc<pthread_mutex_t>().apply { pthread_mutex_init(ptr, null) } +// private val pCond = nativeHeap.alloc<pthread_cond_t>().apply { pthread_cond_init(ptr, null) } +// +// internal var next: NativeMutexNode? = null +// +// fun lock() { +// pthread_mutex_lock(pMutex.ptr) +// while (isLocked) { // wait till locked are available +// pthread_cond_wait(pCond.ptr, pMutex.ptr) +// } +// isLocked = true +// pthread_mutex_unlock(pMutex.ptr) +// } +// +// fun unlock() { +// pthread_mutex_lock(pMutex.ptr) +// isLocked = false +// pthread_cond_broadcast(pCond.ptr) +// pthread_mutex_unlock(pMutex.ptr) +// } +//} \ No newline at end of file diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt index 4ff88611..fdb04d81 100644 --- a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt +++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt @@ -1,13 +1,8 @@ package kotlinx.atomicfu.locks import platform.posix.* -import interop.* -import kotlinx.cinterop.* -import kotlin.native.internal.NativePtr import kotlinx.atomicfu.locks.SynchronizedObject.Status.* -import kotlin.concurrent.AtomicNativePtr import kotlin.concurrent.AtomicReference -import kotlin.native.concurrent.* public actual open class SynchronizedObject { @@ -23,6 +18,7 @@ public actual open class SynchronizedObject { if (lock.compareAndSet(state, thinLock)) return } + THIN -> { if (currentThreadId == state.ownerThreadId) { // reentrant lock @@ -46,13 
+42,16 @@ public actual open class SynchronizedObject { } } } + FAT -> { if (currentThreadId == state.ownerThreadId) { // reentrant lock - val nestedFatLock = LockState(FAT, state.nestedLocks + 1, state.waiters, state.ownerThreadId, state.mutex) + val nestedFatLock = + LockState(FAT, state.nestedLocks + 1, state.waiters, state.ownerThreadId, state.mutex) if (lock.compareAndSet(state, nestedFatLock)) return } else if (state.ownerThreadId != null) { - val fatLock = LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, state.mutex) + val fatLock = + LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, state.mutex) if (lock.compareAndSet(state, fatLock)) { fatLock.mutex!!.lock() tryLockAfterResume(currentThreadId) @@ -74,7 +73,8 @@ public actual open class SynchronizedObject { return true } else { if (currentThreadId == state.ownerThreadId) { - val nestedLock = LockState(state.status, state.nestedLocks + 1, state.waiters, currentThreadId, state.mutex) + val nestedLock = + LockState(state.status, state.nestedLocks + 1, state.waiters, currentThreadId, state.mutex) if (lock.compareAndSet(state, nestedLock)) return true } else { @@ -103,6 +103,7 @@ public actual open class SynchronizedObject { return } } + FAT -> { if (state.nestedLocks == 1) { // last nested unlock -> release completely, resume some waiter @@ -119,6 +120,7 @@ public actual open class SynchronizedObject { return } } + else -> error("It is not possible to unlock the mutex that is not obtained") } } @@ -146,14 +148,10 @@ public actual open class SynchronizedObject { val nestedLocks: Int, val waiters: Int, val ownerThreadId: pthread_t? = null, - val mutex: CPointer<mutex_node_t>? = null + val mutex: NativeMutexNode? 
= null ) protected enum class Status { UNLOCKED, THIN, FAT } - - private fun CPointer<mutex_node_t>.lock() = lock(this.pointed.mutex) - - private fun CPointer<mutex_node_t>.unlock() = unlock(this.pointed.mutex) } public actual fun reentrantLock() = ReentrantLock() @@ -183,37 +181,40 @@ private const val INITIAL_POOL_CAPACITY = 64 private val mutexPool by lazy { MutexPool(INITIAL_POOL_CAPACITY) } class MutexPool(capacity: Int) { - private val top = AtomicNativePtr(NativePtr.NULL) + private val top = AtomicReference<NativeMutexNode?>(null) - private val mutexes = nativeHeap.allocArray<mutex_node_t>(capacity) { mutex_node_init(ptr) } + private val mutexes = Array(capacity) { NativeMutexNode() } init { - for (i in 0 until capacity) { - release(interpretCPointer<mutex_node_t>(mutexes.rawValue.plus(i * sizeOf<mutex_node_t>()))!!) + // Immediately form a stack + for (mutex in mutexes) { + release(mutex) } } - private fun allocMutexNode() = nativeHeap.alloc<mutex_node_t> { mutex_node_init(ptr) }.ptr + private fun allocMutexNode() = NativeMutexNode() - fun allocate(): CPointer<mutex_node_t> = pop() ?: allocMutexNode() + fun allocate(): NativeMutexNode = pop() ?: allocMutexNode() - fun release(mutexNode: CPointer<mutex_node_t>) { + fun release(mutexNode: NativeMutexNode) { while (true) { - val oldTop = interpretCPointer<mutex_node_t>(top.value) - mutexNode.pointed.next = oldTop - if (top.compareAndSet(oldTop.rawValue, mutexNode.rawValue)) + val oldTop = top.value + mutexNode.next = oldTop + if (top.compareAndSet(oldTop, mutexNode)) { return + } } } - private fun pop(): CPointer<mutex_node_t>? { + private fun pop(): NativeMutexNode? { while (true) { - val oldTop = interpretCPointer<mutex_node_t>(top.value) - if (oldTop.rawValue === NativePtr.NULL) + val oldTop = top.value + if (oldTop == null) return null - val newHead = oldTop!!.pointed.next - if (top.compareAndSet(oldTop.rawValue, newHead.rawValue)) + val newHead = oldTop.next + if (top.compareAndSet(oldTop, newHead)) { return oldTop + } } } }