Skip to content

Commit

Permalink
Shutdown smtp server when we shutdown the queue to then unlock the qu…
Browse files Browse the repository at this point in the history
…eue directly
  • Loading branch information
MTRNord committed Nov 4, 2023
1 parent bb0133e commit 8b1f4ef
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 11 deletions.
28 changes: 26 additions & 2 deletions crates/erooster_smtp/src/servers/encrypted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use erooster_deps::{
TlsAcceptor,
},
tokio_stream::wrappers::TcpListenerStream,
tokio_util::codec::Framed,
tokio_util::{codec::Framed, sync::CancellationToken},
tracing::{self, debug, error, info, instrument},
};
use std::{
Expand Down Expand Up @@ -96,6 +96,7 @@ impl Encrypted {
config: Config,
database: &DB,
storage: &Storage,
shutdown_flag: CancellationToken,
) -> color_eyre::eyre::Result<()> {
let acceptor = get_tls_acceptor(&config)?;
// Opens the listener
Expand All @@ -117,8 +118,17 @@ impl Encrypted {
let storage = storage.clone();
let acceptor = acceptor.clone();
let config = config.clone();
let shutdown_flag_clone = shutdown_flag.clone();
tokio::spawn(async move {
listen(stream, &config, &database, &storage, acceptor.clone()).await;
listen(
stream,
&config,
&database,
&storage,
acceptor.clone(),
shutdown_flag_clone.clone(),
)
.await;
});
}

Expand All @@ -133,9 +143,14 @@ async fn listen(
database: &DB,
storage: &Storage,
acceptor: TlsAcceptor,
shutdown_flag: CancellationToken,
) {
// Looks for new peers
let shutdown_flag_clone = shutdown_flag.clone();
while let Some(Ok(tcp_stream)) = stream.next().await {
if shutdown_flag_clone.clone().is_cancelled() {
break;
}
if let Err(e) = listen_tls(
tcp_stream,
config,
Expand All @@ -144,12 +159,14 @@ async fn listen(
acceptor.clone(),
None,
false,
shutdown_flag_clone.clone(),
) {
error!("[SMTP][ENCRYPTED] Error while listening: {}", e);
}
}
}

#[allow(clippy::too_many_arguments)]
pub fn listen_tls(
tcp_stream: TcpStream,
config: &Config,
Expand All @@ -158,6 +175,7 @@ pub fn listen_tls(
acceptor: TlsAcceptor,
upper_data: Option<Data>,
starttls: bool,
shutdown_flag: CancellationToken,
) -> color_eyre::eyre::Result<()> {
let peer = tcp_stream
.peer_addr()
Expand Down Expand Up @@ -209,6 +227,12 @@ pub fn listen_tls(
};
// Read lines from the stream
while let Some(Ok(line)) = lines_reader.next().await {
if shutdown_flag.is_cancelled() {
if let Err(e) = lines_sender.send(String::from("421 Shutting down")).await {
error!("[SMTP] Error sending response: {:?}", e);
}
break;
}
debug!("[SMTP][TLS] [{}] Got Command: {}", peer, line);

{
Expand Down
35 changes: 29 additions & 6 deletions crates/erooster_smtp/src/servers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ use erooster_deps::{
sync::Mutex,
time::timeout,
},
tokio_util::sync::CancellationToken,
tracing::{self, error, info, instrument, warn},
yaque::{recovery::recover, Receiver, ReceiverBuilder, Sender},
yaque::{
recovery::{recover, unlock_queue},
Receiver, ReceiverBuilder, Sender,
},
};

use self::sending::EmailPayload;
Expand Down Expand Up @@ -58,20 +62,32 @@ pub async fn start(
database: &DB,
storage: &Storage,
) -> color_eyre::eyre::Result<()> {
let shutdown_flag = CancellationToken::new();
let db_clone = database.clone();
let storage_clone = storage.clone();
let config_clone = config.clone();
let shutdown_flag_clone = shutdown_flag.clone();
tokio::spawn(async move {
if let Err(e) = unencrypted::Unencrypted::run(config_clone, &db_clone, &storage_clone).await
if let Err(e) = unencrypted::Unencrypted::run(
config_clone,
&db_clone,
&storage_clone,
shutdown_flag_clone,
)
.await
{
panic!("Unable to start server: {e:?}");
}
});
let db_clone = database.clone();
let storage_clone = storage.clone();
let config_clone = config.clone();
let shutdown_flag_clone = shutdown_flag.clone();
tokio::spawn(async move {
if let Err(e) = encrypted::Encrypted::run(config_clone, &db_clone, &storage_clone).await {
if let Err(e) =
encrypted::Encrypted::run(config_clone, &db_clone, &storage_clone, shutdown_flag_clone)
.await
{
panic!("Unable to start TLS server: {e:?}");
}
});
Expand All @@ -97,6 +113,7 @@ pub async fn start(
let receiver = Arc::new(Mutex::const_new(receiver));
let receiver_clone = Arc::clone(&receiver);

let config_clone = config.clone();
tokio::spawn(async move {
loop {
let mut receiver_lock = Arc::clone(&receiver).lock_owned().await;
Expand Down Expand Up @@ -142,10 +159,10 @@ pub async fn start(

tokio::select! {
_ = tokio::signal::ctrl_c() => {
cleanup(&receiver_clone).await;
cleanup(&receiver_clone, &shutdown_flag, &config_clone).await;
}
_ = sigterms.recv() => {
cleanup(&receiver_clone).await;
cleanup(&receiver_clone, &shutdown_flag, &config_clone).await;
}
}
Ok(())
Expand All @@ -157,12 +174,18 @@ pub async fn start(
}
}

async fn cleanup(receiver: &Arc<Mutex<Receiver>>) {
async fn cleanup(
receiver: &Arc<Mutex<Receiver>>,
shutdown_flag: &CancellationToken,
config: &Config,
) {
info!("Received ctr-c. Cleaning up");
let mut lock = receiver.lock().await;

info!("Gained lock. Saving queue");
lock.save().expect("Unable to save queue");
info!("Saved queue. Exiting");
shutdown_flag.cancel();
unlock_queue(&config.task_folder).expect("Failed to unlock queue");
exit(0);
}
27 changes: 24 additions & 3 deletions crates/erooster_smtp/src/servers/unencrypted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use erooster_deps::{
futures::{SinkExt, StreamExt},
tokio::{self, net::TcpListener, task::JoinHandle},
tokio_stream::wrappers::TcpListenerStream,
tokio_util::codec::Framed,
tokio_util::{codec::Framed, sync::CancellationToken},
tracing::{self, debug, error, info, instrument},
};
use std::net::SocketAddr;
Expand All @@ -37,6 +37,7 @@ impl Unencrypted {
config: Config,
database: &DB,
storage: &Storage,
shutdown_flag: CancellationToken,
) -> color_eyre::eyre::Result<()> {
let addrs: Vec<SocketAddr> = if let Some(listen_ips) = &config.listen_ips {
listen_ips
Expand All @@ -57,8 +58,9 @@ impl Unencrypted {
let database = database.clone();
let storage = storage.clone();
let config = config.clone();
let shutdown_flag = shutdown_flag.clone();
tokio::spawn(async move {
listen(stream, &config, &database, &storage).await;
listen(stream, &config, &database, &storage, shutdown_flag.clone()).await;
});
}

Expand All @@ -67,14 +69,25 @@ impl Unencrypted {
}

#[allow(clippy::too_many_lines)]
async fn listen(mut stream: TcpListenerStream, config: &Config, database: &DB, storage: &Storage) {
async fn listen(
mut stream: TcpListenerStream,
config: &Config,
database: &DB,
storage: &Storage,
shutdown_flag: CancellationToken,
) {
let shutdown_flag_clone = shutdown_flag.clone();
while let Some(Ok(tcp_stream)) = stream.next().await {
if shutdown_flag_clone.clone().is_cancelled() {
break;
}
let peer = tcp_stream.peer_addr().expect("[SMTP] peer addr to exist");
debug!("[SMTP] Got new peer: {}", peer);

let database = database.clone();
let storage = storage.clone();
let config = config.clone();
let shutdown_flag_clone = shutdown_flag_clone.clone();
let connection: JoinHandle<Result<()>> = tokio::spawn(async move {
let lines = Framed::new(tcp_stream, LinesCodec::new_with_max_length(LINE_LIMIT));
let (mut lines_sender, mut lines_reader) = lines.split();
Expand All @@ -89,7 +102,14 @@ async fn listen(mut stream: TcpListenerStream, config: &Config, database: &DB, s
let mut data = Data { con_state: state };

let mut do_starttls = false;
let shutdown_flag_clone = shutdown_flag_clone.clone();
while let Some(Ok(line)) = lines_reader.next().await {
if shutdown_flag_clone.clone().is_cancelled() {
if let Err(e) = lines_sender.send(String::from("421 Shutting down")).await {
error!("[SMTP] Error sending response: {:?}", e);
}
break;
}
debug!("[SMTP] [{}] Got Command: {}", peer, line);

// TODO make sure to handle IDLE different as it needs us to stream lines
Expand Down Expand Up @@ -145,6 +165,7 @@ async fn listen(mut stream: TcpListenerStream, config: &Config, database: &DB, s
acceptor,
Some(data),
true,
shutdown_flag_clone.clone(),
) {
error!("[SMTP] Error while upgrading to tls: {}", e);
}
Expand Down

0 comments on commit 8b1f4ef

Please sign in to comment.