Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tidy up our synchronization code #413

Merged
merged 3 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions atomicfu/src/nativeInterop/cinterop/interop.def
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,13 @@ typedef struct lock_support {
pthread_cond_t cond;
} lock_support_t;

typedef struct mutex_node {
lock_support_t* mutex;
struct mutex_node* next;
} mutex_node_t;

lock_support_t* lock_support_init() {
lock_support_t * ls = (lock_support_t *) malloc(sizeof(lock_support_t));
ls->locked = 0;
pthread_mutex_init(&ls->mutex, NULL);
pthread_cond_init(&ls->cond, NULL);
return ls;
}

mutex_node_t* mutex_node_init(mutex_node_t* mutexNode) {
mutexNode->mutex = lock_support_init();
mutexNode->next = NULL;
return mutexNode;
}

void lock(lock_support_t* ls) {
pthread_mutex_lock(&ls->mutex);
while (ls->locked == 1) { // wait till locked are available
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package kotlinx.atomicfu.locks

import interop.*
import kotlinx.cinterop.*
import platform.posix.*
import kotlin.concurrent.*

public class NativeMutexNode {
private val mutex: CPointer<lock_support_t> = lock_support_init()!!

internal var next: NativeMutexNode? = null

fun lock() {
interop.lock(mutex)
}

fun unlock() {
interop.unlock(mutex)
}
}

/**
* This is a trivial counter-part of NativeMutexNode that does not rely on interop.def
* The problem is, commonizer cannot commonize pthreads, thus this declaration should be duplicated
* over multiple Native source-sets to work properly
*/
//public class NativeMutexNode {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you expect commonizer being able to unlock this implementation one day? Do we really need to keep it here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This, or that we'll decide to give up on the commonizer and just copy-paste it over a few native sourcesets

//
// @Volatile
// private var isLocked = false
// private val pMutex = nativeHeap.alloc<pthread_mutex_t>().apply { pthread_mutex_init(ptr, null) }
// private val pCond = nativeHeap.alloc<pthread_cond_t>().apply { pthread_cond_init(ptr, null) }
//
// internal var next: NativeMutexNode? = null
//
// fun lock() {
// pthread_mutex_lock(pMutex.ptr)
// while (isLocked) { // wait till locked are available
// pthread_cond_wait(pCond.ptr, pMutex.ptr)
// }
// isLocked = true
// pthread_mutex_unlock(pMutex.ptr)
// }
//
// fun unlock() {
// pthread_mutex_lock(pMutex.ptr)
// isLocked = false
// pthread_cond_broadcast(pCond.ptr)
// pthread_mutex_unlock(pMutex.ptr)
// }
//}
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
package kotlinx.atomicfu.locks

import platform.posix.*
import interop.*
import kotlinx.cinterop.*
import kotlin.native.internal.NativePtr
import kotlinx.atomicfu.locks.SynchronizedObject.Status.*
import kotlin.concurrent.AtomicNativePtr
import kotlin.concurrent.AtomicReference
import kotlin.native.concurrent.*

public actual open class SynchronizedObject {

Expand All @@ -23,6 +18,7 @@ public actual open class SynchronizedObject {
if (lock.compareAndSet(state, thinLock))
return
}

THIN -> {
if (currentThreadId == state.ownerThreadId) {
// reentrant lock
Expand All @@ -46,13 +42,16 @@ public actual open class SynchronizedObject {
}
}
}

FAT -> {
if (currentThreadId == state.ownerThreadId) {
// reentrant lock
val nestedFatLock = LockState(FAT, state.nestedLocks + 1, state.waiters, state.ownerThreadId, state.mutex)
val nestedFatLock =
LockState(FAT, state.nestedLocks + 1, state.waiters, state.ownerThreadId, state.mutex)
if (lock.compareAndSet(state, nestedFatLock)) return
} else if (state.ownerThreadId != null) {
val fatLock = LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, state.mutex)
val fatLock =
LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, state.mutex)
if (lock.compareAndSet(state, fatLock)) {
fatLock.mutex!!.lock()
tryLockAfterResume(currentThreadId)
Expand All @@ -74,7 +73,8 @@ public actual open class SynchronizedObject {
return true
} else {
if (currentThreadId == state.ownerThreadId) {
val nestedLock = LockState(state.status, state.nestedLocks + 1, state.waiters, currentThreadId, state.mutex)
val nestedLock =
LockState(state.status, state.nestedLocks + 1, state.waiters, currentThreadId, state.mutex)
if (lock.compareAndSet(state, nestedLock))
return true
} else {
Expand Down Expand Up @@ -103,6 +103,7 @@ public actual open class SynchronizedObject {
return
}
}

FAT -> {
if (state.nestedLocks == 1) {
// last nested unlock -> release completely, resume some waiter
Expand All @@ -119,6 +120,7 @@ public actual open class SynchronizedObject {
return
}
}

else -> error("It is not possible to unlock the mutex that is not obtained")
}
}
Expand Down Expand Up @@ -146,14 +148,10 @@ public actual open class SynchronizedObject {
val nestedLocks: Int,
val waiters: Int,
val ownerThreadId: pthread_t? = null,
val mutex: CPointer<mutex_node_t>? = null
val mutex: NativeMutexNode? = null
)

protected enum class Status { UNLOCKED, THIN, FAT }

private fun CPointer<mutex_node_t>.lock() = lock(this.pointed.mutex)

private fun CPointer<mutex_node_t>.unlock() = unlock(this.pointed.mutex)
}

public actual fun reentrantLock() = ReentrantLock()
Expand Down Expand Up @@ -183,37 +181,40 @@ private const val INITIAL_POOL_CAPACITY = 64
private val mutexPool by lazy { MutexPool(INITIAL_POOL_CAPACITY) }

class MutexPool(capacity: Int) {
private val top = AtomicNativePtr(NativePtr.NULL)
private val top = AtomicReference<NativeMutexNode?>(null)

private val mutexes = nativeHeap.allocArray<mutex_node_t>(capacity) { mutex_node_init(ptr) }
private val mutexes = Array<NativeMutexNode>(capacity) { NativeMutexNode() }

init {
for (i in 0 until capacity) {
release(interpretCPointer<mutex_node_t>(mutexes.rawValue.plus(i * sizeOf<mutex_node_t>()))!!)
// Immediately form a stack
for (mutex in mutexes) {
release(mutex)
}
}

private fun allocMutexNode() = nativeHeap.alloc<mutex_node_t> { mutex_node_init(ptr) }.ptr
private fun allocMutexNode() = NativeMutexNode()

fun allocate(): CPointer<mutex_node_t> = pop() ?: allocMutexNode()
fun allocate(): NativeMutexNode = pop() ?: allocMutexNode()

fun release(mutexNode: CPointer<mutex_node_t>) {
fun release(mutexNode: NativeMutexNode) {
while (true) {
val oldTop = interpretCPointer<mutex_node_t>(top.value)
mutexNode.pointed.next = oldTop
if (top.compareAndSet(oldTop.rawValue, mutexNode.rawValue))
val oldTop = top.value
mutexNode.next = oldTop
if (top.compareAndSet(oldTop, mutexNode)) {
return
}
}
}

private fun pop(): CPointer<mutex_node_t>? {
private fun pop(): NativeMutexNode? {
while (true) {
val oldTop = interpretCPointer<mutex_node_t>(top.value)
if (oldTop.rawValue === NativePtr.NULL)
val oldTop = top.value
if (oldTop == null)
return null
val newHead = oldTop!!.pointed.next
if (top.compareAndSet(oldTop.rawValue, newHead.rawValue))
val newHead = oldTop.next
if (top.compareAndSet(oldTop, newHead)) {
return oldTop
}
}
}
}