Skip to content

Commit

Permalink
fix: key 생성 방식 변경 및 M-thread 테스트 추가
Browse files Browse the repository at this point in the history
  • Loading branch information
wjdtkdgns committed Aug 16, 2024
1 parent 3d2e6c3 commit 0ef21de
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 80 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package com.oksusu.susu.api.common.lock

interface LockManager {
suspend fun <T> lock(type: LockType, key: String, block: suspend () -> T): T
suspend fun <T> lock(key: String, block: suspend () -> T): T
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ private sealed class LockMsg {
class CheckQueueEmpty(val channel: SendChannel<LockReturn>) : LockMsg()
}

@ObsoleteCoroutinesApi

@OptIn(ObsoleteCoroutinesApi::class)
private fun lockActor() = CoroutineScope(Dispatchers.IO).actor<LockMsg> {
// queue 맨 앞 == 락 설정
val lockQueue = LinkedList<SendChannel<LockReturn>>()
Expand Down Expand Up @@ -121,12 +122,12 @@ class SuspendableLockManager : LockManager {

private val actorMap = ConcurrentHashMap<String, SendChannel<LockMsg>>()

override suspend fun <T> lock(type: LockType, key: String, block: suspend () -> T): T {
override suspend fun <T> lock(key: String, block: suspend () -> T): T {
// lock 관련 리턴 받을 채널
val channel = Channel<LockReturn>()

// key에 해당하는 actor
val actor = actorMap.compute("${type}_${key}") { _, value -> value ?: lockActor() }
val actor = actorMap.compute(key) { _, value -> value ?: lockActor() }
?: throw FailToExecuteException(ErrorCode.FAIL_TO_EXECUTE_LOCK)

// 락 설정
Expand All @@ -148,7 +149,7 @@ class SuspendableLockManager : LockManager {
releaseLock(actor, channel)

// 큐가 빈 액터 삭제
deleteEmptyQueueActor(type, channel, key)
deleteEmptyQueueActor(channel, key)

// 채널 닫기
channel.close()
Expand Down Expand Up @@ -178,7 +179,7 @@ class SuspendableLockManager : LockManager {
channel.receive()
}

private suspend fun deleteEmptyQueueActor(type: LockType, channel: Channel<LockReturn>, key: String) {
private suspend fun deleteEmptyQueueActor(channel: Channel<LockReturn>, key: String) {
actorMap.computeIfPresent(key) { _, value ->
val rtn = runBlocking(Dispatchers.Unconfined) {
value.send(LockMsg.CheckQueueEmpty(channel))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package com.oksusu.susu.api.post.application
import com.oksusu.susu.api.auth.model.AuthUser
import com.oksusu.susu.api.common.dto.SusuPageRequest
import com.oksusu.susu.api.common.lock.LockManager
import com.oksusu.susu.api.common.lock.LockType
import com.oksusu.susu.api.count.application.CountService
import com.oksusu.susu.api.event.model.DeleteVoteCountEvent
import com.oksusu.susu.api.post.model.*
Expand Down Expand Up @@ -162,7 +161,7 @@ class VoteFacade(
}

suspend fun vote(user: AuthUser, id: Long, request: CreateVoteHistoryRequest) {
lockManager.lock(LockType.VOTE, "$id") {
lockManager.lock("$id") {
when (request.isCancel) {
true -> cancelVote(user.uid, id, request.optionId)
false -> castVote(user.uid, id, request.optionId)
Expand Down
29 changes: 0 additions & 29 deletions api/src/test/kotlin/com/oksusu/susu/api/ConcurrencyExtension.kt

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package com.oksusu.susu.api.common.lock

import com.oksusu.susu.api.executeConcurrency
import com.oksusu.susu.api.testExtension.CONCURRENT_COUNT
import com.oksusu.susu.api.testExtension.coExecuteConcurrency
import com.oksusu.susu.api.testExtension.executeConcurrency
import io.github.oshai.kotlinlogging.KotlinLogging
import io.kotest.core.spec.style.DescribeSpec
import io.kotest.matchers.equals.shouldBeEqual
import kotlinx.coroutines.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicLong

class SuspendableLockManagerTest : DescribeSpec({
Expand All @@ -16,59 +20,65 @@ class SuspendableLockManagerTest : DescribeSpec({
val countService3 = CountService()

beforeEach {
countService1.set(0)
countService2.set(0)
countService3.set(0)
countService1.apply { this.counter = 0 }
countService2.apply { this.counter = 0 }
countService3.apply { this.counter = 0 }
}

describe("suspendable lock manager") {
context("조회시") {
it("올바른 config 값이 조회되어야 한다.") {
context("락을 설정하면") {
it("여러 코루틴으로 동작했을 때, 카운트가 동작한 수만큼 증가해야한다.") {
val successCount = AtomicLong()

coExecuteConcurrency(successCount) {
lockManager.lock("1") {
countService1.increase()
logger.info { "1 ${countService1.counter}" }
}
}

countService1.counter shouldBeEqual CONCURRENT_COUNT
successCount.get() shouldBeEqual CONCURRENT_COUNT.toLong()
}

it("여러 쓰레드를 생성해 동작했을 때, 카운트가 동작한 수만큼 증가해야한다.") {
val successCount = AtomicLong()

executeConcurrency(successCount) {
lockManager.lock(LockType.VOTE, "1") {
val counter = countService1.counter
Thread.sleep(100)
logger.info { "1 $counter" }
countService1.set(counter + 1)
lockManager.lock("1") {
countService1.increase()
logger.info { "1 ${countService1.counter}" }
}
}

countService1.counter shouldBeEqual 5
successCount.get() shouldBeEqual 5
countService1.counter shouldBeEqual CONCURRENT_COUNT
successCount.get() shouldBeEqual CONCURRENT_COUNT.toLong()
}

it("올바른 config 값이 조회되어야 한다.") {
it("여러 코루틴으로 동작했을 때, 키 별로 락이 지정되고, 카운트가 올바르게 증가해야한다.") {
val successCount = AtomicLong()

coroutineScope {
val deferreds = mutableListOf<Deferred<Long>>()
for (i in 1..5) {
for (i in 1..CONCURRENT_COUNT) {
val deferred1 = async(Dispatchers.IO) {
lockManager.lock(LockType.VOTE, "1") {
val counter = countService1.counter
Thread.sleep(100)
logger.info { "1 $counter" }
countService1.set(counter + 1)
lockManager.lock("1") {
countService1.increase()
logger.info { "1 ${countService1.counter}" }
}
successCount.getAndIncrement()
}
val deferred2 = async(Dispatchers.IO) {
lockManager.lock(LockType.VOTE, "2") {
val counter = countService2.counter
Thread.sleep(100)
logger.info { "2 $counter" }
countService2.set(counter + 1)
lockManager.lock("2") {
countService2.increase()
logger.info { "2 ${countService2.counter}" }
}
successCount.getAndIncrement()
}
val deferred3 = async(Dispatchers.IO) {
lockManager.lock(LockType.VOTE, "3") {
val counter = countService3.counter
Thread.sleep(100)
logger.info { "3 $counter" }
countService3.set(counter + 1)
lockManager.lock("3") {
countService3.increase()
logger.info { "3 ${countService3.counter}" }
}
successCount.getAndIncrement()
}
Expand All @@ -81,10 +91,69 @@ class SuspendableLockManagerTest : DescribeSpec({
awaitAll(*deferreds.toTypedArray())
}

countService1.counter shouldBeEqual 5
countService2.counter shouldBeEqual 5
countService3.counter shouldBeEqual 5
successCount.get() shouldBeEqual 15
countService1.counter shouldBeEqual CONCURRENT_COUNT
countService2.counter shouldBeEqual CONCURRENT_COUNT
countService3.counter shouldBeEqual CONCURRENT_COUNT
successCount.get() shouldBeEqual CONCURRENT_COUNT * 3L
}

it("여러 쓰레드를 생성해 동작했을 때, 키 별로 락이 지정되고, 카운트가 올바르게 증가해야한다.") {
val successCount = AtomicLong()
val executorService = Executors.newFixedThreadPool(30)
val latch = CountDownLatch(30)
for (i in 1..CONCURRENT_COUNT) {
executorService.submit {
try {
runBlocking {
lockManager.lock("1") {
countService1.increase()
logger.info { "1 ${countService1.counter}" }
}
}
successCount.getAndIncrement()
} catch (e: Throwable) {
logger.info { e.toString() }
} finally {
latch.countDown()
}
}
executorService.submit {
try {
runBlocking {
lockManager.lock("2") {
countService2.increase()
logger.info { "2 ${countService2.counter}" }
}
}
successCount.getAndIncrement()
} catch (e: Throwable) {
logger.info { e.toString() }
} finally {
latch.countDown()
}
}
executorService.submit {
try {
runBlocking {
lockManager.lock("3") {
countService3.increase()
logger.info { "3 ${countService3.counter}" }
}
}
successCount.getAndIncrement()
} catch (e: Throwable) {
logger.info { e.toString() }
} finally {
latch.countDown()
}
}
}
latch.await()

countService1.counter shouldBeEqual CONCURRENT_COUNT
countService2.counter shouldBeEqual CONCURRENT_COUNT
countService3.counter shouldBeEqual CONCURRENT_COUNT
successCount.get() shouldBeEqual CONCURRENT_COUNT * 3L
}
}
}
Expand All @@ -93,7 +162,9 @@ class SuspendableLockManagerTest : DescribeSpec({
private class CountService {
var counter: Int = 0

fun set(a: Int) {
counter = a
suspend fun increase() {
val curCount = counter
delay(100)
counter = curCount + 1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.oksusu.susu.api.post.application
import com.oksusu.susu.api.ApiIntegrationSpec
import com.oksusu.susu.api.auth.model.AuthContextImpl
import com.oksusu.susu.api.auth.model.AuthUserImpl
import com.oksusu.susu.api.executeConcurrency
import com.oksusu.susu.api.testExtension.executeConcurrency
import com.oksusu.susu.api.post.model.BoardModel
import com.oksusu.susu.api.post.model.OnboardingVoteOptionCountModel
import com.oksusu.susu.api.post.model.VoteOptionWithoutIdModel
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.oksusu.susu.api.testExtension

import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicLong

private val logger = KotlinLogging.logger { }

const val CONCURRENT_COUNT = 10

suspend fun <T> coExecuteConcurrency(successCount: AtomicLong, block: suspend () -> T?) {
coroutineScope {
val deferreds = mutableListOf<Deferred<Unit>>()
for (i in 1..CONCURRENT_COUNT) {
val deferred = async(Dispatchers.IO) {
try {
block()
successCount.getAndIncrement()
} catch (e: Exception) {
logger.error { e }
}

return@async
}

deferreds.add(deferred)
}

awaitAll(*deferreds.toTypedArray())
}
}

suspend fun <T> executeConcurrency(successCount: AtomicLong, block: suspend () -> T?) {
val executorService = Executors.newFixedThreadPool(CONCURRENT_COUNT)
val latch = CountDownLatch(CONCURRENT_COUNT)
for (i in 1..CONCURRENT_COUNT) {
executorService.submit {
try {
runBlocking {
block()
}
successCount.getAndIncrement()
} catch (e: Throwable) {
logger.info { e.toString() }
} finally {
latch.countDown()
}
}
}
latch.await()
}

0 comments on commit 0ef21de

Please sign in to comment.