Skip to content

Commit

Permalink
Merge fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jendakol committed Nov 5, 2018
1 parent 187a306 commit ab91c86
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 4 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,6 @@ Alternatively you are able to use any `F[_]` which is convertible to/from `monix
The Scala API uses types-conversions for both consumer and producer, that means you don't have to work directly with `Bytes` (however you
still can, if you want) and you touch only your business class which is then (de)serialized using provided converter.

Since the API is fully effectful don't forget

The library uses two types of executors - one is for blocking (IO) operations and the second for callbacks. You _have to_ provide both of them:
1. Blocking executor as `ExecutorService`
1. Callback executor as `scala.concurrent.ExecutionContext`
Expand Down
51 changes: 49 additions & 2 deletions core/src/main/scala/com/avast/clients/rabbitmq/rabbitmq.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,19 @@ package object rabbitmq {
.newConsumer(configName, monitor) { d: Delivery[A] =>
toTask { readAction(d) }
}
.map { consumer =>
new RabbitMQConsumer[G] { override def close(): G[Unit] = fromTask { consumer.close() } }
.map { consumer => () =>
fromTask { consumer.close() }
}
}

override def newConsumer[A: DeliveryConverter](consumerConfig: ConsumerConfig, monitor: Monitor)(
readAction: DeliveryReadAction[G, A])(implicit ec: ExecutionContext): G[RabbitMQConsumer[G]] = fromTask {
connection
.newConsumer(consumerConfig, monitor) { d: Delivery[A] =>
toTask { readAction(d) }
}
.map { consumer => () =>
fromTask { consumer.close() }
}
}

Expand All @@ -80,6 +91,20 @@ package object rabbitmq {
}
}

override def newProducer[A: ProductConverter](producerConfig: ProducerConfig, monitor: Monitor)(
implicit ec: ExecutionContext): G[RabbitMQProducer[G, A]] = fromTask {
connection.newProducer(producerConfig, monitor).map { producer =>
new RabbitMQProducer[G, A] {

override def send(routingKey: String, body: A, properties: Option[MessageProperties]): G[Unit] = taskToG[G, Unit] {
producer.send(routingKey, body, properties)
}

override def close(): G[Unit] = fromTask { producer.close() }
}
}
}

override def newPullConsumer[A: DeliveryConverter](configName: String, monitor: Monitor)(
implicit ec: ExecutionContext): G[RabbitMQPullConsumer[G, A]] = fromTask {
connection.newPullConsumer(configName, monitor).map { consumer =>
Expand All @@ -102,6 +127,28 @@ package object rabbitmq {
}
}

override def newPullConsumer[A: DeliveryConverter](pullConsumerConfig: PullConsumerConfig, monitor: Monitor)(
implicit ec: ExecutionContext): G[RabbitMQPullConsumer[G, A]] = fromTask {
connection.newPullConsumer(pullConsumerConfig, monitor).map { consumer =>
new RabbitMQPullConsumer[G, A] {

override def pull(): G[PullResult[G, A]] = taskToG[G, PullResult[G, A]] {
consumer.pull().map {
case PullResult.Ok(deliveryWithHandle) =>
PullResult.Ok(new DeliveryWithHandle[G, A] {
override def delivery: Delivery[A] = deliveryWithHandle.delivery

override def handle(result: DeliveryResult): G[Unit] = taskToG[G, Unit](deliveryWithHandle.handle(result))
})
case PullResult.EmptyQueue => PullResult.EmptyQueue
}
}

override def close(): G[Unit] = fromTask { consumer.close() }
}
}
}

override def declareExchange(configName: String): G[Unit] = connection.declareExchange(configName)
override def declareQueue(configName: String): G[Unit] = connection.declareQueue(configName)
override def bindQueue(configName: String): G[Unit] = connection.bindQueue(configName)
Expand Down

0 comments on commit ab91c86

Please sign in to comment.