Skip to content

Commit

Permalink
refactoring EventSink
Browse files Browse the repository at this point in the history
  • Loading branch information
Rodolfo committed Jan 11, 2025
1 parent 677ed04 commit 120c0f4
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 7 deletions.
11 changes: 8 additions & 3 deletions src/main/kotlin/crablet/query/CrabletQueryApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@ data class SubscriptionSource(
)

sealed interface EventSink {
interface DefaultEventSink : EventSink {
fun handle(eventAsJson: List<JsonObject>): Future<Void>

interface SingleEventSink : EventSink {
fun handle(eventAsJson: JsonObject): Future<Void>
}

interface BatchEventSink : EventSink {
fun handle(eventAsJsonList: List<JsonObject>): Future<Void>
}

interface PostgresEventSync : EventSink {
interface PostgresSingleEventSync : EventSink {
fun handle(
sqlConnection: SqlConnection,
eventAsJson: JsonObject,
Expand Down
12 changes: 10 additions & 2 deletions src/main/kotlin/crablet/query/impl/SubscriptionComponent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,23 @@ class SubscriptionComponent(
rowSet.map { row -> row.toJson() }
}.flatMap { jsonList: List<JsonObject> ->
when (val eventSync = subscriptionConfig.eventSink) {
is EventSink.PostgresEventSync -> {
is EventSink.PostgresSingleEventSync -> {
jsonList
.fold(Future.succeededFuture<Void>()) { future, eventJson ->
future.compose {
eventSync.handle(tx, eventJson)
}
}.map { jsonList }
}
is EventSink.DefaultEventSink -> eventSync.handle(jsonList)
is EventSink.SingleEventSink -> {
jsonList
.fold(Future.succeededFuture<Void>()) { future, eventJson ->
future.compose {
eventSync.handle(eventJson)
}
}.map { jsonList }
}
is EventSink.BatchEventSink -> eventSync.handle(jsonList)
}.map { jsonList }
}.compose { jsonList: List<JsonObject> ->
if (jsonList.isNotEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import io.vertx.core.json.JsonObject
import io.vertx.sqlclient.SqlConnection
import io.vertx.sqlclient.Tuple

class AccountsPostgresEventProcessor : EventSink.PostgresEventSync {
class AccountsPostgresSingleEventProcessor : EventSink.PostgresSingleEventSync {
override fun handle(
sqlConnection: SqlConnection,
eventAsJson: JsonObject,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class AccountsViewProjectionTest : AbstractCrabletTest() {

val subscriptionConfig = SubscriptionConfig(
source = source,
eventSink = AccountsPostgresEventProcessor(),
eventSink = AccountsPostgresSingleEventProcessor(),
callback = callback
)

Expand Down

0 comments on commit 120c0f4

Please sign in to comment.