Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Review comments and code formatting #362

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class WorkerNodeServiceImpl implements WorkerNodeService {
private static final Logger logger = LogManager.getLogger(WorkerNodeServiceImpl.class);

private static final long MAX_VERSION_GAP_ALLOWED = Long.getLong("max.allowed.version.gap.worker.recovery", 2);
private static boolean disableMonitoring = Boolean.getBoolean("global.worker.monitoring.disable");
private static final boolean disableMonitoring = Boolean.getBoolean("global.worker.monitoring.disable");
private static final String MSG_RECOVERY_VERSION_NAME = "MSG_RECOVERY_VERSION";

@Autowired
Expand All @@ -77,9 +77,7 @@ void setWorkerMonitoring(){

if(disableMonitoring) {
logger.info("Monitoring is disabled,setting busyness status as not available for all workers");
transactionTemplate.executeWithoutResult(transactionStatus -> {
readAllWorkersUuids().stream().forEach(uuid -> updateWorkerBusynessValue(uuid,"NA") );
});
transactionTemplate.executeWithoutResult(transactionStatus -> readAllWorkersUuids().stream().forEach(uuid -> updateWorkerBusynessValue(uuid,"NA") ));
}
}
//readAllWorkersUuids().stream().forEach(uuid -> updateWorkerBusynessValue(uuid,"NA") );
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

import java.util.List;

import static ch.lambdaj.Lambda.extract;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,43 +28,43 @@
@ManagedResource(description = "Worker Metrics API")
public class WorkerMetricsMBean {

@Autowired
private CpuUtilizationService cpuUtilizationService;
@Autowired
private DiskReadUtilizationService diskReadUtilizationService;
@Autowired
private MemoryUtilizationService memoryUtilizationService;
@Autowired
private DiskWriteUtilizationService diskWriteUtilizationService;
@Autowired
private WorkerManager workerManager;
@Autowired
@Qualifier("numberOfExecutionThreads")
private Integer numberOfThreads;
@Autowired
private CpuUtilizationService cpuUtilizationService;
@Autowired
private DiskReadUtilizationService diskReadUtilizationService;
@Autowired
private MemoryUtilizationService memoryUtilizationService;
@Autowired
private DiskWriteUtilizationService diskWriteUtilizationService;
@Autowired
private WorkerManager workerManager;
@Autowired
@Qualifier("numberOfExecutionThreads")
private Integer numberOfThreads;

@ManagedAttribute(description = "Current Cpu Usage")
public double getCpuUsage() {
return cpuUtilizationService.getCurrentValue();
}
@ManagedAttribute(description = "Current Cpu Usage")
public double getCpuUsage() {
return cpuUtilizationService.getCurrentValue();
}

@ManagedAttribute(description = "Current Memory Usage")
public double getMemoryUsage() {
return memoryUtilizationService.getCurrentValue();
}
@ManagedAttribute(description = "Current Memory Usage")
public double getMemoryUsage() {
return memoryUtilizationService.getCurrentValue();
}

@ManagedAttribute(description = "Current Disk Read Usage")
public long getDiskReadUsage() {
return diskReadUtilizationService.getCurrentValue();
}
@ManagedAttribute(description = "Current Disk Read Usage")
public long getDiskReadUsage() {
return diskReadUtilizationService.getCurrentValue();
}

@ManagedAttribute(description = "Current Disk Write Usage")
public long getDiskWriteUsage() {
return diskWriteUtilizationService.getCurrentValue();
}
@ManagedAttribute(description = "Current Disk Write Usage")
public long getDiskWriteUsage() {
return diskWriteUtilizationService.getCurrentValue();
}

@ManagedAttribute(description = "Running Tasks Count")
public double getWorkerThreadsUsage() {
return ((double) workerManager.getRunningTasksCount() * 100) / numberOfThreads;
}
@ManagedAttribute(description = "Running Tasks Count")
public double getWorkerThreadsUsage() {
return ((double) workerManager.getRunningTasksCount() * 100) / numberOfThreads;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void init() {

@Override
public Pair<WorkerPerformanceMetric, Serializable> measure() {
Pair<WorkerPerformanceMetric, Serializable> cpuUsage = Pair.of(WorkerPerformanceMetric.CPU_USAGE,getCurrentValue());
Pair<WorkerPerformanceMetric, Serializable> cpuUsage = Pair.of(WorkerPerformanceMetric.CPU_USAGE, getCurrentValue());
return cpuUsage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,24 @@
import java.io.Serializable;

public class DiskReadUtilizationService extends WorkerPerformanceMetricBase {
private OSProcess process;
private OSProcess process;

@PostConstruct
public void init() {
this.process = getProcess();
}
@PostConstruct
public void init() {
this.process = getProcess();
}

@Override
public Pair<WorkerPerformanceMetric, Serializable> measure() {
Pair<WorkerPerformanceMetric, Serializable> diskUsage = Pair.of(WorkerPerformanceMetric.DISK_READ_USAGE, getCurrentValue());
return diskUsage;
}
@Override
public Pair<WorkerPerformanceMetric, Serializable> measure() {
Pair<WorkerPerformanceMetric, Serializable> diskUsage = Pair.of(WorkerPerformanceMetric.DISK_READ_USAGE, getCurrentValue());
return diskUsage;
}

public long getCurrentValue() {
long readBytes = 0;
if (process != null) {
readBytes = process.getBytesRead();
}
return readBytes;
}
public long getCurrentValue() {
long readBytes = 0;
if (process != null) {
readBytes = process.getBytesRead();
}
return readBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,24 @@
import java.io.Serializable;

public class DiskWriteUtilizationService extends WorkerPerformanceMetricBase {
private OSProcess process;
private OSProcess process;

@PostConstruct
public void init() {
this.process = getProcess();
}
@PostConstruct
public void init() {
this.process = getProcess();
}

@Override
public Pair<WorkerPerformanceMetric, Serializable> measure() {
Pair<WorkerPerformanceMetric, Serializable> diskWriteUsage = Pair.of(WorkerPerformanceMetric.DISK_WRITE_USAGE, getCurrentValue());
return diskWriteUsage;
}
@Override
public Pair<WorkerPerformanceMetric, Serializable> measure() {
Pair<WorkerPerformanceMetric, Serializable> diskWriteUsage = Pair.of(WorkerPerformanceMetric.DISK_WRITE_USAGE, getCurrentValue());
return diskWriteUsage;
}

public long getCurrentValue() {
long writeBytes = 0;
if (process != null) {
writeBytes = process.getBytesWritten();
}
return writeBytes;
}
public long getCurrentValue() {
long writeBytes = 0;
if (process != null) {
writeBytes = process.getBytesWritten();
}
return writeBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,28 @@

import io.cloudslang.worker.monitor.service.WorkerPerformanceMetric;
import org.apache.commons.lang3.tuple.Pair;

import java.io.Serializable;

import static java.lang.Runtime.getRuntime;

public class HeapUtilizationService extends WorkerPerformanceMetricBase {

@Override
public Pair<WorkerPerformanceMetric, Serializable> measure() {
Pair<WorkerPerformanceMetric, Serializable> heapUsage = Pair.of(WorkerPerformanceMetric.HEAP_SIZE, getCurrentValue());
return heapUsage;
}

public double getCurrentValue() {
// Get current size of heap in bytes
long totalMemory = Runtime.getRuntime().totalMemory();
long freeMemory = getRuntime().freeMemory();
double allocatedMemory = totalMemory - freeMemory;
long maxMemory = getRuntime().maxMemory();
double percentageHeapUsed = (allocatedMemory / formatTo2Decimal(maxMemory)) * 100;
return formatTo2Decimal(percentageHeapUsed);
}
@Override
public Pair<WorkerPerformanceMetric, Serializable> measure() {
Pair<WorkerPerformanceMetric, Serializable> heapUsage = Pair.of(WorkerPerformanceMetric.HEAP_SIZE, getCurrentValue());
return heapUsage;
}

public double getCurrentValue() {
// Get current size of heap in bytes
long totalMemory = Runtime.getRuntime().totalMemory();
long freeMemory = getRuntime().freeMemory();
double allocatedMemory = totalMemory - freeMemory;
long maxMemory = getRuntime().maxMemory();
double percentageHeapUsed = (allocatedMemory / formatTo2Decimal(maxMemory)) * 100;
return formatTo2Decimal(percentageHeapUsed);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import oshi.SystemInfo;
import oshi.hardware.GlobalMemory;
import oshi.software.os.OSProcess;

import javax.annotation.PostConstruct;
import java.io.Serializable;

public class MemoryUtilizationService extends WorkerPerformanceMetricBase {

private long usedRamProcess;
private OSProcess process;
private long totalRam;

Expand All @@ -39,13 +39,12 @@ public void init() {

@Override
public Pair<WorkerPerformanceMetric, Serializable> measure() {
Pair<WorkerPerformanceMetric, Serializable> memUsage = Pair.of(WorkerPerformanceMetric.MEMORY_USAGE, getCurrentValue());
return memUsage;
return Pair.of(WorkerPerformanceMetric.MEMORY_USAGE, getCurrentValue());
}

public double getCurrentValue() {
this.usedRamProcess = process.getResidentSetSize();
double ramUsed = (double) (usedRamProcess * 100 / totalRam);
long usedRamProcess = process.getResidentSetSize();
double ramUsed = ((double) usedRamProcess * 100 / totalRam);
return formatTo2Decimal(ramUsed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@

public abstract class WorkerPerformanceMetricBase implements WorkerPerfMetric {

protected OSProcess getProcess() {
SystemInfo systemInfo = new SystemInfo();
OperatingSystem operatingSystem = systemInfo.getOperatingSystem();
int processId = operatingSystem.getProcessId();
OSProcess osProcess = operatingSystem.getProcess(processId);
return osProcess;
}
protected OSProcess getProcess() {
SystemInfo systemInfo = new SystemInfo();
OperatingSystem operatingSystem = systemInfo.getOperatingSystem();
int processId = operatingSystem.getProcessId();
OSProcess osProcess = operatingSystem.getProcess(processId);
return osProcess;
}

protected double formatTo2Decimal(double value) {
return new BigDecimal(value).setScale(2, RoundingMode.HALF_EVEN).doubleValue();
}
protected double formatTo2Decimal(double value) {
return new BigDecimal(value).setScale(2, RoundingMode.HALF_EVEN).doubleValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,21 @@

public class WorkerThreadUtilization implements WorkerPerfMetric {

@Autowired
private WorkerManager workerManager;
@Autowired
private WorkerManager workerManager;

@Autowired
@Qualifier("numberOfExecutionThreads")
private int numberOfThreads;
@Autowired
@Qualifier("numberOfExecutionThreads")
private int numberOfThreads;


@Override
public Pair<WorkerPerformanceMetric, Serializable> measure() {
Pair<WorkerPerformanceMetric, Serializable> threadUtilization = Pair.of(WorkerPerformanceMetric.THREAD_UTILIZATION, getCurrentValue());
return threadUtilization;
}
@Override
public Pair<WorkerPerformanceMetric, Serializable> measure() {
Pair<WorkerPerformanceMetric, Serializable> threadUtilization = Pair.of(WorkerPerformanceMetric.THREAD_UTILIZATION, getCurrentValue());
return threadUtilization;
}

public int getCurrentValue() {
return ((workerManager.getRunningTasksCount() * 100) / numberOfThreads);
}
public int getCurrentValue() {
return ((workerManager.getRunningTasksCount() * 100) / numberOfThreads);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,28 @@

public class WorkerMetricsServiceImpl implements WorkerMetricsService {
protected static final Logger logger = LogManager.getLogger(WorkerMetricsServiceImpl.class);
static int capacity = Integer.getInteger("metrics.collection.sampleCount", 10);
boolean disabled = Boolean.getBoolean("worker.monitoring.disable");
final int capacity = Integer.getInteger("metrics.collection.sampleCount", Integer.MAX_VALUE);
final boolean disabled = Boolean.getBoolean("worker.monitoring.disable");
@Autowired
PerfMetricCollector perfMetricCollector;

@Autowired
private WorkerStateUpdateService workerStateUpdateService;

private LinkedBlockingQueue<Map<WorkerPerformanceMetric, Serializable>> collectMetricQueue = new LinkedBlockingQueue<Map<WorkerPerformanceMetric, Serializable>>(capacity);
private final LinkedBlockingQueue<Map<WorkerPerformanceMetric, Serializable>> collectMetricQueue = new LinkedBlockingQueue<>(capacity);
@Autowired
private EventBus eventBus;

@Override
public void collectPerformanceMetrics() {
try {
if(!isMonitoringDisabled()) {
if (!isMonitoringDisabled()) {
Map<WorkerPerformanceMetric, Serializable> metricInfo = perfMetricCollector.collectMetrics();
collectMetricQueue.put(metricInfo);
if (logger.isDebugEnabled()) {
logger.debug("Collected worker metric "+ metricInfo.size());
logger.debug("Collected worker metric " + metricInfo.size());
}
}
else{
Thread.sleep(100);
}
} catch (Exception e) {
logger.error("Failed to load metric into queue", e);
}
Expand All @@ -69,17 +66,14 @@ private boolean isMonitoringDisabled() {
@Override
public void dispatchPerformanceMetrics() {
try {
if(!isMonitoringDisabled()) {
List<Map<WorkerPerformanceMetric, Serializable>> metricData = getCurrentBatch(collectMetricQueue);
List<Map<WorkerPerformanceMetric, Serializable>> metricData = getCurrentBatch(collectMetricQueue);
if (!isMonitoringDisabled() && metricData.size() > 0) {
ScoreEvent scoreEvent = new ScoreEvent(EventConstants.WORKER_PERFORMANCE_MONITOR, (Serializable) metricData);
eventBus.dispatch(scoreEvent);
if (logger.isDebugEnabled()) {
logger.debug("Dispatched worker metric "+ metricData.size());
logger.debug("Dispatched worker metric " + metricData.size());
}
}
else{
Thread.sleep(100);
}
} catch (Exception e) {
logger.error("Failed to dispatch metric info event", e);
}
Expand Down
Loading