Skip to content

Commit

Permalink
Development: Reduce payload for live synchronization of build overview (
Browse files Browse the repository at this point in the history
  • Loading branch information
krusche authored and AjayvirS committed Dec 3, 2024
1 parent 33e892c commit d4a3539
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,9 @@ public BuildJobQueueItem(BuildJobQueueItem queueItem, ResultDTO submissionResult
this(queueItem.id(), queueItem.name(), queueItem.buildAgent(), queueItem.participationId(), queueItem.courseId(), queueItem.exerciseId(), queueItem.retryCount(),
queueItem.priority(), queueItem.status(), queueItem.repositoryInfo(), queueItem.jobTimingInfo(), queueItem.buildConfig(), submissionResult);
}

public BuildJobQueueItem(BuildJobQueueItem queueItem, BuildAgentDTO buildAgent, int newRetryCount) {
this(queueItem.id(), queueItem.name(), buildAgent, queueItem.participationId(), queueItem.courseId(), queueItem.exerciseId(), newRetryCount, queueItem.priority(), null,
queueItem.repositoryInfo(), new JobTimingInfo(queueItem.jobTimingInfo.submissionDate(), ZonedDateTime.now(), null), queueItem.buildConfig(), null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,16 +266,26 @@ private void checkAvailabilityAndProcessNextBuild() {
processBuild(buildJob);
}
catch (RejectedExecutionException e) {
log.error("Couldn't add build job to threadpool: {}\n Concurrent Build Jobs Count: {} Active tasks in pool: {}, Concurrent Build Jobs Size: {}", buildJob,
// TODO: we should log this centrally and not on the local node
log.error("Couldn't add build job to thread pool: {}\n Concurrent Build Jobs Count: {} Active tasks in pool: {}, Concurrent Build Jobs Size: {}", buildJob,
localProcessingJobs.get(), localCIBuildExecutorService.getActiveCount(), localCIBuildExecutorService.getMaximumPoolSize(), e);

// Add the build job back to the queue
if (buildJob != null) {
processingJobs.remove(buildJob.id());

buildJob = new BuildJobQueueItem(buildJob, new BuildAgentDTO("", "", ""));
log.info("Adding build job back to the queue: {}", buildJob);
queue.add(buildJob);
// At most try out the build job 5 times when they get rejected
if (buildJob.retryCount() >= 5) {
// TODO: we should log this centrally and not on the local node
log.error("Build job was rejected 5 times. Not adding build job back to the queue: {}", buildJob);
}
else {
// NOTE: we increase the retry count here, because the build job was not processed successfully
// TODO: we should try to run this job on a different build agent to avoid getting the same error again
buildJob = new BuildJobQueueItem(buildJob, new BuildAgentDTO("", "", ""), buildJob.retryCount() + 1);
log.info("Adding build job {} back to the queue with retry count {}", buildJob, buildJob.retryCount());
queue.add(buildJob);
}
localProcessingJobs.decrementAndGet();
}

Expand Down Expand Up @@ -551,7 +561,8 @@ private void resumeBuildAgent() {
private boolean nodeIsAvailable() {
log.debug("Currently processing jobs on this node: {}, active threads in Pool: {}, maximum pool size of thread executor : {}", localProcessingJobs.get(),
localCIBuildExecutorService.getActiveCount(), localCIBuildExecutorService.getMaximumPoolSize());
return localProcessingJobs.get() < localCIBuildExecutorService.getMaximumPoolSize();
return localProcessingJobs.get() < localCIBuildExecutorService.getMaximumPoolSize()
&& localCIBuildExecutorService.getActiveCount() < localCIBuildExecutorService.getMaximumPoolSize() && localCIBuildExecutorService.getQueue().isEmpty();
}

public class QueuedBuildJobItemListener implements ItemListener<BuildJobQueueItem> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,21 +290,21 @@ private void registerLocalCIMetrics() {
}

private static int extractRunningBuilds(Optional<SharedQueueManagementService> sharedQueueManagementService) {
return sharedQueueManagementService.map(queueManagementService -> queueManagementService.getBuildAgentInformation().stream()
.map(buildAgentInformation -> buildAgentInformation.runningBuildJobs().size()).reduce(0, Integer::sum)).orElse(0);
return sharedQueueManagementService.map(SharedQueueManagementService::getProcessingJobsSize).orElse(0);
}

private static int extractQueuedBuilds(Optional<SharedQueueManagementService> sharedQueueManagementService) {
return sharedQueueManagementService.map(queueManagementService -> queueManagementService.getQueuedJobs().size()).orElse(0);
return sharedQueueManagementService.map(SharedQueueManagementService::getQueuedJobsSize).orElse(0);
}

private static int extractBuildAgents(Optional<SharedQueueManagementService> sharedQueueManagementService) {
return sharedQueueManagementService.map(queueManagementService -> queueManagementService.getBuildAgentInformation().size()).orElse(0);
return sharedQueueManagementService.map(SharedQueueManagementService::getBuildAgentInformationSize).orElse(0);
}

private static int extractMaxConcurrentBuilds(Optional<SharedQueueManagementService> sharedQueueManagementService) {
return sharedQueueManagementService.map(queueManagementService -> queueManagementService.getBuildAgentInformation().stream()
.map(BuildAgentInformation::maxNumberOfConcurrentBuildJobs).reduce(0, Integer::sum)).orElse(0);
.filter(agent -> agent.status() != BuildAgentInformation.BuildAgentStatus.PAUSED).map(BuildAgentInformation::maxNumberOfConcurrentBuildJobs)
.reduce(0, Integer::sum)).orElse(0);
}

private void registerWebsocketMetrics() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.tum.cit.aet.artemis.programming.service.localci;

import java.util.ArrayList;
import java.util.List;

import jakarta.annotation.PostConstruct;
Expand All @@ -20,7 +21,9 @@
import com.hazelcast.map.listener.EntryUpdatedListener;

import de.tum.cit.aet.artemis.buildagent.dto.BuildAgentInformation;
import de.tum.cit.aet.artemis.buildagent.dto.BuildConfig;
import de.tum.cit.aet.artemis.buildagent.dto.BuildJobQueueItem;
import de.tum.cit.aet.artemis.buildagent.dto.RepositoryInfo;

/**
* This service is responsible for sending build job queue information over websockets.
Expand Down Expand Up @@ -68,18 +71,21 @@ public void init() {
}

private void sendQueuedJobsOverWebsocket(long courseId) {
localCIWebsocketMessagingService.sendQueuedBuildJobs(sharedQueueManagementService.getQueuedJobs());
localCIWebsocketMessagingService.sendQueuedBuildJobsForCourse(courseId, sharedQueueManagementService.getQueuedJobsForCourse(courseId));
var queuedJobs = removeUnnecessaryInformation(sharedQueueManagementService.getQueuedJobs());
var queuedJobsForCourse = queuedJobs.stream().filter(job -> job.courseId() == courseId).toList();
localCIWebsocketMessagingService.sendQueuedBuildJobs(queuedJobs);
localCIWebsocketMessagingService.sendQueuedBuildJobsForCourse(courseId, queuedJobsForCourse);
}

private void sendProcessingJobsOverWebsocket(long courseId) {
localCIWebsocketMessagingService.sendRunningBuildJobs(sharedQueueManagementService.getProcessingJobs());
localCIWebsocketMessagingService.sendRunningBuildJobsForCourse(courseId, sharedQueueManagementService.getProcessingJobsForCourse(courseId));
var processingJobs = removeUnnecessaryInformation(sharedQueueManagementService.getProcessingJobs());
var processingJobsForCourse = processingJobs.stream().filter(job -> job.courseId() == courseId).toList();
localCIWebsocketMessagingService.sendRunningBuildJobs(processingJobs);
localCIWebsocketMessagingService.sendRunningBuildJobsForCourse(courseId, processingJobsForCourse);
}

private void sendBuildAgentSummaryOverWebsocket() {
// remove the recentBuildJobs from the build agent information before sending it over the websocket
List<BuildAgentInformation> buildAgentSummary = sharedQueueManagementService.getBuildAgentInformationWithoutRecentBuildJobs();
var buildAgentSummary = removeUnnecessaryInformationFromBuildAgentInformation(sharedQueueManagementService.getBuildAgentInformationWithoutRecentBuildJobs());
localCIWebsocketMessagingService.sendBuildAgentSummary(buildAgentSummary);
}

Expand Down Expand Up @@ -142,4 +148,57 @@ public void entryUpdated(com.hazelcast.core.EntryEvent<String, BuildAgentInforma
sendBuildAgentInformationOverWebsocket(event.getValue().buildAgent().name());
}
}

/**
* Removes unnecessary information (e.g. repository info, build config, result) from the queued jobs before sending them over the websocket.
*
* @param queuedJobs the queued jobs
*/
private static List<BuildJobQueueItem> removeUnnecessaryInformation(List<BuildJobQueueItem> queuedJobs) {
var filteredQueuedJobs = new ArrayList<BuildJobQueueItem>(); // make list mutable in case it is not
for (BuildJobQueueItem job : queuedJobs) {
var buildConfig = removeUnnecessaryInformationFromBuildConfig(job.buildConfig());
var repositoryInfo = removeUnnecessaryInformationFromRepositoryInfo(job.repositoryInfo());
filteredQueuedJobs.add(new BuildJobQueueItem(job.id(), job.name(), job.buildAgent(), job.participationId(), job.courseId(), job.exerciseId(), job.retryCount(),
job.priority(), job.status(), repositoryInfo, job.jobTimingInfo(), buildConfig, null));

}
return filteredQueuedJobs;
}

/**
* Removes unnecessary information (e.g. build script, docker image) from the build config before sending it over the websocket.
*
* @param buildConfig the build config
*/
private static BuildConfig removeUnnecessaryInformationFromBuildConfig(BuildConfig buildConfig) {
// We pass "" instead of null strings to avoid errors when serializing to JSON
return new BuildConfig("", "", buildConfig.commitHashToBuild(), "", "", "", null, null, buildConfig.scaEnabled(), buildConfig.sequentialTestRunsEnabled(),
buildConfig.testwiseCoverageEnabled(), null, buildConfig.timeoutSeconds(), "", "", "");
}

/**
* Removes unnecessary information (RepositoryUris) from the repository info before sending it over the websocket.
*
* @param repositoryInfo the repository info
*/
private static RepositoryInfo removeUnnecessaryInformationFromRepositoryInfo(RepositoryInfo repositoryInfo) {
// We pass "" instead of null strings to avoid errors when serializing to JSON
return new RepositoryInfo(repositoryInfo.repositoryName(), repositoryInfo.repositoryType(), repositoryInfo.triggeredByPushTo(), "", "", "", null, null);
}

/**
* Removes unnecessary information (e.g. recent build jobs, public ssh key, result) from the running jobs before sending them over the websocket.
*
* @param buildAgentSummary the build agent summary
*/
private static List<BuildAgentInformation> removeUnnecessaryInformationFromBuildAgentInformation(List<BuildAgentInformation> buildAgentSummary) {
var filteredBuildAgentSummary = new ArrayList<BuildAgentInformation>(); // make list mutable in case it is not
for (BuildAgentInformation agent : buildAgentSummary) {
var runningJobs = removeUnnecessaryInformation(agent.runningBuildJobs());
filteredBuildAgentSummary.add(new BuildAgentInformation(agent.buildAgent(), agent.maxNumberOfConcurrentBuildJobs(), agent.numberOfCurrentBuildJobs(), runningJobs,
agent.status(), null, null));
}
return filteredBuildAgentSummary;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public void sendRunningBuildJobs(List<BuildJobQueueItem> buildJobQueue) {
public void sendBuildAgentSummary(List<BuildAgentInformation> buildAgentInfo) {
String channel = "/topic/admin/build-agents";
log.debug("Sending message on topic {}: {}", channel, buildAgentInfo);
// TODO: convert into a proper DTO and strip unnecessary information, e.g. build config, because it's not shown in the client and contains too much information
websocketMessagingService.sendMessage(channel, buildAgentInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ public List<BuildJobQueueItem> getQueuedJobs() {
return new ArrayList<>(queue);
}

public int getQueuedJobsSize() {
return queue.size();
}

/**
* @return a copy of the processing jobs as ArrayList
*/
Expand All @@ -124,6 +128,10 @@ public List<BuildJobQueueItem> getProcessingJobs() {
return new ArrayList<>(processingJobs.values());
}

public int getProcessingJobsSize() {
return processingJobs.size();
}

public List<BuildJobQueueItem> getQueuedJobsForCourse(long courseId) {
return getQueuedJobs().stream().filter(job -> job.courseId() == courseId).toList();
}
Expand All @@ -145,6 +153,10 @@ public List<BuildAgentInformation> getBuildAgentInformation() {
return new ArrayList<>(buildAgentInformation.values());
}

public int getBuildAgentInformationSize() {
return buildAgentInformation.size();
}

public List<BuildAgentInformation> getBuildAgentInformationWithoutRecentBuildJobs() {
return getBuildAgentInformation().stream().map(agent -> new BuildAgentInformation(agent.buildAgent(), agent.maxNumberOfConcurrentBuildJobs(),
agent.numberOfCurrentBuildJobs(), agent.runningBuildJobs(), agent.status(), null, null)).toList();
Expand Down Expand Up @@ -303,5 +315,4 @@ public Page<BuildJob> getFilteredFinishedBuildJobs(FinishedBuildJobPageableSearc

return new PageImpl<>(orderedBuildJobs, buildJobIdsPage.getPageable(), buildJobIdsPage.getTotalElements());
}

}

0 comments on commit d4a3539

Please sign in to comment.