diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d74eaed..1fa4d45 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,48 +32,21 @@ jobs: fetch-depth: 0 - name: Setup Java and Scala - uses: olafurpg/setup-scala@v5 + uses: olafurpg/setup-scala@v10 with: java-version: ${{ matrix.java }} - - name: Cache ivy2 - uses: actions/cache@v1 - with: - path: ~/.ivy2/cache - key: ${{ runner.os }}-sbt-ivy-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} - - - name: Cache coursier (generic) - uses: actions/cache@v1 - with: - path: ~/.coursier/cache/v1 - key: ${{ runner.os }}-generic-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} - - - name: Cache coursier (linux) - if: contains(runner.os, 'linux') - uses: actions/cache@v1 - with: - path: ~/.cache/coursier/v1 - key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} - - - name: Cache coursier (macOS) - if: contains(runner.os, 'macos') - uses: actions/cache@v1 - with: - path: ~/Library/Caches/Coursier/v1 - key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} - - - name: Cache coursier (windows) - if: contains(runner.os, 'windows') - uses: actions/cache@v1 - with: - path: ~/AppData/Local/Coursier/Cache/v1 - key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} - - name: Cache sbt - uses: actions/cache@v1 + uses: actions/cache@v2 with: - path: ~/.sbt - key: ${{ runner.os }}-sbt-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} + path: | + ~/.sbt + ~/.ivy2/cache + ~/.coursier/cache/v1 + ~/.cache/coursier/v1 + ~/AppData/Local/Coursier/Cache/v1 + ~/Library/Caches/Coursier/v1 + key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} - name: Check that workflows are up to date run: sbt ++${{ matrix.scala }} githubWorkflowCheck @@ -107,48 +80,21 @@ jobs: fetch-depth: 0 - name: Setup Java and Scala - uses: olafurpg/setup-scala@v5 + uses: olafurpg/setup-scala@v10 with: java-version: ${{ matrix.java }} - - name: Cache ivy2 - uses: actions/cache@v1 - with: - path: ~/.ivy2/cache - key: ${{ runner.os }}-sbt-ivy-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} - - - name: Cache coursier (generic) - uses: actions/cache@v1 - with: - path: ~/.coursier/cache/v1 - key: ${{ runner.os }}-generic-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} - - - name: Cache coursier (linux) - if: contains(runner.os, 'linux') - uses: actions/cache@v1 - with: - path: ~/.cache/coursier/v1 - key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} - - - name: Cache coursier (macOS) - if: contains(runner.os, 'macos') - uses: actions/cache@v1 - with: - path: ~/Library/Caches/Coursier/v1 - key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} - - - name: Cache coursier (windows) - if: contains(runner.os, 'windows') - uses: actions/cache@v1 - with: - path: ~/AppData/Local/Coursier/Cache/v1 - key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} - - name: Cache sbt - uses: actions/cache@v1 - with: - path: ~/.sbt - key: ${{ runner.os }}-sbt-cache-${{ hashFiles('**/*.sbt') 
}}-${{ hashFiles('project/build.properties') }} + uses: actions/cache@v2 + with: + path: | + ~/.sbt + ~/.ivy2/cache + ~/.coursier/cache/v1 + ~/.cache/coursier/v1 + ~/AppData/Local/Coursier/Cache/v1 + ~/Library/Caches/Coursier/v1 + key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} - name: Download target directories (2.12.12) uses: actions/download-artifact@v2 diff --git a/project/Version.scala b/project/Version.scala index 276276b..278bceb 100644 --- a/project/Version.scala +++ b/project/Version.scala @@ -1,9 +1,9 @@ object Version { val Akka = "2.6.9" val Camel = "2.20.4" - val CatsEffect = "2.2.0" val Config = "1.4.0" - val Fs2 = "2.4.4" + val CatsEffect = "3.0.0-M3" + val Fs2 = "3.0.0-M3" val JUnitInterface = "0.11" val Log4j = "2.13.0" val ScalaCollectionCompat = "2.2.0" diff --git a/project/build.properties b/project/build.properties index 6db9842..947bdd3 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.4.0 +sbt.version=1.4.3 diff --git a/project/plugins.sbt b/project/plugins.sbt index 96aced6..21555a2 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,9 +4,9 @@ addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.6.0") addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.3") -addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.13") +addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.15") -addSbtPlugin("com.codecommit" % "sbt-github-actions" % "0.9.3") +addSbtPlugin("com.codecommit" % "sbt-github-actions" % "0.9.5") addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.21") diff --git a/streamz-camel-fs2/README.md b/streamz-camel-fs2/README.md index d687686..0981cd1 100644 --- a/streamz-camel-fs2/README.md +++ b/streamz-camel-fs2/README.md @@ -2,7 +2,7 @@ Camel DSL for FS2 ----------------- [Apache Camel endpoints](http://camel.apache.org/components.html) can be integrated into [FS2](https://github.com/functional-streams-for-scala/fs2) applications with a [DSL](#dsl). - + ### Dependencies The DSL is provided by the `streamz-camel-fs2` artifact which is available for Scala 2.11 and 2.12: @@ -10,7 +10,7 @@ The DSL is provided by the `streamz-camel-fs2` artifact which is available for S resolvers += Resolver.bintrayRepo("krasserm", "maven") libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.10-M2" - + ### Configuration The consumer receive timeout on Camel endpoints defaults to 500 ms. If you need to change that, you can do so in `application.conf`: @@ -31,11 +31,11 @@ Its usage requires an implicit [`StreamContext`](http://krasserm.github.io/strea ```scala import streamz.camel.StreamContext -// contains an internally managed CamelContext +// contains an internally managed CamelContext implicit val streamContext: StreamContext = StreamContext() ``` -Applications that want to re-use an existing, externally managed `CamelContext` should create a `StreamContext` with `StreamContext(camelContext: CamelContext)`: +Applications that want to re-use an existing, externally managed `CamelContext` should create a `StreamContext` with `StreamContext(camelContext: CamelContext)`: ```scala import org.apache.camel.CamelContext @@ -49,7 +49,7 @@ implicit val streamContext: StreamContext = StreamContext(camelContext) ``` A `StreamContext` internally manages an `executorService` for running blocking endpoint operations. 
Applications can configure a custom executor service by providing an `executorServiceFactory` during `StreamContext` creation. See [API docs](http://krasserm.github.io/streamz/scala-2.12/unidoc/streamz/camel/StreamContext$.html) for details. -After usage, a `StreamContext` should be stopped with `streamContext.stop()`. +After usage, a `StreamContext` should be stopped with `streamContext.stop()`. #### Receiving in-only message exchanges from an endpoint @@ -58,8 +58,8 @@ An FS2 stream that emits messages consumed from a Camel endpoint can be created ```scala import cats.effect.IO import fs2.Stream -import streamz.camel.StreamContext -import streamz.camel.StreamMessage +import streamz.camel.StreamContext +import streamz.camel.StreamMessage import streamz.camel.fs2.dsl._ val s1: Stream[IO, StreamMessage[String]] = receive[IO, String]("seda:q1") @@ -73,7 +73,7 @@ val s1b: Stream[IO, String] = receiveBody[IO, String]("seda:q1") This is equivalent to `receive[IO, String]("seda:q1").map(_.body)`. -`receive` and `receiveBody` can only be used with endpoints that create [in-only message exchanges](http://camel.apache.org/exchange-pattern.html). +`receive` and `receiveBody` can only be used with endpoints that create [in-only message exchanges](http://camel.apache.org/exchange-pattern.html). #### Receiving in-out message exchanges from an endpoint @@ -87,9 +87,7 @@ For sending a `StreamMessage` to a Camel endpoint, the `send` combinator should val s2: Stream[IO, StreamMessage[String]] = s1.send("seda:q2") ``` -This initiates an in-only message [exchange](http://camel.apache.org/exchange.html) with an endpoint and continues the stream with the sent `StreamMessage`. - -The `send` combinator is not only available for streams of type `Stream[IO, StreamMessage[A]]` but more generally for any `Stream[F, A]` where `F: ContextShift: Async`. +This initiates an in-only message [exchange](http://camel.apache.org/exchange.html) with an endpoint and continues the stream with the sent `StreamMessage`. ```scala val s2b: Stream[IO, String] = s1b.send("seda:q2") @@ -105,9 +103,7 @@ For sending a request `StreamMessage` to an endpoint and obtaining a reply, the val s3: Stream[IO, StreamMessage[Int]] = s2.sendRequest[Int]("bean:service?method=weight") ``` -This initiates an in-out message exchange with the endpoint and continues the stream with the output `StreamMessage`. Here, a [Bean endpoint](https://camel.apache.org/bean.html) is used to call the `weight(String): Int` method on an object that is registered in the `CamelContext` under the name `service`. The input message body is used as `weight` call argument, the output message body is assigned the return value. The `sendRequest` type parameter (`Int`) specifies the expected output value type. The output message body can also be converted to another type provided that an appropriate Camel type converter is available (`Double`, for example). - -The `sendRequest` combinator is not only available for streams of type `Stream[IO, StreamMessage[A]]` but more generally for any `Stream[F, A]` where `F: ContextShift: Async`. +This initiates an in-out message exchange with the endpoint and continues the stream with the output `StreamMessage`. Here, a [Bean endpoint](https://camel.apache.org/bean.html) is used to call the `weight(String): Int` method on an object that is registered in the `CamelContext` under the name `service`. The input message body is used as `weight` call argument, the output message body is assigned the return value. 
The `sendRequest` type parameter (`Int`) specifies the expected output value type. The output message body can also be converted to another type provided that an appropriate Camel type converter is available (`Double`, for example). ```scala val s3b: Stream[IO, Int] = s2b.sendRequest[Int]("bean:service?method=weight") diff --git a/streamz-camel-fs2/src/main/scala/streamz/camel/fs2/dsl/package.scala b/streamz-camel-fs2/src/main/scala/streamz/camel/fs2/dsl/package.scala index 8cbdae8..006f4a4 100644 --- a/streamz-camel-fs2/src/main/scala/streamz/camel/fs2/dsl/package.scala +++ b/streamz-camel-fs2/src/main/scala/streamz/camel/fs2/dsl/package.scala @@ -18,8 +18,7 @@ package streamz.camel.fs2 import java.util.concurrent.TimeUnit -import cats.effect.{ Async, ContextShift } -import cats.implicits._ +import cats.effect.Async import fs2._ import org.apache.camel.spi.Synchronization import org.apache.camel.{ Exchange, ExchangePattern } @@ -33,7 +32,7 @@ package object dsl { /** * Camel endpoint combinators for [[StreamMessage]] streams of type `Stream[F, StreamMessage[A]]`. */ - implicit class SendDsl[F[_]: ContextShift: Async, A](self: Stream[F, StreamMessage[A]]) { + implicit class SendDsl[F[_]: Async, A](self: Stream[F, StreamMessage[A]]) { /** * @see [[dsl.send]] */ @@ -50,7 +49,7 @@ package object dsl { /** * Camel endpoint combinators for [[StreamMessage]] body streams of type `Stream[F, A]`. */ - implicit class SendBodyDsl[F[_]: ContextShift: Async, A](self: Stream[F, A]) { + implicit class SendBodyDsl[F[_]: Async, A](self: Stream[F, A]) { /** * @see [[dsl.sendBody]] */ @@ -71,13 +70,13 @@ package object dsl { /** * @see [[dsl.send]] */ - def send[F[_]](uri: String)(implicit context: StreamContext, contextShift: ContextShift[F], async: Async[F]): Stream[F, StreamMessage[A]] = + def send[F[_]](uri: String)(implicit context: StreamContext, async: Async[F]): Stream[F, StreamMessage[A]] = new SendDsl[F, A](self.covary[F]).send(uri) /** * @see [[dsl.sendRequest()]] */ - def sendRequest[F[_], B](uri: String)(implicit context: StreamContext, tag: ClassTag[B], contextShift: ContextShift[F], async: Async[F]): Stream[F, StreamMessage[B]] = + def sendRequest[F[_], B](uri: String)(implicit context: StreamContext, tag: ClassTag[B], async: Async[F]): Stream[F, StreamMessage[B]] = new SendDsl[F, A](self.covary[F]).sendRequest(uri) } @@ -88,13 +87,13 @@ package object dsl { /** * @see [[dsl.sendBody]] */ - def send[F[_]: ContextShift: Async](uri: String)(implicit context: StreamContext): Stream[F, A] = + def send[F[_]: Async](uri: String)(implicit context: StreamContext): Stream[F, A] = new SendBodyDsl[F, A](self.covary[F]).send(uri) /** * @see [[dsl.sendRequestBody]] */ - def sendRequest[F[_]: ContextShift: Async, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[F, B] = + def sendRequest[F[_]: Async, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[F, B] = new SendBodyDsl[F, A](self.covary[F]).sendRequest(uri) } @@ -110,7 +109,7 @@ package object dsl { * @param uri Camel endpoint URI. * @throws org.apache.camel.TypeConversionException if type conversion fails. */ - def receive[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, StreamMessage[A]] = { + def receive[F[_]: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, StreamMessage[A]] = { consume(uri).filter(_ != null) } @@ -126,7 +125,7 @@ package object dsl { * @param uri Camel endpoint URI. 
* @throws org.apache.camel.TypeConversionException if type conversion fails. */ - def receiveBody[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, A] = + def receiveBody[F[_]: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, A] = receive(uri).map(_.body) /** @@ -136,7 +135,7 @@ package object dsl { * * @param uri Camel endpoint URI. */ - def send[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, StreamMessage[A], StreamMessage[A]] = + def send[F[_]: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, StreamMessage[A], StreamMessage[A]] = produce[F, A, A](uri, ExchangePattern.InOnly, (message, _) => message) /** @@ -146,7 +145,7 @@ package object dsl { * * @param uri Camel endpoint URI. */ - def sendBody[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, A, A] = + def sendBody[F[_]: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, A, A] = s => s.map(StreamMessage(_)).through(send(uri)).map(_.body) /** @@ -158,7 +157,7 @@ package object dsl { * @param uri Camel endpoint URI. * @throws org.apache.camel.TypeConversionException if type conversion fails. */ - def sendRequest[F[_]: ContextShift: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, StreamMessage[A], StreamMessage[B]] = + def sendRequest[F[_]: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, StreamMessage[A], StreamMessage[B]] = produce[F, A, B](uri, ExchangePattern.InOut, (_, exchange) => StreamMessage.from[B](exchange.getOut)) /** @@ -170,13 +169,13 @@ package object dsl { * @param uri Camel endpoint URI. * @throws org.apache.camel.TypeConversionException if type conversion fails. 
*/ - def sendRequestBody[F[_]: ContextShift: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, A, B] = + def sendRequestBody[F[_]: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, A, B] = s => s.map(StreamMessage(_)).through(sendRequest[F, A, B](uri)).map(_.body) - private def consume[F[_], A](uri: String)(implicit context: StreamContext, tag: ClassTag[A], contextShift: ContextShift[F], F: Async[F]): Stream[F, StreamMessage[A]] = { + private def consume[F[_], A](uri: String)(implicit context: StreamContext, tag: ClassTag[A], F: Async[F]): Stream[F, StreamMessage[A]] = { val timeout = context.config.getDuration("streamz.camel.consumer.receive.timeout", TimeUnit.MILLISECONDS) Stream.repeatEval { - contextShift.shift >> F.async[StreamMessage[A]] { callback => + F.async_[StreamMessage[A]] { callback => Try(context.consumerTemplate.receive(uri, timeout)) match { case Success(null) => callback(Right(null)) @@ -199,10 +198,10 @@ package object dsl { } } - private def produce[F[_], A, B](uri: String, pattern: ExchangePattern, result: (StreamMessage[A], Exchange) => StreamMessage[B])(implicit context: StreamContext, contextShift: ContextShift[F], F: Async[F]): Pipe[F, StreamMessage[A], StreamMessage[B]] = { s => + private def produce[F[_], A, B](uri: String, pattern: ExchangePattern, result: (StreamMessage[A], Exchange) => StreamMessage[B])(implicit context: StreamContext, F: Async[F]): Pipe[F, StreamMessage[A], StreamMessage[B]] = { s => s.flatMap { message => Stream.eval { - contextShift.shift >> F.async[StreamMessage[B]] { callback => + F.async_[StreamMessage[B]] { callback => context.producerTemplate.asyncCallback(uri, context.createExchange(message, pattern), new Synchronization { override def onFailure(exchange: Exchange): Unit = callback(Left(exchange.getException)) diff --git a/streamz-camel-fs2/src/test/scala/streamz/camel/fs2/dsl/DslSpec.scala b/streamz-camel-fs2/src/test/scala/streamz/camel/fs2/dsl/DslSpec.scala index 4e3690d..ced718e 100644 --- a/streamz-camel-fs2/src/test/scala/streamz/camel/fs2/dsl/DslSpec.scala +++ b/streamz-camel-fs2/src/test/scala/streamz/camel/fs2/dsl/DslSpec.scala @@ -30,6 +30,8 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec class DslSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll { + import cats.effect.unsafe.implicits.global + val camelRegistry = new SimpleRegistry val camelContext = new DefaultCamelContext() @@ -37,7 +39,6 @@ class DslSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll { camelRegistry.put("service", new Service) implicit val streamContext = new StreamContext(camelContext) - implicit val contextShift = IO.contextShift(scala.concurrent.ExecutionContext.global) import streamContext._ diff --git a/streamz-converter/build.sbt b/streamz-converter/build.sbt index 4d56d0b..0ae7803 100644 --- a/streamz-converter/build.sbt +++ b/streamz-converter/build.sbt @@ -2,7 +2,8 @@ name := "streamz-converter" libraryDependencies ++= Seq( "co.fs2" %% "fs2-core" % Version.Fs2, - "org.typelevel" %% "cats-effect" % Version.CatsEffect, + "org.typelevel" %% "cats-effect-std" % Version.CatsEffect, + "org.typelevel" %% "cats-effect" % Version.CatsEffect % "test", "com.typesafe.akka" %% "akka-stream" % Version.Akka, "com.typesafe.akka" %% "akka-stream-testkit" % Version.Akka % "test", "com.typesafe.akka" %% "akka-testkit" % Version.Akka % "test", diff --git a/streamz-converter/src/main/scala/streamz/converter/Converter.scala 
b/streamz-converter/src/main/scala/streamz/converter/Converter.scala index b898cef..4c260c4 100644 --- a/streamz-converter/src/main/scala/streamz/converter/Converter.scala +++ b/streamz-converter/src/main/scala/streamz/converter/Converter.scala @@ -16,18 +16,17 @@ package streamz.converter -import scala.concurrent.{ ExecutionContext, Future } -import scala.util.{ Failure, Success } - import akka.stream._ import akka.stream.scaladsl.{ Flow => AkkaFlow, Sink => AkkaSink, Source => AkkaSource, _ } import akka.{ Done, NotUsed } -import cats.effect._ -import cats.effect.concurrent.Deferred -import cats.effect.implicits._ +import cats.effect.{ Async, Sync } +import cats.effect.Resource.ExitCase +import cats.effect.std.Dispatcher import cats.implicits._ import fs2._ + import scala.annotation.implicitNotFound +import scala.concurrent.Future trait Converter { @@ -35,9 +34,9 @@ trait Converter { * Converts an Akka Stream [[Graph]] of [[SourceShape]] to an FS2 [[Stream]]. * If the materialized value needs be obtained, use [[akkaSourceToFs2StreamMat]]. */ - def akkaSourceToFs2Stream[F[_]: Async: ContextShift, A](source: Graph[SourceShape[A], NotUsed])(implicit materializer: Materializer): Stream[F, A] = + def akkaSourceToFs2Stream[F[_]: Async, A](source: Graph[SourceShape[A], NotUsed])(implicit materializer: Materializer): Stream[F, A] = Stream.force { - Async[F].delay { + Sync[F].delay { val subscriber = AkkaSource.fromGraph(source).toMat(AkkaSink.queue[A]())(Keep.right).run() subscriberStream[F, A](subscriber) } @@ -47,20 +46,20 @@ trait Converter { * Converts an Akka Stream [[Graph]] of [[SourceShape]] to an FS2 [[Stream]]. This method returns the FS2 [[Stream]] * and the materialized value of the [[Graph]]. */ - def akkaSourceToFs2StreamMat[F[_]: Async: ContextShift, A, M](source: Graph[SourceShape[A], M])(implicit materializer: Materializer): F[(Stream[F, A], M)] = - Async[F].delay { + def akkaSourceToFs2StreamMat[F[_]: Async, A, M](source: Graph[SourceShape[A], M])(implicit materializer: Materializer): F[(Stream[F, A], M)] = + Sync[F].delay { val (mat, subscriber) = AkkaSource.fromGraph(source).toMat(AkkaSink.queue[A]())(Keep.both).run() (subscriberStream[F, A](subscriber), mat) } /** * Converts an Akka Stream [[Graph]] of [[SinkShape]] to an FS2 [[Pipe]]. - * If the materialized value needs be obtained, use [[akkaSinkToFs2PipeMat]]. + * If the materialized value needs be obtained, use [[akkaSinkToFs2PipeWithMat]]. */ - def akkaSinkToFs2Pipe[F[_]: Concurrent: ContextShift, A](sink: Graph[SinkShape[A], NotUsed])(implicit materializer: Materializer): Pipe[F, A, Unit] = + def akkaSinkToFs2Pipe[F[_]: Async, A](sink: Graph[SinkShape[A], NotUsed])(implicit materializer: Materializer): Pipe[F, A, Unit] = (s: Stream[F, A]) => Stream.force { - Async[F].delay { + Sync[F].delay { val publisher = AkkaSource.queue[A](0, OverflowStrategy.backpressure).toMat(sink)(Keep.left).run() publisherStream[F, A](publisher, s) } @@ -70,8 +69,8 @@ trait Converter { * Converts an Akka Stream [[Graph]] of [[SinkShape]] to an FS2 [[Pipe]]. This method returns the FS2 [[Pipe]] * and the materialized value of the [[Graph]]. 
*/ - def akkaSinkToFs2PipeMat[F[_]: Concurrent: ContextShift, A, M](sink: Graph[SinkShape[A], M])(implicit materializer: Materializer): F[(Pipe[F, A, Unit], M)] = - Concurrent[F].delay { + def akkaSinkToFs2PipeWithMat[F[_]: Async, A, M](sink: Graph[SinkShape[A], M])(implicit materializer: Materializer): F[(Pipe[F, A, Unit], M)] = + Sync[F].delay { val (publisher, mat) = AkkaSource.queue[A](0, OverflowStrategy.backpressure).toMat(sink)(Keep.both).run() ((s: Stream[F, A]) => publisherStream[F, A](publisher, s), mat) } @@ -82,42 +81,29 @@ trait Converter { * The stream returned by this will emit the Future's value one time at the end, * then terminate. */ - def akkaSinkToFs2PipeMat[F[_]: ConcurrentEffect: ContextShift, A, M](akkaSink: Graph[SinkShape[A], Future[M]])( + def akkaSinkToFs2PipeMat[F[_], A, M](akkaSink: Graph[SinkShape[A], Future[M]])( implicit - ec: ExecutionContext, - m: Materializer): F[Pipe[F, A, Either[Throwable, M]]] = - for { - promise <- Deferred[F, Either[Throwable, M]] - fs2Sink <- akkaSinkToFs2PipeMat[F, A, Future[M]](akkaSink).flatMap { - case (stream, mat) => - // This callback tells the akka materialized future to store its result status into the Promise - val callback = ConcurrentEffect[F].delay( - mat.onComplete { - case Failure(ex) => promise.complete(ex.asLeft).toIO.unsafeRunSync() - case Success(value) => promise.complete(value.asRight).toIO.unsafeRunSync() - }) - callback.map(_ => stream) - } - } yield { - in: Stream[F, A] => - { - // Async wait on the promise to be completed - val materializedResultStream = Stream.eval(promise.get) - val fs2Stream: Stream[F, Unit] = fs2Sink.apply(in) - - // Run the akka sink for its effects and then run stream containing the effect of getting the Promise results - fs2Stream.drain ++ materializedResultStream - } + F: Async[F], + m: Materializer): Pipe[F, A, Either[Throwable, M]] = { in: Stream[F, A] => + + Stream.eval(akkaSinkToFs2PipeWithMat[F, A, Future[M]](akkaSink)).flatMap { + case (pipe, matResult) => + // NB: `pure` here because the future is by now already spawned - we just need a handle to it + val getMatResult = Stream.eval(F.fromFuture(F.pure(matResult)).attempt) + + // Run `pipe` for its effects, and then terminate with the result of the materialized future + in.through(pipe).drain ++ getMatResult } + } /** * Converts an Akka Stream [[Graph]] of [[FlowShape]] to an FS2 [[Pipe]]. - * If the materialized value needs be obtained, use [[akkaSinkToFs2PipeMat]]. + * If the materialized value needs be obtained, use [[akkaSinkToFs2PipeWithMat]]. */ - def akkaFlowToFs2Pipe[F[_]: Concurrent: ContextShift, A, B](flow: Graph[FlowShape[A, B], NotUsed])(implicit materializer: Materializer): Pipe[F, A, B] = + def akkaFlowToFs2Pipe[F[_]: Async, A, B](flow: Graph[FlowShape[A, B], NotUsed])(implicit materializer: Materializer): Pipe[F, A, B] = (s: Stream[F, A]) => Stream.force { - Concurrent[F].delay { + Sync[F].delay { val src = AkkaSource.queue[A](0, OverflowStrategy.backpressure) val snk = AkkaSink.queue[B]() val (publisher, subscriber) = src.viaMat(flow)(Keep.left).toMat(snk)(Keep.both).run() @@ -129,8 +115,8 @@ trait Converter { * Converts an Akka Stream [[Graph]] of [[FlowShape]] to an FS2 [[Pipe]]. This method returns the FS2 [[Pipe]] * and the materialized value of the [[Graph]]. 
*/ - def akkaFlowToFs2PipeMat[F[_]: Concurrent: ContextShift, A, B, M](flow: Graph[FlowShape[A, B], M])(implicit materializer: Materializer): F[(Pipe[F, A, B], M)] = - Concurrent[F].delay { + def akkaFlowToFs2PipeMat[F[_]: Async, A, B, M](flow: Graph[FlowShape[A, B], M])(implicit materializer: Materializer): F[(Pipe[F, A, B], M)] = + Sync[F].delay { val src = AkkaSource.queue[A](0, OverflowStrategy.backpressure) val snk = AkkaSink.queue[B]() val ((publisher, mat), subscriber) = src.viaMat(flow)(Keep.both).toMat(snk)(Keep.both).run() @@ -141,12 +127,12 @@ trait Converter { * Converts an FS2 [[Stream]] to an Akka Stream [[Graph]] of [[SourceShape]]. The [[Stream]] is run when the * [[Graph]] is materialized. */ - def fs2StreamToAkkaSource[F[_]: ConcurrentEffect: ContextShift, A](stream: Stream[F, A]): Graph[SourceShape[A], NotUsed] = { + def fs2StreamToAkkaSource[F[_]: Async, A](stream: Stream[F, A])(implicit dispatcher: Dispatcher[F]): Graph[SourceShape[A], NotUsed] = { val source = AkkaSource.queue[A](0, OverflowStrategy.backpressure) // A sink that runs an FS2 publisherStream when consuming the publisher actor (= materialized value) of source val sink = AkkaSink.foreach[SourceQueueWithComplete[A]] { p => // Fire and forget Future so it runs in the background - publisherStream[F, A](p, stream).compile.drain.toIO.unsafeToFuture() + dispatcher.unsafeRunAndForget(publisherStream[F, A](p, stream).compile.drain) () } @@ -158,20 +144,18 @@ trait Converter { } /** - * Converts an FS2 [[Pipe]] to an Akka Stream [[Graph]] of [[SinkShape]]. The [[Sink]] is run when the + * Converts an FS2 [[Pipe]] to an Akka Stream [[Graph]] of [[SinkShape]]. The [[akka.stream.scaladsl.Sink]] is run when the * [[Graph]] is materialized. */ - def fs2PipeToAkkaSink[F[_]: ContextShift: Effect, A](sink: Pipe[F, A, Unit]): Graph[SinkShape[A], Future[Done]] = { + def fs2PipeToAkkaSink[F[_], A](sink: Pipe[F, A, Unit])(implicit F: Async[F], dispatcher: Dispatcher[F]): Graph[SinkShape[A], Future[Done]] = { val sink1: AkkaSink[A, SinkQueueWithCancel[A]] = AkkaSink.queue[A]() // A sink that runs an FS2 subscriberStream when consuming the subscriber actor (= materialized value) of sink1. // The future returned from unsafeToFuture() completes when the subscriber stream completes and is made // available as materialized value of this sink. val sink2: AkkaSink[SinkQueueWithCancel[A], Future[Done]] = AkkaFlow[SinkQueueWithCancel[A]] - .map(s => subscriberStream[F, A](s).through(sink).compile.drain.toIO.as(Done: Done).unsafeToFuture()) + .map(s => dispatcher.unsafeToFuture(subscriberStream[F, A](s).through(sink).compile.drain.as(Done))) .toMat(AkkaSink.head)(Keep.right) - .mapMaterializedValue(ffd => Async.fromFuture(Async.fromFuture(Effect[F].pure(ffd))).toIO.unsafeToFuture()) - // fromFuture dance above is because scala 2.11 lacks Future#flatten. `pure` instead of `delay` - // because the future value is already strict by the time we get it. + .mapMaterializedValue(ffd => dispatcher.unsafeToFuture(F.fromFuture(F.delay(ffd.flatten)))) AkkaSink.fromGraph(GraphDSL.create(sink1, sink2)(Keep.both) { implicit builder => (sink1, sink2) => import GraphDSL.Implicits._ @@ -184,15 +168,14 @@ trait Converter { * Converts an FS2 [[Pipe]] to an Akka Stream [[Graph]] of [[FlowShape]]. The [[Pipe]] is run when the * [[Graph]] is materialized. 
*/ - def fs2PipeToAkkaFlow[F[_]: ConcurrentEffect: ContextShift, A, B](pipe: Pipe[F, A, B]): Graph[FlowShape[A, B], NotUsed] = { + def fs2PipeToAkkaFlow[F[_]: Async, A, B](pipe: Pipe[F, A, B])(implicit dispatcher: Dispatcher[F]): Graph[FlowShape[A, B], NotUsed] = { val source = AkkaSource.queue[B](0, OverflowStrategy.backpressure) val sink1: AkkaSink[A, SinkQueueWithCancel[A]] = AkkaSink.queue[A]() // A sink that runs an FS2 transformerStream when consuming the publisher actor (= materialized value) of source // and the subscriber actor (= materialized value) of sink1 val sink2 = AkkaSink.foreach[(SourceQueueWithComplete[B], SinkQueueWithCancel[A])] { ps => // Fire and forget Future so it runs in the background - ConcurrentEffect[F].toIO(transformerStream(ps._2, ps._1, pipe).compile.drain).unsafeToFuture() - () + dispatcher.unsafeRunAndForget(transformerStream(ps._2, ps._1, pipe).compile.drain) } AkkaFlow.fromGraph(GraphDSL.create(source, sink1)(Keep.both) { implicit builder => (source, sink1) => @@ -202,18 +185,18 @@ trait Converter { }).mapMaterializedValue(_ => NotUsed) } - private def subscriberStream[F[_]: Async: ContextShift, A](subscriber: SinkQueueWithCancel[A]): Stream[F, A] = { - val pull = Async.fromFuture(Async[F].delay(subscriber.pull())) - val cancel = Async[F].delay(subscriber.cancel()) + private def subscriberStream[F[_]: Async, A](subscriber: SinkQueueWithCancel[A]): Stream[F, A] = { + val pull = Async[F].fromFuture(Sync[F].delay(subscriber.pull())) + val cancel = Sync[F].delay(subscriber.cancel()) Stream.repeatEval(pull).unNoneTerminate.onFinalize(cancel) } - private def publisherStream[F[_]: Concurrent: ContextShift, A](publisher: SourceQueueWithComplete[A], stream: Stream[F, A]): Stream[F, Unit] = { - def publish(a: A): F[Option[Unit]] = Async.fromFuture(Concurrent[F].delay(publisher.offer(a))).flatMap { + private def publisherStream[F[_]: Async, A](publisher: SourceQueueWithComplete[A], stream: Stream[F, A]): Stream[F, Unit] = { + def publish(a: A): F[Option[Unit]] = Async[F].fromFuture(Sync[F].delay(publisher.offer(a))).flatMap { case QueueOfferResult.Enqueued => ().some.pure[F] - case QueueOfferResult.Failure(cause) => Concurrent[F].raiseError[Option[Unit]](cause) + case QueueOfferResult.Failure(cause) => Sync[F].raiseError[Option[Unit]](cause) case QueueOfferResult.QueueClosed => none[Unit].pure[F] - case QueueOfferResult.Dropped => Concurrent[F].raiseError[Option[Unit]](new IllegalStateException("This should never happen because we use OverflowStrategy.backpressure")) + case QueueOfferResult.Dropped => Sync[F].raiseError[Option[Unit]](new IllegalStateException("This should never happen because we use OverflowStrategy.backpressure")) }.recover { // This handles a race condition between `interruptWhen` and `publish`. 
// There's no guarantee that, when the akka sink is terminated, we will observe the @@ -222,21 +205,23 @@ trait Converter { case _: StreamDetachedException => none[Unit] } - def watchCompletion: F[Unit] = Async.fromFuture(Concurrent[F].delay(publisher.watchCompletion())).void - def fail(e: Throwable): F[Unit] = Concurrent[F].delay(publisher.fail(e)) >> watchCompletion - def complete: F[Unit] = Concurrent[F].delay(publisher.complete()) >> watchCompletion + def watchCompletion: F[Unit] = Async[F].fromFuture(Sync[F].delay(publisher.watchCompletion())).void + + def fail(e: Throwable): F[Unit] = Sync[F].delay(publisher.fail(e)) >> watchCompletion + + def complete: F[Unit] = Sync[F].delay(publisher.complete()) >> watchCompletion stream.interruptWhen(watchCompletion.attempt).evalMap(publish).unNoneTerminate .onFinalizeCase { - case ExitCase.Completed | ExitCase.Canceled => complete - case ExitCase.Error(e) => fail(e) + case ExitCase.Succeeded | ExitCase.Canceled => complete + case ExitCase.Errored(e) => fail(e) } } - private def transformerStream[F[_]: ContextShift: Concurrent, A, B](subscriber: SinkQueueWithCancel[B], publisher: SourceQueueWithComplete[A], stream: Stream[F, A]): Stream[F, B] = + private def transformerStream[F[_]: Async, A, B](subscriber: SinkQueueWithCancel[B], publisher: SourceQueueWithComplete[A], stream: Stream[F, A]): Stream[F, B] = subscriberStream[F, B](subscriber).concurrently(publisherStream[F, A](publisher, stream)) - private def transformerStream[F[_]: ContextShift: Concurrent, A, B](subscriber: SinkQueueWithCancel[A], publisher: SourceQueueWithComplete[B], pipe: Pipe[F, A, B]): Stream[F, Unit] = + private def transformerStream[F[_]: Async, A, B](subscriber: SinkQueueWithCancel[A], publisher: SourceQueueWithComplete[B], pipe: Pipe[F, A, B]): Stream[F, Unit] = subscriberStream[F, A](subscriber).through(pipe).through(s => publisherStream(publisher, s)) } @@ -245,33 +230,25 @@ trait ConverterDsl extends Converter { implicit class AkkaSourceDsl[A, M](source: Graph[SourceShape[A], M]) { /** @see [[Converter#akkaSourceToFs2Stream]] */ - def toStream[F[_]: ContextShift: Async](implicit materializer: Materializer, @implicitNotFound( - "Cannot convert `Source[A, M]` to `Stream[F, A]` - `M` value would be discarded.\nIf that is intended, first convert the `Source` to `Source[A, NotUsed]`.\nIf `M` should not be discarded, then use `source.toStreamMat[F]` instead.") ev: M <:< NotUsed): Stream[F, A] = { + def toStream[F[_]: Async]( + implicit + materializer: Materializer, + @implicitNotFound( + "Cannot convert `Source[A, M]` to `Stream[F, A]` - `M` value would be discarded.\nIf that is intended, first convert the `Source` to `Source[A, NotUsed]`.\nIf `M` should not be discarded, then use `source.toStreamMat[F]` instead.") ev: M <:< NotUsed): Stream[F, A] = { val _ = ev // to suppress 'never used' warning. The warning fires on 2.12 but not on 2.13, so I can't use `nowarn` akkaSourceToFs2Stream(source.asInstanceOf[Graph[SourceShape[A], NotUsed]]) } /** @see [[Converter#akkaSourceToFs2StreamMat]] */ - def toStreamMat[F[_]: ContextShift: Async](implicit materializer: Materializer): F[(Stream[F, A], M)] = + def toStreamMat[F[_]: Async](implicit materializer: Materializer): F[(Stream[F, A], M)] = akkaSourceToFs2StreamMat(source) - @deprecated(message = "Use `.toStream[F]` for M=NotUsed; use `.toStreamMat[F]` for other M. 
This version relies on side effects.", since = "0.11") - def toStream[F[_]: ContextShift: Async](onMaterialization: M => Unit = _ => ())(implicit materializer: Materializer): Stream[F, A] = - Stream.force( - akkaSourceToFs2StreamMat(source).map { - case (akkaStream, mat) => - onMaterialization(mat) - akkaStream - }) } implicit class AkkaSinkFutureDsl[A, M](sink: Graph[SinkShape[A], Future[M]]) { /** @see [[Converter#akkaSinkToFs2SinkMat]] */ - def toPipeMatWithResult[F[_]: ConcurrentEffect: ContextShift]( - implicit - ec: ExecutionContext, - m: Materializer): F[Pipe[F, A, Either[Throwable, M]]] = + def toPipeMatWithResult[F[_]: Async](implicit m: Materializer): Pipe[F, A, Either[Throwable, M]] = akkaSinkToFs2PipeMat[F, A, M](sink) } @@ -279,7 +256,7 @@ trait ConverterDsl extends Converter { implicit class AkkaSinkDsl[A, M](sink: Graph[SinkShape[A], M]) { /** @see [[Converter#akkaSinkToFs2Sink]] */ - def toPipe[F[_]: ContextShift: Concurrent](implicit + def toPipe[F[_]: Async](implicit materializer: Materializer, @implicitNotFound( "Cannot convert `Sink[A, M]` to `Pipe[F, A, Unit]` - `M` value would be discarded.\nIf that is intended, first convert the `Sink` to `Sink[A, NotUsed]`.\nIf `M` should not be discarded, then use `sink.toPipeMat[F]` instead.") ev: M <:< NotUsed): Pipe[F, A, Unit] = { @@ -288,26 +265,15 @@ trait ConverterDsl extends Converter { } /** @see [[Converter#akkaSinkToFs2SinkMat]] */ - def toPipeMat[F[_]: ContextShift: Concurrent](implicit materializer: Materializer): F[(Pipe[F, A, Unit], M)] = - akkaSinkToFs2PipeMat(sink) - - @deprecated(message = "Use `.toSink[F]` for M=NotUsed; use `.toSinkMat[F]` for other M. This version relies on side effects.", since = "0.11") - def toSink[F[_]: ContextShift: Concurrent](onMaterialization: M => Unit)(implicit materializer: Materializer): Pipe[F, A, Unit] = - (s: Stream[F, A]) => - Stream.force { - akkaSinkToFs2PipeMat(sink).map { - case (fs2Sink, mat) => - onMaterialization(mat) - s.through(fs2Sink) - } - } + def toPipeWithMat[F[_]: Async](implicit materializer: Materializer): F[(Pipe[F, A, Unit], M)] = + akkaSinkToFs2PipeWithMat(sink) } implicit class AkkaFlowDsl[A, B, M](flow: Graph[FlowShape[A, B], M]) { /** @see [[Converter#akkaFlowToFs2Pipe]] */ - def toPipe[F[_]: ContextShift: ConcurrentEffect]( + def toPipe[F[_]: Async]( implicit materializer: Materializer, @implicitNotFound( @@ -317,53 +283,29 @@ trait ConverterDsl extends Converter { } /** @see [[Converter#akkaFlowToFs2PipeMat]] */ - def toPipeMat[F[_]: ContextShift: ConcurrentEffect](implicit materializer: Materializer): F[(Pipe[F, A, B], M)] = + def toPipeMat[F[_]: Async](implicit materializer: Materializer): F[(Pipe[F, A, B], M)] = akkaFlowToFs2PipeMat(flow) - @deprecated(message = "Use `.toPipe[F]` for M=NotUsed; use `.toPipeMat[F]` for other M. 
This version relies on side effects.", since = "0.11") - def toPipe[F[_]: ContextShift: ConcurrentEffect](onMaterialization: M => Unit = _ => ())(implicit materializer: Materializer): Pipe[F, A, B] = - (s: Stream[F, A]) => Stream.force { - akkaFlowToFs2PipeMat(flow).map { - case (fs2Pipe, mat) => - onMaterialization(mat) - s.through(fs2Pipe) - } - } } - implicit class FS2StreamNothingDsl[A](stream: Stream[Nothing, A]) { + implicit class FS2StreamIODsl[F[_], A](stream: Stream[F, A]) { /** @see [[Converter#fs2StreamToAkkaSource]] */ - @deprecated("Use `stream.covary[F].toSource` instead", "0.10") - def toSource(implicit contextShift: ContextShift[IO]): Graph[SourceShape[A], NotUsed] = - fs2StreamToAkkaSource(stream: Stream[IO, A]) - } - - implicit class FS2StreamPureDsl[A](stream: Stream[Pure, A]) { - - /** @see [[Converter#fs2StreamToAkkaSource]] */ - @deprecated("Use `stream.covary[F].toSource` instead", "0.10") - def toSource(implicit contextShift: ContextShift[IO]): Graph[SourceShape[A], NotUsed] = - fs2StreamToAkkaSource(stream: Stream[IO, A]) - } - - implicit class FS2StreamIODsl[F[_]: ContextShift: ConcurrentEffect, A](stream: Stream[F, A]) { - - /** @see [[Converter#fs2StreamToAkkaSource]] */ - def toSource: Graph[SourceShape[A], NotUsed] = + def toSource(implicit F: Async[F], dispatcher: Dispatcher[F]): Graph[SourceShape[A], NotUsed] = fs2StreamToAkkaSource(stream) } - implicit class FS2SinkIODsl[F[_]: Effect: ContextShift, A](sink: Pipe[F, A, Unit]) { + implicit class FS2SinkIODsl[F[_], A](sink: Pipe[F, A, Unit]) { /** @see [[Converter#fs2PipeToAkkaSink]] */ - def toSink: Graph[SinkShape[A], Future[Done]] = + def toSink(implicit F: Async[F], dispatcher: Dispatcher[F]): Graph[SinkShape[A], Future[Done]] = fs2PipeToAkkaSink(sink) } - implicit class FS2PipeIODsl[F[_]: ContextShift: ConcurrentEffect, A, B](pipe: Pipe[F, A, B]) { + implicit class FS2PipeIODsl[F[_], A, B](pipe: Pipe[F, A, B]) { /** @see [[Converter#fs2PipeToAkkaFlow]] */ - def toFlow: Graph[FlowShape[A, B], NotUsed] = + def toFlow(implicit F: Async[F], dispatcher: Dispatcher[F]): Graph[FlowShape[A, B], NotUsed] = fs2PipeToAkkaFlow(pipe) } + } diff --git a/streamz-converter/src/test/scala/streamz/converter/ConverterSpec.scala b/streamz-converter/src/test/scala/streamz/converter/ConverterSpec.scala index 52e0af9..faaf4e0 100644 --- a/streamz-converter/src/test/scala/streamz/converter/ConverterSpec.scala +++ b/streamz-converter/src/test/scala/streamz/converter/ConverterSpec.scala @@ -18,18 +18,21 @@ package streamz.converter import akka.Done import akka.actor.ActorSystem -import akka.stream.Materializer import akka.stream.scaladsl.{ Flow => AkkaFlow, Sink => AkkaSink, Source => AkkaSource, _ } import akka.testkit._ import cats.effect.IO +import cats.effect.std.Dispatcher import fs2._ +import org.scalactic.source.Position import org.scalatest._ + import scala.collection.immutable.Seq import scala.concurrent._ import scala.concurrent.duration._ import scala.util._ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike + import scala.annotation.nowarn object ConverterSpec { @@ -43,13 +46,27 @@ object ConverterSpec { class ConverterSpec extends TestKit(ActorSystem("test")) with AnyWordSpecLike with Matchers with BeforeAndAfterAll { import ConverterSpec._ + import cats.effect.unsafe.implicits.global + + private implicit val dispatcherEC = system.dispatcher - private implicit val materializer = Materializer.createMaterializer(system) - private implicit val dispatcher = system.dispatcher - 
private implicit val contextShift = IO.contextShift(scala.concurrent.ExecutionContext.global) + private implicit def dispatcher(implicit pos: Position): Dispatcher[IO] = + _dispatcher.getOrElse(fail("Dispatcher not initialized")) + + private var _dispatcher: Option[Dispatcher[IO]] = None + private var shutdownDispatcher: IO[Unit] = IO.unit + + override protected def beforeAll(): Unit = { + super.beforeAll() + val (d, s) = Dispatcher[IO].allocated.unsafeRunSync() + _dispatcher = Some(d) + shutdownDispatcher = s + } override def afterAll(): Unit = { - materializer.shutdown() + _dispatcher = None + shutdownDispatcher.unsafeRunSync() + shutdownDispatcher = IO.unit TestKit.shutdownActorSystem(system) super.afterAll() } @@ -111,7 +128,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with AnyWordSpecLike wi "propagate elements and completion from FS2 sink to AS sink" in { val probe = TestProbe() val akkaSink = AkkaSink.seq[Int] - val fs2Sink = akkaSink.toPipeMat[IO].map { + val fs2Sink = akkaSink.toPipeWithMat[IO].map { case (akkaStream, mat) => mat.onComplete(probe.ref ! _) akkaStream @@ -123,7 +140,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with AnyWordSpecLike wi "propagate errors from FS2 sink to AS sink" in { val probe = TestProbe() val akkaSink = AkkaSink.seq[Int] - val fs2Sink = akkaSink.toPipeMat[IO].map { + val fs2Sink = akkaSink.toPipeWithMat[IO].map { case (akkaStream, mat) => mat.onComplete(probe.ref ! _) akkaStream @@ -134,7 +151,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with AnyWordSpecLike wi } "propagate early termination from AS sink to FS2 sink (using Mat Future)" in { val akkaSink = AkkaFlow[Int].take(3).toMat(AkkaSink.seq)(Keep.right) - val fs2Sink = akkaSink.toPipeMatWithResult[IO].unsafeRunSync() + val fs2Sink = akkaSink.toPipeMatWithResult[IO] val result = Stream.emits(numbers).through(fs2Sink).compile.lastOrError.unsafeRunSync() result shouldBe Right(numbers.take(3)) @@ -142,7 +159,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with AnyWordSpecLike wi } "propagate early termination from AS sink (due to errors) to FS2 sink" in { val akkaSink = AkkaSink.foreach[Int](_ => throw error) - val fs2Sink = akkaSink.toPipeMatWithResult[IO].unsafeRunSync() + val fs2Sink = akkaSink.toPipeMatWithResult[IO] val result = Stream.emits(numbers).through(fs2Sink).compile.lastOrError.unsafeRunSync() result shouldBe Left(error) @@ -252,7 +269,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with AnyWordSpecLike wi def seqSink(probe: TestProbe): Pipe[IO, Int, Unit] = s => s.fold(Seq.empty[Int])(_ :+ _).map(probe.ref ! Success(_)) - .handleErrorWith(err => Stream.eval_(IO(probe.ref ! Failure(err))) ++ Stream.raiseError[IO](err)) + .handleErrorWith(err => Stream.exec(IO(probe.ref ! Failure(err))) ++ Stream.raiseError[IO](err)) .onFinalize(IO(probe.ref ! 
Success(Done))) "propagate elements and completion from AS sink to FS2 sink" in { diff --git a/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Example.scala b/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Example.scala index b33df99..edcfa10 100644 --- a/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Example.scala +++ b/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Example.scala @@ -20,24 +20,27 @@ import cats.effect.IO import fs2.{ Stream, text } import streamz.camel.fs2.dsl._ import streamz.examples.camel.ExampleContext +import cats.effect.{ ExitCode, IOApp } -object Example extends ExampleContext with App { - implicit val contextShift = IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global) +object Example extends IOApp with ExampleContext { - val tcpLineStream: Stream[IO, String] = - receiveBody[IO, String](tcpEndpointUri) + def run(args: List[String]) = { - val fileLineStream: Stream[IO, String] = - receiveBody[IO, String](fileEndpointUri).through(text.lines) + val tcpLineStream: Stream[IO, String] = + receiveBody[IO, String](tcpEndpointUri) - val linePrefixStream: Stream[IO, String] = - Stream.iterate(1)(_ + 1).sendRequest[IO, String](serviceEndpointUri) + val fileLineStream: Stream[IO, String] = + receiveBody[IO, String](fileEndpointUri).through(text.lines) - val stream: Stream[IO, String] = - tcpLineStream - .merge(fileLineStream) - .zipWith(linePrefixStream)((l, n) => n concat l) - .send(printerEndpointUri) + val linePrefixStream: Stream[IO, String] = + Stream.iterate(1)(_ + 1).sendRequest[IO, String](serviceEndpointUri) - stream.compile.drain.unsafeRunSync() + val stream: Stream[IO, String] = + tcpLineStream + .merge(fileLineStream) + .zipWith(linePrefixStream)((l, n) => n concat l) + .send(printerEndpointUri) + + stream.compile.drain.as(ExitCode.Success) + } } diff --git a/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Snippets.scala b/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Snippets.scala index 6906916..1cea821 100644 --- a/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Snippets.scala +++ b/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Snippets.scala @@ -23,7 +23,6 @@ import streamz.camel.fs2.dsl._ object Snippets { implicit val context = StreamContext() - implicit val contextShift = IO.contextShift(scala.concurrent.ExecutionContext.global) val s: Stream[IO, StreamMessage[Int]] = // receive stream message from endpoint @@ -37,6 +36,7 @@ object Snippets { val t: IO[Unit] = s.compile.drain // run IO (side effects only here) ... 
+ import cats.effect.unsafe.implicits.global // Normally taken as implicit param t.unsafeRunSync() val s1: Stream[IO, StreamMessage[String]] = receive[IO, String]("seda:q1") diff --git a/streamz-examples/src/main/scala/streamz/examples/converter/Example.scala b/streamz-examples/src/main/scala/streamz/examples/converter/Example.scala index 0b65dde..a9c2fb4 100644 --- a/streamz-examples/src/main/scala/streamz/examples/converter/Example.scala +++ b/streamz-examples/src/main/scala/streamz/examples/converter/Example.scala @@ -16,65 +16,77 @@ package streamz.examples.converter -import akka.actor.{ ActorRefFactory, ActorSystem } -import akka.stream.Materializer +import akka.actor.ActorSystem import akka.stream.scaladsl.{ Keep, Flow => AkkaFlow, Sink => AkkaSink, Source => AkkaSource } import akka.{ Done, NotUsed } -import cats.effect.{ ContextShift, IO } +import cats.effect.{ ExitCode, IO, IOApp } +import cats.effect.kernel.Resource +import cats.effect.std.Dispatcher +import cats.syntax.all._ import fs2.{ Pipe, Pure, Stream } import streamz.converter._ -import scala.collection.immutable.Seq + import scala.concurrent._ -import scala.concurrent.duration._ -object Example extends App { - val system: ActorSystem = ActorSystem("example") - val factory: ActorRefFactory = system +object Example extends IOApp { + + val mkSystem: Resource[IO, ActorSystem] = Resource.make(IO(ActorSystem("example")))(s => IO.fromFuture(IO(s.terminate())).map(_ => ())) - implicit val executionContext: ExecutionContext = factory.dispatcher - implicit val materializer: Materializer = Materializer.createMaterializer(system) - implicit val contextShift: ContextShift[IO] = IO.contextShift(materializer.executionContext) + def run(args: List[String]) = Dispatcher[IO].use { implicit dispatcher: Dispatcher[IO] => + mkSystem.use { implicit system: ActorSystem => - val numbers: Seq[Int] = 1 to 10 + val numbers: List[Int] = (1 to 10).toList - // -------------------------------- - // Akka Stream to FS2 conversions - // -------------------------------- + // -------------------------------- + // Akka Stream to FS2 conversions + // -------------------------------- - def f(i: Int) = List(s"$i-1", s"$i-2") + def f(i: Int) = List(s"$i-1", s"$i-2") - val aSink1: AkkaSink[Int, NotUsed] = AkkaFlow[Int].to(AkkaSink.foreach[Int](println)) - val fSink1: Pipe[IO, Int, Unit] = aSink1.toPipe[IO] + val aSink1: AkkaSink[Int, NotUsed] = + AkkaSink.foreach[Int](println).mapMaterializedValue(_ => NotUsed) + val fSink1: Pipe[IO, Int, Unit] = aSink1.toPipe[IO] - val aSource1: AkkaSource[Int, NotUsed] = AkkaSource(numbers) - val fStream1: Stream[IO, Int] = aSource1.toStream[IO] + val aSource1: AkkaSource[Int, NotUsed] = AkkaSource(numbers) + val fStream1: Stream[IO, Int] = aSource1.toStream[IO] - val aFlow1: AkkaFlow[Int, String, NotUsed] = AkkaFlow[Int].mapConcat(f) - val fPipe1: Pipe[IO, Int, String] = aFlow1.toPipe[IO] + val aFlow1: AkkaFlow[Int, String, NotUsed] = AkkaFlow[Int].mapConcat(f) + val fPipe1: Pipe[IO, Int, String] = aFlow1.toPipe[IO] - fStream1.through(fSink1).compile.drain.unsafeRunSync() // prints numbers - assert(fStream1.compile.toVector.unsafeRunSync() == numbers) - assert(fStream1.through(fPipe1).compile.toVector.unsafeRunSync() == numbers.flatMap(f)) + val akkaToFs2Example = for { + _ <- fStream1.through(fSink1).compile.drain + numbersResult <- fStream1.compile.toList + numbersPipeResult <- fStream1.through(fPipe1).compile.toList + } yield { + assert(numbersResult === numbers) + assert(numbersPipeResult === numbers.flatMap(f)) + } - // 
-------------------------------- - // FS2 to Akka Stream conversions - // -------------------------------- + // -------------------------------- + // FS2 to Akka Stream conversions + // -------------------------------- - def g(i: Int) = i + 10 + def g(i: Int) = i + 10 - val fSink2: Pipe[IO, Int, Unit] = s => s.map(g).evalMap(i => IO(println(i))) - val aSink2: AkkaSink[Int, Future[Done]] = AkkaSink.fromGraph(fSink2.toSink) + val fSink2: Pipe[IO, Int, Unit] = s => s.map(g).evalMap(i => IO(println(i))) + val aSink2: AkkaSink[Int, Future[Done]] = AkkaSink.fromGraph(fSink2.toSink) - val fStream2: Stream[Pure, Int] = Stream.emits(numbers) - val aSource2: AkkaSource[Int, NotUsed] = AkkaSource.fromGraph(fStream2.covary[IO].toSource) + val fStream2: Stream[Pure, Int] = Stream.emits(numbers) + val aSource2: AkkaSource[Int, NotUsed] = AkkaSource.fromGraph(fStream2.covary[IO].toSource) - val fpipe2: Pipe[IO, Int, Int] = s => s.map(g) - val aFlow2: AkkaFlow[Int, Int, NotUsed] = AkkaFlow.fromGraph(fpipe2.toFlow) + val fpipe2: Pipe[IO, Int, Int] = s => s.map(g) + val aFlow2: AkkaFlow[Int, Int, NotUsed] = AkkaFlow.fromGraph(fpipe2.toFlow) - aSource2.toMat(aSink2)(Keep.right).run() // prints numbers - assert(Await.result(aSource2.toMat(AkkaSink.seq)(Keep.right).run(), 5.seconds) == numbers) - assert(Await.result(aSource2.via(aFlow2).toMat(AkkaSink.seq)(Keep.right).run(), 5.seconds) == numbers.map(g)) + val fs2ToAkkaExample = for { + _ <- IO.fromFuture(IO(aSource2.toMat(aSink2)(Keep.right).run())) // prints numbers + numbersResult <- IO.fromFuture(IO(aSource2.toMat(AkkaSink.seq)(Keep.right).run())) + numbersFlowResult <- IO.fromFuture(IO(aSource2.via(aFlow2).toMat(AkkaSink.seq)(Keep.right).run())) + } yield { + assert(numbersResult.toList === numbers) + assert(numbersFlowResult.toList === numbers.map(g)) + } - materializer.shutdown() - system.terminate() -} + akkaToFs2Example >> fs2ToAkkaExample.as(ExitCode.Success) + } + } +} \ No newline at end of file
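
The dsl/package.scala changes above drop `ContextShift` and replace `contextShift.shift >> F.async { callback => ... }` with Cats Effect 3's `Async[F].async_`, which registers a callback without an explicit shift. A minimal sketch of that shape, using a hypothetical `CallbackApi` trait as a stand-in for Camel's `Synchronization` callback (the names below are illustrative, not streamz APIs):

```scala
import cats.effect.{ Async, IO }

// Hypothetical callback-based API, standing in for Camel's Synchronization callback.
trait CallbackApi[A] {
  def onComplete(handler: Either[Throwable, A] => Unit): Unit
}

object AsyncUnderscoreSketch {
  // CE2 shape (for comparison): contextShift.shift >> F.async[A](cb => api.onComplete(cb))
  // CE3 shape: the runtime handles shifting, so only the callback registration remains.
  def wrap[F[_]: Async, A](api: CallbackApi[A]): F[A] =
    Async[F].async_[A](cb => api.onComplete(cb))

  // Usage with IO:
  def example(api: CallbackApi[Int]): IO[Int] = wrap[IO, Int](api)
}
```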
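
In Converter.scala, the `ConcurrentEffect`/`Effect`-based `toIO(...).unsafeToFuture()` calls are replaced by an implicit `cats.effect.std.Dispatcher[F]`. A minimal sketch of how a Dispatcher is obtained and used (object and method names here are illustrative, not streamz APIs):

```scala
import cats.effect.IO
import cats.effect.std.Dispatcher
import scala.concurrent.Future

object DispatcherSketch {
  // A Dispatcher is allocated as a Resource and lets non-effectful code
  // (e.g. callbacks invoked by Akka on materialization) start effects.
  def run(fa: IO[Unit]): IO[Unit] =
    Dispatcher[IO].use { dispatcher =>
      IO {
        dispatcher.unsafeRunAndForget(fa)                        // fire-and-forget, as in fs2StreamToAkkaSource
        val handle: Future[Unit] = dispatcher.unsafeToFuture(fa) // Future handle, as in fs2PipeToAkkaSink
        val _ = handle
      }
    }
}
```

ConverterSpec keeps a long-lived instance instead, via `Dispatcher[IO].allocated.unsafeRunSync()` in `beforeAll` and the returned finalizer in `afterAll`, because the Dispatcher must outlive the effects it starts.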
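
The rewritten `akkaSinkToFs2PipeMat` and `fs2PipeToAkkaSink` lift Akka's materialized `Future` values into `F` with `Async[F].fromFuture` instead of the CE2 `Async.fromFuture` helper. A sketch of that pattern with an illustrative name, assuming the Future is already running:

```scala
import cats.effect.Async
import cats.syntax.all._
import scala.concurrent.Future

object MatValueSketch {
  // `pure` (not `delay`) wraps the handle because the Future is already running;
  // `fromFuture` suspends waiting on it and `attempt` surfaces failures as Left,
  // mirroring akkaSinkToFs2PipeMat above.
  def materializedResult[F[_]: Async, M](mat: Future[M]): F[Either[Throwable, M]] =
    Async[F].fromFuture(Async[F].pure(mat)).attempt
}
```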
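
The examples switch from `App` plus `IO.contextShift(...)` and `unsafeRunSync()` to `IOApp`, which supplies the CE3 `IORuntime`. A minimal sketch of the new entry-point shape (`MainSketch` is an illustrative name):

```scala
import cats.effect.{ ExitCode, IO, IOApp }

// IOApp provides the IORuntime that replaces CE2's explicit ContextShift,
// so the program is returned as IO[ExitCode] rather than run with unsafeRunSync().
object MainSketch extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    IO(println("hello")).as(ExitCode.Success)
}
```

Outside an `IOApp` (tests and snippets), `import cats.effect.unsafe.implicits.global` brings the default runtime into scope so `unsafeRunSync()` still works, which is what DslSpec, ConverterSpec and Snippets.scala do above.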