diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e7f49aa81..0a4dc0b593 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ### Added - Add xy_shape property ([#884](https://github.com/opensearch-project/opensearch-java/pull/885)) - Add missed fields to MultisearchBody: seqNoPrimaryTerm, storedFields, explain, fields, indicesBoost ([#914](https://github.com/opensearch-project/opensearch-java/pull/914)) -- Add OpenSearchGenericClient with support for raw HTTP request/responses ([#910](https://github.com/opensearch-project/opensearch-java/pull/910)) +- Add OpenSearchGenericClient with support for raw HTTP request/responses ([#910](https://github.com/opensearch-project/opensearch-java/pull/910), [#929](https://github.com/opensearch-project/opensearch-java/pull/929)) - Add missed fields to MultisearchBody: collapse, version, timeout ([#916](https://github.com/opensearch-project/opensearch-java/pull/916) - Add missed fields to MultisearchBody: ext, rescore and to SearchRequest: ext ([#918](https://github.com/opensearch-project/opensearch-java/pull/918) diff --git a/guides/generic.md b/guides/generic.md index f9ee7ff5fa..95eb8bdb23 100644 --- a/guides/generic.md +++ b/guides/generic.md @@ -16,6 +16,12 @@ The following sample code gets the `OpenSearchGenericClient` from the `OpenSearc final OpenSearchGenericClient generic = javaClient().generic(); ``` +The generic client with default options (`ClientOptions.DEFAULT`) returns the responses as those were received from the server. The generic client could be instructed to raise an `OpenSearchClientException` exception instead if the HTTP status code is not indicating the successful response, for example: + +```java +final OpenSearchGenericClient generic = javaClient().generic().witClientOptions(ClientOptions.throwOnHttpErrors()); +``` + ## Sending Simple Request The following sample code sends a simple request that does not require any payload to be provided (typically, `GET` requests). diff --git a/java-client/src/main/java/org/opensearch/client/opensearch/generic/Body.java b/java-client/src/main/java/org/opensearch/client/opensearch/generic/Body.java index 9957c93507..e43bf0551e 100644 --- a/java-client/src/main/java/org/opensearch/client/opensearch/generic/Body.java +++ b/java-client/src/main/java/org/opensearch/client/opensearch/generic/Body.java @@ -67,6 +67,14 @@ public interface Body extends AutoCloseable { * @return body as {@link String} */ default String bodyAsString() { + return new String(bodyAsBytes(), StandardCharsets.UTF_8); + } + + /** + * Gets the body as {@link byte[]} + * @return body as {@link byte[]} + */ + default byte[] bodyAsBytes() { try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { try (final InputStream in = body()) { final byte[] buffer = new byte[DEFAULT_BUFFER_SIZE]; @@ -77,7 +85,7 @@ default String bodyAsString() { } out.flush(); - return new String(out.toByteArray(), StandardCharsets.UTF_8); + return out.toByteArray(); } catch (final IOException ex) { throw new UncheckedIOException(ex); } diff --git a/java-client/src/main/java/org/opensearch/client/opensearch/generic/GenericResponse.java b/java-client/src/main/java/org/opensearch/client/opensearch/generic/GenericResponse.java index 5a7a27cef8..6d394cbda5 100644 --- a/java-client/src/main/java/org/opensearch/client/opensearch/generic/GenericResponse.java +++ b/java-client/src/main/java/org/opensearch/client/opensearch/generic/GenericResponse.java @@ -27,6 +27,10 @@ final class GenericResponse implements Response { private final Collection> headers; private final Body body; + GenericResponse(String uri, String protocol, String method, int status, String reason, Collection> headers) { + this(uri, protocol, method, status, reason, headers, null); + } + GenericResponse( String uri, String protocol, diff --git a/java-client/src/main/java/org/opensearch/client/opensearch/generic/OpenSearchClientException.java b/java-client/src/main/java/org/opensearch/client/opensearch/generic/OpenSearchClientException.java new file mode 100644 index 0000000000..cfdcf3a539 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/opensearch/generic/OpenSearchClientException.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.generic; + +/** + * Exception thrown by API client methods when OpenSearch could not accept or + * process a request. + *

