diff --git a/src/main/java/org/datadog/jmxfetch/App.java b/src/main/java/org/datadog/jmxfetch/App.java index 6d2d2e16b..8c78e8ac3 100644 --- a/src/main/java/org/datadog/jmxfetch/App.java +++ b/src/main/java/org/datadog/jmxfetch/App.java @@ -10,6 +10,7 @@ import org.datadog.jmxfetch.tasks.TaskProcessException; import org.datadog.jmxfetch.tasks.TaskProcessor; import org.datadog.jmxfetch.tasks.TaskStatusHandler; +import org.datadog.jmxfetch.util.AppTelemetry; import org.datadog.jmxfetch.util.ByteArraySearcher; import org.datadog.jmxfetch.util.CustomLogger; import org.datadog.jmxfetch.util.FileHelper; @@ -23,6 +24,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.lang.management.ManagementFactory; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; @@ -33,8 +35,8 @@ import java.util.Iterator; import java.util.List; import java.util.ListIterator; -import java.util.Map; import java.util.Map.Entry; +import java.util.Map; import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; @@ -48,9 +50,19 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; + +import javax.management.InstanceAlreadyExistsException; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; + import javax.security.auth.login.FailedLoginException; + @SuppressWarnings("unchecked") @Slf4j public class App { @@ -86,6 +98,8 @@ public class App { private final AppConfig appConfig; private HttpClient client; + private AppTelemetry appTelemetry; + /** * Main method for backwards compatibility in case someone is launching process by class * instead of by jar IE: java -classpath jmxfetch.jar org.datadog.jmxfetch.App @@ -118,8 +132,66 @@ public App(final AppConfig appConfig) { this.appConfig.getIpcHost(), this.appConfig.getIpcPort(), false); } this.configs = getConfigs(this.appConfig); + + this.initTelemetryBean(); + } + + private ObjectName getAppTelemetryBeanName() { + ObjectName appTelemetryBeanName; + + try { + appTelemetryBeanName = new ObjectName( + appConfig.getJmxfetchTelemetryDomain() + ":name=jmxfetch_app"); + } catch (MalformedObjectNameException e) { + log.warn( + "Could not construct bean name for jmxfetch_telemetry_domain" + + " '{}' and name 'jmxfetch_app'", + appConfig.getJmxfetchTelemetryDomain()); + return null; + } + + return appTelemetryBeanName; } + private void initTelemetryBean() { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + AppTelemetry bean = new AppTelemetry(); + ObjectName appTelemetryBeanName = getAppTelemetryBeanName(); + if (appTelemetryBeanName == null) { + return; + } + + try { + mbs.registerMBean(bean, appTelemetryBeanName); + log.debug("Succesfully registered app telemetry bean"); + } catch (InstanceAlreadyExistsException + | MBeanRegistrationException + | NotCompliantMBeanException e) { + log.warn("Could not register bean named '{}' for instance: ", + appTelemetryBeanName.toString(), e); + } + + this.appTelemetry = bean; + return; + } + + private void teardownTelemetry() { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + ObjectName appTelemetryBeanName = getAppTelemetryBeanName(); + if (appTelemetryBeanName == null) { + return; + } + + try { + mbs.unregisterMBean(appTelemetryBeanName); + log.debug("Succesfully unregistered app telemetry bean"); + } catch (MBeanRegistrationException | InstanceNotFoundException e) { + log.warn("Could not unregister bean named '{}' for instance: ", + appTelemetryBeanName.toString(), e); + } + } + + /** * Main entry point of JMXFetch that returns integer on exit instead of calling {@code * System#exit}. @@ -153,7 +225,7 @@ public int run() { return 0; } - log.info("JMX Fetch " + MetadataHelper.getVersion() + " has started"); + log.info("JMX Fetch " + this.appConfig.getVersion() + " has started"); // set up the config status this.appConfig.updateStatus(); @@ -448,6 +520,7 @@ void start() { } void stop() { + this.teardownTelemetry(); this.collectionProcessor.stop(); this.recoveryProcessor.stop(); } @@ -467,6 +540,9 @@ public void doIteration() { for (Instance instance : this.instances) { getMetricsTasks.add(new MetricCollectionTask(instance)); } + if (this.appTelemetry != null) { + this.appTelemetry.setRunningInstanceCount(this.instances.size()); + } if (!this.collectionProcessor.ready()) { log.warn( @@ -1006,6 +1082,7 @@ private Map getTelemetryInstanceConfig() { config.put("conf",conf); List tags = new ArrayList(); + tags.add("version:" + this.appConfig.getVersion()); config.put("tags", tags); return config; @@ -1104,6 +1181,10 @@ private void processInstantiationStatus( "Could not initialize instance: {}:", instance.getName(), e); instance.cleanUpAsync(); this.brokenInstanceMap.put(instance.toString(), instance); + if (this.appTelemetry != null) { + this.appTelemetry.setBrokenInstanceCount(this.brokenInstanceMap.size()); + this.appTelemetry.incrementBrokenInstanceEventCount(); + } } } } @@ -1124,6 +1205,11 @@ private void processFixedStatus( this.brokenInstanceMap.remove(instance.toString()); this.instances.add(instance); + if (this.appTelemetry != null) { + this.appTelemetry.setBrokenInstanceCount(this.brokenInstanceMap.size()); + this.appTelemetry.setRunningInstanceCount(this.instances.size()); + } + } catch (Throwable e) { // Not much to do here, instance didn't recover } finally { @@ -1235,6 +1321,10 @@ private void processCollectionStatus( this.brokenInstanceMap.put(instance.toString(), instance); log.debug("Adding broken instance to list: " + instance.getName()); + if (this.appTelemetry != null) { + this.appTelemetry.setBrokenInstanceCount(this.brokenInstanceMap.size()); + this.appTelemetry.incrementBrokenInstanceEventCount(); + } log.warn(instanceMessage, ee.getCause()); } catch (Throwable t) { @@ -1242,6 +1332,11 @@ private void processCollectionStatus( log.debug("Adding broken instance to list: " + instance.getName()); this.brokenInstanceMap.put(instance.toString(), instance); + if (this.appTelemetry != null) { + this.appTelemetry.setBrokenInstanceCount(this.brokenInstanceMap.size()); + this.appTelemetry.incrementBrokenInstanceEventCount(); + } + instanceStatus = Status.STATUS_ERROR; instanceMessage = task.getWarning() + ": " + t.toString(); @@ -1264,4 +1359,8 @@ private void processCollectionStatus( } } } + + public AppTelemetry getAppTelemetryBean() { + return this.appTelemetry; + } } diff --git a/src/main/java/org/datadog/jmxfetch/AppConfig.java b/src/main/java/org/datadog/jmxfetch/AppConfig.java index a03842d03..a01ff561d 100644 --- a/src/main/java/org/datadog/jmxfetch/AppConfig.java +++ b/src/main/java/org/datadog/jmxfetch/AppConfig.java @@ -10,6 +10,7 @@ import org.datadog.jmxfetch.reporter.Reporter; import org.datadog.jmxfetch.reporter.ReporterFactory; import org.datadog.jmxfetch.service.ServiceNameProvider; +import org.datadog.jmxfetch.util.MetadataHelper; import org.datadog.jmxfetch.validator.LogLevelValidator; import org.datadog.jmxfetch.validator.PositiveIntegerValidator; import org.datadog.jmxfetch.validator.ReporterValidator; @@ -513,4 +514,8 @@ public int getStatsdBufferSize() { public int getSocketTimeout() { return statsdSocketTimeout; } + + public String getVersion() { + return MetadataHelper.getVersion(); + } } diff --git a/src/main/java/org/datadog/jmxfetch/util/AppTelemetry.java b/src/main/java/org/datadog/jmxfetch/util/AppTelemetry.java new file mode 100644 index 000000000..77308c3ca --- /dev/null +++ b/src/main/java/org/datadog/jmxfetch/util/AppTelemetry.java @@ -0,0 +1,41 @@ +package org.datadog.jmxfetch.util; + +import java.util.concurrent.atomic.AtomicInteger; + +/** Jmxfetch telemetry JMX MBean. */ +public class AppTelemetry implements AppTelemetryMBean { + private AtomicInteger runningInstanceCount; + private AtomicInteger brokenInstanceCount; + private AtomicInteger brokenInstanceEventCount; + + /** Jmxfetch telemetry bean constructor. */ + public AppTelemetry() { + runningInstanceCount = new AtomicInteger(0); + brokenInstanceCount = new AtomicInteger(0); + brokenInstanceEventCount = new AtomicInteger(0); + } + + public int getRunningInstanceCount() { + return runningInstanceCount.get(); + } + + public int getBrokenInstanceCount() { + return brokenInstanceCount.get(); + } + + public int getBrokenInstanceEventCount() { + return brokenInstanceEventCount.get(); + } + + public void setRunningInstanceCount(int count) { + this.runningInstanceCount.set(count); + } + + public void setBrokenInstanceCount(int count) { + brokenInstanceCount.set(count); + } + + public void incrementBrokenInstanceEventCount() { + brokenInstanceEventCount.incrementAndGet(); + } +} diff --git a/src/main/java/org/datadog/jmxfetch/util/AppTelemetryMBean.java b/src/main/java/org/datadog/jmxfetch/util/AppTelemetryMBean.java new file mode 100644 index 000000000..66028b12a --- /dev/null +++ b/src/main/java/org/datadog/jmxfetch/util/AppTelemetryMBean.java @@ -0,0 +1,11 @@ +package org.datadog.jmxfetch.util; + +public interface AppTelemetryMBean { + + int getRunningInstanceCount(); + + int getBrokenInstanceCount(); + + int getBrokenInstanceEventCount(); + +} diff --git a/src/test/java/org/datadog/jmxfetch/TestApp.java b/src/test/java/org/datadog/jmxfetch/TestApp.java index f857d9673..5fec8d252 100644 --- a/src/test/java/org/datadog/jmxfetch/TestApp.java +++ b/src/test/java/org/datadog/jmxfetch/TestApp.java @@ -5,6 +5,8 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.when; +import org.datadog.jmxfetch.util.AppTelemetry; + import java.io.File; import java.util.Arrays; import java.util.Collections; @@ -43,6 +45,9 @@ public void testBeanRegexTags() throws Exception { "nonRegexTag:value"); assertMetric("this.is.100", tags, 10); + + AppTelemetry tlm = app.getAppTelemetryBean(); + assertEquals(1, tlm.getRunningInstanceCount()); } /** Tag metrics with MBeans parameters. */ @@ -72,6 +77,9 @@ public void testBeanTags() throws Exception { "component"); assertMetric("this.is.100", tags, 7); + + AppTelemetry tlm = app.getAppTelemetryBean(); + assertEquals(1, tlm.getRunningInstanceCount()); } /** Tag metrics with MBeans parameters with normalize_bean_param_tags option enabled. */ @@ -1160,4 +1168,29 @@ public void testNestedCompositeData() throws Exception { assertCoverage(); } + + @Test + public void testTelemetryTags() throws Exception { + SimpleTestJavaApp testApp = new SimpleTestJavaApp(); + registerMBean(testApp, "org.datadog.jmxfetch.test:type=SimpleTestJavaApp"); + + when(appConfig.isTargetDirectInstances()).thenReturn(true); + when(appConfig.getJmxfetchTelemetry()).thenReturn(true); + when(appConfig.getVersion()).thenReturn("MOCKED_VERSION"); + + initApplication("jmx_telemetry_tags.yaml"); + + run(); + + List telemetryTags = Arrays.asList( + "instance:jmxfetch_telemetry_instance", + "name:jmxfetch_app", + "jmx_domain:jmx_fetch", + "version:MOCKED_VERSION"); + + assertMetric("jmx.jmx_fetch.running_instance_count", 2, telemetryTags, -1); + + // not asserting coverage, this is intended to test the tags present on telemetry + // not the set metrics collected + } } diff --git a/src/test/java/org/datadog/jmxfetch/TestCommon.java b/src/test/java/org/datadog/jmxfetch/TestCommon.java index 49fb77a12..9503eff25 100644 --- a/src/test/java/org/datadog/jmxfetch/TestCommon.java +++ b/src/test/java/org/datadog/jmxfetch/TestCommon.java @@ -109,12 +109,13 @@ public void unregisterMBean() throws MBeanRegistrationException, InstanceNotFoun } /** - * Clear instances and their instance telemetry bean after execution of every test. + * Tear down instances and application. */ @After - public void clearInstances() { + public void teardown() { if (app != null) { app.clearAllInstances(); + app.stop(); } } @@ -176,7 +177,7 @@ protected void initApplication(String yamlFileName) throws FileNotFoundException } /* - * Init JMXFetch with the given YAML configuration template + * Init JMXFetch with the given YAML configuration template * The configuration can be specified as a yaml literal with each arg * representing one line of the Yaml file * Does not support any SD/AD features. @@ -188,7 +189,7 @@ protected void initApplicationWithYamlLines(String... yamlLines) String confdDirectory = tempFile.getParent().toString(); String yamlFileName = tempFile.getFileName().toString(); - + List params = new ArrayList(); params.add("--reporter"); params.add("console"); diff --git a/src/test/resources/jmx_telemetry_tags.yaml b/src/test/resources/jmx_telemetry_tags.yaml new file mode 100644 index 000000000..f7c10f848 --- /dev/null +++ b/src/test/resources/jmx_telemetry_tags.yaml @@ -0,0 +1,14 @@ +--- +init_config: + +instances: + - jvm_direct: true + refresh_beans: 4 + collect_default_jvm_metrics: false + name: jmx_test_instance + tags: + - "env:stage" + - "newTag:test" + conf: + - include: + domain: org.datadog.jmxfetch.test \ No newline at end of file