This HTTP Client wraps the excellent AsyncHttpClient (AHC) so that Observables are returned, and a number of best practices in RESTful integration are enforced.
RxHttpClient
is now an interface that exposes an API based on Reactive Streams. This API is intended as a foundation for interoperability. It is not to be used in client code- The primary implementation of
RxHttpClient
isRxJavaHttpClient
, which is based on RxJava 3 - A java-interop libraries contains implementations for Spring
Reactor
and the jdk9Flow
API - A scala
fs2
module, provides an alternative io.fs2.Streams-based API (see the README)
This version is built primarily on:
RxJava 3 is fully compatible with Reactive Streams which enables this library to work with with other Reactive-streams compatible libraries such as Reactor, Akka and FS2.
Although the JDK9 Flow API is semantically equivalent to the Reactive-Streams API, it does not implement the
Reactive Streams API. For this reason, the FlowHttpClient
is not an implementor of the RxHttpClient
interface.
The intent is that your application uses one RxJavaHttpClient
instance for each integration point (usually a REST service). Because creating
an RxJavaHttpClient
is expensive, you should do this only once in your application.
As RxJavaHttpClients
are limited to one service, we have natural bulkheads between integration points: errors and failures with
respect to one integration point will have no direct effect on other integration points (at least if following the recommendations below).
An RxJavaHttpClient
is created using the RxHttpClient.Builder
as in this example for Java:
RxJavaHttpClient client = new RxJavaHttpClient.Builder()
.setRequestTimeout(REQUEST_TIME_OUT)
.setMaxConnections(MAX_CONNECTIONS)
.setConnectionTTL(60000)
.setConnectionTimeout(1000)
.setAccept("application/json")
.setBaseUrl("http://example.com/api")
.build();
REST Requests can be created using ClientRequestBuilders
which in turn can be got from RxHttpClient
instances, like so:
ClientRequest request = client.requestBuilder()
.setMethod("GET")
.setUrlRelativetoBase(path)
.addQueryParam("q", "test")
.build();
ClientRequest
s are immutable so can be freely shared across threads or (Akka) Actors.
RxHttpClient
has several methods for executing ClientRequests
:
executeToCompletion(ClientRequest, Function<ServerResponse, F>)
returns anObservable<F>
. The second parameter is a function that decodes theServerResponse
to a value of typeF
. The returnedObservable
emits exactly oneF
before completing. In case of an error, it emits either anHttpClientError
orHttpServerError
.execute(ClientRequest, Function<ServerResponse, F>)
returns aCompletableFuture<F>
with the response after being decoded by the function in the argumentexecuteOservably(ClientRequest, Function<byte[], F>)
returns anObservable<F>
that emits anF
for each HTTP response part or chunk received. This is especially useful for processing HTTP responses that use chunked transfer encoding. Each chunk will be transformed to a value ofF
and directly emitted by the Observable. Notice that there is no guarantee that the received chunks correspond exactly to the chunks as transmitted.executeAndDechunk(ClientRequest, String)
returns anObservable<String>
that emits a String whenever a separator String is observed in the received chunks. This is especially useful when chunked transfer encoding is used for server sent events (SSE) or other streaming data.
All Observables returned by these methods are "Cold" Observables. This means that the ClientRequest
is executed only when some Observer subscribes
to the Observable. In fact, whenever an Observer subscribes to the Observable, the request is executed.
To allow proper bulkheading between integration points and the rest of your application, you should follow these recommendations:
- Set the maximum number of Connections using method
RxHttpClient.Builder().setMaxConnections(int)
so that one misbehaving integration doesn't start exhausting server resources - Set an explicit Connection Time-To-Live (TTL) using method
RxHttpClient.builder().setConnectionTTL(int)
. This ensures that connections are regularly recreated which is a good thing in dynamic (clooud) environments - Ensure you have an appropriate Connection Timeout set using
RxHttpClient.Builder().setConnectionTimeout(int)
. The default is set to 5 seconds. - Ensure you have appropriate Request Timeouts and Read Timeouts set. The default for both is 1 min. These time outs ensures your application doesn't get stuck waiting for very slow or non-responsive servers.
- Before discarding an
RxHttpClient
explicitly invoke theRxHttpClient.close()
method itsExecutorService
is closed and I/O threads are destroyed
Since version 1.0, RxHttpClient uses AHC 2.6.x. or later. This implies a number of minor API changes w.r.t to the 0.x versions.
API Changes:
- The methods in ObservableBodyGenerators no longer declare that the throw
Exception
s ServerResponse#getResponseBody(String)
replaced byServerResponse#getResponseBody(String)
The following methods have been removed:
RxHttpClient.Builder#setExecutorService()
. Replaced byRxHttpClient.Builder#setThreadFactory()
RxHttpClient.Builder#setHostnameVerifier()
RxHttpClient.Builder#setUseRelativeURIsWithConnectProxies()
The following methods have been deprecated:
ClientRequest#getContentLength()
RxHttpClient.Builder#setAllowPoolingConnections(boolean)
: usesetKeepAlive()
RxHttpClient.Builder#setAcceptAnyCertificate(boolean)
: useRxHttpClient.Builder#setUseInsecureTrustManager(boolean)
RxHttpClient.Builder setDisableUrlEncodingForBoundedRequests(boolean)
: useRxHttpClient.Builder#setDisableUrlEncodingForBoundRequests(boolean)