Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - Prototype of Context Receivers /JVM #76

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions application-arrow/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ kotlin {
compilations.all {
kotlinOptions.jvmTarget = "1.8"
kotlinOptions.verbose = true
kotlinOptions.freeCompilerArgs = kotlinOptions.freeCompilerArgs + "-Xcontext-receivers"
}

withJava()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.fraktalio.fmodel.application

import arrow.core.continuations.Effect
import arrow.core.continuations.effect
import com.fraktalio.fmodel.application.Error.CommandHandlingFailed
import com.fraktalio.fmodel.application.Error.*
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*

Expand Down Expand Up @@ -99,25 +99,25 @@ fun <C, S, E, V> EventSourcingLockingOrchestratingAggregate<C, S, E, V>.handleOp
fun <C, S, E> EventSourcingAggregate<C, S, E>.handleWithEffect(commands: Flow<C>): Flow<Effect<Error, E>> =
commands
.flatMapConcat { handleWithEffect(it) }
.catch { emit(effect { shift(CommandHandlingFailed(it)) }) }
.catch { emit(effect { shift(CommandPublishingFailed(it)) }) }

@FlowPreview
fun <C, S, E> EventSourcingOrchestratingAggregate<C, S, E>.handleWithEffect(commands: Flow<C>): Flow<Effect<Error, E>> =
commands
.flatMapConcat { handleWithEffect(it) }
.catch { emit(effect { shift(CommandHandlingFailed(it)) }) }
.catch { emit(effect { shift(CommandPublishingFailed(it)) }) }

@FlowPreview
fun <C, S, E, V> EventSourcingLockingAggregate<C, S, E, V>.handleOptimisticallyWithEffect(commands: Flow<C>): Flow<Effect<Error, Pair<E, V>>> =
commands
.flatMapConcat { handleOptimisticallyWithEffect(it) }
.catch { emit(effect { shift(CommandHandlingFailed(it)) }) }
.catch { emit(effect { shift(CommandPublishingFailed(it)) }) }

@FlowPreview
fun <C, S, E, V> EventSourcingLockingOrchestratingAggregate<C, S, E, V>.handleOptimisticallyWithEffect(commands: Flow<C>): Flow<Effect<Error, Pair<E, V>>> =
commands
.flatMapConcat { handleOptimisticallyWithEffect(it) }
.catch { emit(effect { shift(CommandHandlingFailed(it)) }) }
.catch { emit(effect { shift(CommandPublishingFailed(it)) }) }

@FlowPreview
fun <C, E> C.publishWithEffect(aggregate: EventSourcingAggregate<C, *, E>): Flow<Effect<Error, E>> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,41 +32,14 @@ import kotlinx.coroutines.flow.map
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
suspend fun <S, E, I> I.handleWithEffect(event: E): Effect<Error, S> where I : ViewStateComputation<S, E>, I : ViewStateRepository<E, S> {

fun S?.computeNewStateWithEffect(event: E): Effect<Error, S> =
effect {
try {
computeNewState(event)
} catch (t: Throwable) {
shift(CalculatingNewViewStateFailed(this@computeNewStateWithEffect, event, t.nonFatalOrThrow()))
}
}

suspend fun E.fetchStateWithEffect(): Effect<Error, S?> =
effect {
try {
fetchState()
} catch (t: Throwable) {
shift(FetchingViewStateFailed(this@fetchStateWithEffect, t.nonFatalOrThrow()))
}
suspend fun <S, E, I> I.handleWithEffect(event: E): Effect<Error, S> where I : ViewStateComputation<S, E>, I : ViewStateRepository<E, S> =
effect {
try {
event.fetchState().computeNewState(event).save()
} catch (t: Throwable) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💬 11 similar findings have been found in this PR


TooGenericExceptionCaught: The caught exception is too generic. Prefer catching specific exceptions to the case that is currently handled.


🔎 Expand here to view all instances of this finding
File Path Line Number
application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/MaterializedViewArrowExtension.kt 57
application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/MaterializedViewArrowExtension.kt 79
application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateArrowExtension.kt 41
application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateArrowExtension.kt 64
application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/MaterializedViewArrowContextualExtension.kt 16
application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/MaterializedViewArrowContextualExtension.kt 24
application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/MaterializedViewArrowContextualExtension.kt 34
application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateArrowContextualExtension.kt 19
application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateArrowContextualExtension.kt 34
application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateArrowContextualExtension.kt 47

Showing 10 of 11 findings. Visit the Lift Web Console to see all.


ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sonatype-lift ignoreall

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ignoreall command is active on this PR, all the existing Lift issues are ignored.

shift(EventHandlingFailed(event, t.nonFatalOrThrow()))
}

suspend fun S.saveWithEffect(): Effect<Error, S> =
effect {
try {
save()
} catch (t: Throwable) {
shift(StoringStateFailed(this@saveWithEffect, t.nonFatalOrThrow()))
}
}

return effect {
event.fetchStateWithEffect().bind()
.computeNewStateWithEffect(event).bind()
.saveWithEffect().bind()
}
}

/**
* Extension function - Handles the event of type [E]
Expand All @@ -76,41 +49,15 @@ suspend fun <S, E, I> I.handleWithEffect(event: E): Effect<Error, S> where I : V
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
suspend fun <S, E, V, I> I.handleOptimisticallyWithEffect(event: E): Effect<Error, Pair<S, V>> where I : ViewStateComputation<S, E>, I : ViewStateLockingRepository<E, S, V> {
fun S?.computeNewStateWithEffect(event: E): Effect<Error, S> =
effect {
try {
computeNewState(event)
} catch (t: Throwable) {
shift(CalculatingNewViewStateFailed(this@computeNewStateWithEffect, event, t.nonFatalOrThrow()))
}
}

suspend fun E.fetchStateWithEffect(): Effect<Error, Pair<S?, V?>> =
effect {
try {
fetchState()
} catch (t: Throwable) {
shift(FetchingViewStateFailed(this@fetchStateWithEffect, t.nonFatalOrThrow()))
}
}

suspend fun S.saveWithEffect(currentVersion: V?): Effect<Error, Pair<S, V>> =
effect {
try {
save(currentVersion)
} catch (t: Throwable) {
shift(StoringStateFailed(this@saveWithEffect, t.nonFatalOrThrow()))
}
suspend fun <S, E, V, I> I.handleOptimisticallyWithEffect(event: E): Effect<Error, Pair<S, V>> where I : ViewStateComputation<S, E>, I : ViewStateLockingRepository<E, S, V> =
effect {
try {
val (state, version) = event.fetchState()
state.computeNewState(event).save(version)
} catch (t: Throwable) {
shift(EventHandlingFailed(event, t.nonFatalOrThrow()))
}

return effect {
val (state, version) = event.fetchStateWithEffect().bind()
state
.computeNewStateWithEffect(event).bind()
.saveWithEffect(version).bind()
}
}

/**
* Extension function - Handles the event of type [E]
Expand All @@ -124,39 +71,14 @@ suspend fun <S, E, V, I> I.handleOptimisticallyWithEffect(event: E): Effect<Erro
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
suspend fun <S, E, EV, SV, I> I.handleOptimisticallyWithDeduplicationWithEffect(eventAndVersion: Pair<E, EV>): Effect<Error, Pair<S, SV>> where I : ViewStateComputation<S, E>, I : ViewStateLockingDeduplicationRepository<E, S, EV, SV> {
fun S?.computeNewStateWithEffect(event: E): Effect<Error, S> =
effect {
try {
computeNewState(event)
} catch (t: Throwable) {
shift(CalculatingNewViewStateFailed(this@computeNewStateWithEffect, event, t.nonFatalOrThrow()))
}
}

suspend fun E.fetchStateWithEffect(): Effect<Error, Pair<S?, SV?>> =
effect {
try {
fetchState()
} catch (t: Throwable) {
shift(FetchingViewStateFailed(this@fetchStateWithEffect, t.nonFatalOrThrow()))
}
}

suspend fun S.saveWithEffect(entityVersion: EV, currentStateVersion: SV?): Effect<Error, Pair<S, SV>> =
effect {
try {
save(entityVersion, currentStateVersion)
} catch (t: Throwable) {
shift(StoringStateFailed(this@saveWithEffect, t.nonFatalOrThrow()))
}
}

return effect {
val (event, eventVersion) = eventAndVersion
val (state, currentStateVersion) = event.fetchStateWithEffect().bind()
state
.computeNewStateWithEffect(event).bind()
.saveWithEffect(eventVersion, currentStateVersion).bind()
try {
val (state, currentStateVersion) = event.fetchState()
state.computeNewState(event).save(eventVersion, currentStateVersion)
} catch (t: Throwable) {
shift(EventHandlingFailed(event, t.nonFatalOrThrow()))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fun <AR, A> SagaManager<AR, A>.handleWithEffect(actionResult: AR): Flow<Effect<E
.computeNewActions()
.publish()
.map { effect<Error, A> { it } }
.catch { emit(effect { shift(ActionResultHandlingFailed(actionResult)) }) }
.catch { emit(effect { shift(ActionResultHandlingFailed(actionResult, it)) }) }

/**
* Extension function - Handles the [Flow] of action results of type [AR].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ package com.fraktalio.fmodel.application
import arrow.core.continuations.Effect
import arrow.core.continuations.effect
import arrow.core.nonFatalOrThrow
import com.fraktalio.fmodel.application.Error.*
import com.fraktalio.fmodel.application.Error.CommandHandlingFailed
import com.fraktalio.fmodel.application.Error.CommandPublishingFailed
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.*

/**
* Extension function - Handles the command message of type [C]
Expand All @@ -35,59 +34,15 @@ import kotlinx.coroutines.flow.map
*/
@FlowPreview
suspend fun <C, S, E, I> I.handleWithEffect(command: C): Effect<Error, S> where I : StateComputation<C, S, E>,
I : StateRepository<C, S> {
/**
* Inner function - Computes new State based on the previous State and the [command] or fails.
*
* @param command of type [C]
* @return [Effect] (either the newly computed state of type [S] or [Error])
*/
suspend fun S?.computeNewStateWithEffect(command: C): Effect<Error, S> =
effect {
try {
computeNewState(command)
} catch (t: Throwable) {
shift(CalculatingNewStateFailed(this@computeNewStateWithEffect, command, t.nonFatalOrThrow()))
}
}

/**
* Inner function - Fetch state - either version
*
* @receiver Command of type [C]
* @return [Effect] (either [Error] or the State of type [S]?)
*/
suspend fun C.fetchStateWithEffect(): Effect<Error, S?> =
effect {
try {
fetchState()
} catch (t: Throwable) {
shift(FetchingStateFailed(this@fetchStateWithEffect, t.nonFatalOrThrow()))
}
}
I : StateRepository<C, S> =
effect {
try {
command.fetchState().computeNewState(command).save()
} catch (t: Throwable) {
shift(CommandHandlingFailed(this, t.nonFatalOrThrow()))

/**
* Inner function - Save state - either version
*
* @receiver State of type [S]
* @return [Effect] (either [Error] or the newly saved State of type [S])
*/
suspend fun S.saveWithEffect(): Effect<Error, S> =
effect {
try {
save()
} catch (t: Throwable) {
shift(StoringStateFailed(this@saveWithEffect, t.nonFatalOrThrow()))
}
}

return effect {
command
.fetchStateWithEffect().bind()
.computeNewStateWithEffect(command).bind()
.saveWithEffect().bind()
}
}

/**
* Extension function - Handles the command message of type [C] to the locking state stored aggregate, optimistically
Expand All @@ -99,60 +54,17 @@ suspend fun <C, S, E, I> I.handleWithEffect(command: C): Effect<Error, S> where
*/
@FlowPreview
suspend fun <C, S, E, V, I> I.handleOptimisticallyWithEffect(command: C): Effect<Error, Pair<S, V>> where I : StateComputation<C, S, E>,
I : StateLockingRepository<C, S, V> {
/**
* Inner function - Computes new State based on the previous State and the [command] or fails.
*
* @param command of type [C]
* @return [Effect] (either the newly computed state of type [S] or [Error])
*/
suspend fun S?.computeNewStateWithEffect(command: C): Effect<Error, S> =
effect {
try {
computeNewState(command)
} catch (t: Throwable) {
shift(CalculatingNewStateFailed(this@computeNewStateWithEffect, command, t.nonFatalOrThrow()))
}
}

/**
* Inner function - Fetch state - either version
*
* @receiver Command of type [C]
* @return [Effect] (either [Error] or the State of type [S]?)
*/
suspend fun C.fetchStateWithEffect(): Effect<Error, Pair<S?, V?>> =
effect {
try {
fetchState()
} catch (t: Throwable) {
shift(FetchingStateFailed(this@fetchStateWithEffect, t.nonFatalOrThrow()))
}
I : StateLockingRepository<C, S, V> =
effect {
try {
val (state, version) = command.fetchState()
state
.computeNewState(command)
.save(version)
} catch (t: Throwable) {
shift(CommandHandlingFailed(this, t.nonFatalOrThrow()))
}

/**
* Inner function - Save state - either version
*
* @receiver State of type [S]
* @return [Effect] (either [Error] or the newly saved State of type [S])
*/
suspend fun S.saveWithEffect(currentStateVersion: V?): Effect<Error, Pair<S, V>> =
effect {
try {
save(currentStateVersion)
} catch (t: Throwable) {
shift(StoringStateFailed(this@saveWithEffect, t.nonFatalOrThrow()))
}
}

return effect {
val (state, version) = command.fetchStateWithEffect().bind()
state
.computeNewStateWithEffect(command).bind()
.saveWithEffect(version).bind()
}
}


/**
* Extension function - Handles the [Flow] of command messages of type [C]
Expand Down Expand Up @@ -235,5 +147,3 @@ fun <C, S, E, A> Flow<C>.publishWithEffect(aggregate: A): Flow<Effect<Error, S>>
fun <C, S, E, V, A> Flow<C>.publishOptimisticallyWithEffect(aggregate: A): Flow<Effect<Error, Pair<S, V>>> where A : StateComputation<C, S, E>,
A : StateLockingRepository<C, S, V> =
aggregate.handleOptimisticallyWithEffect(this)

private fun <S, V> S.pairWith(version: V): Pair<S, V> = Pair(this, version)
Loading