diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala index a1a90cdd26..163167c0dc 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala @@ -6,7 +6,7 @@ import akka.pattern.{AskableActorRef, ask} import akka.stream.scaladsl.Flow import ch.epfl.pop.decentralized.GossipManager.TriggerPullState import ch.epfl.pop.model.network.MethodType.rumor_state -import ch.epfl.pop.model.network.method.{Rumor, RumorState} +import ch.epfl.pop.model.network.method.{GreetServer, Rumor, RumorState} import ch.epfl.pop.model.network.method.message.Message import ch.epfl.pop.model.network.method.message.data.ActionType import ch.epfl.pop.model.network.{JsonRpcRequest, JsonRpcResponse, MethodType} @@ -163,27 +163,36 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou case None => -1 } - private def sendRumorState(): Unit = { - val randomPeer = connectionMediatorRef ? ConnectionMediator.GetRandomPeer() - Await.result(randomPeer, duration) match { - case ConnectionMediator.GetRandomPeerAck(serverRef, greetServer) => - val rumorStateGet = dbActorRef ? GetRumorState() - Await.result(rumorStateGet, duration) match - case DbActorGetRumorStateAck(rumorState) => - log.info(s"Sending rumor_state ${rumorState.state} to ${greetServer.serverAddress}") - serverRef ! ClientAnswer( - Right(JsonRpcRequest( - RpcValidator.JSON_RPC_VERSION, - rumor_state, - rumorState, - Some(jsonId) - )) - ) - jsonId += 1 - case _ => log.error(s"Actor $self failed on creating rumor state. State wasn't gossiped.") - case m @ _ => - log.warning(s"Received an unexpected message $m waiting for a random peer") - } + private def sendRumorState(actorRefDest: ActorRef = ActorRef.noSender, greetServerDest: Option[GreetServer] = None): Unit = { + var serverRef = ActorRef.noSender + var greetServer_ : Option[GreetServer] = None + if actorRefDest == Actor.noSender then + val randomPeerGet = connectionMediatorRef ? ConnectionMediator.GetRandomPeer() + serverRef = Await.result(randomPeerGet, duration) match { + case ConnectionMediator.GetRandomPeerAck(serverRef, greetServer) => + greetServer_ = Some(greetServer) + serverRef + case m @ _ => + log.warning(s"Received an unexpected message $m waiting for a random peer") + serverRef + } + else + serverRef = actorRefDest + greetServer_ = greetServerDest + val rumorStateGet = dbActorRef ? GetRumorState() + Await.result(rumorStateGet, duration) match + case DbActorGetRumorStateAck(rumorState) => + log.info(s"Sending rumor_state ${rumorState.state} to ${if greetServer_.isDefined then greetServer_.get.serverAddress else ""}") + serverRef ! ClientAnswer( + Right(JsonRpcRequest( + RpcValidator.JSON_RPC_VERSION, + rumor_state, + rumorState, + Some(jsonId) + )) + ) + jsonId += 1 + case _ => log.error(s"Actor $self failed on creating rumor state. State wasn't gossiped.") } private def peersAlreadyReceived(jsonRpcRequest: JsonRpcRequest): Set[ActorRef] = { @@ -225,6 +234,9 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou case Monitor.NoServerConnected => timers.cancel(periodicRumorStateKey) + case ConnectionMediator.NewServerConnected(serverRef, greetServer) => + sendRumorState(serverRef, Some(greetServer)) + case TriggerPullState() => sendRumorState()