Skip to content

Commit

Permalink
Iris: Display ingestion state for lecture slide upload (#9090)
Browse files Browse the repository at this point in the history
  • Loading branch information
yassinsws authored and AjayvirS committed Dec 3, 2024
1 parent dfaccbd commit 8a64f88
Show file tree
Hide file tree
Showing 44 changed files with 997 additions and 215 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package de.tum.cit.aet.artemis.iris.dto;

public enum IngestionState {
NOT_STARTED, IN_PROGRESS, PARTIALLY_INGESTED, DONE, ERROR
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package de.tum.cit.aet.artemis.iris.dto;

public record IngestionStateResponseDTO(IngestionState state) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static de.tum.cit.aet.artemis.core.config.Constants.PROFILE_IRIS;

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

Expand All @@ -20,11 +22,15 @@
import com.fasterxml.jackson.databind.ObjectMapper;

import de.tum.cit.aet.artemis.iris.domain.settings.IrisSubSettingsType;
import de.tum.cit.aet.artemis.iris.dto.IngestionState;
import de.tum.cit.aet.artemis.iris.dto.IngestionStateResponseDTO;
import de.tum.cit.aet.artemis.iris.exception.IrisException;
import de.tum.cit.aet.artemis.iris.exception.IrisForbiddenException;
import de.tum.cit.aet.artemis.iris.exception.IrisInternalPyrisErrorException;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.PyrisVariantDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.lectureingestionwebhook.PyrisWebhookLectureDeletionExecutionDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.lectureingestionwebhook.PyrisWebhookLectureIngestionExecutionDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.job.IngestionWebhookJob;
import de.tum.cit.aet.artemis.iris.web.open.PublicPyrisStatusUpdateResource;

/**
Expand All @@ -42,12 +48,19 @@ public class PyrisConnectorService {

private final ObjectMapper objectMapper;

private final PyrisJobService pyrisJobService;

@Value("${server.url}")
private String artemisBaseUrl;

@Value("${artemis.iris.url}")
private String pyrisUrl;

public PyrisConnectorService(@Qualifier("pyrisRestTemplate") RestTemplate restTemplate, MappingJackson2HttpMessageConverter springMvcJacksonConverter) {
public PyrisConnectorService(@Qualifier("pyrisRestTemplate") RestTemplate restTemplate, MappingJackson2HttpMessageConverter springMvcJacksonConverter,
PyrisJobService pyrisJobService) {
this.restTemplate = restTemplate;
this.objectMapper = springMvcJacksonConverter.getObjectMapper();
this.pyrisJobService = pyrisJobService;
}

/**
Expand Down Expand Up @@ -97,15 +110,63 @@ public void executePipeline(String feature, String variant, Object executionDTO)
* @param variant The variant of the feature to execute
* @param executionDTO The DTO sent as a body for the execution
*/
public void executeLectureWebhook(String variant, PyrisWebhookLectureIngestionExecutionDTO executionDTO) {
public void executeLectureAddtionWebhook(String variant, PyrisWebhookLectureIngestionExecutionDTO executionDTO) {
var endpoint = "/api/v1/webhooks/lectures/" + variant;
try {
restTemplate.postForEntity(pyrisUrl + endpoint, objectMapper.valueToTree(executionDTO), Void.class);
}
catch (HttpStatusCodeException e) {
log.error("Failed to send lecture unit {} to Pyris: {}", executionDTO.pyrisLectureUnit().lectureUnitId(), e.getMessage());
throw toIrisException(e);
}
catch (RestClientException | IllegalArgumentException e) {
log.error("Failed to send lecture unit {} to Pyris: {}", executionDTO.pyrisLectureUnit().lectureUnitId(), e.getMessage());
throw new PyrisConnectorException("Could not fetch response from Pyris");
}
}

/**
* Retrieves the ingestion state of the lecture unit specified by retrieving the ingestion state from the vector database in Pyris.
*
* @param courseId id of the course
* @param lectureId id of the lecture
* @param lectureUnitId id of the lectureUnit to check in the Pyris vector database
* @return The ingestion state of the lecture Unit
*
*/
IngestionState getLectureUnitIngestionState(long courseId, long lectureId, long lectureUnitId) {
try {
String encodedBaseUrl = URLEncoder.encode(artemisBaseUrl, StandardCharsets.UTF_8);
String url = pyrisUrl + "/api/v1/courses/" + courseId + "/lectures/" + lectureId + "/lectureUnits/" + lectureUnitId + "/ingestion-state?base_url=" + encodedBaseUrl;
IngestionStateResponseDTO response = restTemplate.getForObject(url, IngestionStateResponseDTO.class);
IngestionState state = response.state();
if (state != IngestionState.DONE) {
if (pyrisJobService.currentJobs().stream().filter(job -> job instanceof IngestionWebhookJob).map(job -> (IngestionWebhookJob) job)
.anyMatch(ingestionJob -> ingestionJob.courseId() == courseId && ingestionJob.lectureId() == lectureId && ingestionJob.lectureUnitId() == lectureUnitId)) {
return IngestionState.IN_PROGRESS;
}
}
return state;
}
catch (RestClientException | IllegalArgumentException e) {
log.error("Error fetching ingestion state for lecture {}, lecture unit {}", lectureId, lectureUnitId, e);
throw new PyrisConnectorException("Error fetching ingestion state for lecture unit" + lectureUnitId);
}
}

/**
* Executes a webhook and send lectures to the webhook with the given variant
*
* @param executionDTO The DTO sent as a body for the execution
*/
public void executeLectureDeletionWebhook(PyrisWebhookLectureDeletionExecutionDTO executionDTO) {
var endpoint = "/api/v1/webhooks/lectures/delete";
try {
restTemplate.postForEntity(pyrisUrl + endpoint, objectMapper.valueToTree(executionDTO), Void.class);
}
catch (HttpStatusCodeException e) {
log.error("Failed to send lectures to Pyris", e);
throw toIrisException(e);
// TODO : add error ingestion UI.
}
catch (RestClientException | IllegalArgumentException e) {
log.error("Failed to send lectures to Pyris", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static de.tum.cit.aet.artemis.core.config.Constants.PROFILE_IRIS;

import java.security.SecureRandom;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

Expand Down Expand Up @@ -93,11 +94,14 @@ public String addCourseChatJob(Long courseId, Long sessionId) {
/**
* Adds a new ingestion webhook job to the job map with a timeout.
*
* @param courseId the ID of the course associated with the webhook job
* @param lectureId the ID of the lecture associated with the webhook job
* @param lectureUnitId the ID of the lecture unit associated with the webhook job
* @return a unique token identifying the created webhook job
*/
public String addIngestionWebhookJob() {
public String addIngestionWebhookJob(long courseId, long lectureId, long lectureUnitId) {
var token = generateJobIdToken();
var job = new IngestionWebhookJob(token);
var job = new IngestionWebhookJob(token, courseId, lectureId, lectureUnitId);
long timeoutWebhookJob = 60;
TimeUnit unitWebhookJob = TimeUnit.MINUTES;
jobMap.put(token, job, timeoutWebhookJob, unitWebhookJob);
Expand All @@ -122,6 +126,15 @@ public void updateJob(PyrisJob job) {
jobMap.put(job.jobId(), job);
}

/**
* Get all current jobs.
*
* @return the all current jobs
*/
public Collection<PyrisJob> currentJobs() {
return jobMap.values();
}

/**
* Get the job of a token.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,12 @@ private void removeJobIfTerminatedElseUpdate(List<PyrisStageDTO> stages, PyrisJo
}

/**
* Handles the status update of a lecture ingestion job and logs the results for now => will change later
* TODO: Update this method to handle changes beyond logging
* Handles the status update of a lecture ingestion job.
*
* @param job the job that is updated
* @param statusUpdate the status update
*/
public void handleStatusUpdate(IngestionWebhookJob job, PyrisLectureIngestionStatusUpdateDTO statusUpdate) {
statusUpdate.stages().forEach(stage -> log.info(stage.name() + ":" + stage.message()));
removeJobIfTerminatedElseUpdate(statusUpdate.stages(), job);
}
}
Loading

0 comments on commit 8a64f88

Please sign in to comment.