Skip to content

Commit

Permalink
Adds audit logging to bean refresh to catch any gaps in bean subscrip…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
scottopell committed Apr 12, 2023
1 parent 9e624cf commit 75d9296
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 40 deletions.
29 changes: 29 additions & 0 deletions src/main/java/org/datadog/jmxfetch/BeanSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.datadog.jmxfetch;

import java.util.concurrent.CompletableFuture;
import java.util.List;

public class BeanSubscriber implements Runnable {
private List<String> beanScopes;
private Connection connection;
private BeanListener listener;
public CompletableFuture<Boolean> subscriptionSuccessful;

BeanSubscriber(List<String> beanScopes, Connection connection, BeanListener listener) {
this.beanScopes = beanScopes;
this.connection = connection;
this.listener = listener;
this.subscriptionSuccessful = new CompletableFuture<Boolean>();
}

public void run() {
try {
connection.subscribeToBeanScopes(beanScopes, this.listener);
this.subscriptionSuccessful.complete(true);

Thread.currentThread().join();
} catch (Exception e) {
this.subscriptionSuccessful.complete(false);
}
}
}
88 changes: 48 additions & 40 deletions src/main/java/org/datadog/jmxfetch/Instance.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.datadog.jmxfetch.service.ServiceNameProvider;
import org.yaml.snakeyaml.Yaml;

import java.util.concurrent.CompletableFuture;
import static java.util.concurrent.TimeUnit.*;

import java.io.File;
Expand Down Expand Up @@ -107,6 +106,7 @@ public Yaml initialValue() {
private Boolean cassandraAliasing;
private boolean emptyDefaultHostname;
private boolean enableBeanSubscription;
private boolean beanSubscriptionActive;

/** Constructor, instantiates Instance based of a previous instance and appConfig. */
public Instance(Instance instance, AppConfig appConfig) {
Expand Down Expand Up @@ -263,8 +263,10 @@ public Instance(
log.info("collect_default_jvm_metrics is false - not collecting default JVM metrics");
}

this.beans = new HashSet<>();
Boolean enableBeanSubscription = (Boolean) instanceMap.get("enable_bean_subscription");
this.enableBeanSubscription = enableBeanSubscription != null && enableBeanSubscription;
this.beanSubscriptionActive = false;
}

public static boolean isDirectInstance(Map<String, Object> configInstance) {
Expand Down Expand Up @@ -425,16 +427,16 @@ public synchronized void init(boolean forceNewConnection)
throws IOException, FailedLoginException, SecurityException {
log.info("Trying to connect to JMX Server at " + this.toString());
connection = getConnection(instanceMap, forceNewConnection);
log.info(
"Trying to collect bean list for the first time for JMX Server at "
+ this.toString());
this.refreshBeansList();
if (this.enableBeanSubscription) {
log.info("Subscribing for bean notifications before init");
log.info("Subscribing for bean notifications");
this.subscribeToBeans();
} else {
log.info("Bean subscription not enabled.");
}
log.info(
"Trying to collect bean list for the first time for JMX Server at "
+ this.toString());
this.refreshBeansList();
this.initialRefreshTime = this.lastRefreshTime;
log.info("Connected to JMX Server at " + this.toString());
this.getMatchingAttributes();
Expand Down Expand Up @@ -723,45 +725,25 @@ public synchronized void beanUnregistered(ObjectName mBeanName) {
log.info("Bean unregistered event. {}", mBeanName);
}

private class BeanSubscriber implements Runnable {
private List<String> beanScopes;
private Connection connection;
private BeanListener listener;
public CompletableFuture<Boolean> subscriptionSuccessful;

BeanSubscriber(List<String> beanScopes, Connection connection, BeanListener listener) {
this.beanScopes = beanScopes;
this.connection = connection;
this.listener = listener;
this.subscriptionSuccessful = new CompletableFuture<Boolean>();
}

public void run() {
try {
log.info("Subscribing to {} bean scopes", beanScopes.size());

connection.subscribeToBeanScopes(beanScopes, this.listener);
this.subscriptionSuccessful.complete(true);

Thread.currentThread().join();
} catch (Exception e) {
log.warn("Exception while subscribing to beans {}", e);
this.subscriptionSuccessful.complete(false);
}
}
}

private void subscribeToBeans() {
List<String> beanScopes = this.getBeansScopes();
BeanSubscriber subscriber = new BeanSubscriber(beanScopes, this.connection, this);

try {
new Thread(subscriber).start();
if (subscriber.subscriptionSuccessful.get(1, SECONDS)) {
log.info("Subscribed successfully!");
}
this.beanSubscriptionActive = subscriber.subscriptionSuccessful.get(1, SECONDS);
} catch (Exception e) {
log.warn("Exception while subscribing to beans {}", e);
log.warn("Error retrieving bean subscription value, assuming subscription failed. Exception: {}", e);
this.beanSubscriptionActive = false;
}

if (this.beanSubscriptionActive) {
log.info("Subscribed to {} bean scopes successfully!", beanScopes.size());
} else {
log.warn("Bean subscription failed! Will rely on bean_refresh, ensure it is set "
+ " to an appropriate value (currently {} seconds)",
this.refreshBeansPeriod);
}
}

Expand All @@ -778,27 +760,53 @@ public List<String> getBeansScopes() {
* certain actions, and fallback if necessary.
*/
private synchronized void refreshBeansList() throws IOException {
this.beans = new HashSet<ObjectName>();
Set<ObjectName> newBeans = new HashSet<>();
String action = appConfig.getAction();
boolean limitQueryScopes =
!action.equals(AppConfig.ACTION_LIST_EVERYTHING)
&& !action.equals(AppConfig.ACTION_LIST_NOT_MATCHING);

boolean fullBeanQueryNeeded = false;
if (limitQueryScopes) {
try {
List<String> beanScopes = getBeansScopes();
for (String scope : beanScopes) {
ObjectName name = new ObjectName(scope);
this.beans.addAll(connection.queryNames(name));
newBeans.addAll(connection.queryNames(name));
}
} catch (Exception e) {
fullBeanQueryNeeded = true;
log.error(
"Unable to compute a common bean scope, querying all beans as a fallback",
e);
}
}

this.beans = (this.beans.isEmpty()) ? connection.queryNames(null) : this.beans;
if (fullBeanQueryNeeded) {
newBeans = connection.queryNames(null);
}
if (this.beanSubscriptionActive && !fullBeanQueryNeeded) {
// This code exists to validate the bean-subscription is working properly
// If every new bean and de-registered bean properly fires an event, then
// this.beans (current set that has been updated via subscription) should
// always equal the new set of beans queried (unless it was a full bean query)
Set<ObjectName> beansNotSeen = new HashSet<>();
if (!this.beans.containsAll(newBeans)) {
beansNotSeen.addAll(newBeans);
beansNotSeen.removeAll(this.beans);
log.error("Bean refresh found {} new beans that were not already known via subscription", beansNotSeen.size());
}
if (!newBeans.containsAll(this.beans)){
beansNotSeen.addAll(this.beans);
beansNotSeen.removeAll(newBeans);
log.error("Bean refresh found {} FEWER beans than already known via subscription", beansNotSeen.size());
}

for (ObjectName b : beansNotSeen) {
log.error("Bean {} is one that has never been seen before, see why subscription did not detect this bean.", b.toString());
}
}
this.beans = newBeans;
this.lastRefreshTime = System.currentTimeMillis();
}

Expand Down

0 comments on commit 75d9296

Please sign in to comment.