Skip to content

Commit

Permalink
chore: the executed event now wait for the update of dependent events
Browse files Browse the repository at this point in the history
  • Loading branch information
giacomoaccursi committed Jan 11, 2024
1 parent 871e6ab commit ccce5b6
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
5 changes: 5 additions & 0 deletions src/main/kotlin/entity/Event.kt
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,9 @@ interface Event {
* Allows to observe its execution.
*/
fun observeExecution(): Flow<Event>

/**
* Allows an event to notify another event of its update.
*/
fun notifyUpdate()
}
25 changes: 21 additions & 4 deletions src/main/kotlin/entity/EventImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import java.util.concurrent.CountDownLatch
import kotlin.coroutines.CoroutineContext

/**
Expand All @@ -27,15 +29,17 @@ class EventImpl(
override val node: Node,
private val conditions: ArrayList<Condition> = ArrayList(),
private val actions: ArrayList<Action> = ArrayList(),
private val coroutineContext: CoroutineContext = Dispatchers.Default,
private val engine: Engine,
private val environment: Environment,
coroutineContext: CoroutineContext = Dispatchers.Default,
) : Event {

private val timeDistribution: TimeDistribution = TimeDistribution(DoubleTime(2.0))
private val observedLocalEvents: HashMap<Event, Job> = hashMapOf()
private val observedNeighborEvents: HashMap<Event, Job> = hashMapOf()
private val executionFlow: MutableSharedFlow<Event> = MutableSharedFlow()
private val coroutineScope = CoroutineScope(coroutineContext)
private lateinit var observerLatch: CountDownLatch

init {
observeLocalEvents()
Expand All @@ -45,8 +49,12 @@ class EventImpl(
override val tau: Time get() = timeDistribution.getNextOccurrence()

override suspend fun execute() {
actions.forEach { it.execute() }
observerLatch = CountDownLatch(executionFlow.subscriptionCount.value)
executionFlow.emit(this)
withContext(Dispatchers.IO) {
observerLatch.await()
}
actions.forEach { it.execute() }
}

override fun canExecute(): Boolean {
Expand All @@ -57,11 +65,18 @@ class EventImpl(
updateEvent(currentTime)
}

override fun notifyUpdate() {
observerLatch.countDown()
}

override fun getNumberOfEventExecutionObserver() = executionFlow.subscriptionCount.value
override fun eventRemoved() {
observedLocalEvents.values.forEach {
it.cancel()
}
observedNeighborEvents.values.forEach {
it.cancel()
}
}

override fun observeExecution(): Flow<Event> = executionFlow.asSharedFlow()
Expand All @@ -72,7 +87,7 @@ class EventImpl(
}

private fun observeLocalEvents() {
CoroutineScope(coroutineContext).launch {
coroutineScope.launch {
node.events.collect {
val removed = observedLocalEvents.keys - it.toSet() - setOf(this@EventImpl)
val added = it.toSet() - setOf(this@EventImpl) - observedLocalEvents.keys
Expand All @@ -84,6 +99,7 @@ class EventImpl(
val job = launch {
event.observeExecution().collect { event ->
updateEvent(event.tau)
event.notifyUpdate()
}
}
observedLocalEvents[event] = job
Expand All @@ -94,7 +110,7 @@ class EventImpl(

@OptIn(ExperimentalCoroutinesApi::class)
private fun observeNeighborEvents() {
CoroutineScope(coroutineContext).launch {
coroutineScope.launch {
environment.neighbors(node).mapLatest {
it.flatMap { node ->
node.events.value
Expand All @@ -110,6 +126,7 @@ class EventImpl(
val job = launch {
event.observeExecution().collect { event ->
updateEvent(event.tau)
event.notifyUpdate()
}
}
observedNeighborEvents[event] = job
Expand Down

0 comments on commit ccce5b6

Please sign in to comment.