diff --git a/atomicfu/src/nativeInterop/cinterop/interop.def b/atomicfu/src/nativeInterop/cinterop/interop.def index ea5f59db..ec8b0521 100644 --- a/atomicfu/src/nativeInterop/cinterop/interop.def +++ b/atomicfu/src/nativeInterop/cinterop/interop.def @@ -11,6 +11,8 @@ typedef struct lock_support { typedef struct mutex_node { lock_support_t* mutex; + int nestedLocks; + int waiters; struct mutex_node* next; } mutex_node_t; @@ -24,6 +26,8 @@ lock_support_t* lock_support_init() { mutex_node_t* mutex_node_init(mutex_node_t* mutexNode) { mutexNode->mutex = lock_support_init(); + mutexNode->nestedLocks = 0; + mutexNode->waiters = 0; mutexNode->next = NULL; return mutexNode; } diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt index 4ff88611..a27640c0 100644 --- a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt +++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt @@ -2,80 +2,140 @@ package kotlinx.atomicfu.locks import platform.posix.* import interop.* +import kotlinx.atomicfu.atomic import kotlinx.cinterop.* -import kotlin.native.internal.NativePtr -import kotlinx.atomicfu.locks.SynchronizedObject.Status.* +import kotlinx.cinterop.NativePtr +import kotlin.concurrent.AtomicLong import kotlin.concurrent.AtomicNativePtr -import kotlin.concurrent.AtomicReference -import kotlin.native.concurrent.* +import kotlin.experimental.ExperimentalNativeApi + + +private val threadCounter = atomic(0) + +// TODO assert no overflow? +@kotlin.native.concurrent.ThreadLocal +private var threadId: UInt = threadCounter.addAndGet(1).toUInt() public actual open class SynchronizedObject { - protected val lock = AtomicReference(LockState(UNLOCKED, 0, 0)) + private enum class LockStatus { UNLOCKED, THIN, FAT } + + @OptIn(ExperimentalNativeApi::class) + private value class LockWord private constructor(private val encoded: ULong) { + companion object { + private const val FAT_BIT_MASK = 1uL + + fun unlocked() = Thin(0u, 0u).lockWord + + fun fromLong(l: Long) = LockWord(l.toULong()) + } + + inline val fat: Boolean get() = encoded.and(FAT_BIT_MASK) != 0uL + inline val thin: Boolean get() = !fat + + inline val status: LockStatus get() = when { + thin -> if (asThin().isUnlocked()) LockStatus.UNLOCKED else LockStatus.THIN + else -> LockStatus.FAT + } + + inline fun asThin() = Thin(this) + inline fun asFat() = Fat(this) + + inline fun toLong() = encoded.toLong() + + value class Thin internal constructor(val lockWord: LockWord) { + init { assert(lockWord.thin) } + + companion object { + // TODO outline some consts + + inline operator fun invoke(nested: UInt, ownerTid: UInt): Thin { + if (nested > UInt.MAX_VALUE.shr(1)) throw IllegalArgumentException() // TODO + val nestedPart = nested.shl(1).toULong() + val tidPart = ownerTid.toULong().shl(UInt.SIZE_BITS) + val result = Thin(LockWord(nestedPart.or(tidPart))) + assert(result.nested == nested) + assert(result.ownerTid == ownerTid) + return result + } + } + + inline val nested: UInt get() = lockWord.encoded.and(UInt.MAX_VALUE.toULong()).shr(1).toUInt() + inline val ownerTid: UInt get() = lockWord.encoded.and(UInt.MAX_VALUE.toULong().inv()).shr(UInt.SIZE_BITS).toUInt() + + internal inline fun isUnlocked() = ownerTid == 0u + } + + value class Fat internal constructor(val lockWord: LockWord) { + init { assert(lockWord.fat) } + + companion object { + inline operator fun invoke(mutex: CPointer, contended: Boolean = false): Fat { + val mutexPtrValue = mutex.toLong().toULong() + assert(mutexPtrValue.and(FAT_BIT_MASK) == 0uL) + return Fat(LockWord(mutexPtrValue.or(FAT_BIT_MASK))) + } + } + + inline val mutex: CPointer get() = + lockWord.encoded.and(FAT_BIT_MASK.inv()).toLong().toCPointer()!! + } + + } + + // TODO introduce AtomicLockWord + private val lockWord = AtomicLong(LockWord.unlocked().toLong()) + + private inline fun loadLockState() = LockWord.fromLong(lockWord.value) + + private fun compareSetAndFreeLock(expected: LockWord, desired: LockWord): Boolean { + return lockWord.compareAndSet(expected.toLong(), desired.toLong()) + } public fun lock() { - val currentThreadId = pthread_self()!! + val currentThreadId = threadId while (true) { - val state = lock.value + val state = loadLockState() when (state.status) { - UNLOCKED -> { - val thinLock = LockState(THIN, 1, 0, currentThreadId) - if (lock.compareAndSet(state, thinLock)) + LockStatus.UNLOCKED -> { + val thinLock = LockWord.Thin(1u, currentThreadId) + if (compareSetAndFreeLock(state, thinLock.lockWord)) return } - THIN -> { - if (currentThreadId == state.ownerThreadId) { + LockStatus.THIN -> { + val thinState = state.asThin() + if (currentThreadId == thinState.ownerTid) { // reentrant lock - val thinNested = LockState(THIN, state.nestedLocks + 1, state.waiters, currentThreadId) - if (lock.compareAndSet(state, thinNested)) + val thinNested = LockWord.Thin(thinState.nested + 1u, currentThreadId) + if (compareSetAndFreeLock(state, thinNested.lockWord)) return } else { - // another thread is trying to take this lock -> allocate native mutex - val mutex = mutexPool.allocate() - mutex.lock() - val fatLock = LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, mutex) - if (lock.compareAndSet(state, fatLock)) { - //block the current thread waiting for the owner thread to release the permit - mutex.lock() - tryLockAfterResume(currentThreadId) - return - } else { - // return permit taken for the owner thread and release mutex back to the pool - mutex.unlock() - mutexPool.release(mutex) - } + // another thread holds the lock -> allocate native mutex + // TODO allocate native mutex + // or just spin + pthread_yield_np() } } - FAT -> { - if (currentThreadId == state.ownerThreadId) { - // reentrant lock - val nestedFatLock = LockState(FAT, state.nestedLocks + 1, state.waiters, state.ownerThreadId, state.mutex) - if (lock.compareAndSet(state, nestedFatLock)) return - } else if (state.ownerThreadId != null) { - val fatLock = LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, state.mutex) - if (lock.compareAndSet(state, fatLock)) { - fatLock.mutex!!.lock() - tryLockAfterResume(currentThreadId) - return - } - } + LockStatus.FAT -> { + abort() } } } } public fun tryLock(): Boolean { - val currentThreadId = pthread_self()!! + val currentThreadId = threadId while (true) { - val state = lock.value - if (state.status == UNLOCKED) { - val thinLock = LockState(THIN, 1, 0, currentThreadId) - if (lock.compareAndSet(state, thinLock)) + val state = loadLockState() + if (state.status == LockStatus.UNLOCKED) { + val thinLock = LockWord.Thin(1u, currentThreadId) + if (compareSetAndFreeLock(state, thinLock.lockWord)) return true } else { - if (currentThreadId == state.ownerThreadId) { - val nestedLock = LockState(state.status, state.nestedLocks + 1, state.waiters, currentThreadId, state.mutex) - if (lock.compareAndSet(state, nestedLock)) + // FIXME what if fat? + if (currentThreadId == state.asThin().ownerTid) { + val nestedLock = LockWord.Thin(state.asThin().nested + 1u, currentThreadId) + if (compareSetAndFreeLock(state, nestedLock.lockWord)) return true } else { return false @@ -85,71 +145,32 @@ public actual open class SynchronizedObject { } public fun unlock() { - val currentThreadId = pthread_self()!! + val currentThreadId = threadId while (true) { - val state = lock.value - require(currentThreadId == state.ownerThreadId) { "Thin lock may be only released by the owner thread, expected: ${state.ownerThreadId}, real: $currentThreadId" } + val state = loadLockState() when (state.status) { - THIN -> { + LockStatus.THIN -> { + val thinState = state.asThin() + require(currentThreadId == thinState.ownerTid) { "Thin lock may be only released by the owner thread, expected: ${thinState.ownerTid}, real: $currentThreadId" } // nested unlock - if (state.nestedLocks == 1) { - val unlocked = LockState(UNLOCKED, 0, 0) - if (lock.compareAndSet(state, unlocked)) + if (thinState.nested == 1u) { + val unlocked = LockWord.unlocked() + if (compareSetAndFreeLock(state, unlocked)) return } else { - val releasedNestedLock = - LockState(THIN, state.nestedLocks - 1, state.waiters, state.ownerThreadId) - if (lock.compareAndSet(state, releasedNestedLock)) + val releasedNestedLock = LockWord.Thin(thinState.nested - 1u, thinState.ownerTid) + if (compareSetAndFreeLock(state, releasedNestedLock.lockWord)) return } } - FAT -> { - if (state.nestedLocks == 1) { - // last nested unlock -> release completely, resume some waiter - val releasedLock = LockState(FAT, 0, state.waiters - 1, null, state.mutex) - if (lock.compareAndSet(state, releasedLock)) { - releasedLock.mutex!!.unlock() - return - } - } else { - // lock is still owned by the current thread - val releasedLock = - LockState(FAT, state.nestedLocks - 1, state.waiters, state.ownerThreadId, state.mutex) - if (lock.compareAndSet(state, releasedLock)) - return - } + LockStatus.FAT -> { + require(false) } else -> error("It is not possible to unlock the mutex that is not obtained") } } } - private fun tryLockAfterResume(threadId: pthread_t) { - while (true) { - val state = lock.value - val newState = if (state.waiters == 0) // deflate - LockState(THIN, 1, 0, threadId) - else - LockState(FAT, 1, state.waiters, threadId, state.mutex) - if (lock.compareAndSet(state, newState)) { - if (state.waiters == 0) { - state.mutex!!.unlock() - mutexPool.release(state.mutex) - } - return - } - } - } - - protected class LockState( - val status: Status, - val nestedLocks: Int, - val waiters: Int, - val ownerThreadId: pthread_t? = null, - val mutex: CPointer? = null - ) - - protected enum class Status { UNLOCKED, THIN, FAT } private fun CPointer.lock() = lock(this.pointed.mutex)