diff --git a/README.md b/README.md index d532f410..a5a1e94a 100644 --- a/README.md +++ b/README.md @@ -167,8 +167,6 @@ Alternatively you are able to use any `F[_]` which is convertible to/from `monix The Scala API uses types-conversions for both consumer and producer, that means you don't have to work directly with `Bytes` (however you still can, if you want) and you touch only your business class which is then (de)serialized using provided converter. -Since the API is fully effectful don't forget - The library uses two types of executors - one is for blocking (IO) operations and the second for callbacks. You _have to_ provide both of them: 1. Blocking executor as `ExecutorService` 1. Callback executor as `scala.concurrent.ExecutionContext` diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/rabbitmq.scala b/core/src/main/scala/com/avast/clients/rabbitmq/rabbitmq.scala index 83af2510..e3fc517a 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/rabbitmq.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/rabbitmq.scala @@ -61,8 +61,19 @@ package object rabbitmq { .newConsumer(configName, monitor) { d: Delivery[A] => toTask { readAction(d) } } - .map { consumer => - new RabbitMQConsumer[G] { override def close(): G[Unit] = fromTask { consumer.close() } } + .map { consumer => () => + fromTask { consumer.close() } + } + } + + override def newConsumer[A: DeliveryConverter](consumerConfig: ConsumerConfig, monitor: Monitor)( + readAction: DeliveryReadAction[G, A])(implicit ec: ExecutionContext): G[RabbitMQConsumer[G]] = fromTask { + connection + .newConsumer(consumerConfig, monitor) { d: Delivery[A] => + toTask { readAction(d) } + } + .map { consumer => () => + fromTask { consumer.close() } } } @@ -80,6 +91,20 @@ package object rabbitmq { } } + override def newProducer[A: ProductConverter](producerConfig: ProducerConfig, monitor: Monitor)( + implicit ec: ExecutionContext): G[RabbitMQProducer[G, A]] = fromTask { + connection.newProducer(producerConfig, monitor).map { producer => + new RabbitMQProducer[G, A] { + + override def send(routingKey: String, body: A, properties: Option[MessageProperties]): G[Unit] = taskToG[G, Unit] { + producer.send(routingKey, body, properties) + } + + override def close(): G[Unit] = fromTask { producer.close() } + } + } + } + override def newPullConsumer[A: DeliveryConverter](configName: String, monitor: Monitor)( implicit ec: ExecutionContext): G[RabbitMQPullConsumer[G, A]] = fromTask { connection.newPullConsumer(configName, monitor).map { consumer => @@ -102,6 +127,28 @@ package object rabbitmq { } } + override def newPullConsumer[A: DeliveryConverter](pullConsumerConfig: PullConsumerConfig, monitor: Monitor)( + implicit ec: ExecutionContext): G[RabbitMQPullConsumer[G, A]] = fromTask { + connection.newPullConsumer(pullConsumerConfig, monitor).map { consumer => + new RabbitMQPullConsumer[G, A] { + + override def pull(): G[PullResult[G, A]] = taskToG[G, PullResult[G, A]] { + consumer.pull().map { + case PullResult.Ok(deliveryWithHandle) => + PullResult.Ok(new DeliveryWithHandle[G, A] { + override def delivery: Delivery[A] = deliveryWithHandle.delivery + + override def handle(result: DeliveryResult): G[Unit] = taskToG[G, Unit](deliveryWithHandle.handle(result)) + }) + case PullResult.EmptyQueue => PullResult.EmptyQueue + } + } + + override def close(): G[Unit] = fromTask { consumer.close() } + } + } + } + override def declareExchange(configName: String): G[Unit] = connection.declareExchange(configName) override def declareQueue(configName: String): G[Unit] = connection.declareQueue(configName) override def bindQueue(configName: String): G[Unit] = connection.bindQueue(configName)