diff --git a/README.md b/README.md index 53a08bc..7d4b138 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,15 @@ Collection of utilities to allow exposing prometheus metrics from pekko-http end [prometheus-akka-http](https://github.com/varwise/prometheus-akka-http) - same thing but for akka-http +### Migration to 2.0.0 + +Version 2.0.0 uses Prometheus Java client library 1.0.0 which introduced some breaking changes. + +Here's the quick migration guide: +1. Replace occurrences of `io.prometheus.client.CollectorRegistry` with `io.prometheus.metrics.model.registry.PrometheusRegistry`. +2. Supply `ExecutionContext` instance to `MetricsEndpoint`. + +For more information about Prometheus Java client library please refer to https://prometheus.github.io/client_java/. ### Publishing Artifacts are published to Maven central using the [sbt-sonatype](https://github.com/xerial/sbt-sonatype) plugin diff --git a/build.sbt b/build.sbt index f85cac1..fe27244 100644 --- a/build.sbt +++ b/build.sbt @@ -4,7 +4,7 @@ organization := "com.varwise" publishTo := sonatypePublishToBundle.value -version := "1.0.1" +version := "2.0.0" val scala2Version = "2.13.13" val scala3Version = "3.3.3" @@ -15,21 +15,23 @@ crossScalaVersions := Seq(scala3Version, scala2Version) credentials += Credentials(Path.userHome / ".sbt" / ".credentials") libraryDependencies ++= { - val simpleclientVersion = "0.16.0" + val prometheusJavaClientVersion = "1.3.1" val pekkoVersion = "1.0.2" val pekkoHttpVersion = "1.0.1" val scalaTestVersion = "3.2.18" + val slf4jVersion = "2.0.13" Seq( - "org.apache.pekko" %% "pekko-actor" % pekkoVersion % Provided, - "org.apache.pekko" %% "pekko-stream" % pekkoVersion % Provided, - "org.apache.pekko" %% "pekko-http" % pekkoHttpVersion % Provided, - "org.apache.pekko" %% "pekko-http-spray-json" % pekkoHttpVersion % Provided, - "io.prometheus" % "simpleclient" % simpleclientVersion, - "io.prometheus" % "simpleclient_common" % simpleclientVersion, - "org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test, - "org.apache.pekko" %% "pekko-http-testkit" % pekkoHttpVersion % Test, - "org.scalatest" %% "scalatest" % scalaTestVersion % Test + "org.apache.pekko" %% "pekko-actor" % pekkoVersion % Provided, + "org.apache.pekko" %% "pekko-stream" % pekkoVersion % Provided, + "org.apache.pekko" %% "pekko-http" % pekkoHttpVersion % Provided, + "org.apache.pekko" %% "pekko-http-spray-json" % pekkoHttpVersion % Provided, + "io.prometheus" % "prometheus-metrics-core" % prometheusJavaClientVersion, + "io.prometheus" % "prometheus-metrics-exposition-formats" % prometheusJavaClientVersion, + "org.slf4j" % "slf4j-api" % slf4jVersion, + "org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test, + "org.apache.pekko" %% "pekko-http-testkit" % pekkoHttpVersion % Test, + "org.scalatest" %% "scalatest" % scalaTestVersion % Test ) } diff --git a/src/main/scala/com/varwise/pekko/http/prometheus/EventObserver.scala b/src/main/scala/com/varwise/pekko/http/prometheus/EventObserver.scala index 93d08f3..19985b1 100644 --- a/src/main/scala/com/varwise/pekko/http/prometheus/EventObserver.scala +++ b/src/main/scala/com/varwise/pekko/http/prometheus/EventObserver.scala @@ -1,6 +1,7 @@ package com.varwise.pekko.http.prometheus -import io.prometheus.client.{CollectorRegistry, Counter} +import io.prometheus.metrics.core.metrics.Counter +import io.prometheus.metrics.model.registry.PrometheusRegistry trait EventObserver { def observe(eventName: String, eventDetails: String): Unit @@ -24,20 +25,19 @@ class PrometheusEventObserver( metricHelp: String, eventLabelName: String, eventDetailsLabelName: String, - registry: CollectorRegistry) + registry: PrometheusRegistry) extends EventObserver { - val counter: Counter = buildCounter.register(registry) - - private def buildCounter: Counter.Builder = + private val counter: Counter = Counter - .build() + .builder() .name(metricName) .help(metricHelp) .labelNames(eventLabelName, eventDetailsLabelName) + .register(registry) override def observe(eventName: String, eventDetails: String): Unit = - counter.labels(eventName, eventDetails).inc() + counter.labelValues(eventName, eventDetails).inc() } object PrometheusEventObserver { @@ -47,7 +47,7 @@ object PrometheusEventObserver { private val FailedOperationMetricHelp = "The number of observed failed operations" private val DefaultEventLabelName = "event" private val DefaultEventDetailsLabelName = "details" - private val DefaultRegistry = CollectorRegistry.defaultRegistry + private val DefaultRegistry = PrometheusRegistry.defaultRegistry // Common event observers used in scala projects in Open Planet micro-services lazy val SuccessfulOperations: PrometheusEventObserver = diff --git a/src/main/scala/com/varwise/pekko/http/prometheus/ResponseTimeRecorder.scala b/src/main/scala/com/varwise/pekko/http/prometheus/ResponseTimeRecorder.scala index eeb47e3..8585066 100644 --- a/src/main/scala/com/varwise/pekko/http/prometheus/ResponseTimeRecorder.scala +++ b/src/main/scala/com/varwise/pekko/http/prometheus/ResponseTimeRecorder.scala @@ -1,6 +1,7 @@ package com.varwise.pekko.http.prometheus -import io.prometheus.client.{CollectorRegistry, Histogram} +import io.prometheus.metrics.core.metrics.Histogram +import io.prometheus.metrics.model.registry.PrometheusRegistry import scala.concurrent.duration import scala.concurrent.duration.{FiniteDuration, TimeUnit} @@ -30,23 +31,21 @@ class PrometheusResponseTimeRecorder( metricHelp: String, buckets: List[Double], endpointLabelName: String, - registry: CollectorRegistry, + registry: PrometheusRegistry, timeUnit: TimeUnit) extends ResponseTimeRecorder { - private val responseTimes = buildHistogram.register(registry) - - override def recordResponseTime(endpoint: String, responseTime: FiniteDuration): Unit = - responseTimes.labels(endpoint).observe(responseTime.toUnit(timeUnit)) - - private def buildHistogram: Histogram.Builder = + private val responseTimes: Histogram = Histogram - .build() + .builder() .name(metricName) .help(metricHelp) .labelNames(endpointLabelName) - .buckets(buckets: _*) + .classicUpperBounds(buckets: _*) + .register(registry) + override def recordResponseTime(endpoint: String, responseTime: FiniteDuration): Unit = + responseTimes.labelValues(endpoint).observe(responseTime.toUnit(timeUnit)) } object PrometheusResponseTimeRecorder { @@ -58,7 +57,7 @@ object PrometheusResponseTimeRecorder { val DefaultEndpointLabel = "endpoint" val DefaultTimeUnit: TimeUnit = duration.SECONDS - lazy val DefaultRegistry: CollectorRegistry = CollectorRegistry.defaultRegistry + lazy val DefaultRegistry: PrometheusRegistry = PrometheusRegistry.defaultRegistry lazy val Default: PrometheusResponseTimeRecorder = new PrometheusResponseTimeRecorder( diff --git a/src/main/scala/com/varwise/pekko/http/prometheus/api/MetricFamilySamplesEntity.scala b/src/main/scala/com/varwise/pekko/http/prometheus/api/MetricFamilySamplesEntity.scala deleted file mode 100644 index 9e57661..0000000 --- a/src/main/scala/com/varwise/pekko/http/prometheus/api/MetricFamilySamplesEntity.scala +++ /dev/null @@ -1,33 +0,0 @@ -package com.varwise.pekko.http.prometheus.api - -import org.apache.pekko.http.scaladsl.marshalling.{Marshaller, ToEntityMarshaller} -import org.apache.pekko.http.scaladsl.model._ -import io.prometheus.client.Collector.MetricFamilySamples -import io.prometheus.client.CollectorRegistry -import io.prometheus.client.exporter.common.TextFormat - -import java.io.{StringWriter, Writer} -import java.util - -case class MetricFamilySamplesEntity(samples: util.Enumeration[MetricFamilySamples]) - -object MetricFamilySamplesEntity { - private val mediaTypeParams = Map("version" -> "0.0.4") - - private val mediaType = - MediaType.customWithFixedCharset("text", "plain", HttpCharsets.`UTF-8`, params = mediaTypeParams) - - def fromRegistry(collectorRegistry: CollectorRegistry): MetricFamilySamplesEntity = - MetricFamilySamplesEntity(collectorRegistry.metricFamilySamples()) - - def toPrometheusTextFormat(e: MetricFamilySamplesEntity): String = { - val writer: Writer = new StringWriter() - TextFormat.write004(writer, e.samples) - - writer.toString - } - - implicit val metricsFamilySamplesMarshaller: ToEntityMarshaller[MetricFamilySamplesEntity] = - Marshaller.withFixedContentType(mediaType)(s => HttpEntity(mediaType, toPrometheusTextFormat(s))) - -} diff --git a/src/main/scala/com/varwise/pekko/http/prometheus/api/MetricsEndpoint.scala b/src/main/scala/com/varwise/pekko/http/prometheus/api/MetricsEndpoint.scala index 7f77854..20d0f12 100644 --- a/src/main/scala/com/varwise/pekko/http/prometheus/api/MetricsEndpoint.scala +++ b/src/main/scala/com/varwise/pekko/http/prometheus/api/MetricsEndpoint.scala @@ -1,16 +1,38 @@ package com.varwise.pekko.http.prometheus.api +import io.prometheus.metrics.expositionformats.ExpositionFormats +import io.prometheus.metrics.model.registry.PrometheusRegistry +import org.apache.pekko.http.scaladsl.model.{HttpCharsets, HttpEntity, MediaType} import org.apache.pekko.http.scaladsl.server.Directives._ import org.apache.pekko.http.scaladsl.server.Route -import io.prometheus.client.CollectorRegistry +import org.apache.pekko.stream.scaladsl.StreamConverters +import org.slf4j.LoggerFactory -class MetricsEndpoint(registry: CollectorRegistry) { +import java.io.{PipedInputStream, PipedOutputStream} +import scala.concurrent.{ExecutionContext, Future} + +class MetricsEndpoint(registry: PrometheusRegistry)(implicit ec: ExecutionContext) { + + private val logger = LoggerFactory.getLogger(classOf[MetricsEndpoint]) + private val prometheusTextType = + MediaType.customWithFixedCharset("text", "plain", HttpCharsets.`UTF-8`, params = Map("version" -> "0.0.4")) + private val writer = ExpositionFormats.init().getPrometheusTextFormatWriter() val routes: Route = get { path("metrics") { complete { - MetricFamilySamplesEntity.fromRegistry(registry) + val in = new PipedInputStream + val out = new PipedOutputStream(in) + val byteSource = StreamConverters.fromInputStream(() => in) + val f = Future { + try + writer.write(out, registry.scrape()) + finally + out.close() + } + f.failed.foreach(e => logger.error("Error when writing Prometheus metrics", e)) + HttpEntity(prometheusTextType, byteSource) } } } diff --git a/src/test/scala/com/varwise/pekko/http/prometheus/PrometheusEventObserverSpec.scala b/src/test/scala/com/varwise/pekko/http/prometheus/PrometheusEventObserverSpec.scala index 0c84a71..7979002 100644 --- a/src/test/scala/com/varwise/pekko/http/prometheus/PrometheusEventObserverSpec.scala +++ b/src/test/scala/com/varwise/pekko/http/prometheus/PrometheusEventObserverSpec.scala @@ -1,14 +1,17 @@ package com.varwise.pekko.http.prometheus import com.varwise.pekko.http.prometheus.Utils._ -import io.prometheus.client.CollectorRegistry +import io.prometheus.metrics.model.registry.PrometheusRegistry +import io.prometheus.metrics.model.snapshots.{CounterSnapshot, Labels} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import scala.jdk.CollectionConverters.IterableHasAsScala + class PrometheusEventObserverSpec extends AnyFlatSpec with Matchers { "PrometheusEventObserver" should "record observed events in a counter" in { - val registry = new CollectorRegistry() + val registry = new PrometheusRegistry() val randomMetricName = generateRandomString val randomMetricHelp = generateRandomString val randomEventLabelName = generateRandomString @@ -25,19 +28,29 @@ class PrometheusEventObserverSpec extends AnyFlatSpec with Matchers { registry ) - def getCounterValue: java.lang.Double = - registry.getSampleValue( - randomMetricName + "_total", - Array(randomEventLabelName, randomEventDetailsLabelName), - Array(randomEventName, randomEventDetails) + def getCounterValue(): Option[Double] = { + val labels = Labels.of( + randomEventLabelName, + randomEventName, + randomEventDetailsLabelName, + randomEventDetails ) - - getCounterValue shouldBe null + registry + .scrape() + .asScala + .collect { case counter: CounterSnapshot => counter } + .filter(_.getMetadata.getName == randomMetricName) + .flatMap(_.getDataPoints.asScala) + .filter(_.getLabels.equals(labels)) + .map(_.getValue) + .headOption + } + + getCounterValue() shouldBe None eventObserver.observe(randomEventName, randomEventDetails) - getCounterValue should not be null - getCounterValue.intValue() shouldBe 1 - + getCounterValue() should not be None + getCounterValue().get.intValue() shouldBe 1 } } diff --git a/src/test/scala/com/varwise/pekko/http/prometheus/PrometheusResponseTimeRecorderSpec.scala b/src/test/scala/com/varwise/pekko/http/prometheus/PrometheusResponseTimeRecorderSpec.scala index d6046ec..aac0b9b 100644 --- a/src/test/scala/com/varwise/pekko/http/prometheus/PrometheusResponseTimeRecorderSpec.scala +++ b/src/test/scala/com/varwise/pekko/http/prometheus/PrometheusResponseTimeRecorderSpec.scala @@ -1,33 +1,38 @@ package com.varwise.pekko.http.prometheus import com.varwise.pekko.http.prometheus.Utils._ -import io.prometheus.client.{Collector, CollectorRegistry} +import io.prometheus.metrics.model.registry.PrometheusRegistry +import io.prometheus.metrics.model.snapshots.{HistogramSnapshot, Labels} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import scala.concurrent.duration import scala.concurrent.duration.FiniteDuration +import scala.jdk.CollectionConverters.IterableHasAsScala import scala.util.Random class PrometheusResponseTimeRecorderSpec extends AnyFlatSpec with Matchers { private def getBucketValue( - registry: CollectorRegistry, + registry: PrometheusRegistry, metricName: String, - labelNames: List[String], - labelValues: List[String], + labels: Labels, bucket: Double - ): Int = { - val name = metricName + "_bucket" + ): Long = + registry + .scrape() + .asScala + .collect { case metric: HistogramSnapshot => metric } + .filter(_.getMetadata.getName == metricName) + .flatMap(_.getDataPoints.asScala) + .filter(_.getLabels.equals(labels)) + .flatMap(_.getClassicBuckets.asScala) + .find(_.getUpperBound == bucket) + .get + .getCount - // 'le' should be the first label in the list - val allLabelNames = (Array("le") ++ labelNames).reverse - val allLabelValues = (Array(Collector.doubleToGoString(bucket)) ++ labelValues).reverse - registry.getSampleValue(name, allLabelNames, allLabelValues).intValue() - } - - "PrometheusLatencyRecorder" should "register a histogram and record request latencies" in { - val registry = new CollectorRegistry() + "PrometheusResponseTimeRecorder" should "register a histogram and record request latencies" in { + val registry = new PrometheusRegistry() val randomMetricName = generateRandomString val randomMetricHelp = generateRandomString val randomLabelName = generateRandomString @@ -48,20 +53,14 @@ class PrometheusResponseTimeRecorderSpec extends AnyFlatSpec with Matchers { recorder.recordResponseTime(randomEndpointName, FiniteDuration(randomLatency, duration.MILLISECONDS)) - val first = - getBucketValue(registry, randomMetricName, List(randomLabelName), List(randomEndpointName), buckets.head) - val second = - getBucketValue(registry, randomMetricName, List(randomLabelName), List(randomEndpointName), buckets.last) - val positiveInf = getBucketValue( - registry, - randomMetricName, - List(randomLabelName), - List(randomEndpointName), - Double.PositiveInfinity - ) + val labels = Labels.of(randomLabelName, randomEndpointName) + + val first = getBucketValue(registry, randomMetricName, labels, buckets.head) + val second = getBucketValue(registry, randomMetricName, labels, buckets.last) + val positiveInf = getBucketValue(registry, randomMetricName, labels, Double.PositiveInfinity) first shouldBe 0 second shouldBe 1 - positiveInf shouldBe 1 + positiveInf shouldBe 0 } } diff --git a/src/test/scala/com/varwise/pekko/http/prometheus/api/MetricsEndpointSpec.scala b/src/test/scala/com/varwise/pekko/http/prometheus/api/MetricsEndpointSpec.scala index 231538a..d316757 100644 --- a/src/test/scala/com/varwise/pekko/http/prometheus/api/MetricsEndpointSpec.scala +++ b/src/test/scala/com/varwise/pekko/http/prometheus/api/MetricsEndpointSpec.scala @@ -1,23 +1,25 @@ package com.varwise.pekko.http.prometheus.api +import com.varwise.pekko.http.prometheus.Utils._ +import io.prometheus.metrics.core.metrics.Histogram +import io.prometheus.metrics.expositionformats.ExpositionFormats +import io.prometheus.metrics.model.registry.PrometheusRegistry import org.apache.pekko.http.scaladsl.model.HttpCharsets import org.apache.pekko.http.scaladsl.testkit.ScalatestRouteTest -import com.varwise.pekko.http.prometheus.Utils._ -import io.prometheus.client.exporter.common.TextFormat -import io.prometheus.client.{CollectorRegistry, Histogram} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -import java.io.StringWriter +import java.io.ByteArrayOutputStream +import java.nio.charset.StandardCharsets import scala.util.Random class MetricsEndpointSpec extends AnyFlatSpec with Matchers with ScalatestRouteTest { - private def createEndpoint(collectorRegistry: CollectorRegistry) = + private def createEndpoint(collectorRegistry: PrometheusRegistry) = new MetricsEndpoint(collectorRegistry) "Metrics endpoint" should "return the correct media type and charset" in { - val api = createEndpoint(CollectorRegistry.defaultRegistry) + val api = createEndpoint(PrometheusRegistry.defaultRegistry) Get("/metrics") ~> api.routes ~> check { mediaType.subType shouldBe "plain" mediaType.isText shouldBe true @@ -27,20 +29,21 @@ class MetricsEndpointSpec extends AnyFlatSpec with Matchers with ScalatestRouteT } it should "return serialized metrics in the prometheus text format" in { - val registry = new CollectorRegistry() + val registry = new PrometheusRegistry() val api = createEndpoint(registry) val RandomTestName = generateRandomStringOfLength(16) val RandomTestHelp = generateRandomStringOfLength(16) - val hist = Histogram.build().name(RandomTestName).help(RandomTestHelp).linearBuckets(0, 1, 10).register(registry) + val hist = Histogram.builder().name(RandomTestName).help(RandomTestHelp).register(registry) hist.observe(Math.abs(Random.nextDouble())) Get("/metrics") ~> api.routes ~> check { val resp = responseAs[String] - val writer = new StringWriter() - TextFormat.write004(writer, registry.metricFamilySamples()) - - resp shouldBe writer.toString + val baos = new ByteArrayOutputStream() + ExpositionFormats.init().getPrometheusTextFormatWriter().write(baos, registry.scrape()) + val expected = new String(baos.toByteArray, StandardCharsets.UTF_8) + baos.close() + resp shouldBe expected } } }