Skip to content

Commit

Permalink
added OptimisticLockingErrorTest and minor refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Rodolfo committed Jan 6, 2025
1 parent 96f3fa2 commit fe41ee8
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 98 deletions.
7 changes: 4 additions & 3 deletions src/main/kotlin/crablet/Crablet.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ data class DomainIdentifier(val name: StateName, val id: StateId) {
fun toStorageFormat(): String = this.name.value.plus("@").plus(this.id.value)
}

data class StreamQuery(val identifiers: List<DomainIdentifier>, val eventTypes: List<EventName>)
data class TransactionContext(val identifiers: List<DomainIdentifier>, val eventTypes: List<EventName>)

data class AppendCondition(val query: StreamQuery, val maximumEventSequence: SequenceNumber)
data class AppendCondition(val transactionContext: TransactionContext, val expectedCurrentSequence: SequenceNumber)

// write

Expand All @@ -32,5 +32,6 @@ interface EventsAppender {
// read

interface StateBuilder<S> {
fun buildFor(query: StreamQuery): Future<Pair<S, SequenceNumber>>
fun buildFor(transactionContext: TransactionContext): Future<Pair<S, SequenceNumber>>
}

71 changes: 42 additions & 29 deletions src/main/kotlin/crablet/postgres/CrabletEventsAppender.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package crablet.postgres

import crablet.AppendCondition
import crablet.DomainIdentifier
import crablet.EventName
import crablet.EventsAppender
import crablet.SequenceNumber
import io.vertx.core.Future
import io.vertx.core.Promise
import io.vertx.core.json.JsonObject
import io.vertx.sqlclient.Pool
import io.vertx.sqlclient.Row
import io.vertx.sqlclient.RowSet
import io.vertx.sqlclient.SqlConnection
import io.vertx.sqlclient.Tuple

class CrabletEventsAppender(private val client: Pool) : EventsAppender {
Expand All @@ -15,36 +20,44 @@ class CrabletEventsAppender(private val client: Pool) : EventsAppender {
val promise = Promise.promise<SequenceNumber>()

client.withTransaction { connection ->
val jsonArrayDomainIds =
appendCondition.query.identifiers.map { it.toStorageFormat() }.sorted().toTypedArray()
val jsonArrayEventTypes = appendCondition.query.eventTypes.map { it.value }.toTypedArray()
val jsonArrayEventPayloads = events.map { it.encode() }.toTypedArray()

// Create a Tuple to pass as parameters
val params = Tuple.of(
jsonArrayDomainIds,
appendCondition.maximumEventSequence.value,
jsonArrayEventTypes,
jsonArrayEventPayloads
)

// Use prepared statement for SELECT query
val functionCall = "SELECT append_events($1, $2, $3, $4) AS last_sequence_id"

// Execute the prepared query, it is in transaction due to the use of #withTransaction method.
connection.preparedQuery(functionCall)
.execute(params)
}.onSuccess { rowSet ->
if (rowSet.rowCount() == 1 && rowSet.first().getLong("last_sequence_id") != null) {
// Extract the result (last_sequence_id) from the first row
promise.complete(SequenceNumber(rowSet.first().getLong("last_sequence_id")))
} else {
promise.fail("No last_sequence_id returned from append_events function")
}
}.onFailure {
promise.fail(it)
}
val params = prepareQueryParams(appendCondition, events)
executeQuery(connection, params)
}.onSuccess { rowSet -> processRowSet(rowSet, promise) }
.onFailure { throwable -> promise.fail(throwable) }

return promise.future()
}

private fun prepareQueryParams(appendCondition: AppendCondition, events: List<JsonObject>) =
Tuple.of(
identifiersToSortedArray(appendCondition.transactionContext.identifiers),
appendCondition.expectedCurrentSequence.value,
eventTypesToArray(appendCondition.transactionContext.eventTypes),
eventPayloadsToArray(events)
)

private fun identifiersToSortedArray(identifiers: List<DomainIdentifier>) =
identifiers.map(DomainIdentifier::toStorageFormat).sorted().toTypedArray()

private fun eventTypesToArray(eventTypes: List<EventName>) = eventTypes.map(EventName::value).toTypedArray()

private fun eventPayloadsToArray(events: List<JsonObject>) = events.map(JsonObject::encode).toTypedArray()

private fun executeQuery(connection: SqlConnection, params: Tuple): Future<RowSet<Row>> =
connection.preparedQuery("SELECT append_events($1, $2, $3, $4) AS $LAST_SEQUENCE_ID")
.execute(params)

private fun processRowSet(rowSet: RowSet<Row>, promise: Promise<SequenceNumber>) {
val firstRowSequenceId = rowSet.first().getLong(LAST_SEQUENCE_ID)

if (rowSet.rowCount() == 1 && firstRowSequenceId != null) {
promise.complete(SequenceNumber(firstRowSequenceId))
} else {
promise.fail("No last_sequence_id returned from append_events function")
}
}

companion object {
const val LAST_SEQUENCE_ID = "last_sequence_id"
}
}
28 changes: 14 additions & 14 deletions src/main/kotlin/crablet/postgres/CrabletStateBuilder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package crablet.postgres

