diff --git a/src/main/kotlin/entity/Event.kt b/src/main/kotlin/entity/Event.kt index 8fd37ec..ced6766 100644 --- a/src/main/kotlin/entity/Event.kt +++ b/src/main/kotlin/entity/Event.kt @@ -60,4 +60,9 @@ interface Event { * Allows to observe its execution. */ fun observeExecution(): Flow + + /** + * Allows an event to notify another event of its update. + */ + fun notifyUpdate() } diff --git a/src/main/kotlin/entity/EventImpl.kt b/src/main/kotlin/entity/EventImpl.kt index c143983..751ba80 100644 --- a/src/main/kotlin/entity/EventImpl.kt +++ b/src/main/kotlin/entity/EventImpl.kt @@ -18,6 +18,8 @@ import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.mapLatest import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import java.util.concurrent.CountDownLatch import kotlin.coroutines.CoroutineContext /** @@ -27,15 +29,17 @@ class EventImpl( override val node: Node, private val conditions: ArrayList = ArrayList(), private val actions: ArrayList = ArrayList(), - private val coroutineContext: CoroutineContext = Dispatchers.Default, private val engine: Engine, private val environment: Environment, + coroutineContext: CoroutineContext = Dispatchers.Default, ) : Event { private val timeDistribution: TimeDistribution = TimeDistribution(DoubleTime(2.0)) private val observedLocalEvents: HashMap = hashMapOf() private val observedNeighborEvents: HashMap = hashMapOf() private val executionFlow: MutableSharedFlow = MutableSharedFlow() + private val coroutineScope = CoroutineScope(coroutineContext) + private lateinit var observerLatch: CountDownLatch init { observeLocalEvents() @@ -45,8 +49,12 @@ class EventImpl( override val tau: Time get() = timeDistribution.getNextOccurrence() override suspend fun execute() { - actions.forEach { it.execute() } + observerLatch = CountDownLatch(executionFlow.subscriptionCount.value) executionFlow.emit(this) + withContext(Dispatchers.IO) { + observerLatch.await() + } + actions.forEach { it.execute() } } override fun canExecute(): Boolean { @@ -57,11 +65,18 @@ class EventImpl( updateEvent(currentTime) } + override fun notifyUpdate() { + observerLatch.countDown() + } + override fun getNumberOfEventExecutionObserver() = executionFlow.subscriptionCount.value override fun eventRemoved() { observedLocalEvents.values.forEach { it.cancel() } + observedNeighborEvents.values.forEach { + it.cancel() + } } override fun observeExecution(): Flow = executionFlow.asSharedFlow() @@ -72,7 +87,7 @@ class EventImpl( } private fun observeLocalEvents() { - CoroutineScope(coroutineContext).launch { + coroutineScope.launch { node.events.collect { val removed = observedLocalEvents.keys - it.toSet() - setOf(this@EventImpl) val added = it.toSet() - setOf(this@EventImpl) - observedLocalEvents.keys @@ -84,6 +99,7 @@ class EventImpl( val job = launch { event.observeExecution().collect { event -> updateEvent(event.tau) + event.notifyUpdate() } } observedLocalEvents[event] = job @@ -94,7 +110,7 @@ class EventImpl( @OptIn(ExperimentalCoroutinesApi::class) private fun observeNeighborEvents() { - CoroutineScope(coroutineContext).launch { + coroutineScope.launch { environment.neighbors(node).mapLatest { it.flatMap { node -> node.events.value @@ -110,6 +126,7 @@ class EventImpl( val job = launch { event.observeExecution().collect { event -> updateEvent(event.tau) + event.notifyUpdate() } } observedNeighborEvents[event] = job