From 6fe67df09cdd8a31347c62b21d8f520b0d28d515 Mon Sep 17 00:00:00 2001 From: lauener Date: Tue, 2 May 2023 22:42:51 +0200 Subject: [PATCH] Minor refactoring --- .../src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala | 8 +++++--- .../main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala | 4 +++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala index 0b27c8caac..c27b34917b 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala @@ -22,8 +22,9 @@ final case class ClientActor(mediator: ActorRef, connectionMediatorRef: ActorRef private val mediatorAskable: AskableActorRef = mediator // Tell connectionMediator we are online - if (isServer) + if (isServer) { connectionMediatorRef ! ConnectionMediator.NewServerConnected(self) + } private def messageWsHandle(event: ClientActorMessage): Unit = event match { case ClientAnswer(graphMessage) => wsHandle.fold(())(_ ! graphMessage) @@ -36,9 +37,10 @@ final case class ClientActor(mediator: ActorRef, connectionMediatorRef: ActorRef wsHandle = Some(wsClient) case DisconnectWsHandle => - if (isServer) - connectionMediatorRef ! ConnectionMediator.ServerLeft(self) subscribedChannels.foreach(channel => mediator ! PubSubMediator.UnsubscribeFrom(channel, this.self)) + if (isServer) { + connectionMediatorRef ! ConnectionMediator.ServerLeft(self) + } case ClientActor.SubscribeTo(channel) => val ask: Future[PubSubMediatorMessage] = (mediatorAskable ? PubSubMediator.SubscribeTo(channel, this.self)).map { diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala index 1394568435..74ddad367b 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala @@ -40,6 +40,8 @@ object PublishSubscribe { val portResponseHandler = 4 val totalPorts = 5 + val totalBroadcastPort = 2 + /* building blocks */ // input message from the client val input = builder.add(Flow[Message].collect { case TextMessage.Strict(s) => println(s">>> Incoming message : $s"); s }) @@ -66,7 +68,7 @@ object PublishSubscribe { val responsePartition = builder.add(GetMessagesByIdResponseHandler.graph(dbActorRef.actorRef)) val merger = builder.add(Merge[GraphMessage](totalPorts)) - val broadcast = builder.add(Broadcast[GraphMessage](2)) + val broadcast = builder.add(Broadcast[GraphMessage](totalBroadcastPort)) val monitorSink = builder.add(Monitor.sink(monitorRef)) val jsonRpcAnswerGenerator = builder.add(AnswerGenerator.generator)