Skip to content

Commit

Permalink
graceful shutdown (#87)
Browse files Browse the repository at this point in the history
  • Loading branch information
JLerxky authored Jul 7, 2023
1 parent 5078f9a commit f57e79b
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tokio = { version = "1.27", features = ["fs", "io-util", "rt", "macros"] }
serde = "1.0"
bytes = "1.4"
tower = "0.4.13"
flume = "0.10"

cloud-util = { package = "cloud-util", git = "https://github.com/cita-cloud/cloud-common-rs.git" }
cita_cloud_proto = { package = "cita_cloud_proto", git = "https://github.com/cita-cloud/cloud-common-rs.git" }
Expand Down
9 changes: 3 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ mod storage;
mod utils;

use clap::{crate_authors, crate_version, value_parser, Arg, ArgAction, Command};
use cloud_util::panic_hook::set_panic_handler;
use config::ConsensusServiceConfig;
use peer::Peer;
use slog::info;
Expand Down Expand Up @@ -110,14 +109,12 @@ fn main() {
log_builder.build().expect("can't build file logger")
};

set_panic_handler();

let rt = tokio::runtime::Runtime::new().unwrap();

rt.spawn(cloud_util::signal::handle_signals());
rt.block_on(async move {
let mut peer = Peer::setup(config, logger.clone()).await;
peer.run().await;
let rx_signal = cloud_util::graceful_shutdown::graceful_shutdown();
let mut peer = Peer::setup(config, logger.clone(), rx_signal.clone()).await;
peer.run(rx_signal).await;
info!(logger, "raft service exit");
});
}
Expand Down
23 changes: 19 additions & 4 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ impl Peer {
}
}

pub async fn setup(config: ConsensusServiceConfig, logger: Logger) -> Self {
pub async fn setup(
config: ConsensusServiceConfig,
logger: Logger,
rx_signal: flume::Receiver<()>,
) -> Self {
let node_addr = {
let s = &config.node_addr;
hex::decode(s.strip_prefix("0x").unwrap_or(s)).expect("decode node_addr failed")
Expand Down Expand Up @@ -193,7 +197,10 @@ impl Peer {
.add_service(ConsensusServiceServer::new(raft_svc))
.add_service(NetworkMsgHandlerServiceServer::new(network_svc))
.add_service(HealthServer::new(HealthCheckServer {}))
.serve(addr)
.serve_with_shutdown(
addr,
cloud_util::graceful_shutdown::grpc_serve_listen_term(rx_signal),
)
.await;

if let Err(e) = res {
Expand All @@ -210,7 +217,10 @@ impl Peer {
.add_service(ConsensusServiceServer::new(raft_svc))
.add_service(NetworkMsgHandlerServiceServer::new(network_svc))
.add_service(HealthServer::new(HealthCheckServer {}))
.serve(addr)
.serve_with_shutdown(
addr,
cloud_util::graceful_shutdown::grpc_serve_listen_term(rx_signal),
)
.await;

if let Err(e) = res {
Expand Down Expand Up @@ -407,7 +417,7 @@ impl Peer {
}
}

pub async fn run(&mut self) {
pub async fn run(&mut self, rx_signal: flume::Receiver<()>) {
let mut fetching_proposal: Option<JoinHandle<Result<Proposal, tonic::Status>>> = None;

let tick_interval = Duration::from_millis(200);
Expand Down Expand Up @@ -470,6 +480,11 @@ impl Peer {
error!(self.logger, "step raft msg failed: `{}`", e);
}
}

// signal to exit
_ = rx_signal.recv_async() => {
break;
},
}

//if incoming change needs to transfer leader, then transfer leader first
Expand Down

0 comments on commit f57e79b

Please sign in to comment.