diff --git a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java index f0b291af6..72ab7386a 100644 --- a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java +++ b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java @@ -1,5 +1,6 @@ package io.kafbat.ui.client; +import com.fasterxml.jackson.annotation.JsonProperty; import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.connect.ApiClient; import io.kafbat.ui.connect.api.KafkaConnectClientApi; @@ -14,9 +15,11 @@ import io.kafbat.ui.exception.KafkaConnectConflictReponseException; import io.kafbat.ui.exception.ValidationException; import io.kafbat.ui.util.WebClientConfigurator; +import jakarta.validation.constraints.NotNull; import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.Objects; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; import org.springframework.http.ResponseEntity; @@ -58,10 +61,24 @@ private static Flux withRetryOnConflict(Flux publisher) { private static Mono withBadRequestErrorHandling(Mono publisher) { return publisher - .onErrorResume(WebClientResponseException.BadRequest.class, e -> - Mono.error(new ValidationException("Invalid configuration"))) - .onErrorResume(WebClientResponseException.InternalServerError.class, e -> - Mono.error(new ValidationException("Invalid configuration"))); + .onErrorResume(WebClientResponseException.BadRequest.class, + RetryingKafkaConnectClient::parseConnectErrorMessage) + .onErrorResume(WebClientResponseException.InternalServerError.class, + RetryingKafkaConnectClient::parseConnectErrorMessage); + } + + // Adapted from https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java + // Adding the connect runtime dependency for this single class seems excessive + private record ErrorMessage(@NotNull @JsonProperty("message") String message) { + } + + private static @NotNull Mono parseConnectErrorMessage(WebClientResponseException parseException) { + final var errorMessage = parseException.getResponseBodyAs(ErrorMessage.class); + return Mono.error(new ValidationException( + Objects.requireNonNull(errorMessage, + // see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java + "This should not happen according to the ConnectExceptionMapper") + .message())); } @Override diff --git a/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java b/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java index 3588298f4..3646d7579 100644 --- a/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java +++ b/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java @@ -4,6 +4,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import io.kafbat.ui.api.model.ErrorResponse; import io.kafbat.ui.model.ConnectorDTO; import io.kafbat.ui.model.ConnectorPluginConfigDTO; import io.kafbat.ui.model.ConnectorPluginConfigValidationResponseDTO; @@ -280,7 +281,13 @@ public void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() { ) ) .exchange() - .expectStatus().isBadRequest(); + .expectStatus().isBadRequest() + .expectBody(ErrorResponse.class) + .value(response -> assertThat(response.getMessage()).isEqualTo(""" + Connector configuration is invalid and contains the following 2 error(s): + Invalid value invalid number for configuration tasks.max: Not a number of type INT + Invalid value null for configuration tasks.max: Value must be non-null + You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`""")); webTestClient.get() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",