diff --git a/src/main/scala/io/github/dutrevis/CPUMetrics.scala b/src/main/scala/io/github/dutrevis/CPUMetrics.scala new file mode 100644 index 0000000..98df0c3 --- /dev/null +++ b/src/main/scala/io/github/dutrevis/CPUMetrics.scala @@ -0,0 +1,249 @@ +package io.github.dutrevis + +import java.util.{Map => JMap} +import scala.collection.JavaConverters._ + +import com.codahale.metrics.{Gauge, Metric, MetricRegistry} +import org.apache.spark.SparkContext +import org.apache.spark.api.plugin.{ + DriverPlugin, + ExecutorPlugin, + PluginContext, + SparkPlugin +} + +/** Collects CPU resource metrics from a unix-based operating system.

Use + * when Spark is running in clusters with standalone, Mesos or YARN resource + * managers.

CPU metrics are obtained from the numbers of the first line of + * the `/proc/stat` file, available at the proc pseudo-filesystem of unix-based + * operating systems. These numbers identify the amount of time the CPU has + * spent performing different kinds of work, arranged in columns at the + * following order: "cpu_user", "cpu_nice", "cpu_system", "cpu_idle", + * "cpu_iowait", "cpu_irq" and "cpu_softirq".

+ * @note
+ *   All of the numbers retrieved are aggregates since the system first booted.
+ * @note
+ *   Time units are in USER_HZ or Jiffies (typically hundredths of a second)
+ * @note
+ *   Values for "cpu_steal", "cpu_guest" and "cpu_guest_nice", available at
+ *   specific Linux versions, are not parsed from the file.
+ */
+class CPUMetrics extends SparkPlugin {
+
+  /** Maps the collector methods to their respective metric names, that will be
+   * displayed in the Dropwizard's metric system. Only the five metrics listed
+   * here are registered by the plugin components; no collector is mapped for
+   * the "cpu_irq" and "cpu_softirq" columns.
+   */
+  val metricMapping = Map[String, (StatMetricCollector) => Double](
+    "UserCPU" -> collectUserCPU,
+    "NiceCPU" -> collectNiceCPU,
+    "SystemCPU" -> collectSystemCPU,
+    "IdleCPU" -> collectIdleCPU,
+    "WaitCPU" -> collectWaitCPU
+  )
+
+  /** Registers a provided Dropwizard's Metric instance into a metric registry
+   * under a metric name.

+ * @param metricRegistry + * a MetricRegistry instance from dropwizard.metrics

+ * @param metricName + * a metric name as a String

+ * @param metricInstance + * an instance of a dropwizard's Metric class to be registered

+   * @throws IllegalArgumentException
+   *   if the metric name is already registered
+   */
+  def registerMetric(
+      metricRegistry: MetricRegistry,
+      metricName: String,
+      metricInstance: Metric
+  ): Unit = {
+    // The instance handed back by the registry is deliberately discarded.
+    val _ = metricRegistry.register(metricName, metricInstance)
+  }
+
+  /** Creates a Dropwizard's Gauge metric type - an instantaneous reading of a
+   * particular value -, setting the provided collector method as the
+   * `getValue` method of the Gauge instance.

+ * @param metricCollector + * a StatMetricCollector instance

+ * @param collectorMethod + * a method that receives the metricCollector and returns a metric value as + * a Double

+   */
+  def createGaugeMetric(
+      metricCollector: StatMetricCollector,
+      collectorMethod: (StatMetricCollector) => Double
+  ): Gauge[Double] = {
+    // Every read of the gauge re-invokes the collector method on the captured
+    // collector instance; nothing is cached here.
+    val gauge: Gauge[Double] = new Gauge[Double] {
+      override def getValue: Double = collectorMethod.apply(metricCollector)
+    }
+    gauge
+  }
+
+  /** Collects the aggregated CPU usage value for normal processes executing in
+   * user mode, as an average out of the total of CPU usage time for all
+   * processes.

+ * @param metricCollector + * a StatMetricCollector instance

+   * @note
+   *   The value is returned exactly as produced by `getMetricValue` for the
+   *   "cpu_user" column; this method applies no rounding of its own.
+   */
+  def collectUserCPU(metricCollector: StatMetricCollector): Double =
+    metricCollector.getMetricValue(
+      metricCollector.getProcFileContent(),
+      "cpu_user"
+    )
+
+  /** Collects the aggregated CPU usage value for niced processes (that is, run
+   * with the nice command) executing in user mode and calculates its average
+   * out of the total of the CPU usage time for all processes.

+ * @param metricCollector + * a StatMetricCollector instance

+   * @note
+   *   The value is returned exactly as produced by `getMetricValue` for the
+   *   "cpu_nice" column; this method applies no rounding of its own.
+   */
+  def collectNiceCPU(metricCollector: StatMetricCollector): Double =
+    metricCollector.getMetricValue(
+      metricCollector.getProcFileContent(),
+      "cpu_nice"
+    )
+
+  /** Collects the aggregated CPU usage value for processes executing in kernel
+   * mode, as an average out of the total of the CPU usage time for all
+   * processes.
+   *

+ * @param metricCollector + * a StatMetricCollector instance

+   * @note
+   *   The value is returned exactly as produced by `getMetricValue` for the
+   *   "cpu_system" column; this method applies no rounding of its own.
+   */
+  def collectSystemCPU(metricCollector: StatMetricCollector): Double =
+    metricCollector.getMetricValue(
+      metricCollector.getProcFileContent(),
+      "cpu_system"
+    )
+
+  /** Collects the aggregated CPU usage value for when no processes are running,
+   * as an average out of the total of the CPU usage time for all processes.
+   *

+ * @param metricCollector + * a StatMetricCollector instance

+   * @note
+   *   The value is returned exactly as produced by `getMetricValue` for the
+   *   "cpu_idle" column; this method applies no rounding of its own.
+   */
+  def collectIdleCPU(metricCollector: StatMetricCollector): Double =
+    metricCollector.getMetricValue(
+      metricCollector.getProcFileContent(),
+      "cpu_idle"
+    )
+
+  /** Collects the aggregated CPU usage value for when it's waiting for I/O to
+   * complete, as an average out of the total of CPU usage time for all
+   * processes.

+ * @param metricCollector + * a StatMetricCollector instance

+   * @note
+   *   The value is returned exactly as produced by `getMetricValue` for the
+   *   "cpu_iowait" column; this method applies no rounding of its own.
+   */
+  def collectWaitCPU(metricCollector: StatMetricCollector): Double =
+    metricCollector.getMetricValue(
+      metricCollector.getProcFileContent(),
+      "cpu_iowait"
+    )
+
+  /** Returns the plugin's driver-side component. The returned DriverPlugin
+   * instance is called once, early in the initialization of the Spark driver.
+   * The operation it performs consists in the sequential registration of each
+   * mapped metric as a Gauge metric into an existing metric Registry. A test
+   * call is executed once on each collector method before its registration,
+   * assuring that the metric is available to be read and collected from the
+   * local OS, thus preventing future errors when the Gauge is first executed
+   * by the metrics system. If a `NoSuchElementException` is thrown in this
+   * attempt, the method is not registered, enabling the registration of the
+   * subsequent mapped metrics by the plugin. The plugin component ends its
+   * execution once all metrics are registered, leaving to the Dropwizard's
+   * Metrics system the job of collecting and exporting the registered metrics
+   * in a pre or user-defined frequency.

+   * @note
+   *   The driver's initialization is blocked during the operations inside
+   *   `init`, so heavy performing operations must be avoided.
+   * @note
+   *   The overridden `init` method must return a Map, that will be provided as
+   *   `extraConf` to an `ExecutorPlugin` instance. This plugin returns an
+   *   empty Map.
+   * @return
+   *   An instance of the `DriverPlugin`
+   */
+  override def driverPlugin(): DriverPlugin = {
+    new DriverPlugin() {
+      override def init(
+          sc: SparkContext,
+          myContext: PluginContext
+      ): JMap[String, String] = {
+        // One collector instance is shared by every gauge registered below.
+        val metricCollector = new StatMetricCollector
+        metricMapping.foreach { case (metricName, collectorMethod) =>
+          try {
+            // Probe the collector once; the value itself is not needed, only
+            // the fact that the call succeeds.
+            val _ = collectorMethod(metricCollector)
+            registerMetric(
+              myContext.metricRegistry,
+              MetricRegistry.name(metricName),
+              createGaugeMetric(metricCollector, collectorMethod)
+            )
+          } catch {
+            // Deliberate best-effort: a metric that cannot be read is skipped
+            // so the remaining mapped metrics are still registered.
+            case _: NoSuchElementException => ()
+          }
+        }
+        Map.empty[String, String].asJava
+      }
+    }
+  }
+
+  /** Returns the plugin's executor-side component. The returned ExecutorPlugin
+   * instance is called once, early in the initialization of the executor
+   * process. The operation it performs consists in the sequential registration
+   * of each mapped metric as a Gauge metric into an existing metric Registry.
+   * A test call is executed once on each collector method before its
+   * registration, assuring that the metric is available to be read and
+   * collected from the local OS, thus preventing future errors when the Gauge
+   * is first executed by the metrics system. If a `NoSuchElementException` is
+   * thrown in this attempt, the method is not registered, enabling the
+   * registration of the subsequent mapped metrics by the plugin. The plugin
+   * component ends its execution once all metrics are registered, leaving to
+   * the Dropwizard's Metrics system the job of collecting and exporting the
+   * registered metrics in a pre or user-defined frequency.

+   * @note
+   *   The executor's initialization is blocked during the operations inside
+   *   `init`, so heavy performing operations must be avoided.
+   * @return
+   *   An instance of the `ExecutorPlugin`
+   */
+  override def executorPlugin(): ExecutorPlugin = {
+    new ExecutorPlugin() {
+      override def init(
+          myContext: PluginContext,
+          extraConf: JMap[String, String]
+      ): Unit = {
+        // One collector instance is shared by every gauge registered below.
+        val metricCollector = new StatMetricCollector
+        metricMapping.foreach { case (metricName, collectorMethod) =>
+          try {
+            // Probe the collector once; the value itself is not needed, only
+            // the fact that the call succeeds.
+            val _ = collectorMethod(metricCollector)
+            registerMetric(
+              myContext.metricRegistry,
+              MetricRegistry.name(metricName),
+              createGaugeMetric(metricCollector, collectorMethod)
+            )
+          } catch {
+            // Deliberate best-effort: a metric that cannot be read is skipped
+            // so the remaining mapped metrics are still registered.
+            case _: NoSuchElementException => ()
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/src/main/scala/io/github/dutrevis/StatMetricCollector.scala b/src/main/scala/io/github/dutrevis/StatMetricCollector.scala
new file mode 100644
index 0000000..8a7f20e
--- /dev/null
+++ b/src/main/scala/io/github/dutrevis/StatMetricCollector.scala
@@ -0,0 +1,52 @@
+package io.github.dutrevis
+
+class StatMetricCollector extends ProcFileMetricCollector {
+
+  override val procFilePath = "/proc/stat"
+
+  /** A List with the names of the columns present in the `/proc/stat` file.
+   * They are used as keys of a Map that points to the values collected from
+   * the file.

+   */
+  val procFileCPUColumns =
+    "cpu_user" :: "cpu_nice" :: "cpu_system" :: "cpu_idle" ::
+      "cpu_iowait" :: "cpu_irq" :: "cpu_softirq" :: Nil
+
+  /** Gets the value of a metric from the content of a proc file provided. The
+   * metric is searched according to the original name provided in the
+   * parameter `originalMetricName`.

+ * @param procFileContent + * a String with the content of the desired proc file

+ * @param originalMetricName + * the original metric name by which it is found in the proc file

+   * @return
+   *   the metric's counter divided by the sum of all parsed counters, as a
+   *   Double; 0.0 when every parsed counter is zero
+   * @throws NoSuchElementException
+   *   if a metric is not found with the provided original name
+   */
+  override def getMetricValue(
+      procFileContent: String,
+      originalMetricName: String
+  ): Double = {
+    // Only the first line is parsed; its leading label token is dropped and
+    // the remaining tokens are read as Long counters.
+    val counters = procFileContent.linesIterator
+      .take(1)
+      .mkString
+      .split(" +")
+      .tail
+      .map(_.toLong)
+    // zip stops at the shorter side, so counters beyond the named columns are
+    // ignored and a short line simply leaves the trailing names unmapped.
+    val procFileData = procFileCPUColumns.zip(counters).toMap
+    // Looked up before the total so an unknown name always raises
+    // NoSuchElementException, whatever the counters hold.
+    val metricCounter = procFileData(originalMetricName)
+    val totalCounter = procFileData.foldLeft(0.0)(_ + _._2)
+    // An all-zero line would otherwise produce NaN (0 / 0.0).
+    if (totalCounter == 0.0) 0.0 else metricCounter / totalCounter
+  }
+
+}
diff --git a/src/test/scala/CPUMetricsTest.scala b/src/test/scala/CPUMetricsTest.scala
new file mode 100644
index 0000000..aabab6b
--- /dev/null
+++ b/src/test/scala/CPUMetricsTest.scala
@@ -0,0 +1,172 @@
+import io.github.dutrevis.{CPUMetrics, StatMetricCollector}
+
+import com.codahale.metrics.{Gauge, Metric, MetricRegistry}
+
+import org.mockito.captor.ArgCaptor
+import org.mockito.integrations.scalatest.ResetMocksAfterEachTest
+import org.mockito.{MockitoSugar, ArgumentMatchersSugar}
+import org.scalatest.funsuite.AnyFunSuite
+
+class CPUMetricsTest
+    extends AnyFunSuite
+    with MockitoSugar
+    with ArgumentMatchersSugar
+    with ResetMocksAfterEachTest {
+
+  // Arrange common mocks
+  val gaugeMock = mock[Gauge[Double]]
+  val metricMock = mock[Metric]
+  val metricRegistryMock = mock[MetricRegistry]
+  val statMetricCollectorMock = mock[StatMetricCollector]
+
+  // Arrange common values
+  val procFileContentTest = new String
+
+  test("Method createGaugeMetric should return Gauge[Double]") {
+    val cpuMetrics = new CPUMetrics
+    val collectorMethod = (s: StatMetricCollector) => { 123456.toDouble }
+
+    val returnedGaugeMetric =
+      cpuMetrics.createGaugeMetric(statMetricCollectorMock, collectorMethod)
+    assert(returnedGaugeMetric.isInstanceOf[Gauge[Double]])
+  }
+
+  test("Method registerMetric should call register") {
+    val cpuMetrics = new CPUMetrics
+    val metricName: String = "any_metric"
+
+    when(metricRegistryMock.register(any[String], any[Metric]))
+
      .thenReturn(metricMock)
+    cpuMetrics.registerMetric(metricRegistryMock, metricName, gaugeMock)
+
+    verify(metricRegistryMock, times(1)).register(metricName, gaugeMock)
+  }
+
+  // Checks that collectUserCPU forwards the stubbed file content and the
+  // "cpu_user" name to getMetricValue exactly once.
+  test("Method collectUserCPU should call getMetricValue") {
+    val originalMetricName: String = "cpu_user"
+    val cpuMetrics = new CPUMetrics
+
+    when(statMetricCollectorMock.getProcFileContent())
+      .thenReturn(procFileContentTest)
+    cpuMetrics.collectUserCPU(statMetricCollectorMock)
+    verify(statMetricCollectorMock, times(1))
+      .getMetricValue(procFileContentTest, originalMetricName)
+  }
+
+  // Same interaction as the previous test, checked through argument captors.
+  test("Method collectUserCPU should call getMetricValue with args") {
+    val cpuMetrics = new CPUMetrics
+    val originalMetricName: String = "cpu_user"
+    val procFileContentCaptor = ArgCaptor[String]
+    val originalMetricNameCaptor = ArgCaptor[String]
+
+    when(statMetricCollectorMock.getProcFileContent())
+      .thenReturn(procFileContentTest)
+    cpuMetrics.collectUserCPU(statMetricCollectorMock)
+    verify(statMetricCollectorMock).getMetricValue(
+      procFileContentCaptor,
+      originalMetricNameCaptor
+    )
+    procFileContentCaptor hasCaptured procFileContentTest
+    originalMetricNameCaptor hasCaptured originalMetricName
+  }
+
+  // The five tests below share one shape: both collector calls are stubbed
+  // and the collect method must return the stubbed value unchanged.
+  test("Method collectUserCPU should return Double") {
+    val originalMetricName: String = "cpu_user"
+    val expectedDoubleValue: Double = 79242
+    val cpuMetrics = new CPUMetrics
+
+    when(statMetricCollectorMock.getProcFileContent())
+      .thenReturn(procFileContentTest)
+    when(
+      statMetricCollectorMock.getMetricValue(
+        procFileContentTest,
+        originalMetricName
+      )
+    )
+      .thenReturn(expectedDoubleValue)
+
+    assertResult(expectedDoubleValue) {
+      cpuMetrics.collectUserCPU(statMetricCollectorMock)
+    }
+  }
+
+  test("Method collectNiceCPU should return Double") {
+    val originalMetricName: String = "cpu_nice"
+    val expectedDoubleValue: Double = 0
+    val cpuMetrics = new CPUMetrics
+
+    when(statMetricCollectorMock.getProcFileContent())
+      .thenReturn(procFileContentTest)
+    when(
+      statMetricCollectorMock.getMetricValue(
+        procFileContentTest,
+        originalMetricName
+      )
+    )
+      .thenReturn(expectedDoubleValue)
+
+    assertResult(expectedDoubleValue) {
+      cpuMetrics.collectNiceCPU(statMetricCollectorMock)
+    }
+  }
+
+  test("Method collectSystemCPU should return Double") {
+    val originalMetricName: String = "cpu_system"
+    val expectedDoubleValue: Double = 74306
+    val cpuMetrics = new CPUMetrics
+
+    when(statMetricCollectorMock.getProcFileContent())
+      .thenReturn(procFileContentTest)
+    when(
+      statMetricCollectorMock.getMetricValue(
+        procFileContentTest,
+        originalMetricName
+      )
+    )
+      .thenReturn(expectedDoubleValue)
+
+    assertResult(expectedDoubleValue) {
+      cpuMetrics.collectSystemCPU(statMetricCollectorMock)
+    }
+  }
+
+  test("Method collectIdleCPU should return Double") {
+    val originalMetricName: String = "cpu_idle"
+    val expectedDoubleValue: Double = 842486413
+    val cpuMetrics = new CPUMetrics
+
+    when(statMetricCollectorMock.getProcFileContent())
+      .thenReturn(procFileContentTest)
+    when(
+      statMetricCollectorMock.getMetricValue(
+        procFileContentTest,
+        originalMetricName
+      )
+    )
+      .thenReturn(expectedDoubleValue)
+
+    assertResult(expectedDoubleValue) {
+      cpuMetrics.collectIdleCPU(statMetricCollectorMock)
+    }
+  }
+
+  test("Method collectWaitCPU should return Double") {
+    val originalMetricName: String = "cpu_iowait"
+    val expectedDoubleValue: Double = 756859
+    val cpuMetrics = new CPUMetrics
+
+    when(statMetricCollectorMock.getProcFileContent())
+      .thenReturn(procFileContentTest)
+    when(
+      statMetricCollectorMock.getMetricValue(
+        procFileContentTest,
+        originalMetricName
+      )
+    )
+      .thenReturn(expectedDoubleValue)
+
+    assertResult(expectedDoubleValue) {
+      cpuMetrics.collectWaitCPU(statMetricCollectorMock)
+    }
+  }
+}
diff --git a/src/test/scala/StatMetricCollectorTest.scala b/src/test/scala/StatMetricCollectorTest.scala
new file mode 100644
index 0000000..e2b521e
--- /dev/null
+++ b/src/test/scala/StatMetricCollectorTest.scala
@@ -0,0 +1,60 @@
+import io.github.dutrevis.StatMetricCollector
+import org.scalatest.funsuite.AnyFunSuite
+import org.mockito.{MockitoSugar, ArgumentMatchersSugar}
+import org.mockito.integrations.scalatest.ResetMocksAfterEachTest
+import org.mockito.captor.ArgCaptor
+
+import java.nio.charset.{StandardCharsets, Charset}
+import java.nio.file.{Files, Path}
+
+class StatMetricCollectorTest
+    extends AnyFunSuite
+    with MockitoSugar
+    with ArgumentMatchersSugar
+    with ResetMocksAfterEachTest {
+
+  // Arrange common values
+  // Expected counters, keyed by the column names used by StatMetricCollector;
+  // they match the first seven values of the fixture's aggregate `cpu` line.
+  val procFileDataTest = Map[String, Long](
+    "cpu_user" -> 79242,
+    "cpu_nice" -> 0,
+    "cpu_system" -> 74306,
+    "cpu_idle" -> 842486413,
+    "cpu_iowait" -> 756859,
+    "cpu_irq" -> 6140,
+    "cpu_softirq" -> 67701
+  )
+
+  // Reads the fixture at src/test/scala/proc/stat, resolved against the
+  // absolute path of the current working directory, decoded as UTF-8.
+  val procFileContentTest: String = new String(
+    Files.readAllBytes(
+      Path
+        .of(".")
+        .toAbsolutePath
+        .getParent()
+        .resolve("src/test/scala/proc/stat")
+    ),
+    StandardCharsets.UTF_8
+  )
+
+  // The expected value repeats the collector's own formula: the metric's
+  // counter divided by the sum of the seven expected counters.
+  test("Method getMetricValue should return specific Double value") {
+    val metricCollector = new StatMetricCollector
+    val originalMetricName: String = "cpu_user"
+    val expectedDoubleValue: Double =
+      procFileDataTest(originalMetricName) / procFileDataTest.foldLeft(0.0)(
+        _ + _._2
+      )
+
+    assertResult(expectedDoubleValue) {
+      metricCollector.getMetricValue(procFileContentTest, originalMetricName)
+    }
+  }
+
+  // A name that is not one of the parsed columns must fail the lookup.
+  test("Method getMetricValue should throw NoSuchElementException") {
+    val metricCollector = new StatMetricCollector
+    val originalMetricName: String = "wrong_metric"
+
+    assertThrows[NoSuchElementException](
+      metricCollector.getMetricValue(procFileContentTest, originalMetricName)
+    )
+  }
+
+}
diff --git a/src/test/scala/meminfo.test b/src/test/scala/meminfo.test
new file mode 100644
index 0000000..66305b9
--- /dev/null
+++ b/src/test/scala/meminfo.test
@@ -0,0 +1,42 @@
+MemTotal: 1921988 kB
+MemFree: 1374408 kB
+Buffers: 32688 kB
+Cached: 370540 kB
+SwapCached: 0 kB
+Active: 344604 kB
+Inactive: 80800 kB
+Active(anon): 22364 kB
+Inactive(anon): 4 kB
+Active(file): 322240 kB +Inactive(file): 80796 kB +Unevictable: 0 kB +Mlocked: 0 kB +SwapTotal: 1048572 kB +SwapFree: 1048572 kB +Dirty: 48 kB +Writeback: 0 kB +AnonPages: 22260 kB +Mapped: 13628 kB +Shmem: 196 kB +Slab: 91648 kB +SReclaimable: 34024 kB +SUnreclaim: 57624 kB +KernelStack: 2880 kB +PageTables: 3620 kB +NFS_Unstable: 0 kB +Bounce: 0 kB +WritebackTmp: 0 kB +CommitLimit: 2009564 kB +Committed_AS: 134216 kB +VmallocTotal: 34359738367 kB +VmallocUsed: 12276 kB +VmallocChunk: 34359712840 kB +HardwareCorrupted: 0 kB +AnonHugePages: 0 kB +HugePages_Total: 0 +HugePages_Free: 0 +HugePages_Rsvd: 0 +HugePages_Surp: 0 +Hugepagesize: 2048 kB +DirectMap4k: 8064 kB +DirectMap2M: 2088960 kB diff --git a/src/test/scala/proc/stat b/src/test/scala/proc/stat new file mode 100644 index 0000000..44c38f2 --- /dev/null +++ b/src/test/scala/proc/stat @@ -0,0 +1,15 @@ +cpu 79242 0 74306 842486413 756859 6140 67701 0 +cpu0 49663 0 40234 104757317 542691 4420 39572 0 +cpu1 2724 0 2118 105420424 767 1719 6084 0 +cpu2 18578 0 18430 105191522 204592 0 714 0 +cpu3 513 0 979 105428698 739 0 2907 0 +cpu4 1623 0 2105 105426291 444 0 3373 0 +cpu5 3491 0 5326 105414798 7134 0 3087 0 +cpu6 1636 0 3081 105420689 201 0 8229 0 +cpu7 1011 0 2029 105426670 288 0 3731 0 +intr 1139300141 1054390414 3 0 5 255 0 0 0 3 0 0 0 4 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 2 3664020 44569070 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 0 0 0 0 0 2 3702799 4393655 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 0 0 0 0 0 2 3480582 527155 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 0 0 0 0 0 2 3445617 3870988 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 0 0 0 0 0 6 2 4877935 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 0 0 0 0 0 3 2 3445167 0 0 0 0 647 0 0 0 0 0 0 0 0 1 1 1 0 0 0 0 0 2 3 3481606 0 0 0 0 0 0 0 0 0 0 0 0 2004579 1 1 1 0 0 0 0 0 2 3 3445582 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 0 +ctxt 367249552 +btime 1310547399 +processes 107918 +procs_running 1 +procs_blocked 0 \ No newline at end of file