Skip to content

Commit

Permalink
AccountTransferScenarioTest and bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Rodolfo committed Dec 4, 2024
1 parent 3e5a2c4 commit 680444a
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 176 deletions.
16 changes: 8 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-pg-client</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>io.quarkus</groupId>-->
<!-- <artifactId>quarkus-reactive-routes</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>io.quarkus</groupId>-->
<!-- <artifactId>quarkus-arc</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>io.quarkus</groupId>-->
<!-- <artifactId>quarkus-reactive-routes</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>io.quarkus</groupId>-->
<!-- <artifactId>quarkus-arc</artifactId>-->
<!-- </dependency>-->

<dependency>
<groupId>io.quarkus</groupId>
Expand Down
5 changes: 3 additions & 2 deletions src/main/resources/ddl/2-append_events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ BEGIN

-- Ensure the provided _based_sequence_id matches the current sequence or is null (allowing the first insert)
IF currentLastSequence IS NULL OR currentLastSequence = _based_sequence_id THEN
-- Loop through the event types and payloads
FOR i IN 1 .. array_length(_event_types, 1)
-- Loop through the event payloads
FOR i IN 1 .. array_length(_event_payloads, 1)
LOOP
event_payload := _event_payloads[i]::json;
event_type := event_payload ->> 'type';
-- RAISE NOTICE '----------> will append event_type: %', event_type;

-- Insert the event into the database with the appropriate causation and correlation IDs
INSERT INTO events (event_type, event_payload, domain_ids, causation_id, correlation_id)
Expand Down
19 changes: 19 additions & 0 deletions src/test/kotlin/crablet/postgres/AbstractCrabletTest.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package crablet.postgres

import io.vertx.core.Future
import io.vertx.pgclient.PgConnectOptions
import io.vertx.sqlclient.Pool
import io.vertx.sqlclient.PoolOptions
Expand Down Expand Up @@ -48,6 +49,24 @@ abstract class AbstractCrabletTest {
val poolOptions = PoolOptions().setMaxSize(5)
Pool.pool(connectOptions, poolOptions)
}

fun cleanDatabase(): Future<Void> {
return pool.query("TRUNCATE TABLE events").execute()
.compose {
pool.query("ALTER SEQUENCE events_sequence_id_seq RESTART WITH 1").execute()
}.mapEmpty()
}

fun dumpEvents(): Future<Void> {
return pool.query("select * from events order by sequence_id").execute()
.onSuccess { rs ->
println("Events -------------")
rs.forEach {
println(it.toJson())
}
}
.mapEmpty()
}
}

}
197 changes: 197 additions & 0 deletions src/test/kotlin/crablet/postgres/AccountTransferScenarioTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package crablet.postgres

import crablet.AppendCondition
import crablet.DomainIdentifier
import crablet.EventName
import crablet.SequenceNumber
import crablet.StateId
import crablet.StateName
import crablet.StreamQuery
import io.vertx.core.json.JsonObject
import io.vertx.junit5.VertxExtension
import io.vertx.junit5.VertxTestContext
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.MethodOrderer
import org.junit.jupiter.api.Order
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestMethodOrder
import org.junit.jupiter.api.extension.ExtendWith

@ExtendWith(VertxExtension::class)
@TestMethodOrder(MethodOrderer.OrderAnnotation::class)
class AccountTransferScenarioTest : AbstractCrabletTest() {

data class Account(val id: String? = null, val balance: Int = 0)

val eventTypes = listOf("AccountOpened", "AmountDeposited", "AmountTransferred").map { EventName(it) }

@Test
@Order(1)
fun `it can open Account 1 with $100`(testContext: VertxTestContext) {
val domainIdentifiers = listOf(
DomainIdentifier(name = StateName("Account"), id = StateId("1"))
)
val streamQuery = StreamQuery(
identifiers = domainIdentifiers,
eventTypes = eventTypes
)
val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(0))
val eventsToAppend = listOf(
JsonObject().put("type", "AccountOpened").put("id", "1"),
JsonObject().put("type", "AmountDeposited").put("amount", 100)
)
eventsAppender.appendIf(eventsToAppend, appendCondition)
.compose {
dumpEvents()
}
.compose {
stateBuilder.buildFor(streamQuery)
}
.onSuccess { (state, sequence): Pair<Account, SequenceNumber> ->
testContext.verify {
assertEquals(2, sequence.value)
assertEquals("1", state.id)
assertEquals(100, state.balance)
}
testContext.completeNow()
}
.onFailure { it ->
testContext.failNow(it)
}

}

