From 5139aaab44884a378626c6230aa4888672e2b996 Mon Sep 17 00:00:00 2001 From: Sanghoon Jeong <67852689+wjdtkdgns@users.noreply.github.com> Date: Sat, 3 Aug 2024 22:35:33 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20slack=20=EC=A0=84=EC=86=A1=20=EC=97=90?= =?UTF-8?q?=EB=9F=AC=20=EC=B2=98=EB=A6=AC=20=EB=A1=9C=EC=A7=81=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80=20(MZ-302)=20(#77)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: slack 전송 실패 메세지 처리 로직 추가 * chore: ktlint format --- .../job/ResendFailedSentSlackMessageJob.kt | 111 ++++++++++++++++++ .../ResendFailedSentSlackMessageScheduler.kt | 33 ++++++ .../job/SusuStatisticsDailySummaryJob.kt | 2 +- .../kotlin/com/oksusu/susu/cache/key/Cache.kt | 10 ++ .../model/FailedSentSlackMessageCache.kt | 10 ++ .../oksusu/susu/cache/service/CacheService.kt | 5 + .../cache/service/SuspendableCacheService.kt | 49 ++++++++ .../susu/client/config/SlackClientConfig.kt | 4 +- .../oksusu/susu/client/slack/SlackClient.kt | 1 + .../client/slack/SuspendableSlackClient.kt | 49 ++++++-- .../oksusu/susu/common/consts/SusuConsts.kt | 2 + .../infrastructure/ReportResultRepository.kt | 1 - 12 files changed, 263 insertions(+), 14 deletions(-) create mode 100644 batch/src/main/kotlin/com/oksusu/susu/batch/slack/job/ResendFailedSentSlackMessageJob.kt create mode 100644 batch/src/main/kotlin/com/oksusu/susu/batch/slack/scheduler/ResendFailedSentSlackMessageScheduler.kt create mode 100644 cache/src/main/kotlin/com/oksusu/susu/cache/model/FailedSentSlackMessageCache.kt diff --git a/batch/src/main/kotlin/com/oksusu/susu/batch/slack/job/ResendFailedSentSlackMessageJob.kt b/batch/src/main/kotlin/com/oksusu/susu/batch/slack/job/ResendFailedSentSlackMessageJob.kt new file mode 100644 index 00000000..f03a8618 --- /dev/null +++ b/batch/src/main/kotlin/com/oksusu/susu/batch/slack/job/ResendFailedSentSlackMessageJob.kt @@ -0,0 +1,111 @@ +package com.oksusu.susu.batch.slack.job + +import com.oksusu.susu.cache.key.Cache +import com.oksusu.susu.cache.model.FailedSentSlackMessageCache +import com.oksusu.susu.cache.service.CacheService +import com.oksusu.susu.client.slack.SlackClient +import com.oksusu.susu.client.slack.model.SlackMessageModel +import io.github.oshai.kotlinlogging.KotlinLogging +import kotlinx.coroutines.* +import org.springframework.stereotype.Component +import java.time.LocalDateTime + +@Component +class ResendFailedSentSlackMessageJob( + private val cacheService: CacheService, + private val slackClient: SlackClient, +) { + private val logger = KotlinLogging.logger { } + + companion object { + private const val RESEND_BEFORE_MINUTES = 1L + } + + suspend fun resendFailedSentSlackMessage() { + logger.info { "start resend failed sent slack message" } + + // 1분 전에 실패한 것이 타겟 (현재가 24분이면 23분을 말하는 것) + val targetTime = LocalDateTime.now().minusMinutes(RESEND_BEFORE_MINUTES) + + // 실패 메세지 조회 및 삭제 + val failedMessages = withContext(Dispatchers.IO) { + cacheService.sGetMembers(Cache.getFailedSentSlackMessageCache(targetTime)) + } + + withContext(Dispatchers.IO) { + cacheService.sDelete(Cache.getFailedSentSlackMessageCache(targetTime)) + } + + // 다수 메세지 token 별로 하나의 메세지로 병합 + val message = mergeFailedMessage(failedMessages) + + // 재전송 + runCatching { + coroutineScope { + val sendDeferreds = message.map { (token, message) -> + val slackMessageModel = SlackMessageModel(text = message) + + async(Dispatchers.IO) { + slackClient.sendMessage( + message = slackMessageModel, + token = token, + withRecover = false + ) + } + }.toTypedArray() + + awaitAll(*sendDeferreds) + } + }.onFailure { + // 재전송 실패시 1분 뒤에 다시 보낼 수 있게, 1분 뒤에 보내는 메세지 목록에 추가 + logger.warn { "postpone resend slack message" } + + postponeResendTimeOfFailedMessage(targetTime, message) + } + + logger.info { "finish resend failed sent slack message" } + } + + private suspend fun mergeFailedMessage(failedMessages: List): Map { + val message = mutableMapOf() + + failedMessages.forEach { model -> + val recoverMsg = if (model.isStacked) { + model.message + } else { + "[RECOVER - ${model.failedAt} slack failure] ${model.message}" + } + + val stackedMessage = message[model.token] + + message[model.token] = if (stackedMessage == null) { + recoverMsg + } else { + "$stackedMessage\n$recoverMsg" + } + } + + return message + } + + private suspend fun postponeResendTimeOfFailedMessage(targetTime: LocalDateTime, message: Map) { + val nextTime = targetTime.plusMinutes(RESEND_BEFORE_MINUTES) + + coroutineScope { + message.map { (token, message) -> + val model = FailedSentSlackMessageCache( + token = token, + message = message, + isStacked = true + ) + + async(Dispatchers.IO) { + cacheService.sSet( + cache = Cache.getFailedSentSlackMessageCache(nextTime), + value = model + ) + } + } + } + } +} diff --git a/batch/src/main/kotlin/com/oksusu/susu/batch/slack/scheduler/ResendFailedSentSlackMessageScheduler.kt b/batch/src/main/kotlin/com/oksusu/susu/batch/slack/scheduler/ResendFailedSentSlackMessageScheduler.kt new file mode 100644 index 00000000..cc04c7e8 --- /dev/null +++ b/batch/src/main/kotlin/com/oksusu/susu/batch/slack/scheduler/ResendFailedSentSlackMessageScheduler.kt @@ -0,0 +1,33 @@ +package com.oksusu.susu.batch.slack.scheduler + +import com.oksusu.susu.batch.slack.job.ResendFailedSentSlackMessageJob +import com.oksusu.susu.client.common.coroutine.ErrorPublishingCoroutineExceptionHandler +import com.oksusu.susu.common.extension.resolveCancellation +import io.github.oshai.kotlinlogging.KotlinLogging +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.springframework.scheduling.annotation.Scheduled +import org.springframework.stereotype.Component + +@Component +class ResendFailedSentSlackMessageScheduler( + private val resendFailedSentSlackMessageJob: ResendFailedSentSlackMessageJob, + private val coroutineExceptionHandler: ErrorPublishingCoroutineExceptionHandler, +) { + private val logger = KotlinLogging.logger { } + + @Scheduled( + fixedRate = 1000 * 60, + initialDelayString = "\${oksusu.scheduled-tasks.resend-failed-sent-slack-message.initial-delay:100}" + ) + fun resendFailedSentSlackMessageJob() { + CoroutineScope(Dispatchers.IO + coroutineExceptionHandler.handler).launch { + runCatching { + resendFailedSentSlackMessageJob.resendFailedSentSlackMessage() + }.onFailure { e -> + logger.resolveCancellation("[BATCH] fail to run resendFailedSentSlackMessageJob", e) + } + } + } +} diff --git a/batch/src/main/kotlin/com/oksusu/susu/batch/summary/job/SusuStatisticsDailySummaryJob.kt b/batch/src/main/kotlin/com/oksusu/susu/batch/summary/job/SusuStatisticsDailySummaryJob.kt index b74f5920..40ed04cd 100644 --- a/batch/src/main/kotlin/com/oksusu/susu/batch/summary/job/SusuStatisticsDailySummaryJob.kt +++ b/batch/src/main/kotlin/com/oksusu/susu/batch/summary/job/SusuStatisticsDailySummaryJob.kt @@ -62,7 +62,7 @@ class SusuStatisticsDailySummaryJob( dailyLedgerCount = dailyLedgerCount, friendCount = friendCount, userWithdrawCount = userWithdrawCount, - dailyReportHistoryCount = dailyReportHistoryCount, + dailyReportHistoryCount = dailyReportHistoryCount ) }.run { slackClient.sendSummary(this.message()) } } diff --git a/cache/src/main/kotlin/com/oksusu/susu/cache/key/Cache.kt b/cache/src/main/kotlin/com/oksusu/susu/cache/key/Cache.kt index 51b3a4b7..2f8c62de 100644 --- a/cache/src/main/kotlin/com/oksusu/susu/cache/key/Cache.kt +++ b/cache/src/main/kotlin/com/oksusu/susu/cache/key/Cache.kt @@ -1,12 +1,14 @@ package com.oksusu.susu.cache.key import com.fasterxml.jackson.core.type.TypeReference +import com.oksusu.susu.cache.model.FailedSentSlackMessageCache import com.oksusu.susu.cache.model.OidcPublicKeysCacheModel import com.oksusu.susu.cache.model.SusuEnvelopeStatisticCacheModel import com.oksusu.susu.cache.model.UserEnvelopeStatisticCacheModel import com.oksusu.susu.common.consts.* import com.oksusu.susu.common.util.toTypeReference import java.time.Duration +import java.time.LocalDateTime data class Cache( val key: String, @@ -88,5 +90,13 @@ data class Cache( duration = Duration.ofDays(1).plusHours(1) ) } + + fun getFailedSentSlackMessageCache(time: LocalDateTime): Cache { + return Cache( + key = "$FAILED_SENT_SLACK_MESSAGE_KEY:${time.minute}", + type = toTypeReference(), + duration = Duration.ofHours(2) + ) + } } } diff --git a/cache/src/main/kotlin/com/oksusu/susu/cache/model/FailedSentSlackMessageCache.kt b/cache/src/main/kotlin/com/oksusu/susu/cache/model/FailedSentSlackMessageCache.kt new file mode 100644 index 00000000..50373a4b --- /dev/null +++ b/cache/src/main/kotlin/com/oksusu/susu/cache/model/FailedSentSlackMessageCache.kt @@ -0,0 +1,10 @@ +package com.oksusu.susu.cache.model + +import java.time.LocalDateTime + +data class FailedSentSlackMessageCache( + val token: String, + val message: String, + val failedAt: LocalDateTime = LocalDateTime.now(), + val isStacked: Boolean = false, +) diff --git a/cache/src/main/kotlin/com/oksusu/susu/cache/service/CacheService.kt b/cache/src/main/kotlin/com/oksusu/susu/cache/service/CacheService.kt index 81cc2d6a..890ae991 100644 --- a/cache/src/main/kotlin/com/oksusu/susu/cache/service/CacheService.kt +++ b/cache/src/main/kotlin/com/oksusu/susu/cache/service/CacheService.kt @@ -14,6 +14,11 @@ interface CacheService { suspend fun getOrNull(cache: Cache): VALUE_TYPE? suspend fun delete(cache: Cache) + /** set */ + suspend fun sSet(cache: Cache, value: VALUE_TYPE) + suspend fun sGetMembers(cache: Cache): List + suspend fun sDelete(cache: Cache) + companion object { /** key-value */ suspend fun CacheService.set( diff --git a/cache/src/main/kotlin/com/oksusu/susu/cache/service/SuspendableCacheService.kt b/cache/src/main/kotlin/com/oksusu/susu/cache/service/SuspendableCacheService.kt index 3b5af6e8..24bc5356 100644 --- a/cache/src/main/kotlin/com/oksusu/susu/cache/service/SuspendableCacheService.kt +++ b/cache/src/main/kotlin/com/oksusu/susu/cache/service/SuspendableCacheService.kt @@ -4,6 +4,9 @@ import com.oksusu.susu.cache.key.Cache import com.oksusu.susu.common.extension.mapper import io.github.oshai.kotlinlogging.KotlinLogging import kotlinx.coroutines.* +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.reactive.asFlow import kotlinx.coroutines.reactor.awaitSingle import kotlinx.coroutines.reactor.awaitSingleOrNull import kotlinx.coroutines.slf4j.MDCContext @@ -16,6 +19,7 @@ class SuspendableCacheService( ) : CacheService { private val logger = KotlinLogging.logger { } private val keyValueOps = reactiveStringRedisTemplate.opsForValue() + private val setOps = reactiveStringRedisTemplate.opsForSet() override suspend fun set(cache: Cache, value: VALUE_TYPE) { coroutineScope { @@ -66,4 +70,49 @@ class SuspendableCacheService( } }.getOrNull() } + + override suspend fun sSet(cache: Cache, value: VALUE_TYPE) { + coroutineScope { + launch(Dispatchers.IO + Job() + MDCContext()) { + runCatching { + setOps.add( + cache.key, + mapper.writeValueAsString(value) + ).cache().awaitSingleOrNull() + }.onFailure { e -> + when (e) { + is CancellationException -> logger.debug { "Redis Set job cancelled." } + else -> logger.error(e) { "fail to set data from redis. key : ${cache.key}" } + } + }.getOrNull() + } + } + } + + override suspend fun sGetMembers(cache: Cache): List { + return runCatching { + setOps.members(cache.key) + .cache() + .asFlow() + .map { mapper.readValue(it, cache.type) } + .toList() + }.onFailure { e -> + when (e) { + is CancellationException -> logger.debug { "Redis Read job cancelled." } + else -> logger.error(e) { "fail to read data from redis. key : ${cache.key}" } + } + }.getOrNull() ?: emptyList() + } + + override suspend fun sDelete(cache: Cache) { + runCatching { + setOps.delete(cache.key) + .awaitSingle() + }.onFailure { e -> + when (e) { + is CancellationException -> logger.debug { "Redis Delete job cancelled." } + else -> logger.error(e) { "fail to delete data from redis. key : ${cache.key}" } + } + }.getOrNull() + } } diff --git a/client/src/main/kotlin/com/oksusu/susu/client/config/SlackClientConfig.kt b/client/src/main/kotlin/com/oksusu/susu/client/config/SlackClientConfig.kt index 422810e8..e444d449 100644 --- a/client/src/main/kotlin/com/oksusu/susu/client/config/SlackClientConfig.kt +++ b/client/src/main/kotlin/com/oksusu/susu/client/config/SlackClientConfig.kt @@ -1,5 +1,6 @@ package com.oksusu.susu.client.config +import com.oksusu.susu.cache.service.CacheService import com.oksusu.susu.client.WebClientFactory import com.oksusu.susu.client.slack.SlackClient import com.oksusu.susu.client.slack.SuspendableSlackClient @@ -10,6 +11,7 @@ import org.springframework.context.annotation.Configuration @Configuration class SlackClientConfig( private val webhookConfig: SlackConfig.SlackWebhookConfig, + private val cacheService: CacheService, ) { private val logger = KotlinLogging.logger {} @@ -21,6 +23,6 @@ class SlackClientConfig( fun slackClient(): SlackClient { val webClient = WebClientFactory.generate(baseUrl = SLACK_WEBHOOKS_DOMAIN) logger.info { "initialized slack client" } - return SuspendableSlackClient(webClient, webhookConfig) + return SuspendableSlackClient(webClient, webhookConfig, cacheService) } } diff --git a/client/src/main/kotlin/com/oksusu/susu/client/slack/SlackClient.kt b/client/src/main/kotlin/com/oksusu/susu/client/slack/SlackClient.kt index aaa3abe0..99990065 100644 --- a/client/src/main/kotlin/com/oksusu/susu/client/slack/SlackClient.kt +++ b/client/src/main/kotlin/com/oksusu/susu/client/slack/SlackClient.kt @@ -5,4 +5,5 @@ import com.oksusu.susu.client.slack.model.SlackMessageModel interface SlackClient { suspend fun sendSummary(message: SlackMessageModel): String suspend fun sendError(message: SlackMessageModel): String + suspend fun sendMessage(message: SlackMessageModel, token: String, withRecover: Boolean = true): String } diff --git a/client/src/main/kotlin/com/oksusu/susu/client/slack/SuspendableSlackClient.kt b/client/src/main/kotlin/com/oksusu/susu/client/slack/SuspendableSlackClient.kt index 0b9e021d..71e81542 100644 --- a/client/src/main/kotlin/com/oksusu/susu/client/slack/SuspendableSlackClient.kt +++ b/client/src/main/kotlin/com/oksusu/susu/client/slack/SuspendableSlackClient.kt @@ -1,17 +1,28 @@ package com.oksusu.susu.client.slack +import com.oksusu.susu.cache.key.Cache +import com.oksusu.susu.cache.model.FailedSentSlackMessageCache +import com.oksusu.susu.cache.service.CacheService import com.oksusu.susu.client.config.SlackConfig import com.oksusu.susu.client.slack.model.SlackMessageModel +import com.oksusu.susu.common.extension.awaitSingleOrThrow import com.oksusu.susu.common.extension.withMDCContext +import io.github.oshai.kotlinlogging.KotlinLogging import kotlinx.coroutines.Dispatchers import org.springframework.http.MediaType import org.springframework.web.reactive.function.client.WebClient -import org.springframework.web.reactive.function.client.awaitBody +import org.springframework.web.reactive.function.client.bodyToMono +import reactor.util.retry.Retry +import java.time.Duration +import java.time.LocalDateTime class SuspendableSlackClient( private val webclient: WebClient, private val slackWebhookConfig: SlackConfig.SlackWebhookConfig, + private val cacheService: CacheService, ) : SlackClient { + private val logger = KotlinLogging.logger { } + override suspend fun sendSummary(message: SlackMessageModel): String { return sendMessage(message, slackWebhookConfig.summaryToken) } @@ -20,15 +31,31 @@ class SuspendableSlackClient( return sendMessage(message, slackWebhookConfig.errorToken) } - private suspend fun sendMessage(message: SlackMessageModel, token: String): String { - return withMDCContext(Dispatchers.IO) { - webclient - .post() - .uri("/$token") - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(message) - .retrieve() - .awaitBody() - } + override suspend fun sendMessage(message: SlackMessageModel, token: String, withRecover: Boolean): String { + return runCatching { + withMDCContext(Dispatchers.IO) { + webclient.post() + .uri("/${token}aasd") + .contentType(MediaType.APPLICATION_JSON) + .bodyValue(message) + .retrieve() + .bodyToMono() + .retryWhen(Retry.fixedDelay(2, Duration.ofMillis(500))) + .awaitSingleOrThrow() + } + }.onFailure { + if (withRecover) { + recoverSendMessage(message, token) + } + }.getOrThrow() + } + + private suspend fun recoverSendMessage(message: SlackMessageModel, token: String) { + val model = FailedSentSlackMessageCache( + token = token, + message = message.text + ) + + cacheService.sSet(Cache.getFailedSentSlackMessageCache(LocalDateTime.now()), model) } } diff --git a/common/src/main/kotlin/com/oksusu/susu/common/consts/SusuConsts.kt b/common/src/main/kotlin/com/oksusu/susu/common/consts/SusuConsts.kt index d88830d1..554a3f5f 100644 --- a/common/src/main/kotlin/com/oksusu/susu/common/consts/SusuConsts.kt +++ b/common/src/main/kotlin/com/oksusu/susu/common/consts/SusuConsts.kt @@ -26,6 +26,8 @@ const val USER_COMMUNITY_PUNISHED_COUNT_KEY = "user_community_punished_count_key const val SUSU_ENVELOPE_STATISTIC_AMOUNT_KEY = "susu_envelope_statistic_amount" +const val FAILED_SENT_SLACK_MESSAGE_KEY = "failed_sent_slack_message_key" + const val KID = "kid" const val MDC_KEY_TRACE_ID = "traceId" diff --git a/domain/src/main/kotlin/com/oksusu/susu/domain/report/infrastructure/ReportResultRepository.kt b/domain/src/main/kotlin/com/oksusu/susu/domain/report/infrastructure/ReportResultRepository.kt index 2809a7b1..3251330a 100644 --- a/domain/src/main/kotlin/com/oksusu/susu/domain/report/infrastructure/ReportResultRepository.kt +++ b/domain/src/main/kotlin/com/oksusu/susu/domain/report/infrastructure/ReportResultRepository.kt @@ -11,4 +11,3 @@ import java.time.LocalDateTime interface ReportResultRepository : JpaRepository, ReportResultQRepository { fun findAllByCreatedAtBetween(from: LocalDateTime, to: LocalDateTime): List } -