Skip to content

Commit

Permalink
using TestContainers
Browse files Browse the repository at this point in the history
  • Loading branch information
Rodolfo committed Dec 4, 2024
1 parent 363b777 commit 3e5a2c4
Show file tree
Hide file tree
Showing 13 changed files with 226 additions and 60 deletions.
21 changes: 1 addition & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,9 @@

So far, just an experiment

## Running

1. On terminal, run:
```bash
docker-compose up
```

2. Run the main method of [`CrabletTest1.kt`](src/test/kotlin/crablet/example/CrabletTest1.kt)

```text
Vertx Pool started
Append operation
--> eventsToAppend: [{"type":"AccountOpened","id":10}, {"type":"AmountDeposited","amount":100}]
--> appendCondition: AppendCondition(query=StreamQuery(identifiers=[DomainIdentifier(name=StateName(value=Account), id=StateId(value=51834a25-31b4-461d-8cfe-cc75a842bbcb))], eventTypes=[EventName(value=AccountOpened), EventName(value=AmountDeposited)]), maximumEventSequence=SequenceNumber(value=0))
New sequence id ---> SequenceNumber(value=12)
New state ---> [{"type":"AccountOpened","id":10},{"type":"AmountDeposited","amount":100}]
```

## References

* https://github.com/crabzilla/crabzilla
* https://www.youtube.com/watch?v=GzrZworHpIk
* https://www.youtube.com/watch?v=DhhxKoOpJe0
* https://github.com/crabzilla/crabzilla
* https://github.com/imrafaelmerino/json-values
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ services:
POSTGRES_USER: postgres
POSTGRES_DB: postgres
volumes:
- ./sql:/docker-entrypoint-initdb.d
- ./ddl:/docker-entrypoint-initdb.d
ports:
- 5432:5432
31 changes: 15 additions & 16 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.16.3</quarkus.platform.version>
<quarkus.platform.version>3.17.2</quarkus.platform.version>
<skipITs>true</skipITs>
<surefire-plugin.version>3.5.0</surefire-plugin.version>
</properties>
Expand Down Expand Up @@ -44,15 +44,14 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-pg-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-routes</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>

<!-- <dependency>-->
<!-- <groupId>io.quarkus</groupId>-->
<!-- <artifactId>quarkus-reactive-routes</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>io.quarkus</groupId>-->
<!-- <artifactId>quarkus-arc</artifactId>-->
<!-- </dependency>-->

