Skip to content

Commit

Permalink
Merge branch 'release/2.0-RC1'
Browse files Browse the repository at this point in the history
  • Loading branch information
maesenka committed Apr 21, 2020
2 parents f2ebcba + 2dd2961 commit 5935104
Show file tree
Hide file tree
Showing 56 changed files with 2,391 additions and 1,921 deletions.
85 changes: 46 additions & 39 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,46 +6,45 @@
This HTTP Client wraps the excellent [AsyncHttpClient](https://github.com/AsyncHttpClient/async-http-client) (AHC) so that
Observables are returned, and a number of best practices in RESTful integration are enforced.

# Upgrade to AHC 2
# Version 2.x

This version of RxHttpClient uses AHC 2.8.x. This implies a number of minor API changes w.r.t to the 0.x versions.
## Overview of changes

API Changes:

- The methods in ObservableBodyGenerators no longer declare that the throw `Exception`s
- `ServerResponse#getResponseBody(String)` replaced by `ServerResponse#getResponseBody(String)`

The following methods have been removed:

- `RxHttpClient.Builder#setExecutorService()`. Replaced by `RxHttpClient.Builder#setThreadFactory()`
- `RxHttpClient.Builder#setHostnameVerifier()`
- `RxHttpClient.Builder#setUseRelativeURIsWithConnectProxies()`
- `RxHttpClient` is now an interface that exposes an API based on [Reactive Streams](https://github.com/reactive-streams/reactive-streams-jvm). This
API is intended as a foundation for interoperability. It is not to be used in client code
- The primary implementation of `RxHttpClient` is `RxJavaHttpClient`, which is based on [RxJava 3](https://github.com/ReactiveX/RxJava)
- A java-interop libraries contains implementations for Spring `Reactor` and the jdk9 `Flow` API
- A scala `fs2` module, provides an alternative io.fs2.Streams-based API (see the [README](modules/fs2/README.md))

## Design
This version is built primarily on:

The following methods have been deprecated:
- [AsyncHttpClient 2.x](https://github.com/AsyncHttpClient/async-http-client)
- [RxJava 3.x](https://github.com/ReactiveX/RxJava)

RxJava 3 is fully compatible with [Reactive Streams](https://github.com/reactive-streams/reactive-streams-jvm) which enables this library to
work with with other Reactive-streams compatible libraries such as Reactor, Akka and FS2.

- `ClientRequest#getContentLength()`
- `RxHttpClient.Builder#setAllowPoolingConnections(boolean)`: use `setKeepAlive()`
- `RxHttpClient.Builder#setAcceptAnyCertificate(boolean)`: use `RxHttpClient.Builder#setUseInsecureTrustManager(boolean)`
- `RxHttpClient.Builder setDisableUrlEncodingForBoundedRequests(boolean)`: use ` RxHttpClient.Builder#setDisableUrlEncodingForBoundRequests(boolean)`
Although the JDK9 Flow API is semantically equivalent to the Reactive-Streams API, *it does not implement the
Reactive Streams API*. For this reason, the `FlowHttpClient` is not an implementor of the `RxHttpClient` interface.

# User Guide

## The RxHttpClient
## The RxJavaHttpClient

The intent is that your application uses one `RxHttpClient` instance for each integration point (usually a REST service). Because creating
an `RxHttpClient` is expensive, you should do this only once in your application.
The intent is that your application uses one `RxJavaHttpClient` instance for each integration point (usually a REST service). Because creating
an `RxJavaHttpClient` is expensive, you should do this only once in your application.

As `RxHttpClients` are limited to one service, we have natural bulkheads between integration points: errors and failures with
As `RxJavaHttpClients` are limited to one service, we have natural bulkheads between integration points: errors and failures with
respect to one integration point will have no direct effect on other integration points (at least if following the recommendations below).


## Creating an RxHttpClient
### Creating an RxHttpClient

An `RxHttpClient` is created using the `RxHttpClient.Builder` as in this example for Java:
An `RxJavaHttpClient` is created using the `RxHttpClient.Builder` as in this example for Java:


RxHttpClient client = new RxHttpClient.Builder()
RxJavaHttpClient client = new RxJavaHttpClient.Builder()
.setRequestTimeout(REQUEST_TIME_OUT)
.setMaxConnections(MAX_CONNECTIONS)
.setConnectionTTL(60000)
Expand All @@ -54,22 +53,8 @@ An `RxHttpClient` is created using the `RxHttpClient.Builder` as in this example
.setBaseUrl("http://example.com/api")
.build();

and for Scala:

import be.wegenenverkeer.rxhttp.scala.ImplicitConversions._

val client = new RxHttpClient.Builder()
.setRequestTimeout(REQUEST_TIME_OUT)
.setMaxConnections(MAX_CONNECTIONS)
.setConnectionTTL(60000)
.setConnectionTimeout(1000)
.setAccept("application/json")
.setBaseUrl("http://example.com/api")
.build
.asScala


## Creating Requests
### Creating Requests

REST Requests can be created using `ClientRequestBuilders` which in turn can be got from `RxHttpClient` instances, like so:

Expand Down Expand Up @@ -115,5 +100,27 @@ doesn't get stuck waiting for very slow or non-responsive servers.
destroyed


# Notes when upgrading from versions prior to 1.0

Since version 1.0, RxHttpClient uses AHC 2.6.x. or later. This implies a number of minor API changes w.r.t to the 0.x versions.

API Changes:

- The methods in ObservableBodyGenerators no longer declare that the throw `Exception`s
- `ServerResponse#getResponseBody(String)` replaced by `ServerResponse#getResponseBody(String)`

The following methods have been removed:

- `RxHttpClient.Builder#setExecutorService()`. Replaced by `RxHttpClient.Builder#setThreadFactory()`
- `RxHttpClient.Builder#setHostnameVerifier()`
- `RxHttpClient.Builder#setUseRelativeURIsWithConnectProxies()`


The following methods have been deprecated:

- `ClientRequest#getContentLength()`
- `RxHttpClient.Builder#setAllowPoolingConnections(boolean)`: use `setKeepAlive()`
- `RxHttpClient.Builder#setAcceptAnyCertificate(boolean)`: use `RxHttpClient.Builder#setUseInsecureTrustManager(boolean)`
- `RxHttpClient.Builder setDisableUrlEncodingForBoundedRequests(boolean)`: use ` RxHttpClient.Builder#setDisableUrlEncodingForBoundRequests(boolean)`


95 changes: 65 additions & 30 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,48 +1,71 @@
val Organization = "be.wegenenverkeer"

val Version = "1.2.0"
val Version = "2.0-RC1"

val ScalaVersion = "2.12.8"
val ScalaVersion = "2.13.0"

val ScalaBuildOptions = Seq("-unchecked",
"-deprecation",
"-feature",
"-language:reflectiveCalls",
"-language:implicitConversions",
"-language:postfixOps")
"-language:postfixOps",
"-language:higherKinds")


val asyncClient = "org.asynchttpclient" % "async-http-client" % "2.8.1"
val rxjava = "io.reactivex" % "rxjava" % "1.2.4"
val rxscala = "io.reactivex" %% "rxscala" % "0.26.5"
val slf4j = "org.slf4j" % "slf4j-api" % "1.7.25"
val commonsCodec = "commons-codec" % "commons-codec" % "1.10"
val json = "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.8" % "provided"
val asyncClient = "org.asynchttpclient" % "async-http-client" % "2.12.1"

val rxStreamsVersion = "1.0.3"
val rxJavaVersion = "3.0.1"
val reactorVersion = "3.3.3.RELEASE"
val fs2Version = "2.2.2"

val junit = "junit" % "junit" % "4.11" % "test"
val specs2 = "org.specs2" %% "specs2-core" % "3.8.6" % "test"
val slf4jSimple = "org.slf4j" % "slf4j-simple" % "1.7.6" % "test"
val wiremock = "com.github.tomakehurst" % "wiremock-jre8" % "2.23.2" % "test"
val slf4j = "org.slf4j" % "slf4j-api" % "1.7.30"
val commonsCodec = "commons-codec" % "commons-codec" % "1.10"
val json = "com.fasterxml.jackson.core" % "jackson-databind" % "2.10.3" % "provided"
val rx = "org.reactivestreams" % "reactive-streams" % rxStreamsVersion
val reactorAdapter = "io.projectreactor.addons" % "reactor-adapter" % reactorVersion
val reactorTest = "io.projectreactor" % "reactor-test" % reactorVersion % "test"

val rxJava = "io.reactivex.rxjava3" % "rxjava" % rxJavaVersion
val specs2 = "org.specs2" %% "specs2-core" % "4.9.3" % "test"
val slf4jSimple = "org.slf4j" % "slf4j-simple" % "1.7.30" % "test"
val wiremock = "com.github.tomakehurst" % "wiremock-jre8" % "2.26.3" % "test"
val junitInterface = "com.novocode" % "junit-interface" % "0.11" % Test
val jsonPath = "com.jayway.jsonpath" % "json-path" % "1.2.0" % "test"
val jsonPath = "com.jayway.jsonpath" % "json-path" % "2.4.0" % "test"



val commonDependencies = Seq(
asyncClient,
rxjava,
slf4j,
commonsCodec,
json
)

val javaDependencies = commonDependencies ++ Seq(slf4jSimple, junitInterface)
val rxJavaDependencies = Seq(
rxJava
)

lazy val interopDependencies = Seq(
rx,
reactorAdapter,
reactorTest
)


val javaDependencies = commonDependencies ++ Seq(slf4jSimple)

val scalaDependencies = commonDependencies ++ Seq(
rxscala,
specs2
)

val fs2Dependencies = commonDependencies ++ scalaDependencies ++ Seq(
"co.fs2" %% "fs2-core" % fs2Version % "provided",
"co.fs2" %% "fs2-reactive-streams" % fs2Version % "provided"
)

val mainTestDependencies = Seq(
junit,
slf4jSimple,
wiremock,
junitInterface,
Expand All @@ -55,6 +78,7 @@ lazy val disablePublishingRoot = Seq(
publish / skip := true
)

addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.11.0" cross CrossVersion.full)

lazy val moduleSettings =
Seq(
Expand All @@ -63,14 +87,17 @@ lazy val moduleSettings =
scalaVersion := ScalaVersion,
scalacOptions := ScalaBuildOptions,
parallelExecution := false,
parallelExecution in Test := false,
test / fork := true,
resolvers += "Local Maven" at Path.userHome.asFile.toURI.toURL + ".m2/repository",
resolvers += Resolver.typesafeRepo("releases"),
resolvers += "Spring repository" at "https://repo.spring.io/milestone"
) ++ testSettings ++ publishSettings //++ jacoco.settings

lazy val extraJavaSettings = Seq(
crossPaths := false,
autoScalaLibrary := false,
//Test / parallelExecution := false,
libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % Test,
// javacOptions ++= Seq("-Xlint:deprecation"),
testOptions += Tests.Argument(TestFrameworks.JUnit, "-q", "-v")
)
Expand All @@ -79,26 +106,34 @@ lazy val testSettings = Seq(
libraryDependencies ++= mainTestDependencies,
parallelExecution in Test := false
)
lazy val javaInterop = (project in file("modules/java-interop")).settings(
name := "RxHttpClient-interop",
moduleSettings ++ extraJavaSettings,
javacOptions ++= Seq("--release", "11"),
libraryDependencies ++= javaDependencies ++ interopDependencies,
extraJavaSettings
) dependsOn (java % "compile->compile;test->test")

lazy val javaModule = (project in file("modules/java")).settings(
name := "RxHttpClient-java",
lazy val fs2 = (project in file("modules/fs2")).settings(
name := "RxHttpclient-fs2",
moduleSettings,
libraryDependencies ++= javaDependencies,
extraJavaSettings
)
libraryDependencies ++= fs2Dependencies
).dependsOn(java % "compile->compile;test->test")

lazy val scalaModule = (project in file("modules/scala")).settings(
name := "RxHttpClient-scala",
lazy val java = (project in file("modules/java")).settings(
name := "RxHttpClient",
moduleSettings,
libraryDependencies ++= scalaDependencies
) dependsOn javaModule
javacOptions ++= Seq("--release", "11"),
libraryDependencies ++= javaDependencies ++ rxJavaDependencies,
extraJavaSettings
)

lazy val main = (project in file("."))
.settings(
moduleSettings ++ disablePublishingRoot,
moduleSettings ++ disablePublishingRoot ++ extraJavaSettings,
name := "RxHttpClient"
)
.aggregate(javaModule, scalaModule)
.aggregate(javaInterop, java, fs2)

lazy val pomInfo = <url>https://github.com/WegenenVerkeer/atomium</url>
<licenses>
Expand Down
69 changes: 69 additions & 0 deletions modules/fs2/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Overview

This module provides interop with the [FS2](https://github.com/functional-streams-for-scala/fs2) library.


# Getting Started

This module requires fs2, fs-reactive-streams on the classpath.

```
val fs2Version = ??? // any recent one should work
libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % fs2Version,
"co.fs2" %% "fs2-reactive-streams" % fs2Version,
"be.wegenenverkeer" %% "rxhttp-fs2" % "2.0-RC1")
```

This will pull the `be.wegenenverkeer.rxhttpClient` package in as a transitive dependency.

# API

This module provides a Streaming API `FSHttpApi`:

```
trait Fs2HttpApi {
def stream[F[_] : ConcurrentEffect](request: ClientRequest): Stream[F, ServerResponseElement]
def streamDechunked[F[_] : ConcurrentEffect](request: ClientRequest, separator: String, charset: Charset): Stream[F, String]
def streamDechunked[F[_] : ConcurrentEffect](request: ClientRequest, separator: String): Stream[F, String]
def stream[F[_] : ConcurrentEffect, A](req: ClientRequest, transform: Array[Byte] => A): Stream[F, A]
def execute[F[_] : Async, A](req: ClientRequest, tr: ServerResponse => A): F[A]
def requestBuilder: ClientRequestBuilder
def requestSigners: util.List[RequestSigner]
}
```

After importing `fs2.Implicits._` a RxJavaHttpClient can be converted into an implementation
of this trait.


```
val client : RxJavaHttpClient = ???
val response = client.fs2HttpApi.stream[IO, String](request, b => new String(b))
val output = response.compile.toVector.unsafeRunSync()
```

# Usage

Here is an example that gets the response elements (chunks) in a `fs2.Stream[IO, String]`. Obviously,
the request will only fire when the effect is run.

```
import scala.concurrent.ExecutionContext
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
import be.wegenenverkeer.rxhttp.fs2.Implicits._
val client : RxJavaHttpClient = ???
val response = client.fs2HttpApi.stream[IO, String](request, b => new String(b))
```

We can also return the complete response as a single value wrapped in an `IO`.

```
val resp = client.fs2HttpApi.execute[IO, String](request, sr => sr.getResponseBody)
//resp : IO[String]
```



Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package be.wegenenverkeer.rxhttp.fs2

import java.nio.charset.Charset
import java.util

import be.wegenenverkeer.rxhttp._
import cats.effect.{Async, ConcurrentEffect}
import _root_.fs2.Stream

/**
* Created by Karel Maesen, Geovise BVBA on 20/04/2020.
*/
trait Fs2HttpApi {

def stream[F[_] : ConcurrentEffect](request: ClientRequest): Stream[F, ServerResponseElement]

def streamDechunked[F[_] : ConcurrentEffect](request: ClientRequest, separator: String, charset: Charset): Stream[F, String]

def streamDechunked[F[_] : ConcurrentEffect](request: ClientRequest, separator: String): Stream[F, String]

def stream[F[_] : ConcurrentEffect, A](req: ClientRequest, transform: Array[Byte] => A): Stream[F, A]

def execute[F[_] : Async, A](req: ClientRequest, tr: ServerResponse => A): F[A]

def requestBuilder: ClientRequestBuilder

def requestSigners: util.List[RequestSigner]

}
Loading

0 comments on commit 5935104

Please sign in to comment.