Skip to content

Commit

Permalink
Merge pull request #25 from avast/UpdateToMonix3xRC2
Browse files Browse the repository at this point in the history
Upgrade to Monix 3.0.0-RC2
  • Loading branch information
jakubjanecek authored Apr 10, 2019
2 parents 077ed13 + ae03535 commit fbf2bcc
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 8 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ subprojects {
bytesVersion = "2.0.5"
circeVersion = "0.10.1"
cactusVersion = "0.14.3"
monixVersion = "3.0.0-RC1"
monixVersion = "3.0.0-RC2"
catsVersion = "1.5.0"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.avast.metrics.scalaapi.Monitor
import com.rabbitmq.client.ShutdownSignalException
import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging
import monix.eval.Task
import monix.eval.{Task, TaskLift}
import monix.execution.Scheduler

import scala.concurrent.ExecutionContext
Expand Down Expand Up @@ -208,7 +208,8 @@ class DefaultRabbitMQConnection[F[_]](connection: ServerConnection,
}

private def convertToF[A](task: Task[A]): F[A] = {
task.to[F](Effect[F], blockingScheduler)
implicit val ec = blockingScheduler
task.to[F](TaskLift[F])
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ object PoisonedMessageHandler {
private val handler = new DefaultPoisonedMessageHandler[Task, Bytes](maxAttempts)(d => Task.deferFuture(wrapped.asScala.apply(d)))

override def apply(t: javaapi.Delivery): CompletableFuture[javaapi.DeliveryResult] = {
handler(t.asScala).map(_.asJava).runAsync.asJava
handler(t.asScala).map(_.asJava).runToFuture.asJava
}
}

Expand All @@ -116,7 +116,7 @@ object PoisonedMessageHandler {
}

override def apply(t: javaapi.Delivery): CompletableFuture[javaapi.DeliveryResult] = {
handler(t.asScala).map(_.asJava).runAsync.asJava
handler(t.asScala).map(_.asJava).runToFuture.asJava
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import com.avast.clients.rabbitmq.{FromTask, ToTask}
import monix.eval.Task
import monix.execution.{ExecutionModel, Scheduler}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext}
import scala.util.Try

object TestImplicits {
def fkTaskToTry(timeout: Duration)(implicit ec: ExecutionContext): FromTask[Try] = new FunctionK[Task, Try] {
override def apply[A](task: Task[A]): Try[A] = Try {
task.coeval(Scheduler(ec).withExecutionModel(ExecutionModel.SynchronousExecution))() match {
implicit val scheduler = Scheduler(ec).withExecutionModel(ExecutionModel.SynchronousExecution)
task.runSyncStep match {
case Right(a) => a
case Left(fa) => Await.result(fa, timeout)
case Left(fa) => fa.runSyncUnsafe(timeout)
}
}
}
Expand Down

0 comments on commit fbf2bcc

Please sign in to comment.