Skip to content

Commit

Permalink
Allow specifying dispatcher for KafkaConsumerSubscriptionHealthCheck
Browse files Browse the repository at this point in the history
  • Loading branch information
sksamuel committed Jul 9, 2023
1 parent 88aa3e4 commit ced1f71
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package com.sksamuel.cohort.kafka

import com.sksamuel.cohort.HealthCheck
import com.sksamuel.cohort.HealthCheckResult
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.withContext
import org.apache.kafka.clients.consumer.KafkaConsumer

/**
Expand All @@ -13,30 +15,36 @@ import org.apache.kafka.clients.consumer.KafkaConsumer
*
* This check reports healthy if the consumer is subscribed to all the provided topics, or if no
* topics are provided, then is subscribed to at least one topic.
*
* @param dispatcher specify the dispatcher to run the consumer.subscription call on. This should be the same
* single threaded dispatcher used elsewhere for operations on your consumer.
*/
class KafkaConsumerSubscriptionHealthCheck(
private val consumer: KafkaConsumer<*, *>,
private val dispatcher: CoroutineDispatcher?,
private val topics: Set<String>
) : HealthCheck {

/**
* Create the health check requiring this consumer to be subscribed to at least one topic, but
* without specifying what topic that is.
*/
constructor(consumer: KafkaConsumer<*, *>) : this(consumer, emptySet())
constructor(consumer: KafkaConsumer<*, *>) : this(consumer, null, emptySet())

constructor(consumer: KafkaConsumer<*, *>, dispatcher: CoroutineDispatcher) : this(consumer, dispatcher, emptySet())

override val name: String = "kafka_consumer_subscription"

override suspend fun check(): HealthCheckResult {
val subs = consumer.subscription()
override suspend fun check(): HealthCheckResult = runCatching {
val sub = if (dispatcher == null) consumer.subscription() else withContext(dispatcher) { consumer.subscription() }
val healthy = when {
topics.isEmpty() -> subs.isNotEmpty()
else -> subs.toSet().intersect(topics) == topics
topics.isEmpty() -> sub.isNotEmpty()
else -> sub.toSet().intersect(topics) == topics
}
val msg = "Kafka consumer is subscribed to ${subs.size} topics"
val msg = "Kafka consumer is subscribed to ${sub.size} topics"
return when (healthy) {
true -> HealthCheckResult.healthy(msg)
false -> HealthCheckResult.unhealthy(msg)
}
}
}.getOrElse { HealthCheckResult.unhealthy("Error fetching kafka subscriptions: ${it.message}") }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.sksamuel.cohort.kafka

import com.sksamuel.cohort.HealthStatus
import io.kotest.assertions.timing.continually
import io.kotest.assertions.timing.eventually
import io.kotest.core.extensions.install
import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.testcontainers.kafka.KafkaContainerExtension
import io.kotest.extensions.testcontainers.kafka.admin
import io.kotest.extensions.testcontainers.kafka.consumer
import io.kotest.extensions.testcontainers.kafka.producer
import io.kotest.matchers.shouldBe
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.utils.Bytes
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName
import java.time.Duration
import java.util.concurrent.Executors
import kotlin.time.Duration.Companion.seconds

class KafkaConsumerSubscriptionHealthCheckTest : FunSpec({

val kafka = install(KafkaContainerExtension(KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka"))))

test("health check should pass while the consumer is subscribed") {

kafka.admin().createTopics(listOf(NewTopic("topic1234", 1, 1))).all().get()
val consumer = kafka.consumer()
consumer.subscribe(listOf("topic1234"))

val healthcheck = KafkaConsumerSubscriptionHealthCheck(consumer)
healthcheck.check().status shouldBe HealthStatus.Healthy

consumer.close()
healthcheck.check().status shouldBe HealthStatus.Unhealthy
}

test("health check should support running on the kafka thread") {

kafka.admin().createTopics(listOf(NewTopic("topic4412", 1, 1))).all().get()
val consumer = kafka.consumer()
kafka.producer().use {
it.send(ProducerRecord("topic4412", Bytes.wrap(byteArrayOf()), Bytes.wrap(byteArrayOf())))
}

consumer.subscribe(listOf("topic4412"))

val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
val job = launch(dispatcher) {
while (isActive) {
consumer.poll(Duration.ofMillis(100)) // keep a lock on the consumer
delay(100)
}
}
job.invokeOnCompletion { consumer.close() }

val healthcheck = KafkaConsumerSubscriptionHealthCheck(consumer, dispatcher)
continually(3.seconds) {
healthcheck.check().status shouldBe HealthStatus.Healthy
}

job.cancel()

eventually(3.seconds) {
healthcheck.check().status shouldBe HealthStatus.Unhealthy
}
}

})
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import org.apache.kafka.common.utils.Bytes
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName
import java.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds

class KafkaLastPollHealthCheckTest : FunSpec({
Expand Down

0 comments on commit ced1f71

Please sign in to comment.