diff --git a/src/main/kotlin/crablet/Crablet.kt b/src/main/kotlin/crablet/Crablet.kt index b8e07fc..0928b63 100644 --- a/src/main/kotlin/crablet/Crablet.kt +++ b/src/main/kotlin/crablet/Crablet.kt @@ -19,9 +19,9 @@ data class DomainIdentifier(val name: StateName, val id: StateId) { fun toStorageFormat(): String = this.name.value.plus("@").plus(this.id.value) } -data class StreamQuery(val identifiers: List, val eventTypes: List) +data class TransactionContext(val identifiers: List, val eventTypes: List) -data class AppendCondition(val query: StreamQuery, val maximumEventSequence: SequenceNumber) +data class AppendCondition(val transactionContext: TransactionContext, val expectedCurrentSequence: SequenceNumber) // write @@ -32,5 +32,6 @@ interface EventsAppender { // read interface StateBuilder { - fun buildFor(query: StreamQuery): Future> + fun buildFor(transactionContext: TransactionContext): Future> } + diff --git a/src/main/kotlin/crablet/postgres/CrabletEventsAppender.kt b/src/main/kotlin/crablet/postgres/CrabletEventsAppender.kt index d697b6b..0ba9a95 100644 --- a/src/main/kotlin/crablet/postgres/CrabletEventsAppender.kt +++ b/src/main/kotlin/crablet/postgres/CrabletEventsAppender.kt @@ -1,12 +1,17 @@ package crablet.postgres import crablet.AppendCondition +import crablet.DomainIdentifier +import crablet.EventName import crablet.EventsAppender import crablet.SequenceNumber import io.vertx.core.Future import io.vertx.core.Promise import io.vertx.core.json.JsonObject import io.vertx.sqlclient.Pool +import io.vertx.sqlclient.Row +import io.vertx.sqlclient.RowSet +import io.vertx.sqlclient.SqlConnection import io.vertx.sqlclient.Tuple class CrabletEventsAppender(private val client: Pool) : EventsAppender { @@ -15,36 +20,44 @@ class CrabletEventsAppender(private val client: Pool) : EventsAppender { val promise = Promise.promise() client.withTransaction { connection -> - val jsonArrayDomainIds = - appendCondition.query.identifiers.map { it.toStorageFormat() }.sorted().toTypedArray() - val jsonArrayEventTypes = appendCondition.query.eventTypes.map { it.value }.toTypedArray() - val jsonArrayEventPayloads = events.map { it.encode() }.toTypedArray() - - // Create a Tuple to pass as parameters - val params = Tuple.of( - jsonArrayDomainIds, - appendCondition.maximumEventSequence.value, - jsonArrayEventTypes, - jsonArrayEventPayloads - ) - - // Use prepared statement for SELECT query - val functionCall = "SELECT append_events($1, $2, $3, $4) AS last_sequence_id" - - // Execute the prepared query, it is in transaction due to the use of #withTransaction method. - connection.preparedQuery(functionCall) - .execute(params) - }.onSuccess { rowSet -> - if (rowSet.rowCount() == 1 && rowSet.first().getLong("last_sequence_id") != null) { - // Extract the result (last_sequence_id) from the first row - promise.complete(SequenceNumber(rowSet.first().getLong("last_sequence_id"))) - } else { - promise.fail("No last_sequence_id returned from append_events function") - } - }.onFailure { - promise.fail(it) - } + val params = prepareQueryParams(appendCondition, events) + executeQuery(connection, params) + }.onSuccess { rowSet -> processRowSet(rowSet, promise) } + .onFailure { throwable -> promise.fail(throwable) } + return promise.future() } + private fun prepareQueryParams(appendCondition: AppendCondition, events: List) = + Tuple.of( + identifiersToSortedArray(appendCondition.transactionContext.identifiers), + appendCondition.expectedCurrentSequence.value, + eventTypesToArray(appendCondition.transactionContext.eventTypes), + eventPayloadsToArray(events) + ) + + private fun identifiersToSortedArray(identifiers: List) = + identifiers.map(DomainIdentifier::toStorageFormat).sorted().toTypedArray() + + private fun eventTypesToArray(eventTypes: List) = eventTypes.map(EventName::value).toTypedArray() + + private fun eventPayloadsToArray(events: List) = events.map(JsonObject::encode).toTypedArray() + + private fun executeQuery(connection: SqlConnection, params: Tuple): Future> = + connection.preparedQuery("SELECT append_events($1, $2, $3, $4) AS $LAST_SEQUENCE_ID") + .execute(params) + + private fun processRowSet(rowSet: RowSet, promise: Promise) { + val firstRowSequenceId = rowSet.first().getLong(LAST_SEQUENCE_ID) + + if (rowSet.rowCount() == 1 && firstRowSequenceId != null) { + promise.complete(SequenceNumber(firstRowSequenceId)) + } else { + promise.fail("No last_sequence_id returned from append_events function") + } + } + + companion object { + const val LAST_SEQUENCE_ID = "last_sequence_id" + } } \ No newline at end of file diff --git a/src/main/kotlin/crablet/postgres/CrabletStateBuilder.kt b/src/main/kotlin/crablet/postgres/CrabletStateBuilder.kt index 633318b..98dd90a 100644 --- a/src/main/kotlin/crablet/postgres/CrabletStateBuilder.kt +++ b/src/main/kotlin/crablet/postgres/CrabletStateBuilder.kt @@ -2,7 +2,7 @@ package crablet.postgres import crablet.SequenceNumber import crablet.StateBuilder -import crablet.StreamQuery +import crablet.TransactionContext import io.vertx.core.Future import io.vertx.core.Promise import io.vertx.core.json.JsonObject @@ -19,24 +19,14 @@ class CrabletStateBuilder( private val pageSize: Int = 1000, ) : StateBuilder { - private fun sqlQuery(): String { - return """select event_payload, sequence_id - | from events - | where domain_ids @> $1::text[] - | and event_type = ANY($2) - | order by sequence_id - | - """.trimMargin() - } - override fun buildFor( - query: StreamQuery, + transactionContext: TransactionContext, ): Future> { val promise = Promise.promise>() val sql = sqlQuery() - val domainIds = query.identifiers.map { it.toStorageFormat() }.sorted().toTypedArray() - val eventTypes = query.eventTypes.map { it.value }.toTypedArray() + val domainIds = transactionContext.identifiers.map { it.toStorageFormat() }.sorted().toTypedArray() + val eventTypes = transactionContext.eventTypes.map { it.value }.toTypedArray() val tuple = Tuple.of(domainIds, eventTypes) var finalState = initialState var lastSequence = 0L @@ -87,6 +77,16 @@ class CrabletStateBuilder( return promise.future() } + private fun sqlQuery(): String { + return """select event_payload, sequence_id + | from events + | where domain_ids @> $1::text[] + | and event_type = ANY($2) + | order by sequence_id + | + """.trimMargin() + } + companion object { private val logger = LoggerFactory.getLogger(CrabletStateBuilder::class.java) } diff --git a/src/test/kotlin/crablet/lab/SchemaTest.kt b/src/test/kotlin/crablet/lab/jsonvalues/SchemaTest.kt similarity index 85% rename from src/test/kotlin/crablet/lab/SchemaTest.kt rename to src/test/kotlin/crablet/lab/jsonvalues/SchemaTest.kt index fecd7e3..bff047a 100644 --- a/src/test/kotlin/crablet/lab/SchemaTest.kt +++ b/src/test/kotlin/crablet/lab/jsonvalues/SchemaTest.kt @@ -1,13 +1,13 @@ -package crablet.lab - -import crablet.lab.CustomerEvents.CUSTOMER_ACTIVATED -import crablet.lab.CustomerEvents.CUSTOMER_DEACTIVATED -import crablet.lab.CustomerEvents.CUSTOMER_REGISTERED -import crablet.lab.CustomerEvents.CUSTOMER_RENAMED -import crablet.lab.CustomerEventsFields.ID -import crablet.lab.CustomerEventsFields.NAME -import crablet.lab.CustomerEventsFields.REASON -import crablet.lab.CustomerEventsFields.TYPE +package crablet.lab.jsonvalues + +import crablet.lab.jsonvalues.CustomerEvents.CUSTOMER_ACTIVATED +import crablet.lab.jsonvalues.CustomerEvents.CUSTOMER_DEACTIVATED +import crablet.lab.jsonvalues.CustomerEvents.CUSTOMER_REGISTERED +import crablet.lab.jsonvalues.CustomerEvents.CUSTOMER_RENAMED +import crablet.lab.jsonvalues.CustomerEventsFields.ID +import crablet.lab.jsonvalues.CustomerEventsFields.NAME +import crablet.lab.jsonvalues.CustomerEventsFields.REASON +import crablet.lab.jsonvalues.CustomerEventsFields.TYPE import `fun`.gen.Gen import io.vertx.core.json.JsonObject diff --git a/src/test/kotlin/crablet/quarkus/AccountTransferTest.kt b/src/test/kotlin/crablet/lab/quarkus/AccountTransferTest.kt similarity index 98% rename from src/test/kotlin/crablet/quarkus/AccountTransferTest.kt rename to src/test/kotlin/crablet/lab/quarkus/AccountTransferTest.kt index 3fa9a01..78bbf4e 100644 --- a/src/test/kotlin/crablet/quarkus/AccountTransferTest.kt +++ b/src/test/kotlin/crablet/lab/quarkus/AccountTransferTest.kt @@ -7,7 +7,7 @@ //import crablet.SequenceNumber //import crablet.StateId //import crablet.StateName -//import crablet.StreamQuery +//import crablet.TransactionContext //import crablet.postgres.AbstractCrabletTest //import crablet.postgres.AccountTransferScenarioTest //import crablet.postgres.AccountTransferScenarioTest.Companion @@ -56,7 +56,7 @@ // @Test // @Order(1) // fun `it can open Account 1 with $100`() { -// val streamQuery = StreamQuery( +// val streamQuery = TransactionContext( // identifiers = listOf(DomainIdentifier(name = StateName("Account"), id = StateId("1"))), // eventTypes = eventTypes // ) diff --git a/src/test/kotlin/crablet/quarkus/CrabletDecoratedEventsAppender.kt b/src/test/kotlin/crablet/lab/quarkus/CrabletDecoratedEventsAppender.kt similarity index 100% rename from src/test/kotlin/crablet/quarkus/CrabletDecoratedEventsAppender.kt rename to src/test/kotlin/crablet/lab/quarkus/CrabletDecoratedEventsAppender.kt diff --git a/src/test/kotlin/crablet/postgres/AccountTransferScenarioTest.kt b/src/test/kotlin/crablet/postgres/AccountTransferScenarioTest.kt index e76b34e..efb078b 100644 --- a/src/test/kotlin/crablet/postgres/AccountTransferScenarioTest.kt +++ b/src/test/kotlin/crablet/postgres/AccountTransferScenarioTest.kt @@ -6,7 +6,7 @@ import crablet.EventName import crablet.SequenceNumber import crablet.StateId import crablet.StateName -import crablet.StreamQuery +import crablet.TransactionContext import io.kotest.matchers.ints.shouldBeExactly import io.kotest.matchers.longs.shouldBeExactly import io.kotest.matchers.shouldBe @@ -27,11 +27,12 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { @Test @Order(1) fun `it can open Account 1 with $100`(testContext: VertxTestContext) { - val streamQuery = StreamQuery( + val transactionContext = TransactionContext( identifiers = listOf(DomainIdentifier(name = StateName("Account"), id = StateId("1"))), eventTypes = eventTypes ) - val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(0)) + val appendCondition = + AppendCondition(transactionContext = transactionContext, expectedCurrentSequence = SequenceNumber(0)) val eventsToAppend = listOf( JsonObject().put("type", "AccountOpened").put("id", 1), JsonObject().put("type", "AmountDeposited").put("amount", 50), @@ -42,7 +43,7 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { dumpEvents() } .compose { - stateBuilder.buildFor(streamQuery) + stateBuilder.buildFor(transactionContext) } .onSuccess { (state, sequence): Pair -> testContext.verify { @@ -61,11 +62,12 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { @Test @Order(2) fun `it can open Account 2 with $0`(testContext: VertxTestContext) { - val streamQuery = StreamQuery( + val transactionContext = TransactionContext( identifiers = listOf(DomainIdentifier(name = StateName("Account"), id = StateId("2"))), eventTypes = eventTypes ) - val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(0)) + val appendCondition = + AppendCondition(transactionContext = transactionContext, expectedCurrentSequence = SequenceNumber(0)) val eventsToAppend = listOf( JsonObject().put("type", "AccountOpened").put("id", 2) ) @@ -74,7 +76,7 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { dumpEvents() } .compose { - stateBuilder.buildFor(streamQuery) + stateBuilder.buildFor(transactionContext) } .onSuccess { (state, sequence): Pair -> testContext.verify { @@ -96,11 +98,12 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { DomainIdentifier(name = StateName("Account"), id = StateId("1")), DomainIdentifier(name = StateName("Account"), id = StateId("2")) ) - val streamQuery = StreamQuery( + val transactionContext = TransactionContext( identifiers = domainIdentifiers, eventTypes = eventTypes ) - val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(3)) + val appendCondition = + AppendCondition(transactionContext = transactionContext, expectedCurrentSequence = SequenceNumber(3)) val eventsToAppend = listOf( JsonObject().put("type", "AmountTransferred").put("fromAcct", 1).put("toAcct", 2).put("amount", 30) ) @@ -113,8 +116,9 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { val domainIdentifiersAcct1 = listOf( DomainIdentifier(name = StateName("Account"), id = StateId("1")) ) - val streamQueryAcct1 = StreamQuery(identifiers = domainIdentifiersAcct1, eventTypes = eventTypes) - stateBuilder.buildFor(streamQueryAcct1) + val transactionContextAcct1 = + TransactionContext(identifiers = domainIdentifiersAcct1, eventTypes = eventTypes) + stateBuilder.buildFor(transactionContextAcct1) .onSuccess { (state, sequence) -> testContext.verify { sequence.value shouldBeExactly 5L @@ -131,8 +135,9 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { val domainIdentifiersAcct2 = listOf( DomainIdentifier(name = StateName("Account"), id = StateId("2")) ) - val streamQueryAcct2 = StreamQuery(identifiers = domainIdentifiersAcct2, eventTypes = eventTypes) - stateBuilder.buildFor(streamQueryAcct2) + val transactionContextAcct2 = + TransactionContext(identifiers = domainIdentifiersAcct2, eventTypes = eventTypes) + stateBuilder.buildFor(transactionContextAcct2) .onSuccess { (state, sequence) -> testContext.verify { sequence.value shouldBeExactly 5L @@ -159,11 +164,12 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { DomainIdentifier(name = StateName("Account"), id = StateId("1")), DomainIdentifier(name = StateName("Account"), id = StateId("2")) ) - val streamQuery = StreamQuery( + val transactionContext = TransactionContext( identifiers = domainIdentifiers, eventTypes = eventTypes ) - val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(5)) + val appendCondition = + AppendCondition(transactionContext = transactionContext, expectedCurrentSequence = SequenceNumber(5)) val eventsToAppend = listOf( JsonObject().put("type", "AmountTransferred").put("fromAcct", 2).put("toAcct", 1).put("amount", 10) ) @@ -176,8 +182,9 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { val domainIdentifiersAcct1 = listOf( DomainIdentifier(name = StateName("Account"), id = StateId("1")) ) - val streamQueryAcct1 = StreamQuery(identifiers = domainIdentifiersAcct1, eventTypes = eventTypes) - stateBuilder.buildFor(streamQueryAcct1) + val transactionContextAcct1 = + TransactionContext(identifiers = domainIdentifiersAcct1, eventTypes = eventTypes) + stateBuilder.buildFor(transactionContextAcct1) .onSuccess { (state, sequence) -> testContext.verify { sequence.value shouldBeExactly 6L @@ -194,8 +201,9 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { val domainIdentifiersAcct2 = listOf( DomainIdentifier(name = StateName("Account"), id = StateId("2")) ) - val streamQueryAcct2 = StreamQuery(identifiers = domainIdentifiersAcct2, eventTypes = eventTypes) - stateBuilder.buildFor(streamQueryAcct2) + val transactionContextAcct2 = + TransactionContext(identifiers = domainIdentifiersAcct2, eventTypes = eventTypes) + stateBuilder.buildFor(transactionContextAcct2) .onSuccess { (state, sequence) -> testContext.verify { sequence.value shouldBeExactly 6L @@ -222,11 +230,12 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { DomainIdentifier(name = StateName("Account"), id = StateId("1")), DomainIdentifier(name = StateName("Account"), id = StateId("2")) ) - val streamQuery = StreamQuery( + val transactionContext = TransactionContext( identifiers = domainIdentifiers, eventTypes = eventTypes ) - val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(6)) + val appendCondition = + AppendCondition(transactionContext = transactionContext, expectedCurrentSequence = SequenceNumber(6)) val eventsToAppend = listOf( JsonObject().put("type", "AmountTransferred").put("fromAcct", 2).put("toAcct", 1).put("amount", 1) ) @@ -239,8 +248,9 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { val domainIdentifiersAcct1 = listOf( DomainIdentifier(name = StateName("Account"), id = StateId("1")) ) - val streamQueryAcct1 = StreamQuery(identifiers = domainIdentifiersAcct1, eventTypes = eventTypes) - stateBuilder.buildFor(streamQueryAcct1) + val transactionContextAcct1 = + TransactionContext(identifiers = domainIdentifiersAcct1, eventTypes = eventTypes) + stateBuilder.buildFor(transactionContextAcct1) .onSuccess { (state, sequence) -> testContext.verify { sequence.value shouldBeExactly 7L @@ -257,8 +267,9 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { val domainIdentifiersAcct2 = listOf( DomainIdentifier(name = StateName("Account"), id = StateId("2")) ) - val streamQueryAcct2 = StreamQuery(identifiers = domainIdentifiersAcct2, eventTypes = eventTypes) - stateBuilder.buildFor(streamQueryAcct2) + val transactionContextAcct2 = + TransactionContext(identifiers = domainIdentifiersAcct2, eventTypes = eventTypes) + stateBuilder.buildFor(transactionContextAcct2) .onSuccess { (state, sequence) -> testContext.verify { sequence.value shouldBeExactly 7L @@ -285,11 +296,12 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { DomainIdentifier(name = StateName("Account"), id = StateId("1")), DomainIdentifier(name = StateName("Account"), id = StateId("2")) ) - val streamQuery = StreamQuery( + val transactionContext = TransactionContext( identifiers = domainIdentifiers, eventTypes = eventTypes ) - val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(7)) + val appendCondition = + AppendCondition(transactionContext = transactionContext, expectedCurrentSequence = SequenceNumber(7)) val eventsToAppend = listOf( JsonObject().put("type", "AmountTransferred").put("fromAcct", 1).put("toAcct", 2).put("amount", 1) ) @@ -302,8 +314,9 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { val domainIdentifiersAcct1 = listOf( DomainIdentifier(name = StateName("Account"), id = StateId("1")) ) - val streamQueryAcct1 = StreamQuery(identifiers = domainIdentifiersAcct1, eventTypes = eventTypes) - stateBuilder.buildFor(streamQueryAcct1) + val transactionContextAcct1 = + TransactionContext(identifiers = domainIdentifiersAcct1, eventTypes = eventTypes) + stateBuilder.buildFor(transactionContextAcct1) .onSuccess { (state, sequence) -> testContext.verify { sequence.value shouldBeExactly 8L @@ -320,8 +333,9 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { val domainIdentifiersAcct2 = listOf( DomainIdentifier(name = StateName("Account"), id = StateId("2")) ) - val streamQueryAcct2 = StreamQuery(identifiers = domainIdentifiersAcct2, eventTypes = eventTypes) - stateBuilder.buildFor(streamQueryAcct2) + val transactionContextAcct2 = + TransactionContext(identifiers = domainIdentifiersAcct2, eventTypes = eventTypes) + stateBuilder.buildFor(transactionContextAcct2) .onSuccess { (state, sequence) -> testContext.verify { sequence.value shouldBeExactly 8L @@ -344,11 +358,11 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { @Test @Order(7) fun `Account 1 state is correct`(testContext: VertxTestContext) { - val streamQuery = StreamQuery( + val transactionContext = TransactionContext( identifiers = listOf(DomainIdentifier(name = StateName("Account"), id = StateId("1"))), eventTypes = eventTypes ) - stateBuilder.buildFor(streamQuery) + stateBuilder.buildFor(transactionContext) .onSuccess { (state, sequence): Pair -> testContext.verify { sequence.value shouldBeExactly 8L @@ -365,11 +379,11 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { @Test @Order(8) fun `Account 2 state is correct`(testContext: VertxTestContext) { - val streamQuery = StreamQuery( + val transactionContext = TransactionContext( identifiers = listOf(DomainIdentifier(name = StateName("Account"), id = StateId("2"))), eventTypes = eventTypes ) - stateBuilder.buildFor(streamQuery) + stateBuilder.buildFor(transactionContext) .onSuccess { (state, sequence): Pair -> testContext.verify { sequence.value shouldBeExactly 8L diff --git a/src/test/kotlin/crablet/postgres/CausationCorrelationIdsTest.kt b/src/test/kotlin/crablet/postgres/CausationCorrelationIdsTest.kt index 22fc732..4362a32 100644 --- a/src/test/kotlin/crablet/postgres/CausationCorrelationIdsTest.kt +++ b/src/test/kotlin/crablet/postgres/CausationCorrelationIdsTest.kt @@ -6,7 +6,7 @@ import crablet.EventName import crablet.SequenceNumber import crablet.StateId import crablet.StateName -import crablet.StreamQuery +import crablet.TransactionContext import io.kotest.matchers.equals.shouldBeEqual import io.kotest.matchers.ints.shouldBeExactly import io.vertx.core.json.JsonObject @@ -28,11 +28,12 @@ class CausationCorrelationIdsTest : AbstractCrabletTest() { fun `it can open Account 1 with correct IDs`(testContext: VertxTestContext) { val testRepository = TestRepository(pool) - val streamQuery = StreamQuery( + val transactionContext = TransactionContext( identifiers = listOf(DomainIdentifier(name = StateName("Account"), id = StateId("1"))), eventTypes = eventTypes ) - val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(0)) + val appendCondition = + AppendCondition(transactionContext = transactionContext, expectedCurrentSequence = SequenceNumber(0)) val eventsToAppend = listOf( JsonObject().put("type", "AccountOpened").put("id", 1), JsonObject().put("type", "AmountDeposited").put("amount", 10), @@ -72,11 +73,12 @@ class CausationCorrelationIdsTest : AbstractCrabletTest() { fun `it can open Account 2 with correct IDs`(testContext: VertxTestContext) { val testRepository = TestRepository(pool) - val streamQuery = StreamQuery( + val transactionContext = TransactionContext( identifiers = listOf(DomainIdentifier(name = StateName("Account"), id = StateId("2"))), eventTypes = eventTypes ) - val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(0)) + val appendCondition = + AppendCondition(transactionContext = transactionContext, expectedCurrentSequence = SequenceNumber(0)) val eventsToAppend = listOf( JsonObject().put("type", "AccountOpened").put("id", 2), JsonObject().put("type", "AmountDeposited").put("amount", 10), diff --git a/src/test/kotlin/crablet/postgres/OptimisticLockingErrorTest.kt b/src/test/kotlin/crablet/postgres/OptimisticLockingErrorTest.kt new file mode 100644 index 0000000..e1837b3 --- /dev/null +++ b/src/test/kotlin/crablet/postgres/OptimisticLockingErrorTest.kt @@ -0,0 +1,147 @@ +package crablet.postgres + +import crablet.AppendCondition +import crablet.DomainIdentifier +import crablet.EventName +import crablet.SequenceNumber +import crablet.StateId +import crablet.StateName +import crablet.TransactionContext +import io.kotest.matchers.ints.shouldBeExactly +import io.kotest.matchers.longs.shouldBeExactly +import io.kotest.matchers.shouldBe +import io.vertx.core.json.JsonObject +import io.vertx.junit5.VertxExtension +import io.vertx.junit5.VertxTestContext +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.MethodOrderer +import org.junit.jupiter.api.Order +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestMethodOrder +import org.junit.jupiter.api.extension.ExtendWith + +@ExtendWith(VertxExtension::class) +@TestMethodOrder(MethodOrderer.OrderAnnotation::class) +class OptimisticLockingErrorTest : AbstractCrabletTest() { + + @Test + @Order(1) + fun `it can open Account 1 with $100`(testContext: VertxTestContext) { + val transactionContext = TransactionContext( + identifiers = listOf(DomainIdentifier(name = StateName("Account"), id = StateId("1"))), + eventTypes = eventTypes + ) + val appendCondition = + AppendCondition(transactionContext = transactionContext, expectedCurrentSequence = SequenceNumber(0)) + val eventsToAppend = listOf( + JsonObject().put("type", "AccountOpened").put("id", 1), + JsonObject().put("type", "AmountDeposited").put("amount", 50), + JsonObject().put("type", "AmountDeposited").put("amount", 50) + ) + eventsAppender.appendIf(eventsToAppend, appendCondition) + .compose { + dumpEvents() + } + .compose { + stateBuilder.buildFor(transactionContext) + } + .onSuccess { (state, sequence): Pair -> + testContext.verify { + sequence.value shouldBeExactly 3L + state.id shouldBe 1 + state.balance shouldBeExactly 100 + } + testContext.completeNow() + } + .onFailure { it -> + testContext.failNow(it) + } + + } + + @Test + @Order(2) + fun `it will fail if expectedCurrentSequence does not match`(testContext: VertxTestContext) { + val transactionContext = TransactionContext( + identifiers = listOf(DomainIdentifier(name = StateName("Account"), id = StateId("1"))), + eventTypes = eventTypes + ) + val appendCondition = + AppendCondition(transactionContext = transactionContext, expectedCurrentSequence = SequenceNumber(2)) + val eventsToAppend = listOf( + JsonObject().put("type", "AmountDeposited").put("amount", 60) + ) + eventsAppender.appendIf(eventsToAppend, appendCondition) + .onSuccess { + testContext.failNow("It should fail") + } + .onFailure { + testContext.completeNow() + } + } + + @Test + @Order(8) + fun `Account 1 state is intact`(testContext: VertxTestContext) { + val transactionContext = TransactionContext( + identifiers = listOf(DomainIdentifier(name = StateName("Account"), id = StateId("1"))), + eventTypes = eventTypes + ) + stateBuilder.buildFor(transactionContext) + .onSuccess { (state, sequence): Pair -> + testContext.verify { + sequence.value shouldBeExactly 3L + state.id shouldBe 1 + state.balance shouldBeExactly 100 + } + testContext.completeNow() + } + .onFailure { it -> + testContext.failNow(it) + } + } + + companion object { + lateinit var eventsAppender: CrabletEventsAppender + lateinit var stateBuilder: CrabletStateBuilder + + data class Account(val id: Int? = null, val balance: Int = 0) + + val eventTypes = listOf("AccountOpened", "AmountDeposited", "AmountTransferred").map { EventName(it) } + + private val evolveFunction: (Account, JsonObject) -> Account = { state, event -> + when (event.getString("type")) { + "AccountOpened" -> state.copy(id = event.getInteger("id")) + "AmountDeposited" -> state.copy(balance = state.balance.plus(event.getInteger("amount"))) + "AmountTransferred" -> { + when { + event.getInteger("fromAcct") == state.id -> state.copy( + balance = state.balance.minus(event.getInteger("amount")) + ) + + event.getInteger("toAcct") == state.id -> state.copy( + balance = state.balance.plus(event.getInteger("amount")) + ) + + else -> state + } + } + + else -> state + } + } + + @BeforeAll + @JvmStatic + fun setUp(testContext: VertxTestContext) { + eventsAppender = CrabletEventsAppender(pool) + stateBuilder = CrabletStateBuilder( + client = pool, + initialState = Account(), + evolveFunction = evolveFunction + ) + cleanDatabase().onSuccess { testContext.completeNow() } + } + } + +} \ No newline at end of file