From 816da502e851a5b671301dd34a782d5b407f4f68 Mon Sep 17 00:00:00 2001 From: Rodolfo Date: Thu, 5 Dec 2024 02:00:44 -0300 Subject: [PATCH] added tests --- src/main/resources/ddl/2-append_events.sql | 6 +- .../postgres/AccountTransferScenarioTest.kt | 189 ++++++++++++++++++ 2 files changed, 194 insertions(+), 1 deletion(-) diff --git a/src/main/resources/ddl/2-append_events.sql b/src/main/resources/ddl/2-append_events.sql index fa919ce..3177c1d 100644 --- a/src/main/resources/ddl/2-append_events.sql +++ b/src/main/resources/ddl/2-append_events.sql @@ -38,7 +38,11 @@ BEGIN correlation_id := NULL; -- Correlation will be set after the first insert ELSE causation_id := currentLastSequence; -- For subsequent events, causation_id points to the previous event's sequence_id - correlation_id := previousEventRow.correlation_id; -- Same correlation_id as the last event + IF previousEventRow.correlation_id IS NULL THEN + correlation_id := causation_id; + ELSE + correlation_id := previousEventRow.correlation_id; -- Same correlation_id as the last event + END IF; END IF; -- Lock the transaction using the correlation_id to prevent conflicts diff --git a/src/test/kotlin/crablet/postgres/AccountTransferScenarioTest.kt b/src/test/kotlin/crablet/postgres/AccountTransferScenarioTest.kt index 0bf95be..90e747c 100644 --- a/src/test/kotlin/crablet/postgres/AccountTransferScenarioTest.kt +++ b/src/test/kotlin/crablet/postgres/AccountTransferScenarioTest.kt @@ -150,6 +150,195 @@ class AccountTransferScenarioTest : AbstractCrabletTest() { } } + @Test + @Order(4) + fun `it can transfer $10 from Account 2 to Account 1 within the same db transaction`(testContext: VertxTestContext) { + val domainIdentifiers = listOf( + DomainIdentifier(name = StateName("Account"), id = StateId("1")), + DomainIdentifier(name = StateName("Account"), id = StateId("2")) + ) + val streamQuery = StreamQuery( + identifiers = domainIdentifiers, + eventTypes = eventTypes + ) + val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(5)) + val eventsToAppend = listOf( + JsonObject().put("type", "AmountTransferred").put("fromAcct", 2).put("toAcct", 1).put("amount", 10) + ) + eventsAppender.appendIf(eventsToAppend, appendCondition) + .compose { + dumpEvents() + } + .compose { + // assert acct1 state + val domainIdentifiersAcct1 = listOf( + DomainIdentifier(name = StateName("Account"), id = StateId("1")) + ) + val streamQueryAcct1 = StreamQuery(identifiers = domainIdentifiersAcct1, eventTypes = eventTypes) + stateBuilder.buildFor(streamQueryAcct1) + .onSuccess { (state, sequence) -> + testContext.verify { + assertEquals(6, sequence.value) + assertEquals("1", state.id) + assertEquals(80, state.balance) + } + } + .onFailure { + testContext.failNow(it) + } + } + .compose { + // assert acct2 state + val domainIdentifiersAcct2 = listOf( + DomainIdentifier(name = StateName("Account"), id = StateId("2")) + ) + val streamQueryAcct2 = StreamQuery(identifiers = domainIdentifiersAcct2, eventTypes = eventTypes) + stateBuilder.buildFor(streamQueryAcct2) + .onSuccess { (state, sequence) -> + testContext.verify { + assertEquals(6, sequence.value) + assertEquals("2", state.id) + assertEquals(20, state.balance) + } + } + .onFailure { + testContext.failNow(it) + } + } + .onSuccess { + testContext.completeNow() + } + .onFailure { + testContext.failNow(it) + } + } + + @Test + @Order(5) + fun `it can transfer $1 from Account 2 to Account 1 within the same db transaction`(testContext: VertxTestContext) { + val domainIdentifiers = listOf( + DomainIdentifier(name = StateName("Account"), id = StateId("1")), + DomainIdentifier(name = StateName("Account"), id = StateId("2")) + ) + val streamQuery = StreamQuery( + identifiers = domainIdentifiers, + eventTypes = eventTypes + ) + val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(6)) + val eventsToAppend = listOf( + JsonObject().put("type", "AmountTransferred").put("fromAcct", 2).put("toAcct", 1).put("amount", 1) + ) + eventsAppender.appendIf(eventsToAppend, appendCondition) + .compose { + dumpEvents() + } + .compose { + // assert acct1 state + val domainIdentifiersAcct1 = listOf( + DomainIdentifier(name = StateName("Account"), id = StateId("1")) + ) + val streamQueryAcct1 = StreamQuery(identifiers = domainIdentifiersAcct1, eventTypes = eventTypes) + stateBuilder.buildFor(streamQueryAcct1) + .onSuccess { (state, sequence) -> + testContext.verify { + assertEquals(7, sequence.value) + assertEquals("1", state.id) + assertEquals(81, state.balance) + } + } + .onFailure { + testContext.failNow(it) + } + } + .compose { + // assert acct2 state + val domainIdentifiersAcct2 = listOf( + DomainIdentifier(name = StateName("Account"), id = StateId("2")) + ) + val streamQueryAcct2 = StreamQuery(identifiers = domainIdentifiersAcct2, eventTypes = eventTypes) + stateBuilder.buildFor(streamQueryAcct2) + .onSuccess { (state, sequence) -> + testContext.verify { + assertEquals(7, sequence.value) + assertEquals("2", state.id) + assertEquals(19, state.balance) + } + } + .onFailure { + testContext.failNow(it) + } + } + .onSuccess { + testContext.completeNow() + } + .onFailure { + testContext.failNow(it) + } + } + + @Test + @Order(5) + fun `it can transfer $1 from Account 1 to Account 2 within the same db transaction`(testContext: VertxTestContext) { + val domainIdentifiers = listOf( + DomainIdentifier(name = StateName("Account"), id = StateId("1")), + DomainIdentifier(name = StateName("Account"), id = StateId("2")) + ) + val streamQuery = StreamQuery( + identifiers = domainIdentifiers, + eventTypes = eventTypes + ) + val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(7)) + val eventsToAppend = listOf( + JsonObject().put("type", "AmountTransferred").put("fromAcct", 1).put("toAcct", 2).put("amount", 1) + ) + eventsAppender.appendIf(eventsToAppend, appendCondition) + .compose { + dumpEvents() + } + .compose { + // assert acct1 state + val domainIdentifiersAcct1 = listOf( + DomainIdentifier(name = StateName("Account"), id = StateId("1")) + ) + val streamQueryAcct1 = StreamQuery(identifiers = domainIdentifiersAcct1, eventTypes = eventTypes) + stateBuilder.buildFor(streamQueryAcct1) + .onSuccess { (state, sequence) -> + testContext.verify { + assertEquals(8, sequence.value) + assertEquals("1", state.id) + assertEquals(80, state.balance) + } + } + .onFailure { + testContext.failNow(it) + } + } + .compose { + // assert acct2 state + val domainIdentifiersAcct2 = listOf( + DomainIdentifier(name = StateName("Account"), id = StateId("2")) + ) + val streamQueryAcct2 = StreamQuery(identifiers = domainIdentifiersAcct2, eventTypes = eventTypes) + stateBuilder.buildFor(streamQueryAcct2) + .onSuccess { (state, sequence) -> + testContext.verify { + assertEquals(8, sequence.value) + assertEquals("2", state.id) + assertEquals(20, state.balance) + } + } + .onFailure { + testContext.failNow(it) + } + } + .onSuccess { + testContext.completeNow() + } + .onFailure { + testContext.failNow(it) + } + } + companion object { lateinit var eventsAppender: CrabletEventsAppender lateinit var stateBuilder: CrabletStateBuilder