Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bean subscription #426

Open
wants to merge 49 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
4fc47e2
Initial bean listening, adds attributes that match config
scottopell Apr 7, 2023
e104ed2
Moves bean subscription into dedicated thread
scottopell Apr 10, 2023
9fee19d
Removes ConnectionListener
scottopell Apr 10, 2023
0b996df
Adds test covering basic bean subscription functionality
scottopell Apr 12, 2023
75a1bbc
Consolidate attribute matching and take 'ACTION' into account
scottopell Apr 12, 2023
9e624cf
Utilize 'synchronized' keyword for simpler locking
scottopell Apr 12, 2023
75d9296
Adds audit logging to bean refresh to catch any gaps in bean subscrip…
scottopell Apr 12, 2023
e4b2e6d
Shortens test timeframes for subscription to be processed
scottopell Apr 17, 2023
ff15967
Back to explicit locking
scottopell Apr 17, 2023
e0600d3
Removes un-necessary lockholder class
scottopell Apr 17, 2023
64d8438
implements bean unsubscription
scottopell Apr 17, 2023
fe8c168
Merge branch 'master' into feat/bean-listener
scottopell May 19, 2023
7f7c1a6
Adds extra logging instructions for misbehaving jmx server
scottopell May 19, 2023
1bd15f0
Adds dedicated remote container bean subscription tests
scottopell May 19, 2023
4fd1b09
Removes old mutex debugging code
scottopell May 22, 2023
57f1970
Adds bean unsubscription test and (failing) bean subscription w/ netw…
scottopell May 22, 2023
6670056
Adds JMX connection listener to mark instance as broken on connection…
scottopell May 22, 2023
d74319a
Removes un-needed bean subscriber thread
scottopell May 23, 2023
fe7705c
Clarifies bean-audit logging
scottopell May 23, 2023
c26e91b
Updates in-proc bean subscription tests with more attribute counting …
scottopell May 23, 2023
d746e40
Removes explicit locking in favor of 'synchronized'
scottopell May 23, 2023
090f0dc
Adds owned thread to process mbean subscription events
scottopell May 23, 2023
c3fbfe6
Remove some unused logs and accidental regressions
scottopell May 23, 2023
addb80f
Addresses linting errors
scottopell May 24, 2023
08cee1b
Renames ambiguous interface from BeanListener -> BeanTracker
scottopell May 24, 2023
27dc057
Fixes bug where metricReached would display multiple times incorrectly
scottopell May 24, 2023
642ee12
Corrects old name and var decl
scottopell May 31, 2023
4d9ce86
Merge branch 'master' into feat/bean-listener
scottopell Jun 5, 2023
e8c2b68
Adds toggle to turn on bean subscription globally
scottopell Jun 5, 2023
d1bd7b8
Merge branch 'master' into feat/bean-listener
scottopell Jul 5, 2023
79ce047
Adds env vars to enable subscription globally for validation
scottopell Jul 5, 2023
dcf963c
Updates TestBeanSubscription to use container IP directly similar to …
scottopell Jul 6, 2023
781db37
Merge branch 'master' into feat/bean-listener
scottopell Dec 27, 2023
905532a
Re-land changes from #481
scottopell Dec 27, 2023
bad14d0
Removes random idea for deployment validation
scottopell Dec 27, 2023
be3c9b4
Relands bean match ratio from #487
scottopell Dec 27, 2023
c118cc2
Fixes whoopsie
scottopell Dec 27, 2023
ea0bf5b
Fixes divide by zero bug introduced in previous commit
scottopell Dec 27, 2023
82bbc94
Adds tlm for beans registered and unregistered
scottopell Dec 27, 2023
b40c5d6
Update tools/misbehaving-jmx-server/README.md
scottopell Dec 28, 2023
34290a1
Fixes linting errors
scottopell Dec 28, 2023
617f4f6
Merge branch 'master' into feat/bean-listener
scottopell Jan 17, 2024
01e760c
Removes per-instance bean-subscription enablement in favor of applica…
scottopell Jan 17, 2024
34ab16b
Merge branch 'master' into feat/bean-listener
scottopell Jan 17, 2024
03eeff0
Merge branch 'master' into feat/bean-listener
scottopell Jan 25, 2024
95820e9
Fixes linter warning
scottopell Jan 25, 2024
f916745
Fixes spy usage in bean sub tests
scottopell Jan 25, 2024
ff5e64f
Adds env var enablement for backwards compatibility during rollout
scottopell Feb 8, 2024
5dfcd96
Merge branch 'master' into feat/bean-listener
scottopell Feb 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/main/java/org/datadog/jmxfetch/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,9 @@ void start() {
}