@Test
@Order(2)
fun `it can open Account 2 with $0`(testContext: VertxTestContext) {
val domainIdentifiers = listOf(
DomainIdentifier(name = StateName("Account"), id = StateId("2"))
)
val streamQuery = StreamQuery(
identifiers = domainIdentifiers,
eventTypes = eventTypes
)
val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(0))
val eventsToAppend = listOf(
JsonObject().put("type", "AccountOpened").put("id", "2"),
JsonObject().put("type", "AmountDeposited").put("amount", 0)
)
eventsAppender.appendIf(eventsToAppend, appendCondition)
.compose {
dumpEvents()
}
.compose {
stateBuilder.buildFor(streamQuery)
}
.onSuccess { (state, sequence): Pair<Account, SequenceNumber> ->
testContext.verify {
assertEquals(4, sequence.value)
assertEquals("2", state.id)
assertEquals(0, state.balance)
}
testContext.completeNow()
}
.onFailure { it ->
testContext.failNow(it)
}
}

@Test
@Order(3)
fun `it can transfer $30 from Account 1 to Account 2 within the same db transaction`(testContext: VertxTestContext) {
val domainIdentifiers = listOf(
DomainIdentifier(name = StateName("Account"), id = StateId("1")),
DomainIdentifier(name = StateName("Account"), id = StateId("2"))
)
val streamQuery = StreamQuery(
identifiers = domainIdentifiers,
eventTypes = eventTypes
)
val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(4))
val eventsToAppend = listOf(
JsonObject().put("type", "AmountTransferred").put("fromAcct", 1).put("toAcct", 2).put("amount", 30)
)
eventsAppender.appendIf(eventsToAppend, appendCondition)
.compose {
dumpEvents()
}
.compose {
// assert acct1 state
val domainIdentifiersAcct1 = listOf(
DomainIdentifier(name = StateName("Account"), id = StateId("1"))
)
val streamQueryAcct1 = StreamQuery(identifiers = domainIdentifiersAcct1, eventTypes = eventTypes)
stateBuilder.buildFor(streamQueryAcct1)
.onSuccess { (state, sequence) ->
testContext.verify {
assertEquals(5, sequence.value)
assertEquals("1", state.id)
assertEquals(70, state.balance)
}
}
}
.compose {
// assert acct2 state
val domainIdentifiersAcct1 = listOf(
DomainIdentifier(name = StateName("Account"), id = StateId("2"))
)
val streamQueryAcct1 = StreamQuery(identifiers = domainIdentifiersAcct1, eventTypes = eventTypes)
stateBuilder.buildFor(streamQueryAcct1)
.onSuccess { (state, sequence) ->
testContext.verify {
assertEquals(5, sequence.value)
assertEquals("2", state.id)
assertEquals(30, state.balance)
}
}
}
.onSuccess {
testContext.completeNow()
}
.onFailure {
testContext.failNow(it)
}
}

companion object {
lateinit var eventsAppender: CrabletEventsAppender
lateinit var stateBuilder: CrabletStateBuilder<Account>

@BeforeAll
@JvmStatic
fun setUp(testContext: VertxTestContext) {
cleanDatabase()
eventsAppender = CrabletEventsAppender(pool)
stateBuilder = CrabletStateBuilder(
client = pool,
initialState = Account(),
evolveFunction = { state, event ->
when (event.getString("type")) {
"AccountOpened" -> state.copy(id = event.getString("id"))
"AmountDeposited" -> state.copy(balance = state.balance.plus(event.getInteger("amount")))
"AmountTransferred" -> {
when {
event.getString("fromAcct") == state.id -> state.copy(
balance = state.balance.minus(
event.getInteger("amount")
)
)
event.getString("toAcct") == state.id -> state.copy(
balance = state.balance.plus(
event.getInteger(
"amount"
)
)
)
else -> state
}
}
else -> state
}
})
testContext.completeNow()
}
}

}
82 changes: 0 additions & 82 deletions src/test/kotlin/crablet/postgres/Scenario1TestAbstract.kt

This file was deleted.

Loading

0 comments on commit 680444a

Please sign in to comment.