Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: wait for registration before starting p2p #4160

Merged
merged 9 commits into from
Oct 31, 2023
3 changes: 3 additions & 0 deletions engine/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ async fn run_main(
btc_outgoing_sender,
btc_incoming_receiver,
peer_update_sender,
p2p_ready_receiver,
p2p_fut,
) = p2p::start(
state_chain_client.clone(),
Expand Down Expand Up @@ -336,6 +337,8 @@ async fn run_main(
peer_update_sender,
));

p2p_ready_receiver.await.unwrap();

has_completed_initialising.store(true, std::sync::atomic::Ordering::Relaxed);

Ok(())
Expand Down
65 changes: 41 additions & 24 deletions engine/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
p2p::core::ed25519_secret_key_to_x25519_secret_key,
settings::P2P as P2PSettings,
state_chain_observer::client::{
extrinsic_api::signed::SignedExtrinsicApi, storage_api::StorageApi,
chain_api::ChainApi, extrinsic_api::signed::SignedExtrinsicApi, storage_api::StorageApi,
},
};

Expand All @@ -29,7 +29,10 @@ use futures::{Future, FutureExt};
use multisig::p2p::OutgoingMultisigStageMessages;
use muxer::P2PMuxer;
use sp_core::{ed25519, H256};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
};
use tracing::{info_span, Instrument};
use zeroize::Zeroizing;

Expand Down Expand Up @@ -87,7 +90,7 @@ fn pk_to_string(pk: &XPublicKey) -> String {
pub async fn start<StateChainClient>(
state_chain_client: Arc<StateChainClient>,
settings: P2PSettings,
latest_block_hash: H256,
initial_block_hash: H256,
) -> anyhow::Result<(
MultisigMessageSender<EvmCrypto>,
MultisigMessageReceiver<EvmCrypto>,
Expand All @@ -96,10 +99,11 @@ pub async fn start<StateChainClient>(
MultisigMessageSender<BitcoinCrypto>,
MultisigMessageReceiver<BitcoinCrypto>,
UnboundedSender<PeerUpdate>,
oneshot::Receiver<()>,
impl Future<Output = anyhow::Result<()>>,
)>
where
StateChainClient: StorageApi + SignedExtrinsicApi + 'static + Send + Sync,
StateChainClient: StorageApi + SignedExtrinsicApi + ChainApi + 'static + Send + Sync,
{
if settings.ip_address == IpAddr::V4(Ipv4Addr::UNSPECIFIED) {
anyhow::bail!("Should provide a valid IP address");
Expand All @@ -123,20 +127,22 @@ where
};

let current_peers =
peer_info_submitter::get_current_peer_infos(&state_chain_client, latest_block_hash)
peer_info_submitter::get_current_peer_infos(&state_chain_client, initial_block_hash)
.await
.context("Failed to get initial peer info")?;
let our_account_id = state_chain_client.account_id();

let own_peer_info = current_peers.iter().find(|pi| pi.account_id == our_account_id).cloned();

let (
outgoing_message_sender,
peer_update_sender,
incoming_message_receiver,
own_peer_info_receiver,
p2p_fut,
) = core::start(&node_key, settings.port, current_peers, our_account_id);
let (incoming_message_sender, incoming_message_receiver) =
tokio::sync::mpsc::unbounded_channel();

let (outgoing_message_sender, outgoing_message_receiver) =
tokio::sync::mpsc::unbounded_channel();

let (peer_update_sender, peer_update_receiver) = tokio::sync::mpsc::unbounded_channel();

let (p2p_ready_sender, p2p_ready_receiver) = oneshot::channel();

let (
eth_outgoing_sender,
Expand All @@ -150,22 +156,32 @@ where

let fut = task_scope(move |scope| {
async move {
scope.spawn(async {
p2p_fut.await;
Ok(())
});

scope.spawn(
peer_info_submitter::start(
node_key,
state_chain_client,
scope.spawn(async move {
peer_info_submitter::ensure_peer_info_registered(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it make more sense if this was outside the spawn? So that the engine waits for the peer registration before stating it is healthly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's how it was originally, but then I noticed that this blocked other modules from starting to initialize, delaying them by 20 seconds or so, thus I change this behaviour in 25caf97. They is no reason why these modules shouldn't start initialising in parallel.

That's also why I mentioned yesterday that we might want to have critical modules signal when they are ready, so we could wait for them before submitting heartbeats (either in addition to or replacing the current 60 seconds delay).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, looks like it is easy to add a signal awaited before has_completed_initialising is updated, so I just did that in 586d662

&node_key,
&state_chain_client,
settings.ip_address,
settings.port,
own_peer_info,
own_peer_info_receiver,
)
.instrument(info_span!("P2PClient")),
);
.instrument(info_span!("P2PClient"))
.await?;

p2p_ready_sender.send(()).unwrap();

core::start(
node_key,
settings.port,
current_peers,
our_account_id,
incoming_message_sender,
outgoing_message_receiver,
peer_update_receiver,
)
.await;

Ok(())
});

scope.spawn(async move {
muxer_future.await;
Expand All @@ -185,6 +201,7 @@ where
btc_outgoing_sender,
btc_incoming_receiver,
peer_update_sender,
p2p_ready_receiver,
fut,
))
}
Loading