diff --git a/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/memq/MemqClusterSensor.java b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/memq/MemqClusterSensor.java index ee2c5c6d..d2504d2e 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/memq/MemqClusterSensor.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/memq/MemqClusterSensor.java @@ -22,9 +22,7 @@ import java.util.Map; import java.util.Set; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.ExponentialBackoffRetry; +import com.pinterest.orion.core.utils.memq.zookeeper.MemqZookeeperClient; import com.google.gson.Gson; import com.pinterest.orion.common.NodeInfo; @@ -39,9 +37,6 @@ public class MemqClusterSensor extends MemqSensor { public static final String WRITE_ASSIGNMENTS = "writeAssignments"; public static final String TOPIC_CONFIG = "topicconfig"; - public static final String BROKERS = "/brokers"; - public static final String TOPICS = "/topics"; - public static final String GOVERNOR = "/governor"; public static final String RAW_BROKER_INFO = "rawBrokerInfo"; @Override @@ -57,29 +52,17 @@ public void initialize(Map config) throws PluginConfigurationExc @Override public void sense(MemqCluster cluster) throws Exception { try { - if (cluster.getZkClient() == null) { - String zkUrl = cluster.getAttribute(MemqCluster.ZK_CONNECTION_STRING).getValue(); - CuratorFramework curator = CuratorFrameworkFactory.newClient(zkUrl, - new ExponentialBackoffRetry(1000, 3)); - curator.start(); - curator.blockUntilConnected(); - cluster.setZkClient(curator); - } + MemqZookeeperClient memqZookeeperClient = new MemqZookeeperClient(cluster); - CuratorFramework zkClient = cluster.getZkClient(); - List brokerNames = zkClient.getChildren().forPath(BROKERS); - + List brokerNames = memqZookeeperClient.getBrokerNames(); Map> writeBrokerAssignments = new HashMap<>(); - Map rawBrokerMap = new HashMap<>(); - Gson gson = new Gson(); - Set brokersInZookeeper = new HashSet<>(); for (String brokerName : brokerNames) { - byte[] brokerData = null; + String brokerDataJsonString = null; try { - brokerData = zkClient.getData().forPath(BROKERS + "/" + brokerName); + brokerDataJsonString = memqZookeeperClient.getBrokerData(brokerName); } catch (KeeperException.NoNodeException e) { cluster.getNodeMap().remove(brokerName); logger.info( @@ -90,7 +73,7 @@ public void sense(MemqCluster cluster) throws Exception { "Faced an unknown exception when getting broker data for " + brokerName +" from zookeeper:" + e); continue; } - Broker broker = gson.fromJson(new String(brokerData), Broker.class); + Broker broker = gson.fromJson(brokerDataJsonString, Broker.class); NodeInfo info = new NodeInfo(); info.setClusterId(cluster.getClusterId()); String hostname = NetworkUtils.getHostnameFromIpIfAvailable(broker.getBrokerIP()); @@ -105,19 +88,21 @@ public void sense(MemqCluster cluster) throws Exception { rawBrokerMap.put(broker.getBrokerIP(), broker); for (TopicConfig topicConfig : broker.getAssignedTopics()) { - String topic = topicConfig.getTopic(); - List hostnames = writeBrokerAssignments.get(topic); + String topicName = topicConfig.getTopic(); + List hostnames = writeBrokerAssignments.get(topicName); if (hostnames == null) { hostnames = new ArrayList<>(); - writeBrokerAssignments.put(topic, hostnames); + writeBrokerAssignments.put(topicName, hostnames); } hostnames.add(hostname); } brokersInZookeeper.add(broker.getBrokerIP()); } + boolean noBrokerInZookeeper = false; if (brokersInZookeeper.isEmpty()) { logger.warning("No broker found in zookeeper for cluster " + cluster.getClusterId()); + noBrokerInZookeeper = true; } else { // Remove brokers that are not in zookeeper from the cluster node map for (String nodeId : cluster.getNodeMap().keySet()) { @@ -128,16 +113,20 @@ public void sense(MemqCluster cluster) throws Exception { } Map topicConfigMap = new HashMap<>(); - List topics = zkClient.getChildren().forPath(TOPICS); - for (String topic : topics) { - byte[] topicData = zkClient.getData().forPath(TOPICS + "/" + topic); - TopicConfig topicConfig = gson.fromJson(new String(topicData), TopicConfig.class); - topicConfigMap.put(topic, topicConfig); + List topics = memqZookeeperClient.getTopics(); + for (String topicName : topics) { + String topicDataJsonString = memqZookeeperClient.getTopicData(topicName); + TopicConfig topicConfig = gson.fromJson(topicDataJsonString, TopicConfig.class); + topicConfigMap.put(topicName, topicConfig); } - byte[] governorData = zkClient.getData().forPath(GOVERNOR); - String governorIp = new String(governorData); - String clusterContext = "Governor: " + governorIp + "\n"; + String clusterContext = "NO BROKER"; + if (!noBrokerInZookeeper) { + String governorIp = memqZookeeperClient.getGovernorIp(); + if (governorIp != null) { + clusterContext = "Governor: " + governorIp + "\n"; + } + } setAttribute(cluster, TOPIC_CONFIG, topicConfigMap); setAttribute(cluster, RAW_BROKER_INFO, rawBrokerMap); diff --git a/orion-server/src/main/java/com/pinterest/orion/core/utils/memq/zookeeper/MemqZookeeperClient.java b/orion-server/src/main/java/com/pinterest/orion/core/utils/memq/zookeeper/MemqZookeeperClient.java new file mode 100644 index 00000000..9a72e334 --- /dev/null +++ b/orion-server/src/main/java/com/pinterest/orion/core/utils/memq/zookeeper/MemqZookeeperClient.java @@ -0,0 +1,153 @@ +package com.pinterest.orion.core.utils.memq.zookeeper; + +import com.pinterest.orion.core.memq.MemqCluster; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; + +import java.util.List; + +public class MemqZookeeperClient { + public static final String BROKERS = "/brokers"; + public static final String TOPICS = "/topics"; + public static final String GOVERNOR = "/governor"; + private boolean refreshZkClientOnException = true; + private String zkUrl; + private MemqCluster cluster; + private CuratorFramework zkClient; + + public MemqZookeeperClient(MemqCluster cluster) throws Exception { + this.zkUrl = cluster.getAttribute(MemqCluster.ZK_CONNECTION_STRING).getValue(); + this.cluster = cluster; + if (cluster.getZkClient() != null) { + this.zkClient = cluster.getZkClient(); + } else { + refreshZkClient(); + } + } + + public void enableRefreshZkClientOnException() { + this.refreshZkClientOnException = true; + } + + public void disableRefreshZkClientOnException() { + this.refreshZkClientOnException = false; + } + + /** + * Create a new Zookeeper client using the connection string provided in the cluster configuration. + * @return CuratorFramework + * @throws Exception + */ + private CuratorFramework createZkClient() throws Exception { + CuratorFramework curator = CuratorFrameworkFactory.newClient( + zkUrl, + new ExponentialBackoffRetry(1000, 3) + ); + curator.start(); + curator.blockUntilConnected(); + return curator; + } + + /** + * Refresh the Zookeeper client by creating a new one. + * The new client is then set in the cluster object. + * @throws Exception + */ + public void refreshZkClient() throws Exception { + this.zkClient = createZkClient(); + cluster.setZkClient(this.zkClient); + } + + /** + * Get the children of a node in Zookeeper. + * When an exception occurs, the Zookeeper client is refreshed and the children are fetched again if the refreshZkClientOnException flag is set. + * @param path The path of the node. + * @return List of children node names. + * @throws Exception + */ + private List getChildNodes(String path) throws Exception { + try { + return zkClient.getChildren().forPath(path); + } catch (Exception e) { + if (refreshZkClientOnException) { + refreshZkClient(); + return zkClient.getChildren().forPath(path); + } else { + throw e; + } + } + } + + /** + * Get the data of a node in Zookeeper. + * When an exception occurs, the Zookeeper client is refreshed and the data is fetched again if the refreshZkClientOnException flag is set. + * @param path The path of the node. + * @return The data of the node as a json string + * @throws Exception + */ + private String getNodeData(String path) throws Exception { + try { + return new String(zkClient.getData().forPath(path)); + } catch (Exception e) { + if (refreshZkClientOnException) { + refreshZkClient(); + return new String(zkClient.getData().forPath(path)); + } else { + throw e; + } + } + } + + /** + * Get the names of the brokers in Zookeeper. + * @return List of broker names. + * @throws Exception + */ + public List getBrokerNames() throws Exception { + return getChildNodes(BROKERS); + } + + /** + * Get the data of a broker in Zookeeper. + * @param brokerName The name of the broker. + * @return The data of the broker as a json string. + * @throws Exception + */ + public String getBrokerData(String brokerName) throws Exception { + return getNodeData(BROKERS + "/" + brokerName); + } + + /** + * Get the names of the topics in Zookeeper. + * @return List of topic names. + * @throws Exception + */ + public List getTopics() throws Exception { + return getChildNodes(TOPICS); + } + + /** + * Get the data of a topic in Zookeeper. + * @param topicName The name of the topic. + * @return The data of the topic as a json string. + * @throws Exception + */ + public String getTopicData(String topicName) throws Exception { + return getNodeData(TOPICS + "/" + topicName); + } + + /** + * Get IP address the governor in Zookeeper. + * In memq zookeeper, the governor is a node at the path "/governor". Its data is the IP address of the governor. + * @return The IP address of the governor. + * @throws Exception + */ + public String getGovernorIp() throws Exception { + try { + return getNodeData(GOVERNOR); + } catch (Exception e) { + return null; + } + } +}