diff --git a/Cargo.toml b/Cargo.toml index dd8f5fd..d70ea51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ tokio = { version = "1.27", features = ["fs", "io-util", "rt", "macros"] } serde = "1.0" bytes = "1.4" tower = "0.4.13" +flume = "0.10" cloud-util = { package = "cloud-util", git = "https://github.com/cita-cloud/cloud-common-rs.git" } cita_cloud_proto = { package = "cita_cloud_proto", git = "https://github.com/cita-cloud/cloud-common-rs.git" } diff --git a/src/main.rs b/src/main.rs index d065806..54bf42c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,7 +20,6 @@ mod storage; mod utils; use clap::{crate_authors, crate_version, value_parser, Arg, ArgAction, Command}; -use cloud_util::panic_hook::set_panic_handler; use config::ConsensusServiceConfig; use peer::Peer; use slog::info; @@ -110,14 +109,12 @@ fn main() { log_builder.build().expect("can't build file logger") }; - set_panic_handler(); - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.spawn(cloud_util::signal::handle_signals()); rt.block_on(async move { - let mut peer = Peer::setup(config, logger.clone()).await; - peer.run().await; + let rx_signal = cloud_util::graceful_shutdown::graceful_shutdown(); + let mut peer = Peer::setup(config, logger.clone(), rx_signal.clone()).await; + peer.run(rx_signal).await; info!(logger, "raft service exit"); }); } diff --git a/src/peer.rs b/src/peer.rs index 208f9c6..a709439 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -129,7 +129,11 @@ impl Peer { } } - pub async fn setup(config: ConsensusServiceConfig, logger: Logger) -> Self { + pub async fn setup( + config: ConsensusServiceConfig, + logger: Logger, + rx_signal: flume::Receiver<()>, + ) -> Self { let node_addr = { let s = &config.node_addr; hex::decode(s.strip_prefix("0x").unwrap_or(s)).expect("decode node_addr failed") @@ -193,7 +197,10 @@ impl Peer { .add_service(ConsensusServiceServer::new(raft_svc)) .add_service(NetworkMsgHandlerServiceServer::new(network_svc)) .add_service(HealthServer::new(HealthCheckServer {})) - .serve(addr) + .serve_with_shutdown( + addr, + cloud_util::graceful_shutdown::grpc_serve_listen_term(rx_signal), + ) .await; if let Err(e) = res { @@ -210,7 +217,10 @@ impl Peer { .add_service(ConsensusServiceServer::new(raft_svc)) .add_service(NetworkMsgHandlerServiceServer::new(network_svc)) .add_service(HealthServer::new(HealthCheckServer {})) - .serve(addr) + .serve_with_shutdown( + addr, + cloud_util::graceful_shutdown::grpc_serve_listen_term(rx_signal), + ) .await; if let Err(e) = res { @@ -407,7 +417,7 @@ impl Peer { } } - pub async fn run(&mut self) { + pub async fn run(&mut self, rx_signal: flume::Receiver<()>) { let mut fetching_proposal: Option>> = None; let tick_interval = Duration::from_millis(200); @@ -470,6 +480,11 @@ impl Peer { error!(self.logger, "step raft msg failed: `{}`", e); } } + + // signal to exit + _ = rx_signal.recv_async() => { + break; + }, } //if incoming change needs to transfer leader, then transfer leader first