
Exposing native metrics of multiple clients in the same JVM #237

Open
Z1kkurat opened this issue Jan 30, 2023 · 3 comments

Comments

Z1kkurat (Contributor) commented Jan 30, 2023

The current implementation of the Prometheus collector for the native Kafka client (producer/consumer) metrics (via Producer#clientMetrics) can be inconvenient to use when there are multiple consumers or producers in the same application, initialized within the scope of different modules. Let me illustrate:

val producerOf = ProducerOf.apply1[IO](..., ...)
for {
  // A.scala
  producer1 <- producerOf.apply(config.copy(common = config.common.copy(clientId = Some("producer1"))))
  collector1 = new KafkaMetricsCollector[IO](producer1.clientMetrics, None)

  // B.scala
  producer2 <- producerOf.apply(config.copy(common = config.common.copy(clientId = Some("producer2"))))
  collector2 = new KafkaMetricsCollector[IO](producer2.clientMetrics, None)

  _ = registry.register(collector1)
  _ = registry.register(collector2)
} yield ()

This code initializes two producers and registers their metrics. If we assume they are initialized in the encapsulated code of different modules (files), this is roughly how we would do it without exposing the newly created clients from that code. Unfortunately, it fails with java.lang.IllegalArgumentException:

java.lang.IllegalArgumentException: Collector already registered that provides name: producer_node_metrics_incoming_byte_total
	at io.prometheus.client.CollectorRegistry.register(CollectorRegistry.java:54)
	at com.FailingExample$.$anonfun$run$3(KafkaCollectorTest.scala:40)
	at com.FailingExample$.$anonfun$run$3$adapted(KafkaCollectorTest.scala:36)
	at apply @ com.evolutiongaming.kafka.journal.execution.ThreadPoolOf$.apply(ThreadPoolOf.scala:34)
	at delay @ com.evolutiongaming.skafka.producer.CreateProducerJ$.apply(CreateProducerJ.scala:13)

There's a possible workaround that partially resolves the issue:

for {
  producer1 <- producerOf.apply(config.copy(common = config.common.copy(clientId = Some("producer1"))))
  producer2 <- producerOf.apply(config.copy(common = config.common.copy(clientId = Some("producer2"))))
  allMetrics: IO[Seq[ClientMetric[IO]]] = for {
    metrics1 <- producer1.clientMetrics
    metrics2 <- producer2.clientMetrics
  } yield metrics1 ++ metrics2
  collector = new KafkaMetricsCollector[IO](allMetrics, None)
  _ = registry.register(collector)
} yield ()

This works, but it requires initializing all clients within the same scope, which can be inconvenient in terms of code encapsulation.

I propose implementing a new abstraction over multiple clients that keeps track of a metrics-producing function for each "registered" client:

trait KafkaMetricsRegistry[F[_]] {
  /**
    * Register a function to obtain a list of client metrics.
    * Normally, you would pass [[com.evolutiongaming.skafka.consumer.Consumer.clientMetrics]] or
    * [[com.evolutiongaming.skafka.producer.Producer.clientMetrics]]
    * @return synthetic ID of registered function
    */
  def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, UUID]

  /** Collect metrics from all registered functions */
  def collectAll: F[Seq[ClientMetric[F]]]
}
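The "simple Ref-based implementation" mentioned below could look roughly like this sketch. It assumes cats-effect (`Ref`, `Resource`, `Sync`; the `cats.effect.concurrent.Ref` import path is cats-effect 2 style) and an assumed import path for `ClientMetric`; none of this is the actual library API, just one way the trait could be backed:

```scala
import java.util.UUID

import cats.effect.{Resource, Sync}
import cats.effect.concurrent.Ref
import cats.syntax.all._

// Sketch only: ClientMetric comes from the skafka library;
// this import path is an assumption.
import com.evolutiongaming.skafka.metrics.ClientMetric

final class RefKafkaMetricsRegistry[F[_]: Sync](
  sources: Ref[F, Map[UUID, F[Seq[ClientMetric[F]]]]]
) extends KafkaMetricsRegistry[F] {

  // Store the metrics function under a fresh UUID and drop it again
  // when the Resource is released, so a stopped client no longer
  // contributes (stale) metrics to the collector.
  def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, UUID] =
    Resource.make {
      for {
        id <- Sync[F].delay(UUID.randomUUID())
        _  <- sources.update(_ + (id -> metrics))
      } yield id
    } { id => sources.update(_ - id) }

  // Run every registered function and concatenate the results.
  def collectAll: F[Seq[ClientMetric[F]]] =
    sources.get.flatMap { map =>
      map.values.toList.sequence.map(_.flatten)
    }
}
```

Returning a `Resource` from `register` ties the lifetime of the registration to the lifetime of the client that owns the metrics, which is what makes the per-module registration below safe.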

The usage would be as follows:

val kafkaMetricsRegistry: KafkaMetricsRegistry[IO] = ...
val producerOf: ProducerOf[IO] = ...
for {
  // A.scala
  producer1 <- producerOf.apply(config.copy(common = config.common.copy(clientId = Some("producer1.test"))))
  _ <- kafkaMetricsRegistry.register(producer1.clientMetrics)

  // B.scala
  producer2 <- producerOf.apply(config.copy(common = config.common.copy(clientId = Some("producer2.test"))))
  _ <- kafkaMetricsRegistry.register(producer2.clientMetrics)

  // C.scala
  // Function to collect all metrics from registered consumers/producers
  allMetrics: IO[Seq[ClientMetric[IO]]] = kafkaMetricsRegistry.collectAll
  
  // Single Prometheus collector
  collector = new KafkaMetricsCollector[IO](allMetrics, None)

  _ = registry.register(collector)
} yield ()

This works as expected with a simple Ref-based implementation, producing metrics like the following:

# HELP producer_metrics_io_time_ns_avg The average length of time for I/O per select call in nanoseconds.
# TYPE producer_metrics_io_time_ns_avg gauge
producer_metrics_io_time_ns_avg{client_id="producer1.test",} 1323625.0
producer_metrics_io_time_ns_avg{client_id="producer2.test",} 596583.0

When client.id is not defined, the output looks a bit more confusing with both approaches (the old workaround and the newly proposed one), resulting in duplicate metrics, but it's still better than an exception:

# HELP producer_metrics_io_wait_time_ns_avg The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.
# TYPE producer_metrics_io_wait_time_ns_avg gauge
producer_metrics_io_wait_time_ns_avg{client_id="",} 328833.0
producer_metrics_io_wait_time_ns_avg{client_id="",} 50750.0
t3hnar (Contributor) commented Jan 30, 2023

@Z1kkurat why not just pass ProducerOf from outside, which will attach the metrics?

Other option: passing ProducerMetrics around as a semi-static thing.

Z1kkurat (Contributor, Author):

why not just pass ProducerOf from outside?

I don't think I understand the idea; can you share an example? In the first snippet of code, ProducerOf is passed from the "outside". If this ProducerOf were to create an instance of KafkaMetricsCollector when a producer is created and register it immediately afterwards, it would fail with the same error as in the first example, because that code would try to register multiple Prometheus collectors containing metrics with the same name, which is prohibited.

Other option: passing ProducerMetrics as semi-static thing around.

Please elaborate: what do you mean by "semi-static"?

Z1kkurat (Contributor, Author):

@t3hnar just for clarification: I'm talking about the native metrics exposed by the Kafka clients themselves (Consumer#clientMetrics), not the ones we measure in the library.

@Z1kkurat Z1kkurat changed the title Exposing metrics of multiple clients in the same JVM Exposing native metrics of multiple clients in the same JVM Feb 1, 2023