From 1fa0b9f37fed6c6303ba4889347c6b1194f470b7 Mon Sep 17 00:00:00 2001 From: Rodolfo Date: Fri, 10 Jan 2025 22:04:45 -0300 Subject: [PATCH] introducing EventSink --- .../kotlin/crablet/query/CrabletQueryApi.kt | 18 ++++++++++++------ .../query/impl/SubscriptionComponent.kt | 9 ++++++++- .../query/AccountsViewProjectionTest.kt | 2 +- .../crablet/query/AccountsViewProjector.kt | 2 +- 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/main/kotlin/crablet/query/CrabletQueryApi.kt b/src/main/kotlin/crablet/query/CrabletQueryApi.kt index 44f6691..229813d 100644 --- a/src/main/kotlin/crablet/query/CrabletQueryApi.kt +++ b/src/main/kotlin/crablet/query/CrabletQueryApi.kt @@ -11,16 +11,22 @@ data class SubscriptionSource( val maxNumberOfRowsToPull: Int = 250, ) -interface ViewProjector { - fun project( - sqlConnection: SqlConnection, - eventAsJson: JsonObject, - ): Future +sealed interface EventSink { + interface DefaultEventSink : EventSink { + fun project(eventAsJson: JsonObject): Future + } + + interface PostgresEventSync : EventSink { + fun project( + sqlConnection: SqlConnection, + eventAsJson: JsonObject, + ): Future + } } class SubscriptionConfig( val source: SubscriptionSource, - val viewProjector: ViewProjector, + val eventSink: EventSink, val callback: ((name: String, List) -> Unit)? = null, ) diff --git a/src/main/kotlin/crablet/query/impl/SubscriptionComponent.kt b/src/main/kotlin/crablet/query/impl/SubscriptionComponent.kt index e352374..7ff5e38 100644 --- a/src/main/kotlin/crablet/query/impl/SubscriptionComponent.kt +++ b/src/main/kotlin/crablet/query/impl/SubscriptionComponent.kt @@ -1,5 +1,6 @@ package crablet.query.impl +import crablet.query.EventSink import crablet.query.SubscriptionConfig import io.vertx.core.Future import io.vertx.core.json.JsonObject @@ -30,7 +31,13 @@ class SubscriptionComponent( }.flatMap { jsonList: List -> jsonList .fold(Future.succeededFuture()) { future, eventJson -> - future.compose { subscriptionConfig.viewProjector.project(tx, eventJson) } + future.compose { + when (val eventSync = subscriptionConfig.eventSink) { + is EventSink.PostgresEventSync -> eventSync.project(tx, eventJson) + is EventSink.DefaultEventSink -> eventSync.project(eventJson) + else -> Future.succeededFuture() + } + } }.map { jsonList } }.compose { jsonList: List -> if (jsonList.isNotEmpty()) { diff --git a/src/test/kotlin/crablet/query/AccountsViewProjectionTest.kt b/src/test/kotlin/crablet/query/AccountsViewProjectionTest.kt index 11a20e9..e85d4a8 100644 --- a/src/test/kotlin/crablet/query/AccountsViewProjectionTest.kt +++ b/src/test/kotlin/crablet/query/AccountsViewProjectionTest.kt @@ -208,7 +208,7 @@ class AccountsViewProjectionTest : AbstractCrabletTest() { val subscriptionConfig = SubscriptionConfig( source = source, - viewProjector = AccountsViewProjector(), + eventSink = AccountsViewProjector(), callback = callback ) diff --git a/src/test/kotlin/crablet/query/AccountsViewProjector.kt b/src/test/kotlin/crablet/query/AccountsViewProjector.kt index de82d8d..6cdb46f 100644 --- a/src/test/kotlin/crablet/query/AccountsViewProjector.kt +++ b/src/test/kotlin/crablet/query/AccountsViewProjector.kt @@ -5,7 +5,7 @@ import io.vertx.core.json.JsonObject import io.vertx.sqlclient.SqlConnection import io.vertx.sqlclient.Tuple -class AccountsViewProjector : ViewProjector { +class AccountsViewProjector : EventSink.PostgresEventSync { override fun project( sqlConnection: SqlConnection, eventAsJson: JsonObject,