Skip to content

Commit

Permalink
Rework queue signal handling
Browse files Browse the repository at this point in the history
  • Loading branch information
MTRNord committed Nov 4, 2023
1 parent 26882f8 commit bb0133e
Showing 1 changed file with 54 additions and 51 deletions.
105 changes: 54 additions & 51 deletions crates/erooster_smtp/src/servers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0

use std::{
process::exit,
sync::{Arc, Mutex},
thread,
time::Duration,
};
use std::{process::exit, sync::Arc, time::Duration};

use crate::servers::sending::send_email_job;
use erooster_core::{
Expand All @@ -21,6 +16,8 @@ use erooster_deps::{
tokio::{
self,
signal::unix::{signal, SignalKind},
sync::Mutex,
time::timeout,
},
tracing::{self, error, info, instrument, warn},
yaque::{recovery::recover, Receiver, ReceiverBuilder, Sender},
Expand Down Expand Up @@ -97,55 +94,61 @@ pub async fn start(

match receiver {
Ok(receiver) => {
let receiver = Arc::new(Mutex::new(receiver));
let receiver = Arc::new(Mutex::const_new(receiver));
let receiver_clone = Arc::clone(&receiver);
let receiver_clone_2 = Arc::clone(&receiver);

tokio::spawn(async move {
tokio::signal::ctrl_c()
.await
.expect("failed to listen for ctrl-c event");
cleanup(&receiver_clone);
});

tokio::spawn(async move {
sigterms
.recv()
.await
.expect("failed to listen for ctrl-c event");
cleanup(&receiver_clone_2);
loop {
let mut receiver_lock = Arc::clone(&receiver).lock_owned().await;
let data = timeout(Duration::from_secs(1), receiver_lock.recv()).await;

if let Ok(data) = data {
match data {
Ok(data) => {
let email_bytes = &*data;
let email_json =
serde_json::from_slice::<EmailPayload>(email_bytes)
.expect("Unable to parse job json");

if let Err(e) = send_email_job(&email_json).await {
tracing::error!(
"Error while sending email: {:?}. Adding it to the queue again",
e
);
// FIXME: This can race the lock leading to an error. We should
// probably handle this better.
let mut sender = Sender::open(config.task_folder.clone())
.expect("Unable to open sender");
let json_bytes = serde_json::to_vec(&email_json)
.expect("Unable to convert email job to json");
sender
.send(json_bytes)
.await
.expect("Unable to requeue emaul");
}
// Mark the job as complete
data.commit().expect("Unable to commit job");
}
Err(e) => {
tracing::error!(
"Error while receiving data from receiver: {:?}",
e
);
}
};
}
}
});

loop {
// Marginally slow it down to have it not locked constantly
thread::sleep(Duration::from_millis(5));
let mut receiver_lock = receiver.lock().expect("Unable to lock receiver");
let data = receiver_lock.recv().await;

match data {
Ok(data) => {
let email_bytes = &*data;
let email_json = serde_json::from_slice::<EmailPayload>(email_bytes)?;

if let Err(e) = send_email_job(&email_json).await {
tracing::error!(
"Error while sending email: {:?}. Adding it to the queue again",
e
);
// FIXME: This can race the lock leading to an error. We should
// probably handle this better.
let mut sender = Sender::open(config.task_folder.clone())?;
let json_bytes = serde_json::to_vec(&email_json)?;
sender.send(json_bytes).await?;
}
// Mark the job as complete
data.commit()?;
}
Err(e) => {
tracing::error!("Error while receiving data from receiver: {:?}", e);
}
};
tokio::select! {
_ = tokio::signal::ctrl_c() => {
cleanup(&receiver_clone).await;
}
_ = sigterms.recv() => {
cleanup(&receiver_clone).await;
}
}
Ok(())
}
Err(e) => {
error!("Unable to open receiver: {:?}. Giving up.", e);
Expand All @@ -154,9 +157,9 @@ pub async fn start(
}
}

fn cleanup(receiver: &Arc<Mutex<Receiver>>) {
async fn cleanup(receiver: &Arc<Mutex<Receiver>>) {
info!("Received ctr-c. Cleaning up");
let mut lock = receiver.lock().expect("Unable to lock receiver");
let mut lock = receiver.lock().await;

info!("Gained lock. Saving queue");
lock.save().expect("Unable to save queue");
Expand Down

0 comments on commit bb0133e

Please sign in to comment.