Skip to content

Commit

Permalink
AMLII-1360: Creates telemetry bean for Application-level metrics (#505)
Browse files Browse the repository at this point in the history
* Initial stab at application-level telemetry

* Renames telemetry fields to be more specific

* Hopefully address linting errors

* Fix remaining lint errors

* Adds teardown to de-register app telemetry bean

* Moves version from bean name to explicitly added tags (plus small refactor for testing)

* Tears down application in between each test
  • Loading branch information
scottopell authored Jan 2, 2024
1 parent eb8252f commit ae28d64
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 6 deletions.
103 changes: 101 additions & 2 deletions src/main/java/org/datadog/jmxfetch/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.datadog.jmxfetch.tasks.TaskProcessException;
import org.datadog.jmxfetch.tasks.TaskProcessor;
import org.datadog.jmxfetch.tasks.TaskStatusHandler;
import org.datadog.jmxfetch.util.AppTelemetry;
import org.datadog.jmxfetch.util.ByteArraySearcher;
import org.datadog.jmxfetch.util.CustomLogger;
import org.datadog.jmxfetch.util.FileHelper;
Expand All @@ -23,6 +24,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -33,8 +35,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -48,9 +50,19 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.management.InstanceAlreadyExistsException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;

import javax.security.auth.login.FailedLoginException;



@SuppressWarnings("unchecked")
@Slf4j
public class App {
Expand Down Expand Up @@ -86,6 +98,8 @@ public class App {
private final AppConfig appConfig;
private HttpClient client;

private AppTelemetry appTelemetry;

/**
* Main method for backwards compatibility in case someone is launching process by class
* instead of by jar IE: java -classpath jmxfetch.jar org.datadog.jmxfetch.App
Expand Down Expand Up @@ -118,8 +132,66 @@ public App(final AppConfig appConfig) {
this.appConfig.getIpcHost(), this.appConfig.getIpcPort(), false);
}
this.configs = getConfigs(this.appConfig);

this.initTelemetryBean();
}

/**
 * Builds the JMX object name used for the application-level telemetry bean.
 *
 * <p>The name is made of the configured telemetry domain followed by the fixed key
 * property {@code name=jmxfetch_app}.
 *
 * @return the telemetry bean's object name, or {@code null} (after logging a warning)
 *     when the configured domain does not produce a well-formed object name
 */
private ObjectName getAppTelemetryBeanName() {
    try {
        return new ObjectName(
            appConfig.getJmxfetchTelemetryDomain() + ":name=jmxfetch_app");
    } catch (MalformedObjectNameException e) {
        // A bad domain only disables app telemetry; callers treat null as "skip".
        log.warn(
            "Could not construct bean name for jmxfetch_telemetry_domain"
                + " '{}' and name 'jmxfetch_app'",
            appConfig.getJmxfetchTelemetryDomain());
        return null;
    }
}

/**
 * Creates the application telemetry bean and registers it with the platform MBean server.
 *
 * <p>Registration is best-effort. If the bean name cannot be constructed, nothing is
 * created and {@code appTelemetry} is left unset. If registration itself fails, a warning
 * is logged and the bean is still kept in {@code appTelemetry}, so the counters keep being
 * tracked in-process even though they are not exposed under that name by this instance.
 */
private void initTelemetryBean() {
    // Resolve the name first: without one there is no point creating the bean.
    ObjectName appTelemetryBeanName = getAppTelemetryBeanName();
    if (appTelemetryBeanName == null) {
        return;
    }

    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    AppTelemetry bean = new AppTelemetry();

    try {
        mbs.registerMBean(bean, appTelemetryBeanName);
        log.debug("Successfully registered app telemetry bean");
    } catch (InstanceAlreadyExistsException
            | MBeanRegistrationException
            | NotCompliantMBeanException e) {
        // The trailing exception argument is logged as the throwable, not as a placeholder.
        log.warn("Could not register bean named '{}' for instance: ",
            appTelemetryBeanName, e);
    }

    this.appTelemetry = bean;
}

/**
 * Unregisters the application telemetry bean from the platform MBean server.
 *
 * <p>This is best-effort and safe to call more than once: when the bean name cannot be
 * constructed, or no bean is currently registered under it, the method returns without
 * raising or logging a warning. Failures during the actual unregistration are logged at
 * warn level and otherwise ignored.
 */
private void teardownTelemetry() {
    ObjectName appTelemetryBeanName = getAppTelemetryBeanName();
    if (appTelemetryBeanName == null) {
        return;
    }

    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    // Avoid a noisy warning (with stack trace) when stop() runs twice or when the
    // initial registration never succeeded.
    if (!mbs.isRegistered(appTelemetryBeanName)) {
        log.debug("App telemetry bean '{}' is not registered, nothing to unregister",
            appTelemetryBeanName);
        return;
    }

    try {
        mbs.unregisterMBean(appTelemetryBeanName);
        log.debug("Successfully unregistered app telemetry bean");
    } catch (MBeanRegistrationException | InstanceNotFoundException e) {
        // InstanceNotFoundException is still possible if the bean is removed
        // concurrently between the isRegistered check and this call.
        log.warn("Could not unregister bean named '{}' for instance: ",
            appTelemetryBeanName, e);
    }
}


/**
* Main entry point of JMXFetch that returns integer on exit instead of calling {@code
* System#exit}.
Expand Down Expand Up @@ -153,7 +225,7 @@ public int run() {
return 0;
}

log.info("JMX Fetch " + MetadataHelper.getVersion() + " has started");
log.info("JMX Fetch " + this.appConfig.getVersion() + " has started");

// set up the config status
this.appConfig.updateStatus();
Expand Down Expand Up @@ -448,6 +520,7 @@ void start() {
}

/**
 * Stops the application: tears down the app telemetry bean first, then stops the
 * collection processor and the recovery processor.
 */
void stop() {
// Unregister the telemetry bean before shutting the processors down.
this.teardownTelemetry();
this.collectionProcessor.stop();
this.recoveryProcessor.stop();
}
Expand All @@ -467,6 +540,9 @@ public void doIteration() {
for (Instance instance : this.instances) {
getMetricsTasks.add(new MetricCollectionTask(instance));
}
if (this.appTelemetry != null) {
this.appTelemetry.setRunningInstanceCount(this.instances.size());
}

if (!this.collectionProcessor.ready()) {
log.warn(
Expand Down Expand Up @@ -1006,6 +1082,7 @@ private Map<String,Object> getTelemetryInstanceConfig() {
config.put("conf",conf);

List<String> tags = new ArrayList<String>();
tags.add("version:" + this.appConfig.getVersion());
config.put("tags", tags);

return config;
Expand Down Expand Up @@ -1104,6 +1181,10 @@ private <T> void processInstantiationStatus(
"Could not initialize instance: {}:", instance.getName(), e);
instance.cleanUpAsync();
this.brokenInstanceMap.put(instance.toString(), instance);
if (this.appTelemetry != null) {
this.appTelemetry.setBrokenInstanceCount(this.brokenInstanceMap.size());
this.appTelemetry.incrementBrokenInstanceEventCount();
}
}
}
}
Expand All @@ -1124,6 +1205,11 @@ private <T> void processFixedStatus(
this.brokenInstanceMap.remove(instance.toString());
this.instances.add(instance);

if (this.appTelemetry != null) {
this.appTelemetry.setBrokenInstanceCount(this.brokenInstanceMap.size());
this.appTelemetry.setRunningInstanceCount(this.instances.size());
}

} catch (Throwable e) {
// Not much to do here, instance didn't recover
} finally {
Expand Down Expand Up @@ -1235,13 +1321,22 @@ private <T> void processCollectionStatus(

this.brokenInstanceMap.put(instance.toString(), instance);
log.debug("Adding broken instance to list: " + instance.getName());
if (this.appTelemetry != null) {
this.appTelemetry.setBrokenInstanceCount(this.brokenInstanceMap.size());
this.appTelemetry.incrementBrokenInstanceEventCount();
}

log.warn(instanceMessage, ee.getCause());
} catch (Throwable t) {
// Legit exception during task - eviction necessary
log.debug("Adding broken instance to list: " + instance.getName());
this.brokenInstanceMap.put(instance.toString(), instance);

if (this.appTelemetry != null) {
this.appTelemetry.setBrokenInstanceCount(this.brokenInstanceMap.size());
this.appTelemetry.incrementBrokenInstanceEventCount();
}

instanceStatus = Status.STATUS_ERROR;
instanceMessage = task.getWarning() + ": " + t.toString();

Expand All @@ -1264,4 +1359,8 @@ private <T> void processCollectionStatus(
}
}
}

/**
 * Returns the application-level telemetry bean held by this app.
 *
 * @return the telemetry bean, or {@code null} if it was never created (the field is only
 *     assigned once a valid bean name could be built during initialization)
 */
public AppTelemetry getAppTelemetryBean() {
return this.appTelemetry;
}
}
5 changes: 5 additions & 0 deletions src/main/java/org/datadog/jmxfetch/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.datadog.jmxfetch.reporter.Reporter;
import org.datadog.jmxfetch.reporter.ReporterFactory;
import org.datadog.jmxfetch.service.ServiceNameProvider;
import org.datadog.jmxfetch.util.MetadataHelper;
import org.datadog.jmxfetch.validator.LogLevelValidator;
import org.datadog.jmxfetch.validator.PositiveIntegerValidator;
import org.datadog.jmxfetch.validator.ReporterValidator;
Expand Down Expand Up @@ -513,4 +514,8 @@ public int getStatsdBufferSize() {
public int getSocketTimeout() {
return statsdSocketTimeout;
}

/**
 * Returns the JMXFetch version.
 *
 * @return the version string obtained from {@code MetadataHelper.getVersion()}
 */
public String getVersion() {
return MetadataHelper.getVersion();
}
}
41 changes: 41 additions & 0 deletions src/main/java/org/datadog/jmxfetch/util/AppTelemetry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.datadog.jmxfetch.util;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Jmxfetch telemetry JMX MBean.
 *
 * <p>Holds application-level counters: how many instances are running, how many are
 * currently broken, and how many times an instance has been marked broken. Each counter
 * is backed by an {@link AtomicInteger}, so individual reads and writes are atomic.
 */
public class AppTelemetry implements AppTelemetryMBean {
    // The references never change after construction; only the wrapped values do.
    private final AtomicInteger runningInstanceCount = new AtomicInteger(0);
    private final AtomicInteger brokenInstanceCount = new AtomicInteger(0);
    private final AtomicInteger brokenInstanceEventCount = new AtomicInteger(0);

    /** Jmxfetch telemetry bean constructor. All counters start at zero. */
    public AppTelemetry() {
        // Counters are initialized by their field initializers.
    }

    /**
     * Returns the last recorded number of running instances.
     *
     * @return the running instance count
     */
    public int getRunningInstanceCount() {
        return runningInstanceCount.get();
    }

    /**
     * Returns the last recorded number of broken instances.
     *
     * @return the broken instance count
     */
    public int getBrokenInstanceCount() {
        return brokenInstanceCount.get();
    }

    /**
     * Returns how many times {@link #incrementBrokenInstanceEventCount()} has been called.
     *
     * @return the cumulative broken-instance event count
     */
    public int getBrokenInstanceEventCount() {
        return brokenInstanceEventCount.get();
    }

    /**
     * Records the current number of running instances.
     *
     * @param count the new running instance count
     */
    public void setRunningInstanceCount(int count) {
        runningInstanceCount.set(count);
    }

    /**
     * Records the current number of broken instances.
     *
     * @param count the new broken instance count
     */
    public void setBrokenInstanceCount(int count) {
        brokenInstanceCount.set(count);
    }

    /** Adds one to the cumulative broken-instance event count. */
    public void incrementBrokenInstanceEventCount() {
        brokenInstanceEventCount.incrementAndGet();
    }
}
11 changes: 11 additions & 0 deletions src/main/java/org/datadog/jmxfetch/util/AppTelemetryMBean.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.datadog.jmxfetch.util;

/**
 * Management interface for the application-level telemetry bean.
 *
 * <p>Declares only getters, so the three counters are read-only through this interface.
 */
public interface AppTelemetryMBean {

/** Returns the number of running instances. */
int getRunningInstanceCount();

/** Returns the number of broken instances. */
int getBrokenInstanceCount();

/** Returns the number of broken-instance events recorded so far. */
int getBrokenInstanceEventCount();

}
33 changes: 33 additions & 0 deletions src/test/java/org/datadog/jmxfetch/TestApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;

import org.datadog.jmxfetch.util.AppTelemetry;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -43,6 +45,9 @@ public void testBeanRegexTags() throws Exception {
"nonRegexTag:value");

assertMetric("this.is.100", tags, 10);

AppTelemetry tlm = app.getAppTelemetryBean();
assertEquals(1, tlm.getRunningInstanceCount());
}

/** Tag metrics with MBeans parameters. */
Expand Down Expand Up @@ -72,6 +77,9 @@ public void testBeanTags() throws Exception {
"component");

assertMetric("this.is.100", tags, 7);

AppTelemetry tlm = app.getAppTelemetryBean();
assertEquals(1, tlm.getRunningInstanceCount());
}

/** Tag metrics with MBeans parameters with normalize_bean_param_tags option enabled. */
Expand Down Expand Up @@ -1160,4 +1168,29 @@ public void testNestedCompositeData() throws Exception {

assertCoverage();
}

/**
 * Telemetry metrics carry the expected tags, including the (mocked) JMXFetch version.
 */
@Test
public void testTelemetryTags() throws Exception {
SimpleTestJavaApp testApp = new SimpleTestJavaApp();
registerMBean(testApp, "org.datadog.jmxfetch.test:type=SimpleTestJavaApp");

// Mock configuration must be in place before the application is initialized.
when(appConfig.isTargetDirectInstances()).thenReturn(true);
when(appConfig.getJmxfetchTelemetry()).thenReturn(true);
when(appConfig.getVersion()).thenReturn("MOCKED_VERSION");

initApplication("jmx_telemetry_tags.yaml");

run();

List<String> telemetryTags = Arrays.asList(
"instance:jmxfetch_telemetry_instance",
"name:jmxfetch_app",
"jmx_domain:jmx_fetch",
"version:MOCKED_VERSION");

// NOTE(review): the second argument (2) looks like the expected metric value — presumably
// the configured test instance plus the telemetry instance; confirm against assertMetric.
assertMetric("jmx.jmx_fetch.running_instance_count", 2, telemetryTags, -1);

// not asserting coverage, this is intended to test the tags present on telemetry
// not the set metrics collected
}
}
9 changes: 5 additions & 4 deletions src/test/java/org/datadog/jmxfetch/TestCommon.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,13 @@ public void unregisterMBean() throws MBeanRegistrationException, InstanceNotFoun
}

