diff --git a/src/main/java/org/datadog/jmxfetch/App.java b/src/main/java/org/datadog/jmxfetch/App.java index 8c78e8ac..12a621d9 100644 --- a/src/main/java/org/datadog/jmxfetch/App.java +++ b/src/main/java/org/datadog/jmxfetch/App.java @@ -520,6 +520,9 @@ void start() { } void stop() { + for (Instance in : this.getInstances()) { + in.cleanUp(); + } this.teardownTelemetry(); this.collectionProcessor.stop(); this.recoveryProcessor.stop(); diff --git a/src/main/java/org/datadog/jmxfetch/AppConfig.java b/src/main/java/org/datadog/jmxfetch/AppConfig.java index a01ff561..42b5e9cb 100644 --- a/src/main/java/org/datadog/jmxfetch/AppConfig.java +++ b/src/main/java/org/datadog/jmxfetch/AppConfig.java @@ -230,6 +230,15 @@ public class AppConfig { required = false) private String statusLocation; + @Parameter( + names = {"--enable_bean_subscription", "-B"}, + description = + "EXPERIMENTAL: If true, JMX beans will be discovered via subscription rather" + + " than poll-based. Obsoletes 'initialBeanRefreshPeriod' and" + + " 'beanRefreshPeriod'.", + required = false) + private boolean enableBeanSubscription; + @Parameter( names = {"--exit_file_location", "-e"}, description = @@ -518,4 +527,12 @@ public int getSocketTimeout() { public String getVersion() { return MetadataHelper.getVersion(); } + + public boolean getEnableBeanSubscription() { + // As noted in `pkg/jmxfetch/jmxfetch.go` in the agent, using an env var + // for enablement is a temporary measure until the stable JMXFetch is upgraded + // to a version supporting this CLI arg. + boolean isEnvEnabled = System.getenv("DD_JMX_BEAN_SUBSCRIPTION_ENABLED") != null; + return isEnvEnabled || enableBeanSubscription; + } } diff --git a/src/main/java/org/datadog/jmxfetch/BeanNotificationListener.java b/src/main/java/org/datadog/jmxfetch/BeanNotificationListener.java new file mode 100644 index 00000000..026ca39f --- /dev/null +++ b/src/main/java/org/datadog/jmxfetch/BeanNotificationListener.java @@ -0,0 +1,58 @@ +package org.datadog.jmxfetch; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.management.MBeanServerNotification; +import javax.management.Notification; +import javax.management.NotificationListener; +import javax.management.ObjectName; + + +@Slf4j +class BeanNotificationListener implements NotificationListener { + private final BlockingQueue queue; + private final BeanTracker beanTracker; + + public BeanNotificationListener(final BeanTracker bt) { + this.beanTracker = bt; + this.queue = new LinkedBlockingQueue<>(); + new Thread(new Runnable() { + @Override + public void run() { + while (true) { + try { + MBeanServerNotification mbs = queue.take(); + processMBeanServerNotification(mbs); + } catch (InterruptedException e) { + // ignore + } + } + } + }).start(); + } + + @Override + public void handleNotification(Notification notification, Object handback) { + if (!(notification instanceof MBeanServerNotification)) { + return; + } + MBeanServerNotification mbs = (MBeanServerNotification) notification; + queue.offer(mbs); + } + + private void processMBeanServerNotification(MBeanServerNotification notif) { + log.debug("MBeanNotification: ts {} seqNum: {} msg: '{}'", + notif.getTimeStamp(), + notif.getSequenceNumber(), + notif.getMessage()); + ObjectName beanName = notif.getMBeanName(); + if (notif.getType().equals(MBeanServerNotification.REGISTRATION_NOTIFICATION)) { + beanTracker.trackBean(beanName); + } else if (notif.getType().equals(MBeanServerNotification.UNREGISTRATION_NOTIFICATION)) { + beanTracker.untrackBean(beanName); + } + } +} \ No newline at end of file diff --git a/src/main/java/org/datadog/jmxfetch/BeanTracker.java b/src/main/java/org/datadog/jmxfetch/BeanTracker.java new file mode 100644 index 00000000..81ff9b1c --- /dev/null +++ b/src/main/java/org/datadog/jmxfetch/BeanTracker.java @@ -0,0 +1,9 @@ +package org.datadog.jmxfetch; + +import javax.management.ObjectName; + +public interface BeanTracker { + public void trackBean(ObjectName beanName); + + public void untrackBean(ObjectName beanName); +} diff --git a/src/main/java/org/datadog/jmxfetch/Connection.java b/src/main/java/org/datadog/jmxfetch/Connection.java index 803cd817..9d7484c6 100644 --- a/src/main/java/org/datadog/jmxfetch/Connection.java +++ b/src/main/java/org/datadog/jmxfetch/Connection.java @@ -3,27 +3,25 @@ import lombok.extern.slf4j.Slf4j; import java.io.IOException; -import java.io.InterruptedIOException; -import java.net.SocketTimeoutException; -import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import javax.management.Attribute; import javax.management.AttributeNotFoundException; import javax.management.InstanceNotFoundException; import javax.management.IntrospectionException; -import javax.management.MBeanAttributeInfo; +import javax.management.ListenerNotFoundException; import javax.management.MBeanException; import javax.management.MBeanInfo; import javax.management.MBeanServerConnection; +import javax.management.MBeanServerDelegate; +import javax.management.MalformedObjectNameException; +import javax.management.Notification; +import javax.management.NotificationListener; import javax.management.ObjectName; import javax.management.ReflectionException; +import javax.management.relation.MBeanServerNotificationFilter; +import javax.management.remote.JMXConnectionNotification; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; @@ -35,6 +33,44 @@ public class Connection { protected MBeanServerConnection mbs; protected Map env; protected JMXServiceURL address; + private NotificationListener connectionNotificationListener; + private boolean seenConnectionIssues; + + private static class ConnectionNotificationListener implements NotificationListener { + public void handleNotification(Notification notification, Object handback) { + if (!(notification instanceof JMXConnectionNotification)) { + return; + } + if (!(handback instanceof Connection)) { + return; + } + + JMXConnectionNotification connNotif = (JMXConnectionNotification) notification; + Connection conn = (Connection) handback; + + if (connNotif.getType() == JMXConnectionNotification.CLOSED + || connNotif.getType() == JMXConnectionNotification.FAILED + || connNotif.getType() == JMXConnectionNotification.NOTIFS_LOST) { + log.warn("Marking connection issues due to {} - {}", + connNotif.getType(), connNotif.getMessage()); + conn.seenConnectionIssues = true; + } + log.debug("Received connection notification: {} Message: {}", + connNotif.getType(), connNotif.getMessage()); + } + } + + /** Subscribes for bean registration/deregistration events under the specified bean scopes. */ + public void subscribeToBeanScopes(List beanScopes, BeanTracker bl) + throws MalformedObjectNameException, IOException, InstanceNotFoundException { + BeanNotificationListener listener = new BeanNotificationListener(bl); + for (String scope : beanScopes) { + ObjectName name = new ObjectName(scope); + MBeanServerNotificationFilter filter = new MBeanServerNotificationFilter(); + filter.enableObjectName(name); + } + mbs.addNotificationListener(MBeanServerDelegate.DELEGATE_NAME, listener, null, null); + } /** Gets attributes for matching bean name. */ public MBeanInfo getMBeanInfo(ObjectName beanName) @@ -56,6 +92,10 @@ protected void createConnection() throws IOException { log.info("Connecting to: " + this.address); connector = JMXConnectorFactory.connect(this.address, this.env); mbs = connector.getMBeanServerConnection(); + + this.connectionNotificationListener = new ConnectionNotificationListener(); + connector.addConnectionNotificationListener( + this.connectionNotificationListener, null, this); } /** Gets attribute for matching bean and attribute name. */ @@ -73,13 +113,21 @@ public Object getAttribute(ObjectName objectName, String attributeName) public void closeConnector() { if (connector != null) { try { + this.connector.removeConnectionNotificationListener( + this.connectionNotificationListener); connector.close(); - } catch (IOException e) { + connector = null; + } catch (IOException | ListenerNotFoundException e) { // ignore } } } + /** True if connection has been notified of failure/lost notifications. */ + public boolean hasSeenConnectionIssues() { + return this.seenConnectionIssues; + } + /** Returns a boolean describing if the connection is still alive. */ public boolean isAlive() { if (connector == null) { diff --git a/src/main/java/org/datadog/jmxfetch/Instance.java b/src/main/java/org/datadog/jmxfetch/Instance.java index 9da8de75..01ae63f4 100644 --- a/src/main/java/org/datadog/jmxfetch/Instance.java +++ b/src/main/java/org/datadog/jmxfetch/Instance.java @@ -8,7 +8,6 @@ import org.yaml.snakeyaml.Yaml; - import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -36,7 +35,7 @@ import javax.security.auth.login.FailedLoginException; @Slf4j -public class Instance { +public class Instance implements BeanTracker { private static final int MAX_RETURNED_METRICS = 350; private static final int DEFAULT_REFRESH_BEANS_PERIOD = 600; public static final String PROCESS_NAME_REGEX = "process_name_regex"; @@ -55,6 +54,7 @@ public Yaml initialValue() { private List beanScopes; private List configurationList = new ArrayList(); private List matchingAttributes; + private int metricCountForMatchingAttributes; private HashSet failingAttributes; private Integer initialRefreshBeansPeriod; private Integer refreshBeansPeriod; @@ -75,6 +75,8 @@ public Yaml initialValue() { private AppConfig appConfig; private Boolean cassandraAliasing; private boolean emptyDefaultHostname; + private boolean enableBeanSubscription; + private boolean beanSubscriptionActive; private InstanceTelemetry instanceTelemetryBean; private ObjectName instanceTelemetryBeanName; private MBeanServer mbs; @@ -110,6 +112,7 @@ public Instance( this.tags = getTagsMap(instanceMap.get("tags"), appConfig); this.checkName = checkName; this.matchingAttributes = new ArrayList(); + this.metricCountForMatchingAttributes = 0; this.failingAttributes = new HashSet(); if (appConfig.getRefreshBeansPeriod() == null) { this.refreshBeansPeriod = (Integer) instanceMap.get("refresh_beans"); @@ -240,6 +243,10 @@ public Instance( log.info("collect_default_jvm_metrics is false - not collecting default JVM metrics"); } + this.beans = new HashSet<>(); + this.enableBeanSubscription = appConfig.getEnableBeanSubscription(); + log.info("JMXFetch Subscription mode enabled={}", this.enableBeanSubscription); + this.beanSubscriptionActive = false; instanceTelemetryBean = createInstanceTelemetryBean(); } @@ -432,13 +439,20 @@ public Connection getConnection( } /** Initializes the instance. May force a new connection.. */ - public void init(boolean forceNewConnection) + public synchronized void init(boolean forceNewConnection) throws IOException, FailedLoginException, SecurityException { log.info("Trying to connect to JMX Server at " + this.toString()); connection = getConnection(instanceMap, forceNewConnection); + if (this.enableBeanSubscription) { + log.info("Subscribing for bean notifications"); + this.subscribeToBeans(); + } else { + log.info("Bean subscription not enabled."); + } log.info( "Trying to collect bean list for the first time for JMX Server at {}", this); - this.refreshBeansList(); + + this.refreshBeansList(true); this.initialRefreshTime = this.lastRefreshTime; log.info("Connected to JMX Server at {} with {} beans", this, this.beans.size()); this.getMatchingAttributes(); @@ -466,22 +480,26 @@ public String toString() { } /** Returns a map of metrics collected. */ - public List getMetrics() throws IOException { - + public synchronized List getMetrics() throws IOException { + if (!this.isInstanceHealthy()) { + this.connection.closeConnector(); + throw new IOException("Instance is in a bad state", null); + } // In case of ephemeral beans, we can force to refresh the bean list x seconds // post initialization and every x seconds thereafter. // To enable this, a "refresh_beans_initial" and/or "refresh_beans" parameters must be // specified in the yaml/json config + Integer period = (this.initialRefreshTime == this.lastRefreshTime) ? this.initialRefreshBeansPeriod : this.refreshBeansPeriod; + List metrics = new ArrayList(); if (isPeriodDue(this.lastRefreshTime, period)) { log.info("Refreshing bean list for " + this.getCheckName()); - this.refreshBeansList(); + this.refreshBeansList(false); this.getMatchingAttributes(); } - List metrics = new ArrayList(); Iterator it = matchingAttributes.iterator(); // increment the lastCollectionTime @@ -503,11 +521,19 @@ public List getMetrics() throws IOException { + jmxAttr + " twice in a row. Removing it from the attribute list"); it.remove(); + this.failingAttributes.remove(jmxAttr); + this.metricCountForMatchingAttributes -= jmxAttr.getLastMetricsCount(); } else { this.failingAttributes.add(jmxAttr); } } } + + // TODO put this in telemetry + long duration = System.currentTimeMillis() - this.lastCollectionTime; + log.info("Collection finished in {}ms. MatchingAttributes={} CollectedMetrics={}", + duration, matchingAttributes.size(), metrics.size()); + if (instanceTelemetryBean != null) { instanceTelemetryBean.setBeansFetched(beans.size()); instanceTelemetryBean.setTopLevelAttributeCount(matchingAttributes.size()); @@ -516,8 +542,8 @@ public List getMetrics() throws IOException { + " With beans fetched = " + instanceTelemetryBean.getBeansFetched() + " top attributes = " + instanceTelemetryBean.getTopLevelAttributeCount() + " metrics = " + instanceTelemetryBean.getMetricCount() - + " wildcard domain query count = " - + instanceTelemetryBean.getWildcardDomainQueryCount() + + " wildcard domain query count = " + + instanceTelemetryBean.getWildcardDomainQueryCount() + " bean match ratio = " + instanceTelemetryBean.getBeanMatchRatio()); } return metrics; @@ -525,6 +551,9 @@ public List getMetrics() throws IOException { /** Returns whather or not the given period has elapsed since reference time. */ public boolean isPeriodDue(long refTime, Integer refPeriod) { + if (this.beanSubscriptionActive) { + return false; + } if ((System.currentTimeMillis() - refTime) / 1000 < refPeriod) { return false; } else { @@ -541,15 +570,155 @@ public boolean timeToCollect() { } } - private void getMatchingAttributes() throws IOException { - limitReached = false; + private synchronized int addMatchingAttributesForBean( + ObjectName beanName, + MBeanInfo info, + boolean metricReachedPreviouslyDisplayed + ) throws IOException { Reporter reporter = appConfig.getReporter(); String action = appConfig.getAction(); - boolean metricReachedDisplayed = false; + int matchedAttributesForBean = 0; + for (MBeanAttributeInfo attributeInfo : info.getAttributes()) { + if (this.metricCountForMatchingAttributes >= maxReturnedMetrics) { + this.limitReached = true; + if (action.equals(AppConfig.ACTION_COLLECT)) { + log.warn("Maximum number of metrics reached."); + break; + } else if (!metricReachedPreviouslyDisplayed + && !action.equals(AppConfig.ACTION_LIST_COLLECTED) + && !action.equals(AppConfig.ACTION_LIST_NOT_MATCHING)) { + reporter.displayMetricReached(); + } + } + JmxAttribute jmxAttribute; + String attributeType = attributeInfo.getType(); + if (JmxSimpleAttribute.matchAttributeType(attributeType)) { + log.debug( + ATTRIBUTE + + beanName + + " : " + + attributeInfo + + " has attributeInfo simple type"); + jmxAttribute = + new JmxSimpleAttribute( + attributeInfo, + beanName, + info.getClassName(), + instanceName, + checkName, + connection, + serviceNameProvider, + tags, + cassandraAliasing, + emptyDefaultHostname, + normalizeBeanParamTags); + } else if (JmxComplexAttribute.matchAttributeType(attributeType)) { + log.debug( + ATTRIBUTE + + beanName + + " : " + + attributeInfo + + " has attributeInfo composite type"); + jmxAttribute = + new JmxComplexAttribute( + attributeInfo, + beanName, + info.getClassName(), + instanceName, + checkName, + connection, + serviceNameProvider, + tags, + emptyDefaultHostname, + normalizeBeanParamTags); + } else if (JmxTabularAttribute.matchAttributeType(attributeType)) { + log.debug( + ATTRIBUTE + + beanName + + " : " + + attributeInfo + + " has attributeInfo tabular type"); + jmxAttribute = + new JmxTabularAttribute( + attributeInfo, + beanName, + info.getClassName(), + instanceName, + checkName, + connection, + serviceNameProvider, + tags, + emptyDefaultHostname, + normalizeBeanParamTags); + } else { + try { + log.debug( + ATTRIBUTE + + beanName + + " : " + + attributeInfo + + " has an unsupported type: " + + attributeType); + } catch (NullPointerException e) { + log.warn("Caught unexpected NullPointerException"); + } + continue; + } + + // For each attribute we try it with each configuration to see if there is one that + // matches + // If so, we store the attribute so metrics will be collected from it. Otherwise we + // discard it. + for (Configuration conf : configurationList) { + try { + if (jmxAttribute.match(conf)) { + jmxAttribute.setMatchingConf(conf); + this.metricCountForMatchingAttributes += jmxAttribute.getMetricsCount(); + matchedAttributesForBean++; + log.debug("Added attribute {} from bean {}.", jmxAttribute, beanName); + this.matchingAttributes.add(jmxAttribute); + if (action.equals(AppConfig.ACTION_LIST_EVERYTHING) + || action.equals(AppConfig.ACTION_LIST_MATCHING) + || action.equals(AppConfig.ACTION_LIST_COLLECTED) + && !this.limitReached + || action.equals(AppConfig.ACTION_LIST_LIMITED) + && this.limitReached) { + reporter.displayMatchingAttributeName( + jmxAttribute, + this.metricCountForMatchingAttributes, + maxReturnedMetrics); + } + break; + } + } catch (Exception e) { + log.error( + "Error while trying to match attributeInfo configuration " + + "with the Attribute: " + + beanName + + " : " + + attributeInfo, + e); + } + } + + if (jmxAttribute.getMatchingConf() == null + && (action.equals(AppConfig.ACTION_LIST_EVERYTHING) + || action.equals(AppConfig.ACTION_LIST_NOT_MATCHING))) { + reporter.displayNonMatchingAttributeName(jmxAttribute); + } + } + + return matchedAttributesForBean; + } + + private void getMatchingAttributes() throws IOException { this.matchingAttributes.clear(); this.failingAttributes.clear(); - int metricsCount = 0; + this.limitReached = false; + + Reporter reporter = appConfig.getReporter(); + String action = appConfig.getAction(); int beansWithAttributeMatch = 0; @@ -559,169 +728,106 @@ private void getMatchingAttributes() throws IOException { for (ObjectName beanName : this.beans) { boolean attributeMatched = false; - if (limitReached) { + if (this.limitReached) { log.debug("Limit reached"); if (action.equals(AppConfig.ACTION_COLLECT)) { break; } } - String className; - MBeanAttributeInfo[] attributeInfos; + MBeanInfo info; try { log.debug("Getting bean info for bean: {}", beanName); - MBeanInfo info = connection.getMBeanInfo(beanName); - - log.debug("Getting class name for bean: {}", beanName); - className = info.getClassName(); - log.debug("Getting attributes for bean: {}", beanName); - attributeInfos = info.getAttributes(); + info = connection.getMBeanInfo(beanName); } catch (IOException e) { - // we should not continue throw e; } catch (Exception e) { log.warn("Cannot get attributes or class name for bean {}: ", beanName, e); continue; } - for (MBeanAttributeInfo attributeInfo : attributeInfos) { - if (metricsCount >= maxReturnedMetrics) { - limitReached = true; - if (action.equals(AppConfig.ACTION_COLLECT)) { - log.warn("Maximum number of metrics reached."); - break; - } else if (!metricReachedDisplayed - && !action.equals(AppConfig.ACTION_LIST_COLLECTED) - && !action.equals(AppConfig.ACTION_LIST_NOT_MATCHING)) { - reporter.displayMetricReached(); - metricReachedDisplayed = true; - } - } - JmxAttribute jmxAttribute; - String attributeType = attributeInfo.getType(); + int numMatchedAttributes = addMatchingAttributesForBean(beanName, info, limitReached); + if (numMatchedAttributes > 0) { + beansWithAttributeMatch++; + } + } - if (JmxSimpleAttribute.matchAttributeType(attributeType)) { - log.debug( - ATTRIBUTE - + beanName - + " : " - + attributeInfo - + " has attributeInfo simple type"); - jmxAttribute = - new JmxSimpleAttribute( - attributeInfo, - beanName, - className, - instanceName, - checkName, - connection, - serviceNameProvider, - tags, - cassandraAliasing, - emptyDefaultHostname, - normalizeBeanParamTags); - } else if (JmxComplexAttribute.matchAttributeType(attributeType)) { - log.debug( - ATTRIBUTE - + beanName - + " : " - + attributeInfo - + " has attributeInfo composite type"); - jmxAttribute = - new JmxComplexAttribute( - attributeInfo, - beanName, - className, - instanceName, - checkName, - connection, - serviceNameProvider, - tags, - emptyDefaultHostname, - normalizeBeanParamTags); - } else if (JmxTabularAttribute.matchAttributeType(attributeType)) { - log.debug( - ATTRIBUTE - + beanName - + " : " - + attributeInfo - + " has attributeInfo tabular type"); - jmxAttribute = - new JmxTabularAttribute( - attributeInfo, - beanName, - className, - instanceName, - checkName, - connection, - serviceNameProvider, - tags, - emptyDefaultHostname, - normalizeBeanParamTags); - } else { - try { - log.debug( - ATTRIBUTE - + beanName - + " : " - + attributeInfo - + " has an unsupported type: " - + attributeType); - } catch (NullPointerException e) { - log.warn("Caught unexpected NullPointerException"); - } - continue; - } + if (instanceTelemetryBean != null && beans.size() > 0) { + instanceTelemetryBean.setBeanMatchRatio(beansWithAttributeMatch / beans.size()); + } - // For each attribute we try it with each configuration to see if there is one that - // matches - // If so, we store the attribute so metrics will be collected from it. Otherwise we - // discard it. - for (Configuration conf : configurationList) { - try { - if (jmxAttribute.match(conf)) { - jmxAttribute.setMatchingConf(conf); - metricsCount += jmxAttribute.getMetricsCount(); - this.matchingAttributes.add(jmxAttribute); - - if (action.equals(AppConfig.ACTION_LIST_EVERYTHING) - || action.equals(AppConfig.ACTION_LIST_MATCHING) - || action.equals(AppConfig.ACTION_LIST_COLLECTED) - && !limitReached - || action.equals(AppConfig.ACTION_LIST_LIMITED) - && limitReached) { - reporter.displayMatchingAttributeName( - jmxAttribute, metricsCount, maxReturnedMetrics); - } - break; - } - } catch (Exception e) { - log.error( - "Error while trying to match attributeInfo configuration " - + "with the Attribute: " - + beanName - + " : " - + attributeInfo, - e); - } - } - if (jmxAttribute.getMatchingConf() == null - && (action.equals(AppConfig.ACTION_LIST_EVERYTHING) - || action.equals(AppConfig.ACTION_LIST_NOT_MATCHING))) { - reporter.displayNonMatchingAttributeName(jmxAttribute); - } - if (jmxAttribute.getMatchingConf() != null) { - attributeMatched = true; - } - } - if (attributeMatched) { - beansWithAttributeMatch += 1; + log.info("Found {} matching attributes with {} metrics total", + matchingAttributes.size(), + this.metricCountForMatchingAttributes); + } + + /** Adds any matching attributes from the specified bean. */ + public synchronized void trackBean(ObjectName beanName) { + log.debug("Bean registered event. {}", beanName); + String className; + MBeanAttributeInfo[] attributeInfos; + int matchedAttributesForBean = 0; + try { + log.debug("Getting bean info for bean: {}", beanName); + MBeanInfo info = connection.getMBeanInfo(beanName); + + matchedAttributesForBean = this.addMatchingAttributesForBean(beanName, info, false); + this.beans.add(beanName); + } catch (IOException e) { + // Nothing to do, connection issue + log.warn("Could not connect to get bean attributes or class name: " + e.getMessage()); + } catch (Exception e) { + log.warn("Cannot get bean attributes or class name: " + e.getMessage(), e); + } + log.debug("Bean registration processed. '{}'. Found {} matching attributes.", + beanName, matchedAttributesForBean); + + if (instanceTelemetryBean != null) { + int beanRegisterationsHandled = instanceTelemetryBean.getBeanRegistrationsHandled(); + instanceTelemetryBean.setBeanRegistrationsHandled(beanRegisterationsHandled + 1); + } + } + + /** Removes any matching attributes from the specified bean. */ + public synchronized void untrackBean(ObjectName beanName) { + int removedMetrics = 0; + int removedAttributes = 0; + for (Iterator it = this.matchingAttributes.iterator(); it.hasNext();) { + JmxAttribute current = it.next(); + if (current.getBeanName().compareTo(beanName) == 0) { + it.remove(); + // `getLastMetricsCount` used here instead of `getMetricsCount` because + // the bean no longer exists by the time we get this message. + // `getMetricsCount` does a live query and therefore will fail + removedMetrics += current.getLastMetricsCount(); + removedAttributes++; } } + + log.debug("Bean unregistered, removed {} attributes ({} metrics) matching bean {}", + removedAttributes, + removedMetrics, + beanName); + this.metricCountForMatchingAttributes -= removedMetrics; + if (instanceTelemetryBean != null) { - instanceTelemetryBean.setBeanMatchRatio((double) - beansWithAttributeMatch / beans.size()); + int beanUnRegistrationsHandled = instanceTelemetryBean.getBeanUnregistrationsHandled(); + instanceTelemetryBean.setBeanUnregistrationsHandled(beanUnRegistrationsHandled + 1); + } + } + + private void subscribeToBeans() { + List beanScopes = this.getBeansScopes(); + + try { + connection.subscribeToBeanScopes(beanScopes, this); + this.beanSubscriptionActive = true; + log.info("Subscribed to {} bean scopes successfully!", beanScopes.size()); + } catch (MalformedObjectNameException | InstanceNotFoundException | IOException e) { + log.warn("Bean subscription failed! Will rely on bean_refresh, ensure it is set " + + " to an appropriate value (currently {} seconds). Exception: ", + this.refreshBeansPeriod, e); + this.beanSubscriptionActive = false; } - log.info("Found {} matching attributes", matchingAttributes.size()); } /** Returns a list of strings listing the bean scopes. */ @@ -736,20 +842,22 @@ public List getBeansScopes() { * Query and refresh the instance's list of beans. Limit the query scope when possible on * certain actions, and fallback if necessary. */ - private void refreshBeansList() throws IOException { - this.beans = new HashSet(); + private synchronized void refreshBeansList(boolean isInitialQuery) throws IOException { + Set newBeans = new HashSet<>(); String action = appConfig.getAction(); boolean limitQueryScopes = !action.equals(AppConfig.ACTION_LIST_EVERYTHING) && !action.equals(AppConfig.ACTION_LIST_NOT_MATCHING); + boolean fullBeanQueryNeeded = true; if (limitQueryScopes) { try { List beanScopes = getBeansScopes(); for (String scope : beanScopes) { ObjectName name = new ObjectName(scope); - this.beans.addAll(connection.queryNames(name)); + newBeans.addAll(connection.queryNames(name)); } + fullBeanQueryNeeded = false; } catch (MalformedObjectNameException e) { log.error("Unable to create ObjectName", e); } catch (IOException e) { @@ -758,16 +866,57 @@ private void refreshBeansList() throws IOException { } } - if (this.beans.isEmpty()) { - this.beans = connection.queryNames(null); + if (fullBeanQueryNeeded) { + newBeans = connection.queryNames(null); if (instanceTelemetryBean != null) { int wildcardQueryCount = instanceTelemetryBean.getWildcardDomainQueryCount(); instanceTelemetryBean.setWildcardDomainQueryCount(wildcardQueryCount + 1); } } + if (this.beanSubscriptionActive && !fullBeanQueryNeeded && !isInitialQuery) { + // This code exists to validate the bean-subscription is working properly + // If every new bean and de-registered bean properly fires an event, then + // this.beans (current set that has been updated via subscription) should + // always equal the new set of beans queried (unless it was a full bean query) + if (!this.beans.containsAll(newBeans)) { + // Newly queried set of beans contains beans not actively tracked + // ie, maybe a registeredBean subscription msg did not get processed correctly + Set beansNotSeen = new HashSet<>(); + beansNotSeen.addAll(newBeans); + beansNotSeen.removeAll(this.beans); + + log.error("[beansub-audit] Bean refresh found {} previously untracked beans", + beansNotSeen.size()); + for (ObjectName b : beansNotSeen) { + log.error("[beansub-audit] New not-tracked bean {}", b); + } + } + if (!newBeans.containsAll(this.beans)) { + // Newly queried set of beans is missing beans that are actively tracked + // ie, maybe a deregisteredBean subscription msg did not get processed correctly + Set incorrectlyTrackedBeans = new HashSet<>(); + incorrectlyTrackedBeans.addAll(this.beans); + incorrectlyTrackedBeans.removeAll(newBeans); + + log.error("[beansub-audit] Bean refresh found {} fewer beans than expected", + incorrectlyTrackedBeans.size()); + for (ObjectName b : incorrectlyTrackedBeans) { + log.error("[beansub-audit] Currently tracked bean not returned from fresh" + + " query: {}", b); + } + } + } + this.beans = newBeans; this.lastRefreshTime = System.currentTimeMillis(); } + /** True if instance is in a good state to collect metrics. */ + private boolean isInstanceHealthy() { + // If we have subscription mode on and the connection has either failed or notifications + // have been lost, then we must consider this instance unhealthy and re-init. + return !(this.beanSubscriptionActive && connection.hasSeenConnectionIssues()); + } + /** Returns a string array listing the service check tags. */ public String[] getServiceCheckTags() { List tags = new ArrayList(); @@ -827,6 +976,11 @@ public int getMaxNumberOfMetrics() { return this.maxReturnedMetrics; } + /** Returns the current number of metrics this instance will collect. */ + public int getCurrentNumberOfMetrics() { + return this.metricCountForMatchingAttributes; + } + public InstanceTelemetry getInstanceTelemetryBean() { return this.instanceTelemetryBean; } diff --git a/src/main/java/org/datadog/jmxfetch/JmxAttribute.java b/src/main/java/org/datadog/jmxfetch/JmxAttribute.java index 601f6f69..2789e641 100644 --- a/src/main/java/org/datadog/jmxfetch/JmxAttribute.java +++ b/src/main/java/org/datadog/jmxfetch/JmxAttribute.java @@ -62,6 +62,7 @@ public abstract class JmxAttribute { private List defaultTagsList; private boolean cassandraAliasing; protected String checkName; + private int lastMetricSize; private boolean normalizeBeanParamTags; JmxAttribute( @@ -86,6 +87,7 @@ public abstract class JmxAttribute { this.cassandraAliasing = cassandraAliasing; this.checkName = checkName; this.serviceNameProvider = serviceNameProvider; + this.lastMetricSize = 0; this.normalizeBeanParamTags = normalizeBeanParamTags; // A bean name is formatted like that: @@ -277,10 +279,19 @@ public String toString() { + attribute.getType(); } - public abstract List getMetrics() + protected abstract List getMetricsImpl() throws AttributeNotFoundException, InstanceNotFoundException, MBeanException, ReflectionException, IOException; + /** Returns all metrics tracked by this attribute. */ + public final List getMetrics() + throws AttributeNotFoundException, InstanceNotFoundException, MBeanException, + ReflectionException, IOException { + List metrics = this.getMetricsImpl(); + this.lastMetricSize = metrics.size(); + return metrics; + } + /** * An abstract function implemented in the inherited classes JmxSimpleAttribute and * JmxComplexAttribute. @@ -302,6 +313,12 @@ public int getMetricsCount() { } } + /** Gets the most recent collection's metric count. */ + public int getLastMetricsCount() { + return this.lastMetricSize; + } + + /** Gets the JMX Attribute info value. Makes a call through the connection */ Object getJmxValue() throws AttributeNotFoundException, InstanceNotFoundException, MBeanException, diff --git a/src/main/java/org/datadog/jmxfetch/JmxComplexAttribute.java b/src/main/java/org/datadog/jmxfetch/JmxComplexAttribute.java index 6849116f..85e28c60 100644 --- a/src/main/java/org/datadog/jmxfetch/JmxComplexAttribute.java +++ b/src/main/java/org/datadog/jmxfetch/JmxComplexAttribute.java @@ -72,7 +72,7 @@ private void populateSubAttributeList(Object attributeValue) { } @Override - public List getMetrics() throws AttributeNotFoundException, MBeanException, + public List getMetricsImpl() throws AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException, IOException { List metrics = new ArrayList(subAttributeList.size()); for (String subAttribute : subAttributeList) { diff --git a/src/main/java/org/datadog/jmxfetch/JmxSimpleAttribute.java b/src/main/java/org/datadog/jmxfetch/JmxSimpleAttribute.java index c76676ef..105303c4 100644 --- a/src/main/java/org/datadog/jmxfetch/JmxSimpleAttribute.java +++ b/src/main/java/org/datadog/jmxfetch/JmxSimpleAttribute.java @@ -74,7 +74,7 @@ public JmxSimpleAttribute( } @Override - public List getMetrics() + public List getMetricsImpl() throws AttributeNotFoundException, InstanceNotFoundException, MBeanException, ReflectionException, IOException { if (cachedMetric == null) { diff --git a/src/main/java/org/datadog/jmxfetch/JmxTabularAttribute.java b/src/main/java/org/datadog/jmxfetch/JmxTabularAttribute.java index 816e0aa2..35796646 100644 --- a/src/main/java/org/datadog/jmxfetch/JmxTabularAttribute.java +++ b/src/main/java/org/datadog/jmxfetch/JmxTabularAttribute.java @@ -145,7 +145,7 @@ protected String[] getTags(String key, String subAttribute) } @Override - public List getMetrics() + public List getMetricsImpl() throws AttributeNotFoundException, InstanceNotFoundException, MBeanException, ReflectionException, IOException { Map> subMetrics = new HashMap>(); diff --git a/src/main/java/org/datadog/jmxfetch/JvmDirectConnection.java b/src/main/java/org/datadog/jmxfetch/JvmDirectConnection.java index 08c85c77..99c4b27d 100644 --- a/src/main/java/org/datadog/jmxfetch/JvmDirectConnection.java +++ b/src/main/java/org/datadog/jmxfetch/JvmDirectConnection.java @@ -20,6 +20,10 @@ public void closeConnector() { // ignore } + public boolean isConnectorClosed() { + return false; + } + public boolean isAlive() { return true; } diff --git a/src/main/java/org/datadog/jmxfetch/util/InstanceTelemetry.java b/src/main/java/org/datadog/jmxfetch/util/InstanceTelemetry.java index 3cbb938f..a943bc6d 100644 --- a/src/main/java/org/datadog/jmxfetch/util/InstanceTelemetry.java +++ b/src/main/java/org/datadog/jmxfetch/util/InstanceTelemetry.java @@ -9,6 +9,8 @@ public class InstanceTelemetry implements InstanceTelemetryMBean { private int metricCount; private int wildcardDomainQueryCount; private double beanMatchRatio; + private int beanRegistrationsHandled; + private int beanUnregistrationsHandled; /** Jmxfetch telemetry bean constructor. */ public InstanceTelemetry() { @@ -16,7 +18,18 @@ public InstanceTelemetry() { topLevelAttributeCount = 0; metricCount = 0; wildcardDomainQueryCount = 0; + // This needs to be re-thought a bit + // it makes sense in a bean-refresh-loop world + // but in a subscription-world + // it's not clear what this should be + // current thought is to split this + // into two fields: + // - numBeansWithMatchingAttributes + // - numBeansWithoutMatchingAttributes + // a bit wordy though. beanMatchRatio = 0.0; + beanRegistrationsHandled = 0; + beanUnregistrationsHandled = 0; } public int getBeansFetched() { @@ -39,6 +52,14 @@ public double getBeanMatchRatio() { return beanMatchRatio; } + public int getBeanRegistrationsHandled() { + return beanRegistrationsHandled; + } + + public int getBeanUnregistrationsHandled() { + return beanUnregistrationsHandled; + } + public void setBeansFetched(int count) { beansFetched = count; } @@ -59,4 +80,12 @@ public void setBeanMatchRatio(double ratio) { beanMatchRatio = ratio; } + public void setBeanRegistrationsHandled(int count) { + beanRegistrationsHandled = count; + } + + public void setBeanUnregistrationsHandled(int count) { + beanUnregistrationsHandled = count; + } + } diff --git a/src/test/java/org/datadog/jmxfetch/TestBeanSubscription.java b/src/test/java/org/datadog/jmxfetch/TestBeanSubscription.java new file mode 100644 index 00000000..d04b313a --- /dev/null +++ b/src/test/java/org/datadog/jmxfetch/TestBeanSubscription.java @@ -0,0 +1,373 @@ +package org.datadog.jmxfetch; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Arrays; + +import lombok.extern.slf4j.Slf4j; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.runners.model.Statement; +import org.junit.runner.Description; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.builder.ImageFromDockerfile; + +import org.datadog.jmxfetch.reporter.ConsoleReporter; + +@Slf4j +public class TestBeanSubscription extends TestCommon { + private static final int rmiPort = 9090; + private static final int controlPort = 9091; + private static final int supervisorPort = 9092; + private JMXServerControlClient controlClient; + private JMXServerSupervisorClient supervisorClient; + private static Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log); + + private static ImageFromDockerfile img = new ImageFromDockerfile() + .withFileFromPath(".", Paths.get("./tools/misbehaving-jmx-server/")); + + @Rule(order = 0) + public GenericContainer cont = new GenericContainer<>(img) + .withEnv(Collections.singletonMap("RMI_PORT", "" + rmiPort)) + .withEnv(Collections.singletonMap("CONTROL_PORT", "" + controlPort)) + .withEnv(Collections.singletonMap("SUPERVISOR_PORT", "" + supervisorPort)) + .waitingFor(Wait.forLogMessage(".*Supervisor HTTP Server Started. Waiting for initialization payload POST to /init.*", 1)); + + @Rule(order = 1) + public TestRule setupRule = new TestRule() { + @Override + public Statement apply(final Statement base, Description description) { + return new Statement() { + @Override + public void evaluate() throws Throwable { + String ipAddress = cont.getContainerInfo().getNetworkSettings().getIpAddress(); + controlClient = new JMXServerControlClient(ipAddress, controlPort); + supervisorClient = new JMXServerSupervisorClient(ipAddress, supervisorPort); + cont.followOutput(logConsumer); + try { + log.info("Initializing JMX Server with RMI hostname {}", ipAddress); + supervisorClient.initializeJMXServer(ipAddress); + } catch (IOException e) { + log.warn("Supervisor call to set rmi hostname failed, tests may fail in some environments, e: ", e); + } + base.evaluate(); + } + }; + } + }; + + @Test + public void testJMXFetchBasic() throws IOException, InterruptedException { + String testDomain = "test-domain"; + String ipAddress = cont.getContainerInfo().getNetworkSettings().getIpAddress(); + when(appConfig.getEnableBeanSubscription()).thenReturn(true); + this.initApplicationWithYamlLines( + "init_config:", + " is_jmx: true", + "", + "instances:", + " - name: jmxint_container", + " host: " + ipAddress, + " port: " + rmiPort, + " min_collection_interval: null", // allow collections at arbitrary intervals since we trigger them manually in the tests + " refresh_beans: 5000", // effectively disable bean refresh + " collect_default_jvm_metrics: false", + " max_returned_metrics: 300000", + " conf:", + " - include:", + " domain: " + testDomain + ); + + this.app.doIteration(); + List> metrics = ((ConsoleReporter) this.appConfig.getReporter()).getMetrics(); + assertEquals(0, metrics.size()); + + int numBeans = 2; + int numAttributesPerBean = 4; + + this.controlClient.createMBeans(testDomain, numBeans, numAttributesPerBean, 0, 0); + + // Allow time for subscriptions to come through and be registered + Thread.sleep(100); + + this.app.doIteration(); + metrics = ((ConsoleReporter) this.appConfig.getReporter()).getMetrics(); + assertEquals(numBeans * numAttributesPerBean, metrics.size()); + } + + @Test + public void testJMXFetchManyBeans() throws IOException, InterruptedException { + cont.followOutput(logConsumer); + String testDomain = "test-domain"; + String ipAddress = cont.getContainerInfo().getNetworkSettings().getIpAddress(); + when(appConfig.getEnableBeanSubscription()).thenReturn(true); + this.initApplicationWithYamlLines( + "init_config:", + " is_jmx: true", + "", + "instances:", + " - name: jmxint_container", + " host: " + ipAddress, + " port: " + rmiPort, + " min_collection_interval: null", + " refresh_beans: 5000", // effectively disable bean refresh + " collect_default_jvm_metrics: false", + " max_returned_metrics: 300000", + " conf:", + " - include:", + " domain: " + testDomain + ); + + this.app.doIteration(); + List> metrics = ((ConsoleReporter) this.appConfig.getReporter()).getMetrics(); + assertEquals(0, metrics.size()); + + int numBeans = 200; + int numAttributesPerBean = 4; + + this.controlClient.createMBeans(testDomain, numBeans, numAttributesPerBean, 0, 0); + + // Time for subscriptions to come through and be registered + Thread.sleep(2000); + + this.app.doIteration(); + metrics = ((ConsoleReporter) this.appConfig.getReporter()).getMetrics(); + assertEquals(numBeans * numAttributesPerBean, metrics.size()); + } + + @Test + public void testConcurrentCollectionWithSubscriptionUpdates() throws IOException, InterruptedException { + String testDomain = "test-domain"; + cont.followOutput(logConsumer); + String ipAddress = cont.getContainerInfo().getNetworkSettings().getIpAddress(); + when(appConfig.getEnableBeanSubscription()).thenReturn(true); + this.initApplicationWithYamlLines( + "init_config:", + " is_jmx: true", + "", + "instances:", + " - name: jmxint_container", + " host: " + ipAddress, + " port: " + rmiPort, + " min_collection_interval: null", + " refresh_beans: 5000", // effectively disable bean refresh + " collect_default_jvm_metrics: false", + " max_returned_metrics: 300000", + " conf:", + " - include:", + " domain: " + testDomain + ); + + this.app.doIteration(); + List> metrics = ((ConsoleReporter) this.appConfig.getReporter()).getMetrics(); + // Sanity check, no beans exist, none should be found + assertEquals(0, metrics.size()); + + int numBeans = 200; + int numAttributesPerBean = 4; + int expectedMetrics = numBeans * numAttributesPerBean; + + // This call blocks until the beans actually exist in the remote application + this.controlClient.createMBeans(testDomain, numBeans, numAttributesPerBean, 0, 0); + + // Intentionally leaving no time for subscriptions to be processed to test how + // a collection behaves when interleaved with bean subscription traffic + + this.app.doIteration(); + // Iteration is done, don't care how many metrics were collected + // (almost certainly less than numBeans * numAttributesPerBean) + metrics = ((ConsoleReporter) this.appConfig.getReporter()).getMetrics(); + + // Sleep to ensure bean subscriptions get processed (ie, attribute matching correctly takes place) + Thread.sleep(2000); + + // Now, without doing an iteration, we should see the correct number for + // how many metrics are about to be collected + // This is effectively testing "Did the attribute matching execute correctly for all bean notifications" + assertEquals("getCurrentNumberOfMetrics returns the correct value _before_ running a collection", expectedMetrics, this.app.getInstances().get(0).getCurrentNumberOfMetrics()); + + // Do an actual collection to ensure validate the metrics collected + this.app.doIteration(); + metrics = ((ConsoleReporter) this.appConfig.getReporter()).getMetrics(); + + assertEquals("actual metrics collected matches expectedMetrics", expectedMetrics, metrics.size()); + } + + @Test + public void testBeanRemoval() throws IOException, InterruptedException { + String testDomain = "test-domain"; + cont.followOutput(logConsumer); + String ipAddress = cont.getContainerInfo().getNetworkSettings().getIpAddress(); + when(appConfig.getEnableBeanSubscription()).thenReturn(true); + this.initApplicationWithYamlLines( + "init_config:", + " is_jmx: true", + "", + "instances:", + " - name: jmxint_container", + " host: " + ipAddress, + " port: " + rmiPort, + " min_collection_interval: null", + " refresh_beans: 5000", // effectively disable bean refresh + " collect_default_jvm_metrics: false", + " max_returned_metrics: 300000", + " conf:", + " - include:", + " domain: " + testDomain + ); + + this.app.doIteration(); + List> metrics = ((ConsoleReporter) this.appConfig.getReporter()).getMetrics(); + assertEquals("Sanity check, no beans/metrics exist at beginning of test", 0, metrics.size()); + + int numBeans = 20; + int numAttributesPerBean = 4; + int expectedMetrics = numBeans * numAttributesPerBean; + + this.controlClient.createMBeans(testDomain, numBeans, numAttributesPerBean, 0, 0); + + Thread.sleep(500); + + this.app.doIteration(); + metrics = ((ConsoleReporter) this.appConfig.getReporter()).getMetrics(); + + assertEquals("After creating beans, correct metrics collected", expectedMetrics, metrics.size()); + + numBeans = 10; + expectedMetrics = numBeans * numAttributesPerBean; + + this.controlClient.createMBeans(testDomain, numBeans, numAttributesPerBean, 0, 0); + + Thread.sleep(500); + + assertEquals("Number of metrics to be collected properly updated", expectedMetrics, this.app.getInstances().get(0).getCurrentNumberOfMetrics()); + + this.app.doIteration(); + metrics = ((ConsoleReporter) this.appConfig.getReporter()).getMetrics(); + + assertEquals("After removing beans, correct metrics collected", expectedMetrics, metrics.size()); + } + + @Test + public void testNetworkFailure() throws IOException, InterruptedException { + String testDomain = "test-domain-nwkfail"; + cont.followOutput(logConsumer); + String ipAddress = cont.getContainerInfo().getNetworkSettings().getIpAddress(); + when(appConfig.getEnableBeanSubscription()).thenReturn(true); + this.initApplicationWithYamlLines( + "init_config:", + " is_jmx: true", + "", + "instances:", + " - name: jmxint_container", + " host: " + ipAddress, + " port: " + rmiPort, + " min_collection_interval: null", + " refresh_beans: 5000", // effectively disable bean refresh + " collect_default_jvm_metrics: false", + " max_returned_metrics: 300000", + " conf:", + " - include:", + " domain: " + testDomain + ); + + this.app.doIteration(); + List> metrics = ((ConsoleReporter) this.appConfig.getReporter()).getMetrics(); + assertEquals("Sanity check, no beans/metrics exist at beginning of test", 0, metrics.size()); + + this.controlClient.jmxCutNetwork(); + // In testing, this needs a slight delay for the connection to "fail" + // via JMXConnectionNotification. + // Artificially sleep to allow time for this since that is the point of this test + Thread.sleep(50); + this.controlClient.jmxRestoreNetwork(); + + int numBeans = 20; + int numAttributesPerBean = 4; + int expectedMetrics = numBeans * numAttributesPerBean; + + this.controlClient.createMBeans(testDomain, numBeans, numAttributesPerBean, 0, 0); + + // One iteration to recover instance, no metrics are actually collected + this.app.doIteration(); + // Second iteration should collect metrics + this.app.doIteration(); + metrics = ((ConsoleReporter) this.appConfig.getReporter()).getMetrics(); + + assertEquals("Number of metrics to be collected properly updated", expectedMetrics, this.app.getInstances().get(0).getCurrentNumberOfMetrics()); + + assertEquals("Network recovered, did we collect correct metrics?", expectedMetrics, metrics.size()); + } + + @Test + public void testDisconnectDuringBeanCreation() throws IOException, InterruptedException { + String testDomain = "test-domain-dsc-bn-creat"; + cont.followOutput(logConsumer); + String ipAddress = cont.getContainerInfo().getNetworkSettings().getIpAddress(); + when(appConfig.getEnableBeanSubscription()).thenReturn(true); + this.initApplicationWithYamlLines( + "init_config:", + " is_jmx: true", + "", + "instances:", + " - name: jmxint_container", + " host: " + ipAddress, + " port: " + rmiPort, + " min_collection_interval: null", + " refresh_beans: 5000", // effectively disable bean refresh + " collect_default_jvm_metrics: false", + " max_returned_metrics: 300000", + " conf:", + " - include:", + " domain: " + testDomain + ); + + this.app.doIteration(); + List> metrics = ((ConsoleReporter) this.appConfig.getReporter()).getMetrics(); + assertEquals("Sanity check, no beans/metrics exist at beginning of test", 0, metrics.size()); + + this.controlClient.jmxCutNetwork(); + + int numBeans = 20; + int numAttributesPerBean = 4; + int expectedMetrics = numBeans * numAttributesPerBean; + + this.controlClient.createMBeans(testDomain, numBeans, numAttributesPerBean, 0, 0); + + // once beans created, restore network + this.controlClient.jmxRestoreNetwork(); + + // When attempting to collect metrics, instance is marked as broken due to an unhealthy network + // _and_ it is added to brokenInstances so the broken instance is recovered in the _same_ iteration + // Note in other "reconnection" tests (see TestReconnection) there are two iterations required + // The first marks it as broken and the second recovers it. This test only needs one since getMetrics + // fails so quickly. So this is technically a race and if this test ever becomes flakey this is why. + this.app.doIteration(); + + // Now create more beans which triggers subscription updates + numBeans = 22; + expectedMetrics = numBeans * numAttributesPerBean; + this.controlClient.createMBeans(testDomain, numBeans, numAttributesPerBean, 0, 0); + + // Allow subscription updates to be processed + Thread.sleep(500); + + assertEquals("Number of metrics to be collected properly updated", expectedMetrics, this.app.getInstances().get(0).getCurrentNumberOfMetrics()); + + this.app.doIteration(); + metrics = ((ConsoleReporter) this.appConfig.getReporter()).getMetrics(); + + assertEquals("Network recovered, did we collect correct metrics?", expectedMetrics, metrics.size()); + + } +} diff --git a/src/test/java/org/datadog/jmxfetch/TestCommon.java b/src/test/java/org/datadog/jmxfetch/TestCommon.java index ff0bf445..14b61f8d 100644 --- a/src/test/java/org/datadog/jmxfetch/TestCommon.java +++ b/src/test/java/org/datadog/jmxfetch/TestCommon.java @@ -15,6 +15,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import javax.management.InstanceAlreadyExistsException; @@ -99,7 +102,7 @@ protected void registerMBean(Object application, String objectStringName) * @throws InstanceNotFoundException * @throws MBeanRegistrationException */ - public void unregisterMBeans() throws MBeanRegistrationException, InstanceNotFoundException { + public void unregisterAllMBeans() throws MBeanRegistrationException, InstanceNotFoundException { if (mbs != null) { for (ObjectName objectName : objectNames) { mbs.unregisterMBean(objectName); @@ -107,6 +110,25 @@ public void unregisterMBeans() throws MBeanRegistrationException, InstanceNotFou } } + /** + * Unregister a specific MBean. + * + * @throws InstanceNotFoundException + * @throws MBeanRegistrationException + */ + protected void unregisterMBean(Object application, String objectStringName) throws MBeanRegistrationException, InstanceNotFoundException, MalformedObjectNameException { + if (mbs != null) { + ObjectName objectName = new ObjectName(objectStringName); + for (Iterator it = objectNames.iterator(); it.hasNext();) { + ObjectName elem = it.next(); + if (elem.compareTo(objectName) == 0) { + it.remove(); + } + } + mbs.unregisterMBean(objectName); + } + } + /** * Tear down instances and application. */ @@ -117,15 +139,21 @@ public void teardown() { app.stop(); } try { - unregisterMBeans(); + unregisterAllMBeans(); } catch (MBeanRegistrationException | InstanceNotFoundException e) { // Ignore } } /** Init JMXFetch with the given YAML configuration file. */ - protected void initApplication(String yamlFileName, String autoDiscoveryPipeFile) + protected void initApplication(String yamlFileName, String autoDiscoveryPipeFile, AppConfig passedAppConfig) throws FileNotFoundException, IOException { + AppConfig appConfig = this.appConfig; + if (passedAppConfig != null) { + appConfig = passedAppConfig; + } + this.appConfig = appConfig; + // We do a first collection // We initialize the main app that will collect these metrics using JMX String confdDirectory = @@ -177,7 +205,15 @@ protected void initApplication(String yamlFileName, String autoDiscoveryPipeFile } protected void initApplication(String yamlFileName) throws FileNotFoundException, IOException { - initApplication(yamlFileName, ""); + initApplication(yamlFileName, "", null); + } + + protected void initApplication(String yamlFileName, AppConfig appConfig) throws FileNotFoundException, IOException { + initApplication(yamlFileName, "", appConfig); + } + + protected void initApplication(String yamlFileName, String autoDiscoveryPipeFile) throws FileNotFoundException, IOException { + initApplication(yamlFileName, autoDiscoveryPipeFile, null); } /* diff --git a/src/test/java/org/datadog/jmxfetch/TestInstance.java b/src/test/java/org/datadog/jmxfetch/TestInstance.java index 4fa8adf5..9b00601b 100644 --- a/src/test/java/org/datadog/jmxfetch/TestInstance.java +++ b/src/test/java/org/datadog/jmxfetch/TestInstance.java @@ -6,6 +6,7 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; import java.net.URL; import java.util.ArrayList; @@ -19,6 +20,10 @@ public class TestInstance extends TestCommon { private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger("Test Instance"); + // This delay exists for the subscription thread to recieve the MBeanNotification + // and call into `Instance`. Conceptually this is just a Thread.yield() + private static int subscriptionDelayMs = 50; + @Test public void testMinCollectionInterval() throws Exception { registerMBean(new SimpleTestJavaApp(), "org.datadog.jmxfetch.test:foo=Bar,qux=Baz"); @@ -237,4 +242,149 @@ public void testRefreshBeans() throws Exception { // 17 = 13 metrics from java.lang + 2 iteration=one + 2 iteration=two assertEquals(17, metrics.size()); } + + /** Tests bean_subscription */ + @Test + public void testBeanSubscription() throws Exception { + SimpleTestJavaApp testApp = new SimpleTestJavaApp(); + AppConfig config = spy(AppConfig.builder().build()); + when(config.getEnableBeanSubscription()).thenReturn(true); + + // initial fetch + initApplication("jmx_bean_subscription.yaml", config); + + // We do a first collection + run(); + List> metrics = getMetrics(); + int numRegisteredAttributes = 0; + + assertEquals(numRegisteredAttributes, metrics.size()); + + // We register first mbean with 2 matching attributes + registerMBean(testApp, "org.datadog.jmxfetch.test:iteration=one"); + numRegisteredAttributes += 2; + + Thread.sleep(subscriptionDelayMs); + + run(); + metrics = getMetrics(); + + assertEquals(numRegisteredAttributes, metrics.size()); + + // We register additional mbean + registerMBean(testApp, "org.datadog.jmxfetch.test:iteration=two"); + numRegisteredAttributes += 2; + Thread.sleep(subscriptionDelayMs); + + // We run a third collection. + run(); + metrics = getMetrics(); + + assertEquals(numRegisteredAttributes, metrics.size()); + + List instances = getInstances(); + assertEquals(1, instances.size()); + Instance i = instances.get(0); + assertEquals(2, i.getInstanceTelemetryBean().getBeanRegistrationsHandled()); + assertEquals(0, i.getInstanceTelemetryBean().getBeanUnregistrationsHandled()); + } + + @Test + public void testBeanUnsubscription() throws Exception { + SimpleTestJavaApp testApp = new SimpleTestJavaApp(); + AppConfig config = spy(AppConfig.builder().build()); + when(config.getEnableBeanSubscription()).thenReturn(true); + // Bean-refresh interval is to to 50s, so the only bean refresh will be + // initial fetch + initApplication("jmx_bean_subscription.yaml", config); + int numRegisteredAttributes = 0; + + // We do a first collection + run(); + List> metrics = getMetrics(); + + // Sanity check -- no beans exist yet + assertEquals(numRegisteredAttributes, metrics.size()); + + // We register an additional mbean + registerMBean(testApp, "org.datadog.jmxfetch.test:iteration=one"); + numRegisteredAttributes += 2; + + Thread.sleep(subscriptionDelayMs); + + run(); + metrics = getMetrics(); + + assertEquals(numRegisteredAttributes, metrics.size()); + + // We UN-register first mbean + unregisterMBean(testApp, "org.datadog.jmxfetch.test:iteration=one"); + numRegisteredAttributes -= 2; + Thread.sleep(subscriptionDelayMs); + + // We run a third collection. + run(); + metrics = getMetrics(); + + // Note that the collected metrics size is correct even without any special work for bean + // unregistration as the `iteration=one` metric will fail to be found and simply not be included + assertEquals(numRegisteredAttributes, metrics.size()); + // Which is why this second check exists, it reflects the running total of metrics + // collected which is used to determine if new attributes can be added + List instances = getInstances(); + assertEquals(1, instances.size()); + Instance i = instances.get(0); + assertEquals(numRegisteredAttributes, i.getCurrentNumberOfMetrics()); + + assertEquals(1, i.getInstanceTelemetryBean().getBeanRegistrationsHandled()); + assertEquals(1, i.getInstanceTelemetryBean().getBeanUnregistrationsHandled()); + } + + @Test + public void testBeanSubscriptionAttributeCounting() throws Exception { + // This test only looks at the instance's getCurrentNumberOfMetrics as this + // should be updated on bean registration/deregistration + + SimpleTestJavaApp testApp = new SimpleTestJavaApp(); + AppConfig config = spy(AppConfig.builder().build()); + when(config.getEnableBeanSubscription()).thenReturn(true); + initApplication("jmx_bean_subscription.yaml", config); // max returned metrics is 10 + + int numRegisteredAttributes = 0; + assertEquals(numRegisteredAttributes, app.getInstances().get(0).getCurrentNumberOfMetrics()); + + registerMBean(testApp, "org.datadog.jmxfetch.test:iteration=one"); + numRegisteredAttributes += 2; + Thread.sleep(subscriptionDelayMs); + assertEquals(numRegisteredAttributes, app.getInstances().get(0).getCurrentNumberOfMetrics()); + + registerMBean(testApp, "org.datadog.jmxfetch.test:iteration=two"); + numRegisteredAttributes += 2; + registerMBean(testApp, "org.datadog.jmxfetch.test:iteration=three"); + numRegisteredAttributes += 2; + registerMBean(testApp, "org.datadog.jmxfetch.test:iteration=four"); + numRegisteredAttributes += 2; + registerMBean(testApp, "org.datadog.jmxfetch.test:iteration=five"); + numRegisteredAttributes += 2; + + Thread.sleep(subscriptionDelayMs * 2); // wait longer bc more beans + assertEquals(numRegisteredAttributes, app.getInstances().get(0).getCurrentNumberOfMetrics()); + + + registerMBean(testApp, "org.datadog.jmxfetch.test:iteration=six"); + // no change to numRegisteredAttributes as this registration should FAIL due to max number of metrics + + Thread.sleep(subscriptionDelayMs); + assertEquals(numRegisteredAttributes, app.getInstances().get(0).getCurrentNumberOfMetrics()); + + unregisterMBean(testApp, "org.datadog.jmxfetch.test:iteration=one"); + numRegisteredAttributes -= 2; + Thread.sleep(subscriptionDelayMs); + assertEquals(numRegisteredAttributes, app.getInstances().get(0).getCurrentNumberOfMetrics()); + + registerMBean(testApp, "org.datadog.jmxfetch.test:iteration=seven"); + numRegisteredAttributes += 2; + Thread.sleep(subscriptionDelayMs); + assertEquals(numRegisteredAttributes, app.getInstances().get(0).getCurrentNumberOfMetrics()); + } } diff --git a/src/test/resources/jmx_bean_subscription.yaml b/src/test/resources/jmx_bean_subscription.yaml new file mode 100644 index 00000000..01a75f4f --- /dev/null +++ b/src/test/resources/jmx_bean_subscription.yaml @@ -0,0 +1,15 @@ +init_config: + +instances: + - process_name_regex: .*surefire.* + min_collection_interval: null + collect_default_jvm_metrics: false + refresh_beans: 50000 + max_returned_metrics: 10 + name: jmx_test_instance + conf: + - include: + domain: org.datadog.jmxfetch.test + attribute: + - ShouldBe100 + - ShouldBe1000 diff --git a/tools/misbehaving-jmx-server/README.md b/tools/misbehaving-jmx-server/README.md index 38c100b2..9c251fb1 100644 --- a/tools/misbehaving-jmx-server/README.md +++ b/tools/misbehaving-jmx-server/README.md @@ -27,6 +27,15 @@ a secondary `init` payload that contains the correct RMI Hostname. It is designe - `SUPERVISOR_PORT` - HTTP control port for the supervisor process (if using) (default 8088) - `MISBEHAVING_OPTS` - Manages memory, GC configurations, and system properties of the Java process running the JMXServer (default `-Xmx128M -Xms128M`) +To enable debug logging, set the system property `-Dorg.slf4j.simpleLogger.defaultLogLevel=debug`. +This can be done in the Dockerfile `CMD` if running in a container + +For extra debug logging around RMI systems, try the following: +- `-Djava.rmi.server.logCalls=true` +- `-Dsun.rmi.server.exceptionTrace=true` +- `-Dsun.rmi.transport.logLevel=verbose` +- `-Dsun.rmi.transport.tcp.logLevel=verbose` + ## HTTP Control Actions (jmx-server) - POST `/cutNetwork` - Denies any requests to create a new socket (ie, no more connections will be 'accept'ed) and then closes existing TCP sockets - POST `/restoreNetwork` - Allows new sockets to be created