restarter.rs
//! Sample restarter application.
//! This implements a TCP server that accepts connections,
//! outputs a short line describing the running process,
//! then echoes back anything sent to it by the client.
//!
//! While the application is running, another instance can be invoked with the
//! `restart` command, which triggers a graceful restart. Existing connections are
//! maintained, and the old process terminates as soon as all clients disconnect. The new
//! process listens on a different socket (this library does not provide socket
//! inheritance or rebinding).
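//!
//! Example usage (a sketch; assumes the compiled binary is named `restarter`):
//!   restarter            # start the server and print its listening address
//!   restarter restart    # ask the running instance (via /tmp/restarter.sock) to restart
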
use anyhow::Error;
use async_trait::async_trait;
use clap::{Parser, Subcommand};
use shellflip::lifecycle::*;
use shellflip::{RestartConfig, ShutdownCoordinator, ShutdownHandle, ShutdownSignal};
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::{pin, select};

/// Simple program to test graceful shutdown and restart
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Args {
#[command(subcommand)]
command: Option<Commands>,
/// Restart coordination socket path
#[arg(short, long, default_value = "/tmp/restarter.sock")]
socket: String,
}

#[derive(Subcommand)]
enum Commands {
/// Trigger restart
Restart,
}
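
/// Per-process state handed from the old process to its replacement across a restart.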
struct AppData {
restart_generation: u32,
}
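
// The lifecycle handler runs in the old process when a restart is requested;
// send_to_new_process passes state to the replacement over the handover pipe. This
// example refuses further restarts after four generations by returning an error.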
#[async_trait]
impl LifecycleHandler for AppData {
async fn send_to_new_process(&mut self, mut write_pipe: PipeWriter) -> std::io::Result<()> {
if self.restart_generation > 4 {
log::info!("Four restarts is more than anybody needs, surely?");
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"The operation completed successfully",
));
}
write_pipe.write_u32(self.restart_generation).await?;
Ok(())
}
}

#[tokio::main]
async fn main() -> Result<(), Error> {
env_logger::init();
let args = Args::parse();
let mut app_data = AppData {
restart_generation: 0,
};
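
    // If this process was started by a graceful restart, read the restart generation
    // handed over by the old process and increment it; otherwise it stays at zero.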
if let Some(mut handover_pipe) = receive_from_old_process() {
app_data.restart_generation = handover_pipe.read_u32().await? + 1;
}
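
    // Keep a copy for logging below; app_data itself is moved into the restart config.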
let restart_generation = app_data.restart_generation;
// Configure the essential requirements for implementing graceful restart.
let restart_conf = RestartConfig {
enabled: true,
coordination_socket_path: args.socket.into(),
lifecycle_handler: Box::new(app_data),
..Default::default()
};
match args.command {
// Restart an already-running process
Some(Commands::Restart) => {
let res = restart_conf.request_restart().await;
match res {
Ok(id) => {
log::info!("Restart succeeded, child pid is {}", id);
return Ok(());
}
Err(e) => {
log::error!("Restart failed: {}", e);
return Err(e);
}
}
}
// Standard operating mode
None => {}
}
// Start the restart thread and get a task that will complete when a restart completes.
let restart_task = restart_conf.try_into_restart_task()?;
// (need to pin this because of the loop below!)
pin!(restart_task);
// Create a shutdown coordinator so that we can wait for all client connections to complete.
let shutdown_coordinator = ShutdownCoordinator::new();
// Bind a TCP listener socket to give us something to do
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
println!(
"Instance no. {} listening on {}",
restart_generation,
listener.local_addr().unwrap()
);
loop {
select! {
res = listener.accept() => {
match res {
Ok((sock, addr)) => {
log::info!("Received connection from {}", addr);
// Spawn a new task to handle the client connection.
// Give it a shutdown handle so we can await its completion.
tokio::spawn(echo(sock, shutdown_coordinator.handle()));
}
Err(e) => {
log::warn!("Accept error: {}", e);
}
}
}
res = &mut restart_task => {
match res {
Ok(_) => {
log::info!("Restart successful, waiting for tasks to complete");
}
Err(e) => {
log::error!("Restart task failed: {}", e);
}
}
// Wait for all clients to complete.
shutdown_coordinator.shutdown().await;
log::info!("Exiting...");
return Ok(());
}
}
}
}
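
/// Handles a single client connection: greets it with the current process id, then
/// echoes back everything it receives until the client disconnects or an I/O error occurs.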
async fn echo(mut sock: TcpStream, shutdown_handle: Arc<ShutdownHandle>) {
// Get notification that shutdown has been requested.
// Note that we still keep the shutdown_handle active during the lifetime of this task.
let mut shutdown_signal = ShutdownSignal::from(&*shutdown_handle);
let mut buf = [0u8; 1024];
let out = format!("Hello, this is process {}\n", std::process::id());
let _ = sock.write_all(out.as_bytes()).await;
loop {
select! {
r = sock.read(&mut buf) => {
match r {
Ok(0) => return,
Ok(n) => {
if let Err(e) = sock.write_all(&buf[..n]).await {
log::error!("write failed: {}", e);
return;
}
}
Err(e) => {
log::error!("read failed: {}", e);
return;
}
}
}
_ = shutdown_signal.on_shutdown() => {
log::info!("shutdown requested but client {} is still active", sock.peer_addr().unwrap());
}
}
}
}