/**
* Clear instances and their instance telemetry bean after execution of every test.
* Tear down instances and application.
*/
@After
public void clearInstances() {
public void teardown() {
if (app != null) {
app.clearAllInstances();
app.stop();
}
}

Expand Down Expand Up @@ -176,7 +177,7 @@ protected void initApplication(String yamlFileName) throws FileNotFoundException
}

/*
* Init JMXFetch with the given YAML configuration template
* Init JMXFetch with the given YAML configuration template
* The configuration can be specified as a yaml literal with each arg
* representing one line of the Yaml file
* Does not support any SD/AD features.
Expand All @@ -188,7 +189,7 @@ protected void initApplicationWithYamlLines(String... yamlLines)

String confdDirectory = tempFile.getParent().toString();
String yamlFileName = tempFile.getFileName().toString();

List<String> params = new ArrayList<String>();
params.add("--reporter");
params.add("console");
Expand Down
14 changes: 14 additions & 0 deletions src/test/resources/jmx_telemetry_tags.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
init_config:

instances:
- jvm_direct: true
refresh_beans: 4
collect_default_jvm_metrics: false
name: jmx_test_instance
tags:
- "env:stage"
- "newTag:test"
conf:
- include:
domain: org.datadog.jmxfetch.test

0 comments on commit ae28d64

Please sign in to comment.