From f099d80b617c87f6a07c1dd0495f7cab887c7745 Mon Sep 17 00:00:00 2001 From: rink1969 Date: Wed, 13 Mar 2024 13:36:52 +0800 Subject: [PATCH] fix proc reconfigure msg in setup --- src/peer.rs | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/peer.rs b/src/peer.rs index 07d5eee..1e6f40e 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -148,7 +148,7 @@ impl Peer { }; // Communicate with controller - let (controller_tx, mut controller_rx) = mpsc::channel::(1); + let (controller_tx, mut controller_rx) = mpsc::channel::(1000); let raft_svc = RaftConsensusService(controller_tx); // Network grpc client @@ -241,8 +241,19 @@ impl Peer { info!(logger, "waiting for `reconfigure` from controller.."); let (wants_campaign, trigger_config) = loop { tokio::time::sleep(Duration::from_secs(3)).await; - match controller_rx.try_recv() { - Ok(mut trigger_config) => { + let mut opt_trigger_config = None; + // read all reconfigure messages in channel + // when start a new node, sync blocks quickly, there will be many reconfigure messages in channel + // we only need proc latest one + while let Ok(config) = controller_rx.try_recv() { + opt_trigger_config = Some(config); + } + match opt_trigger_config { + Some(mut trigger_config) => { + info!( + logger, + "get reconfigure from controller, height is {}", trigger_config.height + ); // if recorded_height == trigger_height + 1, try to recommit entry let recorded_height = storage.get_block_height(); let trigger_height = trigger_config.height; @@ -309,7 +320,7 @@ impl Peer { ); } } - Err(_) => { + None => { let pwp = ProposalWithProof { proposal: Some(Proposal { height: u64::MAX, @@ -386,13 +397,15 @@ impl Peer { send_time_out_to_transferee: false, - logger, + logger: logger.clone(), }; this.maybe_pending_conf_change(); if wants_campaign { + info!(logger, "start campaign ..."); this.core.campaign().unwrap(); + info!(logger, "campaign success"); } this