Skip to content

Commit

Permalink
introducing EventSink
Browse files Browse the repository at this point in the history
  • Loading branch information
Rodolfo committed Jan 11, 2025
1 parent d80efc3 commit 1fa0b9f
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 9 deletions.
18 changes: 12 additions & 6 deletions src/main/kotlin/crablet/query/CrabletQueryApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@ data class SubscriptionSource(
val maxNumberOfRowsToPull: Int = 250,
)

interface ViewProjector {
fun project(
sqlConnection: SqlConnection,
eventAsJson: JsonObject,
): Future<Void>
sealed interface EventSink {
interface DefaultEventSink : EventSink {
fun project(eventAsJson: JsonObject): Future<Void>
}

interface PostgresEventSync : EventSink {
fun project(
sqlConnection: SqlConnection,
eventAsJson: JsonObject,
): Future<Void>
}
}

class SubscriptionConfig(
val source: SubscriptionSource,
val viewProjector: ViewProjector,
val eventSink: EventSink,
val callback: ((name: String, List<JsonObject>) -> Unit)? = null,
)

Expand Down
9 changes: 8 additions & 1 deletion src/main/kotlin/crablet/query/impl/SubscriptionComponent.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package crablet.query.impl

import crablet.query.EventSink
import crablet.query.SubscriptionConfig
import io.vertx.core.Future
import io.vertx.core.json.JsonObject
Expand Down Expand Up @@ -30,7 +31,13 @@ class SubscriptionComponent(
}.flatMap { jsonList: List<JsonObject> ->
jsonList
.fold(Future.succeededFuture<Void>()) { future, eventJson ->
future.compose { subscriptionConfig.viewProjector.project(tx, eventJson) }
future.compose {
when (val eventSync = subscriptionConfig.eventSink) {
is EventSink.PostgresEventSync -> eventSync.project(tx, eventJson)
is EventSink.DefaultEventSink -> eventSync.project(eventJson)
else -> Future.succeededFuture()
}
}
}.map { jsonList }
}.compose { jsonList: List<JsonObject> ->
if (jsonList.isNotEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class AccountsViewProjectionTest : AbstractCrabletTest() {

val subscriptionConfig = SubscriptionConfig(
source = source,
viewProjector = AccountsViewProjector(),
eventSink = AccountsViewProjector(),
callback = callback
)

Expand Down
2 changes: 1 addition & 1 deletion src/test/kotlin/crablet/query/AccountsViewProjector.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import io.vertx.core.json.JsonObject
import io.vertx.sqlclient.SqlConnection
import io.vertx.sqlclient.Tuple

class AccountsViewProjector : ViewProjector {
class AccountsViewProjector : EventSink.PostgresEventSync {
override fun project(
sqlConnection: SqlConnection,
eventAsJson: JsonObject,
Expand Down

0 comments on commit 1fa0b9f

Please sign in to comment.