import crablet.SequenceNumber
import crablet.StateBuilder
import crablet.StreamQuery
import crablet.TransactionContext
import io.vertx.core.Future
import io.vertx.core.Promise
import io.vertx.core.json.JsonObject
Expand All @@ -19,24 +19,14 @@ class CrabletStateBuilder<S>(
private val pageSize: Int = 1000,
) : StateBuilder<S> {

private fun sqlQuery(): String {
return """select event_payload, sequence_id
| from events
| where domain_ids @> $1::text[]
| and event_type = ANY($2)
| order by sequence_id
|
""".trimMargin()
}

override fun buildFor(
query: StreamQuery,
transactionContext: TransactionContext,
): Future<Pair<S, SequenceNumber>> {

val promise = Promise.promise<Pair<S, SequenceNumber>>()
val sql = sqlQuery()
val domainIds = query.identifiers.map { it.toStorageFormat() }.sorted().toTypedArray()
val eventTypes = query.eventTypes.map { it.value }.toTypedArray()
val domainIds = transactionContext.identifiers.map { it.toStorageFormat() }.sorted().toTypedArray()
val eventTypes = transactionContext.eventTypes.map { it.value }.toTypedArray()
val tuple = Tuple.of(domainIds, eventTypes)
var finalState = initialState
var lastSequence = 0L
Expand Down Expand Up @@ -87,6 +77,16 @@ class CrabletStateBuilder<S>(
return promise.future()
}

private fun sqlQuery(): String {
return """select event_payload, sequence_id
| from events
| where domain_ids @> $1::text[]
| and event_type = ANY($2)
| order by sequence_id
|
""".trimMargin()
}

companion object {
private val logger = LoggerFactory.getLogger(CrabletStateBuilder::class.java)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package crablet.lab

import crablet.lab.CustomerEvents.CUSTOMER_ACTIVATED
import crablet.lab.CustomerEvents.CUSTOMER_DEACTIVATED
import crablet.lab.CustomerEvents.CUSTOMER_REGISTERED
import crablet.lab.CustomerEvents.CUSTOMER_RENAMED
import crablet.lab.CustomerEventsFields.ID
import crablet.lab.CustomerEventsFields.NAME
import crablet.lab.CustomerEventsFields.REASON
import crablet.lab.CustomerEventsFields.TYPE
package crablet.lab.jsonvalues

import crablet.lab.jsonvalues.CustomerEvents.CUSTOMER_ACTIVATED
import crablet.lab.jsonvalues.CustomerEvents.CUSTOMER_DEACTIVATED
import crablet.lab.jsonvalues.CustomerEvents.CUSTOMER_REGISTERED
import crablet.lab.jsonvalues.CustomerEvents.CUSTOMER_RENAMED
import crablet.lab.jsonvalues.CustomerEventsFields.ID
import crablet.lab.jsonvalues.CustomerEventsFields.NAME
import crablet.lab.jsonvalues.CustomerEventsFields.REASON
import crablet.lab.jsonvalues.CustomerEventsFields.TYPE
import `fun`.gen.Gen

import io.vertx.core.json.JsonObject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//import crablet.SequenceNumber
//import crablet.StateId
//import crablet.StateName
//import crablet.StreamQuery
//import crablet.TransactionContext
//import crablet.postgres.AbstractCrabletTest
//import crablet.postgres.AccountTransferScenarioTest
//import crablet.postgres.AccountTransferScenarioTest.Companion
Expand Down Expand Up @@ -56,7 +56,7 @@
// @Test
// @Order(1)
// fun `it can open Account 1 with $100`() {
// val streamQuery = StreamQuery(
// val streamQuery = TransactionContext(
// identifiers = listOf(DomainIdentifier(name = StateName("Account"), id = StateId("1"))),
// eventTypes = eventTypes
// )
Expand Down
Loading

0 comments on commit fe41ee8

Please sign in to comment.