From d6a01717547432b01ab0f00883f56c0931aa6387 Mon Sep 17 00:00:00 2001 From: yisheng-zhou Date: Fri, 6 Dec 2024 10:33:33 -0800 Subject: [PATCH 1/3] Display broker metrics and brokerset status in kafka orion --- .../com/pinterest/orion/common/NodeInfo.java | 42 +++++++++++++++---- .../sensor/kafka/BrokerMetricsSensor.java | 15 +++++++ .../sensor/kafka/BrokersetStateSensor.java | 21 ++++++++++ .../orion/core/kafka/BrokersetState.java | 21 ++++++++++ .../basic-components/Kafka/BrokersetEntry.js | 8 +++- .../src/basic-components/Kafka/Brokersets.js | 24 ++++++++++- .../src/basic-components/Kafka/KafkaNode.js | 7 ++++ 7 files changed, 127 insertions(+), 11 deletions(-) create mode 100644 orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokerMetricsSensor.java diff --git a/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java b/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java index 628458bd..ae864bad 100644 --- a/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java +++ b/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java @@ -36,7 +36,26 @@ public class NodeInfo implements Serializable { private Map agentSettings; private Map environment; private Set brokersets = new HashSet<>(); + /** + * Save the broker status in printable format (string) + */ private Map brokerStatus; + /** + * Save the broker status in raw format (number) + */ + private Map rawBrokerStatus; + /** + * @param rawBrokerStatus + */ + public void setRawBrokerStatus(Map rawBrokerStatus) { + this.rawBrokerStatus = rawBrokerStatus; + } + /** + * @return the rawBrokerStatus + */ + public Map getRawBrokerStatus() { + return rawBrokerStatus; + } /** * @param brokerStatus */ @@ -202,15 +221,22 @@ public void setNodeType(String nodeType) { this.nodeType = nodeType; } - /* (non-Javadoc) - * @see java.lang.Object#toString() - */ @Override public String toString() { - return "NodeInfo [nodeId=" + nodeId + ", hostname=" + hostname + ", ip=" + ip + ", clusterId=" - + clusterId + ", servicePort=" + servicePort + ", localtime=" - + localtime + ", rack=" + rack + ", serviceInfo=" + serviceInfo + ", agentSettings=" - + agentSettings + ", environment=" + environment + "]"; + return "NodeInfo [timestamp=" + timestamp + + ", nodeId=" + nodeId + + ", hostname=" + hostname + + ", ip=" + ip + + ", clusterId=" + clusterId + + ", servicePort=" + servicePort + + ", localtime=" + localtime + + ", rack=" + rack + + ", nodeType=" + nodeType + + ", serviceInfo=" + serviceInfo + + ", agentSettings=" + agentSettings + + ", environment=" + environment + + ", brokersets=" + brokersets + + ", brokerStatus=" + brokerStatus + + ", rawBrokerStatus=" + rawBrokerStatus + "]"; } - } diff --git a/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokerMetricsSensor.java b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokerMetricsSensor.java new file mode 100644 index 00000000..cd802bdd --- /dev/null +++ b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokerMetricsSensor.java @@ -0,0 +1,15 @@ +package com.pinterest.orion.core.automation.sensor.kafka; + +import com.pinterest.orion.core.kafka.KafkaCluster; + +public class BrokerMetricsSensor extends KafkaSensor { + + @Override + public String getName() { + return "BrokerMetricsSensor"; + } + + @Override + public void sense(KafkaCluster cluster) throws Exception { + } +} diff --git a/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokersetStateSensor.java b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokersetStateSensor.java index 173e36cd..0684f046 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokersetStateSensor.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokersetStateSensor.java @@ -79,6 +79,14 @@ public void sense(KafkaCluster cluster) throws Exception { brokersetState.addBrokerRange(Arrays.asList(start, end)); } brokersetState.setBrokerIds(new ArrayList<>(brokerIds)); + try { + updateBrokersetStateWithMetrics(cluster, brokersetState, brokerIds); + } catch (Exception e) { + logger.warning( + String.format("Failed to update brokerset state with metrics for brokerset %s in cluster %s.", + brokersetAlias, + cluster.getName())); + } brokersetStateMap.put(brokersetAlias, brokersetState); if (invalidBrokerset) { handleInvalidBrokerset(brokersetAlias, cluster.getName()); @@ -95,6 +103,19 @@ public void sense(KafkaCluster cluster) throws Exception { } } + /** + * Update brokerset state with metrics. + * This method should be overridden by subclasses to update brokerset state with metrics. + * @param cluster Kafka cluster. + * @param brokersetState Brokerset state to update. It has state fields and raw metrics fields to update. + * @param brokerIds Broker ids in the brokerset. + */ + protected void updateBrokersetStateWithMetrics( + KafkaCluster cluster, + BrokersetState brokersetState, + Set brokerIds) { + } + @Override public String getName() { return "BrokersetStateSensor"; diff --git a/orion-server/src/main/java/com/pinterest/orion/core/kafka/BrokersetState.java b/orion-server/src/main/java/com/pinterest/orion/core/kafka/BrokersetState.java index d1bcbd10..7ca6b57b 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/kafka/BrokersetState.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/kafka/BrokersetState.java @@ -3,6 +3,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; public class BrokersetState { /** @@ -19,6 +20,14 @@ public class BrokersetState { * The brokerIds are obtained from the cluster state. */ private List brokerIds = new ArrayList<>(); + /** + * Save the broker status in printable format (string). + */ + private Map rawBrokersetStatus; + /** + * Save the broker status in raw format (number). + */ + private Map brokersetStatus; /** * The constructor of BrokersetState. * @param brokersetAlias @@ -108,4 +117,16 @@ public List getBrokerIds() { public void setBrokerIds(List brokerIds) { this.brokerIds = brokerIds; } + public void setRawBrokersetStatus(Map rawBrokersetStatus) { + this.rawBrokersetStatus = rawBrokersetStatus; + } + public Map getRawBrokersetStatus() { + return rawBrokersetStatus; + } + public void setBrokersetStatus(Map brokersetStatus) { + this.brokersetStatus = brokersetStatus; + } + public Map getBrokersetStatus() { + return brokersetStatus; + } } diff --git a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/BrokersetEntry.js b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/BrokersetEntry.js index 00b8f1a2..41f06b93 100644 --- a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/BrokersetEntry.js +++ b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/BrokersetEntry.js @@ -30,7 +30,13 @@ const routes = [ function getStatsData(clusterId, rawData) { let brokersetStats = []; let brokersetData = rawData.brokersetData; - brokersetStats.push({ key: "Broker Count", value: brokersetData.size}); + brokersetStats.push({ key: "Broker_Count", value: brokersetData.size}); + let brokersetStatus = brokersetData.brokersetStatus; + if (brokersetStatus !== undefined && brokersetStatus !== null) { + for (let key of Object.keys(brokersetStatus)) { + brokersetStats.push({ key: key, value: brokersetStatus[key] }); + } + } return brokersetStats; } diff --git a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/Brokersets.js b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/Brokersets.js index 1ea7aa51..72e26bec 100644 --- a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/Brokersets.js +++ b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/Brokersets.js @@ -42,17 +42,37 @@ export default function Brokersets(props) { } let columns = [ { title: "Name", field: "brokersetAlias" }, - { title: "Broker Count", field: "brokerCount" } + { title: "Broker Count", field: "brokerCount" }, + { title: "Max CPU Usage", field: "maxCpuUsage" }, + { title: "Max Disk Usage", field: "maxDiskUsage" }, + { title: "Updated Time", field: "timestamp" } ] let clusterId = props.cluster.clusterId; let brokersetToRowValuesMap = {}; for (let brokerset of brokersets) { let brokersetAlias = brokerset.brokersetAlias; + let maxCpuUsage = "N/A"; + let maxDiskUsage = "N/A"; + let timestamp = "N/A" + if (brokerset.brokersetStatus) { + if (brokerset.brokersetStatus["CPU_Usage_All_Brokers_Max"] !== undefined) { + maxCpuUsage = brokerset.brokersetStatus["CPU_Usage_All_Brokers_Max"]; + } + if (brokerset.brokersetStatus["Disk_Usage_All_Brokers_Max"] !== undefined) { + maxDiskUsage = brokerset.brokersetStatus["Disk_Usage_All_Brokers_Max"]; + } + if (brokerset.brokersetStatus["Short_Timestamp"] !== undefined) { + timestamp = brokerset.brokersetStatus["Short_Timestamp"]; + } + } brokersetToRowValuesMap[brokersetAlias] = { "brokersetAlias": brokersetAlias, "clusterId": clusterId, "brokerCount": brokerset.size, - "brokersetData": brokerset + "brokersetData": brokerset, + "maxCpuUsage": maxCpuUsage, + "maxDiskUsage": maxDiskUsage, + "timestamp": timestamp } } diff --git a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/KafkaNode.js b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/KafkaNode.js index bbafb494..bf8cbe29 100644 --- a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/KafkaNode.js +++ b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/KafkaNode.js @@ -268,6 +268,13 @@ function getBrokersetColumns() { function getBrokerStatsData(cluster, node) { let brokerStatsData = []; + let brokerStats = node.currentNodeInfo.brokerStatus; + if (brokerStats === undefined || brokerStats === null) { + return brokerStatsData; + } + for (let [key, value] of Object.entries(brokerStats)) { + brokerStatsData.push({key: key, value: JSON.stringify(value)}); + } return brokerStatsData; } From d9273c0771fe9b17de17b0cb773141223b24af89 Mon Sep 17 00:00:00 2001 From: Yisheng Zhou Date: Tue, 10 Dec 2024 18:04:06 -0800 Subject: [PATCH 2/3] Address feedback comments --- .../sensor/kafka/BrokersetStateSensor.java | 6 ++++-- .../orion/core/kafka/BrokersetState.java | 10 ++++++++++ .../basic-components/Kafka/BrokersetEntry.js | 12 +++++++++++ .../src/basic-components/Kafka/Brokersets.js | 20 +++++++++---------- 4 files changed, 36 insertions(+), 12 deletions(-) diff --git a/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokersetStateSensor.java b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokersetStateSensor.java index 0684f046..91405941 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokersetStateSensor.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokersetStateSensor.java @@ -83,9 +83,11 @@ public void sense(KafkaCluster cluster) throws Exception { updateBrokersetStateWithMetrics(cluster, brokersetState, brokerIds); } catch (Exception e) { logger.warning( - String.format("Failed to update brokerset state with metrics for brokerset %s in cluster %s.", + String.format( + "Failed to update brokerset state with metrics for brokerset %s in cluster %s. Error: %s", brokersetAlias, - cluster.getName())); + cluster.getName(), + e.getMessage())); } brokersetStateMap.put(brokersetAlias, brokersetState); if (invalidBrokerset) { diff --git a/orion-server/src/main/java/com/pinterest/orion/core/kafka/BrokersetState.java b/orion-server/src/main/java/com/pinterest/orion/core/kafka/BrokersetState.java index 7ca6b57b..99f8f36d 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/kafka/BrokersetState.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/kafka/BrokersetState.java @@ -10,6 +10,10 @@ public class BrokersetState { * The brokersetAlias is the alias of the brokerset. */ private String brokersetAlias; + /** + * The instanceType is the type of the brokerset. + */ + private String instanceType; /** * The brokersetRanges are the ranges of brokerset that are in the brokerset. * The brokersetRanges are obtained from the brokerset configuration file. @@ -129,4 +133,10 @@ public void setBrokersetStatus(Map brokersetStatus) { public Map getBrokersetStatus() { return brokersetStatus; } + public void setInstanceType(String instanceType) { + this.instanceType = instanceType; + } + public String getInstanceType() { + return instanceType; + } } diff --git a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/BrokersetEntry.js b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/BrokersetEntry.js index 41f06b93..bffbd85a 100644 --- a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/BrokersetEntry.js +++ b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/BrokersetEntry.js @@ -117,6 +117,10 @@ function getBrokersetInfoHeader(rawData, clusterId) { let brokersetData = rawData.brokersetData; let brokersetAlias = brokersetData.brokersetAlias; let brokerCount = brokersetData.size; + let instanceType = "Unknown"; + if (brokersetData.instanceType !== undefined && brokersetData.instanceType !== null) { + instanceType = brokersetData.instanceType; + } return ( @@ -136,6 +140,14 @@ function getBrokersetInfoHeader(rawData, clusterId) { label={brokerCount + " brokers"} /> + + + ); diff --git a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/Brokersets.js b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/Brokersets.js index 72e26bec..3db6408e 100644 --- a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/Brokersets.js +++ b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/Brokersets.js @@ -43,23 +43,23 @@ export default function Brokersets(props) { let columns = [ { title: "Name", field: "brokersetAlias" }, { title: "Broker Count", field: "brokerCount" }, - { title: "Max CPU Usage", field: "maxCpuUsage" }, - { title: "Max Disk Usage", field: "maxDiskUsage" }, + { title: "Max CPU Usage 7D", field: "maxCpuUsage7Day" }, + { title: "Max Disk Usage 7D", field: "maxDiskUsage7Day" }, { title: "Updated Time", field: "timestamp" } ] let clusterId = props.cluster.clusterId; let brokersetToRowValuesMap = {}; for (let brokerset of brokersets) { let brokersetAlias = brokerset.brokersetAlias; - let maxCpuUsage = "N/A"; - let maxDiskUsage = "N/A"; + let maxCpuUsage7Day = "N/A"; + let maxDiskUsage7Day = "N/A"; let timestamp = "N/A" if (brokerset.brokersetStatus) { - if (brokerset.brokersetStatus["CPU_Usage_All_Brokers_Max"] !== undefined) { - maxCpuUsage = brokerset.brokersetStatus["CPU_Usage_All_Brokers_Max"]; + if (brokerset.brokersetStatus["CPU_Usage_Max_All_Brokers_7Days"] !== undefined) { + maxCpuUsage7Day = brokerset.brokersetStatus["CPU_Usage_Max_All_Brokers_7Days"]; } - if (brokerset.brokersetStatus["Disk_Usage_All_Brokers_Max"] !== undefined) { - maxDiskUsage = brokerset.brokersetStatus["Disk_Usage_All_Brokers_Max"]; + if (brokerset.brokersetStatus["Disk_Usage_Max_All_Brokers_7Days"] !== undefined) { + maxDiskUsage7Day = brokerset.brokersetStatus["Disk_Usage_Max_All_Brokers_7Days"]; } if (brokerset.brokersetStatus["Short_Timestamp"] !== undefined) { timestamp = brokerset.brokersetStatus["Short_Timestamp"]; @@ -70,8 +70,8 @@ export default function Brokersets(props) { "clusterId": clusterId, "brokerCount": brokerset.size, "brokersetData": brokerset, - "maxCpuUsage": maxCpuUsage, - "maxDiskUsage": maxDiskUsage, + "maxCpuUsage7Day": maxCpuUsage7Day, + "maxDiskUsage7Day": maxDiskUsage7Day, "timestamp": timestamp } } From 2e28b5e3ee7573237a4f440b0c37d024ec2c3773 Mon Sep 17 00:00:00 2001 From: Yisheng Zhou Date: Thu, 12 Dec 2024 11:53:20 -0800 Subject: [PATCH 3/3] Use string format for printing node --- .../com/pinterest/orion/common/NodeInfo.java | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java b/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java index ae864bad..057a9a1b 100644 --- a/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java +++ b/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java @@ -223,20 +223,24 @@ public void setNodeType(String nodeType) { @Override public String toString() { - return "NodeInfo [timestamp=" + timestamp + - ", nodeId=" + nodeId + - ", hostname=" + hostname + - ", ip=" + ip + - ", clusterId=" + clusterId + - ", servicePort=" + servicePort + - ", localtime=" + localtime + - ", rack=" + rack + - ", nodeType=" + nodeType + - ", serviceInfo=" + serviceInfo + - ", agentSettings=" + agentSettings + - ", environment=" + environment + - ", brokersets=" + brokersets + - ", brokerStatus=" + brokerStatus + - ", rawBrokerStatus=" + rawBrokerStatus + "]"; + return String.format( + "NodeInfo [timestamp=%d, nodeId=%s, hostname=%s, ip=%s, clusterId=%s, servicePort=%d, " + + "localtime=%d, rack=%s, nodeType=%s, serviceInfo=%s, agentSettings=%s, environment=%s, brokersets=%s, " + + "brokerStatus=%s, rawBrokerStatus=%s]", + timestamp, + nodeId, + hostname, + ip, + clusterId, + servicePort, + localtime, + rack, + nodeType, + serviceInfo, + agentSettings, + environment, + brokersets, + brokerStatus, + rawBrokerStatus); } }