Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update prometheus client to 1.3.1 #6

Merged
merged 1 commit into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ Collection of utilities to allow exposing prometheus metrics from pekko-http end

[prometheus-akka-http](https://github.com/varwise/prometheus-akka-http) - same thing but for akka-http

### Migration to 2.0.0

Version 2.0.0 uses Prometheus Java client library 1.0.0 which introduced some breaking changes.

Here's the quick migration guide:
1. Replace occurrences of `io.prometheus.client.CollectorRegistry` with `io.prometheus.metrics.model.registry.PrometheusRegistry`.
2. Supply `ExecutionContext` instance to `MetricsEndpoint`.

For more information about Prometheus Java client library please refer to https://prometheus.github.io/client_java/.
### Publishing

Artifacts are published to Maven central using the [sbt-sonatype](https://github.com/xerial/sbt-sonatype) plugin
Expand Down
24 changes: 13 additions & 11 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ organization := "com.varwise"

publishTo := sonatypePublishToBundle.value

version := "1.0.1"
version := "2.0.0"

val scala2Version = "2.13.13"
val scala3Version = "3.3.3"
Expand All @@ -15,21 +15,23 @@ crossScalaVersions := Seq(scala3Version, scala2Version)
credentials += Credentials(Path.userHome / ".sbt" / ".credentials")

libraryDependencies ++= {
val simpleclientVersion = "0.16.0"
val prometheusJavaClientVersion = "1.3.1"
val pekkoVersion = "1.0.2"
val pekkoHttpVersion = "1.0.1"
val scalaTestVersion = "3.2.18"
val slf4jVersion = "2.0.13"

Seq(
"org.apache.pekko" %% "pekko-actor" % pekkoVersion % Provided,
"org.apache.pekko" %% "pekko-stream" % pekkoVersion % Provided,
"org.apache.pekko" %% "pekko-http" % pekkoHttpVersion % Provided,
"org.apache.pekko" %% "pekko-http-spray-json" % pekkoHttpVersion % Provided,
"io.prometheus" % "simpleclient" % simpleclientVersion,
"io.prometheus" % "simpleclient_common" % simpleclientVersion,
"org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test,
"org.apache.pekko" %% "pekko-http-testkit" % pekkoHttpVersion % Test,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test
"org.apache.pekko" %% "pekko-actor" % pekkoVersion % Provided,
"org.apache.pekko" %% "pekko-stream" % pekkoVersion % Provided,
"org.apache.pekko" %% "pekko-http" % pekkoHttpVersion % Provided,
"org.apache.pekko" %% "pekko-http-spray-json" % pekkoHttpVersion % Provided,
"io.prometheus" % "prometheus-metrics-core" % prometheusJavaClientVersion,
"io.prometheus" % "prometheus-metrics-exposition-formats" % prometheusJavaClientVersion,
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test,
"org.apache.pekko" %% "pekko-http-testkit" % pekkoHttpVersion % Test,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.varwise.pekko.http.prometheus

import io.prometheus.client.{CollectorRegistry, Counter}
import io.prometheus.metrics.core.metrics.Counter
import io.prometheus.metrics.model.registry.PrometheusRegistry

trait EventObserver {
def observe(eventName: String, eventDetails: String): Unit
Expand All @@ -24,20 +25,19 @@ class PrometheusEventObserver(
metricHelp: String,
eventLabelName: String,
eventDetailsLabelName: String,
registry: CollectorRegistry)
registry: PrometheusRegistry)
extends EventObserver {

val counter: Counter = buildCounter.register(registry)

private def buildCounter: Counter.Builder =
private val counter: Counter =
Counter
.build()
.builder()
.name(metricName)
.help(metricHelp)
.labelNames(eventLabelName, eventDetailsLabelName)
.register(registry)

override def observe(eventName: String, eventDetails: String): Unit =
counter.labels(eventName, eventDetails).inc()
counter.labelValues(eventName, eventDetails).inc()
}

object PrometheusEventObserver {
Expand All @@ -47,7 +47,7 @@ object PrometheusEventObserver {
private val FailedOperationMetricHelp = "The number of observed failed operations"
private val DefaultEventLabelName = "event"
private val DefaultEventDetailsLabelName = "details"
private val DefaultRegistry = CollectorRegistry.defaultRegistry
private val DefaultRegistry = PrometheusRegistry.defaultRegistry

// Common event observers used in scala projects in Open Planet micro-services
lazy val SuccessfulOperations: PrometheusEventObserver =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.varwise.pekko.http.prometheus

import io.prometheus.client.{CollectorRegistry, Histogram}
import io.prometheus.metrics.core.metrics.Histogram
import io.prometheus.metrics.model.registry.PrometheusRegistry

import scala.concurrent.duration
import scala.concurrent.duration.{FiniteDuration, TimeUnit}
Expand Down Expand Up @@ -30,23 +31,21 @@ class PrometheusResponseTimeRecorder(
metricHelp: String,
buckets: List[Double],
endpointLabelName: String,
registry: CollectorRegistry,
registry: PrometheusRegistry,
timeUnit: TimeUnit)
extends ResponseTimeRecorder {

private val responseTimes = buildHistogram.register(registry)

override def recordResponseTime(endpoint: String, responseTime: FiniteDuration): Unit =
responseTimes.labels(endpoint).observe(responseTime.toUnit(timeUnit))

private def buildHistogram: Histogram.Builder =
private val responseTimes: Histogram =
Histogram
.build()
.builder()
.name(metricName)
.help(metricHelp)
.labelNames(endpointLabelName)
.buckets(buckets: _*)
.classicUpperBounds(buckets: _*)
.register(registry)

override def recordResponseTime(endpoint: String, responseTime: FiniteDuration): Unit =
responseTimes.labelValues(endpoint).observe(responseTime.toUnit(timeUnit))
}

object PrometheusResponseTimeRecorder {
Expand All @@ -58,7 +57,7 @@ object PrometheusResponseTimeRecorder {
val DefaultEndpointLabel = "endpoint"
val DefaultTimeUnit: TimeUnit = duration.SECONDS

lazy val DefaultRegistry: CollectorRegistry = CollectorRegistry.defaultRegistry
lazy val DefaultRegistry: PrometheusRegistry = PrometheusRegistry.defaultRegistry

lazy val Default: PrometheusResponseTimeRecorder =
new PrometheusResponseTimeRecorder(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,16 +1,38 @@
package com.varwise.pekko.http.prometheus.api

import io.prometheus.metrics.expositionformats.ExpositionFormats
import io.prometheus.metrics.model.registry.PrometheusRegistry
import org.apache.pekko.http.scaladsl.model.{HttpCharsets, HttpEntity, MediaType}
import org.apache.pekko.http.scaladsl.server.Directives._
import org.apache.pekko.http.scaladsl.server.Route
import io.prometheus.client.CollectorRegistry
import org.apache.pekko.stream.scaladsl.StreamConverters
import org.slf4j.LoggerFactory

class MetricsEndpoint(registry: CollectorRegistry) {
import java.io.{PipedInputStream, PipedOutputStream}
import scala.concurrent.{ExecutionContext, Future}

class MetricsEndpoint(registry: PrometheusRegistry)(implicit ec: ExecutionContext) {

private val logger = LoggerFactory.getLogger(classOf[MetricsEndpoint])
private val prometheusTextType =
MediaType.customWithFixedCharset("text", "plain", HttpCharsets.`UTF-8`, params = Map("version" -> "0.0.4"))
private val writer = ExpositionFormats.init().getPrometheusTextFormatWriter()

val routes: Route =
get {
path("metrics") {
complete {
MetricFamilySamplesEntity.fromRegistry(registry)
val in = new PipedInputStream
val out = new PipedOutputStream(in)
val byteSource = StreamConverters.fromInputStream(() => in)
val f = Future {
try
writer.write(out, registry.scrape())
finally
out.close()
}
f.failed.foreach(e => logger.error("Error when writing Prometheus metrics", e))
HttpEntity(prometheusTextType, byteSource)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package com.varwise.pekko.http.prometheus

import com.varwise.pekko.http.prometheus.Utils._
import io.prometheus.client.CollectorRegistry
import io.prometheus.metrics.model.registry.PrometheusRegistry
import io.prometheus.metrics.model.snapshots.{CounterSnapshot, Labels}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.jdk.CollectionConverters.IterableHasAsScala

class PrometheusEventObserverSpec extends AnyFlatSpec with Matchers {

"PrometheusEventObserver" should "record observed events in a counter" in {
val registry = new CollectorRegistry()
val registry = new PrometheusRegistry()
val randomMetricName = generateRandomString
val randomMetricHelp = generateRandomString
val randomEventLabelName = generateRandomString
Expand All @@ -25,19 +28,29 @@ class PrometheusEventObserverSpec extends AnyFlatSpec with Matchers {
registry
)

def getCounterValue: java.lang.Double =
registry.getSampleValue(
randomMetricName + "_total",
Array(randomEventLabelName, randomEventDetailsLabelName),
Array(randomEventName, randomEventDetails)
def getCounterValue(): Option[Double] = {
val labels = Labels.of(
randomEventLabelName,
randomEventName,
randomEventDetailsLabelName,
randomEventDetails
)

getCounterValue shouldBe null
registry
.scrape()
.asScala
.collect { case counter: CounterSnapshot => counter }
.filter(_.getMetadata.getName == randomMetricName)
.flatMap(_.getDataPoints.asScala)
.filter(_.getLabels.equals(labels))
.map(_.getValue)
.headOption
}

getCounterValue() shouldBe None

eventObserver.observe(randomEventName, randomEventDetails)

getCounterValue should not be null
getCounterValue.intValue() shouldBe 1

getCounterValue() should not be None
getCounterValue().get.intValue() shouldBe 1
}
}
Original file line number Diff line number Diff line change
@@ -1,33 +1,38 @@
package com.varwise.pekko.http.prometheus

import com.varwise.pekko.http.prometheus.Utils._
import io.prometheus.client.{Collector, CollectorRegistry}
import io.prometheus.metrics.model.registry.PrometheusRegistry
import io.prometheus.metrics.model.snapshots.{HistogramSnapshot, Labels}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters.IterableHasAsScala
import scala.util.Random

class PrometheusResponseTimeRecorderSpec extends AnyFlatSpec with Matchers {

private def getBucketValue(
registry: CollectorRegistry,
registry: PrometheusRegistry,
metricName: String,
labelNames: List[String],
labelValues: List[String],
labels: Labels,
bucket: Double
): Int = {
val name = metricName + "_bucket"
): Long =
registry
.scrape()
.asScala
.collect { case metric: HistogramSnapshot => metric }
.filter(_.getMetadata.getName == metricName)
.flatMap(_.getDataPoints.asScala)
.filter(_.getLabels.equals(labels))
.flatMap(_.getClassicBuckets.asScala)
.find(_.getUpperBound == bucket)
.get
.getCount

// 'le' should be the first label in the list
val allLabelNames = (Array("le") ++ labelNames).reverse
val allLabelValues = (Array(Collector.doubleToGoString(bucket)) ++ labelValues).reverse
registry.getSampleValue(name, allLabelNames, allLabelValues).intValue()
}

"PrometheusLatencyRecorder" should "register a histogram and record request latencies" in {
val registry = new CollectorRegistry()
"PrometheusResponseTimeRecorder" should "register a histogram and record request latencies" in {
val registry = new PrometheusRegistry()
val randomMetricName = generateRandomString
val randomMetricHelp = generateRandomString
val randomLabelName = generateRandomString
Expand All @@ -48,20 +53,14 @@ class PrometheusResponseTimeRecorderSpec extends AnyFlatSpec with Matchers {

recorder.recordResponseTime(randomEndpointName, FiniteDuration(randomLatency, duration.MILLISECONDS))

val first =
getBucketValue(registry, randomMetricName, List(randomLabelName), List(randomEndpointName), buckets.head)
val second =
getBucketValue(registry, randomMetricName, List(randomLabelName), List(randomEndpointName), buckets.last)
val positiveInf = getBucketValue(
registry,
randomMetricName,
List(randomLabelName),
List(randomEndpointName),
Double.PositiveInfinity
)
val labels = Labels.of(randomLabelName, randomEndpointName)

val first = getBucketValue(registry, randomMetricName, labels, buckets.head)
val second = getBucketValue(registry, randomMetricName, labels, buckets.last)
val positiveInf = getBucketValue(registry, randomMetricName, labels, Double.PositiveInfinity)

first shouldBe 0
second shouldBe 1
positiveInf shouldBe 1
positiveInf shouldBe 0
}
}
Loading