From b8e0dad92671f876c19b3712208f1c513e65ab72 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Tue, 14 May 2024 08:48:04 -0400 Subject: [PATCH] [BUG] Generic HTTP Actions in Java Client does not work with AwsSdk2Transport (#978) * [BUG] Generic HTTP Actions in Java Client does not work with AwsSdk2Transport Signed-off-by: Andriy Redko * Address code review comments Signed-off-by: Andriy Redko --------- Signed-off-by: Andriy Redko --- CHANGELOG.md | 3 +- .../generic/OpenSearchGenericClient.java | 30 ++--- .../transport/aws/AwsSdk2Transport.java | 113 +++++++++++++++--- .../util/OpenSearchRequestBodyBuffer.java | 7 ++ 4 files changed, 113 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 10daf54eec..33761aadf3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ This section is for maintaining a changelog for all breaking changes for the cli ### Fixed - Fix the deserialization of SortOptions ([#981](https://github.com/opensearch-project/opensearch-java/pull/981)) +- Generic HTTP Actions in Java Client does not work with AwsSdk2Transport ([#978](https://github.com/opensearch-project/opensearch-java/pull/978)) ### Security @@ -419,4 +420,4 @@ This section is for maintaining a changelog for all breaking changes for the cli [2.5.0]: https://github.com/opensearch-project/opensearch-java/compare/v2.4.0...v2.5.0 [2.4.0]: https://github.com/opensearch-project/opensearch-java/compare/v2.3.0...v2.4.0 [2.3.0]: https://github.com/opensearch-project/opensearch-java/compare/v2.2.0...v2.3.0 -[2.2.0]: https://github.com/opensearch-project/opensearch-java/compare/v2.1.0...v2.2.0 \ No newline at end of file +[2.2.0]: https://github.com/opensearch-project/opensearch-java/compare/v2.1.0...v2.2.0 diff --git a/java-client/src/main/java/org/opensearch/client/opensearch/generic/OpenSearchGenericClient.java b/java-client/src/main/java/org/opensearch/client/opensearch/generic/OpenSearchGenericClient.java index 948444e150..4a8fffa8a4 100644 --- a/java-client/src/main/java/org/opensearch/client/opensearch/generic/OpenSearchGenericClient.java +++ b/java-client/src/main/java/org/opensearch/client/opensearch/generic/OpenSearchGenericClient.java @@ -95,27 +95,17 @@ public GenericResponse responseDeserializer( @Nullable String contentType, @Nullable InputStream body ) { - if (isError(status)) { - // Fully consume the response body since the it will be propagated as an exception with possible no chance to be closed - try (Body b = Body.from(body, contentType)) { - if (b != null) { - return new GenericResponse( - uri, - protocol, - method, - status, - reason, - headers, - Body.from(b.bodyAsBytes(), b.contentType()) - ); - } else { - return new GenericResponse(uri, protocol, method, status, reason, headers); - } - } catch (final IOException ex) { - throw new UncheckedIOException(ex); + try (Body b = Body.from(body, contentType)) { + if (b != null) { + // Fully consume the response body: + // - if it will be propagated as an exception with possible no chance to be closed + // - the entity stream will be consumed and become unavailable + return new GenericResponse(uri, protocol, method, status, reason, headers, Body.from(b.bodyAsBytes(), b.contentType())); + } else { + return new GenericResponse(uri, protocol, method, status, reason, headers); } - } else { - return new GenericResponse(uri, protocol, method, status, reason, headers, Body.from(body, contentType)); + } catch (final IOException ex) { + throw new UncheckedIOException(ex); } } diff --git a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java index 2269b8e13b..289399b610 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java +++ b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java @@ -17,6 +17,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URLEncoder; +import java.util.AbstractMap; import java.util.Collection; import java.util.Map; import java.util.Objects; @@ -24,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; @@ -35,6 +37,7 @@ import org.opensearch.client.opensearch._types.ErrorResponse; import org.opensearch.client.opensearch._types.OpenSearchException; import org.opensearch.client.transport.Endpoint; +import org.opensearch.client.transport.GenericEndpoint; import org.opensearch.client.transport.JsonEndpoint; import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.client.transport.TransportException; @@ -47,6 +50,7 @@ import software.amazon.awssdk.auth.signer.Aws4Signer; import software.amazon.awssdk.auth.signer.params.Aws4SignerParams; import software.amazon.awssdk.http.AbortableInputStream; +import software.amazon.awssdk.http.Header; import software.amazon.awssdk.http.HttpExecuteRequest; import software.amazon.awssdk.http.HttpExecuteResponse; import software.amazon.awssdk.http.SdkHttpClient; @@ -393,7 +397,15 @@ private ResponseT executeSync( try { bodyStream = executeResponse.responseBody().orElse(null); SdkHttpResponse httpResponse = executeResponse.httpResponse(); - return parseResponse(httpResponse, bodyStream, endpoint, options); + return parseResponse( + httpRequest.getUri(), + httpRequest.method(), + httpRequest.protocol(), + httpResponse, + bodyStream, + endpoint, + options + ); } finally { if (bodyStream != null) { bodyStream.close(); @@ -421,7 +433,17 @@ private CompletableFuture executeAsync( CompletableFuture ret = new CompletableFuture<>(); try { InputStream bodyStream = new ByteArrayInputStream(responseBody); - ret.complete(parseResponse(response, bodyStream, endpoint, options)); + ret.complete( + parseResponse( + httpRequest.getUri(), + httpRequest.method(), + httpRequest.protocol(), + response, + bodyStream, + endpoint, + options + ) + ); } catch (Throwable e) { ret.completeExceptionally(e); } @@ -430,6 +452,9 @@ private CompletableFuture executeAsync( } private ResponseT parseResponse( + URI uri, + @Nonnull SdkHttpMethod method, + String protocol, @Nonnull SdkHttpResponse httpResponse, @CheckForNull InputStream bodyStream, @Nonnull Endpoint endpoint, @@ -478,24 +503,51 @@ private ResponseT parseResponse( } if (endpoint.isError(statusCode)) { - JsonpDeserializer errorDeserializer = endpoint.errorDeserializer(statusCode); - if (errorDeserializer == null || bodyStream == null) { - throw new TransportException("Request failed with status code '" + statusCode + "'"); - } - try { - try (JsonParser parser = mapper.jsonProvider().createParser(bodyStream)) { - ErrorT error = errorDeserializer.deserialize(parser, mapper); - throw new OpenSearchException((ErrorResponse) error); + if (endpoint instanceof GenericEndpoint) { + @SuppressWarnings("unchecked") + final GenericEndpoint rawEndpoint = (GenericEndpoint) endpoint; + + String contentType = null; + if (bodyStream != null) { + contentType = httpResponse.firstMatchingHeader(Header.CONTENT_TYPE).orElse(null); + } + + final ResponseT error = rawEndpoint.responseDeserializer( + uri.toString(), + method.name(), + protocol, + httpResponse.statusCode(), + httpResponse.statusText().orElse(null), + httpResponse.headers() + .entrySet() + .stream() + .map(h -> new AbstractMap.SimpleEntry(h.getKey(), Objects.toString(h.getValue()))) + .collect(Collectors.toList()), + contentType, + bodyStream + ); + + throw rawEndpoint.exceptionConverter(statusCode, error); + } else { + JsonpDeserializer errorDeserializer = endpoint.errorDeserializer(statusCode); + if (errorDeserializer == null || bodyStream == null) { + throw new TransportException("Request failed with status code '" + statusCode + "'"); + } + try { + try (JsonParser parser = mapper.jsonProvider().createParser(bodyStream)) { + ErrorT error = errorDeserializer.deserialize(parser, mapper); + throw new OpenSearchException((ErrorResponse) error); + } + } catch (OpenSearchException e) { + throw e; + } catch (Exception e) { + // can't parse the error - use a general exception + ErrorCause.Builder cause = new ErrorCause.Builder(); + cause.type("http_exception"); + cause.reason("server returned " + statusCode); + ErrorResponse error = ErrorResponse.of(err -> err.status(statusCode).error(cause.build())); + throw new OpenSearchException(error); } - } catch (OpenSearchException e) { - throw e; - } catch (Exception e) { - // can't parse the error - use a general exception - ErrorCause.Builder cause = new ErrorCause.Builder(); - cause.type("http_exception"); - cause.reason("server returned " + statusCode); - ErrorResponse error = ErrorResponse.of(err -> err.status(statusCode).error(cause.build())); - throw new OpenSearchException(error); } } else { if (endpoint instanceof BooleanEndpoint) { @@ -523,6 +575,29 @@ private ResponseT parseResponse( ; } return response; + } else if (endpoint instanceof GenericEndpoint) { + @SuppressWarnings("unchecked") + final GenericEndpoint rawEndpoint = (GenericEndpoint) endpoint; + + String contentType = null; + if (bodyStream != null) { + contentType = httpResponse.firstMatchingHeader(Header.CONTENT_TYPE).orElse(null); + } + + return rawEndpoint.responseDeserializer( + uri.toString(), + method.name(), + protocol, + httpResponse.statusCode(), + httpResponse.statusText().orElse(null), + httpResponse.headers() + .entrySet() + .stream() + .map(h -> new AbstractMap.SimpleEntry(h.getKey(), Objects.toString(h.getValue()))) + .collect(Collectors.toList()), + contentType, + bodyStream + ); } else { throw new TransportException("Unhandled endpoint type: '" + endpoint.getClass().getName() + "'"); } diff --git a/java-client/src/main/java/org/opensearch/client/util/OpenSearchRequestBodyBuffer.java b/java-client/src/main/java/org/opensearch/client/util/OpenSearchRequestBodyBuffer.java index 012e90af08..0cf0ebb90e 100644 --- a/java-client/src/main/java/org/opensearch/client/util/OpenSearchRequestBodyBuffer.java +++ b/java-client/src/main/java/org/opensearch/client/util/OpenSearchRequestBodyBuffer.java @@ -18,8 +18,10 @@ import java.util.zip.GZIPOutputStream; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; +import org.apache.hc.core5.http.ContentType; import org.opensearch.client.json.JsonpMapper; import org.opensearch.client.json.NdJsonpSerializable; +import org.opensearch.client.transport.GenericSerializable; import org.opensearch.client.transport.OpenSearchTransport; /** @@ -71,6 +73,11 @@ public void addContent(Object content) throws IOException { if (content instanceof NdJsonpSerializable) { isMulti = true; addNdJson(((NdJsonpSerializable) content)); + } else if (content instanceof GenericSerializable) { + ContentType.parse(((GenericSerializable) content).serialize(captureBuffer)); + if (isMulti) { + captureBuffer.write((byte) '\n'); + } } else { mapper.serialize(content, jsonGenerator); jsonGenerator.flush();