Skip to content

Commit

Permalink
Restructure akka-streams to fs2 DSL to prevent dropping Mat value (#74)
Browse files Browse the repository at this point in the history
Fix #54
  • Loading branch information
milanvdm authored Apr 20, 2020
1 parent 8639ef6 commit ac5d75d
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 108 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,20 @@ Streamz artifacts are available for Scala 2.12 and 2.13 at:
Documentation
-------------

### Streamz 0.10-M2
### Streamz 0.11-RC1

- [Camel DSL for Akka Streams](streamz-camel-akka/README.md)
- [Camel DSL for FS2](streamz-camel-fs2/README.md)
- [Stream converters](streamz-converter/README.md)
- [Example application](streamz-examples/README.md)

### Streamz 0.10-M2

- [Camel DSL for Akka Streams](https://github.com/krasserm/streamz/blob/v-0.10-M2/streamz-camel-akka/README.md)
- [Camel DSL for FS2](https://github.com/krasserm/streamz/blob/v-0.10-M2/streamz-camel-fs2/README.md)
- [Stream converters](https://github.com/krasserm/streamz/blob/v-0.10-M2/streamz-converter/README.md)
- [Example application](https://github.com/krasserm/streamz/blob/v-0.10-M2/streamz-examples/README.md)

### Streamz 0.9.1

- [Camel DSL for Akka Streams](https://github.com/krasserm/streamz/blob/v-0.9.1/streamz-camel-akka/README.md)
Expand Down
38 changes: 21 additions & 17 deletions streamz-converter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Stream converters convert Akka Stream `Source`s, `Flow`s and `Sink`s to FS2 `Str

resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven"

libraryDependencies += "com.github.krasserm" %% "streamz-converter" % "0.10-M2"
libraryDependencies += "com.github.krasserm" %% "streamz-converter" % "0.11-RC1"

artifact and can be imported with:

Expand All @@ -16,26 +16,30 @@ import streamz.converter._
They require the following `implicit`s in scope:

```scala
import akka.actor.ActorRefFactory
import akka.stream.ActorMaterializer
import akka.actor.ActorSystem
import akka.stream.Materializer

import scala.concurrent.ExecutionContext

val factory: ActorRefFactory = ???
implicit val system: ActorSystem = ActorSystem("example")

implicit val executionContext: ExecutionContext = factory.dispatcher
implicit val materializer: ActorMaterializer = ActorMaterializer()(factory)
implicit val executionContext: ExecutionContext = system.dispatcher
implicit val materializer: Materializer = Materializer.createMaterializer(system)
```

### Conversions from Akka Stream to FS2

**Overview**:

|From |With |To |
|----------------------------|-------------|-------------------|
|`Graph[SourceShape[A], M]` |`toStream()` |`Stream[IO, A]` |
|`Graph[SinkShape[A], M]` |`toSink()` |`Sink[IO, A]` |
|`Graph[FlowShape[A, B], M]` |`toPipe()` |`Pipe[IO, A, B]` |
|From |With |To |
|------------------------------------|---------------------------|--------------------------------------|
|`Graph[SourceShape[A], NotUsed]` |`toStream[F]` |`Stream[F, A]` |
|`Graph[SourceShape[A], M]` |`toStreamMat[F]` |`F[(Stream[F, A], M)]` |
|`Graph[SinkShape[A], NotUsed]` |`toSink[F]` |`Sink[F, A]` |
|`Graph[SinkShape[A], M]` |`toSinkMat[F]` |`F[(Sink[F, A], M)]` |
|`Graph[FlowShape[A, B], NotUsed]` |`toPipe[F]` |`Pipe[F, A, B]` |
|`Graph[FlowShape[A, B], M]` |`toPipeMat[F]` |`F[(Pipe[F, A, B], M)]` |
|`Graph[FlowShape[A, B], Future[M]]` |`toPipeMatWithResult[F]` |`F[Pipe[F, A, Either[Throwable, M]]]` |

**Examples** ([source code](https://github.com/krasserm/streamz/blob/master/streamz-examples/src/main/scala/streamz/examples/converter/Example.scala)):

Expand All @@ -44,7 +48,7 @@ import akka.stream.scaladsl.{ Flow => AkkaFlow, Sink => AkkaSink, Source => Akka
import akka.{ Done, NotUsed }

import cats.effect.IO
import fs2.{ Pipe, Sink, Stream }
import fs2.{ Pipe, Stream }

import scala.collection.immutable.Seq
import scala.concurrent.Future
Expand All @@ -53,13 +57,13 @@ val numbers: Seq[Int] = 1 to 10
def f(i: Int) = List(s"$i-1", s"$i-2")

val aSink1: AkkaSink[Int, Future[Done]] = AkkaSink.foreach[Int](println)
val fSink1: Sink[IO, Int] = aSink1.toSink[IO]()
val fSink1: Pipe[IO, Int, Unit] = aSink1.toPipe[IO]

val aSource1: AkkaSource[Int, NotUsed] = AkkaSource(numbers)
val fStream1: Stream[IO, Int] = aSource1.toStream[IO]()
val fStream1: Stream[IO, Int] = aSource1.toStream[IO]

val aFlow1: AkkaFlow[Int, String, NotUsed] = AkkaFlow[Int].mapConcat(f)
val fPipe1: Pipe[IO, Int, String] = aFlow1.toPipe[IO]()
val fPipe1: Pipe[IO, Int, String] = aFlow1.toPipe[IO]

fStream1.to(fSink1).compile.drain.unsafeRunSync() // prints numbers
assert(fStream1.compile.toVector.unsafeRunSync() == numbers)
Expand Down Expand Up @@ -93,8 +97,8 @@ import scala.concurrent.duration._
val numbers: Seq[Int] = 1 to 10
def g(i: Int) = i + 10

val fSink2: Sink[Pure, Int] = s => s.map(g).map(println)
val aSink2: AkkaSink[Int, Future[Done]] = AkkaSink.fromGraph(fSink2.toSink)
val fSink2: Pipe[Pure, Int, Unit] = s => s.map(g).map(println)
val aSink2: AkkaSink[Int, Future[Done]] = AkkaSink.fromGraph(fSink2.toPipe)

val fStream2: Stream[Pure, Int] = Stream.emits(numbers)
val aSource2: AkkaSource[Int, NotUsed] = AkkaSource.fromGraph(fStream2.toSource)
Expand Down
Loading

0 comments on commit ac5d75d

Please sign in to comment.