Skip to content

Commit

Permalink
added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Rodolfo committed Dec 5, 2024
1 parent 5f17f87 commit 816da50
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 1 deletion.
6 changes: 5 additions & 1 deletion src/main/resources/ddl/2-append_events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ BEGIN
correlation_id := NULL; -- Correlation will be set after the first insert
ELSE
causation_id := currentLastSequence; -- For subsequent events, causation_id points to the previous event's sequence_id
correlation_id := previousEventRow.correlation_id; -- Same correlation_id as the last event
IF previousEventRow.correlation_id IS NULL THEN
correlation_id := causation_id;
ELSE
correlation_id := previousEventRow.correlation_id; -- Same correlation_id as the last event
END IF;
END IF;

-- Lock the transaction using the correlation_id to prevent conflicts
Expand Down
189 changes: 189 additions & 0 deletions src/test/kotlin/crablet/postgres/AccountTransferScenarioTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,195 @@ class AccountTransferScenarioTest : AbstractCrabletTest() {
}
}

@Test
@Order(4)
fun `it can transfer $10 from Account 2 to Account 1 within the same db transaction`(testContext: VertxTestContext) {
val domainIdentifiers = listOf(
DomainIdentifier(name = StateName("Account"), id = StateId("1")),
DomainIdentifier(name = StateName("Account"), id = StateId("2"))
)
val streamQuery = StreamQuery(
identifiers = domainIdentifiers,
eventTypes = eventTypes
)
val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(5))
val eventsToAppend = listOf(
JsonObject().put("type", "AmountTransferred").put("fromAcct", 2).put("toAcct", 1).put("amount", 10)
)
eventsAppender.appendIf(eventsToAppend, appendCondition)
.compose {
dumpEvents()
}
.compose {
// assert acct1 state
val domainIdentifiersAcct1 = listOf(
DomainIdentifier(name = StateName("Account"), id = StateId("1"))
)
val streamQueryAcct1 = StreamQuery(identifiers = domainIdentifiersAcct1, eventTypes = eventTypes)
stateBuilder.buildFor(streamQueryAcct1)
.onSuccess { (state, sequence) ->
testContext.verify {
assertEquals(6, sequence.value)
assertEquals("1", state.id)
assertEquals(80, state.balance)
}
}
.onFailure {
testContext.failNow(it)
}
}
.compose {
// assert acct2 state
val domainIdentifiersAcct2 = listOf(
DomainIdentifier(name = StateName("Account"), id = StateId("2"))
)
val streamQueryAcct2 = StreamQuery(identifiers = domainIdentifiersAcct2, eventTypes = eventTypes)
stateBuilder.buildFor(streamQueryAcct2)
.onSuccess { (state, sequence) ->
testContext.verify {
assertEquals(6, sequence.value)
assertEquals("2", state.id)
assertEquals(20, state.balance)
}
}
.onFailure {
testContext.failNow(it)
}
}
.onSuccess {
testContext.completeNow()
}
.onFailure {
testContext.failNow(it)
}
}

@Test
@Order(5)
fun `it can transfer $1 from Account 2 to Account 1 within the same db transaction`(testContext: VertxTestContext) {
val domainIdentifiers = listOf(
DomainIdentifier(name = StateName("Account"), id = StateId("1")),
DomainIdentifier(name = StateName("Account"), id = StateId("2"))
)
val streamQuery = StreamQuery(
identifiers = domainIdentifiers,
eventTypes = eventTypes
)
val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(6))
val eventsToAppend = listOf(
JsonObject().put("type", "AmountTransferred").put("fromAcct", 2).put("toAcct", 1).put("amount", 1)
)
eventsAppender.appendIf(eventsToAppend, appendCondition)
.compose {
dumpEvents()
}
.compose {
// assert acct1 state
val domainIdentifiersAcct1 = listOf(
DomainIdentifier(name = StateName("Account"), id = StateId("1"))
)
val streamQueryAcct1 = StreamQuery(identifiers = domainIdentifiersAcct1, eventTypes = eventTypes)
stateBuilder.buildFor(streamQueryAcct1)
.onSuccess { (state, sequence) ->
testContext.verify {
assertEquals(7, sequence.value)
assertEquals("1", state.id)
assertEquals(81, state.balance)
}
}
.onFailure {
testContext.failNow(it)
}
}
.compose {
// assert acct2 state
val domainIdentifiersAcct2 = listOf(
DomainIdentifier(name = StateName("Account"), id = StateId("2"))
)
val streamQueryAcct2 = StreamQuery(identifiers = domainIdentifiersAcct2, eventTypes = eventTypes)
stateBuilder.buildFor(streamQueryAcct2)
.onSuccess { (state, sequence) ->
testContext.verify {
assertEquals(7, sequence.value)
assertEquals("2", state.id)
assertEquals(19, state.balance)
}
}
.onFailure {
testContext.failNow(it)
}
}
.onSuccess {
testContext.completeNow()
}
.onFailure {
testContext.failNow(it)
}
}

@Test
@Order(5)
fun `it can transfer $1 from Account 1 to Account 2 within the same db transaction`(testContext: VertxTestContext) {
val domainIdentifiers = listOf(
DomainIdentifier(name = StateName("Account"), id = StateId("1")),
DomainIdentifier(name = StateName("Account"), id = StateId("2"))
)
val streamQuery = StreamQuery(
identifiers = domainIdentifiers,
eventTypes = eventTypes
)
val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(7))
val eventsToAppend = listOf(
JsonObject().put("type", "AmountTransferred").put("fromAcct", 1).put("toAcct", 2).put("amount", 1)
)
eventsAppender.appendIf(eventsToAppend, appendCondition)
.compose {
dumpEvents()
}
.compose {
// assert acct1 state
val domainIdentifiersAcct1 = listOf(
DomainIdentifier(name = StateName("Account"), id = StateId("1"))
)
val streamQueryAcct1 = StreamQuery(identifiers = domainIdentifiersAcct1, eventTypes = eventTypes)
stateBuilder.buildFor(streamQueryAcct1)
.onSuccess { (state, sequence) ->
testContext.verify {
assertEquals(8, sequence.value)
assertEquals("1", state.id)
assertEquals(80, state.balance)
}
}
.onFailure {
testContext.failNow(it)
}
}
.compose {
// assert acct2 state
val domainIdentifiersAcct2 = listOf(
DomainIdentifier(name = StateName("Account"), id = StateId("2"))
)
val streamQueryAcct2 = StreamQuery(identifiers = domainIdentifiersAcct2, eventTypes = eventTypes)
stateBuilder.buildFor(streamQueryAcct2)
.onSuccess { (state, sequence) ->
testContext.verify {
assertEquals(8, sequence.value)
assertEquals("2", state.id)
assertEquals(20, state.balance)
}
}
.onFailure {
testContext.failNow(it)
}
}
.onSuccess {
testContext.completeNow()
}
.onFailure {
testContext.failNow(it)
}
}

companion object {
lateinit var eventsAppender: CrabletEventsAppender
lateinit var stateBuilder: CrabletStateBuilder<Account>
Expand Down

0 comments on commit 816da50

Please sign in to comment.