Skip to content

Commit

Permalink
Converters for common types (#24)
Browse files Browse the repository at this point in the history
converters for common types
  • Loading branch information
augi authored Mar 29, 2019
1 parent 88ce959 commit 077ed13
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 19 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ val consumerAndProducer: Task[(RabbitMQConsumer[Task], RabbitMQProducer[Task, By
Task.now(DeliveryResult.Reject)
}

producer <- connection.newProducer("producer", monitor)
producer <- connection.newProducer[Bytes]("producer", monitor)
} yield (consumer, producer)
}

Expand All @@ -232,7 +232,7 @@ val consumer = rabbitConnection.newConsumer[Bytes]("consumer", monitor) {
Task.now(DeliveryResult.Reject)
}.runSyncUnsafe(10.seconds) // RabbitMQConsumer[Task]

val sender = rabbitConnection.newProducer("producer", monitor).runSyncUnsafe(10.seconds) // RabbitMQProducer[Task, Bytes]
val sender = rabbitConnection.newProducer[Bytes]("producer", monitor).runSyncUnsafe(10.seconds) // RabbitMQProducer[Task, Bytes]

sender.send(...).runAsync // because it's Task, don't forget to run it ;-)
```
Expand All @@ -254,7 +254,7 @@ val consumer = rabbitConnection.newConsumer[Bytes]("consumer", monitor) {
Future.successful(DeliveryResult.Reject)
}.await // RabbitMQConsumer[Future]

val sender = rabbitConnection.newProducer("producer", monitor).await // RabbitMQProducer[Future]
val sender = rabbitConnection.newProducer[Bytes]("producer", monitor).await // RabbitMQProducer[Future, Bytes]

sender.send(...) // Future[Unit]
```
Expand Down
30 changes: 28 additions & 2 deletions core/src/main/scala/com/avast/clients/rabbitmq/converters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ trait CheckedDeliveryConverter[A] extends DeliveryConverter[A] {

object DeliveryConverter {
implicit val identity: DeliveryConverter[Bytes] = (b: Bytes) => Right(b)
implicit val bytesArray: DeliveryConverter[Array[Byte]] = (b: Bytes) => Right(b.toByteArray)
implicit val utf8Text: CheckedDeliveryConverter[String] = new CheckedDeliveryConverter[String] {
override def canConvert(d: Delivery[Bytes]): Boolean = d.properties.contentType match {
case Some(contentType) =>
val ct = contentType.toLowerCase
ct.startsWith("text") && (ct.contains("charset=utf-8") || ct.contains("charset=\"utf-8\""))
case None => true
}
override def convert(b: Bytes): Either[ConversionException, String] = Right(b.toStringUtf8)
}
}

@implicitNotFound("Could not find ProductConverter for ${A}, try to import or define some")
Expand All @@ -35,7 +45,23 @@ trait ProductConverter[A] {
object ProductConverter {
implicit val identity: ProductConverter[Bytes] = new ProductConverter[Bytes] {
override def convert(p: Bytes): Either[ConversionException, Bytes] = Right(p)

override def fillProperties(properties: MessageProperties): MessageProperties = properties
override def fillProperties(properties: MessageProperties): MessageProperties = properties.contentType match {
case None => properties.copy(contentType = Some("application/octet-stream"))
case _ => properties
}
}
implicit val bytesArray: ProductConverter[Array[Byte]] = new ProductConverter[Array[Byte]] {
override def convert(p: Array[Byte]): Either[ConversionException, Bytes] = Right(Bytes.copyFrom(p))
override def fillProperties(properties: MessageProperties): MessageProperties = properties.contentType match {
case None => properties.copy(contentType = Some("application/octet-stream"))
case _ => properties
}
}
implicit val utf8Text: ProductConverter[String] = new ProductConverter[String] {
override def convert(p: String): Either[ConversionException, Bytes] = Right(Bytes.copyFromUtf8(p))
override def fillProperties(properties: MessageProperties): MessageProperties = properties.contentType match {
case None => properties.copy(contentType = Some("text/plain; charset=utf-8"))
case _ => properties
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.util.function

import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.javaapi.JavaConverters._
import com.avast.clients.rabbitmq.{RabbitMQConnection => ScalaConnection}
import com.avast.clients.rabbitmq.{DeliveryConverter, RabbitMQConnection => ScalaConnection}
import com.avast.metrics.api.Monitor
import com.avast.metrics.scalaapi.{Monitor => ScalaMonitor}
import monix.execution.Scheduler
Expand All @@ -30,10 +30,10 @@ private class RabbitMQJavaConnectionImpl(scalaConnection: ScalaConnection[Future
}

override def newPullConsumer(configName: String, monitor: Monitor, executor: ExecutorService): RabbitMQPullConsumer = {
implicit val sch: SchedulerService = Scheduler(executor)

Await.result(
scalaConnection.newPullConsumer(configName, ScalaMonitor(monitor)).map(new DefaultRabbitMQPullConsumer(_, initTimeout)),
scalaConnection
.newPullConsumer(configName, ScalaMonitor(monitor))(DeliveryConverter.identity, ExecutionContext.fromExecutorService(executor))
.map(new DefaultRabbitMQPullConsumer(_, initTimeout)),
initTimeout
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class DefaultRabbitMQProducerTest extends TestBase {
)

val properties = new AMQP.BasicProperties.Builder()
.contentType("application/octet-stream")
.build()

val body = Bytes.copyFromUtf8(Random.nextString(10))
Expand Down
20 changes: 10 additions & 10 deletions core/src/test/scala/com/avast/clients/rabbitmq/LiveTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class LiveTest extends TestBase with ScalaFutures {
}
.await

val sender = rabbitConnection.newProducer("producer", Monitor.noOp()).await
val sender = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).await

sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).await

Expand Down Expand Up @@ -124,7 +124,7 @@ class LiveTest extends TestBase with ScalaFutures {
}
.await

val sender = rabbitConnection.newProducer("producer", Monitor.noOp()).await
val sender = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).await

for (_ <- 1 to count) {
sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).await
Expand All @@ -151,8 +151,8 @@ class LiveTest extends TestBase with ScalaFutures {
}
.await

val sender1 = rabbitConnection.newProducer("producer", Monitor.noOp()).await
val sender2 = rabbitConnection.newProducer("producer2", Monitor.noOp()).await
val sender1 = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).await
val sender2 = rabbitConnection.newProducer[Bytes]("producer2", Monitor.noOp()).await

for (_ <- 1 to 10) {
sender1.send("test", Bytes.copyFromUtf8(Random.nextString(10))).await
Expand Down Expand Up @@ -182,7 +182,7 @@ class LiveTest extends TestBase with ScalaFutures {
}
.await

val sender = rabbitConnection.newProducer("producer", Monitor.noOp()).await
val sender = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).await

for (_ <- 1 to 10) {
sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).await
Expand Down Expand Up @@ -210,7 +210,7 @@ class LiveTest extends TestBase with ScalaFutures {
}
.getOrElse(fail())

val sender = rabbitConnection.newProducer("producer", Monitor.noOp()).getOrElse(fail())
val sender = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).getOrElse(fail())

for (_ <- 1 to 10) {
sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).getOrElse(fail())
Expand All @@ -230,7 +230,7 @@ class LiveTest extends TestBase with ScalaFutures {

val rabbitConnection = RabbitMQConnection.fromConfig[Task](config, ex).await.imapK[Try]

val sender = rabbitConnection.newProducer("producer", Monitor.noOp()).getOrElse(fail())
val sender = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).getOrElse(fail())

// additional declarations

Expand Down Expand Up @@ -281,7 +281,7 @@ class LiveTest extends TestBase with ScalaFutures {

rabbitConnection.newConsumer("consumer", Monitor.noOp())(h).await

val sender = rabbitConnection.newProducer("producer", Monitor.noOp()).await
val sender = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).await

for (_ <- 1 to 10) {
sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).await
Expand All @@ -302,7 +302,7 @@ class LiveTest extends TestBase with ScalaFutures {

val consumer = rabbitConnection.newPullConsumer[Bytes]("consumer", Monitor.noOp()).futureValue

val sender = rabbitConnection.newProducer("producer", Monitor.noOp()).futureValue
val sender = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).futureValue

for (_ <- 1 to 10) {
sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).futureValue
Expand Down Expand Up @@ -365,7 +365,7 @@ class LiveTest extends TestBase with ScalaFutures {
})
}

val sender = rabbitConnection.newProducer("producer", Monitor.noOp()).futureValue
val sender = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).futureValue

sender.send("test", Bytes.copyFromUtf8(randomString(10))).futureValue

Expand Down

0 comments on commit 077ed13

Please sign in to comment.