Skip to content

Commit

Permalink
refactor: Merge worker into backend (#1404)
Browse files Browse the repository at this point in the history
* Put worker into backend

* Rename to http

* Add try join

* Change LOG_TARGET

* Revemo stray logs

* fmt

* fmt

* Fix clippy

* clippy
  • Loading branch information
amaury1093 authored Dec 11, 2023
1 parent f5f4f81 commit c0ddce2
Show file tree
Hide file tree
Showing 35 changed files with 270 additions and 597 deletions.
1 change: 0 additions & 1 deletion .github/workflows/deploy_backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,3 @@ jobs:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
tags: "${{ steps.vars.outputs.GITHUB_TAG }}"
dockerfile: Dockerfile.backend
25 changes: 0 additions & 25 deletions .github/workflows/deploy_worker.yml

This file was deleted.

3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"rust-analyzer.cargo.features": ["worker"]
}
25 changes: 6 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[workspace]
members = ["backend", "cli", "core", "worker"]
members = ["backend", "cli", "core"]
2 changes: 1 addition & 1 deletion Dockerfile.backend → Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ USER root
RUN apk add --no-cache chromium-chromedriver

COPY --from=cargo-build /usr/src/reacher/target/x86_64-unknown-linux-musl/release/reacher_backend .
COPY --from=cargo-build /usr/src/reacher/backend/docker.sh .
COPY --from=cargo-build /usr/src/reacher/docker.sh .

RUN chown chrome:chrome reacher_backend
RUN chown chrome:chrome docker.sh
Expand Down
55 changes: 0 additions & 55 deletions Dockerfile.worker

This file was deleted.

19 changes: 18 additions & 1 deletion backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ async-smtp = "0.6"
check-if-email-exists = { path = "../core", features = ["headless"] }
csv = "1.3.0"
dotenv = "0.15.0"
env_logger = "0.10"
futures = { version = "0.3.29", optional = true }
futures-lite = { version = "2.1.0", optional = true }
lapin = { version = "2.3.1", optional = true }
log = "0.4"
openssl = { version = "0.10.57", features = ["vendored"] }
reqwest = { version = "0.11.22", features = ["json"], optional = true }
sentry = "0.23"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand All @@ -26,5 +29,19 @@ sqlx = { version = "0.7", features = [
] }
sqlxmq = "0.5"
tokio = { version = "1.29", features = ["macros"] }
tokio-executor-trait = { version = "2.1.1", optional = true }
tokio-reactor-trait = { version = "1.1.0", optional = true }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
uuid = "1.6"
warp = "0.3"

[features]
worker = [
"futures",
"futures-lite",
"lapin",
"reqwest",
"tokio-executor-trait",
"tokio-reactor-trait",
]
4 changes: 2 additions & 2 deletions backend/src/bin/prune_db.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use log::info;
use sqlx::PgPool;
use sqlx::Result;
use tracing::info;

