Skip to content

Commit

Permalink
Display broker metrics and brokerset status in kafka orion
Browse files Browse the repository at this point in the history
  • Loading branch information
yisheng-zhou committed Dec 6, 2024
1 parent 869442a commit d6a0171
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,26 @@ public class NodeInfo implements Serializable {
private Map<String, String> agentSettings;
private Map<String, String> environment;
private Set<String> brokersets = new HashSet<>();
/**
* Save the broker status in printable format (string)
*/
private Map<String, String> brokerStatus;
/**
* Save the broker status in raw format (number)
*/
private Map<String, Double> rawBrokerStatus;
/**
* @param rawBrokerStatus
*/
public void setRawBrokerStatus(Map<String, Double> rawBrokerStatus) {
this.rawBrokerStatus = rawBrokerStatus;
}
/**
* @return the rawBrokerStatus
*/
public Map<String, Double> getRawBrokerStatus() {
return rawBrokerStatus;
}
/**
* @param brokerStatus
*/
Expand Down Expand Up @@ -202,15 +221,22 @@ public void setNodeType(String nodeType) {
this.nodeType = nodeType;
}

/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "NodeInfo [nodeId=" + nodeId + ", hostname=" + hostname + ", ip=" + ip + ", clusterId="
+ clusterId + ", servicePort=" + servicePort + ", localtime="
+ localtime + ", rack=" + rack + ", serviceInfo=" + serviceInfo + ", agentSettings="
+ agentSettings + ", environment=" + environment + "]";
return "NodeInfo [timestamp=" + timestamp +
", nodeId=" + nodeId +
", hostname=" + hostname +
", ip=" + ip +
", clusterId=" + clusterId +
", servicePort=" + servicePort +
", localtime=" + localtime +
", rack=" + rack +
", nodeType=" + nodeType +
", serviceInfo=" + serviceInfo +
", agentSettings=" + agentSettings +
", environment=" + environment +
", brokersets=" + brokersets +
", brokerStatus=" + brokerStatus +
", rawBrokerStatus=" + rawBrokerStatus + "]";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.pinterest.orion.core.automation.sensor.kafka;

import com.pinterest.orion.core.kafka.KafkaCluster;

public class BrokerMetricsSensor extends KafkaSensor {

@Override
public String getName() {
return "BrokerMetricsSensor";
}

@Override
public void sense(KafkaCluster cluster) throws Exception {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ public void sense(KafkaCluster cluster) throws Exception {
brokersetState.addBrokerRange(Arrays.asList(start, end));
}
brokersetState.setBrokerIds(new ArrayList<>(brokerIds));
try {
updateBrokersetStateWithMetrics(cluster, brokersetState, brokerIds);
} catch (Exception e) {
logger.warning(
String.format("Failed to update brokerset state with metrics for brokerset %s in cluster %s.",
brokersetAlias,
cluster.getName()));
}
brokersetStateMap.put(brokersetAlias, brokersetState);
if (invalidBrokerset) {
handleInvalidBrokerset(brokersetAlias, cluster.getName());
Expand All @@ -95,6 +103,19 @@ public void sense(KafkaCluster cluster) throws Exception {
}
}

/**
* Update brokerset state with metrics.
* This method should be overridden by subclasses to update brokerset state with metrics.
* @param cluster Kafka cluster.
* @param brokersetState Brokerset state to update. It has state fields and raw metrics fields to update.
* @param brokerIds Broker ids in the brokerset.
*/
protected void updateBrokersetStateWithMetrics(
KafkaCluster cluster,
BrokersetState brokersetState,
Set<String> brokerIds) {
}

@Override
public String getName() {
return "BrokersetStateSensor";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class BrokersetState {
/**
Expand All @@ -19,6 +20,14 @@ public class BrokersetState {
* The brokerIds are obtained from the cluster state.
*/
private List<String> brokerIds = new ArrayList<>();
/**
* Save the broker status in printable format (string).
*/
private Map<String, Double> rawBrokersetStatus;
/**
* Save the broker status in raw format (number).
*/
private Map<String, String> brokersetStatus;
/**
* The constructor of BrokersetState.
* @param brokersetAlias
Expand Down Expand Up @@ -108,4 +117,16 @@ public List<String> getBrokerIds() {
public void setBrokerIds(List<String> brokerIds) {
this.brokerIds = brokerIds;
}
public void setRawBrokersetStatus(Map<String, Double> rawBrokersetStatus) {
this.rawBrokersetStatus = rawBrokersetStatus;
}
public Map<String, Double> getRawBrokersetStatus() {
return rawBrokersetStatus;
}
public void setBrokersetStatus(Map<String, String> brokersetStatus) {
this.brokersetStatus = brokersetStatus;
}
public Map<String, String> getBrokersetStatus() {
return brokersetStatus;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ const routes = [
function getStatsData(clusterId, rawData) {
let brokersetStats = [];
let brokersetData = rawData.brokersetData;
brokersetStats.push({ key: "Broker Count", value: brokersetData.size});
brokersetStats.push({ key: "Broker_Count", value: brokersetData.size});
let brokersetStatus = brokersetData.brokersetStatus;
if (brokersetStatus !== undefined && brokersetStatus !== null) {
for (let key of Object.keys(brokersetStatus)) {
brokersetStats.push({ key: key, value: brokersetStatus[key] });
}
}
return brokersetStats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,37 @@ export default function Brokersets(props) {
}
let columns = [
{ title: "Name", field: "brokersetAlias" },
{ title: "Broker Count", field: "brokerCount" }
{ title: "Broker Count", field: "brokerCount" },
{ title: "Max CPU Usage", field: "maxCpuUsage" },
{ title: "Max Disk Usage", field: "maxDiskUsage" },
{ title: "Updated Time", field: "timestamp" }
]
let clusterId = props.cluster.clusterId;
let brokersetToRowValuesMap = {};
for (let brokerset of brokersets) {
let brokersetAlias = brokerset.brokersetAlias;
let maxCpuUsage = "N/A";
let maxDiskUsage = "N/A";
let timestamp = "N/A"
if (brokerset.brokersetStatus) {
if (brokerset.brokersetStatus["CPU_Usage_All_Brokers_Max"] !== undefined) {
maxCpuUsage = brokerset.brokersetStatus["CPU_Usage_All_Brokers_Max"];
}
if (brokerset.brokersetStatus["Disk_Usage_All_Brokers_Max"] !== undefined) {
maxDiskUsage = brokerset.brokersetStatus["Disk_Usage_All_Brokers_Max"];
}
if (brokerset.brokersetStatus["Short_Timestamp"] !== undefined) {
timestamp = brokerset.brokersetStatus["Short_Timestamp"];
}
}
brokersetToRowValuesMap[brokersetAlias] = {
"brokersetAlias": brokersetAlias,
"clusterId": clusterId,
"brokerCount": brokerset.size,
"brokersetData": brokerset
"brokersetData": brokerset,
"maxCpuUsage": maxCpuUsage,
"maxDiskUsage": maxDiskUsage,
"timestamp": timestamp
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,13 @@ function getBrokersetColumns() {

function getBrokerStatsData(cluster, node) {
let brokerStatsData = [];
let brokerStats = node.currentNodeInfo.brokerStatus;
if (brokerStats === undefined || brokerStats === null) {
return brokerStatsData;
}
for (let [key, value] of Object.entries(brokerStats)) {
brokerStatsData.push({key: key, value: JSON.stringify(value)});
}
return brokerStatsData;
}

Expand Down

0 comments on commit d6a0171

Please sign in to comment.