Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMLII-1360: Creates telemetry bean for Application-level metrics #505

Merged
merged 7 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion src/main/java/org/datadog/jmxfetch/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.datadog.jmxfetch.tasks.TaskProcessException;
import org.datadog.jmxfetch.tasks.TaskProcessor;
import org.datadog.jmxfetch.tasks.TaskStatusHandler;
import org.datadog.jmxfetch.util.AppTelemetry;
import org.datadog.jmxfetch.util.ByteArraySearcher;
import org.datadog.jmxfetch.util.CustomLogger;
import org.datadog.jmxfetch.util.FileHelper;
Expand All @@ -23,6 +24,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -33,8 +35,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -48,9 +50,18 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;

import javax.security.auth.login.FailedLoginException;



@SuppressWarnings("unchecked")
@Slf4j
public class App {
Expand Down Expand Up @@ -86,6 +97,8 @@ public class App {
private final AppConfig appConfig;
private HttpClient client;

private AppTelemetry appTelemetry;

/**
* Main method for backwards compatibility in case someone is launching process by class
* instead of by jar IE: java -classpath jmxfetch.jar org.datadog.jmxfetch.App
Expand Down Expand Up @@ -118,8 +131,42 @@ public App(final AppConfig appConfig) {
this.appConfig.getIpcHost(), this.appConfig.getIpcPort(), false);
}
this.configs = getConfigs(this.appConfig);

this.initTelemetryBean();
}

private void initTelemetryBean() {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
AppTelemetry bean = new AppTelemetry();
ObjectName appTelemetryBeanName;

try {
appTelemetryBeanName = new ObjectName(
appConfig.getJmxfetchTelemetryDomain() + ":name=jmxfetch_app"
+ ",version=" + MetadataHelper.getVersion());
} catch (MalformedObjectNameException e) {
log.warn(
"Could not construct bean name for jmxfetch_telemetry_domain"
+ " '{}' and name 'jmxfetch_app'",
appConfig.getJmxfetchTelemetryDomain());
return;
}

try {
mbs.registerMBean(bean, appTelemetryBeanName);
log.debug("Succesfully registered app telemetry bean");
} catch (InstanceAlreadyExistsException
| MBeanRegistrationException
| NotCompliantMBeanException e) {
log.warn("Could not register bean named '{}' for instance: ",
appTelemetryBeanName.toString(), e);
}

this.appTelemetry = bean;
return;
}


/**
* Main entry point of JMXFetch that returns integer on exit instead of calling {@code
* System#exit}.
Expand Down Expand Up @@ -467,6 +514,9 @@ public void doIteration() {
for (Instance instance : this.instances) {
getMetricsTasks.add(new MetricCollectionTask(instance));
}
if (this.appTelemetry != null) {
this.appTelemetry.setRunningInstanceCount(this.instances.size());
}

if (!this.collectionProcessor.ready()) {
log.warn(
Expand Down Expand Up @@ -1104,6 +1154,10 @@ private <T> void processInstantiationStatus(
"Could not initialize instance: {}:", instance.getName(), e);
instance.cleanUpAsync();
this.brokenInstanceMap.put(instance.toString(), instance);
if (this.appTelemetry != null) {
this.appTelemetry.setBrokenInstanceCount(this.brokenInstanceMap.size());
this.appTelemetry.incrementBrokenInstanceEventCount();
}
}
}
}
Expand All @@ -1124,6 +1178,11 @@ private <T> void processFixedStatus(
this.brokenInstanceMap.remove(instance.toString());
this.instances.add(instance);

if (this.appTelemetry != null) {
this.appTelemetry.setBrokenInstanceCount(this.brokenInstanceMap.size());
this.appTelemetry.setRunningInstanceCount(this.instances.size());
}

} catch (Throwable e) {
// Not much to do here, instance didn't recover
} finally {
Expand Down Expand Up @@ -1235,13 +1294,22 @@ private <T> void processCollectionStatus(

this.brokenInstanceMap.put(instance.toString(), instance);
log.debug("Adding broken instance to list: " + instance.getName());
if (this.appTelemetry != null) {
this.appTelemetry.setBrokenInstanceCount(this.brokenInstanceMap.size());
this.appTelemetry.incrementBrokenInstanceEventCount();
}

log.warn(instanceMessage, ee.getCause());
} catch (Throwable t) {
// Legit exception during task - eviction necessary
log.debug("Adding broken instance to list: " + instance.getName());
this.brokenInstanceMap.put(instance.toString(), instance);

if (this.appTelemetry != null) {
this.appTelemetry.setBrokenInstanceCount(this.brokenInstanceMap.size());
this.appTelemetry.incrementBrokenInstanceEventCount();
}

instanceStatus = Status.STATUS_ERROR;
instanceMessage = task.getWarning() + ": " + t.toString();

Expand All @@ -1264,4 +1332,8 @@ private <T> void processCollectionStatus(
}
}
}

public AppTelemetry getAppTelemetryBean() {
return this.appTelemetry;
}
}
41 changes: 41 additions & 0 deletions src/main/java/org/datadog/jmxfetch/util/AppTelemetry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.datadog.jmxfetch.util;

import java.util.concurrent.atomic.AtomicInteger;

/** Jmxfetch telemetry JMX MBean. */
public class AppTelemetry implements AppTelemetryMBean {
private AtomicInteger runningInstanceCount;
private AtomicInteger brokenInstanceCount;
private AtomicInteger brokenInstanceEventCount;

/** Jmxfetch telemetry bean constructor. */
public AppTelemetry() {
runningInstanceCount = new AtomicInteger(0);
brokenInstanceCount = new AtomicInteger(0);
brokenInstanceEventCount = new AtomicInteger(0);
}

public int getRunningInstanceCount() {
return runningInstanceCount.get();
}

public int getBrokenInstanceCount() {
return brokenInstanceCount.get();
}

public int getBrokenInstanceEventCount() {
return brokenInstanceEventCount.get();
}

public void setRunningInstanceCount(int count) {
this.runningInstanceCount.set(count);
}

public void setBrokenInstanceCount(int count) {
brokenInstanceCount.set(count);
}

public void incrementBrokenInstanceEventCount() {
brokenInstanceEventCount.incrementAndGet();
}
}
11 changes: 11 additions & 0 deletions src/main/java/org/datadog/jmxfetch/util/AppTelemetryMBean.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.datadog.jmxfetch.util;

public interface AppTelemetryMBean {

int getRunningInstanceCount();

int getBrokenInstanceCount();

int getBrokenInstanceEventCount();

}
8 changes: 8 additions & 0 deletions src/test/java/org/datadog/jmxfetch/TestApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;

import org.datadog.jmxfetch.util.AppTelemetry;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -43,6 +45,9 @@ public void testBeanRegexTags() throws Exception {
"nonRegexTag:value");

assertMetric("this.is.100", tags, 10);

AppTelemetry tlm = app.getAppTelemetryBean();
assertEquals(1, tlm.getRunningInstanceCount());
}

/** Tag metrics with MBeans parameters. */
Expand Down Expand Up @@ -72,6 +77,9 @@ public void testBeanTags() throws Exception {
"component");

assertMetric("this.is.100", tags, 7);

AppTelemetry tlm = app.getAppTelemetryBean();
assertEquals(1, tlm.getRunningInstanceCount());
}

/** Tag metrics with MBeans parameters with normalize_bean_param_tags option enabled. */
Expand Down
Loading