Skip to content

Commit

Permalink
[FEATURE] Raise errors for HTTP error codes in the generic client (#929)
Browse files Browse the repository at this point in the history
* [FEATURE] Raise errors for HTTP error codes in the generic client

Signed-off-by: Andriy Redko <[email protected]>

* Address code review comments

Signed-off-by: Andriy Redko <[email protected]>

---------

Signed-off-by: Andriy Redko <[email protected]>
(cherry picked from commit 5ad54c6)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Apr 10, 2024
1 parent f9724a0 commit 5b1729f
Show file tree
Hide file tree
Showing 10 changed files with 292 additions and 62 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Added
- Add xy_shape property ([#884](https://github.com/opensearch-project/opensearch-java/pull/885))
- Add missed fields to MultisearchBody: seqNoPrimaryTerm, storedFields, explain, fields, indicesBoost ([#914](https://github.com/opensearch-project/opensearch-java/pull/914))
- Add OpenSearchGenericClient with support for raw HTTP request/responses ([#910](https://github.com/opensearch-project/opensearch-java/pull/910))
- Add OpenSearchGenericClient with support for raw HTTP request/responses ([#910](https://github.com/opensearch-project/opensearch-java/pull/910), [#929](https://github.com/opensearch-project/opensearch-java/pull/929))
- Add missed fields to MultisearchBody: collapse, version, timeout ([#916](https://github.com/opensearch-project/opensearch-java/pull/916)
- Add missed fields to MultisearchBody: ext, rescore and to SearchRequest: ext ([#918](https://github.com/opensearch-project/opensearch-java/pull/918)

Expand Down
6 changes: 6 additions & 0 deletions guides/generic.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ The following sample code gets the `OpenSearchGenericClient` from the `OpenSearc
final OpenSearchGenericClient generic = javaClient().generic();
```

The generic client with default options (`ClientOptions.DEFAULT`) returns the responses as those were received from the server. The generic client could be instructed to raise an `OpenSearchClientException` exception instead if the HTTP status code is not indicating the successful response, for example:

```java
final OpenSearchGenericClient generic = javaClient().generic().witClientOptions(ClientOptions.throwOnHttpErrors());
```

## Sending Simple Request
The following sample code sends a simple request that does not require any payload to be provided (typically, `GET` requests).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ public interface Body extends AutoCloseable {
* @return body as {@link String}
*/
default String bodyAsString() {
return new String(bodyAsBytes(), StandardCharsets.UTF_8);
}

/**
* Gets the body as {@link byte[]}
* @return body as {@link byte[]}
*/
default byte[] bodyAsBytes() {
try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
try (final InputStream in = body()) {
final byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
Expand All @@ -77,7 +85,7 @@ default String bodyAsString() {
}

out.flush();
return new String(out.toByteArray(), StandardCharsets.UTF_8);
return out.toByteArray();
} catch (final IOException ex) {
throw new UncheckedIOException(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ final class GenericResponse implements Response {
private final Collection<Map.Entry<String, String>> headers;
private final Body body;

GenericResponse(String uri, String protocol, String method, int status, String reason, Collection<Map.Entry<String, String>> headers) {
this(uri, protocol, method, status, reason, headers, null);
}

GenericResponse(
String uri,
String protocol,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.opensearch.generic;

/**
* Exception thrown by API client methods when OpenSearch could not accept or
* process a request.
* <p>
* The {@link #response()} contains the the raw response as returned by the API
* endpoint that was called.
*/
public class OpenSearchClientException extends RuntimeException {

private final Response response;

public OpenSearchClientException(Response response) {
super("Request failed: [" + response.getStatus() + "] " + response.getReason());
this.response = response;
}

/**
* The error response sent by OpenSearch
*/
public Response response() {
return this.response;
}

/**
* Status code returned by OpenSearch. Shortcut for
* {@code response().status()}.
*/
public int status() {
return this.response.getStatus();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.opensearch.client.ApiClient;
Expand All @@ -24,14 +26,37 @@
* Client for the generic HTTP requests.
*/
public class OpenSearchGenericClient extends ApiClient<OpenSearchTransport, OpenSearchGenericClient> {
/**
* Generic client options
*/
public static final class ClientOptions {
private static final ClientOptions DEFAULT = new ClientOptions();

private final Predicate<Integer> error;

private ClientOptions() {
this(statusCode -> false);
}

private ClientOptions(final Predicate<Integer> error) {
this.error = error;
}

public static ClientOptions throwOnHttpErrors() {
return new ClientOptions(statusCode -> statusCode >= 400);
}
}

/**
* Generic endpoint instance
*/
private static final class GenericEndpoint implements org.opensearch.client.transport.GenericEndpoint<Request, Response> {
private final Request request;
private final Predicate<Integer> error;

public GenericEndpoint(Request request) {
public GenericEndpoint(Request request, Predicate<Integer> error) {
this.request = request;
this.error = error;
}

@Override
Expand Down Expand Up @@ -67,24 +92,70 @@ public GenericResponse responseDeserializer(
int status,
String reason,
List<Entry<String, String>> headers,
String contentType,
InputStream body
@Nullable String contentType,
@Nullable InputStream body
) {
return new GenericResponse(uri, protocol, method, status, reason, headers, Body.from(body, contentType));
if (isError(status)) {
// Fully consume the response body since the it will be propagated as an exception with possible no chance to be closed
try (Body b = Body.from(body, contentType)) {
if (b != null) {
return new GenericResponse(
uri,
protocol,
method,
status,
reason,
headers,
Body.from(b.bodyAsBytes(), b.contentType())
);
} else {
return new GenericResponse(uri, protocol, method, status, reason, headers);
}
} catch (final IOException ex) {
throw new UncheckedIOException(ex);
}
} else {
return new GenericResponse(uri, protocol, method, status, reason, headers, Body.from(body, contentType));
}
}

@Override
public boolean isError(int statusCode) {
return error.test(statusCode);
}

@Override
public <T extends RuntimeException> T exceptionConverter(int statusCode, @Nullable Response error) {
throw new OpenSearchClientException(error);
}
}

private final ClientOptions clientOptions;

public OpenSearchGenericClient(OpenSearchTransport transport) {
super(transport, null);
this(transport, null, ClientOptions.DEFAULT);
}

public OpenSearchGenericClient(OpenSearchTransport transport, @Nullable TransportOptions transportOptions) {
this(transport, transportOptions, ClientOptions.DEFAULT);
}

public OpenSearchGenericClient(
OpenSearchTransport transport,
@Nullable TransportOptions transportOptions,
ClientOptions clientOptions
) {
super(transport, transportOptions);
this.clientOptions = clientOptions;
}

public OpenSearchGenericClient withClientOptions(ClientOptions clientOptions) {
return new OpenSearchGenericClient(this.transport, this.transportOptions, clientOptions);
}

@Override
public OpenSearchGenericClient withTransportOptions(@Nullable TransportOptions transportOptions) {
return new OpenSearchGenericClient(this.transport, transportOptions);
return new OpenSearchGenericClient(this.transport, transportOptions, this.clientOptions);
}

/**
Expand All @@ -94,7 +165,7 @@ public OpenSearchGenericClient withTransportOptions(@Nullable TransportOptions t
* @throws IOException I/O exception
*/
public Response execute(Request request) throws IOException {
return transport.performRequest(request, new GenericEndpoint(request), this.transportOptions);
return transport.performRequest(request, new GenericEndpoint(request, clientOptions.error), this.transportOptions);
}

/**
Expand All @@ -103,6 +174,6 @@ public Response execute(Request request) throws IOException {
* @return generic HTTP response future
*/
public CompletableFuture<Response> executeAsync(Request request) {
return transport.performRequestAsync(request, new GenericEndpoint(request), this.transportOptions);
return transport.performRequestAsync(request, new GenericEndpoint(request, clientOptions.error), this.transportOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import javax.annotation.Nullable;
import org.opensearch.client.json.JsonpDeserializer;
import org.opensearch.client.json.NdJsonpSerializable;
import org.opensearch.client.opensearch._types.ErrorResponse;
import org.opensearch.client.opensearch._types.OpenSearchException;

/**
* An endpoint links requests and responses to HTTP protocol encoding. It also defines the error response
Expand Down Expand Up @@ -90,4 +92,13 @@ default Map<String, String> headers(RequestT request) {
@Nullable
JsonpDeserializer<ErrorT> errorDeserializer(int statusCode);

/**
* Converts error response to exception instance of type {@code T}
* @param <T> exception type
* @param error error response
* @return exception instance
*/
default <T extends RuntimeException> T exceptionConverter(int statusCode, @Nullable ErrorT error) {
throw new OpenSearchException((ErrorResponse) error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.io.entity.BufferedHttpEntity;
import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
import org.apache.hc.core5.http.io.entity.EntityUtils;
Expand All @@ -76,8 +77,6 @@
import org.opensearch.client.json.JsonpDeserializer;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.NdJsonpSerializable;
import org.opensearch.client.opensearch._types.ErrorResponse;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.transport.Endpoint;
import org.opensearch.client.transport.GenericEndpoint;
import org.opensearch.client.transport.GenericSerializable;
Expand Down Expand Up @@ -485,36 +484,68 @@ private <ResponseT, ErrorT> ResponseT prepareResponse(Response clientResp, Endpo

try {
int statusCode = clientResp.getStatusLine().getStatusCode();

if (endpoint.isError(statusCode)) {
JsonpDeserializer<ErrorT> errorDeserializer = endpoint.errorDeserializer(statusCode);
if (errorDeserializer == null) {
throw new TransportException("Request failed with status code '" + statusCode + "'", new ResponseException(clientResp));
}

if (statusCode == HttpStatus.SC_FORBIDDEN) {
throw new TransportException("Forbidden access", new ResponseException(clientResp));
} else if (statusCode == HttpStatus.SC_UNAUTHORIZED) {
throw new TransportException("Unauthorized access", new ResponseException(clientResp));
} else if (endpoint.isError(statusCode)) {
HttpEntity entity = clientResp.getEntity();
if (entity == null) {
throw new TransportException("Expecting a response body, but none was sent", new ResponseException(clientResp));
}

// We may have to replay it.
entity = new BufferedHttpEntity(entity);

try {
InputStream content = entity.getContent();
try (JsonParser parser = mapper.jsonProvider().createParser(content)) {
ErrorT error = errorDeserializer.deserialize(parser, mapper);
// TODO: have the endpoint provide the exception constructor
throw new OpenSearchException((ErrorResponse) error);
if (endpoint instanceof GenericEndpoint) {
@SuppressWarnings("unchecked")
final GenericEndpoint<?, ResponseT> rawEndpoint = (GenericEndpoint<?, ResponseT>) endpoint;

final RequestLine requestLine = clientResp.getRequestLine();
final StatusLine statusLine = clientResp.getStatusLine();

// We may have to replay it.
entity = new BufferedHttpEntity(entity);

try (InputStream content = entity.getContent()) {
final ResponseT error = rawEndpoint.responseDeserializer(
requestLine.getUri(),
requestLine.getMethod(),
requestLine.getProtocolVersion().format(),
statusLine.getStatusCode(),
statusLine.getReasonPhrase(),
Arrays.stream(clientResp.getHeaders())
.map(h -> new AbstractMap.SimpleEntry<String, String>(h.getName(), h.getValue()))
.collect(Collectors.toList()),
entity.getContentType(),
content
);
throw rawEndpoint.exceptionConverter(statusCode, error);
}
} catch (MissingRequiredPropertyException errorEx) {
// Could not decode exception, try the response type
} else {
JsonpDeserializer<ErrorT> errorDeserializer = endpoint.errorDeserializer(statusCode);
if (errorDeserializer == null) {
throw new TransportException(
"Request failed with status code '" + statusCode + "'",
new ResponseException(clientResp)
);
}

// We may have to replay it.
entity = new BufferedHttpEntity(entity);

try {
ResponseT response = decodeResponse(statusCode, entity, clientResp, endpoint);
return response;
} catch (Exception respEx) {
// No better luck: throw the original error decoding exception
throw new TransportException("Failed to decode error response", new ResponseException(clientResp));
InputStream content = entity.getContent();
try (JsonParser parser = mapper.jsonProvider().createParser(content)) {
ErrorT error = errorDeserializer.deserialize(parser, mapper);
throw endpoint.exceptionConverter(statusCode, error);
}
} catch (MissingRequiredPropertyException errorEx) {
// Could not decode exception, try the response type
try {
ResponseT response = decodeResponse(statusCode, entity, clientResp, endpoint);
return response;
} catch (Exception respEx) {
// No better luck: throw the original error decoding exception
throw new TransportException("Failed to decode error response", new ResponseException(clientResp));
}
}
}
} else {
Expand Down
Loading

0 comments on commit 5b1729f

Please sign in to comment.