Skip to content

Commit

Permalink
BE: Expose Kafka Connect validation errors in the UI
Browse files Browse the repository at this point in the history
  • Loading branch information
yeikel committed Dec 12, 2024
1 parent 318bcc9 commit 57c69f5
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kafbat.ui.client;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.connect.ApiClient;
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
Expand All @@ -14,9 +15,11 @@
import io.kafbat.ui.exception.KafkaConnectConflictReponseException;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.util.WebClientConfigurator;
import jakarta.validation.constraints.NotNull;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
Expand Down Expand Up @@ -58,10 +61,24 @@ private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {

private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
return publisher
.onErrorResume(WebClientResponseException.BadRequest.class, e ->
Mono.error(new ValidationException("Invalid configuration")))
.onErrorResume(WebClientResponseException.InternalServerError.class, e ->
Mono.error(new ValidationException("Invalid configuration")));
.onErrorResume(WebClientResponseException.BadRequest.class,
RetryingKafkaConnectClient::parseConnectErrorMessage)
.onErrorResume(WebClientResponseException.InternalServerError.class,
RetryingKafkaConnectClient::parseConnectErrorMessage);
}

// Adapted from https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java
// Adding the connect runtime dependency for this single class seems excessive
private record ErrorMessage(@NotNull @JsonProperty("message") String message) {
}

private static <T> @NotNull Mono<T> parseConnectErrorMessage(WebClientResponseException parseException) {
final var errorMessage = parseException.getResponseBodyAs(ErrorMessage.class);
return Mono.error(new ValidationException(
Objects.requireNonNull(errorMessage,
// see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
"This should not happen according to the ConnectExceptionMapper")
.message()));
}

@Override
Expand Down
9 changes: 8 additions & 1 deletion api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import io.kafbat.ui.api.model.ErrorResponse;
import io.kafbat.ui.model.ConnectorDTO;
import io.kafbat.ui.model.ConnectorPluginConfigDTO;
import io.kafbat.ui.model.ConnectorPluginConfigValidationResponseDTO;
Expand Down Expand Up @@ -280,7 +281,13 @@ public void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() {
)
)
.exchange()
.expectStatus().isBadRequest();
.expectStatus().isBadRequest()
.expectBody(ErrorResponse.class)
.value(response -> assertThat(response.getMessage()).isEqualTo("""
Connector configuration is invalid and contains the following 2 error(s):
Invalid value invalid number for configuration tasks.max: Not a number of type INT
Invalid value null for configuration tasks.max: Value must be non-null
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"""));

webTestClient.get()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
Expand Down

0 comments on commit 57c69f5

Please sign in to comment.