From 4eaba4e5f095856973464d905b75730bf6e068ba Mon Sep 17 00:00:00 2001 From: Mohamed Bilel Besrour <58034472+BBesrour@users.noreply.github.com> Date: Wed, 27 Nov 2024 18:08:37 +0100 Subject: [PATCH] Integrated code lifecycle: Fix an issue with concurrent build queue access (#9876) --- .../service/SharedQueueProcessingService.java | 4 +- .../localci/SharedQueueManagementService.java | 47 ++++++++++++------- 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/src/main/java/de/tum/cit/aet/artemis/buildagent/service/SharedQueueProcessingService.java b/src/main/java/de/tum/cit/aet/artemis/buildagent/service/SharedQueueProcessingService.java index 2c2e8b8e16bb..8b43315ab01b 100644 --- a/src/main/java/de/tum/cit/aet/artemis/buildagent/service/SharedQueueProcessingService.java +++ b/src/main/java/de/tum/cit/aet/artemis/buildagent/service/SharedQueueProcessingService.java @@ -360,7 +360,9 @@ private BuildAgentInformation getUpdatedLocalBuildAgentInformation(BuildJobQueue } private List getProcessingJobsOfNode(String memberAddress) { - return processingJobs.values().stream().filter(job -> Objects.equals(job.buildAgent().memberAddress(), memberAddress)).toList(); + // NOTE: we should not use streams with IMap, because it can be unstable, when many items are added at the same time and there is a slow network condition + List processingJobsList = new ArrayList<>(processingJobs.values()); + return processingJobsList.stream().filter(job -> Objects.equals(job.buildAgent().memberAddress(), memberAddress)).toList(); } private void removeOfflineNodes() { diff --git a/src/main/java/de/tum/cit/aet/artemis/programming/service/localci/SharedQueueManagementService.java b/src/main/java/de/tum/cit/aet/artemis/programming/service/localci/SharedQueueManagementService.java index 87b44d4872ba..059df76379da 100644 --- a/src/main/java/de/tum/cit/aet/artemis/programming/service/localci/SharedQueueManagementService.java +++ b/src/main/java/de/tum/cit/aet/artemis/programming/service/localci/SharedQueueManagementService.java @@ -108,36 +108,45 @@ public void pushDockerImageCleanupInfo() { } } + /** + * @return a copy of the queued build jobs as ArrayList + */ public List getQueuedJobs() { - return queue.stream().toList(); + // NOTE: we should not use streams with IQueue directly, because it can be unstable, when many items are added at the same time and there is a slow network condition + return new ArrayList<>(queue); } + /** + * @return a copy of the processing jobs as ArrayList + */ public List getProcessingJobs() { - return processingJobs.values().stream().toList(); + // NOTE: we should not use streams with IMap, because it can be unstable, when many items are added at the same time and there is a slow network condition + return new ArrayList<>(processingJobs.values()); } public List getQueuedJobsForCourse(long courseId) { - return queue.stream().filter(job -> job.courseId() == courseId).toList(); + return getQueuedJobs().stream().filter(job -> job.courseId() == courseId).toList(); } public List getProcessingJobsForCourse(long courseId) { - return processingJobs.values().stream().filter(job -> job.courseId() == courseId).toList(); + return getProcessingJobs().stream().filter(job -> job.courseId() == courseId).toList(); } public List getQueuedJobsForParticipation(long participationId) { - return queue.stream().filter(job -> job.participationId() == participationId).toList(); + return getQueuedJobs().stream().filter(job -> job.participationId() == participationId).toList(); } public List getProcessingJobsForParticipation(long participationId) { - return processingJobs.values().stream().filter(job -> job.participationId() == participationId).toList(); + return getProcessingJobs().stream().filter(job -> job.participationId() == participationId).toList(); } public List getBuildAgentInformation() { - return buildAgentInformation.values().stream().toList(); + // NOTE: we should not use streams with IMap, because it can be unstable, when many items are added at the same time and there is a slow network condition + return new ArrayList<>(buildAgentInformation.values()); } public List getBuildAgentInformationWithoutRecentBuildJobs() { - return buildAgentInformation.values().stream().map(agent -> new BuildAgentInformation(agent.buildAgent(), agent.maxNumberOfConcurrentBuildJobs(), + return getBuildAgentInformation().stream().map(agent -> new BuildAgentInformation(agent.buildAgent(), agent.maxNumberOfConcurrentBuildJobs(), agent.numberOfCurrentBuildJobs(), agent.runningBuildJobs(), agent.status(), null, null)).toList(); } @@ -156,9 +165,10 @@ public void resumeBuildAgent(String agent) { */ public void cancelBuildJob(String buildJobId) { // Remove build job if it is queued - if (queue.stream().anyMatch(job -> Objects.equals(job.id(), buildJobId))) { + List queuedJobs = getQueuedJobs(); + if (queuedJobs.stream().anyMatch(job -> Objects.equals(job.id(), buildJobId))) { List toRemove = new ArrayList<>(); - for (BuildJobQueueItem job : queue) { + for (BuildJobQueueItem job : queuedJobs) { if (Objects.equals(job.id(), buildJobId)) { toRemove.add(job); } @@ -197,7 +207,8 @@ public void cancelAllQueuedBuildJobs() { * Cancel all running build jobs. */ public void cancelAllRunningBuildJobs() { - for (BuildJobQueueItem buildJob : processingJobs.values()) { + List runningJobs = getProcessingJobs(); + for (BuildJobQueueItem buildJob : runningJobs) { cancelBuildJob(buildJob.id()); } } @@ -208,7 +219,7 @@ public void cancelAllRunningBuildJobs() { * @param agentName name of the agent */ public void cancelAllRunningBuildJobsForAgent(String agentName) { - processingJobs.values().stream().filter(job -> Objects.equals(job.buildAgent().name(), agentName)).forEach(job -> cancelBuildJob(job.id())); + getProcessingJobs().stream().filter(job -> Objects.equals(job.buildAgent().name(), agentName)).forEach(job -> cancelBuildJob(job.id())); } /** @@ -217,8 +228,9 @@ public void cancelAllRunningBuildJobsForAgent(String agentName) { * @param courseId id of the course */ public void cancelAllQueuedBuildJobsForCourse(long courseId) { + List queuedJobs = getQueuedJobs(); List toRemove = new ArrayList<>(); - for (BuildJobQueueItem job : queue) { + for (BuildJobQueueItem job : queuedJobs) { if (job.courseId() == courseId) { toRemove.add(job); } @@ -232,7 +244,8 @@ public void cancelAllQueuedBuildJobsForCourse(long courseId) { * @param courseId id of the course */ public void cancelAllRunningBuildJobsForCourse(long courseId) { - for (BuildJobQueueItem buildJob : processingJobs.values()) { + List runningJobs = getProcessingJobs(); + for (BuildJobQueueItem buildJob : runningJobs) { if (buildJob.courseId() == courseId) { cancelBuildJob(buildJob.id()); } @@ -246,14 +259,16 @@ public void cancelAllRunningBuildJobsForCourse(long courseId) { */ public void cancelAllJobsForParticipation(long participationId) { List toRemove = new ArrayList<>(); - for (BuildJobQueueItem queuedJob : queue) { + List queuedJobs = getQueuedJobs(); + for (BuildJobQueueItem queuedJob : queuedJobs) { if (queuedJob.participationId() == participationId) { toRemove.add(queuedJob); } } queue.removeAll(toRemove); - for (BuildJobQueueItem runningJob : processingJobs.values()) { + List runningJobs = getProcessingJobs(); + for (BuildJobQueueItem runningJob : runningJobs) { if (runningJob.participationId() == participationId) { cancelBuildJob(runningJob.id()); }