Skip to content

Commit

Permalink
Improved tests to be more transparent and to detect #6 bug
Browse files Browse the repository at this point in the history
  • Loading branch information
jendakol committed Nov 25, 2019
1 parent 3774a92 commit 0790a93
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 42 deletions.
55 changes: 36 additions & 19 deletions core/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ myConfig {
testing {
name = "Testing"

queueName = "test"
queueName = "QUEUE1"

declare {
enabled = true
Expand All @@ -28,7 +28,7 @@ myConfig {
routingKeys = ["test"]

exchange {
name = "myclient"
name = "EXCHANGE1"

declare {
enabled = true
Expand All @@ -40,7 +40,7 @@ myConfig {
routingKeys = ["test2"]

exchange {
name = "myclient2"
name = "EXCHANGE2"

declare {
enabled = true
Expand All @@ -55,7 +55,7 @@ myConfig {
testingPull {
name = "Testing"

queueName = "test"
queueName = "QUEUE1"

declare {
enabled = true
Expand All @@ -66,7 +66,7 @@ myConfig {
routingKeys = ["test"]

exchange {
name = "myclient"
name = "EXCHANGE1"

declare {
enabled = true
Expand All @@ -78,7 +78,7 @@ myConfig {
routingKeys = ["test2"]

exchange {
name = "myclient2"
name = "EXCHANGE2"

declare {
enabled = true
Expand All @@ -95,49 +95,66 @@ myConfig {
testing {
name = "Testing"

exchange = "myclient"
exchange = "EXCHANGE1"

declare {
enabled = true

type = "direct" //fanout, topic
type = "direct"
}
}

testing2 {
name = "Testing2"

exchange = "myclient2"
exchange = "EXCHANGE2"

declare {
enabled = true

type = "direct" //fanout, topic
type = "direct"
}
}

testing3 {
name = "Testing3"

exchange = "EXCHANGE4"

declare {
enabled = true

type = "direct"
}
}
}

declarations {
declareExchange {
name = "myclient3"
name = "EXCHANGE3"
type = "direct"
}

declareQueue {
name = "test2"
bindExchange1 {
sourceExchangeName = "EXCHANGE4"
routingKeys = ["test"]
destExchangeName = "EXCHANGE3"
}

bindExchange {
sourceExchangeName = "myclient2"
destExchangeName = "myclient3"
bindExchange2 {
sourceExchangeName = "EXCHANGE4"
routingKeys = ["test"]
destExchangeName = "EXCHANGE1"
}

declareQueue {
name = "QUEUE2"
}

bindQueue {
queueName = "test2"
queueName = "QUEUE2"
routingKeys = ["test"]
exchangeName = "myclient3"
exchangeName = "EXCHANGE3"
}
}

}
59 changes: 36 additions & 23 deletions core/src/test/scala/com/avast/clients/rabbitmq/LiveTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ class LiveTest extends TestBase with ScalaFutures {

private def createConfig() = new {

val queueName: String = randomString(10)
val exchange1: String = randomString(10)
val exchange2: String = randomString(10)

val queueName2: String = randomString(10)
val exchange3: String = randomString(10)
val queueName1: String = randomString(4) + "_QU1"
val queueName2: String = randomString(4) + "_QU2"
val exchange1: String = randomString(4) + "_EX1"
val exchange2: String = randomString(4) + "_EX2"
val exchange3: String = randomString(4) + "_EX3"
val exchange4: String = randomString(4) + "_EX4"

private val original = ConfigFactory.load().getConfig("myConfig")

Expand All @@ -42,16 +42,19 @@ class LiveTest extends TestBase with ScalaFutures {
bindConfigs(1) = bindConfigs(1).withValue("exchange.name", ConfigValueFactory.fromAnyRef(exchange2))

val config: Config = original
.withValue("consumers.testing.queueName", ConfigValueFactory.fromAnyRef(queueName))
.withValue("consumers.testing.queueName", ConfigValueFactory.fromAnyRef(queueName1))
.withValue("consumers.testing.processTimeout", ConfigValueFactory.fromAnyRef("500ms"))
.withValue("consumers.testing.bindings", ConfigValueFactory.fromIterable(bindConfigs.toSeq.map(_.root()).asJava))
.withValue("consumers.testingPull.queueName", ConfigValueFactory.fromAnyRef(queueName))
.withValue("consumers.testingPull.queueName", ConfigValueFactory.fromAnyRef(queueName1))
.withValue("consumers.testingPull.bindings", ConfigValueFactory.fromIterable(bindConfigs.toSeq.map(_.root()).asJava))
.withValue("producers.testing.exchange", ConfigValueFactory.fromAnyRef(exchange1))
.withValue("producers.testing2.exchange", ConfigValueFactory.fromAnyRef(exchange2))
.withValue("producers.testing3.exchange", ConfigValueFactory.fromAnyRef(exchange4))
.withValue("declarations.declareExchange.name", ConfigValueFactory.fromAnyRef(exchange3))
.withValue("declarations.bindExchange.sourceExchangeName", ConfigValueFactory.fromAnyRef(exchange1))
.withValue("declarations.bindExchange.destExchangeName", ConfigValueFactory.fromAnyRef(exchange3))
.withValue("declarations.bindExchange1.sourceExchangeName", ConfigValueFactory.fromAnyRef(exchange4))
.withValue("declarations.bindExchange1.destExchangeName", ConfigValueFactory.fromAnyRef(exchange3))
.withValue("declarations.bindExchange2.sourceExchangeName", ConfigValueFactory.fromAnyRef(exchange4))
.withValue("declarations.bindExchange2.destExchangeName", ConfigValueFactory.fromAnyRef(exchange1))
.withValue("declarations.declareQueue.name", ConfigValueFactory.fromAnyRef(queueName2))
.withValue("declarations.bindQueue.exchangeName", ConfigValueFactory.fromAnyRef(exchange3))
.withValue("declarations.bindQueue.queueName", ConfigValueFactory.fromAnyRef(queueName2))
Expand Down Expand Up @@ -92,7 +95,7 @@ class LiveTest extends TestBase with ScalaFutures {

eventually {
assertResult(1)(counter.get())
assertResult(0)(testHelper.getMessagesCount(queueName))
assertResult(0)(testHelper.getMessagesCount(queueName1))
}
}
}
Expand Down Expand Up @@ -131,7 +134,7 @@ class LiveTest extends TestBase with ScalaFutures {

eventually(timeout(Span(3, Seconds)), interval(Span(0.1, Seconds))) {
assertResult(true)(latch.await(1000, TimeUnit.MILLISECONDS))
assertResult(0)(testHelper.getMessagesCount(queueName))
assertResult(0)(testHelper.getMessagesCount(queueName1))
}
}
}
Expand Down Expand Up @@ -159,7 +162,7 @@ class LiveTest extends TestBase with ScalaFutures {
}

assertResult(true, latch.getCount)(latch.await(1000, TimeUnit.MILLISECONDS))
assertResult(0)(testHelper.getMessagesCount(queueName))
assertResult(0)(testHelper.getMessagesCount(queueName1))
}
}
}
Expand Down Expand Up @@ -189,7 +192,7 @@ class LiveTest extends TestBase with ScalaFutures {

eventually(timeout(Span(3, Seconds)), interval(Span(0.25, Seconds))) {
assert(cnt.get() >= 40)
assert(testHelper.getMessagesCount(queueName) <= 20)
assert(testHelper.getMessagesCount(queueName1) <= 20)
}
}
}
Expand Down Expand Up @@ -220,7 +223,7 @@ class LiveTest extends TestBase with ScalaFutures {

eventually(timeout(Span(5, Seconds)), interval(Span(0.25, Seconds))) {
assert(cnt.get() >= 40)
assert(testHelper.getMessagesCount(queueName) <= 20)
assert(testHelper.getMessagesCount(queueName1) <= 20)
}
}
}
Expand All @@ -231,6 +234,12 @@ class LiveTest extends TestBase with ScalaFutures {
val c = createConfig()
import c._

/*
-- > EXCHANGE4 ---(test) --> EXCHANGE3 --(test)--> QUEUE2
|
|--(test) --> EXCHANGE1 --(test)--> QUEUE1
*/

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

Expand All @@ -243,24 +252,28 @@ class LiveTest extends TestBase with ScalaFutures {
}

cons.withResource { _ =>
rabbitConnection.newProducer[Bytes]("testing", Monitor.noOp()).withResource { sender =>
rabbitConnection.newProducer[Bytes]("testing3", Monitor.noOp()).withResource { sender =>
// additional declarations

(for { // the order consumer -> producer -> declarations is required!
_ <- rabbitConnection.declareExchange("declareExchange")
_ <- rabbitConnection.bindExchange("bindExchange")
_ <- rabbitConnection.bindExchange("bindExchange1")
_ <- rabbitConnection.bindExchange("bindExchange2")
_ <- rabbitConnection.declareQueue("declareQueue")
_ <- rabbitConnection.bindQueue("bindQueue")
} yield ()).unsafeRunSync()

assertResult(0)(testHelper.getMessagesCount(queueName1))
assertResult(0)(testHelper.getMessagesCount(queueName2))

for (_ <- 1 to 10) {
sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).unsafeRunSync()
}

eventually(timeout(Span(2, Seconds)), interval(Span(200, Milliseconds))) {
assertResult(true)(latch.await(500, TimeUnit.MILLISECONDS))

assertResult(0)(testHelper.getMessagesCount(queueName))
assertResult(0)(testHelper.getMessagesCount(queueName1))
assertResult(10)(testHelper.getMessagesCount(queueName2))
}
}
Expand Down Expand Up @@ -296,7 +309,7 @@ class LiveTest extends TestBase with ScalaFutures {

eventually(timeout(Span(2, Seconds)), interval(Span(0.25, Seconds))) {
assertResult(20)(processed.get())
assertResult(0)(testHelper.getMessagesCount(queueName))
assertResult(0)(testHelper.getMessagesCount(queueName1))
assertResult(10)(poisoned.get())
}
}
Expand All @@ -321,7 +334,7 @@ class LiveTest extends TestBase with ScalaFutures {
}

eventually(timeout = timeout(Span(5, Seconds))) {
assertResult(10)(testHelper.getMessagesCount(queueName))
assertResult(10)(testHelper.getMessagesCount(queueName1))
}

for (_ <- 1 to 3) {
Expand All @@ -330,7 +343,7 @@ class LiveTest extends TestBase with ScalaFutures {
}

eventually(timeout = timeout(Span(5, Seconds))) {
assertResult(7)(testHelper.getMessagesCount(queueName))
assertResult(7)(testHelper.getMessagesCount(queueName1))
}

for (_ <- 1 to 7) {
Expand All @@ -339,7 +352,7 @@ class LiveTest extends TestBase with ScalaFutures {
}

eventually(timeout = timeout(Span(5, Seconds))) {
assertResult(0)(testHelper.getMessagesCount(queueName))
assertResult(0)(testHelper.getMessagesCount(queueName1))
}

for (_ <- 1 to 10) {
Expand Down Expand Up @@ -388,7 +401,7 @@ class LiveTest extends TestBase with ScalaFutures {
sender.send("test", Bytes.copyFromUtf8(randomString(10))).unsafeRunSync()

eventually(timeout = timeout(Span(5, Seconds))) {
assertResult(0)(testHelper.getMessagesCount(queueName))
assertResult(0)(testHelper.getMessagesCount(queueName1))

assertResult(0)(processing.get())
assertResult(4)(parsingFailures.get())
Expand Down

0 comments on commit 0790a93

Please sign in to comment.