Skip to content

Commit

Permalink
Replace sqlxmq with yaque to support pgbouncer
Browse files Browse the repository at this point in the history
  • Loading branch information
MTRNord committed Sep 9, 2023
1 parent daff5af commit c27879b
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 276 deletions.
114 changes: 72 additions & 42 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/erooster/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ async fn main() -> Result<()> {
// Startup servers
erooster_imap::start(Arc::clone(&config), &database, &storage)?;
// We do need the let here to make sure that the runner is bound to the lifetime of main.
let _runner = erooster_smtp::servers::start(Arc::clone(&config), &database, &storage).await?;
erooster_smtp::servers::start(Arc::clone(&config), &database, &storage).await?;

erooster_web::start(Arc::clone(&config)).await?;

Expand Down
2 changes: 2 additions & 0 deletions crates/erooster_core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub struct Config {
pub webserver: Webserver,
/// The config related to the optional rspamd integration
pub rspamd: Option<Rspamd>,
/// Folder where the pending tasks are stored
pub task_folder: String,
}

/// The config for the webserver
Expand Down
2 changes: 1 addition & 1 deletion crates/erooster_imap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ tokio-stream = { version = "0.1.12", features = ["net", "io-util"] }
tokio-util = { version = "0.7.7", features = ["full"] }
tracing = "0.1.37"
secrecy = "0.8.0"
time = { version = "0.3.24", features = ["parsing"] }
time = { version = "0.3.24", features = ["parsing", "macros"] }

[dev-dependencies]
convert_case = "0.6.0"
Expand Down
7 changes: 3 additions & 4 deletions crates/erooster_smtp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ rustls-pemfile = "1.0.1"
serde = { version = "=1.0.171", features = ["derive"] }
simdutf8 = { version = "0.1.4" }
mailparse = "0.14.0"
sqlxmq = { version = "0.4", default-features = false, features = [
"runtime-tokio-rustls",
], git = "https://github.com/erooster-mail/sqlxmq.git" }
tokio = { version = "1.24.2", features = ["full"] }
tokio-rustls = { version = "0.24.1", features = ["tls12"] }
tokio-stream = { version = "0.1.12", features = ["net", "io-util"] }
Expand All @@ -41,14 +38,16 @@ trust-dns-resolver = "0.22.0"
webpki-roots = "0.25.0"
secrecy = "0.8.0"
mail-auth = { version = "0.3.0", features = ["rust-crypto"] }
time = { version = "0.3.20", features = ["formatting"] }
time = { version = "0.3.20", features = ["formatting", "macros"] }
reqwest = { version = "0.11.20", default-features = false, features = [
"json",
"rustls-tls",
"trust-dns",
] }
serde_json = "1.0.91"
cfg-if = "1.0.0"
yaque = "0.6.5"
uuid = { version = "1.4.1", features = ["v4", "serde"] }

[features]
default = []
Expand Down
25 changes: 9 additions & 16 deletions crates/erooster_smtp/src/commands/data.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
use crate::{
commands::Data,
servers::{
sending::{send_email_job, EmailPayload},
state::Data as StateData,
state::State,
},
servers::{sending::EmailPayload, state::Data as StateData, state::State},
utils::rspamd::Response,
};
use color_eyre::eyre::ContextCompat;
use erooster_core::{
backend::{
database::{Database, DB},
storage::{MailStorage, Storage},
},
backend::storage::{MailStorage, Storage},
config::{Config, Rspamd},
};
use futures::{Sink, SinkExt};
Expand All @@ -22,6 +15,7 @@ use std::io::Write;
use std::{collections::BTreeMap, path::Path, sync::Arc, time::Duration};
use time::{macros::format_description, OffsetDateTime};
use tracing::{debug, instrument};
use yaque::Sender;

#[allow(clippy::module_name_repetitions)]
pub struct DataCommand<'a> {
Expand Down Expand Up @@ -51,13 +45,12 @@ impl DataCommand<'_> {
Ok(())
}

#[instrument(skip(self, config, lines, line, database, storage))]
#[instrument(skip(self, config, lines, line, storage))]
pub async fn receive<S, E>(
&self,
config: Arc<Config>,
lines: &mut S,
line: &str,
database: &DB,
storage: &Storage,
) -> color_eyre::eyre::Result<()>
where
Expand Down Expand Up @@ -119,6 +112,7 @@ impl DataCommand<'_> {
};

let email_payload = EmailPayload {
id: uuid::Uuid::new_v4(),
to,
from: write_lock
.sender
Expand All @@ -129,12 +123,11 @@ impl DataCommand<'_> {
dkim_key_path: config.mail.dkim_key_path.clone(),
dkim_key_selector: config.mail.dkim_key_selector.clone(),
};
let pool = database.get_pool();
send_email_job
.builder()
.set_json(&email_payload)?
.spawn(pool)
let payload_as_json_bytes = serde_json::to_vec(&email_payload)?;
Sender::open(config.task_folder.clone())?
.send(&payload_as_json_bytes)
.await?;

debug!("Email added to queue");
}

Expand Down
2 changes: 1 addition & 1 deletion crates/erooster_smtp/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl Data {
let state = { con_clone.read().await.state.clone() };
if matches!(state, State::ReceivingData(_)) {
DataCommand { data: self }
.receive(config, lines, &line, database, storage)
.receive(config, lines, &line, storage)
.await?;
// We are done here
return Ok(Response::Continue);
Expand Down
Loading

0 comments on commit c27879b

Please sign in to comment.