void stop() {
for (Instance in : this.getInstances()) {
in.cleanUp();
}
this.collectionProcessor.stop();
this.recoveryProcessor.stop();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.datadog.jmxfetch;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import javax.management.MBeanServerNotification;
import javax.management.Notification;
import javax.management.NotificationListener;
import javax.management.ObjectName;


@Slf4j
class BeanNotificationListener implements NotificationListener {
private final BlockingQueue<MBeanServerNotification> queue;
private final BeanTracker beanListener;

public BeanNotificationListener(final BeanTracker bl) {
this.beanListener = bl;
this.queue = new LinkedBlockingQueue<>();
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
MBeanServerNotification mbs;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Do you need to declare this outside the try?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I don't, moved the decl inside the try

try {
mbs = queue.take();
processMBeanServerNotification(mbs);
} catch (InterruptedException e) {
// ignore
}
}
}
}).start();
}

@Override
public void handleNotification(Notification notification, Object handback) {
if (!(notification instanceof MBeanServerNotification)) {
return;
}
MBeanServerNotification mbs = (MBeanServerNotification) notification;
queue.offer(mbs);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to do anything here if we get false back? Maybe log/warn we couldn't add the MBeanServerNotification?

}

private void processMBeanServerNotification(MBeanServerNotification notif) {
log.debug("MBeanNotification: ts {} seqNum: {} msg: '{}'",
notif.getTimeStamp(),
notif.getSequenceNumber(),
notif.getMessage());
ObjectName beanName = notif.getMBeanName();
if (notif.getType().equals(MBeanServerNotification.REGISTRATION_NOTIFICATION)) {
beanListener.trackBean(beanName);
} else if (notif.getType().equals(MBeanServerNotification.UNREGISTRATION_NOTIFICATION)) {
beanListener.untrackBean(beanName);
}
}
}
9 changes: 9 additions & 0 deletions src/main/java/org/datadog/jmxfetch/BeanTracker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.datadog.jmxfetch;

import javax.management.ObjectName;

public interface BeanTracker {
public void trackBean(ObjectName beanName);

public void untrackBean(ObjectName beanName);
}
70 changes: 59 additions & 11 deletions src/main/java/org/datadog/jmxfetch/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,25 @@
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.management.Attribute;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.IntrospectionException;
import javax.management.MBeanAttributeInfo;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanInfo;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerDelegate;
import javax.management.MalformedObjectNameException;
import javax.management.Notification;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import javax.management.relation.MBeanServerNotificationFilter;
import javax.management.remote.JMXConnectionNotification;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
Expand All @@ -35,6 +33,44 @@ public class Connection {
protected MBeanServerConnection mbs;
protected Map<String, Object> env;
protected JMXServiceURL address;
private NotificationListener connectionNotificationListener;
private boolean seenConnectionIssues;

private static class ConnectionNotificationListener implements NotificationListener {
public void handleNotification(Notification notification, Object handback) {
if (!(notification instanceof JMXConnectionNotification)) {
return;
}
if (!(handback instanceof Connection)) {
return;
}

JMXConnectionNotification connNotif = (JMXConnectionNotification) notification;
Connection conn = (Connection) handback;

if (connNotif.getType() == JMXConnectionNotification.CLOSED
|| connNotif.getType() == JMXConnectionNotification.FAILED
|| connNotif.getType() == JMXConnectionNotification.NOTIFS_LOST) {
log.warn("Marking connection issues due to {} - {}",
connNotif.getType(), connNotif.getMessage());
conn.seenConnectionIssues = true;
}
log.debug("Received connection notification: {} Message: {}",
connNotif.getType(), connNotif.getMessage());
}
}

/** Subscribes for bean registration/deregistration events under the specified bean scopes. */
public void subscribeToBeanScopes(List<String> beanScopes, BeanTracker bl)
throws MalformedObjectNameException, IOException, InstanceNotFoundException {
BeanNotificationListener listener = new BeanNotificationListener(bl);
for (String scope : beanScopes) {
ObjectName name = new ObjectName(scope);
MBeanServerNotificationFilter filter = new MBeanServerNotificationFilter();
filter.enableObjectName(name);
}
mbs.addNotificationListener(MBeanServerDelegate.DELEGATE_NAME, listener, null, null);
}

/** Gets attributes for matching bean name. */
public MBeanInfo getMBeanInfo(ObjectName beanName)
Expand All @@ -56,6 +92,10 @@ protected void createConnection() throws IOException {
log.info("Connecting to: " + this.address);
connector = JMXConnectorFactory.connect(this.address, this.env);
mbs = connector.getMBeanServerConnection();

this.connectionNotificationListener = new ConnectionNotificationListener();
connector.addConnectionNotificationListener(
this.connectionNotificationListener, null, this);
}

/** Gets attribute for matching bean and attribute name. */
Expand All @@ -73,13 +113,21 @@ public Object getAttribute(ObjectName objectName, String attributeName)
public void closeConnector() {
if (connector != null) {
try {
this.connector.removeConnectionNotificationListener(
this.connectionNotificationListener);
connector.close();
} catch (IOException e) {
connector = null;
} catch (IOException | ListenerNotFoundException e) {
// ignore
}
}
}

/** True if connection has been notified of failure/lost notifications. */
public boolean hasSeenConnectionIssues() {
return this.seenConnectionIssues;
}

/** Returns a boolean describing if the connection is still alive. */
public boolean isAlive() {
if (connector == null) {
Expand Down
Loading