Skip to content

Commit

Permalink
Merge pull request #39 from krasserm/fs2-010
Browse files Browse the repository at this point in the history
Upgrade to FS2 0.10.0-M3
  • Loading branch information
krasserm authored Jul 9, 2017
2 parents 3e4c4e1 + 49667ae commit 55530f7
Show file tree
Hide file tree
Showing 13 changed files with 230 additions and 170 deletions.
37 changes: 31 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,47 @@ Streamz

Streamz provides combinator libraries for integrating [Functional Streams for Scala](https://github.com/functional-streams-for-scala/fs2) (FS2), [Akka Streams](http://doc.akka.io/docs/akka/2.4/scala/stream/index.html) and [Apache Camel endpoints](http://camel.apache.org/components.html). They integrate

- **Apache Camel with Akka Streams:** Camel endpoints can be integrated into Akka Stream applications with the [Camel DSL for Akka Streams](streamz-camel-akka/README.md).
- **Apache Camel with FS2:** Camel endpoints can be integrated into FS2 applications with the [Camel DSL for FS2](streamz-camel-fs2/README.md).
- **Akka Streams with FS2:** Akka Stream `Source`s, `Flow`s and `Sink`s can be converted to FS2 `Stream`s, `Pipe`s and `Sink`s, respectively, and vice versa with [Stream converters](streamz-converter/README.md).
- **Apache Camel with Akka Streams:** Camel endpoints can be integrated into Akka Stream applications with the [Camel DSL for Akka Streams](https://github.com/krasserm/streamz/blob/v-0.8.1/streamz-camel-akka/README.md).
- **Apache Camel with FS2:** Camel endpoints can be integrated into FS2 applications with the [Camel DSL for FS2](https://github.com/krasserm/streamz/blob/v-0.8.1/streamz-camel-fs2/README.md).
- **Akka Streams with FS2:** Akka Stream `Source`s, `Flow`s and `Sink`s can be converted to FS2 `Stream`s, `Pipe`s and `Sink`s, respectively, and vice versa with [Stream converters](https://github.com/krasserm/streamz/blob/v-0.8.1/streamz-converter/README.md).

![Streamz intro](images/streamz-intro.png)

Dependencies
------------

Streamz artifacts are available for Scala 2.11 and 2.12:
Streamz artifacts are available for Scala 2.11 and 2.12 at:

resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven"

### Latest stable release

libraryDependencies += "com.github.krasserm" %% "streamz-camel-akka" % "0.8.1"

libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.8.1"
libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.8.1" // uses FS2 0.9.5

libraryDependencies += "com.github.krasserm" %% "streamz-converter" % "0.8.1" // uses FS2 0.9.5

### Latest milestone release

libraryDependencies += "com.github.krasserm" %% "streamz-converter" % "0.8.1"
libraryDependencies += "com.github.krasserm" %% "streamz-camel-akka" % "0.9-M1"

libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.9-M1" // uses FS2 0.10.0-M3

libraryDependencies += "com.github.krasserm" %% "streamz-converter" % "0.9-M1" // uses FS2 0.10.0-M3

Documentation
-------------

### Streamz 0.8.1

- [Camel DSL for Akka Streams](https://github.com/krasserm/streamz/blob/v-0.8.1/streamz-camel-akka/README.md)
- [Camel DSL for FS2](https://github.com/krasserm/streamz/blob/v-0.8.1/streamz-camel-fs2/README.md)
- [Stream converters](https://github.com/krasserm/streamz/blob/v-0.8.1/streamz-converter/README.md)
- [Example application](https://github.com/krasserm/streamz/blob/v-0.8.1/streamz-examples/README.md)

### Streamz 0.9.x

- [Camel DSL for Akka Streams](streamz-camel-akka/README.md)
- [Camel DSL for FS2](streamz-camel-fs2/README.md)
- [Stream converters](streamz-converter/README.md)
Expand All @@ -36,9 +55,15 @@ Documentation
API docs
--------

### Streamz 0.8.1

- [API docs for Scala 2.12](http://krasserm.github.io/streamz/scala-2.12/unidoc/index.html)
- [API docs for Scala 2.11](http://krasserm.github.io/streamz/scala-2.11/unidoc/index.html)

### Streamz 0.9.x

Not published yet. Run `sbt unidoc` on branch `r-0.9` or `master` for generating 0.9 API docs.

External examples
-----------------

Expand Down
2 changes: 1 addition & 1 deletion project/Version.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
object Version {
val Akka = "2.4.18"
val Camel = "2.19.0"
val Fs2 = "0.9.5"
val Fs2 = "0.10.0-M3"
val Log4j = "2.5"
val JUnitInterface = "0.11"
val Scalatest = "3.0.1"
Expand Down
2 changes: 1 addition & 1 deletion streamz-camel-akka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ The DSL is provided by the `streamz-camel-akka` artifact which is available for

resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven"

libraryDependencies += "com.github.krasserm" %% "streamz-camel-akka" % "0.8.1"
libraryDependencies += "com.github.krasserm" %% "streamz-camel-akka" % "0.9-M1"

<a name="scala-dsl"></a>
### Scala DSL
Expand Down
24 changes: 14 additions & 10 deletions streamz-camel-fs2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ The DSL is provided by the `streamz-camel-fs2` artifact which is available for S

resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven"

libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.8.1"
libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.9-M1"

<a name="dsl"></a>
### DSL
Expand Down Expand Up @@ -50,15 +50,19 @@ After usage, a `StreamContext` should be stopped with `streamContext.stop()`.
An FS2 stream that emits messages consumed from a Camel endpoint can be created with `receive`. Endpoints are referenced by their [endpoint URI](http://camel.apache.org/uris.html). For example,

```scala
import streamz.camel.StreamMessage
import cats.effect.IO
import fs2.Stream
import streamz.camel.StreamContext
import streamz.camel.StreamMessage
import streamz.camel.fs2.dsl._

val s1: Stream[Task, StreamMessage[String]] = receive[String]("seda:q1")
val s1: Stream[IO, StreamMessage[String]] = receive[String]("seda:q1")
```

creates an FS2 stream that consumes messages from the [SEDA endpoint](http://camel.apache.org/seda.html) `seda:q1` and converts them to `StreamMessage[String]`s. A [`StreamMessage[A]`](http://krasserm.github.io/streamz/scala-2.12/unidoc/streamz/camel/StreamMessage.html) contains a message `body` of type `A` and message `headers`. Calling `receive` with a `String` type parameter creates an FS2 stream that converts consumed message bodies to type `String` before emitting them as `StreamMessage[String]`. Type conversion internally uses a Camel [type converter](http://camel.apache.org/type-converter.html). An FS2 stream that only emits the converted message bodies can be created with `receiveBody`:

```scala
val s1b: Stream[Task, String] = receiveBody[String]("seda:q1")
val s1b: Stream[IO, String] = receiveBody[String]("seda:q1")
```

This is equivalent to `receive[String]("seda:q1").map(_.body)`.
Expand All @@ -74,15 +78,15 @@ This is equivalent to `receive[String]("seda:q1").map(_.body)`.
For sending a `StreamMessage` to a Camel endpoint, the `send` combinator should be used:

```scala
val s2: Stream[Task, StreamMessage[String]] = s1.send("seda:q2")
val s2: Stream[IO, StreamMessage[String]] = s1.send("seda:q2")
```

This initiates an in-only message [exchange](http://camel.apache.org/exchange.html) with an endpoint and continues the stream with the sent `StreamMessage`.

The `send` combinator is not only available for streams of type `Stream[Task, StreamMessage[A]]` but more generally for any stream of type `Stream[Task, A]`.
The `send` combinator is not only available for streams of type `Stream[IO, StreamMessage[A]]` but more generally for any stream of type `Stream[IO, A]`.

```scala
val s2b: Stream[Task, String] = s1b.send("seda:q2")
val s2b: Stream[IO, String] = s1b.send("seda:q2")
```

If `A` is not a `StreamMessage`, `send` automatically wraps the message into a `StreamMessage[A]` before sending it to the endpoint and continues the stream with the unwrapped `A`.
Expand All @@ -92,15 +96,15 @@ If `A` is not a `StreamMessage`, `send` automatically wraps the message into a `
For sending a request `StreamMessage` to an endpoint and obtaining a reply, the `sendRequest` combinator should be used:

```scala
val s3: Stream[Task, StreamMessage[Int]] = s2.sendRequest[Int]("bean:service?method=weight")
val s3: Stream[IO, StreamMessage[Int]] = s2.sendRequest[Int]("bean:service?method=weight")
```

This initiates an in-out message exchange with the endpoint and continues the stream with the output `StreamMessage`. Here, a [Bean endpoint](https://camel.apache.org/bean.html) is used to call the `weight(String): Int` method on an object that is registered in the `CamelContext` under the name `service`. The input message body is used as `weight` call argument, the output message body is assigned the return value. The `sendRequest` type parameter (`Int`) specifies the expected output value type. The output message body can also be converted to another type provided that an appropriate Camel type converter is available (`Double`, for example).

The `sendRequest` combinator is not only available for streams of type `Stream[Task, StreamMessage[A]]` but more generally for any stream of type `Stream[Task, A]`:
The `sendRequest` combinator is not only available for streams of type `Stream[IO, StreamMessage[A]]` but more generally for any stream of type `Stream[IO, A]`:

```scala
val s3b: Stream[Task, Int] = s2b.sendRequest[Int]("bean:service?method=weight")
val s3b: Stream[IO, Int] = s2b.sendRequest[Int]("bean:service?method=weight")
```

If `A` is not a `StreamMessage`, `sendRequest` automatically wraps the message into a `StreamMessage[A]` before sending it to the endpoint and continues the stream with the unwrapped message body `B` of the output `StreamMessage[B]`.
Original file line number Diff line number Diff line change
Expand Up @@ -16,51 +16,86 @@

package streamz.camel.fs2

import cats.effect.IO
import cats.implicits._
import fs2._

import org.apache.camel.spi.Synchronization
import org.apache.camel.{ Exchange, ExchangePattern, TypeConversionException }

import streamz.camel.{ StreamContext, StreamMessage }

import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag
import scala.util._

package object dsl {
/**
* Camel endpoint combinators for [[StreamMessage]] streams of type `Stream[Task, StreamMessage[A]]`.
* Camel endpoint combinators for [[StreamMessage]] streams of type `Stream[IO, StreamMessage[A]]`.
*/
implicit class SendDsl[A](self: Stream[Task, StreamMessage[A]]) {
implicit class SendDsl[A](self: Stream[IO, StreamMessage[A]]) {
/**
* @see [[dsl.send]]
*/
def send(uri: String)(implicit context: StreamContext): Stream[Task, StreamMessage[A]] =
def send(uri: String)(implicit context: StreamContext): Stream[IO, StreamMessage[A]] =
self.through(dsl.send[A](uri))

/**
* @see [[dsl.sendRequest]]
*/
def sendRequest[B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[Task, StreamMessage[B]] =
def sendRequest[B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[IO, StreamMessage[B]] =
self.through(dsl.sendRequest[A, B](uri))
}

/**
* Camel endpoint combinators for [[StreamMessage]] body streams of type `Stream[Task, A]`.
* Camel endpoint combinators for [[StreamMessage]] body streams of type `Stream[IO, A]`.
*/
implicit class SendBodyDsl[A](self: Stream[Task, A]) {
implicit class SendBodyDsl[A](self: Stream[IO, A]) {
/**
* @see [[dsl.sendBody]]
*/
def send(uri: String)(implicit context: StreamContext): Stream[Task, A] =
def send(uri: String)(implicit context: StreamContext): Stream[IO, A] =
self.through(dsl.sendBody[A](uri))

/**
* @see [[dsl.sendRequestBody]]
*/
def sendRequest[B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[Task, B] =
def sendRequest[B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[IO, B] =
self.through(dsl.sendRequestBody[A, B](uri))
}

/**
* Camel endpoint combinators for [[StreamMessage]] streams of type `Stream[Pure, StreamMessage[A]]`.
*/
implicit class SendDslPure[A](self: Stream[Pure, StreamMessage[A]]) {
/**
* @see [[dsl.send]]
*/
def send(uri: String)(implicit context: StreamContext): Stream[IO, StreamMessage[A]] =
new SendDsl[A](self.covary[IO]).send(uri)

/**
* @see [[dsl.sendRequest()]]
*/
def sendRequest[B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[IO, StreamMessage[B]] =
new SendDsl[A](self.covary[IO]).sendRequest(uri)
}

/**
* Camel endpoint combinators for [[StreamMessage]] body streams of type `Stream[Pure, A]`.
*/
implicit class SendBodyDslPure[A](self: Stream[Pure, A]) {
/**
* @see [[dsl.sendBody]]
*/
def send(uri: String)(implicit context: StreamContext): Stream[IO, A] =
new SendBodyDsl[A](self.covary[IO]).send(uri)

/**
* @see [[dsl.sendRequestBody]]
*/
def sendRequest[B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[IO, B] =
new SendBodyDsl[A](self.covary[IO]).sendRequest(uri)
}

/**
* Creates a stream of [[StreamMessage]]s consumed from the Camel endpoint identified by `uri`.
* [[StreamMessage]] bodies are converted to type `A` using a Camel type converter. The stream
Expand All @@ -73,7 +108,7 @@ package object dsl {
* @param uri Camel endpoint URI.
* @throws TypeConversionException if type conversion fails.
*/
def receive[A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[Task, StreamMessage[A]] = {
def receive[A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[IO, StreamMessage[A]] = {
consume(uri).filter(_ != null)
}

Expand All @@ -89,7 +124,7 @@ package object dsl {
* @param uri Camel endpoint URI.
* @throws TypeConversionException if type conversion fails.
*/
def receiveBody[A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[Task, A] =
def receiveBody[A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[IO, A] =
receive(uri).map(_.body)

/**
Expand All @@ -99,7 +134,7 @@ package object dsl {
*
* @param uri Camel endpoint URI.
*/
def send[A](uri: String)(implicit context: StreamContext): Pipe[Task, StreamMessage[A], StreamMessage[A]] =
def send[A](uri: String)(implicit context: StreamContext): Pipe[IO, StreamMessage[A], StreamMessage[A]] =
produce[A, A](uri, ExchangePattern.InOnly, (message, _) => message)

/**
Expand All @@ -109,7 +144,7 @@ package object dsl {
*
* @param uri Camel endpoint URI.
*/
def sendBody[A](uri: String)(implicit context: StreamContext): Pipe[Task, A, A] =
def sendBody[A](uri: String)(implicit context: StreamContext): Pipe[IO, A, A] =
s => s.map(StreamMessage(_)).through(send(uri)).map(_.body)

/**
Expand All @@ -121,7 +156,7 @@ package object dsl {
* @param uri Camel endpoint URI.
* @throws TypeConversionException if type conversion fails.
*/
def sendRequest[A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[Task, StreamMessage[A], StreamMessage[B]] =
def sendRequest[A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[IO, StreamMessage[A], StreamMessage[B]] =
produce[A, B](uri, ExchangePattern.InOut, (_, exchange) => StreamMessage.from[B](exchange.getOut))

/**
Expand All @@ -133,13 +168,13 @@ package object dsl {
* @param uri Camel endpoint URI.
* @throws TypeConversionException if type conversion fails.
*/
def sendRequestBody[A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[Task, A, B] =
def sendRequestBody[A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[IO, A, B] =
s => s.map(StreamMessage(_)).through(sendRequest[A, B](uri)).map(_.body)

private def consume[A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[Task, StreamMessage[A]] = {
implicit val strategy = Strategy.fromExecutor(context.executorService)
private def consume[A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[IO, StreamMessage[A]] = {
implicit val executionContext = ExecutionContext.fromExecutor(context.executorService)
Stream.repeatEval {
Task.async[StreamMessage[A]] { callback =>
IO.shift >> IO.async[StreamMessage[A]] { callback =>
Try(context.consumerTemplate.receive(uri, 500)) match {
case Success(null) =>
callback(Right(null))
Expand All @@ -162,11 +197,11 @@ package object dsl {
}
}

private def produce[A, B](uri: String, pattern: ExchangePattern, result: (StreamMessage[A], Exchange) => StreamMessage[B])(implicit context: StreamContext): Pipe[Task, StreamMessage[A], StreamMessage[B]] = { s =>
implicit val strategy = Strategy.fromExecutor(context.executorService)
private def produce[A, B](uri: String, pattern: ExchangePattern, result: (StreamMessage[A], Exchange) => StreamMessage[B])(implicit context: StreamContext): Pipe[IO, StreamMessage[A], StreamMessage[B]] = { s =>
implicit val executionContext = ExecutionContext.fromExecutor(context.executorService)
s.flatMap { message =>
Stream.eval {
Task.async[StreamMessage[B]] { callback =>
IO.shift >> IO.async[StreamMessage[B]] { callback =>
context.producerTemplate.asyncCallback(uri, context.createExchange(message, pattern), new Synchronization {
override def onFailure(exchange: Exchange): Unit =
callback(Left(exchange.getException))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,31 +57,31 @@ class DslSpec extends WordSpec with Matchers with BeforeAndAfterAll {
"receive" must {
"consume from an endpoint" in {
1 to 3 foreach { i => producerTemplate.sendBody("seda:q1", i) }
receiveBody[Int]("seda:q1").take(3).runLog.unsafeRun should be(Seq(1, 2, 3))
receiveBody[Int]("seda:q1").take(3).runLog.unsafeRunSync() should be(Seq(1, 2, 3))
}
"complete with an error if type conversion fails" in {
producerTemplate.sendBody("seda:q2", "a")
intercept[TypeConversionException](receiveBody[Int]("seda:q2").run.unsafeRun)
intercept[TypeConversionException](receiveBody[Int]("seda:q2").run.unsafeRunSync())
}
}

"send" must {
"send a message to an endpoint and continue with the sent message" in {
val result = Stream(1, 2, 3).send("seda:q3").take(3).runLog.unsafeRunAsyncFuture
val result = Stream(1, 2, 3).send("seda:q3").take(3).runLog.unsafeToFuture()
1 to 3 foreach { i => consumerTemplate.receiveBody("seda:q3") should be(i) }
Await.result(result, 3.seconds) should be(Seq(1, 2, 3))
}
}

"sendRequest" must {
"send a request message to an endpoint and continue with the response message" in {
Stream(1, 2, 3).sendRequest[Int]("bean:service?method=plusOne").runLog.unsafeRun should be(Seq(2, 3, 4))
Stream(1, 2, 3).sendRequest[Int]("bean:service?method=plusOne").runLog.unsafeRunSync() should be(Seq(2, 3, 4))
}
"convert response message types using a Camel type converter" in {
Stream(1, 2, 3).sendRequest[String]("bean:service?method=plusOne").runLog.unsafeRun should be(Seq("2", "3", "4"))
Stream(1, 2, 3).sendRequest[String]("bean:service?method=plusOne").runLog.unsafeRunSync() should be(Seq("2", "3", "4"))
}
"complete with an error if the request fails" in {
intercept[Exception](Stream(-1, 2, 3).sendRequest[Int]("bean:service?method=plusOne").run.unsafeRun).getMessage should be("test")
intercept[Exception](Stream(-1, 2, 3).sendRequest[Int]("bean:service?method=plusOne").run.unsafeRunSync()).getMessage should be("test")
}
}
}
Loading

0 comments on commit 55530f7

Please sign in to comment.