diff --git a/src/main/kotlin/crablet/query/CrabletQueryApi.kt b/src/main/kotlin/crablet/query/CrabletQueryApi.kt index 61660b6..44329be 100644 --- a/src/main/kotlin/crablet/query/CrabletQueryApi.kt +++ b/src/main/kotlin/crablet/query/CrabletQueryApi.kt @@ -12,11 +12,16 @@ data class SubscriptionSource( ) sealed interface EventSink { - interface DefaultEventSink : EventSink { - fun handle(eventAsJson: List): Future + + interface SingleEventSink : EventSink { + fun handle(eventAsJson: JsonObject): Future + } + + interface BatchEventSink : EventSink { + fun handle(eventAsJsonList: List): Future } - interface PostgresEventSync : EventSink { + interface PostgresSingleEventSync : EventSink { fun handle( sqlConnection: SqlConnection, eventAsJson: JsonObject, diff --git a/src/main/kotlin/crablet/query/impl/SubscriptionComponent.kt b/src/main/kotlin/crablet/query/impl/SubscriptionComponent.kt index 5d206cc..bd06903 100644 --- a/src/main/kotlin/crablet/query/impl/SubscriptionComponent.kt +++ b/src/main/kotlin/crablet/query/impl/SubscriptionComponent.kt @@ -30,7 +30,7 @@ class SubscriptionComponent( rowSet.map { row -> row.toJson() } }.flatMap { jsonList: List -> when (val eventSync = subscriptionConfig.eventSink) { - is EventSink.PostgresEventSync -> { + is EventSink.PostgresSingleEventSync -> { jsonList .fold(Future.succeededFuture()) { future, eventJson -> future.compose { @@ -38,7 +38,15 @@ class SubscriptionComponent( } }.map { jsonList } } - is EventSink.DefaultEventSink -> eventSync.handle(jsonList) + is EventSink.SingleEventSink -> { + jsonList + .fold(Future.succeededFuture()) { future, eventJson -> + future.compose { + eventSync.handle(eventJson) + } + }.map { jsonList } + } + is EventSink.BatchEventSink -> eventSync.handle(jsonList) }.map { jsonList } }.compose { jsonList: List -> if (jsonList.isNotEmpty()) { diff --git a/src/test/kotlin/crablet/query/AccountsPostgresEventProcessor.kt b/src/test/kotlin/crablet/query/AccountsPostgresSingleEventProcessor.kt similarity index 95% rename from src/test/kotlin/crablet/query/AccountsPostgresEventProcessor.kt rename to src/test/kotlin/crablet/query/AccountsPostgresSingleEventProcessor.kt index 33741b9..a54bba4 100644 --- a/src/test/kotlin/crablet/query/AccountsPostgresEventProcessor.kt +++ b/src/test/kotlin/crablet/query/AccountsPostgresSingleEventProcessor.kt @@ -5,7 +5,7 @@ import io.vertx.core.json.JsonObject import io.vertx.sqlclient.SqlConnection import io.vertx.sqlclient.Tuple -class AccountsPostgresEventProcessor : EventSink.PostgresEventSync { +class AccountsPostgresSingleEventProcessor : EventSink.PostgresSingleEventSync { override fun handle( sqlConnection: SqlConnection, eventAsJson: JsonObject, diff --git a/src/test/kotlin/crablet/query/AccountsViewProjectionTest.kt b/src/test/kotlin/crablet/query/AccountsViewProjectionTest.kt index b0204b9..be0345d 100644 --- a/src/test/kotlin/crablet/query/AccountsViewProjectionTest.kt +++ b/src/test/kotlin/crablet/query/AccountsViewProjectionTest.kt @@ -208,7 +208,7 @@ class AccountsViewProjectionTest : AbstractCrabletTest() { val subscriptionConfig = SubscriptionConfig( source = source, - eventSink = AccountsPostgresEventProcessor(), + eventSink = AccountsPostgresSingleEventProcessor(), callback = callback )