Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancements to MemQ Cluster Sensor #308

Merged
merged 3 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import java.util.Map;
import java.util.Set;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import com.pinterest.orion.core.utils.memq.zookeeper.MemqZookeeperClient;

import com.google.gson.Gson;
import com.pinterest.orion.common.NodeInfo;
Expand All @@ -39,9 +37,6 @@ public class MemqClusterSensor extends MemqSensor {

public static final String WRITE_ASSIGNMENTS = "writeAssignments";
public static final String TOPIC_CONFIG = "topicconfig";
public static final String BROKERS = "/brokers";
public static final String TOPICS = "/topics";
public static final String GOVERNOR = "/governor";
public static final String RAW_BROKER_INFO = "rawBrokerInfo";

@Override
Expand All @@ -57,29 +52,17 @@ public void initialize(Map<String, Object> config) throws PluginConfigurationExc
@Override
public void sense(MemqCluster cluster) throws Exception {
try {
if (cluster.getZkClient() == null) {
String zkUrl = cluster.getAttribute(MemqCluster.ZK_CONNECTION_STRING).getValue();
CuratorFramework curator = CuratorFrameworkFactory.newClient(zkUrl,
new ExponentialBackoffRetry(1000, 3));
curator.start();
curator.blockUntilConnected();
cluster.setZkClient(curator);
}
MemqZookeeperClient memqZookeeperClient = new MemqZookeeperClient(cluster);

CuratorFramework zkClient = cluster.getZkClient();
List<String> brokerNames = zkClient.getChildren().forPath(BROKERS);

List<String> brokerNames = memqZookeeperClient.getBrokerNames();
Map<String, List<String>> writeBrokerAssignments = new HashMap<>();

Map<String, Broker> rawBrokerMap = new HashMap<>();

Gson gson = new Gson();

Set<String> brokersInZookeeper = new HashSet<>();
for (String brokerName : brokerNames) {
byte[] brokerData = null;
String brokerDataJsonString = null;
try {
brokerData = zkClient.getData().forPath(BROKERS + "/" + brokerName);
brokerDataJsonString = memqZookeeperClient.getBrokerData(brokerName);
} catch (KeeperException.NoNodeException e) {
cluster.getNodeMap().remove(brokerName);
logger.info(
Expand All @@ -90,7 +73,7 @@ public void sense(MemqCluster cluster) throws Exception {
"Faced an unknown exception when getting broker data for " + brokerName +" from zookeeper:" + e);
continue;
}
Broker broker = gson.fromJson(new String(brokerData), Broker.class);
Broker broker = gson.fromJson(brokerDataJsonString, Broker.class);
NodeInfo info = new NodeInfo();
info.setClusterId(cluster.getClusterId());
String hostname = NetworkUtils.getHostnameFromIpIfAvailable(broker.getBrokerIP());
Expand All @@ -105,19 +88,21 @@ public void sense(MemqCluster cluster) throws Exception {

rawBrokerMap.put(broker.getBrokerIP(), broker);
for (TopicConfig topicConfig : broker.getAssignedTopics()) {
String topic = topicConfig.getTopic();
List<String> hostnames = writeBrokerAssignments.get(topic);
String topicName = topicConfig.getTopic();
List<String> hostnames = writeBrokerAssignments.get(topicName);
if (hostnames == null) {
hostnames = new ArrayList<>();
writeBrokerAssignments.put(topic, hostnames);
writeBrokerAssignments.put(topicName, hostnames);
}
hostnames.add(hostname);
}
brokersInZookeeper.add(broker.getBrokerIP());
}

boolean noBrokerInZookeeper = false;
if (brokersInZookeeper.isEmpty()) {
logger.warning("No broker found in zookeeper for cluster " + cluster.getClusterId());
noBrokerInZookeeper = true;
} else {
// Remove brokers that are not in zookeeper from the cluster node map
for (String nodeId : cluster.getNodeMap().keySet()) {
Expand All @@ -128,16 +113,20 @@ public void sense(MemqCluster cluster) throws Exception {
}

Map<String, TopicConfig> topicConfigMap = new HashMap<>();
List<String> topics = zkClient.getChildren().forPath(TOPICS);
for (String topic : topics) {
byte[] topicData = zkClient.getData().forPath(TOPICS + "/" + topic);
TopicConfig topicConfig = gson.fromJson(new String(topicData), TopicConfig.class);
topicConfigMap.put(topic, topicConfig);
List<String> topics = memqZookeeperClient.getTopics();
for (String topicName : topics) {
String topicDataJsonString = memqZookeeperClient.getTopicData(topicName);
TopicConfig topicConfig = gson.fromJson(topicDataJsonString, TopicConfig.class);
topicConfigMap.put(topicName, topicConfig);
}

byte[] governorData = zkClient.getData().forPath(GOVERNOR);
String governorIp = new String(governorData);
String clusterContext = "Governor: " + governorIp + "\n";
String clusterContext = "NO BROKER";
if (!noBrokerInZookeeper) {
String governorIp = memqZookeeperClient.getGovernorIp();
if (governorIp != null) {
clusterContext = "Governor: " + governorIp + "\n";
}
}

setAttribute(cluster, TOPIC_CONFIG, topicConfigMap);
setAttribute(cluster, RAW_BROKER_INFO, rawBrokerMap);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package com.pinterest.orion.core.utils.memq.zookeeper;

import com.pinterest.orion.core.memq.MemqCluster;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class MemqZookeeperClient {
public static final String BROKERS = "/brokers";
public static final String TOPICS = "/topics";
public static final String GOVERNOR = "/governor";
private boolean refreshZkClientWhenException = true;
yisheng-zhou marked this conversation as resolved.
Show resolved Hide resolved
private String zkUrl;
private MemqCluster cluster;
private CuratorFramework zkClient;

public MemqZookeeperClient(MemqCluster cluster) throws Exception {
this.zkUrl = cluster.getAttribute(MemqCluster.ZK_CONNECTION_STRING).getValue();
this.cluster = cluster;
if (cluster.getZkClient() != null) {
this.zkClient = cluster.getZkClient();
} else {
refreshZkClient();
}
}

public void enableRefreshZkClientWhenException() {
this.refreshZkClientWhenException = true;
}

public void disableRefreshZkClientWhenException() {
this.refreshZkClientWhenException = false;
}

/**
* Create a new Zookeeper client using the connection string provided in the cluster configuration.
* @return CuratorFramework
* @throws Exception
*/
private CuratorFramework createZkClient() throws Exception {
CuratorFramework curator = CuratorFrameworkFactory.newClient(
zkUrl,
new ExponentialBackoffRetry(1000, 3)
);
curator.start();
curator.blockUntilConnected();
return curator;
}

/**
* Refresh the Zookeeper client by creating a new one.
* The new client is then set in the cluster object.
* @throws Exception
*/
public void refreshZkClient() throws Exception {
this.zkClient = createZkClient();
cluster.setZkClient(this.zkClient);
}

/**
* Get the children of a node in Zookeeper.
* When an exception occurs, the Zookeeper client is refreshed and the children are fetched again if the refreshZkClientWhenException flag is set.
* @param path The path of the node.
* @return List of children node names.
* @throws Exception
*/
private List<String> getChildNodes(String path) throws Exception {
try {
return zkClient.getChildren().forPath(path);
} catch (Exception e) {
if (refreshZkClientWhenException) {
refreshZkClient();
return zkClient.getChildren().forPath(path);
} else {
throw e;
}
}
}

/**
* Get the data of a node in Zookeeper.
* When an exception occurs, the Zookeeper client is refreshed and the data is fetched again if the refreshZkClientWhenException flag is set.
* @param path The path of the node.
* @return The data of the node as a json string
* @throws Exception
*/
private String getNodeData(String path) throws Exception {
try {
return new String(zkClient.getData().forPath(path));
} catch (Exception e) {
if (refreshZkClientWhenException) {
refreshZkClient();
return new String(zkClient.getData().forPath(path));
} else {
throw e;
}
}
}

/**
* Get the names of the brokers in Zookeeper.
* @return List of broker names.
* @throws Exception
*/
public List<String> getBrokerNames() throws Exception {
return getChildNodes(BROKERS);
}

/**
* Get the data of a broker in Zookeeper.
* @param brokerName The name of the broker.
* @return The data of the broker as a json string.
* @throws Exception
*/
public String getBrokerData(String brokerName) throws Exception {
return getNodeData(BROKERS + "/" + brokerName);
}

/**
* Get the names of the topics in Zookeeper.
* @return List of topic names.
* @throws Exception
*/
public List<String> getTopics() throws Exception {
return getChildNodes(TOPICS);
}

/**
* Get the data of a topic in Zookeeper.
* @param topicName The name of the topic.
* @return The data of the topic as a json string.
* @throws Exception
*/
public String getTopicData(String topicName) throws Exception {
return getNodeData(TOPICS + "/" + topicName);
}

/**
* Get IP address the governor in Zookeeper.
* In memq zookeeper, the governor is a node at the path "/governor". Its data is the IP address of the governor.
* @return The IP address of the governor.
* @throws Exception
*/
public String getGovernorIp() throws Exception {
try {
return getNodeData(GOVERNOR);
} catch (Exception e) {
return null;
}
}
}
Loading