diff --git a/backend/src/config.rs b/backend/src/config.rs
index b34ee8499..c1edc8228 100644
--- a/backend/src/config.rs
+++ b/backend/src/config.rs
@@ -107,7 +107,7 @@ impl BackendConfig {
 				postgres: postgres.clone(),
 			}),
 			#[cfg(feature = "worker")]
-			(true, _, _, _, _, _, _) => bail!("Worker configuration is missing"),
+			(true, _, _, _, _, _) => bail!("Worker configuration is missing"),
 			_ => bail!("Calling must_worker_config on a non-worker backend"),
 		}
 	}
@@ -232,19 +232,10 @@ impl ThrottleConfig {
 /// Load the worker configuration from the worker_config.toml file and from the
 /// environment.
 pub async fn load_config() -> Result<BackendConfig> {
-	let mut cfg = Config::builder()
+	let cfg = Config::builder()
 		.add_source(config::File::with_name("backend_config"))
 		.add_source(config::Environment::with_prefix("RCH").separator("__"));
 
-	// The RCH__WORKER__RABBITMQ__QUEUES is always read as a str, whereas we
-	// sometimes want to parse it as a list of strings, if it's not "all". We
-	// handle this case separately.
-	if let Ok(queues) = env::var("RCH__WORKER__RABBITMQ__QUEUES") {
-		if queues != "all" {
-			let queues: Vec<String> = queues.split(',').map(String::from).collect();
-			cfg = cfg.set_override("worker.rabbitmq.queues", queues)?;
-		}
-	}
 	let cfg = cfg.build()?.try_deserialize::<BackendConfig>()?;
 
 	if !cfg.worker.enable && (cfg.worker.rabbitmq.is_some() || cfg.worker.throttle.is_some()) {
diff --git a/backend/src/http/error.rs b/backend/src/http/error.rs
index 524bb28c8..ac4c01278 100644
--- a/backend/src/http/error.rs
+++ b/backend/src/http/error.rs
@@ -17,7 +17,6 @@
 use check_if_email_exists::{CheckEmailInputBuilderError, LOG_TARGET};
 use serde::ser::SerializeStruct;
 use serde::Serialize;
-use sqlx::any;
 use std::fmt;
 use std::fmt::Debug;
 use tracing::error;
diff --git a/backend/src/http/v1/bulk/post.rs b/backend/src/http/v1/bulk/post.rs
index 55e11246a..abe05e4a0 100644
--- a/backend/src/http/v1/bulk/post.rs
+++ b/backend/src/http/v1/bulk/post.rs
@@ -76,7 +76,8 @@ async fn http_handler(
 		.map_err(ReacherResponseError::from)?;
 
 	let n = body.input.len();
-	let stream = futures::stream::iter(body.input);
+	let webhook = body.webhook.clone();
+	let stream = futures::stream::iter(body.input.into_iter());
 
 	let properties = BasicProperties::default()
 		.with_content_type("application/json".into())
@@ -95,7 +96,7 @@ async fn http_handler(
 		let task = CheckEmailTask {
 			input,
 			job_id: CheckEmailJobId::Bulk(rec.id),
-			webhook: body.webhook.clone(),
+			webhook: webhook.clone(),
 		};
 
 		publish_task(
@@ -113,7 +114,7 @@ async fn http_handler(
 	info!(
 		target: LOG_TARGET,
 		queue = CHECK_EMAIL_QUEUE,
-		"Added {n} emails to the queue",
+		"Added {n} emails",
 	);
 
 	Ok(warp::reply::json(&Response { job_id: rec.id }))
 }
diff --git a/docker-compose.local.yaml b/docker-compose.local.yaml
index 47e5abbcc..d23883553 100644
--- a/docker-compose.local.yaml
+++ b/docker-compose.local.yaml
@@ -27,39 +27,37 @@ services:
       - ./backend/postgres_data:/var/lib/postgresql/data
     restart: always
 
-  worker_smtp:
+  worker1:
     build:
       context: .
       dockerfile: backend/Dockerfile
-    container_name: worker_smtp
+    container_name: worker1
     ports:
       - "8080:8080"
     depends_on:
       - postgres
      - rabbitmq
     environment:
-      RCH__BACKEND_NAME: worker_smtp
+      RCH__BACKEND_NAME: worker1
       RUST_LOG: reacher=info
-      RCH__HTTP_HOST: 0.0.0.0
       RCH__WORKER__ENABLE: true
       RCH__WORKER__RABBITMQ__URL: amqp://guest:guest@rabbitmq:5672
       RCH__WORKER__POSTGRES__DB_URL: postgres://postgres:postgres@postgres:5432/reacher_db
     restart: always
 
-  worker_headless:
+  worker2:
     build:
      context: .
       dockerfile: backend/Dockerfile
-    container_name: worker_headless
+    container_name: worker2
     ports:
       - "8081:8080"
     depends_on:
       - postgres
       - rabbitmq
     environment:
-      RCH__BACKEND_NAME: worker_headless
+      RCH__BACKEND_NAME: worker2
       RUST_LOG: reacher=info
-      RCH__HTTP_HOST: 0.0.0.0
       RCH__WORKER__ENABLE: true
       RCH__WORKER__RABBITMQ__URL: amqp://guest:guest@rabbitmq:5672
       RCH__WORKER__POSTGRES__DB_URL: postgres://postgres:postgres@postgres:5432/reacher_db
     restart: always
diff --git a/docker-compose.yaml b/docker-compose.yaml
index fe207e6ca..80ab8444c 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -23,40 +23,36 @@ services:
       POSTGRES_DB: reacher_db
     restart: always
 
-  worker_smtp:
+  worker1:
     image: reacherhq/backend:beta
-    container_name: worker_smtp
+    container_name: worker1
     ports:
       - "8080:8080"
     depends_on:
       - postgres
       - rabbitmq
     environment:
-      RCH__BACKEND_NAME: worker_smtp
+      RCH__BACKEND_NAME: worker1
       RUST_LOG: reacher=info
-      RCH__HTTP_HOST: 0.0.0.0
       RCH__WORKER__ENABLE: true
       RCH__WORKER__RABBITMQ__URL: amqp://guest:guest@rabbitmq:5672
-      RCH__WORKER__RABBITMQ__QUEUES: check.gmail,check.hotmailb2b,check.everything_else
       RCH__WORKER__POSTGRES__DB_URL: postgres://postgres:postgres@postgres:5432/reacher_db
       RCH__WORKER__THROTTLE__MAX_REQUESTS_PER_DAY: 10000 # Recommended limit per IP per day for SMTP requests
     restart: always
 
-  worker_headless:
+  worker2:
     image: reacherhq/backend:beta
-    container_name: worker_headless
+    container_name: worker2
     ports:
       - "8081:8080"
     depends_on:
       - postgres
       - rabbitmq
     environment:
-      RCH__BACKEND_NAME: worker_headless
+      RCH__BACKEND_NAME: worker2
       RUST_LOG: reacher=info
-      RCH__HTTP_HOST: 0.0.0.0
       RCH__WORKER__ENABLE: true
       RCH__WORKER__RABBITMQ__URL: amqp://guest:guest@rabbitmq:5672
-      RCH__WORKER__RABBITMQ__QUEUES: check.hotmailb2c,check.yahoo
       RCH__WORKER__POSTGRES__DB_URL: postgres://postgres:postgres@postgres:5432/reacher_db
       RCH__WORKER__THROTTLE__MAX_REQUESTS_PER_MINUTE: 100 # Recommended limit for headless verifications
     restart: always