Skip to content

Commit

Permalink
[BUG] Generic HTTP Actions in Java Client does not work with AwsSdk2T…
Browse files Browse the repository at this point in the history
…ransport (#978)

* [BUG] Generic HTTP Actions in Java Client does not work with AwsSdk2Transport

Signed-off-by: Andriy Redko <[email protected]>

* Address code review comments

Signed-off-by: Andriy Redko <[email protected]>

---------

Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta authored May 14, 2024
1 parent e672fdc commit b8e0dad
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 40 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ This section is for maintaining a changelog for all breaking changes for the cli

### Fixed
- Fix the deserialization of SortOptions ([#981](https://github.com/opensearch-project/opensearch-java/pull/981))
- Generic HTTP Actions in Java Client does not work with AwsSdk2Transport ([#978](https://github.com/opensearch-project/opensearch-java/pull/978))

### Security

Expand Down Expand Up @@ -419,4 +420,4 @@ This section is for maintaining a changelog for all breaking changes for the cli
[2.5.0]: https://github.com/opensearch-project/opensearch-java/compare/v2.4.0...v2.5.0
[2.4.0]: https://github.com/opensearch-project/opensearch-java/compare/v2.3.0...v2.4.0
[2.3.0]: https://github.com/opensearch-project/opensearch-java/compare/v2.2.0...v2.3.0
[2.2.0]: https://github.com/opensearch-project/opensearch-java/compare/v2.1.0...v2.2.0
[2.2.0]: https://github.com/opensearch-project/opensearch-java/compare/v2.1.0...v2.2.0
Original file line number Diff line number Diff line change
Expand Up @@ -95,27 +95,17 @@ public GenericResponse responseDeserializer(
@Nullable String contentType,
@Nullable InputStream body
) {
if (isError(status)) {
// Fully consume the response body since the it will be propagated as an exception with possible no chance to be closed
try (Body b = Body.from(body, contentType)) {
if (b != null) {
return new GenericResponse(
uri,
protocol,
method,
status,
reason,
headers,
Body.from(b.bodyAsBytes(), b.contentType())
);
} else {
return new GenericResponse(uri, protocol, method, status, reason, headers);
}
} catch (final IOException ex) {
throw new UncheckedIOException(ex);
try (Body b = Body.from(body, contentType)) {
if (b != null) {
// Fully consume the response body:
// - if it will be propagated as an exception with possible no chance to be closed
// - the entity stream will be consumed and become unavailable
return new GenericResponse(uri, protocol, method, status, reason, headers, Body.from(b.bodyAsBytes(), b.contentType()));
} else {
return new GenericResponse(uri, protocol, method, status, reason, headers);
}
} else {
return new GenericResponse(uri, protocol, method, status, reason, headers, Body.from(body, contentType));
} catch (final IOException ex) {
throw new UncheckedIOException(ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
Expand All @@ -35,6 +37,7 @@
import org.opensearch.client.opensearch._types.ErrorResponse;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.transport.Endpoint;
import org.opensearch.client.transport.GenericEndpoint;
import org.opensearch.client.transport.JsonEndpoint;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.TransportException;
Expand All @@ -47,6 +50,7 @@
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.Header;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.HttpExecuteResponse;
import software.amazon.awssdk.http.SdkHttpClient;
Expand Down Expand Up @@ -393,7 +397,15 @@ private <ResponseT> ResponseT executeSync(
try {
bodyStream = executeResponse.responseBody().orElse(null);
SdkHttpResponse httpResponse = executeResponse.httpResponse();
return parseResponse(httpResponse, bodyStream, endpoint, options);
return parseResponse(
httpRequest.getUri(),
httpRequest.method(),
httpRequest.protocol(),
httpResponse,
bodyStream,
endpoint,
options
);
} finally {
if (bodyStream != null) {
bodyStream.close();
Expand Down Expand Up @@ -421,7 +433,17 @@ private <ResponseT> CompletableFuture<ResponseT> executeAsync(
CompletableFuture<ResponseT> ret = new CompletableFuture<>();
try {
InputStream bodyStream = new ByteArrayInputStream(responseBody);
ret.complete(parseResponse(response, bodyStream, endpoint, options));
ret.complete(
parseResponse(
httpRequest.getUri(),
httpRequest.method(),
httpRequest.protocol(),
response,
bodyStream,
endpoint,
options
)
);
} catch (Throwable e) {
ret.completeExceptionally(e);
}
Expand All @@ -430,6 +452,9 @@ private <ResponseT> CompletableFuture<ResponseT> executeAsync(
}

private <ResponseT, ErrorT> ResponseT parseResponse(
URI uri,
@Nonnull SdkHttpMethod method,
String protocol,
@Nonnull SdkHttpResponse httpResponse,
@CheckForNull InputStream bodyStream,
@Nonnull Endpoint<?, ResponseT, ErrorT> endpoint,
Expand Down Expand Up @@ -478,24 +503,51 @@ private <ResponseT, ErrorT> ResponseT parseResponse(
}

if (endpoint.isError(statusCode)) {
JsonpDeserializer<ErrorT> errorDeserializer = endpoint.errorDeserializer(statusCode);
if (errorDeserializer == null || bodyStream == null) {
throw new TransportException("Request failed with status code '" + statusCode + "'");
}
try {
try (JsonParser parser = mapper.jsonProvider().createParser(bodyStream)) {
ErrorT error = errorDeserializer.deserialize(parser, mapper);
throw new OpenSearchException((ErrorResponse) error);
if (endpoint instanceof GenericEndpoint) {
@SuppressWarnings("unchecked")
final GenericEndpoint<?, ResponseT> rawEndpoint = (GenericEndpoint<?, ResponseT>) endpoint;

String contentType = null;
if (bodyStream != null) {
contentType = httpResponse.firstMatchingHeader(Header.CONTENT_TYPE).orElse(null);
}

final ResponseT error = rawEndpoint.responseDeserializer(
uri.toString(),
method.name(),
protocol,
httpResponse.statusCode(),
httpResponse.statusText().orElse(null),
httpResponse.headers()
.entrySet()
.stream()
.map(h -> new AbstractMap.SimpleEntry<String, String>(h.getKey(), Objects.toString(h.getValue())))
.collect(Collectors.toList()),
contentType,
bodyStream
);

throw rawEndpoint.exceptionConverter(statusCode, error);
} else {
JsonpDeserializer<ErrorT> errorDeserializer = endpoint.errorDeserializer(statusCode);
if (errorDeserializer == null || bodyStream == null) {
throw new TransportException("Request failed with status code '" + statusCode + "'");
}
try {
try (JsonParser parser = mapper.jsonProvider().createParser(bodyStream)) {
ErrorT error = errorDeserializer.deserialize(parser, mapper);
throw new OpenSearchException((ErrorResponse) error);
}
} catch (OpenSearchException e) {
throw e;
} catch (Exception e) {
// can't parse the error - use a general exception
ErrorCause.Builder cause = new ErrorCause.Builder();
cause.type("http_exception");
cause.reason("server returned " + statusCode);
ErrorResponse error = ErrorResponse.of(err -> err.status(statusCode).error(cause.build()));
throw new OpenSearchException(error);
}
} catch (OpenSearchException e) {
throw e;
} catch (Exception e) {
// can't parse the error - use a general exception
ErrorCause.Builder cause = new ErrorCause.Builder();
cause.type("http_exception");
cause.reason("server returned " + statusCode);
ErrorResponse error = ErrorResponse.of(err -> err.status(statusCode).error(cause.build()));
throw new OpenSearchException(error);
}
} else {
if (endpoint instanceof BooleanEndpoint) {
Expand Down Expand Up @@ -523,6 +575,29 @@ private <ResponseT, ErrorT> ResponseT parseResponse(
;
}
return response;
} else if (endpoint instanceof GenericEndpoint) {
@SuppressWarnings("unchecked")
final GenericEndpoint<?, ResponseT> rawEndpoint = (GenericEndpoint<?, ResponseT>) endpoint;

String contentType = null;
if (bodyStream != null) {
contentType = httpResponse.firstMatchingHeader(Header.CONTENT_TYPE).orElse(null);
}

return rawEndpoint.responseDeserializer(
uri.toString(),
method.name(),
protocol,
httpResponse.statusCode(),
httpResponse.statusText().orElse(null),
httpResponse.headers()
.entrySet()
.stream()
.map(h -> new AbstractMap.SimpleEntry<String, String>(h.getKey(), Objects.toString(h.getValue())))
.collect(Collectors.toList()),
contentType,
bodyStream
);
} else {
throw new TransportException("Unhandled endpoint type: '" + endpoint.getClass().getName() + "'");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import java.util.zip.GZIPOutputStream;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import org.apache.hc.core5.http.ContentType;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.NdJsonpSerializable;
import org.opensearch.client.transport.GenericSerializable;
import org.opensearch.client.transport.OpenSearchTransport;

/**
Expand Down Expand Up @@ -71,6 +73,11 @@ public void addContent(Object content) throws IOException {
if (content instanceof NdJsonpSerializable) {
isMulti = true;
addNdJson(((NdJsonpSerializable) content));
} else if (content instanceof GenericSerializable) {
ContentType.parse(((GenericSerializable) content).serialize(captureBuffer));
if (isMulti) {
captureBuffer.write((byte) '\n');
}
} else {
mapper.serialize(content, jsonGenerator);
jsonGenerator.flush();
Expand Down

0 comments on commit b8e0dad

Please sign in to comment.