diff --git a/api/src/main/kotlin/com/oksusu/susu/api/common/lock/LockManager.kt b/api/src/main/kotlin/com/oksusu/susu/api/common/lock/LockManager.kt index 7cf92021..235619a6 100644 --- a/api/src/main/kotlin/com/oksusu/susu/api/common/lock/LockManager.kt +++ b/api/src/main/kotlin/com/oksusu/susu/api/common/lock/LockManager.kt @@ -1,5 +1,5 @@ package com.oksusu.susu.api.common.lock interface LockManager { - suspend fun lock(type: LockType, key: String, block: suspend () -> T): T + suspend fun lock(key: String, block: suspend () -> T): T } diff --git a/api/src/main/kotlin/com/oksusu/susu/api/common/lock/LockType.kt b/api/src/main/kotlin/com/oksusu/susu/api/common/lock/LockType.kt deleted file mode 100644 index ce328b9b..00000000 --- a/api/src/main/kotlin/com/oksusu/susu/api/common/lock/LockType.kt +++ /dev/null @@ -1,6 +0,0 @@ -package com.oksusu.susu.api.common.lock - -enum class LockType { - VOTE, - ; -} diff --git a/api/src/main/kotlin/com/oksusu/susu/api/common/lock/SuspendableLockManager.kt b/api/src/main/kotlin/com/oksusu/susu/api/common/lock/SuspendableLockManager.kt index 4f29d6ea..b549441a 100644 --- a/api/src/main/kotlin/com/oksusu/susu/api/common/lock/SuspendableLockManager.kt +++ b/api/src/main/kotlin/com/oksusu/susu/api/common/lock/SuspendableLockManager.kt @@ -55,7 +55,8 @@ private sealed class LockMsg { class CheckQueueEmpty(val channel: SendChannel) : LockMsg() } -@ObsoleteCoroutinesApi + +@OptIn(ObsoleteCoroutinesApi::class) private fun lockActor() = CoroutineScope(Dispatchers.IO).actor { // queue 맨 앞 == 락 설정 val lockQueue = LinkedList>() @@ -121,12 +122,12 @@ class SuspendableLockManager : LockManager { private val actorMap = ConcurrentHashMap>() - override suspend fun lock(type: LockType, key: String, block: suspend () -> T): T { + override suspend fun lock(key: String, block: suspend () -> T): T { // lock 관련 리턴 받을 채널 val channel = Channel() // key에 해당하는 actor - val actor = actorMap.compute("${type}_${key}") { _, value -> value ?: lockActor() } + val actor = actorMap.compute(key) { _, value -> value ?: lockActor() } ?: throw FailToExecuteException(ErrorCode.FAIL_TO_EXECUTE_LOCK) // 락 설정 @@ -148,7 +149,7 @@ class SuspendableLockManager : LockManager { releaseLock(actor, channel) // 큐가 빈 액터 삭제 - deleteEmptyQueueActor(type, channel, key) + deleteEmptyQueueActor(channel, key) // 채널 닫기 channel.close() @@ -178,7 +179,7 @@ class SuspendableLockManager : LockManager { channel.receive() } - private suspend fun deleteEmptyQueueActor(type: LockType, channel: Channel, key: String) { + private suspend fun deleteEmptyQueueActor(channel: Channel, key: String) { actorMap.computeIfPresent(key) { _, value -> val rtn = runBlocking(Dispatchers.Unconfined) { value.send(LockMsg.CheckQueueEmpty(channel)) diff --git a/api/src/main/kotlin/com/oksusu/susu/api/post/application/VoteFacade.kt b/api/src/main/kotlin/com/oksusu/susu/api/post/application/VoteFacade.kt index 8e7ebd50..62bd177a 100644 --- a/api/src/main/kotlin/com/oksusu/susu/api/post/application/VoteFacade.kt +++ b/api/src/main/kotlin/com/oksusu/susu/api/post/application/VoteFacade.kt @@ -3,7 +3,6 @@ package com.oksusu.susu.api.post.application import com.oksusu.susu.api.auth.model.AuthUser import com.oksusu.susu.api.common.dto.SusuPageRequest import com.oksusu.susu.api.common.lock.LockManager -import com.oksusu.susu.api.common.lock.LockType import com.oksusu.susu.api.count.application.CountService import com.oksusu.susu.api.event.model.DeleteVoteCountEvent import com.oksusu.susu.api.post.model.* @@ -162,7 +161,7 @@ class VoteFacade( } suspend fun vote(user: AuthUser, id: Long, request: CreateVoteHistoryRequest) { - lockManager.lock(LockType.VOTE, "$id") { + lockManager.lock("$id") { when (request.isCancel) { true -> cancelVote(user.uid, id, request.optionId) false -> castVote(user.uid, id, request.optionId) diff --git a/api/src/test/kotlin/com/oksusu/susu/api/ConcurrencyExtension.kt b/api/src/test/kotlin/com/oksusu/susu/api/ConcurrencyExtension.kt deleted file mode 100644 index 8961236b..00000000 --- a/api/src/test/kotlin/com/oksusu/susu/api/ConcurrencyExtension.kt +++ /dev/null @@ -1,29 +0,0 @@ -package com.oksusu.susu.api - -import io.github.oshai.kotlinlogging.KotlinLogging -import kotlinx.coroutines.* -import java.util.concurrent.atomic.AtomicLong - -private val logger = KotlinLogging.logger { } - -suspend fun executeConcurrency(successCount: AtomicLong, block: suspend () -> T?) { - coroutineScope { - val deferreds = mutableListOf>() - for (i in 1..5) { - val deferred = async(Dispatchers.IO) { - try { - block() - successCount.getAndIncrement() - } catch (e: Exception) { - logger.error { e } - } - - return@async - } - - deferreds.add(deferred) - } - - awaitAll(*deferreds.toTypedArray()) - } -} diff --git a/api/src/test/kotlin/com/oksusu/susu/api/common/lock/SuspendableLockManagerTest.kt b/api/src/test/kotlin/com/oksusu/susu/api/common/lock/SuspendableLockManagerTest.kt index ee421d2d..96869828 100644 --- a/api/src/test/kotlin/com/oksusu/susu/api/common/lock/SuspendableLockManagerTest.kt +++ b/api/src/test/kotlin/com/oksusu/susu/api/common/lock/SuspendableLockManagerTest.kt @@ -1,10 +1,14 @@ package com.oksusu.susu.api.common.lock -import com.oksusu.susu.api.executeConcurrency +import com.oksusu.susu.api.testExtension.CONCURRENT_COUNT +import com.oksusu.susu.api.testExtension.coExecuteConcurrency +import com.oksusu.susu.api.testExtension.executeConcurrency import io.github.oshai.kotlinlogging.KotlinLogging import io.kotest.core.spec.style.DescribeSpec import io.kotest.matchers.equals.shouldBeEqual import kotlinx.coroutines.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicLong class SuspendableLockManagerTest : DescribeSpec({ @@ -16,59 +20,65 @@ class SuspendableLockManagerTest : DescribeSpec({ val countService3 = CountService() beforeEach { - countService1.set(0) - countService2.set(0) - countService3.set(0) + countService1.apply { this.counter = 0 } + countService2.apply { this.counter = 0 } + countService3.apply { this.counter = 0 } } describe("suspendable lock manager") { - context("조회시") { - it("올바른 config 값이 조회되어야 한다.") { + context("락을 설정하면") { + it("여러 코루틴으로 동작했을 때, 카운트가 동작한 수만큼 증가해야한다.") { + val successCount = AtomicLong() + + coExecuteConcurrency(successCount) { + lockManager.lock("1") { + countService1.increase() + logger.info { "1 ${countService1.counter}" } + } + } + + countService1.counter shouldBeEqual CONCURRENT_COUNT + successCount.get() shouldBeEqual CONCURRENT_COUNT.toLong() + } + + it("여러 쓰레드를 생성해 동작했을 때, 카운트가 동작한 수만큼 증가해야한다.") { val successCount = AtomicLong() executeConcurrency(successCount) { - lockManager.lock(LockType.VOTE, "1") { - val counter = countService1.counter - Thread.sleep(100) - logger.info { "1 $counter" } - countService1.set(counter + 1) + lockManager.lock("1") { + countService1.increase() + logger.info { "1 ${countService1.counter}" } } } - countService1.counter shouldBeEqual 5 - successCount.get() shouldBeEqual 5 + countService1.counter shouldBeEqual CONCURRENT_COUNT + successCount.get() shouldBeEqual CONCURRENT_COUNT.toLong() } - it("올바른 config 값이 조회되어야 한다.") { + it("여러 코루틴으로 동작했을 때, 키 별로 락이 지정되고, 카운트가 올바르게 증가해야한다.") { val successCount = AtomicLong() coroutineScope { val deferreds = mutableListOf>() - for (i in 1..5) { + for (i in 1..CONCURRENT_COUNT) { val deferred1 = async(Dispatchers.IO) { - lockManager.lock(LockType.VOTE, "1") { - val counter = countService1.counter - Thread.sleep(100) - logger.info { "1 $counter" } - countService1.set(counter + 1) + lockManager.lock("1") { + countService1.increase() + logger.info { "1 ${countService1.counter}" } } successCount.getAndIncrement() } val deferred2 = async(Dispatchers.IO) { - lockManager.lock(LockType.VOTE, "2") { - val counter = countService2.counter - Thread.sleep(100) - logger.info { "2 $counter" } - countService2.set(counter + 1) + lockManager.lock("2") { + countService2.increase() + logger.info { "2 ${countService2.counter}" } } successCount.getAndIncrement() } val deferred3 = async(Dispatchers.IO) { - lockManager.lock(LockType.VOTE, "3") { - val counter = countService3.counter - Thread.sleep(100) - logger.info { "3 $counter" } - countService3.set(counter + 1) + lockManager.lock("3") { + countService3.increase() + logger.info { "3 ${countService3.counter}" } } successCount.getAndIncrement() } @@ -81,10 +91,69 @@ class SuspendableLockManagerTest : DescribeSpec({ awaitAll(*deferreds.toTypedArray()) } - countService1.counter shouldBeEqual 5 - countService2.counter shouldBeEqual 5 - countService3.counter shouldBeEqual 5 - successCount.get() shouldBeEqual 15 + countService1.counter shouldBeEqual CONCURRENT_COUNT + countService2.counter shouldBeEqual CONCURRENT_COUNT + countService3.counter shouldBeEqual CONCURRENT_COUNT + successCount.get() shouldBeEqual CONCURRENT_COUNT * 3L + } + + it("여러 쓰레드를 생성해 동작했을 때, 키 별로 락이 지정되고, 카운트가 올바르게 증가해야한다.") { + val successCount = AtomicLong() + val executorService = Executors.newFixedThreadPool(30) + val latch = CountDownLatch(30) + for (i in 1..CONCURRENT_COUNT) { + executorService.submit { + try { + runBlocking { + lockManager.lock("1") { + countService1.increase() + logger.info { "1 ${countService1.counter}" } + } + } + successCount.getAndIncrement() + } catch (e: Throwable) { + logger.info { e.toString() } + } finally { + latch.countDown() + } + } + executorService.submit { + try { + runBlocking { + lockManager.lock("2") { + countService2.increase() + logger.info { "2 ${countService2.counter}" } + } + } + successCount.getAndIncrement() + } catch (e: Throwable) { + logger.info { e.toString() } + } finally { + latch.countDown() + } + } + executorService.submit { + try { + runBlocking { + lockManager.lock("3") { + countService3.increase() + logger.info { "3 ${countService3.counter}" } + } + } + successCount.getAndIncrement() + } catch (e: Throwable) { + logger.info { e.toString() } + } finally { + latch.countDown() + } + } + } + latch.await() + + countService1.counter shouldBeEqual CONCURRENT_COUNT + countService2.counter shouldBeEqual CONCURRENT_COUNT + countService3.counter shouldBeEqual CONCURRENT_COUNT + successCount.get() shouldBeEqual CONCURRENT_COUNT * 3L } } } @@ -93,7 +162,9 @@ class SuspendableLockManagerTest : DescribeSpec({ private class CountService { var counter: Int = 0 - fun set(a: Int) { - counter = a + suspend fun increase() { + val curCount = counter + delay(100) + counter = curCount + 1 } } diff --git a/api/src/test/kotlin/com/oksusu/susu/api/post/application/VoteFacadeTest.kt b/api/src/test/kotlin/com/oksusu/susu/api/post/application/VoteFacadeTest.kt index a47967a0..743f165f 100644 --- a/api/src/test/kotlin/com/oksusu/susu/api/post/application/VoteFacadeTest.kt +++ b/api/src/test/kotlin/com/oksusu/susu/api/post/application/VoteFacadeTest.kt @@ -3,7 +3,7 @@ package com.oksusu.susu.api.post.application import com.oksusu.susu.api.ApiIntegrationSpec import com.oksusu.susu.api.auth.model.AuthContextImpl import com.oksusu.susu.api.auth.model.AuthUserImpl -import com.oksusu.susu.api.executeConcurrency +import com.oksusu.susu.api.testExtension.executeConcurrency import com.oksusu.susu.api.post.model.BoardModel import com.oksusu.susu.api.post.model.OnboardingVoteOptionCountModel import com.oksusu.susu.api.post.model.VoteOptionWithoutIdModel diff --git a/api/src/test/kotlin/com/oksusu/susu/api/testExtension/ConcurrencyExtension.kt b/api/src/test/kotlin/com/oksusu/susu/api/testExtension/ConcurrencyExtension.kt new file mode 100644 index 00000000..e647908a --- /dev/null +++ b/api/src/test/kotlin/com/oksusu/susu/api/testExtension/ConcurrencyExtension.kt @@ -0,0 +1,53 @@ +package com.oksusu.susu.api.testExtension + +import io.github.oshai.kotlinlogging.KotlinLogging +import kotlinx.coroutines.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicLong + +private val logger = KotlinLogging.logger { } + +const val CONCURRENT_COUNT = 10 + +suspend fun coExecuteConcurrency(successCount: AtomicLong, block: suspend () -> T?) { + coroutineScope { + val deferreds = mutableListOf>() + for (i in 1..CONCURRENT_COUNT) { + val deferred = async(Dispatchers.IO) { + try { + block() + successCount.getAndIncrement() + } catch (e: Exception) { + logger.error { e } + } + + return@async + } + + deferreds.add(deferred) + } + + awaitAll(*deferreds.toTypedArray()) + } +} + +suspend fun executeConcurrency(successCount: AtomicLong, block: suspend () -> T?) { + val executorService = Executors.newFixedThreadPool(CONCURRENT_COUNT) + val latch = CountDownLatch(CONCURRENT_COUNT) + for (i in 1..CONCURRENT_COUNT) { + executorService.submit { + try { + runBlocking { + block() + } + successCount.getAndIncrement() + } catch (e: Throwable) { + logger.info { e.toString() } + } finally { + latch.countDown() + } + } + } + latch.await() +}