+ * The {@link #response()} contains the the raw response as returned by the API + * endpoint that was called. + */ +public class OpenSearchClientException extends RuntimeException { + + private final Response response; + + public OpenSearchClientException(Response response) { + super("Request failed: [" + response.getStatus() + "] " + response.getReason()); + this.response = response; + } + + /** + * The error response sent by OpenSearch + */ + public Response response() { + return this.response; + } + + /** + * Status code returned by OpenSearch. Shortcut for + * {@code response().status()}. + */ + public int status() { + return this.response.getStatus(); + } +} diff --git a/java-client/src/main/java/org/opensearch/client/opensearch/generic/OpenSearchGenericClient.java b/java-client/src/main/java/org/opensearch/client/opensearch/generic/OpenSearchGenericClient.java index 0d759af935..948444e150 100644 --- a/java-client/src/main/java/org/opensearch/client/opensearch/generic/OpenSearchGenericClient.java +++ b/java-client/src/main/java/org/opensearch/client/opensearch/generic/OpenSearchGenericClient.java @@ -10,10 +10,12 @@ import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; import java.util.stream.Collectors; import javax.annotation.Nullable; import org.opensearch.client.ApiClient; @@ -24,14 +26,37 @@ * Client for the generic HTTP requests. */ public class OpenSearchGenericClient extends ApiClient { + /** + * Generic client options + */ + public static final class ClientOptions { + private static final ClientOptions DEFAULT = new ClientOptions(); + + private final Predicate error; + + private ClientOptions() { + this(statusCode -> false); + } + + private ClientOptions(final Predicate error) { + this.error = error; + } + + public static ClientOptions throwOnHttpErrors() { + return new ClientOptions(statusCode -> statusCode >= 400); + } + } + /** * Generic endpoint instance */ private static final class GenericEndpoint implements org.opensearch.client.transport.GenericEndpoint { private final Request request; + private final Predicate error; - public GenericEndpoint(Request request) { + public GenericEndpoint(Request request, Predicate error) { this.request = request; + this.error = error; } @Override @@ -67,24 +92,70 @@ public GenericResponse responseDeserializer( int status, String reason, List> headers, - String contentType, - InputStream body + @Nullable String contentType, + @Nullable InputStream body ) { - return new GenericResponse(uri, protocol, method, status, reason, headers, Body.from(body, contentType)); + if (isError(status)) { + // Fully consume the response body since the it will be propagated as an exception with possible no chance to be closed + try (Body b = Body.from(body, contentType)) { + if (b != null) { + return new GenericResponse( + uri, + protocol, + method, + status, + reason, + headers, + Body.from(b.bodyAsBytes(), b.contentType()) + ); + } else { + return new GenericResponse(uri, protocol, method, status, reason, headers); + } + } catch (final IOException ex) { + throw new UncheckedIOException(ex); + } + } else { + return new GenericResponse(uri, protocol, method, status, reason, headers, Body.from(body, contentType)); + } + } + + @Override + public boolean isError(int statusCode) { + return error.test(statusCode); + } + + @Override + public T exceptionConverter(int statusCode, @Nullable Response error) { + throw new OpenSearchClientException(error); } } + private final ClientOptions clientOptions; + public OpenSearchGenericClient(OpenSearchTransport transport) { - super(transport, null); + this(transport, null, ClientOptions.DEFAULT); } public OpenSearchGenericClient(OpenSearchTransport transport, @Nullable TransportOptions transportOptions) { + this(transport, transportOptions, ClientOptions.DEFAULT); + } + + public OpenSearchGenericClient( + OpenSearchTransport transport, + @Nullable TransportOptions transportOptions, + ClientOptions clientOptions + ) { super(transport, transportOptions); + this.clientOptions = clientOptions; + } + + public OpenSearchGenericClient withClientOptions(ClientOptions clientOptions) { + return new OpenSearchGenericClient(this.transport, this.transportOptions, clientOptions); } @Override public OpenSearchGenericClient withTransportOptions(@Nullable TransportOptions transportOptions) { - return new OpenSearchGenericClient(this.transport, transportOptions); + return new OpenSearchGenericClient(this.transport, transportOptions, this.clientOptions); } /** @@ -94,7 +165,7 @@ public OpenSearchGenericClient withTransportOptions(@Nullable TransportOptions t * @throws IOException I/O exception */ public Response execute(Request request) throws IOException { - return transport.performRequest(request, new GenericEndpoint(request), this.transportOptions); + return transport.performRequest(request, new GenericEndpoint(request, clientOptions.error), this.transportOptions); } /** @@ -103,6 +174,6 @@ public Response execute(Request request) throws IOException { * @return generic HTTP response future */ public CompletableFuture executeAsync(Request request) { - return transport.performRequestAsync(request, new GenericEndpoint(request), this.transportOptions); + return transport.performRequestAsync(request, new GenericEndpoint(request, clientOptions.error), this.transportOptions); } } diff --git a/java-client/src/main/java/org/opensearch/client/transport/Endpoint.java b/java-client/src/main/java/org/opensearch/client/transport/Endpoint.java index 0e74bf2519..38f9c0a46a 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/Endpoint.java +++ b/java-client/src/main/java/org/opensearch/client/transport/Endpoint.java @@ -37,6 +37,8 @@ import javax.annotation.Nullable; import org.opensearch.client.json.JsonpDeserializer; import org.opensearch.client.json.NdJsonpSerializable; +import org.opensearch.client.opensearch._types.ErrorResponse; +import org.opensearch.client.opensearch._types.OpenSearchException; /** * An endpoint links requests and responses to HTTP protocol encoding. It also defines the error response @@ -90,4 +92,13 @@ default Map headers(RequestT request) { @Nullable JsonpDeserializer errorDeserializer(int statusCode); + /** + * Converts error response to exception instance of type {@code T} + * @param exception type + * @param error error response + * @return exception instance + */ + default T exceptionConverter(int statusCode, @Nullable ErrorT error) { + throw new OpenSearchException((ErrorResponse) error); + } } diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java index 74a72e804d..313ccc5437 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java @@ -62,6 +62,7 @@ import org.apache.hc.core5.http.HttpEntity; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.io.entity.BufferedHttpEntity; import org.apache.hc.core5.http.io.entity.ByteArrayEntity; import org.apache.hc.core5.http.io.entity.EntityUtils; @@ -76,8 +77,6 @@ import org.opensearch.client.json.JsonpDeserializer; import org.opensearch.client.json.JsonpMapper; import org.opensearch.client.json.NdJsonpSerializable; -import org.opensearch.client.opensearch._types.ErrorResponse; -import org.opensearch.client.opensearch._types.OpenSearchException; import org.opensearch.client.transport.Endpoint; import org.opensearch.client.transport.GenericEndpoint; import org.opensearch.client.transport.GenericSerializable; @@ -485,36 +484,68 @@ private ResponseT prepareResponse(Response clientResp, Endpo try { int statusCode = clientResp.getStatusLine().getStatusCode(); - - if (endpoint.isError(statusCode)) { - JsonpDeserializer errorDeserializer = endpoint.errorDeserializer(statusCode); - if (errorDeserializer == null) { - throw new TransportException("Request failed with status code '" + statusCode + "'", new ResponseException(clientResp)); - } - + if (statusCode == HttpStatus.SC_FORBIDDEN) { + throw new TransportException("Forbidden access", new ResponseException(clientResp)); + } else if (statusCode == HttpStatus.SC_UNAUTHORIZED) { + throw new TransportException("Unauthorized access", new ResponseException(clientResp)); + } else if (endpoint.isError(statusCode)) { HttpEntity entity = clientResp.getEntity(); if (entity == null) { throw new TransportException("Expecting a response body, but none was sent", new ResponseException(clientResp)); } - // We may have to replay it. - entity = new BufferedHttpEntity(entity); - - try { - InputStream content = entity.getContent(); - try (JsonParser parser = mapper.jsonProvider().createParser(content)) { - ErrorT error = errorDeserializer.deserialize(parser, mapper); - // TODO: have the endpoint provide the exception constructor - throw new OpenSearchException((ErrorResponse) error); + if (endpoint instanceof GenericEndpoint) { + @SuppressWarnings("unchecked") + final GenericEndpoint rawEndpoint = (GenericEndpoint) endpoint; + + final RequestLine requestLine = clientResp.getRequestLine(); + final StatusLine statusLine = clientResp.getStatusLine(); + + // We may have to replay it. + entity = new BufferedHttpEntity(entity); + + try (InputStream content = entity.getContent()) { + final ResponseT error = rawEndpoint.responseDeserializer( + requestLine.getUri(), + requestLine.getMethod(), + requestLine.getProtocolVersion().format(), + statusLine.getStatusCode(), + statusLine.getReasonPhrase(), + Arrays.stream(clientResp.getHeaders()) + .map(h -> new AbstractMap.SimpleEntry(h.getName(), h.getValue())) + .collect(Collectors.toList()), + entity.getContentType(), + content + ); + throw rawEndpoint.exceptionConverter(statusCode, error); } - } catch (MissingRequiredPropertyException errorEx) { - // Could not decode exception, try the response type + } else { + JsonpDeserializer errorDeserializer = endpoint.errorDeserializer(statusCode); + if (errorDeserializer == null) { + throw new TransportException( + "Request failed with status code '" + statusCode + "'", + new ResponseException(clientResp) + ); + } + + // We may have to replay it. + entity = new BufferedHttpEntity(entity); + try { - ResponseT response = decodeResponse(statusCode, entity, clientResp, endpoint); - return response; - } catch (Exception respEx) { - // No better luck: throw the original error decoding exception - throw new TransportException("Failed to decode error response", new ResponseException(clientResp)); + InputStream content = entity.getContent(); + try (JsonParser parser = mapper.jsonProvider().createParser(content)) { + ErrorT error = errorDeserializer.deserialize(parser, mapper); + throw endpoint.exceptionConverter(statusCode, error); + } + } catch (MissingRequiredPropertyException errorEx) { + // Could not decode exception, try the response type + try { + ResponseT response = decodeResponse(statusCode, entity, clientResp, endpoint); + return response; + } catch (Exception respEx) { + // No better luck: throw the original error decoding exception + throw new TransportException("Failed to decode error response", new ResponseException(clientResp)); + } } } } else { diff --git a/java-client/src/main/java/org/opensearch/client/transport/rest_client/RestClientTransport.java b/java-client/src/main/java/org/opensearch/client/transport/rest_client/RestClientTransport.java index 1f75ae7e50..dfd83611ea 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/rest_client/RestClientTransport.java +++ b/java-client/src/main/java/org/opensearch/client/transport/rest_client/RestClientTransport.java @@ -61,8 +61,6 @@ import org.opensearch.client.json.JsonpDeserializer; import org.opensearch.client.json.JsonpMapper; import org.opensearch.client.json.NdJsonpSerializable; -import org.opensearch.client.opensearch._types.ErrorResponse; -import org.opensearch.client.opensearch._types.OpenSearchException; import org.opensearch.client.transport.Endpoint; import org.opensearch.client.transport.GenericEndpoint; import org.opensearch.client.transport.GenericSerializable; @@ -259,34 +257,69 @@ private ResponseT getHighLevelResponse( } else if (statusCode == HttpStatus.SC_UNAUTHORIZED) { throw new TransportException("Unauthorized access", new ResponseException(clientResp)); } else if (endpoint.isError(statusCode)) { - JsonpDeserializer errorDeserializer = endpoint.errorDeserializer(statusCode); - if (errorDeserializer == null) { - throw new TransportException("Request failed with status code '" + statusCode + "'", new ResponseException(clientResp)); - } - HttpEntity entity = clientResp.getEntity(); if (entity == null) { throw new TransportException("Expecting a response body, but none was sent", new ResponseException(clientResp)); } - // We may have to replay it. - entity = new BufferedHttpEntity(entity); + if (endpoint instanceof GenericEndpoint) { + @SuppressWarnings("unchecked") + final GenericEndpoint rawEndpoint = (GenericEndpoint) endpoint; + + final RequestLine requestLine = clientResp.getRequestLine(); + final StatusLine statusLine = clientResp.getStatusLine(); - try { - InputStream content = entity.getContent(); - try (JsonParser parser = mapper.jsonProvider().createParser(content)) { - ErrorT error = errorDeserializer.deserialize(parser, mapper); - // TODO: have the endpoint provide the exception constructor - throw new OpenSearchException((ErrorResponse) error); + // We may have to replay it. + entity = new BufferedHttpEntity(entity); + + String contentType = null; + if (entity.getContentType() != null) { + contentType = entity.getContentType().getValue(); } - } catch (MissingRequiredPropertyException errorEx) { - // Could not decode exception, try the response type + + try (InputStream content = entity.getContent()) { + final ResponseT error = rawEndpoint.responseDeserializer( + requestLine.getUri(), + requestLine.getMethod(), + requestLine.getProtocolVersion().getProtocol(), + statusLine.getStatusCode(), + statusLine.getReasonPhrase(), + Arrays.stream(clientResp.getHeaders()) + .map(h -> new AbstractMap.SimpleEntry(h.getName(), h.getValue())) + .collect(Collectors.toList()), + contentType, + content + ); + + throw rawEndpoint.exceptionConverter(statusCode, error); + } + } else { + JsonpDeserializer errorDeserializer = endpoint.errorDeserializer(statusCode); + if (errorDeserializer == null) { + throw new TransportException( + "Request failed with status code '" + statusCode + "'", + new ResponseException(clientResp) + ); + } + + // We may have to replay it. + entity = new BufferedHttpEntity(entity); + try { - ResponseT response = decodeResponse(statusCode, entity, clientResp, endpoint); - return response; - } catch (Exception respEx) { - // No better luck: throw the original error decoding exception - throw new TransportException("Failed to decode error response", new ResponseException(clientResp)); + InputStream content = entity.getContent(); + try (JsonParser parser = mapper.jsonProvider().createParser(content)) { + ErrorT error = errorDeserializer.deserialize(parser, mapper); + throw endpoint.exceptionConverter(statusCode, error); + } + } catch (MissingRequiredPropertyException errorEx) { + // Could not decode exception, try the response type + try { + ResponseT response = decodeResponse(statusCode, entity, clientResp, endpoint); + return response; + } catch (Exception respEx) { + // No better luck: throw the original error decoding exception + throw new TransportException("Failed to decode error response", new ResponseException(clientResp)); + } } } } else { diff --git a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractGenericClientIT.java b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractGenericClientIT.java index bf3900bfb8..c5267e2bc6 100644 --- a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractGenericClientIT.java +++ b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractGenericClientIT.java @@ -20,6 +20,8 @@ import org.opensearch.client.opensearch._types.mapping.Property; import org.opensearch.client.opensearch.core.SearchResponse; import org.opensearch.client.opensearch.generic.Bodies; +import org.opensearch.client.opensearch.generic.OpenSearchClientException; +import org.opensearch.client.opensearch.generic.OpenSearchGenericClient.ClientOptions; import org.opensearch.client.opensearch.generic.Requests; import org.opensearch.client.opensearch.generic.Response; import org.opensearch.client.opensearch.indices.CreateIndexRequest; @@ -90,6 +92,36 @@ public void shouldReturnSearchResults() throws Exception { } } + @Test + public void shouldReturn404() throws IOException { + final String index = "non_existing_doc"; + createIndex(index); + + try ( + Response response = javaClient().generic().execute(Requests.builder().endpoint("/" + index + "/_doc/10").method("GET").build()) + ) { + assertThat(response.getStatus(), equalTo(404)); + assertThat(response.getBody().isPresent(), equalTo(true)); + } + } + + @Test + public void shouldThrow() throws IOException { + final String index = "non_existing_doc"; + createIndex(index); + + final OpenSearchClientException ex = assertThrows(OpenSearchClientException.class, () -> { + try ( + Response response = javaClient().generic() + .withClientOptions(ClientOptions.throwOnHttpErrors()) + .execute(Requests.builder().endpoint("/" + index + "/_doc/10").method("GET").build()) + ) {} + }); + + assertThat(ex.status(), equalTo(404)); + assertThat(ex.response().getBody().isPresent(), equalTo(true)); + } + private void createTestDocuments(String index) throws IOException { createTestDocument(index, "1", createItem("hummer", "huge", "yes", 2)); createTestDocument(index, "2", createItem("jammer", "huge", "yes", 1)); @@ -148,11 +180,9 @@ private void createIndexUntyped(String index) throws IOException { .add( "properties", Json.createObjectBuilder() - .add("name", Json.createObjectBuilder().add("type", "keyword")) - .add("doc_values", true) + .add("name", Json.createObjectBuilder().add("type", "keyword").add("doc_values", true)) - .add("size", Json.createObjectBuilder().add("type", "keyword")) - .add("doc_values", true) + .add("size", Json.createObjectBuilder().add("type", "keyword").add("doc_values", true)) ) ) )