#[tokio::main]
async fn main() -> Result<()> {
dotenv::dotenv().expect("Unable to load environment variables from .env file");
env_logger::init(); // Initialize the logger
tracing_subscriber::fmt::init();

let db_url = std::env::var("DATABASE_URL").expect("Unable to read DATABASE_URL env var");
let dry_mode: bool = std::env::var("DRY_RUN").is_ok();
Expand Down
File renamed without changes.
File renamed without changes.
10 changes: 5 additions & 5 deletions backend/src/routes/bulk/get.rs → backend/src/http/bulk/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

//! This file implements the `GET /bulk/{id}` endpoint.
use check_if_email_exists::LOG_TARGET;
use serde::Serialize;
use sqlx::types::chrono::{DateTime, Utc};
use sqlx::{Pool, Postgres};
use tracing::error;
use warp::Filter;

use super::{db::with_db, error::BulkError};
use check_if_email_exists::LOG_TARGET;

/// NOTE: Type conversions from postgres to rust types
/// are according to the table given by
Expand Down Expand Up @@ -83,11 +84,10 @@ async fn job_status(
.fetch_one(&conn_pool)
.await
.map_err(|e| {
log::error!(
error!(
target: LOG_TARGET,
"Failed to get job record for [job={}] with [error={}]",
job_id,
e
job_id, e
);
BulkError::from(e)
})?;
Expand All @@ -109,7 +109,7 @@ async fn job_status(
.fetch_one(&conn_pool)
.await
.map_err(|e| {
log::error!(
error!(
target: LOG_TARGET,
"Failed to get aggregate info for [job={}] with [error={}]",
job_id,
Expand Down
65 changes: 65 additions & 0 deletions backend/src/http/bulk/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Reacher - Email Verification
// Copyright (C) 2018-2023 Reacher

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.

// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

mod db;
mod error;
pub mod get;
pub mod post;
pub mod results;
mod task;

use std::env;

use check_if_email_exists::LOG_TARGET;
use sqlx::{Pool, Postgres};
use sqlxmq::{JobRegistry, JobRunnerHandle};
use tracing::info;

pub use task::email_verification_task;

/// Create a job registry with one task: the email verification task.
pub async fn create_job_registry(pool: &Pool<Postgres>) -> Result<JobRunnerHandle, sqlx::Error> {
let min_task_conc = env::var("RCH_MINIMUM_TASK_CONCURRENCY").map_or(10, |var| {
var.parse::<usize>()
.expect("Environment variable RCH_MINIMUM_TASK_CONCURRENCY should parse to usize")
});
let max_conc_task_fetch = env::var("RCH_MAXIMUM_CONCURRENT_TASK_FETCH").map_or(20, |var| {
var.parse::<usize>()
.expect("Environment variable RCH_MAXIMUM_CONCURRENT_TASK_FETCH should parse to usize")
});

// registry needs to be given list of jobs it can accept
let registry = JobRegistry::new(&[email_verification_task]);

// create runner for the message queue associated
// with this job registry
let registry = registry
// Create a job runner using the connection pool.
.runner(pool)
// Here is where you can configure the job runner
// Aim to keep 10-20 jobs running at a time.
.set_concurrency(min_task_conc, max_conc_task_fetch)
// Start the job runner in the background.
.run()
.await?;

info!(
target: LOG_TARGET,
"Bulk endpoints enabled with concurrency min={min_task_conc} to max={max_conc_task_fetch}."
);

Ok(registry)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use check_if_email_exists::CheckEmailInputProxy;
use check_if_email_exists::LOG_TARGET;
use serde::{Deserialize, Serialize};
use sqlx::{Pool, Postgres};
use tracing::{debug, error};
use warp::Filter;

use super::{
Expand Down Expand Up @@ -106,23 +107,21 @@ async fn create_bulk_request(
.fetch_one(&conn_pool)
.await
.map_err(|e| {
log::error!(
error!(
target: LOG_TARGET,
"Failed to create job record for [body={:?}] with [error={}]",
&body,
e
&body, e
);
BulkError::from(e)
})?;

for task_input in body.into_iter() {
let task_uuid = submit_job(&conn_pool, rec.id, task_input).await?;

log::debug!(
debug!(
target: LOG_TARGET,
"Submitted task to sqlxmq for [job={}] with [uuid={}]",
rec.id,
task_uuid
rec.id, task_uuid
);
}

Expand All @@ -147,6 +146,6 @@ pub fn create_bulk_job(
.and(warp::body::content_length_limit(1024 * 16))
.and(warp::body::json())
.and_then(create_bulk_request)
// View access logs by setting `RUST_LOG=reacher`.
// View access logs by setting `RUST_LOG=reacher_backend`.
.with(warp::log(LOG_TARGET))
}
File renamed without changes.
Loading

0 comments on commit c0ddce2

Please sign in to comment.