<dependency>
<groupId>io.quarkus</groupId>
Expand All @@ -74,12 +73,12 @@
<artifactId>kotlin-extensions</artifactId>
<scope>test</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.testcontainers</groupId>-->
<!-- <artifactId>postgresql</artifactId>-->
<!-- <version>1.19.8</version>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>1.19.8</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.imrafaelmerino</groupId>
<artifactId>json-values</artifactId>
Expand Down
12 changes: 6 additions & 6 deletions src/main/kotlin/crablet/Crablet.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ data class DomainIdentifier(val name: StateName, val id: StateId) {

data class StreamQuery(val identifiers: List<DomainIdentifier>, val eventTypes: List<EventName>)

// read

interface StateBuilder<S> {
fun buildFor(query: StreamQuery): Future<Pair<S, SequenceNumber>>
}

// write

data class AppendCondition(val query: StreamQuery, val maximumEventSequence: SequenceNumber)

interface EventsAppender {
fun appendIf(events: List<JsonObject>, appendCondition: AppendCondition): Future<SequenceNumber>
}

// read

interface StateBuilder<S> {
fun buildFor(query: StreamQuery): Future<Pair<S, SequenceNumber>>
}
2 changes: 1 addition & 1 deletion src/main/kotlin/crablet/postgres/CrabletEventsAppender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class CrabletEventsAppender(private val client: Pool) : EventsAppender {
.execute(params)
}.onSuccess { rowSet ->
// Extract the result (last_sequence_id) from the first row
val latestSequenceId = rowSet.firstOrNull()?.getLong("last_sequence_id")
val latestSequenceId = rowSet.first().getLong("last_sequence_id")
if (latestSequenceId != null) {
promise.complete(SequenceNumber(latestSequenceId))
} else {
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ BEGIN
-- After the first insert, set causation_id to the sequence_id of the last event inserted
causation_id := sequence_id;

currentLastSequence = sequence_id;

END LOOP;
ELSE
-- Raise an exception if sequence mismatch is detected
Expand Down
61 changes: 61 additions & 0 deletions src/test/kotlin/crablet/example/CrabletTest2.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package crablet.example

import crablet.AppendCondition
import crablet.DomainIdentifier
import crablet.EventName
import crablet.SequenceNumber
import crablet.StateId
import crablet.StateName
import crablet.StreamQuery
import crablet.postgres.CrabletEventsAppender
import crablet.postgres.CrabletStateBuilder
import io.vertx.core.json.JsonArray
import io.vertx.core.json.JsonObject
import java.util.*

fun main() {

println("Vertx Pool started")

val eventsAppender = CrabletEventsAppender(pool)

val stateBuilder = CrabletStateBuilder(
client = pool,
initialState = JsonArray(),
evolveFunction = { state, event -> state.add(event) })

val domainIdentifiers = listOf(
DomainIdentifier(name = StateName("Account"), id = StateId(UUID.randomUUID().toString()))
)

val streamQuery = StreamQuery(
identifiers = domainIdentifiers,
eventTypes = listOf("AccountOpened", "AmountDeposited").map { EventName(it) }
)

val appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(0))

val eventsToAppend: List<JsonObject> = listOf(
JsonObject().put("type", "AccountOpened").put("id", 10),
JsonObject().put("type", "AmountDeposited").put("amount", 100)
)

println("Append operation")
println("--> eventsToAppend: $eventsToAppend")
println("--> appendCondition: $appendCondition ")

// append events
eventsAppender.appendIf(eventsToAppend, appendCondition)
.compose {
// print the resulting sequenceId
println()
println("New sequence id ---> $it")
// now project a state given the past events
stateBuilder.buildFor(streamQuery)
}
.onSuccess { stateResult: Pair<JsonArray, SequenceNumber> ->
println("New state ---> ${stateResult.first}")
}
.onFailure { it.printStackTrace() }

}
53 changes: 53 additions & 0 deletions src/test/kotlin/crablet/postgres/AbstractCrabletTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package crablet.postgres

import io.vertx.pgclient.PgConnectOptions
import io.vertx.sqlclient.Pool
import io.vertx.sqlclient.PoolOptions
import org.testcontainers.containers.PostgreSQLContainer
import org.testcontainers.utility.MountableFile

abstract class AbstractCrabletTest {

companion object {

val PG_DOCKER_IMAGE = "postgres:latest"
val DB_NAME = "postgres"
val DB_USERNAME = "postgres"
val DB_PASSWORD = "postgres"

val postgresqlContainer =
PostgreSQLContainer<Nothing>(PG_DOCKER_IMAGE)
.apply {
withDatabaseName(DB_NAME)
withUsername(DB_USERNAME)
withPassword(DB_PASSWORD)
withCopyFileToContainer(
MountableFile.forClasspathResource("./ddl/1-events_table.sql"),
"/docker-entrypoint-initdb.d/1-events_table.sql",
)
withCopyFileToContainer(
MountableFile.forClasspathResource("./ddl/2-append_events.sql"),
"/docker-entrypoint-initdb.d/2-append_events.sql",
)
withEnv("POSTGRES_LOG_CONNECTIONS", "on")
withEnv("POSTGRES_LOG_DISCONNECTIONS", "on")
withEnv("POSTGRES_LOG_DURATION", "on")
withEnv("POSTGRES_LOG_STATEMENT", "all")
withEnv("POSTGRES_LOG_DIRECTORY", "/var/log/pg_log")
withLogConsumer { frame -> println(frame.utf8String) }
}

val pool: Pool by lazy {
postgresqlContainer.start()
val connectOptions = PgConnectOptions()
.setPort(if (postgresqlContainer.isRunning) postgresqlContainer.firstMappedPort else 5432)
.setHost("127.0.0.1")
.setDatabase(DB_NAME)
.setUser(DB_USERNAME)
.setPassword(DB_PASSWORD)
val poolOptions = PoolOptions().setMaxSize(5)
Pool.pool(connectOptions, poolOptions)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import io.vertx.core.json.JsonArray
import io.vertx.core.json.JsonObject
import io.vertx.junit5.VertxExtension
import io.vertx.junit5.VertxTestContext
import io.vertx.pgclient.PgConnectOptions
import io.vertx.sqlclient.Pool
import io.vertx.sqlclient.PoolOptions
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertTrue
Expand All @@ -23,7 +20,7 @@ import org.junit.jupiter.api.extension.ExtendWith
import java.util.*

@ExtendWith(VertxExtension::class)
class CrabletPostgresTest {
class Scenario1TestAbstract : AbstractCrabletTest() {

lateinit var eventsAppender: CrabletEventsAppender
lateinit var stateBuilder: CrabletStateBuilder<JsonArray>
Expand All @@ -33,7 +30,6 @@ class CrabletPostgresTest {

@BeforeEach
fun setUp(testContext: VertxTestContext) {
val pool = createPool()
eventsAppender = CrabletEventsAppender(pool)
stateBuilder = CrabletStateBuilder(
client = pool,
Expand Down Expand Up @@ -83,14 +79,4 @@ class CrabletPostgresTest {
}
}

private fun createPool(): Pool {
val connectOptions = PgConnectOptions()
.setPort(5432)
.setHost("127.0.0.1")
.setDatabase("postgres")
.setUser("postgres")
.setPassword("postgres")
val poolOptions = PoolOptions().setMaxSize(5)
return Pool.pool(connectOptions, poolOptions)
}
}
84 changes: 84 additions & 0 deletions src/test/kotlin/crablet/postgres/Scenario2TestAbstract.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package crablet.postgres

import crablet.AppendCondition
import crablet.DomainIdentifier
import crablet.EventName
import crablet.SequenceNumber
import crablet.StateId
import crablet.StateName
import crablet.StreamQuery
import io.vertx.core.json.JsonObject
import io.vertx.junit5.VertxExtension
import io.vertx.junit5.VertxTestContext
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import java.util.*

@ExtendWith(VertxExtension::class)
class Scenario2TestAbstract : AbstractCrabletTest() {

data class Account(val id: Int? = null, val balance: Int = 0)

lateinit var eventsAppender: CrabletEventsAppender
lateinit var stateBuilder: CrabletStateBuilder<Account>
lateinit var appendCondition: AppendCondition
lateinit var eventsToAppend: List<JsonObject>
lateinit var streamQuery: StreamQuery

@BeforeEach
fun setUp(testContext: VertxTestContext) {
eventsAppender = CrabletEventsAppender(pool)
stateBuilder = CrabletStateBuilder(
client = pool,
initialState = Account(),
evolveFunction = { state, event ->
when (event.getString("type")) {
"AccountOpened" -> state.copy(id = event.getInteger("id"))
"AmountDeposited" -> state.copy(balance = state.balance.plus(event.getInteger("amount")))
else -> state
}
})

val domainIdentifiers = listOf(
DomainIdentifier(name = StateName("Account"), id = StateId(UUID.randomUUID().toString()))
)
streamQuery = StreamQuery(
identifiers = domainIdentifiers,
eventTypes = listOf("AccountOpened", "AmountDeposited").map { EventName(it) }
)
appendCondition = AppendCondition(query = streamQuery, maximumEventSequence = SequenceNumber(0))
eventsToAppend = listOf(
JsonObject().put("type", "AccountOpened").put("id", 10),
JsonObject().put("type", "AmountDeposited").put("amount", 100)
)
testContext.completeNow()
}

@Test
fun testAppendAndBuildState(testContext: VertxTestContext) {
// Append events and build the state
eventsAppender.appendIf(eventsToAppend, appendCondition)
.compose {
stateBuilder.buildFor(streamQuery)
}
.onSuccess { (state, sequence): Pair<Account, SequenceNumber> ->

assertNotNull(state)
assertNotNull(sequence)

assertEquals(state.id, 10)
assertEquals(state.balance, 100)

// Complete the test context indicating the test passed
testContext.completeNow()
}
.onFailure { it ->
// Fail the test context indicating the test failed
testContext.failNow(it)
}
}

}
2 changes: 1 addition & 1 deletion sql/3-test.sql → src/test/resources/3-test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ SELECT append_events(
'{"type": "PasswordChanged", "user_id": "123", "password": "newpassword123"}',
'{"type": "UserProfileUpdated", "user_id": "123", "profile": {"age": 30, "city": "New York"}}'
]::TEXT[] -- event payloads (as JSON string)
) AS last_sequence_id;
);
File renamed without changes.

0 comments on commit 3e5a2c4

Please sign in to comment.