Skip to content

Commit

Permalink
Make sure we exit at the right time
Browse files Browse the repository at this point in the history
  • Loading branch information
MTRNord committed Nov 4, 2023
1 parent 39b2296 commit d96d22e
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 114 deletions.
88 changes: 75 additions & 13 deletions crates/erooster/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#![allow(clippy::missing_panics_doc)]
#![allow(clippy::items_after_statements)]

use std::sync::Arc;

use erooster_core::{
backend::{database::get_database, storage::get_storage},
panic_handler::EroosterPanicMessage,
Expand All @@ -45,9 +47,15 @@ use erooster_deps::{
clap::{self, Parser},
color_eyre::{self, eyre::Result},
opentelemetry,
tokio::{self, signal},
tokio::{
self,
signal::unix::{signal, SignalKind},
sync::Mutex,
},
tokio_util::sync::CancellationToken,
tracing::{error, info, warn},
tracing_error, tracing_subscriber,
yaque::{recovery::recover, Receiver, ReceiverBuilder},
};

#[derive(Parser, Debug)]
Expand All @@ -58,7 +66,7 @@ struct Args {
}

#[tokio::main]
#[allow(clippy::too_many_lines)]
#[allow(clippy::too_many_lines, clippy::redundant_pub_crate)]
async fn main() -> Result<()> {
// Setup logging and metrics
let builder = color_eyre::config::HookBuilder::default().panic_message(EroosterPanicMessage);
Expand Down Expand Up @@ -118,20 +126,74 @@ async fn main() -> Result<()> {
let database = get_database(&config).await?;
let storage = get_storage(database.clone(), config.clone());

// Startup servers
erooster_imap::start(config.clone(), &database, &storage)?;
// We do need the let here to make sure that the runner is bound to the lifetime of main.
erooster_smtp::servers::start(config.clone(), &database, &storage).await?;
// Start listening for tasks
let mut receiver = ReceiverBuilder::default()
.save_every_nth(None)
.open(config.task_folder.clone());
if let Err(e) = receiver {
warn!("Unable to open receiver: {:?}. Trying to recover.", e);
recover(&config.task_folder)?;
receiver = ReceiverBuilder::default()
.save_every_nth(None)
.open(config.task_folder.clone());
info!("Recovered queue successfully");
}

// Get SIGTERMs
let mut sigterms = signal(SignalKind::terminate())?;
let shutdown_flag = CancellationToken::new();

match receiver {
Ok(receiver) => {
let receiver = Arc::new(Mutex::const_new(receiver));

// Startup servers
erooster_imap::start(config.clone(), &database, &storage)?;

let config_clone = config.clone();
tokio::spawn(async move {
erooster_web::start(&config_clone)
.await
.expect("Unable to start webserver");
});

erooster_web::start(&config).await?;
let receiver_clone: Arc<Mutex<Receiver>> = Arc::clone(&receiver);
let shutdown_flag_clone = shutdown_flag.clone();
// We do need the let here to make sure that the runner is bound to the lifetime of main.
erooster_smtp::servers::start(
config.clone(),
&database,
&storage,
shutdown_flag_clone,
receiver,
)
.await?;

match signal::ctrl_c().await {
Ok(()) => {}
Err(err) => {
error!("Unable to listen for shutdown signal: {}", err);
// we also shut down in case of error
tokio::select! {
_ = tokio::signal::ctrl_c() => {
cleanup(&receiver_clone, &shutdown_flag).await;
}
_ = sigterms.recv() => {
cleanup(&receiver_clone, &shutdown_flag).await;
}
}
}
Err(e) => {
error!("Unable to open receiver: {:?}. Giving up.", e);
}
}
opentelemetry::global::shutdown_tracer_provider();
Ok(())
}

async fn cleanup(receiver: &Arc<Mutex<Receiver>>, shutdown_flag: &CancellationToken) {
info!("Received ctr-c. Cleaning up");
shutdown_flag.cancel();
opentelemetry::global::shutdown_tracer_provider();
{
let mut lock = receiver.lock().await;

info!("Gained lock. Saving queue");
lock.save().expect("Unable to save queue");
info!("Saved queue. Exiting");
};
}
146 changes: 45 additions & 101 deletions crates/erooster_smtp/src/servers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,10 @@ use erooster_deps::{
color_eyre,
futures::{Sink, SinkExt},
serde_json,
tokio::{
self,
signal::unix::{signal, SignalKind},
sync::Mutex,
time::timeout,
},
tokio::{self, sync::Mutex, time::timeout},
tokio_util::sync::CancellationToken,
tracing::{self, error, info, instrument, warn},
yaque::{recovery::recover, Receiver, ReceiverBuilder, Sender},
tracing::{self, instrument},
yaque::{Receiver, Sender},
};

use self::sending::EmailPayload;
Expand Down Expand Up @@ -53,13 +48,14 @@ where
/// # Errors
///
/// Returns an error if the server startup fails
#[instrument(skip(config, database, storage))]
#[instrument(skip(config, database, storage, shutdown_flag, receiver))]
pub async fn start(
config: Config,
database: &DB,
storage: &Storage,
shutdown_flag: CancellationToken,
receiver: Arc<Mutex<Receiver>>,
) -> color_eyre::eyre::Result<()> {
let shutdown_flag = CancellationToken::new();
let db_clone = database.clone();
let storage_clone = storage.clone();
let config_clone = config.clone();
Expand Down Expand Up @@ -89,99 +85,47 @@ pub async fn start(
}
});

// Start listening for tasks
let mut receiver = ReceiverBuilder::default()
.save_every_nth(None)
.open(config.task_folder.clone());
if let Err(e) = receiver {
warn!("Unable to open receiver: {:?}. Trying to recover.", e);
recover(&config.task_folder)?;
receiver = ReceiverBuilder::default()
.save_every_nth(None)
.open(config.task_folder.clone());
info!("Recovered queue successfully");
}

// Get SIGTERMs
let mut sigterms = signal(SignalKind::terminate())?;

match receiver {
Ok(receiver) => {
let receiver = Arc::new(Mutex::const_new(receiver));
let receiver_clone = Arc::clone(&receiver);

let shutdown_flag_clone = shutdown_flag.clone();
tokio::spawn(async move {
loop {
if shutdown_flag_clone.is_cancelled() {
break;
let shutdown_flag_clone = shutdown_flag.clone();
tokio::spawn(async move {
loop {
if shutdown_flag_clone.is_cancelled() {
break;
}
let mut receiver_lock = Arc::clone(&receiver).lock_owned().await;
let data = timeout(Duration::from_secs(1), receiver_lock.recv()).await;

if let Ok(data) = data {
match data {
Ok(data) => {
let email_bytes = &*data;
let email_json = serde_json::from_slice::<EmailPayload>(email_bytes)
.expect("Unable to parse job json");

if let Err(e) = send_email_job(&email_json).await {
tracing::error!(
"Error while sending email: {:?}. Adding it to the queue again",
e
);
// FIXME: This can race the lock leading to an error. We should
// probably handle this better.
let mut sender = Sender::open(config.task_folder.clone())
.expect("Unable to open sender");
let json_bytes = serde_json::to_vec(&email_json)
.expect("Unable to convert email job to json");
sender
.send(json_bytes)
.await
.expect("Unable to requeue emaul");
}
// Mark the job as complete
data.commit().expect("Unable to commit job");
}
let mut receiver_lock = Arc::clone(&receiver).lock_owned().await;
let data = timeout(Duration::from_secs(1), receiver_lock.recv()).await;

if let Ok(data) = data {
match data {
Ok(data) => {
let email_bytes = &*data;
let email_json =
serde_json::from_slice::<EmailPayload>(email_bytes)
.expect("Unable to parse job json");

if let Err(e) = send_email_job(&email_json).await {
tracing::error!(
"Error while sending email: {:?}. Adding it to the queue again",
e
);
// FIXME: This can race the lock leading to an error. We should
// probably handle this better.
let mut sender = Sender::open(config.task_folder.clone())
.expect("Unable to open sender");
let json_bytes = serde_json::to_vec(&email_json)
.expect("Unable to convert email job to json");
sender
.send(json_bytes)
.await
.expect("Unable to requeue emaul");
}
// Mark the job as complete
data.commit().expect("Unable to commit job");
}
Err(e) => {
tracing::error!(
"Error while receiving data from receiver: {:?}",
e
);
}
};
Err(e) => {
tracing::error!("Error while receiving data from receiver: {:?}", e);
}
}
});

tokio::select! {
_ = tokio::signal::ctrl_c() => {
cleanup(&receiver_clone, &shutdown_flag).await;
}
_ = sigterms.recv() => {
cleanup(&receiver_clone, &shutdown_flag).await;
}
};
}
Ok(())
}
Err(e) => {
error!("Unable to open receiver: {:?}. Giving up.", e);
Ok(())
}
}
}

async fn cleanup(receiver: &Arc<Mutex<Receiver>>, shutdown_flag: &CancellationToken) {
info!("Received ctr-c. Cleaning up");
shutdown_flag.cancel();
{
let mut lock = receiver.lock().await;

info!("Gained lock. Saving queue");
lock.save().expect("Unable to save queue");
info!("Saved queue. Exiting");
};
});
Ok(())
}

0 comments on commit d96d22e

Please sign in to comment.