From 3c5e07e16bdc3394f8089e56ba6875778f426c2c Mon Sep 17 00:00:00 2001 From: lauener Date: Sat, 22 Apr 2023 18:35:02 +0200 Subject: [PATCH 01/11] Initialize new actors in Server.scala --- be2-scala/src/main/scala/ch/epfl/pop/Server.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/Server.scala b/be2-scala/src/main/scala/ch/epfl/pop/Server.scala index 071e16ed3e..57160689bf 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/Server.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/Server.scala @@ -10,6 +10,7 @@ import akka.http.scaladsl.server.{RequestContext, RouteResult} import akka.pattern.AskableActorRef import akka.util.Timeout import ch.epfl.pop.config.{RuntimeEnvironment, ServerConf} +import ch.epfl.pop.decentralized.{ConnectionMediator, HeartbeatGenerator, Monitor} import ch.epfl.pop.pubsub.{MessageRegistry, PubSubMediator, PublishSubscribe} import ch.epfl.pop.storage.DbActor import org.iq80.leveldb.Options @@ -46,6 +47,11 @@ object Server { val pubSubMediatorRef: ActorRef = system.actorOf(PubSubMediator.props, "PubSubMediator") val dbActorRef: AskableActorRef = system.actorOf(Props(DbActor(pubSubMediatorRef, messageRegistry)), "DbActor") + // Create necessary actors for server-server communications + val heartbeatGenRef: ActorRef = system.actorOf(HeartbeatGenerator.props(dbActorRef)) + val monitorRef: ActorRef = system.actorOf(Monitor.props(heartbeatGenRef)) + val connectionMediatorRef: ActorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, messageRegistry)) + // Setup routes def publishSubscribeRoute: RequestContext => Future[RouteResult] = { path(config.clientPath) { From 327dfdfd19298ad3093998edea72e3c0ac287378 Mon Sep 17 00:00:00 2001 From: lauener Date: Sat, 22 Apr 2023 18:40:26 +0200 Subject: [PATCH 02/11] Add clientActor ability to know if it is a server --- be2-scala/src/main/scala/ch/epfl/pop/Server.scala | 8 ++++++-- .../ch/epfl/pop/decentralized/ConnectionMediator.scala | 4 +++- .../src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala | 5 +++-- .../main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala | 4 ++-- 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/Server.scala b/be2-scala/src/main/scala/ch/epfl/pop/Server.scala index 57160689bf..a28d12aa58 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/Server.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/Server.scala @@ -55,9 +55,13 @@ object Server { // Setup routes def publishSubscribeRoute: RequestContext => Future[RouteResult] = { path(config.clientPath) { - handleWebSocketMessages(PublishSubscribe.buildGraph(pubSubMediatorRef, dbActorRef, messageRegistry)(system)) + handleWebSocketMessages( + PublishSubscribe.buildGraph(pubSubMediatorRef, dbActorRef, messageRegistry, connectionMediatorRef, isServer = false)(system) + ) } ~ path(config.serverPath) { - handleWebSocketMessages(PublishSubscribe.buildGraph(pubSubMediatorRef, dbActorRef, messageRegistry)(system)) + handleWebSocketMessages( + PublishSubscribe.buildGraph(pubSubMediatorRef, dbActorRef, messageRegistry, connectionMediatorRef, isServer = true)(system) + ) } } diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala index d0e6b0caca..771c168c82 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala @@ -31,7 +31,9 @@ final case class ConnectionMediator( PublishSubscribe.buildGraph( mediatorRef, dbActorRef, - messageRegistry + messageRegistry, + self, + isServer = true ) ) ) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala index a911b1e208..ecd78dbb95 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala @@ -13,7 +13,7 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.{Await, Future} import scala.util.Failure -final case class ClientActor(mediator: ActorRef) extends Actor with ActorLogging with AskPatternConstants { +final case class ClientActor(mediator: ActorRef, connectionMediatorRef: ActorRef, isServer: Boolean) extends Actor with ActorLogging with AskPatternConstants { private var wsHandle: Option[ActorRef] = None private val subscribedChannels: mutable.Set[Channel] = mutable.Set.empty @@ -79,7 +79,8 @@ final case class ClientActor(mediator: ActorRef) extends Actor with ActorLogging } object ClientActor { - def props(mediator: ActorRef): Props = Props(new ClientActor(mediator)) + def props(mediator: ActorRef, connectionMediatorRef: ActorRef, isServer: Boolean): Props = + Props(new ClientActor(mediator, connectionMediatorRef, isServer)) sealed trait ClientActorMessage diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala index 4a989076c0..07d063f1a8 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala @@ -16,12 +16,12 @@ object PublishSubscribe { def getDbActorRef: AskableActorRef = dbActorRef - def buildGraph(mediatorActorRef: ActorRef, dbActorRefT: AskableActorRef, messageRegistry: MessageRegistry)(implicit system: ActorSystem): Flow[Message, Message, NotUsed] = Flow.fromGraph(GraphDSL.create() { + def buildGraph(mediatorActorRef: ActorRef, dbActorRefT: AskableActorRef, messageRegistry: MessageRegistry, connectionMediatorRef: ActorRef, isServer: Boolean)(implicit system: ActorSystem): Flow[Message, Message, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => { import GraphDSL.Implicits._ - val clientActorRef: ActorRef = system.actorOf(ClientActor.props(mediatorActorRef)) + val clientActorRef: ActorRef = system.actorOf(ClientActor.props(mediatorActorRef, connectionMediatorRef, isServer)) dbActorRef = dbActorRefT /* partitioner port numbers */ From 78c07805e4a4bad6e07dfbb797b90bdde91f0c45 Mon Sep 17 00:00:00 2001 From: lauener Date: Mon, 1 May 2023 18:03:10 +0200 Subject: [PATCH 03/11] Tell connectionMediator about coming and leaving clientActors --- .../main/scala/ch/epfl/pop/pubsub/ClientActor.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala index ecd78dbb95..0b27c8caac 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala @@ -3,6 +3,7 @@ package ch.epfl.pop.pubsub import akka.actor.{Actor, ActorLogging, ActorRef, Props} import akka.event.LoggingReceive import akka.pattern.AskableActorRef +import ch.epfl.pop.decentralized.ConnectionMediator import ch.epfl.pop.model.objects.Channel import ch.epfl.pop.pubsub.ClientActor._ import ch.epfl.pop.pubsub.PubSubMediator._ @@ -20,6 +21,10 @@ final case class ClientActor(mediator: ActorRef, connectionMediatorRef: ActorRef private val mediatorAskable: AskableActorRef = mediator + // Tell connectionMediator we are online + if (isServer) + connectionMediatorRef ! ConnectionMediator.NewServerConnected(self) + private def messageWsHandle(event: ClientActorMessage): Unit = event match { case ClientAnswer(graphMessage) => wsHandle.fold(())(_ ! graphMessage) } @@ -30,7 +35,10 @@ final case class ClientActor(mediator: ActorRef, connectionMediatorRef: ActorRef log.info(s"Connecting wsHandle $wsClient to actor ${this.self}") wsHandle = Some(wsClient) - case DisconnectWsHandle => subscribedChannels.foreach(channel => mediator ! PubSubMediator.UnsubscribeFrom(channel, this.self)) + case DisconnectWsHandle => + if (isServer) + connectionMediatorRef ! ConnectionMediator.ServerLeft(self) + subscribedChannels.foreach(channel => mediator ! PubSubMediator.UnsubscribeFrom(channel, this.self)) case ClientActor.SubscribeTo(channel) => val ask: Future[PubSubMediatorMessage] = (mediatorAskable ? PubSubMediator.SubscribeTo(channel, this.self)).map { From 676f9411d40a7e8a4822f4f9a1bcb2aefd55fb44 Mon Sep 17 00:00:00 2001 From: lauener Date: Sat, 22 Apr 2023 19:03:24 +0200 Subject: [PATCH 04/11] Add monitor in the graph --- .../src/main/scala/ch/epfl/pop/Server.scala | 18 ++++++++++++++++-- .../pop/decentralized/ConnectionMediator.scala | 1 + .../ch/epfl/pop/pubsub/PublishSubscribe.scala | 18 +++++++++++++++--- 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/Server.scala b/be2-scala/src/main/scala/ch/epfl/pop/Server.scala index a28d12aa58..ef8d4b366e 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/Server.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/Server.scala @@ -56,11 +56,25 @@ object Server { def publishSubscribeRoute: RequestContext => Future[RouteResult] = { path(config.clientPath) { handleWebSocketMessages( - PublishSubscribe.buildGraph(pubSubMediatorRef, dbActorRef, messageRegistry, connectionMediatorRef, isServer = false)(system) + PublishSubscribe.buildGraph( + pubSubMediatorRef, + dbActorRef, + messageRegistry, + monitorRef, + connectionMediatorRef, + isServer = false + )(system) ) } ~ path(config.serverPath) { handleWebSocketMessages( - PublishSubscribe.buildGraph(pubSubMediatorRef, dbActorRef, messageRegistry, connectionMediatorRef, isServer = true)(system) + PublishSubscribe.buildGraph( + pubSubMediatorRef, + dbActorRef, + messageRegistry, + monitorRef, + connectionMediatorRef, + isServer = true + )(system) ) } } diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala index 771c168c82..47a34bb841 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala @@ -32,6 +32,7 @@ final case class ConnectionMediator( mediatorRef, dbActorRef, messageRegistry, + monitorRef, self, isServer = true ) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala index 07d063f1a8..5498a0aec3 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala @@ -5,7 +5,8 @@ import akka.actor.{ActorRef, ActorSystem} import akka.http.scaladsl.model.ws.{Message, TextMessage} import akka.pattern.AskableActorRef import akka.stream.FlowShape -import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition} +import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Partition} +import ch.epfl.pop.decentralized.Monitor import ch.epfl.pop.model.network.JsonRpcRequest import ch.epfl.pop.pubsub.graph._ import ch.epfl.pop.pubsub.graph.handlers.{ParamsWithChannelHandler, ParamsWithMessageHandler} @@ -16,7 +17,14 @@ object PublishSubscribe { def getDbActorRef: AskableActorRef = dbActorRef - def buildGraph(mediatorActorRef: ActorRef, dbActorRefT: AskableActorRef, messageRegistry: MessageRegistry, connectionMediatorRef: ActorRef, isServer: Boolean)(implicit system: ActorSystem): Flow[Message, Message, NotUsed] = Flow.fromGraph(GraphDSL.create() { + def buildGraph( + mediatorActorRef: ActorRef, + dbActorRefT: AskableActorRef, + messageRegistry: MessageRegistry, + monitorRef: ActorRef, + connectionMediatorRef: ActorRef, + isServer: Boolean + )(implicit system: ActorSystem): Flow[Message, Message, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => { import GraphDSL.Implicits._ @@ -52,7 +60,9 @@ object PublishSubscribe { val hasChannelPartition = builder.add(ParamsWithChannelHandler.graph(clientActorRef)) val merger = builder.add(Merge[GraphMessage](totalPorts)) + val broadcast = builder.add(Broadcast[GraphMessage](2)) + val monitorSink = builder.add(Monitor.sink(monitorRef)) val jsonRpcAnswerGenerator = builder.add(AnswerGenerator.generator) val jsonRpcAnswerer = builder.add(Answerer.answerer(clientActorRef, mediatorActorRef)) @@ -66,7 +76,9 @@ object PublishSubscribe { methodPartitioner.out(portParamsWithMessage) ~> hasMessagePartition ~> merger methodPartitioner.out(portParamsWithChannel) ~> hasChannelPartition ~> merger - merger ~> jsonRpcAnswerGenerator ~> jsonRpcAnswerer ~> output + merger ~> broadcast + broadcast ~> jsonRpcAnswerGenerator ~> jsonRpcAnswerer ~> output + broadcast ~> monitorSink /* close the shape */ FlowShape(input.in, output.out) From 56b2644b99087bad29f529b5da12906a028fa390 Mon Sep 17 00:00:00 2001 From: lauener Date: Sat, 22 Apr 2023 19:28:51 +0200 Subject: [PATCH 05/11] Add heartbeat and getmsg support in Validator.scala --- .../ch/epfl/pop/pubsub/graph/Validator.scala | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/Validator.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/Validator.scala index 281299c177..6eff9b3518 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/Validator.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/Validator.scala @@ -28,12 +28,14 @@ object Validator { private def validateMethodContent(graphMessage: GraphMessage): GraphMessage = graphMessage match { case Right(jsonRpcRequest: JsonRpcRequest) => jsonRpcRequest.getParams match { - case _: Broadcast => validateBroadcast(jsonRpcRequest) - case _: Catchup => validateCatchup(jsonRpcRequest) - case _: Publish => validatePublish(jsonRpcRequest) - case _: Subscribe => validateSubscribe(jsonRpcRequest) - case _: Unsubscribe => validateUnsubscribe(jsonRpcRequest) - case _ => Left(validationError(jsonRpcRequest.id)) + case _: Broadcast => validateBroadcast(jsonRpcRequest) + case _: Catchup => validateCatchup(jsonRpcRequest) + case _: Publish => validatePublish(jsonRpcRequest) + case _: Subscribe => validateSubscribe(jsonRpcRequest) + case _: Unsubscribe => validateUnsubscribe(jsonRpcRequest) + case _: Heartbeat => graphMessage // No check necessary + case _: GetMessagesById => graphMessage // No check necessary + case _ => Left(validationError(jsonRpcRequest.id)) } case Right(jsonRpcResponse: JsonRpcResponse) => Left(PipelineError( ErrorCodes.SERVER_ERROR.id, @@ -45,12 +47,14 @@ object Validator { private def validateMessageContent(graphMessage: GraphMessage): GraphMessage = graphMessage match { case Right(jsonRpcRequest: JsonRpcRequest) => jsonRpcRequest.getParams match { - case _: Broadcast => validateMessage(jsonRpcRequest) - case _: Catchup => graphMessage - case _: Publish => validateMessage(jsonRpcRequest) - case _: Subscribe => graphMessage - case _: Unsubscribe => graphMessage - case _ => Left(validationError(jsonRpcRequest.id)) + case _: Broadcast => validateMessage(jsonRpcRequest) + case _: Catchup => graphMessage + case _: Publish => validateMessage(jsonRpcRequest) + case _: Subscribe => graphMessage + case _: Unsubscribe => graphMessage + case _: Heartbeat => graphMessage + case _: GetMessagesById => graphMessage + case _ => Left(validationError(jsonRpcRequest.id)) } case graphMessage @ _ => graphMessage } From 3095e735a114a5c30e024cad75178790ee13dbb9 Mon Sep 17 00:00:00 2001 From: lauener Date: Sat, 22 Apr 2023 19:28:59 +0200 Subject: [PATCH 06/11] Add ParamsWihMapHandler in the graph --- .../main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala index 5498a0aec3..6211ab08ad 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala @@ -9,7 +9,7 @@ import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Partition} import ch.epfl.pop.decentralized.Monitor import ch.epfl.pop.model.network.JsonRpcRequest import ch.epfl.pop.pubsub.graph._ -import ch.epfl.pop.pubsub.graph.handlers.{ParamsWithChannelHandler, ParamsWithMessageHandler} +import ch.epfl.pop.pubsub.graph.handlers.{ParamsWithChannelHandler, ParamsWithMapHandler, ParamsWithMessageHandler} object PublishSubscribe { @@ -36,7 +36,8 @@ object PublishSubscribe { val portPipelineError = 0 val portParamsWithMessage = 1 val portParamsWithChannel = 2 - val totalPorts = 3 + val portParamsWithMap = 3 + val totalPorts = 4 /* building blocks */ // input message from the client @@ -52,12 +53,14 @@ object PublishSubscribe { { case Right(m: JsonRpcRequest) if m.hasParamsMessage => portParamsWithMessage // Publish and Broadcast messages case Right(m: JsonRpcRequest) if m.hasParamsChannel => portParamsWithChannel + case Right(_: JsonRpcRequest) => portParamsWithMap case _ => portPipelineError // Pipeline error goes directly in merger } )) val hasMessagePartition = builder.add(ParamsWithMessageHandler.graph(messageRegistry)) val hasChannelPartition = builder.add(ParamsWithChannelHandler.graph(clientActorRef)) + val hasMapPartition = builder.add(ParamsWithMapHandler.graph(dbActorRef)) val merger = builder.add(Merge[GraphMessage](totalPorts)) val broadcast = builder.add(Broadcast[GraphMessage](2)) @@ -75,6 +78,7 @@ object PublishSubscribe { methodPartitioner.out(portPipelineError) ~> merger methodPartitioner.out(portParamsWithMessage) ~> hasMessagePartition ~> merger methodPartitioner.out(portParamsWithChannel) ~> hasChannelPartition ~> merger + methodPartitioner.out(portParamsWithMap) ~> hasMapPartition ~> merger merger ~> broadcast broadcast ~> jsonRpcAnswerGenerator ~> jsonRpcAnswerer ~> output From 929b86ad4eda0fdaa5ebf4c52d5d0a62c2a3fee7 Mon Sep 17 00:00:00 2001 From: lauener Date: Sat, 22 Apr 2023 19:49:16 +0200 Subject: [PATCH 07/11] Let get_messages_by_id request go through AnswerGenerator --- .../scala/ch/epfl/pop/pubsub/graph/AnswerGenerator.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/AnswerGenerator.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/AnswerGenerator.scala index d1b67cd6e5..d46565f402 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/AnswerGenerator.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/AnswerGenerator.scala @@ -3,8 +3,8 @@ package ch.epfl.pop.pubsub.graph import akka.NotUsed import akka.pattern.AskableActorRef import akka.stream.scaladsl.Flow -import ch.epfl.pop.model.network.method.{Broadcast, Catchup} -import ch.epfl.pop.model.network.{ResultObject, _} +import ch.epfl.pop.model.network.method.{Broadcast, Catchup, GetMessagesById} +import ch.epfl.pop.model.network._ import ch.epfl.pop.model.objects.DbActorNAckException import ch.epfl.pop.pubsub.AskPatternConstants import ch.epfl.pop.pubsub.graph.validators.RpcValidator @@ -48,6 +48,9 @@ class AnswerGenerator(dbActor: => AskableActorRef) extends AskPatternConstants { rpcRequest.id )) + // Let get_messages_by_id request go through + case GetMessagesById(_) => graphMessage + // Standard answer res == 0 case _ => Right(JsonRpcResponse( RpcValidator.JSON_RPC_VERSION, From 5f1eddec4e3de3d8640f775506320671504be50c Mon Sep 17 00:00:00 2001 From: lauener Date: Sun, 23 Apr 2023 10:58:42 +0200 Subject: [PATCH 08/11] Let get_messages_by_id answer go through --- .../main/scala/ch/epfl/pop/pubsub/graph/AnswerGenerator.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/AnswerGenerator.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/AnswerGenerator.scala index d46565f402..b2d4e7b406 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/AnswerGenerator.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/AnswerGenerator.scala @@ -60,6 +60,9 @@ class AnswerGenerator(dbActor: => AskableActorRef) extends AskPatternConstants { )) } + // Let get_messages_by_id answer go through + case Right(_: JsonRpcResponse) => graphMessage + // Convert PipelineErrors into negative JsonRpcResponses case Left(pipelineError: PipelineError) => Right(JsonRpcResponse( RpcValidator.JSON_RPC_VERSION, From ca2cf2d9e140d940f7952fbcb1e12f16b928b7f4 Mon Sep 17 00:00:00 2001 From: lauener Date: Tue, 25 Apr 2023 18:57:50 +0200 Subject: [PATCH 09/11] let RpcResponse go through Validator --- .../main/scala/ch/epfl/pop/pubsub/graph/Validator.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/Validator.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/Validator.scala index 6eff9b3518..93cd568951 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/Validator.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/Validator.scala @@ -37,11 +37,7 @@ object Validator { case _: GetMessagesById => graphMessage // No check necessary case _ => Left(validationError(jsonRpcRequest.id)) } - case Right(jsonRpcResponse: JsonRpcResponse) => Left(PipelineError( - ErrorCodes.SERVER_ERROR.id, - "Unsupported action: MethodValidator was given a response message", - jsonRpcResponse.id - )) + case _ => graphMessage } @@ -63,7 +59,6 @@ object Validator { case Right(_) => validateJsonRpcContent(graphMessage) match { case Right(_) => validateMethodContent(graphMessage) match { case Right(_) => validateMessageContent(graphMessage) match { - case Right(_) => graphMessage case graphMessage @ _ => graphMessage } case graphMessage @ _ => graphMessage From 800becdd8d3eeec9579b039cd7e2f72d0c94703a Mon Sep 17 00:00:00 2001 From: lauener Date: Tue, 25 Apr 2023 18:58:45 +0200 Subject: [PATCH 10/11] Add GetMessagesByIdResponseHandler in the graph --- .../scala/ch/epfl/pop/pubsub/PublishSubscribe.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala index 6211ab08ad..1394568435 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala @@ -7,9 +7,9 @@ import akka.pattern.AskableActorRef import akka.stream.FlowShape import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Partition} import ch.epfl.pop.decentralized.Monitor -import ch.epfl.pop.model.network.JsonRpcRequest +import ch.epfl.pop.model.network.{JsonRpcRequest, JsonRpcResponse} import ch.epfl.pop.pubsub.graph._ -import ch.epfl.pop.pubsub.graph.handlers.{ParamsWithChannelHandler, ParamsWithMapHandler, ParamsWithMessageHandler} +import ch.epfl.pop.pubsub.graph.handlers.{GetMessagesByIdResponseHandler, ParamsWithChannelHandler, ParamsWithMapHandler, ParamsWithMessageHandler} object PublishSubscribe { @@ -37,7 +37,8 @@ object PublishSubscribe { val portParamsWithMessage = 1 val portParamsWithChannel = 2 val portParamsWithMap = 3 - val totalPorts = 4 + val portResponseHandler = 4 + val totalPorts = 5 /* building blocks */ // input message from the client @@ -54,6 +55,7 @@ object PublishSubscribe { case Right(m: JsonRpcRequest) if m.hasParamsMessage => portParamsWithMessage // Publish and Broadcast messages case Right(m: JsonRpcRequest) if m.hasParamsChannel => portParamsWithChannel case Right(_: JsonRpcRequest) => portParamsWithMap + case Right(_: JsonRpcResponse) => portResponseHandler case _ => portPipelineError // Pipeline error goes directly in merger } )) @@ -61,6 +63,7 @@ object PublishSubscribe { val hasMessagePartition = builder.add(ParamsWithMessageHandler.graph(messageRegistry)) val hasChannelPartition = builder.add(ParamsWithChannelHandler.graph(clientActorRef)) val hasMapPartition = builder.add(ParamsWithMapHandler.graph(dbActorRef)) + val responsePartition = builder.add(GetMessagesByIdResponseHandler.graph(dbActorRef.actorRef)) val merger = builder.add(Merge[GraphMessage](totalPorts)) val broadcast = builder.add(Broadcast[GraphMessage](2)) @@ -79,6 +82,7 @@ object PublishSubscribe { methodPartitioner.out(portParamsWithMessage) ~> hasMessagePartition ~> merger methodPartitioner.out(portParamsWithChannel) ~> hasChannelPartition ~> merger methodPartitioner.out(portParamsWithMap) ~> hasMapPartition ~> merger + methodPartitioner.out(portResponseHandler) ~> responsePartition ~> merger merger ~> broadcast broadcast ~> jsonRpcAnswerGenerator ~> jsonRpcAnswerer ~> output From 6fe67df09cdd8a31347c62b21d8f520b0d28d515 Mon Sep 17 00:00:00 2001 From: lauener Date: Tue, 2 May 2023 22:42:51 +0200 Subject: [PATCH 11/11] Minor refactoring --- .../src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala | 8 +++++--- .../main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala | 4 +++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala index 0b27c8caac..c27b34917b 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala @@ -22,8 +22,9 @@ final case class ClientActor(mediator: ActorRef, connectionMediatorRef: ActorRef private val mediatorAskable: AskableActorRef = mediator // Tell connectionMediator we are online - if (isServer) + if (isServer) { connectionMediatorRef ! ConnectionMediator.NewServerConnected(self) + } private def messageWsHandle(event: ClientActorMessage): Unit = event match { case ClientAnswer(graphMessage) => wsHandle.fold(())(_ ! graphMessage) @@ -36,9 +37,10 @@ final case class ClientActor(mediator: ActorRef, connectionMediatorRef: ActorRef wsHandle = Some(wsClient) case DisconnectWsHandle => - if (isServer) - connectionMediatorRef ! ConnectionMediator.ServerLeft(self) subscribedChannels.foreach(channel => mediator ! PubSubMediator.UnsubscribeFrom(channel, this.self)) + if (isServer) { + connectionMediatorRef ! ConnectionMediator.ServerLeft(self) + } case ClientActor.SubscribeTo(channel) => val ask: Future[PubSubMediatorMessage] = (mediatorAskable ? PubSubMediator.SubscribeTo(channel, this.self)).map { diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala index 1394568435..74ddad367b 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala @@ -40,6 +40,8 @@ object PublishSubscribe { val portResponseHandler = 4 val totalPorts = 5 + val totalBroadcastPort = 2 + /* building blocks */ // input message from the client val input = builder.add(Flow[Message].collect { case TextMessage.Strict(s) => println(s">>> Incoming message : $s"); s }) @@ -66,7 +68,7 @@ object PublishSubscribe { val responsePartition = builder.add(GetMessagesByIdResponseHandler.graph(dbActorRef.actorRef)) val merger = builder.add(Merge[GraphMessage](totalPorts)) - val broadcast = builder.add(Broadcast[GraphMessage](2)) + val broadcast = builder.add(Broadcast[GraphMessage](totalBroadcastPort)) val monitorSink = builder.add(Monitor.sink(monitorRef)) val jsonRpcAnswerGenerator = builder.add(AnswerGenerator.generator)