Skip to content

Commit

Permalink
feat: slack 전송 에러 처리 로직 추가 (MZ-302) (#77)
Browse files Browse the repository at this point in the history
* feat: slack 전송 실패 메세지 처리 로직 추가

* chore: ktlint format
  • Loading branch information
wjdtkdgns authored Aug 3, 2024
1 parent 6e6f001 commit 5139aaa
Show file tree
Hide file tree
Showing 12 changed files with 263 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package com.oksusu.susu.batch.slack.job

import com.oksusu.susu.cache.key.Cache
import com.oksusu.susu.cache.model.FailedSentSlackMessageCache
import com.oksusu.susu.cache.service.CacheService
import com.oksusu.susu.client.slack.SlackClient
import com.oksusu.susu.client.slack.model.SlackMessageModel
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.*
import org.springframework.stereotype.Component
import java.time.LocalDateTime

@Component
class ResendFailedSentSlackMessageJob(
private val cacheService: CacheService,
private val slackClient: SlackClient,
) {
private val logger = KotlinLogging.logger { }

companion object {
private const val RESEND_BEFORE_MINUTES = 1L
}

suspend fun resendFailedSentSlackMessage() {
logger.info { "start resend failed sent slack message" }

// 1분 전에 실패한 것이 타겟 (현재가 24분이면 23분을 말하는 것)
val targetTime = LocalDateTime.now().minusMinutes(RESEND_BEFORE_MINUTES)

// 실패 메세지 조회 및 삭제
val failedMessages = withContext(Dispatchers.IO) {
cacheService.sGetMembers(Cache.getFailedSentSlackMessageCache(targetTime))
}

withContext(Dispatchers.IO) {
cacheService.sDelete(Cache.getFailedSentSlackMessageCache(targetTime))
}

// 다수 메세지 token 별로 하나의 메세지로 병합
val message = mergeFailedMessage(failedMessages)

// 재전송
runCatching {
coroutineScope {
val sendDeferreds = message.map { (token, message) ->
val slackMessageModel = SlackMessageModel(text = message)

async(Dispatchers.IO) {
slackClient.sendMessage(
message = slackMessageModel,
token = token,
withRecover = false
)
}
}.toTypedArray()

awaitAll(*sendDeferreds)
}
}.onFailure {
// 재전송 실패시 1분 뒤에 다시 보낼 수 있게, 1분 뒤에 보내는 메세지 목록에 추가
logger.warn { "postpone resend slack message" }

postponeResendTimeOfFailedMessage(targetTime, message)
}

logger.info { "finish resend failed sent slack message" }
}

private suspend fun mergeFailedMessage(failedMessages: List<FailedSentSlackMessageCache>): Map<String, String> {
val message = mutableMapOf<String, String>()

failedMessages.forEach { model ->
val recoverMsg = if (model.isStacked) {
model.message
} else {
"[RECOVER - ${model.failedAt} slack failure] ${model.message}"
}

val stackedMessage = message[model.token]

message[model.token] = if (stackedMessage == null) {
recoverMsg
} else {
"$stackedMessage\n$recoverMsg"
}
}

return message
}

private suspend fun postponeResendTimeOfFailedMessage(targetTime: LocalDateTime, message: Map<String, String>) {
val nextTime = targetTime.plusMinutes(RESEND_BEFORE_MINUTES)

coroutineScope {
message.map { (token, message) ->
val model = FailedSentSlackMessageCache(
token = token,
message = message,
isStacked = true
)

async(Dispatchers.IO) {
cacheService.sSet(
cache = Cache.getFailedSentSlackMessageCache(nextTime),
value = model
)
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.oksusu.susu.batch.slack.scheduler

import com.oksusu.susu.batch.slack.job.ResendFailedSentSlackMessageJob
import com.oksusu.susu.client.common.coroutine.ErrorPublishingCoroutineExceptionHandler
import com.oksusu.susu.common.extension.resolveCancellation
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component

@Component
class ResendFailedSentSlackMessageScheduler(
private val resendFailedSentSlackMessageJob: ResendFailedSentSlackMessageJob,
private val coroutineExceptionHandler: ErrorPublishingCoroutineExceptionHandler,
) {
private val logger = KotlinLogging.logger { }

@Scheduled(
fixedRate = 1000 * 60,
initialDelayString = "\${oksusu.scheduled-tasks.resend-failed-sent-slack-message.initial-delay:100}"
)
fun resendFailedSentSlackMessageJob() {
CoroutineScope(Dispatchers.IO + coroutineExceptionHandler.handler).launch {
runCatching {
resendFailedSentSlackMessageJob.resendFailedSentSlackMessage()
}.onFailure { e ->
logger.resolveCancellation("[BATCH] fail to run resendFailedSentSlackMessageJob", e)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class SusuStatisticsDailySummaryJob(
dailyLedgerCount = dailyLedgerCount,
friendCount = friendCount,
userWithdrawCount = userWithdrawCount,
dailyReportHistoryCount = dailyReportHistoryCount,
dailyReportHistoryCount = dailyReportHistoryCount
)
}.run { slackClient.sendSummary(this.message()) }
}
Expand Down
10 changes: 10 additions & 0 deletions cache/src/main/kotlin/com/oksusu/susu/cache/key/Cache.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.oksusu.susu.cache.key

import com.fasterxml.jackson.core.type.TypeReference
import com.oksusu.susu.cache.model.FailedSentSlackMessageCache
import com.oksusu.susu.cache.model.OidcPublicKeysCacheModel
import com.oksusu.susu.cache.model.SusuEnvelopeStatisticCacheModel
import com.oksusu.susu.cache.model.UserEnvelopeStatisticCacheModel
import com.oksusu.susu.common.consts.*
import com.oksusu.susu.common.util.toTypeReference
import java.time.Duration
import java.time.LocalDateTime

data class Cache<VALUE_TYPE>(
val key: String,
Expand Down Expand Up @@ -88,5 +90,13 @@ data class Cache<VALUE_TYPE>(
duration = Duration.ofDays(1).plusHours(1)
)
}

fun getFailedSentSlackMessageCache(time: LocalDateTime): Cache<FailedSentSlackMessageCache> {
return Cache(
key = "$FAILED_SENT_SLACK_MESSAGE_KEY:${time.minute}",
type = toTypeReference(),
duration = Duration.ofHours(2)
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.oksusu.susu.cache.model

import java.time.LocalDateTime

data class FailedSentSlackMessageCache(
val token: String,
val message: String,
val failedAt: LocalDateTime = LocalDateTime.now(),
val isStacked: Boolean = false,
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ interface CacheService {
suspend fun <VALUE_TYPE : Any> getOrNull(cache: Cache<VALUE_TYPE>): VALUE_TYPE?
suspend fun <VALUE_TYPE : Any> delete(cache: Cache<VALUE_TYPE>)

/** set */
suspend fun <VALUE_TYPE : Any> sSet(cache: Cache<VALUE_TYPE>, value: VALUE_TYPE)
suspend fun <VALUE_TYPE : Any> sGetMembers(cache: Cache<VALUE_TYPE>): List<VALUE_TYPE>
suspend fun <VALUE_TYPE : Any> sDelete(cache: Cache<VALUE_TYPE>)

companion object {
/** key-value */
suspend fun <VALUE_TYPE : Any> CacheService.set(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import com.oksusu.susu.cache.key.Cache
import com.oksusu.susu.common.extension.mapper
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import kotlinx.coroutines.slf4j.MDCContext
Expand All @@ -16,6 +19,7 @@ class SuspendableCacheService(
) : CacheService {
private val logger = KotlinLogging.logger { }
private val keyValueOps = reactiveStringRedisTemplate.opsForValue()
private val setOps = reactiveStringRedisTemplate.opsForSet()

override suspend fun <VALUE_TYPE : Any> set(cache: Cache<VALUE_TYPE>, value: VALUE_TYPE) {
coroutineScope {
Expand Down Expand Up @@ -66,4 +70,49 @@ class SuspendableCacheService(
}
}.getOrNull()
}

override suspend fun <VALUE_TYPE : Any> sSet(cache: Cache<VALUE_TYPE>, value: VALUE_TYPE) {
coroutineScope {
launch(Dispatchers.IO + Job() + MDCContext()) {
runCatching {
setOps.add(
cache.key,
mapper.writeValueAsString(value)
).cache().awaitSingleOrNull()
}.onFailure { e ->
when (e) {
is CancellationException -> logger.debug { "Redis Set job cancelled." }
else -> logger.error(e) { "fail to set data from redis. key : ${cache.key}" }
}
}.getOrNull()
}
}
}

override suspend fun <VALUE_TYPE : Any> sGetMembers(cache: Cache<VALUE_TYPE>): List<VALUE_TYPE> {
return runCatching {
setOps.members(cache.key)
.cache()
.asFlow()
.map { mapper.readValue(it, cache.type) }
.toList()
}.onFailure { e ->
when (e) {
is CancellationException -> logger.debug { "Redis Read job cancelled." }
else -> logger.error(e) { "fail to read data from redis. key : ${cache.key}" }
}
}.getOrNull() ?: emptyList()
}

override suspend fun <VALUE_TYPE : Any> sDelete(cache: Cache<VALUE_TYPE>) {
runCatching {
setOps.delete(cache.key)
.awaitSingle()
}.onFailure { e ->
when (e) {
is CancellationException -> logger.debug { "Redis Delete job cancelled." }
else -> logger.error(e) { "fail to delete data from redis. key : ${cache.key}" }
}
}.getOrNull()
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.oksusu.susu.client.config

import com.oksusu.susu.cache.service.CacheService
import com.oksusu.susu.client.WebClientFactory
import com.oksusu.susu.client.slack.SlackClient
import com.oksusu.susu.client.slack.SuspendableSlackClient
Expand All @@ -10,6 +11,7 @@ import org.springframework.context.annotation.Configuration
@Configuration
class SlackClientConfig(
private val webhookConfig: SlackConfig.SlackWebhookConfig,
private val cacheService: CacheService,
) {
private val logger = KotlinLogging.logger {}

Expand All @@ -21,6 +23,6 @@ class SlackClientConfig(
fun slackClient(): SlackClient {
val webClient = WebClientFactory.generate(baseUrl = SLACK_WEBHOOKS_DOMAIN)
logger.info { "initialized slack client" }
return SuspendableSlackClient(webClient, webhookConfig)
return SuspendableSlackClient(webClient, webhookConfig, cacheService)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ import com.oksusu.susu.client.slack.model.SlackMessageModel
interface SlackClient {
suspend fun sendSummary(message: SlackMessageModel): String
suspend fun sendError(message: SlackMessageModel): String
suspend fun sendMessage(message: SlackMessageModel, token: String, withRecover: Boolean = true): String
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
package com.oksusu.susu.client.slack

import com.oksusu.susu.cache.key.Cache
import com.oksusu.susu.cache.model.FailedSentSlackMessageCache
import com.oksusu.susu.cache.service.CacheService
import com.oksusu.susu.client.config.SlackConfig
import com.oksusu.susu.client.slack.model.SlackMessageModel
import com.oksusu.susu.common.extension.awaitSingleOrThrow
import com.oksusu.susu.common.extension.withMDCContext
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.Dispatchers
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBody
import org.springframework.web.reactive.function.client.bodyToMono
import reactor.util.retry.Retry
import java.time.Duration
import java.time.LocalDateTime

class SuspendableSlackClient(
private val webclient: WebClient,
private val slackWebhookConfig: SlackConfig.SlackWebhookConfig,
private val cacheService: CacheService,
) : SlackClient {
private val logger = KotlinLogging.logger { }

override suspend fun sendSummary(message: SlackMessageModel): String {
return sendMessage(message, slackWebhookConfig.summaryToken)
}
Expand All @@ -20,15 +31,31 @@ class SuspendableSlackClient(
return sendMessage(message, slackWebhookConfig.errorToken)
}

private suspend fun sendMessage(message: SlackMessageModel, token: String): String {
return withMDCContext(Dispatchers.IO) {
webclient
.post()
.uri("/$token")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(message)
.retrieve()
.awaitBody()
}
override suspend fun sendMessage(message: SlackMessageModel, token: String, withRecover: Boolean): String {
return runCatching {
withMDCContext(Dispatchers.IO) {
webclient.post()
.uri("/${token}aasd")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(message)
.retrieve()
.bodyToMono<String>()
.retryWhen(Retry.fixedDelay(2, Duration.ofMillis(500)))
.awaitSingleOrThrow()
}
}.onFailure {
if (withRecover) {
recoverSendMessage(message, token)
}
}.getOrThrow()
}

private suspend fun recoverSendMessage(message: SlackMessageModel, token: String) {
val model = FailedSentSlackMessageCache(
token = token,
message = message.text
)

cacheService.sSet(Cache.getFailedSentSlackMessageCache(LocalDateTime.now()), model)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const val USER_COMMUNITY_PUNISHED_COUNT_KEY = "user_community_punished_count_key

const val SUSU_ENVELOPE_STATISTIC_AMOUNT_KEY = "susu_envelope_statistic_amount"

const val FAILED_SENT_SLACK_MESSAGE_KEY = "failed_sent_slack_message_key"

const val KID = "kid"

const val MDC_KEY_TRACE_ID = "traceId"
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,3 @@ import java.time.LocalDateTime
interface ReportResultRepository : JpaRepository<ReportResult, Long>, ReportResultQRepository {
fun findAllByCreatedAtBetween(from: LocalDateTime, to: LocalDateTime): List<ReportResult>
}

0 comments on commit 5139aaa

Please sign in to comment.