Skip to content

Commit

Permalink
Merge pull request #6 from varwise/jjag/update-prometheus-client
Browse files Browse the repository at this point in the history
Update prometheus client to 1.3.1
  • Loading branch information
wlk authored May 27, 2024
2 parents 4079d1b + 20411fd commit 81d10fd
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 116 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ Collection of utilities to allow exposing prometheus metrics from pekko-http end

[prometheus-akka-http](https://github.com/varwise/prometheus-akka-http) - same thing but for akka-http

### Migration to 2.0.0

Version 2.0.0 uses Prometheus Java client library 1.0.0 which introduced some breaking changes.

Here's the quick migration guide:
1. Replace occurrences of `io.prometheus.client.CollectorRegistry` with `io.prometheus.metrics.model.registry.PrometheusRegistry`.
2. Supply `ExecutionContext` instance to `MetricsEndpoint`.

For more information about Prometheus Java client library please refer to https://prometheus.github.io/client_java/.
### Publishing

Artifacts are published to Maven central using the [sbt-sonatype](https://github.com/xerial/sbt-sonatype) plugin
Expand Down
24 changes: 13 additions & 11 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ organization := "com.varwise"

publishTo := sonatypePublishToBundle.value

version := "1.0.1"
version := "2.0.0"

val scala2Version = "2.13.13"
val scala3Version = "3.3.3"
Expand All @@ -15,21 +15,23 @@ crossScalaVersions := Seq(scala3Version, scala2Version)
credentials += Credentials(Path.userHome / ".sbt" / ".credentials")

libraryDependencies ++= {
val simpleclientVersion = "0.16.0"
val prometheusJavaClientVersion = "1.3.1"
val pekkoVersion = "1.0.2"
val pekkoHttpVersion = "1.0.1"
val scalaTestVersion = "3.2.18"
val slf4jVersion = "2.0.13"

Seq(
"org.apache.pekko" %% "pekko-actor" % pekkoVersion % Provided,
"org.apache.pekko" %% "pekko-stream" % pekkoVersion % Provided,
"org.apache.pekko" %% "pekko-http" % pekkoHttpVersion % Provided,
"org.apache.pekko" %% "pekko-http-spray-json" % pekkoHttpVersion % Provided,
"io.prometheus" % "simpleclient" % simpleclientVersion,
"io.prometheus" % "simpleclient_common" % simpleclientVersion,
"org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test,
"org.apache.pekko" %% "pekko-http-testkit" % pekkoHttpVersion % Test,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test
"org.apache.pekko" %% "pekko-actor" % pekkoVersion % Provided,
"org.apache.pekko" %% "pekko-stream" % pekkoVersion % Provided,
"org.apache.pekko" %% "pekko-http" % pekkoHttpVersion % Provided,
"org.apache.pekko" %% "pekko-http-spray-json" % pekkoHttpVersion % Provided,
"io.prometheus" % "prometheus-metrics-core" % prometheusJavaClientVersion,
"io.prometheus" % "prometheus-metrics-exposition-formats" % prometheusJavaClientVersion,
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test,
"org.apache.pekko" %% "pekko-http-testkit" % pekkoHttpVersion % Test,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.varwise.pekko.http.prometheus

import io.prometheus.client.{CollectorRegistry, Counter}
import io.prometheus.metrics.core.metrics.Counter
import io.prometheus.metrics.model.registry.PrometheusRegistry

trait EventObserver {
def observe(eventName: String, eventDetails: String): Unit
Expand All @@ -24,20 +25,19 @@ class PrometheusEventObserver(
metricHelp: String,
eventLabelName: String,
eventDetailsLabelName: String,
registry: CollectorRegistry)
registry: PrometheusRegistry)
extends EventObserver {

val counter: Counter = buildCounter.register(registry)

private def buildCounter: Counter.Builder =
private val counter: Counter =
Counter
.build()
.builder()
.name(metricName)
.help(metricHelp)
.labelNames(eventLabelName, eventDetailsLabelName)
.register(registry)

override def observe(eventName: String, eventDetails: String): Unit =
counter.labels(eventName, eventDetails).inc()
counter.labelValues(eventName, eventDetails).inc()
}

object PrometheusEventObserver {
Expand All @@ -47,7 +47,7 @@ object PrometheusEventObserver {
private val FailedOperationMetricHelp = "The number of observed failed operations"
private val DefaultEventLabelName = "event"
private val DefaultEventDetailsLabelName = "details"
private val DefaultRegistry = CollectorRegistry.defaultRegistry
private val DefaultRegistry = PrometheusRegistry.defaultRegistry

// Common event observers used in scala projects in Open Planet micro-services
lazy val SuccessfulOperations: PrometheusEventObserver =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.varwise.pekko.http.prometheus

import io.prometheus.client.{CollectorRegistry, Histogram}
import io.prometheus.metrics.core.metrics.Histogram
import io.prometheus.metrics.model.registry.PrometheusRegistry

import scala.concurrent.duration
import scala.concurrent.duration.{FiniteDuration, TimeUnit}
Expand Down Expand Up @@ -30,23 +31,21 @@ class PrometheusResponseTimeRecorder(
metricHelp: String,
buckets: List[Double],
endpointLabelName: String,
registry: CollectorRegistry,
registry: PrometheusRegistry,
timeUnit: TimeUnit)
extends ResponseTimeRecorder {

private val responseTimes = buildHistogram.register(registry)

override def recordResponseTime(endpoint: String, responseTime: FiniteDuration): Unit =
responseTimes.labels(endpoint).observe(responseTime.toUnit(timeUnit))

private def buildHistogram: Histogram.Builder =
private val responseTimes: Histogram =
Histogram
.build()
.builder()
.name(metricName)
.help(metricHelp)
.labelNames(endpointLabelName)
.buckets(buckets: _*)
.classicUpperBounds(buckets: _*)
.register(registry)

override def recordResponseTime(endpoint: String, responseTime: FiniteDuration): Unit =
responseTimes.labelValues(endpoint).observe(responseTime.toUnit(timeUnit))
}

object PrometheusResponseTimeRecorder {
Expand All @@ -58,7 +57,7 @@ object PrometheusResponseTimeRecorder {
val DefaultEndpointLabel = "endpoint"
val DefaultTimeUnit: TimeUnit = duration.SECONDS

lazy val DefaultRegistry: CollectorRegistry = CollectorRegistry.defaultRegistry
lazy val DefaultRegistry: PrometheusRegistry = PrometheusRegistry.defaultRegistry

lazy val Default: PrometheusResponseTimeRecorder =
new PrometheusResponseTimeRecorder(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,16 +1,38 @@
package com.varwise.pekko.http.prometheus.api

import io.prometheus.metrics.expositionformats.ExpositionFormats
import io.prometheus.metrics.model.registry.PrometheusRegistry
import org.apache.pekko.http.scaladsl.model.{HttpCharsets, HttpEntity, MediaType}
import org.apache.pekko.http.scaladsl.server.Directives._
import org.apache.pekko.http.scaladsl.server.Route
import io.prometheus.client.CollectorRegistry
import org.apache.pekko.stream.scaladsl.StreamConverters
import org.slf4j.LoggerFactory

class MetricsEndpoint(registry: CollectorRegistry) {
import java.io.{PipedInputStream, PipedOutputStream}
import scala.concurrent.{ExecutionContext, Future}

class MetricsEndpoint(registry: PrometheusRegistry)(implicit ec: ExecutionContext) {

private val logger = LoggerFactory.getLogger(classOf[MetricsEndpoint])
private val prometheusTextType =
MediaType.customWithFixedCharset("text", "plain", HttpCharsets.`UTF-8`, params = Map("version" -> "0.0.4"))
private val writer = ExpositionFormats.init().getPrometheusTextFormatWriter()

val routes: Route =
get {
path("metrics") {
complete {
MetricFamilySamplesEntity.fromRegistry(registry)
val in = new PipedInputStream
val out = new PipedOutputStream(in)
val byteSource = StreamConverters.fromInputStream(() => in)
val f = Future {
try
writer.write(out, registry.scrape())
finally
out.close()
}
f.failed.foreach(e => logger.error("Error when writing Prometheus metrics", e))
HttpEntity(prometheusTextType, byteSource)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package com.varwise.pekko.http.prometheus

import com.varwise.pekko.http.prometheus.Utils._
import io.prometheus.client.CollectorRegistry
import io.prometheus.metrics.model.registry.PrometheusRegistry
import io.prometheus.metrics.model.snapshots.{CounterSnapshot, Labels}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.jdk.CollectionConverters.IterableHasAsScala

class PrometheusEventObserverSpec extends AnyFlatSpec with Matchers {

"PrometheusEventObserver" should "record observed events in a counter" in {
val registry = new CollectorRegistry()
val registry = new PrometheusRegistry()
val randomMetricName = generateRandomString
val randomMetricHelp = generateRandomString
val randomEventLabelName = generateRandomString
Expand All @@ -25,19 +28,29 @@ class PrometheusEventObserverSpec extends AnyFlatSpec with Matchers {
registry
)

def getCounterValue: java.lang.Double =
registry.getSampleValue(
randomMetricName + "_total",
Array(randomEventLabelName, randomEventDetailsLabelName),
Array(randomEventName, randomEventDetails)
def getCounterValue(): Option[Double] = {
val labels = Labels.of(
randomEventLabelName,
randomEventName,
randomEventDetailsLabelName,
randomEventDetails
)

getCounterValue shouldBe null
registry
.scrape()
.asScala
.collect { case counter: CounterSnapshot => counter }
.filter(_.getMetadata.getName == randomMetricName)
.flatMap(_.getDataPoints.asScala)
.filter(_.getLabels.equals(labels))
.map(_.getValue)
.headOption
}

getCounterValue() shouldBe None

eventObserver.observe(randomEventName, randomEventDetails)

getCounterValue should not be null
getCounterValue.intValue() shouldBe 1

getCounterValue() should not be None
getCounterValue().get.intValue() shouldBe 1
}
}
Original file line number Diff line number Diff line change
@@ -1,33 +1,38 @@
package com.varwise.pekko.http.prometheus

import com.varwise.pekko.http.prometheus.Utils._
import io.prometheus.client.{Collector, CollectorRegistry}
import io.prometheus.metrics.model.registry.PrometheusRegistry
import io.prometheus.metrics.model.snapshots.{HistogramSnapshot, Labels}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters.IterableHasAsScala
import scala.util.Random

class PrometheusResponseTimeRecorderSpec extends AnyFlatSpec with Matchers {

private def getBucketValue(
registry: CollectorRegistry,
registry: PrometheusRegistry,
metricName: String,
labelNames: List[String],
labelValues: List[String],
labels: Labels,
bucket: Double
): Int = {
val name = metricName + "_bucket"
): Long =
registry
.scrape()
.asScala
.collect { case metric: HistogramSnapshot => metric }
.filter(_.getMetadata.getName == metricName)
.flatMap(_.getDataPoints.asScala)
.filter(_.getLabels.equals(labels))
.flatMap(_.getClassicBuckets.asScala)
.find(_.getUpperBound == bucket)
.get
.getCount

// 'le' should be the first label in the list
val allLabelNames = (Array("le") ++ labelNames).reverse
val allLabelValues = (Array(Collector.doubleToGoString(bucket)) ++ labelValues).reverse
registry.getSampleValue(name, allLabelNames, allLabelValues).intValue()
}

"PrometheusLatencyRecorder" should "register a histogram and record request latencies" in {
val registry = new CollectorRegistry()
"PrometheusResponseTimeRecorder" should "register a histogram and record request latencies" in {
val registry = new PrometheusRegistry()
val randomMetricName = generateRandomString
val randomMetricHelp = generateRandomString
val randomLabelName = generateRandomString
Expand All @@ -48,20 +53,14 @@ class PrometheusResponseTimeRecorderSpec extends AnyFlatSpec with Matchers {

recorder.recordResponseTime(randomEndpointName, FiniteDuration(randomLatency, duration.MILLISECONDS))

val first =
getBucketValue(registry, randomMetricName, List(randomLabelName), List(randomEndpointName), buckets.head)
val second =
getBucketValue(registry, randomMetricName, List(randomLabelName), List(randomEndpointName), buckets.last)
val positiveInf = getBucketValue(
registry,
randomMetricName,
List(randomLabelName),
List(randomEndpointName),
Double.PositiveInfinity
)
val labels = Labels.of(randomLabelName, randomEndpointName)

val first = getBucketValue(registry, randomMetricName, labels, buckets.head)
val second = getBucketValue(registry, randomMetricName, labels, buckets.last)
val positiveInf = getBucketValue(registry, randomMetricName, labels, Double.PositiveInfinity)

first shouldBe 0
second shouldBe 1
positiveInf shouldBe 1
positiveInf shouldBe 0
}
}
Loading

0 comments on commit 81d10fd

Please sign in to comment.