diff --git a/README.md b/README.md index 83cc0f24..bee1272b 100644 --- a/README.md +++ b/README.md @@ -215,7 +215,7 @@ val consumerAndProducer: Task[(RabbitMQConsumer[Task], RabbitMQProducer[Task, By Task.now(DeliveryResult.Reject) } - producer <- connection.newProducer("producer", monitor) + producer <- connection.newProducer[Bytes]("producer", monitor) } yield (consumer, producer) } @@ -232,7 +232,7 @@ val consumer = rabbitConnection.newConsumer[Bytes]("consumer", monitor) { Task.now(DeliveryResult.Reject) }.runSyncUnsafe(10.seconds) // RabbitMQConsumer[Task] -val sender = rabbitConnection.newProducer("producer", monitor).runSyncUnsafe(10.seconds) // RabbitMQProducer[Task, Bytes] +val sender = rabbitConnection.newProducer[Bytes]("producer", monitor).runSyncUnsafe(10.seconds) // RabbitMQProducer[Task, Bytes] sender.send(...).runAsync // because it's Task, don't forget to run it ;-) ``` @@ -254,7 +254,7 @@ val consumer = rabbitConnection.newConsumer[Bytes]("consumer", monitor) { Future.successful(DeliveryResult.Reject) }.await // RabbitMQConsumer[Future] -val sender = rabbitConnection.newProducer("producer", monitor).await // RabbitMQProducer[Future] +val sender = rabbitConnection.newProducer[Bytes]("producer", monitor).await // RabbitMQProducer[Future, Bytes] sender.send(...) // Future[Unit] ``` diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/converters.scala b/core/src/main/scala/com/avast/clients/rabbitmq/converters.scala index f5949a7c..666daa36 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/converters.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/converters.scala @@ -21,6 +21,16 @@ trait CheckedDeliveryConverter[A] extends DeliveryConverter[A] { object DeliveryConverter { implicit val identity: DeliveryConverter[Bytes] = (b: Bytes) => Right(b) + implicit val bytesArray: DeliveryConverter[Array[Byte]] = (b: Bytes) => Right(b.toByteArray) + implicit val utf8Text: CheckedDeliveryConverter[String] = new CheckedDeliveryConverter[String] { + override def canConvert(d: Delivery[Bytes]): Boolean = d.properties.contentType match { + case Some(contentType) => + val ct = contentType.toLowerCase + ct.startsWith("text") && (ct.contains("charset=utf-8") || ct.contains("charset=\"utf-8\"")) + case None => true + } + override def convert(b: Bytes): Either[ConversionException, String] = Right(b.toStringUtf8) + } } @implicitNotFound("Could not find ProductConverter for ${A}, try to import or define some") @@ -35,7 +45,23 @@ trait ProductConverter[A] { object ProductConverter { implicit val identity: ProductConverter[Bytes] = new ProductConverter[Bytes] { override def convert(p: Bytes): Either[ConversionException, Bytes] = Right(p) - - override def fillProperties(properties: MessageProperties): MessageProperties = properties + override def fillProperties(properties: MessageProperties): MessageProperties = properties.contentType match { + case None => properties.copy(contentType = Some("application/octet-stream")) + case _ => properties + } + } + implicit val bytesArray: ProductConverter[Array[Byte]] = new ProductConverter[Array[Byte]] { + override def convert(p: Array[Byte]): Either[ConversionException, Bytes] = Right(Bytes.copyFrom(p)) + override def fillProperties(properties: MessageProperties): MessageProperties = properties.contentType match { + case None => properties.copy(contentType = Some("application/octet-stream")) + case _ => properties + } + } + implicit val utf8Text: ProductConverter[String] = new ProductConverter[String] { + override def convert(p: String): Either[ConversionException, Bytes] = Right(Bytes.copyFromUtf8(p)) + override def fillProperties(properties: MessageProperties): MessageProperties = properties.contentType match { + case None => properties.copy(contentType = Some("text/plain; charset=utf-8")) + case _ => properties + } } } diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/RabbitMQJavaConnectionImpl.scala b/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/RabbitMQJavaConnectionImpl.scala index 4c1df68c..6306dd8b 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/RabbitMQJavaConnectionImpl.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/RabbitMQJavaConnectionImpl.scala @@ -5,7 +5,7 @@ import java.util.function import com.avast.bytes.Bytes import com.avast.clients.rabbitmq.javaapi.JavaConverters._ -import com.avast.clients.rabbitmq.{RabbitMQConnection => ScalaConnection} +import com.avast.clients.rabbitmq.{DeliveryConverter, RabbitMQConnection => ScalaConnection} import com.avast.metrics.api.Monitor import com.avast.metrics.scalaapi.{Monitor => ScalaMonitor} import monix.execution.Scheduler @@ -30,10 +30,10 @@ private class RabbitMQJavaConnectionImpl(scalaConnection: ScalaConnection[Future } override def newPullConsumer(configName: String, monitor: Monitor, executor: ExecutorService): RabbitMQPullConsumer = { - implicit val sch: SchedulerService = Scheduler(executor) - Await.result( - scalaConnection.newPullConsumer(configName, ScalaMonitor(monitor)).map(new DefaultRabbitMQPullConsumer(_, initTimeout)), + scalaConnection + .newPullConsumer(configName, ScalaMonitor(monitor))(DeliveryConverter.identity, ExecutionContext.fromExecutorService(executor)) + .map(new DefaultRabbitMQPullConsumer(_, initTimeout)), initTimeout ) } diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducerTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducerTest.scala index 3d72483c..6d72a83c 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducerTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducerTest.scala @@ -32,6 +32,7 @@ class DefaultRabbitMQProducerTest extends TestBase { ) val properties = new AMQP.BasicProperties.Builder() + .contentType("application/octet-stream") .build() val body = Bytes.copyFromUtf8(Random.nextString(10)) diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/LiveTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/LiveTest.scala index 9e35870f..d3d25fc1 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/LiveTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/LiveTest.scala @@ -85,7 +85,7 @@ class LiveTest extends TestBase with ScalaFutures { } .await - val sender = rabbitConnection.newProducer("producer", Monitor.noOp()).await + val sender = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).await sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).await @@ -124,7 +124,7 @@ class LiveTest extends TestBase with ScalaFutures { } .await - val sender = rabbitConnection.newProducer("producer", Monitor.noOp()).await + val sender = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).await for (_ <- 1 to count) { sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).await @@ -151,8 +151,8 @@ class LiveTest extends TestBase with ScalaFutures { } .await - val sender1 = rabbitConnection.newProducer("producer", Monitor.noOp()).await - val sender2 = rabbitConnection.newProducer("producer2", Monitor.noOp()).await + val sender1 = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).await + val sender2 = rabbitConnection.newProducer[Bytes]("producer2", Monitor.noOp()).await for (_ <- 1 to 10) { sender1.send("test", Bytes.copyFromUtf8(Random.nextString(10))).await @@ -182,7 +182,7 @@ class LiveTest extends TestBase with ScalaFutures { } .await - val sender = rabbitConnection.newProducer("producer", Monitor.noOp()).await + val sender = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).await for (_ <- 1 to 10) { sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).await @@ -210,7 +210,7 @@ class LiveTest extends TestBase with ScalaFutures { } .getOrElse(fail()) - val sender = rabbitConnection.newProducer("producer", Monitor.noOp()).getOrElse(fail()) + val sender = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).getOrElse(fail()) for (_ <- 1 to 10) { sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).getOrElse(fail()) @@ -230,7 +230,7 @@ class LiveTest extends TestBase with ScalaFutures { val rabbitConnection = RabbitMQConnection.fromConfig[Task](config, ex).await.imapK[Try] - val sender = rabbitConnection.newProducer("producer", Monitor.noOp()).getOrElse(fail()) + val sender = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).getOrElse(fail()) // additional declarations @@ -281,7 +281,7 @@ class LiveTest extends TestBase with ScalaFutures { rabbitConnection.newConsumer("consumer", Monitor.noOp())(h).await - val sender = rabbitConnection.newProducer("producer", Monitor.noOp()).await + val sender = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).await for (_ <- 1 to 10) { sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).await @@ -302,7 +302,7 @@ class LiveTest extends TestBase with ScalaFutures { val consumer = rabbitConnection.newPullConsumer[Bytes]("consumer", Monitor.noOp()).futureValue - val sender = rabbitConnection.newProducer("producer", Monitor.noOp()).futureValue + val sender = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).futureValue for (_ <- 1 to 10) { sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).futureValue @@ -365,7 +365,7 @@ class LiveTest extends TestBase with ScalaFutures { }) } - val sender = rabbitConnection.newProducer("producer", Monitor.noOp()).futureValue + val sender = rabbitConnection.newProducer[Bytes]("producer", Monitor.noOp()).futureValue sender.send("test", Bytes.copyFromUtf8(randomString(10))).futureValue