From d6a01717547432b01ab0f00883f56c0931aa6387 Mon Sep 17 00:00:00 2001 From: yisheng-zhou Date: Fri, 6 Dec 2024 10:33:33 -0800 Subject: [PATCH] Display broker metrics and brokerset status in kafka orion --- .../com/pinterest/orion/common/NodeInfo.java | 42 +++++++++++++++---- .../sensor/kafka/BrokerMetricsSensor.java | 15 +++++++ .../sensor/kafka/BrokersetStateSensor.java | 21 ++++++++++ .../orion/core/kafka/BrokersetState.java | 21 ++++++++++ .../basic-components/Kafka/BrokersetEntry.js | 8 +++- .../src/basic-components/Kafka/Brokersets.js | 24 ++++++++++- .../src/basic-components/Kafka/KafkaNode.js | 7 ++++ 7 files changed, 127 insertions(+), 11 deletions(-) create mode 100644 orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokerMetricsSensor.java diff --git a/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java b/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java index 628458bd..ae864bad 100644 --- a/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java +++ b/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java @@ -36,7 +36,26 @@ public class NodeInfo implements Serializable { private Map agentSettings; private Map environment; private Set brokersets = new HashSet<>(); + /** + * Save the broker status in printable format (string) + */ private Map brokerStatus; + /** + * Save the broker status in raw format (number) + */ + private Map rawBrokerStatus; + /** + * @param rawBrokerStatus + */ + public void setRawBrokerStatus(Map rawBrokerStatus) { + this.rawBrokerStatus = rawBrokerStatus; + } + /** + * @return the rawBrokerStatus + */ + public Map getRawBrokerStatus() { + return rawBrokerStatus; + } /** * @param brokerStatus */ @@ -202,15 +221,22 @@ public void setNodeType(String nodeType) { this.nodeType = nodeType; } - /* (non-Javadoc) - * @see java.lang.Object#toString() - */ @Override public String toString() { - return "NodeInfo [nodeId=" + nodeId + ", hostname=" + hostname + ", ip=" + ip + ", clusterId=" - + clusterId + ", servicePort=" + servicePort + ", localtime=" - + localtime + ", rack=" + rack + ", serviceInfo=" + serviceInfo + ", agentSettings=" - + agentSettings + ", environment=" + environment + "]"; + return "NodeInfo [timestamp=" + timestamp + + ", nodeId=" + nodeId + + ", hostname=" + hostname + + ", ip=" + ip + + ", clusterId=" + clusterId + + ", servicePort=" + servicePort + + ", localtime=" + localtime + + ", rack=" + rack + + ", nodeType=" + nodeType + + ", serviceInfo=" + serviceInfo + + ", agentSettings=" + agentSettings + + ", environment=" + environment + + ", brokersets=" + brokersets + + ", brokerStatus=" + brokerStatus + + ", rawBrokerStatus=" + rawBrokerStatus + "]"; } - } diff --git a/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokerMetricsSensor.java b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokerMetricsSensor.java new file mode 100644 index 00000000..cd802bdd --- /dev/null +++ b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokerMetricsSensor.java @@ -0,0 +1,15 @@ +package com.pinterest.orion.core.automation.sensor.kafka; + +import com.pinterest.orion.core.kafka.KafkaCluster; + +public class BrokerMetricsSensor extends KafkaSensor { + + @Override + public String getName() { + return "BrokerMetricsSensor"; + } + + @Override + public void sense(KafkaCluster cluster) throws Exception { + } +} diff --git a/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokersetStateSensor.java b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokersetStateSensor.java index 173e36cd..0684f046 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokersetStateSensor.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokersetStateSensor.java @@ -79,6 +79,14 @@ public void sense(KafkaCluster cluster) throws Exception { brokersetState.addBrokerRange(Arrays.asList(start, end)); } brokersetState.setBrokerIds(new ArrayList<>(brokerIds)); + try { + updateBrokersetStateWithMetrics(cluster, brokersetState, brokerIds); + } catch (Exception e) { + logger.warning( + String.format("Failed to update brokerset state with metrics for brokerset %s in cluster %s.", + brokersetAlias, + cluster.getName())); + } brokersetStateMap.put(brokersetAlias, brokersetState); if (invalidBrokerset) { handleInvalidBrokerset(brokersetAlias, cluster.getName()); @@ -95,6 +103,19 @@ public void sense(KafkaCluster cluster) throws Exception { } } + /** + * Update brokerset state with metrics. + * This method should be overridden by subclasses to update brokerset state with metrics. + * @param cluster Kafka cluster. + * @param brokersetState Brokerset state to update. It has state fields and raw metrics fields to update. + * @param brokerIds Broker ids in the brokerset. + */ + protected void updateBrokersetStateWithMetrics( + KafkaCluster cluster, + BrokersetState brokersetState, + Set brokerIds) { + } + @Override public String getName() { return "BrokersetStateSensor"; diff --git a/orion-server/src/main/java/com/pinterest/orion/core/kafka/BrokersetState.java b/orion-server/src/main/java/com/pinterest/orion/core/kafka/BrokersetState.java index d1bcbd10..7ca6b57b 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/kafka/BrokersetState.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/kafka/BrokersetState.java @@ -3,6 +3,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; public class BrokersetState { /** @@ -19,6 +20,14 @@ public class BrokersetState { * The brokerIds are obtained from the cluster state. */ private List brokerIds = new ArrayList<>(); + /** + * Save the broker status in printable format (string). + */ + private Map rawBrokersetStatus; + /** + * Save the broker status in raw format (number). + */ + private Map brokersetStatus; /** * The constructor of BrokersetState. * @param brokersetAlias @@ -108,4 +117,16 @@ public List getBrokerIds() { public void setBrokerIds(List brokerIds) { this.brokerIds = brokerIds; } + public void setRawBrokersetStatus(Map rawBrokersetStatus) { + this.rawBrokersetStatus = rawBrokersetStatus; + } + public Map getRawBrokersetStatus() { + return rawBrokersetStatus; + } + public void setBrokersetStatus(Map brokersetStatus) { + this.brokersetStatus = brokersetStatus; + } + public Map getBrokersetStatus() { + return brokersetStatus; + } } diff --git a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/BrokersetEntry.js b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/BrokersetEntry.js index 00b8f1a2..41f06b93 100644 --- a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/BrokersetEntry.js +++ b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/BrokersetEntry.js @@ -30,7 +30,13 @@ const routes = [ function getStatsData(clusterId, rawData) { let brokersetStats = []; let brokersetData = rawData.brokersetData; - brokersetStats.push({ key: "Broker Count", value: brokersetData.size}); + brokersetStats.push({ key: "Broker_Count", value: brokersetData.size}); + let brokersetStatus = brokersetData.brokersetStatus; + if (brokersetStatus !== undefined && brokersetStatus !== null) { + for (let key of Object.keys(brokersetStatus)) { + brokersetStats.push({ key: key, value: brokersetStatus[key] }); + } + } return brokersetStats; } diff --git a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/Brokersets.js b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/Brokersets.js index 1ea7aa51..72e26bec 100644 --- a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/Brokersets.js +++ b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/Brokersets.js @@ -42,17 +42,37 @@ export default function Brokersets(props) { } let columns = [ { title: "Name", field: "brokersetAlias" }, - { title: "Broker Count", field: "brokerCount" } + { title: "Broker Count", field: "brokerCount" }, + { title: "Max CPU Usage", field: "maxCpuUsage" }, + { title: "Max Disk Usage", field: "maxDiskUsage" }, + { title: "Updated Time", field: "timestamp" } ] let clusterId = props.cluster.clusterId; let brokersetToRowValuesMap = {}; for (let brokerset of brokersets) { let brokersetAlias = brokerset.brokersetAlias; + let maxCpuUsage = "N/A"; + let maxDiskUsage = "N/A"; + let timestamp = "N/A" + if (brokerset.brokersetStatus) { + if (brokerset.brokersetStatus["CPU_Usage_All_Brokers_Max"] !== undefined) { + maxCpuUsage = brokerset.brokersetStatus["CPU_Usage_All_Brokers_Max"]; + } + if (brokerset.brokersetStatus["Disk_Usage_All_Brokers_Max"] !== undefined) { + maxDiskUsage = brokerset.brokersetStatus["Disk_Usage_All_Brokers_Max"]; + } + if (brokerset.brokersetStatus["Short_Timestamp"] !== undefined) { + timestamp = brokerset.brokersetStatus["Short_Timestamp"]; + } + } brokersetToRowValuesMap[brokersetAlias] = { "brokersetAlias": brokersetAlias, "clusterId": clusterId, "brokerCount": brokerset.size, - "brokersetData": brokerset + "brokersetData": brokerset, + "maxCpuUsage": maxCpuUsage, + "maxDiskUsage": maxDiskUsage, + "timestamp": timestamp } } diff --git a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/KafkaNode.js b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/KafkaNode.js index bbafb494..bf8cbe29 100644 --- a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/KafkaNode.js +++ b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/KafkaNode.js @@ -268,6 +268,13 @@ function getBrokersetColumns() { function getBrokerStatsData(cluster, node) { let brokerStatsData = []; + let brokerStats = node.currentNodeInfo.brokerStatus; + if (brokerStats === undefined || brokerStats === null) { + return brokerStatsData; + } + for (let [key, value] of Object.entries(brokerStats)) { + brokerStatsData.push({key: key, value: JSON.stringify(value)}); + } return brokerStatsData; }