diff --git a/README.md b/README.md index 29c8cdf..503565a 100644 --- a/README.md +++ b/README.md @@ -6,28 +6,47 @@ Streamz Streamz provides combinator libraries for integrating [Functional Streams for Scala](https://github.com/functional-streams-for-scala/fs2) (FS2), [Akka Streams](http://doc.akka.io/docs/akka/2.4/scala/stream/index.html) and [Apache Camel endpoints](http://camel.apache.org/components.html). They integrate -- **Apache Camel with Akka Streams:** Camel endpoints can be integrated into Akka Stream applications with the [Camel DSL for Akka Streams](streamz-camel-akka/README.md). -- **Apache Camel with FS2:** Camel endpoints can be integrated into FS2 applications with the [Camel DSL for FS2](streamz-camel-fs2/README.md). -- **Akka Streams with FS2:** Akka Stream `Source`s, `Flow`s and `Sink`s can be converted to FS2 `Stream`s, `Pipe`s and `Sink`s, respectively, and vice versa with [Stream converters](streamz-converter/README.md). +- **Apache Camel with Akka Streams:** Camel endpoints can be integrated into Akka Stream applications with the [Camel DSL for Akka Streams](https://github.com/krasserm/streamz/blob/v-0.8.1/streamz-camel-akka/README.md). +- **Apache Camel with FS2:** Camel endpoints can be integrated into FS2 applications with the [Camel DSL for FS2](https://github.com/krasserm/streamz/blob/v-0.8.1/streamz-camel-fs2/README.md). +- **Akka Streams with FS2:** Akka Stream `Source`s, `Flow`s and `Sink`s can be converted to FS2 `Stream`s, `Pipe`s and `Sink`s, respectively, and vice versa with [Stream converters](https://github.com/krasserm/streamz/blob/v-0.8.1/streamz-converter/README.md). ![Streamz intro](images/streamz-intro.png) Dependencies ------------ -Streamz artifacts are available for Scala 2.11 and 2.12: +Streamz artifacts are available for Scala 2.11 and 2.12 at: resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven" +### Latest stable release + libraryDependencies += "com.github.krasserm" %% "streamz-camel-akka" % "0.8.1" - libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.8.1" + libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.8.1" // uses FS2 0.9.5 + + libraryDependencies += "com.github.krasserm" %% "streamz-converter" % "0.8.1" // uses FS2 0.9.5 + +### Latest milestone release - libraryDependencies += "com.github.krasserm" %% "streamz-converter" % "0.8.1" + libraryDependencies += "com.github.krasserm" %% "streamz-camel-akka" % "0.9-M1" + + libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.9-M1" // uses FS2 0.10.0-M3 + + libraryDependencies += "com.github.krasserm" %% "streamz-converter" % "0.9-M1" // uses FS2 0.10.0-M3 Documentation ------------- +### Streamz 0.8.1 + +- [Camel DSL for Akka Streams](https://github.com/krasserm/streamz/blob/v-0.8.1/streamz-camel-akka/README.md) +- [Camel DSL for FS2](https://github.com/krasserm/streamz/blob/v-0.8.1/streamz-camel-fs2/README.md) +- [Stream converters](https://github.com/krasserm/streamz/blob/v-0.8.1/streamz-converter/README.md) +- [Example application](https://github.com/krasserm/streamz/blob/v-0.8.1/streamz-examples/README.md) + +### Streamz 0.9.x + - [Camel DSL for Akka Streams](streamz-camel-akka/README.md) - [Camel DSL for FS2](streamz-camel-fs2/README.md) - [Stream converters](streamz-converter/README.md) @@ -36,9 +55,15 @@ Documentation API docs -------- +### Streamz 0.8.1 + - [API docs for Scala 2.12](http://krasserm.github.io/streamz/scala-2.12/unidoc/index.html) - [API docs for Scala 2.11](http://krasserm.github.io/streamz/scala-2.11/unidoc/index.html) +### Streamz 0.9.x + +Not published yet. Run `sbt unidoc` on branch `r-0.9` or `master` for generating 0.9 API docs. + External examples ----------------- diff --git a/project/Version.scala b/project/Version.scala index 7633f27..eef1124 100644 --- a/project/Version.scala +++ b/project/Version.scala @@ -1,7 +1,7 @@ object Version { val Akka = "2.4.18" val Camel = "2.19.0" - val Fs2 = "0.9.5" + val Fs2 = "0.10.0-M3" val Log4j = "2.5" val JUnitInterface = "0.11" val Scalatest = "3.0.1" diff --git a/streamz-camel-akka/README.md b/streamz-camel-akka/README.md index 28374a1..854ff62 100644 --- a/streamz-camel-akka/README.md +++ b/streamz-camel-akka/README.md @@ -9,7 +9,7 @@ The DSL is provided by the `streamz-camel-akka` artifact which is available for resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven" - libraryDependencies += "com.github.krasserm" %% "streamz-camel-akka" % "0.8.1" + libraryDependencies += "com.github.krasserm" %% "streamz-camel-akka" % "0.9-M1" ### Scala DSL diff --git a/streamz-camel-fs2/README.md b/streamz-camel-fs2/README.md index f439c4c..e2ca4e8 100644 --- a/streamz-camel-fs2/README.md +++ b/streamz-camel-fs2/README.md @@ -9,7 +9,7 @@ The DSL is provided by the `streamz-camel-fs2` artifact which is available for S resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven" - libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.8.1" + libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.9-M1" ### DSL @@ -50,15 +50,19 @@ After usage, a `StreamContext` should be stopped with `streamContext.stop()`. An FS2 stream that emits messages consumed from a Camel endpoint can be created with `receive`. Endpoints are referenced by their [endpoint URI](http://camel.apache.org/uris.html). For example, ```scala -import streamz.camel.StreamMessage +import cats.effect.IO +import fs2.Stream +import streamz.camel.StreamContext +import streamz.camel.StreamMessage +import streamz.camel.fs2.dsl._ -val s1: Stream[Task, StreamMessage[String]] = receive[String]("seda:q1") +val s1: Stream[IO, StreamMessage[String]] = receive[String]("seda:q1") ``` creates an FS2 stream that consumes messages from the [SEDA endpoint](http://camel.apache.org/seda.html) `seda:q1` and converts them to `StreamMessage[String]`s. A [`StreamMessage[A]`](http://krasserm.github.io/streamz/scala-2.12/unidoc/streamz/camel/StreamMessage.html) contains a message `body` of type `A` and message `headers`. Calling `receive` with a `String` type parameter creates an FS2 stream that converts consumed message bodies to type `String` before emitting them as `StreamMessage[String]`. Type conversion internally uses a Camel [type converter](http://camel.apache.org/type-converter.html). An FS2 stream that only emits the converted message bodies can be created with `receiveBody`: ```scala -val s1b: Stream[Task, String] = receiveBody[String]("seda:q1") +val s1b: Stream[IO, String] = receiveBody[String]("seda:q1") ``` This is equivalent to `receive[String]("seda:q1").map(_.body)`. @@ -74,15 +78,15 @@ This is equivalent to `receive[String]("seda:q1").map(_.body)`. For sending a `StreamMessage` to a Camel endpoint, the `send` combinator should be used: ```scala -val s2: Stream[Task, StreamMessage[String]] = s1.send("seda:q2") +val s2: Stream[IO, StreamMessage[String]] = s1.send("seda:q2") ``` This initiates an in-only message [exchange](http://camel.apache.org/exchange.html) with an endpoint and continues the stream with the sent `StreamMessage`. -The `send` combinator is not only available for streams of type `Stream[Task, StreamMessage[A]]` but more generally for any stream of type `Stream[Task, A]`. +The `send` combinator is not only available for streams of type `Stream[IO, StreamMessage[A]]` but more generally for any stream of type `Stream[IO, A]`. ```scala -val s2b: Stream[Task, String] = s1b.send("seda:q2") +val s2b: Stream[IO, String] = s1b.send("seda:q2") ``` If `A` is not a `StreamMessage`, `send` automatically wraps the message into a `StreamMessage[A]` before sending it to the endpoint and continues the stream with the unwrapped `A`. @@ -92,15 +96,15 @@ If `A` is not a `StreamMessage`, `send` automatically wraps the message into a ` For sending a request `StreamMessage` to an endpoint and obtaining a reply, the `sendRequest` combinator should be used: ```scala -val s3: Stream[Task, StreamMessage[Int]] = s2.sendRequest[Int]("bean:service?method=weight") +val s3: Stream[IO, StreamMessage[Int]] = s2.sendRequest[Int]("bean:service?method=weight") ``` This initiates an in-out message exchange with the endpoint and continues the stream with the output `StreamMessage`. Here, a [Bean endpoint](https://camel.apache.org/bean.html) is used to call the `weight(String): Int` method on an object that is registered in the `CamelContext` under the name `service`. The input message body is used as `weight` call argument, the output message body is assigned the return value. The `sendRequest` type parameter (`Int`) specifies the expected output value type. The output message body can also be converted to another type provided that an appropriate Camel type converter is available (`Double`, for example). -The `sendRequest` combinator is not only available for streams of type `Stream[Task, StreamMessage[A]]` but more generally for any stream of type `Stream[Task, A]`: +The `sendRequest` combinator is not only available for streams of type `Stream[IO, StreamMessage[A]]` but more generally for any stream of type `Stream[IO, A]`: ```scala -val s3b: Stream[Task, Int] = s2b.sendRequest[Int]("bean:service?method=weight") +val s3b: Stream[IO, Int] = s2b.sendRequest[Int]("bean:service?method=weight") ``` If `A` is not a `StreamMessage`, `sendRequest` automatically wraps the message into a `StreamMessage[A]` before sending it to the endpoint and continues the stream with the unwrapped message body `B` of the output `StreamMessage[B]`. \ No newline at end of file diff --git a/streamz-camel-fs2/src/main/scala/streamz/camel/fs2/dsl/package.scala b/streamz-camel-fs2/src/main/scala/streamz/camel/fs2/dsl/package.scala index 9849cf1..e60370c 100644 --- a/streamz-camel-fs2/src/main/scala/streamz/camel/fs2/dsl/package.scala +++ b/streamz-camel-fs2/src/main/scala/streamz/camel/fs2/dsl/package.scala @@ -16,51 +16,86 @@ package streamz.camel.fs2 +import cats.effect.IO +import cats.implicits._ import fs2._ - import org.apache.camel.spi.Synchronization import org.apache.camel.{ Exchange, ExchangePattern, TypeConversionException } - import streamz.camel.{ StreamContext, StreamMessage } +import scala.concurrent.ExecutionContext import scala.reflect.ClassTag import scala.util._ package object dsl { /** - * Camel endpoint combinators for [[StreamMessage]] streams of type `Stream[Task, StreamMessage[A]]`. + * Camel endpoint combinators for [[StreamMessage]] streams of type `Stream[IO, StreamMessage[A]]`. */ - implicit class SendDsl[A](self: Stream[Task, StreamMessage[A]]) { + implicit class SendDsl[A](self: Stream[IO, StreamMessage[A]]) { /** * @see [[dsl.send]] */ - def send(uri: String)(implicit context: StreamContext): Stream[Task, StreamMessage[A]] = + def send(uri: String)(implicit context: StreamContext): Stream[IO, StreamMessage[A]] = self.through(dsl.send[A](uri)) /** * @see [[dsl.sendRequest]] */ - def sendRequest[B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[Task, StreamMessage[B]] = + def sendRequest[B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[IO, StreamMessage[B]] = self.through(dsl.sendRequest[A, B](uri)) } /** - * Camel endpoint combinators for [[StreamMessage]] body streams of type `Stream[Task, A]`. + * Camel endpoint combinators for [[StreamMessage]] body streams of type `Stream[IO, A]`. */ - implicit class SendBodyDsl[A](self: Stream[Task, A]) { + implicit class SendBodyDsl[A](self: Stream[IO, A]) { /** * @see [[dsl.sendBody]] */ - def send(uri: String)(implicit context: StreamContext): Stream[Task, A] = + def send(uri: String)(implicit context: StreamContext): Stream[IO, A] = self.through(dsl.sendBody[A](uri)) /** * @see [[dsl.sendRequestBody]] */ - def sendRequest[B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[Task, B] = + def sendRequest[B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[IO, B] = self.through(dsl.sendRequestBody[A, B](uri)) } + /** + * Camel endpoint combinators for [[StreamMessage]] streams of type `Stream[Pure, StreamMessage[A]]`. + */ + implicit class SendDslPure[A](self: Stream[Pure, StreamMessage[A]]) { + /** + * @see [[dsl.send]] + */ + def send(uri: String)(implicit context: StreamContext): Stream[IO, StreamMessage[A]] = + new SendDsl[A](self.covary[IO]).send(uri) + + /** + * @see [[dsl.sendRequest()]] + */ + def sendRequest[B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[IO, StreamMessage[B]] = + new SendDsl[A](self.covary[IO]).sendRequest(uri) + } + + /** + * Camel endpoint combinators for [[StreamMessage]] body streams of type `Stream[Pure, A]`. + */ + implicit class SendBodyDslPure[A](self: Stream[Pure, A]) { + /** + * @see [[dsl.sendBody]] + */ + def send(uri: String)(implicit context: StreamContext): Stream[IO, A] = + new SendBodyDsl[A](self.covary[IO]).send(uri) + + /** + * @see [[dsl.sendRequestBody]] + */ + def sendRequest[B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[IO, B] = + new SendBodyDsl[A](self.covary[IO]).sendRequest(uri) + } + /** * Creates a stream of [[StreamMessage]]s consumed from the Camel endpoint identified by `uri`. * [[StreamMessage]] bodies are converted to type `A` using a Camel type converter. The stream @@ -73,7 +108,7 @@ package object dsl { * @param uri Camel endpoint URI. * @throws TypeConversionException if type conversion fails. */ - def receive[A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[Task, StreamMessage[A]] = { + def receive[A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[IO, StreamMessage[A]] = { consume(uri).filter(_ != null) } @@ -89,7 +124,7 @@ package object dsl { * @param uri Camel endpoint URI. * @throws TypeConversionException if type conversion fails. */ - def receiveBody[A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[Task, A] = + def receiveBody[A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[IO, A] = receive(uri).map(_.body) /** @@ -99,7 +134,7 @@ package object dsl { * * @param uri Camel endpoint URI. */ - def send[A](uri: String)(implicit context: StreamContext): Pipe[Task, StreamMessage[A], StreamMessage[A]] = + def send[A](uri: String)(implicit context: StreamContext): Pipe[IO, StreamMessage[A], StreamMessage[A]] = produce[A, A](uri, ExchangePattern.InOnly, (message, _) => message) /** @@ -109,7 +144,7 @@ package object dsl { * * @param uri Camel endpoint URI. */ - def sendBody[A](uri: String)(implicit context: StreamContext): Pipe[Task, A, A] = + def sendBody[A](uri: String)(implicit context: StreamContext): Pipe[IO, A, A] = s => s.map(StreamMessage(_)).through(send(uri)).map(_.body) /** @@ -121,7 +156,7 @@ package object dsl { * @param uri Camel endpoint URI. * @throws TypeConversionException if type conversion fails. */ - def sendRequest[A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[Task, StreamMessage[A], StreamMessage[B]] = + def sendRequest[A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[IO, StreamMessage[A], StreamMessage[B]] = produce[A, B](uri, ExchangePattern.InOut, (_, exchange) => StreamMessage.from[B](exchange.getOut)) /** @@ -133,13 +168,13 @@ package object dsl { * @param uri Camel endpoint URI. * @throws TypeConversionException if type conversion fails. */ - def sendRequestBody[A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[Task, A, B] = + def sendRequestBody[A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[IO, A, B] = s => s.map(StreamMessage(_)).through(sendRequest[A, B](uri)).map(_.body) - private def consume[A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[Task, StreamMessage[A]] = { - implicit val strategy = Strategy.fromExecutor(context.executorService) + private def consume[A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[IO, StreamMessage[A]] = { + implicit val executionContext = ExecutionContext.fromExecutor(context.executorService) Stream.repeatEval { - Task.async[StreamMessage[A]] { callback => + IO.shift >> IO.async[StreamMessage[A]] { callback => Try(context.consumerTemplate.receive(uri, 500)) match { case Success(null) => callback(Right(null)) @@ -162,11 +197,11 @@ package object dsl { } } - private def produce[A, B](uri: String, pattern: ExchangePattern, result: (StreamMessage[A], Exchange) => StreamMessage[B])(implicit context: StreamContext): Pipe[Task, StreamMessage[A], StreamMessage[B]] = { s => - implicit val strategy = Strategy.fromExecutor(context.executorService) + private def produce[A, B](uri: String, pattern: ExchangePattern, result: (StreamMessage[A], Exchange) => StreamMessage[B])(implicit context: StreamContext): Pipe[IO, StreamMessage[A], StreamMessage[B]] = { s => + implicit val executionContext = ExecutionContext.fromExecutor(context.executorService) s.flatMap { message => Stream.eval { - Task.async[StreamMessage[B]] { callback => + IO.shift >> IO.async[StreamMessage[B]] { callback => context.producerTemplate.asyncCallback(uri, context.createExchange(message, pattern), new Synchronization { override def onFailure(exchange: Exchange): Unit = callback(Left(exchange.getException)) diff --git a/streamz-camel-fs2/src/test/scala/streamz/camel/fs2/dsl/DslSpec.scala b/streamz-camel-fs2/src/test/scala/streamz/camel/fs2/dsl/DslSpec.scala index a51e688..05c5f9b 100644 --- a/streamz-camel-fs2/src/test/scala/streamz/camel/fs2/dsl/DslSpec.scala +++ b/streamz-camel-fs2/src/test/scala/streamz/camel/fs2/dsl/DslSpec.scala @@ -57,17 +57,17 @@ class DslSpec extends WordSpec with Matchers with BeforeAndAfterAll { "receive" must { "consume from an endpoint" in { 1 to 3 foreach { i => producerTemplate.sendBody("seda:q1", i) } - receiveBody[Int]("seda:q1").take(3).runLog.unsafeRun should be(Seq(1, 2, 3)) + receiveBody[Int]("seda:q1").take(3).runLog.unsafeRunSync() should be(Seq(1, 2, 3)) } "complete with an error if type conversion fails" in { producerTemplate.sendBody("seda:q2", "a") - intercept[TypeConversionException](receiveBody[Int]("seda:q2").run.unsafeRun) + intercept[TypeConversionException](receiveBody[Int]("seda:q2").run.unsafeRunSync()) } } "send" must { "send a message to an endpoint and continue with the sent message" in { - val result = Stream(1, 2, 3).send("seda:q3").take(3).runLog.unsafeRunAsyncFuture + val result = Stream(1, 2, 3).send("seda:q3").take(3).runLog.unsafeToFuture() 1 to 3 foreach { i => consumerTemplate.receiveBody("seda:q3") should be(i) } Await.result(result, 3.seconds) should be(Seq(1, 2, 3)) } @@ -75,13 +75,13 @@ class DslSpec extends WordSpec with Matchers with BeforeAndAfterAll { "sendRequest" must { "send a request message to an endpoint and continue with the response message" in { - Stream(1, 2, 3).sendRequest[Int]("bean:service?method=plusOne").runLog.unsafeRun should be(Seq(2, 3, 4)) + Stream(1, 2, 3).sendRequest[Int]("bean:service?method=plusOne").runLog.unsafeRunSync() should be(Seq(2, 3, 4)) } "convert response message types using a Camel type converter" in { - Stream(1, 2, 3).sendRequest[String]("bean:service?method=plusOne").runLog.unsafeRun should be(Seq("2", "3", "4")) + Stream(1, 2, 3).sendRequest[String]("bean:service?method=plusOne").runLog.unsafeRunSync() should be(Seq("2", "3", "4")) } "complete with an error if the request fails" in { - intercept[Exception](Stream(-1, 2, 3).sendRequest[Int]("bean:service?method=plusOne").run.unsafeRun).getMessage should be("test") + intercept[Exception](Stream(-1, 2, 3).sendRequest[Int]("bean:service?method=plusOne").run.unsafeRunSync()).getMessage should be("test") } } } diff --git a/streamz-converter/README.md b/streamz-converter/README.md index c546734..a3289e7 100644 --- a/streamz-converter/README.md +++ b/streamz-converter/README.md @@ -5,7 +5,7 @@ Stream converters convert Akka Stream `Source`s, `Flow`s and `Sink`s to FS2 `Str resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven" - libraryDependencies += "com.github.krasserm" %% "streamz-converter" % "0.8.1" + libraryDependencies += "com.github.krasserm" %% "streamz-converter" % "0.9-M1" artifact and can be imported with: @@ -33,9 +33,9 @@ implicit val materializer: ActorMaterializer = ActorMaterializer()(factory) |From |With |To | |----------------------------|-------------|-------------------| -|`Graph[SourceShape[A], M]` |`toStream()` |`Stream[Task, A]` | -|`Graph[SinkShape[A], M]` |`toSink()` |`Sink[Task, A]` | -|`Graph[FlowShape[A, B], M]` |`toPipe()` |`Pipe[Task, A, B]` | +|`Graph[SourceShape[A], M]` |`toStream()` |`Stream[IO, A]` | +|`Graph[SinkShape[A], M]` |`toSink()` |`Sink[IO, A]` | +|`Graph[FlowShape[A, B], M]` |`toPipe()` |`Pipe[IO, A, B]` | **Examples** ([source code](https://github.com/krasserm/streamz/blob/master/streamz-examples/src/main/scala/streamz/examples/converter/Example.scala)): @@ -43,7 +43,8 @@ implicit val materializer: ActorMaterializer = ActorMaterializer()(factory) import akka.stream.scaladsl.{ Flow => AkkaFlow, Sink => AkkaSink, Source => AkkaSource } import akka.{ Done, NotUsed } -import fs2.{ Pipe, Sink, Stream, Task } +import cats.effect.IO +import fs2.{ Pipe, Sink, Stream } import scala.collection.immutable.Seq import scala.concurrent.Future @@ -52,20 +53,20 @@ val numbers: Seq[Int] = 1 to 10 def f(i: Int) = List(s"$i-1", s"$i-2") val aSink1: AkkaSink[Int, Future[Done]] = AkkaSink.foreach[Int](println) -val fSink1: Sink[Task, Int] = aSink1.toSink() +val fSink1: Sink[IO, Int] = aSink1.toSink() val aSource1: AkkaSource[Int, NotUsed] = AkkaSource(numbers) -val fStream1: Stream[Task, Int] = aSource1.toStream() +val fStream1: Stream[IO, Int] = aSource1.toStream() val aFlow1: AkkaFlow[Int, String, NotUsed] = AkkaFlow[Int].mapConcat(f) -val fPipe1: Pipe[Task, Int, String] = aFlow1.toPipe() +val fPipe1: Pipe[IO, Int, String] = aFlow1.toPipe() -fStream1.to(fSink1).run.unsafeRun() // prints numbers -assert(fStream1.runLog.unsafeRun() == numbers) -assert(fStream1.through(fPipe1).runLog.unsafeRun() == numbers.flatMap(f)) +fStream1.to(fSink1).run.unsafeRunSync() // prints numbers +assert(fStream1.runLog.unsafeRunSync() == numbers) +assert(fStream1.through(fPipe1).runLog.unsafeRunSync() == numbers.flatMap(f)) ``` -`aSink1`, `aSource1` and `aFlow1` are materialized when the `Task`s of the FS2 streams that compose `fSink1`, `fStream1` and `fPipe1` are run. Their materialized value can be obtained via the `onMaterialization` callback that is a parameter of `toStream(onMaterialization: M => Unit)`, `toSink(onMaterialization: M => Unit)` and `toPipe(onMaterialization: M => Unit)` (not shown in the examples). +`aSink1`, `aSource1` and `aFlow1` are materialized when the `IO`s of the FS2 streams that compose `fSink1`, `fStream1` and `fPipe1` are run. Their materialized value can be obtained via the `onMaterialization` callback that is a parameter of `toStream(onMaterialization: M => Unit)`, `toSink(onMaterialization: M => Unit)` and `toPipe(onMaterialization: M => Unit)` (not shown in the examples). ### Conversions from FS2 to Akka Stream @@ -83,7 +84,7 @@ assert(fStream1.through(fPipe1).runLog.unsafeRun() == numbers.flatMap(f)) import akka.stream.scaladsl.{ Flow => AkkaFlow, Sink => AkkaSink, Source => AkkaSource, Keep } import akka.{ Done, NotUsed } -import fs2.{ Pipe, Pure, Sink, Stream, pipe } +import fs2.{ Pipe, Pure, Sink, Stream } import scala.collection.immutable.Seq import scala.concurrent.{ Await, Future } @@ -92,13 +93,13 @@ import scala.concurrent.duration._ val numbers: Seq[Int] = 1 to 10 def g(i: Int) = i + 10 -val fSink2: Sink[Pure, Int] = s => pipe.lift(g)(s).map(println) +val fSink2: Sink[Pure, Int] = s => s.map(g).map(println) val aSink2: AkkaSink[Int, Future[Done]] = AkkaSink.fromGraph(fSink2.toSink) val fStream2: Stream[Pure, Int] = Stream.emits(numbers) val aSource2: AkkaSource[Int, NotUsed] = AkkaSource.fromGraph(fStream2.toSource) -val fpipe2: Pipe[Pure, Int, Int] = pipe.lift[Pure, Int, Int](g) +val fpipe2: Pipe[Pure, Int, Int] = s => s.map(g) val aFlow2: AkkaFlow[Int, Int, NotUsed] = AkkaFlow.fromGraph(fpipe2.toFlow) aSource2.toMat(aSink2)(Keep.right).run() // prints numbers diff --git a/streamz-converter/src/main/scala/streamz/converter/Converter.scala b/streamz-converter/src/main/scala/streamz/converter/Converter.scala index 0cc7070..5d4d1df 100644 --- a/streamz-converter/src/main/scala/streamz/converter/Converter.scala +++ b/streamz-converter/src/main/scala/streamz/converter/Converter.scala @@ -16,17 +16,17 @@ package streamz.converter -import akka.{ Done, NotUsed } import akka.actor._ import akka.stream._ -import akka.stream.scaladsl.{ Source => AkkaSource, Flow => AkkaFlow, Sink => AkkaSink, _ } - +import akka.stream.scaladsl.{ Flow => AkkaFlow, Sink => AkkaSink, Source => AkkaSource, _ } +import akka.{ Done, NotUsed } +import cats.effect.IO +import cats.implicits._ import fs2._ - import streamz.converter.AkkaStreamPublisher._ import streamz.converter.AkkaStreamSubscriber._ -import scala.concurrent.{ Future, ExecutionContext } +import scala.concurrent.{ ExecutionContext, Future } object Converter { type Callback[A] = Either[Throwable, A] => Unit @@ -37,55 +37,55 @@ trait Converter { /** * Converts an Akka Stream [[Graph]] of [[SourceShape]] to an FS2 [[Stream]]. The [[Graph]] is materialized when - * the [[Stream]]'s [[Task]] in run. The materialized value can be obtained with the `onMaterialization` callback. + * the [[Stream]]'s [[IO]] in run. The materialized value can be obtained with the `onMaterialization` callback. */ - def akkaSourceToFs2Stream[A, M](source: Graph[SourceShape[A], M])(onMaterialization: M => Unit)(implicit executionContext: ExecutionContext, materializer: Materializer): Stream[Task, A] = + def akkaSourceToFs2Stream[A, M](source: Graph[SourceShape[A], M])(onMaterialization: M => Unit)(implicit executionContext: ExecutionContext, materializer: Materializer): Stream[IO, A] = Stream.bracket( - Task.delay { + IO { val (mat, subscriber) = AkkaSource.fromGraph(source).toMat(AkkaSink.actorSubscriber[A](AkkaStreamSubscriber.props[A]))(Keep.both).run() onMaterialization(mat) subscriber } - )(subscriberStream[A], _ => Task.now(())) + )(subscriberStream[A], _ => IO.pure(())) /** * Converts an Akka Stream [[Graph]] of [[SinkShape]] to an FS2 [[Sink]]. The [[Graph]] is materialized when - * the [[Sink]]'s [[Task]] in run. The materialized value can be obtained with the `onMaterialization` callback. + * the [[Sink]]'s [[IO]] in run. The materialized value can be obtained with the `onMaterialization` callback. */ - def akkaSinkToFs2Sink[A, M](sink: Graph[SinkShape[A], M])(onMaterialization: M => Unit)(implicit executionContext: ExecutionContext, materializer: Materializer): Sink[Task, A] = { s => + def akkaSinkToFs2Sink[A, M](sink: Graph[SinkShape[A], M])(onMaterialization: M => Unit)(implicit executionContext: ExecutionContext, materializer: Materializer): Sink[IO, A] = { s => Stream.bracket( - Task.delay { + IO { val (publisher, mat) = AkkaSource.actorPublisher[A](AkkaStreamPublisher.props[A]).toMat(sink)(Keep.both).run() onMaterialization(mat) publisher } - )(publisherStream[A](_, s), _ => Task.now(())) + )(publisherStream[A](_, s), _ => IO.pure(())) } /** * Converts an Akka Stream [[Graph]] of [[FlowShape]] to an FS2 [[Pipe]]. The [[Graph]] is materialized when - * the [[Pipe]]'s [[Task]] in run. The materialized value can be obtained with the `onMaterialization` callback. + * the [[Pipe]]'s [[IO]] in run. The materialized value can be obtained with the `onMaterialization` callback. */ - def akkaFlowToFs2Pipe[A, B, M](flow: Graph[FlowShape[A, B], M])(onMaterialization: M => Unit)(implicit executionContext: ExecutionContext, materializer: Materializer): Pipe[Task, A, B] = { s => + def akkaFlowToFs2Pipe[A, B, M](flow: Graph[FlowShape[A, B], M])(onMaterialization: M => Unit)(implicit executionContext: ExecutionContext, materializer: Materializer): Pipe[IO, A, B] = { s => Stream.bracket( - Task.delay { + IO { val src = AkkaSource.actorPublisher[A](AkkaStreamPublisher.props[A]) val snk = AkkaSink.actorSubscriber[B](AkkaStreamSubscriber.props[B]) val ((publisher, mat), subscriber) = src.viaMat(flow)(Keep.both).toMat(snk)(Keep.both).run() onMaterialization(mat) (publisher, subscriber) } - )(ps => transformerStream[A, B](ps._2, ps._1, s), _ => Task.now(())) + )(ps => transformerStream[A, B](ps._2, ps._1, s), _ => IO.pure(())) } /** * Converts an FS2 [[Stream]] to an Akka Stream [[Graph]] of [[SourceShape]]. The [[Stream]] is run when the * [[Graph]] is materialized. */ - def fs2StreamToAkkaSource[A](stream: Stream[Task, A])(implicit executionContext: ExecutionContext): Graph[SourceShape[A], NotUsed] = { + def fs2StreamToAkkaSource[A](stream: Stream[IO, A])(implicit executionContext: ExecutionContext): Graph[SourceShape[A], NotUsed] = { val source = AkkaSource.actorPublisher(AkkaStreamPublisher.props[A]) // A sink that runs an FS2 publisherStream when consuming the publisher actor (= materialized value) of source - val sink = AkkaSink.foreach[ActorRef](publisherStream[A](_, stream).run.unsafeRunAsyncFuture()) + val sink = AkkaSink.foreach[ActorRef](publisherStream[A](_, stream).run.unsafeToFuture()) AkkaSource.fromGraph(GraphDSL.create(source) { implicit builder => source => import GraphDSL.Implicits._ @@ -98,13 +98,13 @@ trait Converter { * Converts an FS2 [[Sink]] to an Akka Stream [[Graph]] of [[SinkShape]]. The [[Sink]] is run when the * [[Graph]] is materialized. */ - def fs2SinkToAkkaSink[A](sink: Sink[Task, A])(implicit executionContext: ExecutionContext): Graph[SinkShape[A], Future[Done]] = { + def fs2SinkToAkkaSink[A](sink: Sink[IO, A])(implicit executionContext: ExecutionContext): Graph[SinkShape[A], Future[Done]] = { val sink1: AkkaSink[A, ActorRef] = AkkaSink.actorSubscriber(AkkaStreamSubscriber.props[A]) // A sink that runs an FS2 subscriberStream when consuming the subscriber actor (= materialized value) of sink1. - // The future returned from unsafeRunAsyncFuture() completes when the subscriber stream completes and is made + // The future returned from unsafeToFuture() completes when the subscriber stream completes and is made // available as materialized value of this sink. val sink2: AkkaSink[ActorRef, Future[Done]] = AkkaFlow[ActorRef] - .map(subscriberStream[A](_).to(sink).run.unsafeRunAsyncFuture()) + .map(subscriberStream[A](_).to(sink).run.unsafeToFuture()) .toMat(AkkaSink.head)(Keep.right).mapMaterializedValue(_.flatMap(_.map(_ => Done))) AkkaSink.fromGraph(GraphDSL.create(sink1, sink2)(Keep.both) { implicit builder => (sink1, sink2) => @@ -118,12 +118,12 @@ trait Converter { * Converts an FS2 [[Pipe]] to an Akka Stream [[Graph]] of [[FlowShape]]. The [[Pipe]] is run when the * [[Graph]] is materialized. */ - def fs2PipeToAkkaFlow[A, B](pipe: Pipe[Task, A, B])(implicit executionContext: ExecutionContext): Graph[FlowShape[A, B], NotUsed] = { + def fs2PipeToAkkaFlow[A, B](pipe: Pipe[IO, A, B])(implicit executionContext: ExecutionContext): Graph[FlowShape[A, B], NotUsed] = { val source = AkkaSource.actorPublisher(AkkaStreamPublisher.props[B]) val sink1: AkkaSink[A, ActorRef] = AkkaSink.actorSubscriber(AkkaStreamSubscriber.props[A]) // A sink that runs an FS2 transformerStream when consuming the publisher actor (= materialized value) of source // and the subscriber actor (= materialized value) of sink1 - val sink2 = AkkaSink.foreach[(ActorRef, ActorRef)](ps => transformerStream(ps._2, ps._1, pipe).run.unsafeRunAsyncFuture()) + val sink2 = AkkaSink.foreach[(ActorRef, ActorRef)](ps => transformerStream(ps._2, ps._1, pipe).run.unsafeToFuture()) AkkaFlow.fromGraph(GraphDSL.create(source, sink1)(Keep.both) { implicit builder => (source, sink1) => import GraphDSL.Implicits._ @@ -132,57 +132,54 @@ trait Converter { }).mapMaterializedValue(_ => NotUsed) } - private def subscriberStream[A](subscriber: ActorRef)(implicit executionContext: ExecutionContext): Stream[Task, A] = { - val subscriberTask = Task.async((callback: Callback[Option[A]]) => subscriber ! Request(callback)) - Stream.repeatEval(subscriberTask).through(pipe.unNoneTerminate) + private def subscriberStream[A](subscriber: ActorRef)(implicit executionContext: ExecutionContext): Stream[IO, A] = { + val subscriberIO = IO.shift >> IO.async((callback: Callback[Option[A]]) => subscriber ! Request(callback)) + Stream.repeatEval(subscriberIO).unNoneTerminate .onFinalize { - Task.delay { + IO { subscriber ! PoisonPill } } } - private def publisherStream[A](publisher: ActorRef, stream: Stream[Task, A])(implicit executionContext: ExecutionContext): Stream[Task, Unit] = { - def publisherTask(i: A): Task[Option[Unit]] = Task.async((callback: Callback[Option[Unit]]) => publisher ! Next(i, callback))(strategy) - stream.flatMap(i => Stream.eval(publisherTask(i))).through(pipe.unNoneTerminate) + private def publisherStream[A](publisher: ActorRef, stream: Stream[IO, A])(implicit executionContext: ExecutionContext): Stream[IO, Unit] = { + def publisherIO(i: A): IO[Option[Unit]] = IO.shift >> IO.async((callback: Callback[Option[Unit]]) => publisher ! Next(i, callback)) + stream.flatMap(i => Stream.eval(publisherIO(i))).unNoneTerminate .onError { ex => publisher ! Error(ex) Stream.fail(ex) } .onFinalize { - Task.delay { + IO { publisher ! Complete publisher ! PoisonPill } } } - private def transformerStream[A, B](subscriber: ActorRef, publisher: ActorRef, stream: Stream[Task, A])(implicit executionContext: ExecutionContext): Stream[Task, B] = + private def transformerStream[A, B](subscriber: ActorRef, publisher: ActorRef, stream: Stream[IO, A])(implicit executionContext: ExecutionContext): Stream[IO, B] = publisherStream[A](publisher, stream).either(subscriberStream[B](subscriber)).collect { case Right(elem) => elem } - private def transformerStream[A, B](subscriber: ActorRef, publisher: ActorRef, pipe: Pipe[Task, A, B])(implicit executionContext: ExecutionContext): Stream[Task, Unit] = + private def transformerStream[A, B](subscriber: ActorRef, publisher: ActorRef, pipe: Pipe[IO, A, B])(implicit executionContext: ExecutionContext): Stream[IO, Unit] = subscriberStream[A](subscriber).through(pipe).to(s => publisherStream(publisher, s)) - - private implicit def strategy(implicit executionContext: ExecutionContext): Strategy = - Strategy.fromExecutionContext(executionContext) } trait ConverterDsl extends Converter { implicit class AkkaSourceDsl[A, M](source: Graph[SourceShape[A], M]) { /** @see [[Converter#akkaSourceToFs2Stream]] */ - def toStream(onMaterialization: M => Unit = _ => ())(implicit executionContext: ExecutionContext, materializer: Materializer): Stream[Task, A] = + def toStream(onMaterialization: M => Unit = _ => ())(implicit executionContext: ExecutionContext, materializer: Materializer): Stream[IO, A] = akkaSourceToFs2Stream(source)(onMaterialization) } implicit class AkkaSinkDsl[A, M](sink: Graph[SinkShape[A], M]) { /** @see [[Converter#akkaSinkToFs2Sink]] */ - def toSink(onMaterialization: M => Unit = _ => ())(implicit executionContext: ExecutionContext, materializer: Materializer): Sink[Task, A] = + def toSink(onMaterialization: M => Unit = _ => ())(implicit executionContext: ExecutionContext, materializer: Materializer): Sink[IO, A] = akkaSinkToFs2Sink(sink)(onMaterialization) } implicit class AkkaFlowDsl[A, B, M](flow: Graph[FlowShape[A, B], M]) { /** @see [[Converter#akkaFlowToFs2Pipe]] */ - def toPipe(onMaterialization: M => Unit = _ => ())(implicit executionContext: ExecutionContext, materializer: Materializer): Pipe[Task, A, B] = + def toPipe(onMaterialization: M => Unit = _ => ())(implicit executionContext: ExecutionContext, materializer: Materializer): Pipe[IO, A, B] = akkaFlowToFs2Pipe(flow)(onMaterialization) } @@ -198,7 +195,7 @@ trait ConverterDsl extends Converter { fs2StreamToAkkaSource(stream) } - implicit class FS2StreamTaskDsl[A](stream: Stream[Task, A])(implicit executionContext: ExecutionContext) { + implicit class FS2StreamIODsl[A](stream: Stream[IO, A])(implicit executionContext: ExecutionContext) { /** @see [[Converter#fs2StreamToAkkaSource]] */ def toSource(implicit executionContext: ExecutionContext): Graph[SourceShape[A], NotUsed] = fs2StreamToAkkaSource(stream) @@ -210,7 +207,7 @@ trait ConverterDsl extends Converter { fs2SinkToAkkaSink(sink) } - implicit class FS2SinkTaskDsl[A](sink: Sink[Task, A])(implicit executionContext: ExecutionContext) { + implicit class FS2SinkIODsl[A](sink: Sink[IO, A])(implicit executionContext: ExecutionContext) { /** @see [[Converter#fs2SinkToAkkaSink]] */ def toSink(implicit executionContext: ExecutionContext): Graph[SinkShape[A], Future[Done]] = fs2SinkToAkkaSink(sink) @@ -222,7 +219,7 @@ trait ConverterDsl extends Converter { fs2PipeToAkkaFlow(pipe) } - implicit class FS2PipeTaskDsl[A, B](pipe: Pipe[Task, A, B])(implicit executionContext: ExecutionContext) { + implicit class FS2PipeIODsl[A, B](pipe: Pipe[IO, A, B])(implicit executionContext: ExecutionContext) { /** @see [[Converter#fs2PipeToAkkaFlow]] */ def toFlow(implicit executionContext: ExecutionContext): Graph[FlowShape[A, B], NotUsed] = fs2PipeToAkkaFlow(pipe) diff --git a/streamz-converter/src/test/scala/streamz/converter/ConverterSpec.scala b/streamz-converter/src/test/scala/streamz/converter/ConverterSpec.scala index 1e00dd1..fec1b44 100644 --- a/streamz-converter/src/test/scala/streamz/converter/ConverterSpec.scala +++ b/streamz-converter/src/test/scala/streamz/converter/ConverterSpec.scala @@ -21,9 +21,8 @@ import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.{ Flow => AkkaFlow, Sink => AkkaSink, Source => AkkaSource, _ } import akka.testkit._ - +import cats.effect.IO import fs2._ - import org.scalatest._ import scala.collection.immutable.Seq @@ -63,20 +62,20 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with val source = AkkaSource(numbers) val stream = source.toStream() - stream.runLog.unsafeRun() should be(numbers) + stream.runLog.unsafeRunSync() should be(numbers) } "propagate errors from source to stream" in { val source = AkkaSource(numbers) ++ AkkaSource.failed(error) val stream = source.toStream() - expectError(stream.run.unsafeRun()) + expectError(stream.run.unsafeRunSync()) } "propagate cancellation from stream to source (on stream completion)" in { val probe = TestProbe() val source = AkkaSource(numbers).watchTermination()(Keep.right) val stream = source.toStream(mat => mat.onComplete(probe.ref ! _)).take(3) - stream.runLog.unsafeRun() should be(numbers.take(3)) + stream.runLog.unsafeRunSync() should be(numbers.take(3)) probe.expectMsg(Success(Done)) } "propagate cancellation from stream to source (on stream error)" in { @@ -84,7 +83,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with val source = AkkaSource(numbers).watchTermination()(Keep.right) val stream = source.toStream(mat => mat.onComplete(probe.ref ! _)) ++ Stream.fail(error) - expectError(stream.run.unsafeRun()) + expectError(stream.run.unsafeRunSync()) probe.expectMsg(Success(Done)) } } @@ -98,7 +97,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with val akkaSink = AkkaSink.seq[Int] val fs2Sink = akkaSink.toSink(mat => mat.onComplete(probe.ref ! _)) - Stream.emits(numbers).to(fs2Sink).run.unsafeRun() + Stream.emits(numbers).covary[IO].to(fs2Sink).run.unsafeRunSync() probe.expectMsg(Success(numbers)) } "propagate errors from FS2 sink to AS sink" in { @@ -106,7 +105,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with val akkaSink = AkkaSink.seq[Int] val fs2Sink = akkaSink.toSink(mat => mat.onComplete(probe.ref ! _)) - expectError(Stream.fail(error).to(fs2Sink).run.unsafeRun()) + expectError(Stream.fail(error).covary[IO].to(fs2Sink).run.unsafeRunSync()) probe.expectMsg(Failure(error)) } "propagate cancellation from AS sink to FS2 sink (on AS sink completion)" in { @@ -114,7 +113,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with val akkaSink = AkkaFlow[Int].take(3).toMat(AkkaSink.seq)(Keep.right) val fs2Sink = akkaSink.toSink(mat => mat.onComplete(probe.ref ! _)) - Stream.emits(numbers).to(fs2Sink).run.unsafeRun() + Stream.emits(numbers).covary[IO].to(fs2Sink).run.unsafeRunSync() probe.expectMsg(Success(numbers.take(3))) } "propagate cancellation from AS sink to FS2 sink (on AS sink error)" in { @@ -122,7 +121,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with val akkaSink = AkkaSink.foreach[Int](_ => throw error) val fs2Sink = akkaSink.toSink(mat => mat.onComplete(probe.ref ! _)) - Stream.emits(numbers).to(fs2Sink).run.unsafeRun() + Stream.emits(numbers).covary[IO].to(fs2Sink).run.unsafeRunSync() probe.expectMsg(Failure(error)) } } @@ -135,7 +134,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with val flow = AkkaFlow[Int].map(_ + 1) val pipe = flow.toPipe() - Stream.emits(numbers).through(pipe).runLog.unsafeRun() should be(numbers.map(_ + 1)) + Stream.emits(numbers).covary[IO].through(pipe).runLog.unsafeRunSync() should be(numbers.map(_ + 1)) } "propagate processing from pipe to flow (m:n)" in { def logic(i: Int): Seq[Int] = i match { @@ -146,14 +145,14 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with val flow = AkkaFlow[Int].mapConcat(logic) val pipe = flow.toPipe() - Stream.emits(numbers).through(pipe).runLog.unsafeRun() should be(numbers.flatMap(logic)) + Stream.emits(numbers).covary[IO].through(pipe).runLog.unsafeRunSync() should be(numbers.flatMap(logic)) } "propagate errors from pipe to flow" in { val probe = TestProbe() val flow = AkkaFlow[Int].map(_ + 1).recover { case e: Exception if e.getMessage == error.getMessage => probe.ref ! Failure(error) } val pipe = flow.toPipe() - expectError(Stream.fail(error).through(pipe).run.unsafeRun()) + expectError(Stream.fail(error).covary[IO].through(pipe).run.unsafeRunSync()) probe.expectMsg(Failure(error)) } "propagate errors from flow to pipe" in { @@ -164,7 +163,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with val flow = AkkaFlow[Int].mapConcat(logic) val pipe = flow.toPipe() - expectError(Stream.emits(numbers).through(pipe).run.unsafeRun()) + expectError(Stream.emits(numbers).covary[IO].through(pipe).run.unsafeRunSync()) } } @@ -174,7 +173,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with // "propagate elements and completion from stream to source" in { val probe = TestProbe() - val stream = Stream.emits(numbers).onFinalize[Task](Task.delay { probe.ref ! Success(Done); Stream.empty }) + val stream = Stream.emits(numbers).onFinalize[IO](IO { probe.ref ! Success(Done); Stream.empty }) val source = AkkaSource.fromGraph(stream.toSource) source.toMat(AkkaSink.seq)(Keep.right).run.await should be(numbers) @@ -189,7 +188,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with } "propagate cancellation from source to stream (on source completion)" in { val probe = TestProbe() - val stream = Stream.emits(numbers).onFinalize[Task](Task.delay { probe.ref ! Success(Done); Stream.empty }) + val stream = Stream.emits(numbers).onFinalize[IO](IO { probe.ref ! Success(Done); Stream.empty }) val source = AkkaSource.fromGraph(stream.toSource) source.via(AkkaFlow[Int].take(3)).toMat(AkkaSink.seq)(Keep.right).run.await should be(numbers.take(3)) @@ -197,7 +196,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with } "propagate cancellation from source to stream (on source error)" in { val probe = TestProbe() - val stream = Stream.emits(numbers).onFinalize[Task](Task.delay { probe.ref ! Success(Done); Stream.empty }) + val stream = Stream.emits(numbers).onFinalize[IO](IO { probe.ref ! Success(Done); Stream.empty }) val source = AkkaSource.fromGraph(stream.toSource) expectError(source.toMat(AkkaSink.foreach(_ => throw error))(Keep.right).run.await) @@ -210,10 +209,10 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with // AS Sink (extern) -> FS2 Sink (intern) // - def seqSink(probe: TestProbe): Sink[Task, Int] = + def seqSink(probe: TestProbe): Sink[IO, Int] = s => s.fold(Seq.empty[Int])(_ :+ _).map(probe.ref ! Success(_)) .onError(err => { probe.ref ! Failure(err); Stream.fail(err) }) - .onFinalize(Task.delay { probe.ref ! Success(Done); Stream.empty }) + .onFinalize(IO { probe.ref ! Success(Done); Stream.empty }) "propagate elements and completion from AS sink to FS2 sink" in { val probe = TestProbe() @@ -234,7 +233,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with } "propagate cancellation from FS2 sink to AS sink (on FS2 sink completion)" in { val probe = TestProbe() - val fs2Sink: Sink[Task, Int] = s => seqSink(probe)(s.take(3)) + val fs2Sink: Sink[IO, Int] = s => seqSink(probe)(s.take(3)) val akkaSink = AkkaSink.fromGraph(fs2Sink.toSink) AkkaSource(numbers).toMat(akkaSink)(Keep.right).run.await @@ -243,7 +242,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with } "propagate cancellation from FS2 sink to AS sink (on FS2 sink error)" in { val probe = TestProbe() - val fs2Sink: Sink[Task, Int] = s => seqSink(probe)(s ++ Stream.fail(error)) + val fs2Sink: Sink[IO, Int] = s => seqSink(probe)(s ++ Stream.fail(error)) val akkaSink = AkkaSink.fromGraph(fs2Sink.toSink) expectError(AkkaSource(numbers).toMat(akkaSink)(Keep.right).run.await) @@ -256,7 +255,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with // AS Flow (intern) <-> FS2 Pipe (extern) // "propagate processing from flow to pipe (1:1)" in { - val pip: Pipe[Pure, Int, Int] = pipe.lift(_ + 1) + val pip: Pipe[Pure, Int, Int] = s => s.map(_ + 1) val flow = AkkaFlow.fromGraph(pip.toFlow) AkkaSource(numbers).via(flow).toMat(AkkaSink.seq[Int])(Keep.right).run.await should be(numbers.map(_ + 1)) diff --git a/streamz-examples/README.md b/streamz-examples/README.md index 7b58795..f010968 100644 --- a/streamz-examples/README.md +++ b/streamz-examples/README.md @@ -179,34 +179,34 @@ object Example extends ExampleContext with App { ### Camel DSL for FS2 -Here, we re-use `ExampleService` and `ExampleContent` from the previous section. +Here, we re-use `ExampleService` and `ExampleContext` from the previous section. ```scala -import fs2.{ Strategy, Stream, Task, text } +import cats.effect.IO +import fs2.{ Stream, text } import streamz.camel.fs2.dsl._ import streamz.examples.camel.ExampleContext object Example extends ExampleContext with App { - implicit val strategy: Strategy = - Strategy.fromExecutionContext(scala.concurrent.ExecutionContext.global) // needed for merge + import scala.concurrent.ExecutionContext.Implicits.global // needed for merge - val tcpLineStream: Stream[Task, String] = + val tcpLineStream: Stream[IO, String] = receiveBody[String](tcpEndpointUri) - val fileLineStream: Stream[Task, String] = + val fileLineStream: Stream[IO, String] = receiveBody[String](fileEndpointUri).through(text.lines) - val linePrefixStream: Stream[Task, String] = + val linePrefixStream: Stream[IO, String] = Stream.iterate(1)(_ + 1).sendRequest[String](serviceEndpointUri) - val stream: Stream[Task, String] = + val stream: Stream[IO, String] = tcpLineStream .merge(fileLineStream) .zipWith(linePrefixStream)((l, n) => n concat l) .send(printerEndpointUri) - stream.run.unsafeRun + stream.run.unsafeRunSync() } ``` diff --git a/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Example.scala b/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Example.scala index 841d950..5bd0f02 100644 --- a/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Example.scala +++ b/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Example.scala @@ -16,28 +16,28 @@ package streamz.examples.camel.fs2 -import fs2.{ Strategy, Stream, Task, text } +import cats.effect.IO +import fs2.{ Stream, text } import streamz.camel.fs2.dsl._ import streamz.examples.camel.ExampleContext object Example extends ExampleContext with App { - implicit val strategy: Strategy = - Strategy.fromExecutionContext(scala.concurrent.ExecutionContext.global) // needed for merge + import scala.concurrent.ExecutionContext.Implicits.global // needed for merge - val tcpLineStream: Stream[Task, String] = + val tcpLineStream: Stream[IO, String] = receiveBody[String](tcpEndpointUri) - val fileLineStream: Stream[Task, String] = + val fileLineStream: Stream[IO, String] = receiveBody[String](fileEndpointUri).through(text.lines) - val linePrefixStream: Stream[Task, String] = + val linePrefixStream: Stream[IO, String] = Stream.iterate(1)(_ + 1).sendRequest[String](serviceEndpointUri) - val stream: Stream[Task, String] = + val stream: Stream[IO, String] = tcpLineStream .merge(fileLineStream) .zipWith(linePrefixStream)((l, n) => n concat l) .send(printerEndpointUri) - stream.run.unsafeRun + stream.run.unsafeRunSync() } diff --git a/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Snippets.scala b/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Snippets.scala index ab9cbcf..156ccf3 100644 --- a/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Snippets.scala +++ b/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Snippets.scala @@ -16,15 +16,15 @@ package streamz.examples.camel.fs2 -import fs2.{ Stream, Task } - -import streamz.camel.{ StreamMessage, StreamContext } +import cats.effect.IO +import fs2.Stream +import streamz.camel.{ StreamContext, StreamMessage } import streamz.camel.fs2.dsl._ object Snippets { implicit val context = StreamContext() - val s: Stream[Task, StreamMessage[Int]] = + val s: Stream[IO, StreamMessage[Int]] = // receive stream message from endpoint receive[String]("seda:q1") // in-only message exchange with endpoint and continue stream with in-message @@ -32,17 +32,17 @@ object Snippets { // in-out message exchange with endpoint and continue stream with out-message .sendRequest[Int]("bean:service?method=weight") - // create task from stream - val t: Task[Unit] = s.run + // create IO from stream + val t: IO[Unit] = s.run - // run task (side effects only here) ... - t.unsafeRun + // run IO (side effects only here) ... + t.unsafeRunSync() - val s1: Stream[Task, StreamMessage[String]] = receive[String]("seda:q1") - val s2: Stream[Task, StreamMessage[String]] = s1.send("seda:q2") - val s3: Stream[Task, StreamMessage[Int]] = s2.sendRequest[Int]("bean:service?method=weight") + val s1: Stream[IO, StreamMessage[String]] = receive[String]("seda:q1") + val s2: Stream[IO, StreamMessage[String]] = s1.send("seda:q2") + val s3: Stream[IO, StreamMessage[Int]] = s2.sendRequest[Int]("bean:service?method=weight") - val s1b: Stream[Task, String] = receiveBody[String]("seda:q1") - val s2b: Stream[Task, String] = s1b.send("seda:q2") - val s3b: Stream[Task, Int] = s2b.sendRequest[Int]("bean:service?method=weight") + val s1b: Stream[IO, String] = receiveBody[String]("seda:q1") + val s2b: Stream[IO, String] = s1b.send("seda:q2") + val s3b: Stream[IO, Int] = s2b.sendRequest[Int]("bean:service?method=weight") } diff --git a/streamz-examples/src/main/scala/streamz/examples/converter/Example.scala b/streamz-examples/src/main/scala/streamz/examples/converter/Example.scala index 66cbedb..74fb128 100644 --- a/streamz-examples/src/main/scala/streamz/examples/converter/Example.scala +++ b/streamz-examples/src/main/scala/streamz/examples/converter/Example.scala @@ -16,13 +16,12 @@ package streamz.examples.converter -import akka.{ Done, NotUsed } import akka.actor.{ ActorRefFactory, ActorSystem } import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{ Flow => AkkaFlow, Keep, Sink => AkkaSink, Source => AkkaSource } - -import fs2.{ Pipe, Pure, Sink, Stream, Task, pipe } - +import akka.stream.scaladsl.{ Keep, Flow => AkkaFlow, Sink => AkkaSink, Source => AkkaSource } +import akka.{ Done, NotUsed } +import cats.effect.IO +import fs2.{ Pipe, Pure, Sink, Stream } import streamz.converter._ import scala.collection.immutable.Seq @@ -45,17 +44,17 @@ object Example extends App { def f(i: Int) = List(s"$i-1", s"$i-2") val aSink1: AkkaSink[Int, Future[Done]] = AkkaSink.foreach[Int](println) - val fSink1: Sink[Task, Int] = aSink1.toSink() + val fSink1: Sink[IO, Int] = aSink1.toSink() val aSource1: AkkaSource[Int, NotUsed] = AkkaSource(numbers) - val fStream1: Stream[Task, Int] = aSource1.toStream() + val fStream1: Stream[IO, Int] = aSource1.toStream() val aFlow1: AkkaFlow[Int, String, NotUsed] = AkkaFlow[Int].mapConcat(f) - val fPipe1: Pipe[Task, Int, String] = aFlow1.toPipe() + val fPipe1: Pipe[IO, Int, String] = aFlow1.toPipe() - fStream1.to(fSink1).run.unsafeRun() // prints numbers - assert(fStream1.runLog.unsafeRun() == numbers) - assert(fStream1.through(fPipe1).runLog.unsafeRun() == numbers.flatMap(f)) + fStream1.to(fSink1).run.unsafeRunSync() // prints numbers + assert(fStream1.runLog.unsafeRunSync() == numbers) + assert(fStream1.through(fPipe1).runLog.unsafeRunSync() == numbers.flatMap(f)) // -------------------------------- // FS2 to Akka Stream conversions @@ -63,13 +62,13 @@ object Example extends App { def g(i: Int) = i + 10 - val fSink2: Sink[Pure, Int] = s => pipe.lift(g)(s).map(println) + val fSink2: Sink[Pure, Int] = s => s.map(g).map(println) val aSink2: AkkaSink[Int, Future[Done]] = AkkaSink.fromGraph(fSink2.toSink) val fStream2: Stream[Pure, Int] = Stream.emits(numbers) val aSource2: AkkaSource[Int, NotUsed] = AkkaSource.fromGraph(fStream2.toSource) - val fpipe2: Pipe[Pure, Int, Int] = pipe.lift[Pure, Int, Int](g) + val fpipe2: Pipe[Pure, Int, Int] = s => s.map(g) val aFlow2: AkkaFlow[Int, Int, NotUsed] = AkkaFlow.fromGraph(fpipe2.toFlow) aSource2.toMat(aSink2)(Keep.right).run() // prints numbers