diff --git a/.sqlx/query-1039a6d3d732a86b9b3b2e19e6c8e3c857125d1c4cf916ac32789bfd0176b6b5.json b/.sqlx/query-1039a6d3d732a86b9b3b2e19e6c8e3c857125d1c4cf916ac32789bfd0176b6b5.json deleted file mode 100644 index 5493f1216..000000000 --- a/.sqlx/query-1039a6d3d732a86b9b3b2e19e6c8e3c857125d1c4cf916ac32789bfd0176b6b5.json +++ /dev/null @@ -1,24 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n\t\tSELECT result FROM email_results\n\t\tWHERE job_id = $1\n\t\tORDER BY id\n\t\tLIMIT $2 OFFSET $3\n\t\t", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "result", - "type_info": "Jsonb" - } - ], - "parameters": { - "Left": [ - "Int4", - "Int8", - "Int8" - ] - }, - "nullable": [ - true - ] - }, - "hash": "1039a6d3d732a86b9b3b2e19e6c8e3c857125d1c4cf916ac32789bfd0176b6b5" -} diff --git a/.sqlx/query-13862fe23ea729215fb1cfee3aadc14dfa9373dc8137c4f1da199e3ae66efd50.json b/.sqlx/query-13862fe23ea729215fb1cfee3aadc14dfa9373dc8137c4f1da199e3ae66efd50.json deleted file mode 100644 index 660b058a1..000000000 --- a/.sqlx/query-13862fe23ea729215fb1cfee3aadc14dfa9373dc8137c4f1da199e3ae66efd50.json +++ /dev/null @@ -1,52 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n\t\tSELECT\n\t\t\tCOUNT(*) as total_processed,\n\t\t\tCOUNT(CASE WHEN result ->> 'is_reachable' LIKE 'safe' THEN 1 END) as safe_count,\n\t\t\tCOUNT(CASE WHEN result ->> 'is_reachable' LIKE 'risky' THEN 1 END) as risky_count,\n\t\t\tCOUNT(CASE WHEN result ->> 'is_reachable' LIKE 'invalid' THEN 1 END) as invalid_count,\n\t\t\tCOUNT(CASE WHEN result ->> 'is_reachable' LIKE 'unknown' THEN 1 END) as unknown_count,\n\t\t\t(SELECT created_at FROM email_results WHERE job_id = $1 ORDER BY created_at DESC LIMIT 1) as finished_at\n\t\tFROM email_results\n\t\tWHERE job_id = $1\n\t\t", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "total_processed", - "type_info": "Int8" - }, - { - "ordinal": 1, - "name": "safe_count", - "type_info": "Int8" - }, - { - "ordinal": 2, - "name": "risky_count", - "type_info": "Int8" - }, - { - "ordinal": 3, - "name": "invalid_count", - "type_info": "Int8" - }, - { - "ordinal": 4, - "name": "unknown_count", - "type_info": "Int8" - }, - { - "ordinal": 5, - "name": "finished_at", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [ - "Int4" - ] - }, - "nullable": [ - null, - null, - null, - null, - null, - null - ] - }, - "hash": "13862fe23ea729215fb1cfee3aadc14dfa9373dc8137c4f1da199e3ae66efd50" -} diff --git a/.sqlx/query-1a964da4784832e5f631f2c2e727382532c86c7e66e46254d72ef0af03021975.json b/.sqlx/query-1a964da4784832e5f631f2c2e727382532c86c7e66e46254d72ef0af03021975.json deleted file mode 100644 index 20d1cc509..000000000 --- a/.sqlx/query-1a964da4784832e5f631f2c2e727382532c86c7e66e46254d72ef0af03021975.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n\t\t\tINSERT INTO email_results (job_id, result)\n\t\t\tVALUES ($1, $2)\n\t\t\t", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int4", - "Jsonb" - ] - }, - "nullable": [] - }, - "hash": "1a964da4784832e5f631f2c2e727382532c86c7e66e46254d72ef0af03021975" -} diff --git a/.sqlx/query-47af0157fa867e147e49d80b121b1881df93a6619434a1fd1fc9a58315b4044b.json b/.sqlx/query-47af0157fa867e147e49d80b121b1881df93a6619434a1fd1fc9a58315b4044b.json deleted file mode 100644 index 2c00dfeb0..000000000 --- a/.sqlx/query-47af0157fa867e147e49d80b121b1881df93a6619434a1fd1fc9a58315b4044b.json +++ /dev/null @@ -1,34 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n\t\tSELECT id, created_at, total_records FROM 
bulk_jobs\n\t\tWHERE id = $1\n\t\tLIMIT 1\n\t\t", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id", - "type_info": "Int4" - }, - { - "ordinal": 1, - "name": "created_at", - "type_info": "Timestamptz" - }, - { - "ordinal": 2, - "name": "total_records", - "type_info": "Int4" - } - ], - "parameters": { - "Left": [ - "Int4" - ] - }, - "nullable": [ - false, - false, - false - ] - }, - "hash": "47af0157fa867e147e49d80b121b1881df93a6619434a1fd1fc9a58315b4044b" -} diff --git a/.sqlx/query-981f650b6c663feeae8a93e7ecf86326e7a5e6d5c8fd03c03565d86982d0381a.json b/.sqlx/query-981f650b6c663feeae8a93e7ecf86326e7a5e6d5c8fd03c03565d86982d0381a.json deleted file mode 100644 index 991aa1226..000000000 --- a/.sqlx/query-981f650b6c663feeae8a93e7ecf86326e7a5e6d5c8fd03c03565d86982d0381a.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n\t\tINSERT INTO bulk_jobs (total_records)\n\t\tVALUES ($1)\n\t\tRETURNING id\n\t\t", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id", - "type_info": "Int4" - } - ], - "parameters": { - "Left": [ - "Int4" - ] - }, - "nullable": [ - false - ] - }, - "hash": "981f650b6c663feeae8a93e7ecf86326e7a5e6d5c8fd03c03565d86982d0381a" -} diff --git a/.sqlx/query-ac5e197ca20a1393e4ea45248d5e702c0edbbf57624f2bb416f0fd0401a44dcf.json b/.sqlx/query-ac5e197ca20a1393e4ea45248d5e702c0edbbf57624f2bb416f0fd0401a44dcf.json deleted file mode 100644 index 6cc885704..000000000 --- a/.sqlx/query-ac5e197ca20a1393e4ea45248d5e702c0edbbf57624f2bb416f0fd0401a44dcf.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT total_records FROM bulk_jobs WHERE id = $1;", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "total_records", - "type_info": "Int4" - } - ], - "parameters": { - "Left": [ - "Int4" - ] - }, - "nullable": [ - false - ] - }, - "hash": "ac5e197ca20a1393e4ea45248d5e702c0edbbf57624f2bb416f0fd0401a44dcf" -} diff --git a/.sqlx/query-eeeb6284425ee44f380991494e11cc67e808dde9d5b15aadc6c5ec993cbdc7fe.json b/.sqlx/query-eeeb6284425ee44f380991494e11cc67e808dde9d5b15aadc6c5ec993cbdc7fe.json new file mode 100644 index 000000000..9784b2813 --- /dev/null +++ b/.sqlx/query-eeeb6284425ee44f380991494e11cc67e808dde9d5b15aadc6c5ec993cbdc7fe.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "\n\t\tINSERT INTO email_results (email, is_reachable, full_result)\n\t\tVALUES ($1, $2, $3)\n\t\tRETURNING id\n\t\t", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text", + "Varchar", + "Jsonb" + ] + }, + "nullable": [ + false + ] + }, + "hash": "eeeb6284425ee44f380991494e11cc67e808dde9d5b15aadc6c5ec993cbdc7fe" +} diff --git a/.sqlx/query-f58d4d05a6ab4c1ffda39396df4c403f7588266ae8d954985fc1eda9751febcc.json b/.sqlx/query-f58d4d05a6ab4c1ffda39396df4c403f7588266ae8d954985fc1eda9751febcc.json deleted file mode 100644 index bbf14d4a9..000000000 --- a/.sqlx/query-f58d4d05a6ab4c1ffda39396df4c403f7588266ae8d954985fc1eda9751febcc.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT COUNT(*) FROM email_results WHERE job_id = $1;", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "count", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Int4" - ] - }, - "nullable": [ - null - ] - }, - "hash": "f58d4d05a6ab4c1ffda39396df4c403f7588266ae8d954985fc1eda9751febcc" -} diff --git a/.vscode/settings.json b/.vscode/settings.json index a7857b019..9f174d017 100644 --- a/.vscode/settings.json +++ 
b/.vscode/settings.json @@ -1,3 +1,3 @@ { - "rust-analyzer.cargo.features": ["worker"] + "rust-analyzer.cargo.features": ["worker", "postgres"] } diff --git a/Cargo.lock b/Cargo.lock index 6003d7f20..6ad37abc5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -119,12 +119,6 @@ version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "508b352bb5c066aac251f6daf6b36eccd03e8a88e8081cd44959ea277a3af9a8" -[[package]] -name = "anymap2" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c" - [[package]] name = "ascii_utils" version = "0.9.3" @@ -703,27 +697,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "csv" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" -dependencies = [ - "csv-core", - "itoa", - "ryu", - "serde", -] - -[[package]] -name = "csv-core" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" -dependencies = [ - "memchr", -] - [[package]] name = "darling" version = "0.10.2" @@ -2358,25 +2331,21 @@ version = "0.5.0" dependencies = [ "async-smtp", "check-if-email-exists", - "csv", "dotenv", "futures", "futures-lite 2.1.0", "lapin", - "log", "openssl", "reqwest", "sentry", "serde", "serde_json", "sqlx", - "sqlxmq", "tokio", "tokio-executor-trait", "tokio-reactor-trait", "tracing", "tracing-subscriber", - "uuid 1.6.1", "warp", ] @@ -2867,15 +2836,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "signal-hook-registry" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" -dependencies = [ - "libc", -] - [[package]] name = "signature" version = "2.2.0" @@ -3163,35 +3123,6 @@ dependencies = [ "uuid 1.6.1", ] -[[package]] -name = "sqlxmq" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e914521071581f0413516de0e7931086ebe4a22d719c1100de29eb339c712311" -dependencies = [ - "anymap2", - "chrono", - "dotenv", - "log", - "serde", - "serde_json", - "sqlx", - "sqlxmq_macros", - "tokio", - "uuid 1.6.1", -] - -[[package]] -name = "sqlxmq_macros" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd00667edb120f18e14e2b4a71cddb308dc3605171ed7afda3056129d08b9f4f" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "stringprep" version = "0.1.2" @@ -3378,9 +3309,7 @@ dependencies = [ "libc", "mio", "num_cpus", - "parking_lot", "pin-project-lite", - "signal-hook-registry", "socket2 0.5.5", "tokio-macros", "windows-sys 0.48.0", @@ -3672,9 +3601,6 @@ name = "uuid" version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" -dependencies = [ - "getrandom", -] [[package]] name = "valuable" diff --git a/Dockerfile b/Dockerfile index 8ee741291..5f8046a43 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ COPY . . 
ENV SQLX_OFFLINE=true -RUN cargo build --features=reacher_backend/worker --bin reacher_backend --release --target=x86_64-unknown-linux-musl +RUN cargo build --features=reacher_backend/worker,reacher_backend/postgres --bin reacher_backend --release --target=x86_64-unknown-linux-musl # ------------------------------------------------------------------------------ # Final Stage diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 88492b4c4..3ac65b100 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -8,12 +8,10 @@ publish = false [dependencies] async-smtp = "0.6" check-if-email-exists = { path = "../core", features = ["headless"] } -csv = "1.3.0" dotenv = "0.15.0" futures = { version = "0.3.30", optional = true } futures-lite = { version = "2.1.0", optional = true } -lapin = { version = "2.3.1" } -log = "0.4" +lapin = { version = "2.3.1", optional = true } openssl = { version = "0.10.62", features = ["vendored"] } reqwest = { version = "0.11.22", features = ["json"], optional = true } sentry = "0.23" @@ -26,21 +24,22 @@ sqlx = { version = "0.7", features = [ "chrono", "json", "migrate", -] } -sqlxmq = "0.5" +], optional = true } tokio = { version = "1.35", features = ["macros"] } tokio-executor-trait = { version = "2.1.1", optional = true } tokio-reactor-trait = { version = "1.1.0", optional = true } tracing = "0.1.40" tracing-subscriber = "0.3.18" -uuid = "1.6" warp = "0.3" [features] worker = [ "futures", "futures-lite", + "lapin", "reqwest", "tokio-executor-trait", "tokio-reactor-trait", ] + +postgres = ["sqlx"] diff --git a/backend/README.md b/backend/README.md index 7def3e3a5..4084f3bb7 100644 --- a/backend/README.md +++ b/backend/README.md @@ -47,22 +47,19 @@ Then send a `POST http://localhost:8080/v0/check_email` request with the followi These are the environment variables used to configure the HTTP server. To pass them to the Docker container, use the `-e {ENV_VAR}={VALUE}` flag. -| Env Var | Required? | Description | Default | -| ----------------------------------- | --------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------- | -| `RUST_LOG` | No | One of `trace,debug,warn,error,info`. 💡 PRO TIP: `RUST_LOG=debug` is very handful for debugging purposes. | not defined | -| `RCH_ENABLE_BULK` | No | If set to `1`, then bulk verification endpoints will be added to the backend. | 0 | -| `DATABASE_URL` | Yes if `RCH_ENABLE_BULK==1` | [Bulk] Database connection string for storing results and task queue | not defined | -| `RCH_DATABASE_MAX_CONNECTIONS` | No | [Bulk] Connections created for the database pool | 5 | -| `RCH_MINIMUM_TASK_CONCURRENCY` | No | [Bulk] Minimum number of concurrent running tasks below which more tasks are fetched | 10 | -| `RCH_MAXIMUM_CONCURRENT_TASK_FETCH` | No | [Bulk] Maximum number of tasks fetched at once | 20 | -| `RCH_HTTP_HOST` | No | The host name to bind the HTTP server to. | `127.0.0.1` | -| `PORT` | No | The port to bind the HTTP server to, often populated by the cloud provider. | `8080` | -| `RCH_SENTRY_DSN` | No | If set, bug reports will be sent to this [Sentry](https://sentry.io) DSN. | not defined | -| `RCH_HEADER_SECRET` | No | If set, then all HTTP requests must have the `x-reacher-secret` header set to this value. This is used to protect the backend against public unwanted HTTP requests. 
| undefined |
-| `RCH_FROM_EMAIL` | No | Email to use in the `MAIL FROM` SMTP step. Can be overwritten by each API request's `from_email` field. | reacher.email@gmail.com |
-| `RCH_HELLO_NAME` | No | Name to use in the `EHLO` SMTP step. Can be overwritten by each API request's `hello_name` field. | gmail.com |
-| `RCH_SMTP_TIMEOUT` | No | Timeout for each SMTP connection. | 45s |
-| `RCH_WEBDRIVER_ADDR` | No | Set to a running WebDriver process endpoint (e.g. `http://localhost:9515`) to use a headless browser to navigate to password recovery pages when checking Yahoo and Hotmail/Outlook addresses. We recommend `chromedriver` as it allows parallel requests. | not defined |
+| Env Var | Required? | Description | Default |
+| ------------------------------ | --------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------- |
+| `RUST_LOG` | No | One of `trace,debug,warn,error,info`. 💡 PRO TIP: `RUST_LOG=debug` is very handy for debugging purposes. | not defined |
+| `RCH_HTTP_HOST` | No | The host name to bind the HTTP server to. | `127.0.0.1` |
+| `PORT` | No | The port to bind the HTTP server to, often populated by the cloud provider. | `8080` |
+| `RCH_SENTRY_DSN` | No | If set, bug reports will be sent to this [Sentry](https://sentry.io) DSN. | not defined |
+| `RCH_HEADER_SECRET` | No | If set, then all HTTP requests must have the `x-reacher-secret` header set to this value. This is used to protect the backend against unwanted public HTTP requests. | undefined |
+| `RCH_FROM_EMAIL` | No | Email to use in the `MAIL FROM` SMTP step. Can be overwritten by each API request's `from_email` field. | reacher.email@gmail.com |
+| `RCH_HELLO_NAME` | No | Name to use in the `EHLO` SMTP step. Can be overwritten by each API request's `hello_name` field. | gmail.com |
+| `RCH_SMTP_TIMEOUT` | No | Timeout for each SMTP connection. | 45s |
+| `RCH_WEBDRIVER_ADDR` | No | Set to a running WebDriver process endpoint (e.g. `http://localhost:9515`) to use a headless browser to navigate to password recovery pages when checking Yahoo and Hotmail/Outlook addresses. We recommend `chromedriver` as it allows parallel requests. | not defined |
+| `DATABASE_URL` | No | If set, Reacher will write bulk email verification results to this DB. 
| not defined | +| `RCH_DATABASE_MAX_CONNECTIONS` | No | [Bulk] Connections created for the database pool | 5 | ## REST API Documentation diff --git a/backend/migrations/20210316025847_setup.down.sql b/backend/migrations/20210316025847_setup.down.sql deleted file mode 100644 index 1aa472e8b..000000000 --- a/backend/migrations/20210316025847_setup.down.sql +++ /dev/null @@ -1,12 +0,0 @@ -DROP FUNCTION mq_checkpoint; -DROP FUNCTION mq_keep_alive; -DROP FUNCTION mq_delete; -DROP FUNCTION mq_commit; -DROP FUNCTION mq_insert; -DROP FUNCTION mq_poll; -DROP FUNCTION mq_active_channels; -DROP FUNCTION mq_latest_message; -DROP TABLE mq_payloads; -DROP TABLE mq_msgs; -DROP FUNCTION mq_uuid_exists; -DROP TYPE mq_new_t; diff --git a/backend/migrations/20210316025847_setup.up.sql b/backend/migrations/20210316025847_setup.up.sql deleted file mode 100644 index bf7f8f859..000000000 --- a/backend/migrations/20210316025847_setup.up.sql +++ /dev/null @@ -1,289 +0,0 @@ -CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; - --- The UDT for creating messages -CREATE TYPE mq_new_t AS ( - -- Unique message ID - id UUID, - -- Delay before message is processed - delay INTERVAL, - -- Number of retries if initial processing fails - retries INT, - -- Initial backoff between retries - retry_backoff INTERVAL, - -- Name of channel - channel_name TEXT, - -- Arguments to channel - channel_args TEXT, - -- Interval for two-phase commit (or NULL to disable two-phase commit) - commit_interval INTERVAL, - -- Whether this message should be processed in order with respect to other - -- ordered messages. - ordered BOOLEAN, - -- Name of message - name TEXT, - -- JSON payload - payload_json TEXT, - -- Binary payload - payload_bytes BYTEA -); - --- Small, frequently updated table of messages -CREATE TABLE mq_msgs ( - id UUID PRIMARY KEY, - created_at TIMESTAMPTZ DEFAULT NOW(), - attempt_at TIMESTAMPTZ DEFAULT NOW(), - attempts INT NOT NULL DEFAULT 5, - retry_backoff INTERVAL NOT NULL DEFAULT INTERVAL '1 second', - channel_name TEXT NOT NULL, - channel_args TEXT NOT NULL, - commit_interval INTERVAL, - after_message_id UUID DEFAULT uuid_nil() REFERENCES mq_msgs(id) ON DELETE SET DEFAULT -); - --- Insert dummy message so that the 'nil' UUID can be referenced -INSERT INTO mq_msgs (id, channel_name, channel_args, after_message_id) VALUES (uuid_nil(), '', '', NULL); - --- Internal helper function to check that a UUID is neither NULL nor NIL -CREATE FUNCTION mq_uuid_exists( - id UUID -) RETURNS BOOLEAN AS $$ - SELECT id IS NOT NULL AND id != uuid_nil() -$$ LANGUAGE SQL IMMUTABLE; - --- Index for polling -CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != uuid_nil() AND NOT mq_uuid_exists(after_message_id); --- Index for adding messages -CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != uuid_nil() AND after_message_id IS NOT NULL; - --- Index for ensuring strict message order -CREATE UNIQUE INDEX mq_msgs_channel_name_channel_args_after_message_id_idx ON mq_msgs(channel_name, channel_args, after_message_id); - - --- Large, less frequently updated table of message payloads -CREATE TABLE mq_payloads( - id UUID PRIMARY KEY, - name TEXT NOT NULL, - payload_json JSONB, - payload_bytes BYTEA -); - --- Internal helper function to return the most recently added message in a queue. 
-CREATE FUNCTION mq_latest_message(from_channel_name TEXT, from_channel_args TEXT) -RETURNS UUID AS $$ - SELECT COALESCE( - ( - SELECT id FROM mq_msgs - WHERE channel_name = from_channel_name - AND channel_args = from_channel_args - AND after_message_id IS NOT NULL - AND id != uuid_nil() - ORDER BY created_at DESC, id DESC - LIMIT 1 - ), - uuid_nil() - ) -$$ LANGUAGE SQL STABLE; - --- Internal helper function to randomly select a set of channels with "ready" messages. -CREATE FUNCTION mq_active_channels(channel_names TEXT[], batch_size INT) -RETURNS TABLE(name TEXT, args TEXT) AS $$ - SELECT channel_name, channel_args - FROM mq_msgs - WHERE id != uuid_nil() - AND attempt_at <= NOW() - AND (channel_names IS NULL OR channel_name = ANY(channel_names)) - AND NOT mq_uuid_exists(after_message_id) - GROUP BY channel_name, channel_args - ORDER BY RANDOM() - LIMIT batch_size -$$ LANGUAGE SQL STABLE; - --- Main entry-point for job runner: pulls a batch of messages from the queue. -CREATE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1) -RETURNS TABLE( - id UUID, - is_committed BOOLEAN, - name TEXT, - payload_json TEXT, - payload_bytes BYTEA, - retry_backoff INTERVAL, - wait_time INTERVAL -) AS $$ -BEGIN - RETURN QUERY UPDATE mq_msgs - SET - attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END, - attempts = mq_msgs.attempts - 1, - retry_backoff = mq_msgs.retry_backoff * 2 - FROM ( - SELECT - msgs.id - FROM mq_active_channels(channel_names, batch_size) AS active_channels - INNER JOIN LATERAL ( - SELECT * FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() - AND mq_msgs.attempt_at <= NOW() - AND mq_msgs.channel_name = active_channels.name - AND mq_msgs.channel_args = active_channels.args - AND NOT mq_uuid_exists(mq_msgs.after_message_id) - ORDER BY mq_msgs.attempt_at ASC - LIMIT batch_size - ) AS msgs ON TRUE - LIMIT batch_size - ) AS messages_to_update - LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id - WHERE mq_msgs.id = messages_to_update.id - RETURNING - mq_msgs.id, - mq_msgs.commit_interval IS NULL, - mq_payloads.name, - mq_payloads.payload_json::TEXT, - mq_payloads.payload_bytes, - mq_msgs.retry_backoff / 2, - interval '0' AS wait_time; - - IF NOT FOUND THEN - RETURN QUERY SELECT - NULL::UUID, - NULL::BOOLEAN, - NULL::TEXT, - NULL::TEXT, - NULL::BYTEA, - NULL::INTERVAL, - MIN(mq_msgs.attempt_at) - NOW() - FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() - AND NOT mq_uuid_exists(mq_msgs.after_message_id) - AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); - END IF; -END; -$$ LANGUAGE plpgsql; - --- Creates new messages -CREATE FUNCTION mq_insert(new_messages mq_new_t[]) -RETURNS VOID AS $$ -BEGIN - PERFORM pg_notify(CONCAT('mq_', channel_name), '') - FROM unnest(new_messages) AS new_msgs - GROUP BY channel_name; - - IF FOUND THEN - PERFORM pg_notify('mq', ''); - END IF; - - INSERT INTO mq_payloads ( - id, - name, - payload_json, - payload_bytes - ) SELECT - id, - name, - payload_json::JSONB, - payload_bytes - FROM UNNEST(new_messages); - - INSERT INTO mq_msgs ( - id, - attempt_at, - attempts, - retry_backoff, - channel_name, - channel_args, - commit_interval, - after_message_id - ) - SELECT - id, - NOW() + delay + COALESCE(commit_interval, INTERVAL '0'), - retries + 1, - retry_backoff, - channel_name, - channel_args, - commit_interval, - CASE WHEN ordered - THEN - LAG(id, 1, mq_latest_message(channel_name, channel_args)) - OVER (PARTITION BY channel_name, channel_args, ordered ORDER BY id) - ELSE - NULL - END - 
FROM UNNEST(new_messages); -END; -$$ LANGUAGE plpgsql; - --- Commits messages previously created with a non-NULL commit interval. -CREATE FUNCTION mq_commit(msg_ids UUID[]) -RETURNS VOID AS $$ -BEGIN - UPDATE mq_msgs - SET - attempt_at = attempt_at - commit_interval, - commit_interval = NULL - WHERE id = ANY(msg_ids) - AND commit_interval IS NOT NULL; -END; -$$ LANGUAGE plpgsql; - - --- Deletes messages from the queue. This occurs when a message has been --- processed, or when it expires without being processed. -CREATE FUNCTION mq_delete(msg_ids UUID[]) -RETURNS VOID AS $$ -BEGIN - PERFORM pg_notify(CONCAT('mq_', channel_name), '') - FROM mq_msgs - WHERE id = ANY(msg_ids) - AND after_message_id = uuid_nil() - GROUP BY channel_name; - - IF FOUND THEN - PERFORM pg_notify('mq', ''); - END IF; - - DELETE FROM mq_msgs WHERE id = ANY(msg_ids); - DELETE FROM mq_payloads WHERE id = ANY(msg_ids); -END; -$$ LANGUAGE plpgsql; - - --- Can be called during the initial commit interval, or when processing --- a message. Indicates that the caller is still active and will prevent either --- the commit interval elapsing or the message being retried for the specified --- interval. -CREATE FUNCTION mq_keep_alive(msg_ids UUID[], duration INTERVAL) -RETURNS VOID AS $$ - UPDATE mq_msgs - SET - attempt_at = NOW() + duration, - commit_interval = commit_interval + ((NOW() + duration) - attempt_at) - WHERE id = ANY(msg_ids) - AND attempt_at < NOW() + duration; -$$ LANGUAGE SQL; - - --- Called during lengthy processing of a message to checkpoint the progress. --- As well as behaving like `mq_keep_alive`, the message payload can be --- updated. -CREATE FUNCTION mq_checkpoint( - msg_id UUID, - duration INTERVAL, - new_payload_json TEXT, - new_payload_bytes BYTEA, - extra_retries INT -) -RETURNS VOID AS $$ - UPDATE mq_msgs - SET - attempt_at = GREATEST(attempt_at, NOW() + duration), - attempts = attempts + COALESCE(extra_retries, 0) - WHERE id = msg_id; - - UPDATE mq_payloads - SET - payload_json = COALESCE(new_payload_json::JSONB, payload_json), - payload_bytes = COALESCE(new_payload_bytes, payload_bytes) - WHERE - id = msg_id; -$$ LANGUAGE SQL; - diff --git a/backend/migrations/20210921115907_clear.down.sql b/backend/migrations/20210921115907_clear.down.sql deleted file mode 100644 index e15638db2..000000000 --- a/backend/migrations/20210921115907_clear.down.sql +++ /dev/null @@ -1,2 +0,0 @@ -DROP FUNCTION mq_clear; -DROP FUNCTION mq_clear_all; diff --git a/backend/migrations/20210921115907_clear.up.sql b/backend/migrations/20210921115907_clear.up.sql deleted file mode 100644 index bd1c1f607..000000000 --- a/backend/migrations/20210921115907_clear.up.sql +++ /dev/null @@ -1,21 +0,0 @@ --- Deletes all messages from a list of channel names. -CREATE FUNCTION mq_clear(channel_names TEXT[]) -RETURNS VOID AS $$ -BEGIN - WITH deleted_ids AS ( - DELETE FROM mq_msgs WHERE channel_name = ANY(channel_names) RETURNING id - ) - DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids); -END; -$$ LANGUAGE plpgsql; - --- Deletes all messages. 
-CREATE FUNCTION mq_clear_all() -RETURNS VOID AS $$ -BEGIN - WITH deleted_ids AS ( - DELETE FROM mq_msgs RETURNING id - ) - DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids); -END; -$$ LANGUAGE plpgsql; diff --git a/backend/migrations/20211013151757_fix_mq_latest_message.down.sql b/backend/migrations/20211013151757_fix_mq_latest_message.down.sql deleted file mode 100644 index d09bd4afd..000000000 --- a/backend/migrations/20211013151757_fix_mq_latest_message.down.sql +++ /dev/null @@ -1,15 +0,0 @@ -CREATE OR REPLACE FUNCTION mq_latest_message(from_channel_name TEXT, from_channel_args TEXT) -RETURNS UUID AS $$ - SELECT COALESCE( - ( - SELECT id FROM mq_msgs - WHERE channel_name = from_channel_name - AND channel_args = from_channel_args - AND after_message_id IS NOT NULL - AND id != uuid_nil() - ORDER BY created_at DESC, id DESC - LIMIT 1 - ), - uuid_nil() - ) -$$ LANGUAGE SQL STABLE; diff --git a/backend/migrations/20211013151757_fix_mq_latest_message.up.sql b/backend/migrations/20211013151757_fix_mq_latest_message.up.sql deleted file mode 100644 index b987c5e1e..000000000 --- a/backend/migrations/20211013151757_fix_mq_latest_message.up.sql +++ /dev/null @@ -1,19 +0,0 @@ -CREATE OR REPLACE FUNCTION mq_latest_message(from_channel_name TEXT, from_channel_args TEXT) -RETURNS UUID AS $$ - SELECT COALESCE( - ( - SELECT id FROM mq_msgs - WHERE channel_name = from_channel_name - AND channel_args = from_channel_args - AND after_message_id IS NOT NULL - AND id != uuid_nil() - AND NOT EXISTS( - SELECT * FROM mq_msgs AS mq_msgs2 - WHERE mq_msgs2.after_message_id = mq_msgs.id - ) - ORDER BY created_at DESC - LIMIT 1 - ), - uuid_nil() - ) -$$ LANGUAGE SQL STABLE; \ No newline at end of file diff --git a/backend/migrations/20220117025847_email_data.down.sql b/backend/migrations/20220117025847_email_data.down.sql deleted file mode 100644 index ed211e337..000000000 --- a/backend/migrations/20220117025847_email_data.down.sql +++ /dev/null @@ -1,4 +0,0 @@ -DROP INDEX job_emails; -DROP TABLE email_results; -DROP TABLE bulk_jobs; -DROP TYPE valid_status; \ No newline at end of file diff --git a/backend/migrations/20220117025847_email_data.up.sql b/backend/migrations/20220117025847_email_data.up.sql deleted file mode 100644 index d53e83d52..000000000 --- a/backend/migrations/20220117025847_email_data.up.sql +++ /dev/null @@ -1,12 +0,0 @@ -CREATE TABLE bulk_jobs ( - id SERIAL PRIMARY KEY, - created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - total_records INTEGER NOT NULL -); -CREATE TABLE email_results ( - id SERIAL PRIMARY KEY, - job_id INTEGER, - result JSONB, - FOREIGN KEY (job_id) REFERENCES bulk_jobs(id) -); -CREATE INDEX job_emails ON email_results USING HASH (job_id); \ No newline at end of file diff --git a/backend/migrations/20220208120856_fix_concurrent_poll.down.sql b/backend/migrations/20220208120856_fix_concurrent_poll.down.sql deleted file mode 100644 index 6cd2d21eb..000000000 --- a/backend/migrations/20220208120856_fix_concurrent_poll.down.sql +++ /dev/null @@ -1,60 +0,0 @@ --- Main entry-point for job runner: pulls a batch of messages from the queue. 
-CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1) -RETURNS TABLE( - id UUID, - is_committed BOOLEAN, - name TEXT, - payload_json TEXT, - payload_bytes BYTEA, - retry_backoff INTERVAL, - wait_time INTERVAL -) AS $$ -BEGIN - RETURN QUERY UPDATE mq_msgs - SET - attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END, - attempts = mq_msgs.attempts - 1, - retry_backoff = mq_msgs.retry_backoff * 2 - FROM ( - SELECT - msgs.id - FROM mq_active_channels(channel_names, batch_size) AS active_channels - INNER JOIN LATERAL ( - SELECT * FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() - AND mq_msgs.attempt_at <= NOW() - AND mq_msgs.channel_name = active_channels.name - AND mq_msgs.channel_args = active_channels.args - AND NOT mq_uuid_exists(mq_msgs.after_message_id) - ORDER BY mq_msgs.attempt_at ASC - LIMIT batch_size - ) AS msgs ON TRUE - LIMIT batch_size - ) AS messages_to_update - LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id - WHERE mq_msgs.id = messages_to_update.id - RETURNING - mq_msgs.id, - mq_msgs.commit_interval IS NULL, - mq_payloads.name, - mq_payloads.payload_json::TEXT, - mq_payloads.payload_bytes, - mq_msgs.retry_backoff / 2, - interval '0' AS wait_time; - - IF NOT FOUND THEN - RETURN QUERY SELECT - NULL::UUID, - NULL::BOOLEAN, - NULL::TEXT, - NULL::TEXT, - NULL::BYTEA, - NULL::INTERVAL, - MIN(mq_msgs.attempt_at) - NOW() - FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() - AND NOT mq_uuid_exists(mq_msgs.after_message_id) - AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); - END IF; -END; -$$ LANGUAGE plpgsql; \ No newline at end of file diff --git a/backend/migrations/20220208120856_fix_concurrent_poll.up.sql b/backend/migrations/20220208120856_fix_concurrent_poll.up.sql deleted file mode 100644 index cae6151ed..000000000 --- a/backend/migrations/20220208120856_fix_concurrent_poll.up.sql +++ /dev/null @@ -1,62 +0,0 @@ - --- Main entry-point for job runner: pulls a batch of messages from the queue. 
-CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1) -RETURNS TABLE( - id UUID, - is_committed BOOLEAN, - name TEXT, - payload_json TEXT, - payload_bytes BYTEA, - retry_backoff INTERVAL, - wait_time INTERVAL -) AS $$ -BEGIN - RETURN QUERY UPDATE mq_msgs - SET - attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END, - attempts = mq_msgs.attempts - 1, - retry_backoff = mq_msgs.retry_backoff * 2 - FROM ( - SELECT - msgs.id - FROM mq_active_channels(channel_names, batch_size) AS active_channels - INNER JOIN LATERAL ( - SELECT mq_msgs.id FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() - AND mq_msgs.attempt_at <= NOW() - AND mq_msgs.channel_name = active_channels.name - AND mq_msgs.channel_args = active_channels.args - AND NOT mq_uuid_exists(mq_msgs.after_message_id) - ORDER BY mq_msgs.attempt_at ASC - LIMIT batch_size - ) AS msgs ON TRUE - LIMIT batch_size - ) AS messages_to_update - LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id - WHERE mq_msgs.id = messages_to_update.id - AND mq_msgs.attempt_at <= NOW() - RETURNING - mq_msgs.id, - mq_msgs.commit_interval IS NULL, - mq_payloads.name, - mq_payloads.payload_json::TEXT, - mq_payloads.payload_bytes, - mq_msgs.retry_backoff / 2, - interval '0' AS wait_time; - - IF NOT FOUND THEN - RETURN QUERY SELECT - NULL::UUID, - NULL::BOOLEAN, - NULL::TEXT, - NULL::TEXT, - NULL::BYTEA, - NULL::INTERVAL, - MIN(mq_msgs.attempt_at) - NOW() - FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() - AND NOT mq_uuid_exists(mq_msgs.after_message_id) - AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); - END IF; -END; -$$ LANGUAGE plpgsql; diff --git a/backend/migrations/20220713122907_fix-clear_all-keep-nil-message.down.sql b/backend/migrations/20220713122907_fix-clear_all-keep-nil-message.down.sql deleted file mode 100644 index d2f607c5b..000000000 --- a/backend/migrations/20220713122907_fix-clear_all-keep-nil-message.down.sql +++ /dev/null @@ -1 +0,0 @@ --- Add down migration script here diff --git a/backend/migrations/20220713122907_fix-clear_all-keep-nil-message.up.sql b/backend/migrations/20220713122907_fix-clear_all-keep-nil-message.up.sql deleted file mode 100644 index 4dd1f0b3a..000000000 --- a/backend/migrations/20220713122907_fix-clear_all-keep-nil-message.up.sql +++ /dev/null @@ -1,29 +0,0 @@ -CREATE OR REPLACE FUNCTION mq_clear(channel_names TEXT[]) -RETURNS VOID AS $$ -BEGIN - WITH deleted_ids AS ( - DELETE FROM mq_msgs - WHERE channel_name = ANY(channel_names) - AND id != uuid_nil() - RETURNING id - ) - DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids); -END; -$$ LANGUAGE plpgsql; -COMMENT ON FUNCTION mq_clear IS - 'Deletes all messages with corresponding payloads from a list of channel names'; - - -CREATE OR REPLACE FUNCTION mq_clear_all() -RETURNS VOID AS $$ -BEGIN - WITH deleted_ids AS ( - DELETE FROM mq_msgs - WHERE id != uuid_nil() - RETURNING id - ) - DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids); -END; -$$ LANGUAGE plpgsql; -COMMENT ON FUNCTION mq_clear_all IS - 'Deletes all messages with corresponding payloads'; diff --git a/backend/migrations/20220810141100_result_created_at.down.sql b/backend/migrations/20220810141100_result_created_at.down.sql deleted file mode 100644 index 80b95f109..000000000 --- a/backend/migrations/20220810141100_result_created_at.down.sql +++ /dev/null @@ -1,2 +0,0 @@ -ALTER TABLE email_results -DROP COLUMN created_at; diff --git a/backend/migrations/20220810141100_result_created_at.up.sql 
b/backend/migrations/20220810141100_result_created_at.up.sql deleted file mode 100644 index ab28f0191..000000000 --- a/backend/migrations/20220810141100_result_created_at.up.sql +++ /dev/null @@ -1,2 +0,0 @@ -ALTER TABLE email_results -ADD created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(); \ No newline at end of file diff --git a/backend/migrations/20231227103745_init.sql b/backend/migrations/20231227103745_init.sql new file mode 100644 index 000000000..b548cc8a9 --- /dev/null +++ b/backend/migrations/20231227103745_init.sql @@ -0,0 +1,8 @@ +-- Add migration script here +CREATE TABLE email_results ( + id SERIAL NOT NULL PRIMARY KEY, + email TEXT NOT NULL, + is_reachable VARCHAR(10) NOT NULL, + full_result jsonb NOT NULL, + created_at TIMESTAMPTZ DEFAULT NOW() +); \ No newline at end of file diff --git a/backend/migrations/README.md b/backend/migrations/README.md index 730cf64de..c7b704431 100644 --- a/backend/migrations/README.md +++ b/backend/migrations/README.md @@ -4,28 +4,6 @@ All migrations in this folder are embedded directly in the `reacher_backend` binary, so you don't need to run the migrations manually. -The migrations come from 2 sources: - -- `sqlxmq` migrations -- Reacher's own migrations - -## `sqlxmq` migrations - -The following migration files have been copied from the [sqlxmq repo](https://github.com/Diggsey/sqlxmq) as per the [given instructions](https://github.com/Diggsey/sqlxmq/blob/6d3ed6fb99e7592e370a7f3ec074ce0bebae62fd/README.md?plain=1#L111): - -- `20210316025847_setup.{up,down}.sql` -- `20210921115907_clear.{up,down}.sql` -- `20211013151757_fix_mq_latest_message.{up,down}.sql` -- `20220208120856_fix_concurrent_poll.{up,down}.sql` -- `20220713122907_fix-clear_all-keep-nil-message.{up,down}.sql` - -## Reacher migrations - -The following migrations are specific to Reacher: - -- `20220117025847_email_data.{up,down}.sql`: set up the `bulk_jobs` and `email_results` tables -- `20220810141100_result_created_at.{up,down}.sql`: add a `created_at` column on `email_result` - ## Advanced Usage For more advanced usage (such as reverting to an old state), please use the `sqlx` CLI command. 
diff --git a/backend/src/bin/prune_db.rs b/backend/src/bin/prune_db.rs deleted file mode 100644 index 5c59ba8ee..000000000 --- a/backend/src/bin/prune_db.rs +++ /dev/null @@ -1,81 +0,0 @@ -use sqlx::PgPool; -use sqlx::Result; -use tracing::info; - -#[tokio::main] -async fn main() -> Result<()> { - dotenv::dotenv().expect("Unable to load environment variables from .env file"); - tracing_subscriber::fmt::init(); - - let db_url = std::env::var("DATABASE_URL").expect("Unable to read DATABASE_URL env var"); - let dry_mode: bool = std::env::var("DRY_RUN").is_ok(); - let days_old_str = std::env::var("DAYS_OLD").expect("Unable to read DAYS_OLD env var"); - let days_old: i32 = days_old_str - .parse() - .expect("Unable to parse DAYS_OLD as integer"); - - let pool = PgPool::connect(&db_url).await?; - - // Fetch the list of job IDs that match the criteria - let query = format!( - "SELECT b.id - FROM bulk_jobs b - JOIN ( - SELECT job_id, COUNT(*) as total_processed - FROM email_results - GROUP BY job_id - ) e ON b.id = e.job_id - WHERE b.total_records = e.total_processed - AND b.created_at <= current_date - interval '{} days'", - days_old - ); - - let job_ids_to_delete: Vec<(i32,)> = sqlx::query_as(&query).fetch_all(&pool).await?; - - match (dry_mode, job_ids_to_delete.is_empty()) { - (true, _) => info!("Job ids to delete {:?}", job_ids_to_delete), - (false, true) => info!("No jobs to delete"), - (false, false) => { - // Start a transaction - let tx = pool.begin().await?; - - // Before deleting from bulk_jobs, delete the corresponding records from email_results in a batch - let delete_email_results_query = - "DELETE FROM email_results WHERE job_id = ANY($1::int[])"; - - // Convert job_ids_to_delete to Vec before binding - let job_ids_to_delete_vec: Vec = - job_ids_to_delete.iter().map(|&(id,)| id).collect(); - - // Execute the delete query for email_results in a batch within the transaction - sqlx::query(delete_email_results_query) - .bind(&job_ids_to_delete_vec) - .execute(&pool) // Use execute on the query builder - .await?; - - info!( - "Email results for job IDs {:?} deleted successfully.", - job_ids_to_delete - ); - - // safely delete the records from bulk_jobs - let delete_bulk_jobs_query = "DELETE FROM bulk_jobs WHERE id = ANY($1::int[])"; - - // Execute the delete query for bulk_jobs in a batch within the transaction - sqlx::query(delete_bulk_jobs_query) - .bind(&job_ids_to_delete_vec) - .execute(&pool) // Use execute on the query builder - .await?; - - info!( - "Bulk jobs records with IDs {:?} deleted successfully.", - job_ids_to_delete - ); - - // Commit the transaction if both deletes are successful - tx.commit().await?; - } - } - - Ok(()) -} diff --git a/backend/src/http/mod.rs b/backend/src/http/mod.rs index 337035646..b5e977a20 100644 --- a/backend/src/http/mod.rs +++ b/backend/src/http/mod.rs @@ -22,39 +22,29 @@ use std::env; use std::net::IpAddr; use check_if_email_exists::LOG_TARGET; +#[cfg(feature = "worker")] use lapin::Channel; -use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; use tracing::info; use warp::Filter; use super::errors; #[cfg(feature = "worker")] -pub fn create_routes( - o: Option>, - channel: Option, +fn create_routes_with_bulk( + channel: Channel, ) -> impl Filter + Clone { version::get::get_version() .or(v0::check_email::post::post_check_email()) .or(v1::bulk::post::create_bulk_job(channel)) - // The 3 following routes will 404 if o is None. 
- .or(v0::bulk::post::create_bulk_job(o.clone())) - .or(v0::bulk::get::get_bulk_job_status(o.clone())) - .or(v0::bulk::results::get_bulk_job_result(o)) .recover(errors::handle_rejection) } -#[cfg(not(feature = "worker"))] -pub fn create_routes( - o: Option>, - _channel: Option, +/// Creates the routes for the HTTP server. +/// Making it public so that it can be used in tests/check_email.rs. +pub fn create_routes_without_bulk( ) -> impl Filter + Clone { version::get::get_version() .or(v0::check_email::post::post_check_email()) - // The 3 following routes will 404 if o is None. - .or(v0::bulk::post::create_bulk_job(o.clone())) - .or(v0::bulk::get::get_bulk_job_status(o.clone())) - .or(v0::bulk::results::get_bulk_job_result(o)) .recover(errors::handle_rejection) } @@ -63,7 +53,7 @@ pub fn create_routes( /// This function starts the Warp server and listens for incoming requests. /// It returns a `Result` indicating whether the server started successfully or encountered an error. pub async fn run_warp_server( - channel: Option, + #[cfg(feature = "worker")] channel: Channel, ) -> Result<(), Box> { let host = env::var("RCH_HTTP_HOST") .unwrap_or_else(|_| "127.0.0.1".into()) @@ -76,41 +66,13 @@ pub async fn run_warp_server( }) .unwrap_or(8080); - let is_bulk_enabled = env::var("RCH_ENABLE_BULK").unwrap_or_else(|_| "0".into()) == "1"; - let db = if is_bulk_enabled { - let pool = create_db().await?; - let _registry = v0::bulk::create_job_registry(&pool).await?; - Some(pool) - } else { - None - }; - - let routes = create_routes(db, channel); + #[cfg(feature = "worker")] + let routes = create_routes_with_bulk(channel); + #[cfg(not(feature = "worker"))] + let routes = create_routes_without_bulk(); info!(target: LOG_TARGET, host=?host,port=?port, "Server is listening"); warp::serve(routes).run((host, port)).await; Ok(()) } - -/// Create a DB pool. -async fn create_db() -> Result, sqlx::Error> { - let pg_conn = - env::var("DATABASE_URL").expect("Environment variable DATABASE_URL should be set"); - let pg_max_conn = env::var("RCH_DATABASE_MAX_CONNECTIONS").map_or(5, |var| { - var.parse::() - .expect("Environment variable RCH_DATABASE_MAX_CONNECTIONS should parse to u32") - }); - - // create connection pool with database - // connection pool internally the shared db connection - // with arc so it can safely be cloned and shared across threads - let pool = PgPoolOptions::new() - .max_connections(pg_max_conn) - .connect(pg_conn.as_str()) - .await?; - - sqlx::migrate!("./migrations").run(&pool).await?; - - Ok(pool) -} diff --git a/backend/src/http/v0/bulk/db.rs b/backend/src/http/v0/bulk/db.rs deleted file mode 100644 index 9d995f046..000000000 --- a/backend/src/http/v0/bulk/db.rs +++ /dev/null @@ -1,35 +0,0 @@ -// Reacher - Email Verification -// Copyright (C) 2018-2023 Reacher - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . 
- -use sqlx::{Pool, Postgres}; -use warp::Filter; - -/// Warp filter that extracts a Pg Pool if the option is Some, or else rejects -/// with a 404. -pub fn with_db( - o: Option>, -) -> impl Filter,), Error = warp::Rejection> + Clone { - warp::any().and_then(move || { - let o = o.clone(); // Still not 100% sure why I need to clone here... - async move { - if let Some(conn_pool) = o { - Ok(conn_pool) - } else { - Err(warp::reject::not_found()) - } - } - }) -} diff --git a/backend/src/http/v0/bulk/error.rs b/backend/src/http/v0/bulk/error.rs deleted file mode 100644 index de5ddc9fd..000000000 --- a/backend/src/http/v0/bulk/error.rs +++ /dev/null @@ -1,44 +0,0 @@ -// Reacher - Email Verification -// Copyright (C) 2018-2023 Reacher - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use warp::reject; - -#[derive(Debug)] -pub enum CsvError { - CsvLib(csv::Error), - CsvLibWriter(Box>>>), - Parse(&'static str), -} - -/// Catch all error struct for the bulk endpoints -#[derive(Debug)] -pub enum BulkError { - EmptyInput, - JobInProgress, - Db(sqlx::Error), - Csv(CsvError), - Json(serde_json::Error), -} - -// Defaults to Internal server error -impl reject::Reject for BulkError {} - -// wrap sql errors as db errors for reacher -impl From for BulkError { - fn from(e: sqlx::Error) -> Self { - BulkError::Db(e) - } -} diff --git a/backend/src/http/v0/bulk/get.rs b/backend/src/http/v0/bulk/get.rs deleted file mode 100644 index 004f591c1..000000000 --- a/backend/src/http/v0/bulk/get.rs +++ /dev/null @@ -1,171 +0,0 @@ -// Reacher - Email Verification -// Copyright (C) 2018-2023 Reacher - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -//! This file implements the `GET /bulk/{id}` endpoint. - -use check_if_email_exists::LOG_TARGET; -use serde::Serialize; -use sqlx::types::chrono::{DateTime, Utc}; -use sqlx::{Pool, Postgres}; -use tracing::error; -use warp::Filter; - -use super::{db::with_db, error::BulkError}; - -/// NOTE: Type conversions from postgres to rust types -/// are according to the table given by -/// [sqlx here](https://docs.rs/sqlx/latest/sqlx/postgres/types/index.html) -#[derive(Debug, Serialize, PartialEq, Eq)] -enum ValidStatus { - Running, - Completed, -} - -/// Job record stores the information about a submitted job -/// -/// `job_status` field is an update on read field. 
It's -/// status will be derived from counting number of -/// completed email verification tasks. It will be updated -/// with the most recent status of the job. -#[derive(sqlx::FromRow, Debug, Serialize)] -struct JobRecord { - id: i32, - created_at: DateTime, - total_records: i32, -} - -/// Summary of a bulk verification job status -#[derive(Debug, Serialize)] -struct JobStatusSummary { - total_safe: i32, - total_risky: i32, - total_invalid: i32, - total_unknown: i32, -} - -/// Complete information about a bulk verification job -#[derive(Debug, Serialize)] -struct JobStatusResponseBody { - job_id: i32, - created_at: DateTime, - finished_at: Option>, - total_records: i32, - total_processed: i32, - summary: JobStatusSummary, - job_status: ValidStatus, -} - -async fn job_status( - job_id: i32, - conn_pool: Pool, -) -> Result { - let job_rec = sqlx::query_as!( - JobRecord, - r#" - SELECT id, created_at, total_records FROM bulk_jobs - WHERE id = $1 - LIMIT 1 - "#, - job_id - ) - .fetch_one(&conn_pool) - .await - .map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to get job record for [job={}] with [error={}]", - job_id, e - ); - BulkError::from(e) - })?; - - let agg_info = sqlx::query!( - r#" - SELECT - COUNT(*) as total_processed, - COUNT(CASE WHEN result ->> 'is_reachable' LIKE 'safe' THEN 1 END) as safe_count, - COUNT(CASE WHEN result ->> 'is_reachable' LIKE 'risky' THEN 1 END) as risky_count, - COUNT(CASE WHEN result ->> 'is_reachable' LIKE 'invalid' THEN 1 END) as invalid_count, - COUNT(CASE WHEN result ->> 'is_reachable' LIKE 'unknown' THEN 1 END) as unknown_count, - (SELECT created_at FROM email_results WHERE job_id = $1 ORDER BY created_at DESC LIMIT 1) as finished_at - FROM email_results - WHERE job_id = $1 - "#, - job_id - ) - .fetch_one(&conn_pool) - .await - .map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to get aggregate info for [job={}] with [error={}]", - job_id, - e - ); - BulkError::from(e) - })?; - - let (job_status, finished_at) = if (agg_info - .total_processed - .expect("sql COUNT() returns an int. qed.") as i32) - < job_rec.total_records - { - (ValidStatus::Running, None) - } else { - ( - ValidStatus::Completed, - Some( - agg_info - .finished_at - .expect("always at least one task in the job. qed."), - ), - ) - }; - - Ok(warp::reply::json(&JobStatusResponseBody { - job_id: job_rec.id, - created_at: job_rec.created_at, - finished_at, - total_records: job_rec.total_records, - total_processed: agg_info - .total_processed - .expect("sql COUNT returns an int. qed.") as i32, - summary: JobStatusSummary { - total_safe: agg_info.safe_count.expect("sql COUNT returns an int. qed.") as i32, - total_risky: agg_info - .risky_count - .expect("sql COUNT returns an int. qed.") as i32, - total_invalid: agg_info - .invalid_count - .expect("sql COUNT returns an int. qed.") as i32, - total_unknown: agg_info - .unknown_count - .expect("sql COUNT returns an int. qed.") as i32, - }, - job_status, - })) -} - -pub fn get_bulk_job_status( - o: Option>, -) -> impl Filter + Clone { - warp::path!("v0" / "bulk" / i32) - .and(warp::get()) - .and(with_db(o)) - .and_then(job_status) - // View access logs by setting `RUST_LOG=reacher`. 
- .with(warp::log(LOG_TARGET)) -} diff --git a/backend/src/http/v0/bulk/mod.rs b/backend/src/http/v0/bulk/mod.rs deleted file mode 100644 index 061d27e3a..000000000 --- a/backend/src/http/v0/bulk/mod.rs +++ /dev/null @@ -1,65 +0,0 @@ -// Reacher - Email Verification -// Copyright (C) 2018-2023 Reacher - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -mod db; -mod error; -pub mod get; -pub mod post; -pub mod results; -mod task; - -use std::env; - -use check_if_email_exists::LOG_TARGET; -use sqlx::{Pool, Postgres}; -use sqlxmq::{JobRegistry, JobRunnerHandle}; -use tracing::info; - -pub use task::email_verification_task; - -/// Create a job registry with one task: the email verification task. -pub async fn create_job_registry(pool: &Pool) -> Result { - let min_task_conc = env::var("RCH_MINIMUM_TASK_CONCURRENCY").map_or(10, |var| { - var.parse::() - .expect("Environment variable RCH_MINIMUM_TASK_CONCURRENCY should parse to usize") - }); - let max_conc_task_fetch = env::var("RCH_MAXIMUM_CONCURRENT_TASK_FETCH").map_or(20, |var| { - var.parse::() - .expect("Environment variable RCH_MAXIMUM_CONCURRENT_TASK_FETCH should parse to usize") - }); - - // registry needs to be given list of jobs it can accept - let registry = JobRegistry::new(&[email_verification_task]); - - // create runner for the message queue associated - // with this job registry - let registry = registry - // Create a job runner using the connection pool. - .runner(pool) - // Here is where you can configure the job runner - // Aim to keep 10-20 jobs running at a time. - .set_concurrency(min_task_conc, max_conc_task_fetch) - // Start the job runner in the background. - .run() - .await?; - - info!( - target: LOG_TARGET, - "Bulk endpoints enabled with concurrency min={min_task_conc} to max={max_conc_task_fetch}." - ); - - Ok(registry) -} diff --git a/backend/src/http/v0/bulk/post.rs b/backend/src/http/v0/bulk/post.rs deleted file mode 100644 index 5bc291324..000000000 --- a/backend/src/http/v0/bulk/post.rs +++ /dev/null @@ -1,151 +0,0 @@ -// Reacher - Email Verification -// Copyright (C) 2018-2023 Reacher - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -//! This file implements the `POST /v0/bulk` endpoint. 
- -use check_if_email_exists::CheckEmailInputProxy; -use check_if_email_exists::LOG_TARGET; -use serde::{Deserialize, Serialize}; -use sqlx::{Pool, Postgres}; -use tracing::{debug, error}; -use warp::Filter; - -use super::{ - db::with_db, - error::BulkError, - task::{submit_job, TaskInput}, -}; -use crate::check::check_header; - -/// Endpoint request body. -#[derive(Clone, Debug, Deserialize, Serialize)] -struct CreateBulkRequestBody { - input_type: String, - input: Vec, - proxy: Option, - hello_name: Option, - from_email: Option, - smtp_ports: Option>, -} - -struct CreateBulkRequestBodyIterator { - body: CreateBulkRequestBody, - index: usize, -} - -impl IntoIterator for CreateBulkRequestBody { - type Item = TaskInput; - type IntoIter = CreateBulkRequestBodyIterator; - - fn into_iter(self) -> Self::IntoIter { - CreateBulkRequestBodyIterator { - body: self, - index: 0, - } - } -} - -impl Iterator for CreateBulkRequestBodyIterator { - type Item = TaskInput; - - fn next(&mut self) -> Option { - if self.index < self.body.input.len() { - let to_email = &self.body.input[self.index]; - let item = TaskInput { - to_email: to_email.clone(), - smtp_ports: self.body.smtp_ports.clone().unwrap_or_else(|| vec![25]), - proxy: self.body.proxy.clone(), - hello_name: self.body.hello_name.clone(), - from_email: self.body.from_email.clone(), - }; - - self.index += 1; - Some(item) - } else { - None - } - } -} - -/// Endpoint response body. -#[derive(Clone, Debug, Deserialize, Serialize)] -struct CreateBulkResponseBody { - job_id: i32, -} - -/// handles input, creates db entry for job and tasks for verification -async fn create_bulk_request( - conn_pool: Pool, - body: CreateBulkRequestBody, -) -> Result { - if body.input.is_empty() { - return Err(BulkError::EmptyInput.into()); - } - - // create job entry - let rec = sqlx::query!( - r#" - INSERT INTO bulk_jobs (total_records) - VALUES ($1) - RETURNING id - "#, - body.input.len() as i32 - ) - .fetch_one(&conn_pool) - .await - .map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to create job record for [body={:?}] with [error={}]", - &body, e - ); - BulkError::from(e) - })?; - - for task_input in body.into_iter() { - let task_uuid = submit_job(&conn_pool, rec.id, task_input).await?; - - debug!( - target: LOG_TARGET, - "Submitted task to sqlxmq for [job={}] with [uuid={}]", - rec.id, task_uuid - ); - } - - Ok(warp::reply::json(&CreateBulkResponseBody { - job_id: rec.id, - })) -} - -/// Create the `POST /bulk` endpoint. -/// The endpoint accepts list of email address and creates -/// a new job to check them. -pub fn create_bulk_job( - o: Option>, -) -> impl Filter + Clone { - warp::path!("v0" / "bulk") - .and(warp::post()) - .and(check_header()) - .and(with_db(o)) - // When accepting a body, we want a JSON body (and to reject huge - // payloads)... - // TODO: Configure max size limit for a bulk job - .and(warp::body::content_length_limit(1024 * 16)) - .and(warp::body::json()) - .and_then(create_bulk_request) - // View access logs by setting `RUST_LOG=reacher_backend`. 
- .with(warp::log(LOG_TARGET)) -} diff --git a/backend/src/http/v0/bulk/results/csv_helper.rs b/backend/src/http/v0/bulk/results/csv_helper.rs deleted file mode 100644 index 121e86112..000000000 --- a/backend/src/http/v0/bulk/results/csv_helper.rs +++ /dev/null @@ -1,205 +0,0 @@ -// Reacher - Email Verification -// Copyright (C) 2018-2023 Reacher - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use serde::Serialize; -use std::convert::TryFrom; - -/// Wrapper for serde json value to convert -/// into a csv response -#[derive(Debug)] -pub struct CsvWrapper(pub serde_json::Value); - -/// Simplified output of `CheckEmailOutput` struct -/// for csv fields. -#[derive(Debug, Serialize)] -pub struct JobResultCsvResponse { - input: String, - is_reachable: String, - #[serde(rename = "misc.is_disposable")] - misc_is_disposable: bool, - #[serde(rename = "misc.is_role_account")] - misc_is_role_account: bool, - #[serde(rename = "misc.gravatar_url")] - misc_gravatar_url: Option, - #[serde(rename = "mx.accepts_mail")] - mx_accepts_mail: bool, - #[serde(rename = "smtp.can_connect")] - smtp_can_connect: bool, - #[serde(rename = "smtp.has_full_inbox")] - smtp_has_full_inbox: bool, - #[serde(rename = "smtp.is_catch_all")] - smtp_is_catch_all: bool, - #[serde(rename = "smtp.is_deliverable")] - smtp_is_deliverable: bool, - #[serde(rename = "smtp.is_disabled")] - smtp_is_disabled: bool, - #[serde(rename = "syntax.is_valid_syntax")] - syntax_is_valid_syntax: bool, - #[serde(rename = "syntax.domain")] - syntax_domain: String, - #[serde(rename = "syntax.username")] - syntax_username: String, - error: Option, -} - -/// Convert csv wrapper to csv response -/// Performs multiple allocations for string fields -/// throw error if field is missing -impl TryFrom for JobResultCsvResponse { - type Error = &'static str; - - fn try_from(value: CsvWrapper) -> Result { - let mut input: String = String::default(); - let mut is_reachable: String = String::default(); - let mut misc_is_disposable: bool = false; - let mut misc_is_role_account: bool = false; - let mut misc_gravatar_url: Option = None; - let mut mx_accepts_mail: bool = false; - let mut smtp_can_connect: bool = false; - let mut smtp_has_full_inbox: bool = false; - let mut smtp_is_catch_all: bool = false; - let mut smtp_is_deliverable: bool = false; - let mut smtp_is_disabled: bool = false; - let mut syntax_is_valid_syntax: bool = false; - let mut syntax_domain: String = String::default(); - let mut syntax_username: String = String::default(); - let mut error: Option = None; - - let top_level = value - .0 - .as_object() - .ok_or("Failed to find top level object")?; - for (key, val) in top_level.keys().zip(top_level.values()) { - match key.as_str() { - "input" => input = val.as_str().ok_or("input should be a string")?.to_string(), - "is_reachable" => { - is_reachable = val - .as_str() - .ok_or("is_reachable should be a string")? 
- .to_string() - } - "misc" => { - let misc_obj = val.as_object().ok_or("misc field should be an object")?; - for (key, val) in misc_obj.keys().zip(misc_obj.values()) { - match key.as_str() { - "error" => error = Some(val.to_string()), - "is_disposable" => { - misc_is_disposable = - val.as_bool().ok_or("is_disposable should be a boolean")? - } - "is_role_account" => { - misc_is_role_account = - val.as_bool().ok_or("is_role_account should be a boolean")? - } - "gravatar_url" => { - if Option::is_some(&val.as_str()) { - misc_gravatar_url = Some(val.to_string()) - } - } - _ => {} - } - } - } - "mx" => { - let mx_obj = val.as_object().ok_or("mx field should be an object")?; - for (key, val) in mx_obj.keys().zip(mx_obj.values()) { - match key.as_str() { - "error" => error = Some(val.to_string()), - "accepts_email" => { - mx_accepts_mail = - val.as_bool().ok_or("accepts_email should be a boolean")? - } - _ => {} - } - } - } - "smtp" => { - let smtp_obj = val.as_object().ok_or("mx field should be an object")?; - for (key, val) in smtp_obj.keys().zip(smtp_obj.values()) { - match key.as_str() { - "error" => error = Some(val.to_string()), - "can_connect_smtp" => { - smtp_can_connect = val - .as_bool() - .ok_or("can_connect_smtp should be a boolean")? - } - "has_full_inbox" => { - smtp_has_full_inbox = - val.as_bool().ok_or("has_full_inbox should be a boolean")? - } - "is_catch_all" => { - smtp_is_catch_all = - val.as_bool().ok_or("is_catch_all should be a boolean")? - } - "is_deliverable" => { - smtp_is_deliverable = - val.as_bool().ok_or("is_deliverable should be a boolean")? - } - "is_disabled" => { - smtp_is_disabled = - val.as_bool().ok_or("is_disabled should be a boolean")? - } - _ => {} - } - } - } - "syntax" => { - let syntax_obj = val.as_object().ok_or("syntax field should be an object")?; - for (key, val) in syntax_obj.keys().zip(syntax_obj.values()) { - match key.as_str() { - "error" => error = Some(val.to_string()), - "is_valid_syntax" => { - syntax_is_valid_syntax = - val.as_bool().ok_or("is_valid_syntax should be a boolean")? - } - "username" => { - syntax_username = val - .as_str() - .ok_or("username should be a string")? - .to_string() - } - "domain" => { - syntax_domain = - val.as_str().ok_or("domain should be a string")?.to_string() - } - _ => {} - } - } - } - // ignore unknown fields - _ => {} - } - } - - Ok(JobResultCsvResponse { - input, - is_reachable, - misc_is_disposable, - misc_is_role_account, - misc_gravatar_url, - mx_accepts_mail, - smtp_can_connect, - smtp_has_full_inbox, - smtp_is_catch_all, - smtp_is_deliverable, - smtp_is_disabled, - syntax_domain, - syntax_is_valid_syntax, - syntax_username, - error, - }) - } -} diff --git a/backend/src/http/v0/bulk/results/mod.rs b/backend/src/http/v0/bulk/results/mod.rs deleted file mode 100644 index b19dbc9fa..000000000 --- a/backend/src/http/v0/bulk/results/mod.rs +++ /dev/null @@ -1,248 +0,0 @@ -// Reacher - Email Verification -// Copyright (C) 2018-2023 Reacher - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. 
- -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -//! This file implements the /bulk/{id}/results endpoints. - -use check_if_email_exists::LOG_TARGET; -use csv::WriterBuilder; -use serde::{Deserialize, Serialize}; -use sqlx::{Executor, Pool, Postgres, Row}; -use std::convert::TryInto; -use std::iter::Iterator; -use tracing::error; -use warp::Filter; - -use super::{ - db::with_db, - error::{BulkError, CsvError}, -}; -use csv_helper::{CsvWrapper, JobResultCsvResponse}; - -mod csv_helper; - -/// Defines the download format, passed in as a query param. -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] -enum JobResultResponseFormat { - Json, - Csv, -} - -// limit and offset are optional in the request -// If unspecified, offset will default to 0. -#[derive(Serialize, Deserialize)] -struct JobResultRequest { - format: Option, - limit: Option, - offset: Option, -} - -#[derive(Serialize, Deserialize)] -struct JobResultJsonResponse { - results: Vec, -} - -async fn job_result( - job_id: i32, - conn_pool: Pool, - req: JobResultRequest, -) -> Result { - // Throw an error if the job is still running. - // Is there a way to combine these 2 requests in one? - let total_records = sqlx::query!( - r#"SELECT total_records FROM bulk_jobs WHERE id = $1;"#, - job_id - ) - .fetch_one(&conn_pool) - .await - .map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to fetch total_records for [job={}] with [error={}]", - job_id, e - ); - BulkError::from(e) - })? - .total_records; - let total_processed = sqlx::query!( - r#"SELECT COUNT(*) FROM email_results WHERE job_id = $1;"#, - job_id - ) - .fetch_one(&conn_pool) - .await - .map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to get total_processed for [job={}] with [error={}]", - job_id, e - ); - BulkError::from(e) - })? 
- .count - .unwrap_or(0); - - if total_processed < total_records as i64 { - return Err(BulkError::JobInProgress.into()); - } - - let format = req.format.unwrap_or(JobResultResponseFormat::Json); - match format { - JobResultResponseFormat::Json => { - let data = - job_result_json(job_id, req.limit, req.offset.unwrap_or(0), conn_pool).await?; - - let reply = - serde_json::to_vec(&JobResultJsonResponse { results: data }).map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to convert json results to string for [job={}] with [error={}]", - job_id, e - ); - - BulkError::Json(e) - })?; - - Ok(warp::reply::with_header( - reply, - "Content-Type", - "application/json", - )) - } - JobResultResponseFormat::Csv => { - let data = - job_result_csv(job_id, req.limit, req.offset.unwrap_or(0), conn_pool).await?; - - Ok(warp::reply::with_header(data, "Content-Type", "text/csv")) - } - } -} - -async fn job_result_as_iter( - job_id: i32, - limit: Option, - offset: u64, - conn_pool: Pool, -) -> Result>, BulkError> { - let query = sqlx::query!( - r#" - SELECT result FROM email_results - WHERE job_id = $1 - ORDER BY id - LIMIT $2 OFFSET $3 - "#, - job_id, - limit.map(|l| l as i64), - offset as i64 - ); - - let rows = conn_pool.fetch_all(query).await.map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to get results for [job={}] [limit={}] [offset={}] with [error={}]", - job_id, - limit.map(|s| s.to_string()).unwrap_or_else(|| "n/a".into()), - offset, - e - ); - - BulkError::from(e) - })?; - - Ok(Box::new( - rows.into_iter() - .map(|row| row.get::("result")), - )) -} - -async fn job_result_json( - job_id: i32, - limit: Option, - offset: u64, - conn_pool: Pool, -) -> Result, warp::Rejection> { - // For JSON responses, we don't want ot return more than 50 results at a - // time, to avoid having a too big payload (unless client specifies a limit) - - Ok( - job_result_as_iter(job_id, limit.or(Some(50)), offset, conn_pool) - .await? 
- .collect(), - ) -} - -async fn job_result_csv( - job_id: i32, - limit: Option, - offset: u64, - conn_pool: Pool, -) -> Result, warp::Rejection> { - let rows = job_result_as_iter(job_id, limit, offset, conn_pool).await?; - let mut wtr = WriterBuilder::new().has_headers(true).from_writer(vec![]); - - for json_value in rows { - let result_csv: JobResultCsvResponse = CsvWrapper(json_value).try_into().map_err(|e: &'static str| { - error!( - target: LOG_TARGET, - "Failed to convert json to csv output struct for [job={}] [limit={}] [offset={}] to csv with [error={}]", - job_id, - limit.map(|s| s.to_string()).unwrap_or_else(|| "n/a".into()), - offset, - e - ); - - BulkError::Csv(CsvError::Parse(e)) - })?; - wtr.serialize(result_csv).map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to serialize result for [job={}] [limit={}] [offset={}] to csv with [error={}]", - job_id, - limit.map(|s| s.to_string()).unwrap_or_else(|| "n/a".into()), - offset, - e - ); - - BulkError::Csv(CsvError::CsvLib(e)) - })?; - } - - let data = wtr.into_inner().map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to convert results for [job={}] [limit={}] [offset={}] to csv with [error={}]", - job_id, - limit.map(|s| s.to_string()).unwrap_or_else(|| "n/a".into()), - offset, - e - ); - - BulkError::Csv(CsvError::CsvLibWriter(Box::new(e))) - })?; - - Ok(data) -} - -pub fn get_bulk_job_result( - o: Option>, -) -> impl Filter + Clone { - warp::path!("v0" / "bulk" / i32 / "results") - .and(warp::get()) - .and(with_db(o)) - .and(warp::query::()) - .and_then(job_result) - // View access logs by setting `RUST_LOG=reacher_backend`. - .with(warp::log(LOG_TARGET)) -} diff --git a/backend/src/http/v0/bulk/task.rs b/backend/src/http/v0/bulk/task.rs deleted file mode 100644 index e4bc07ac3..000000000 --- a/backend/src/http/v0/bulk/task.rs +++ /dev/null @@ -1,230 +0,0 @@ -// Reacher - Email Verification -// Copyright (C) 2018-2023 Reacher - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -//! This file implements the `POST /bulk` endpoint. - -use check_if_email_exists::LOG_TARGET; -use check_if_email_exists::{CheckEmailInput, CheckEmailInputProxy, CheckEmailOutput, Reachable}; -use serde::{Deserialize, Serialize}; -use sqlx::{Pool, Postgres}; -use sqlxmq::{job, CurrentJob}; -use std::error::Error; -use tracing::{debug, error}; -use uuid::Uuid; - -use super::error::BulkError; -use crate::check::check_email; - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct TaskInput { - // fields for CheckEmailInput - pub to_email: String, // Email from request to verify. - pub smtp_ports: Vec, // Ports to try for each email, in given order. Defaults to [25]. 
- pub proxy: Option, - pub hello_name: Option, - pub from_email: Option, -} - -pub struct TaskInputIterator { - body: TaskInput, - index: usize, -} - -impl IntoIterator for TaskInput { - type Item = CheckEmailInput; - type IntoIter = TaskInputIterator; - - fn into_iter(self) -> Self::IntoIter { - TaskInputIterator { - body: self, - index: 0, - } - } -} - -/// Iterate through all the `smtp_ports`. -impl Iterator for TaskInputIterator { - type Item = CheckEmailInput; - - fn next(&mut self) -> Option { - if self.index < self.body.smtp_ports.len() { - let mut item = CheckEmailInput::new(self.body.to_email.clone()); - - if let Some(name) = &self.body.hello_name { - item.set_hello_name(name.clone()); - } - - if let Some(email) = &self.body.from_email { - item.set_from_email(email.clone()); - } - - item.set_smtp_port(self.body.smtp_ports[self.index]); - - if let Some(proxy) = &self.body.proxy { - item.set_proxy(proxy.clone()); - } - - self.index += 1; - Some(item) - } else { - None - } - } -} - -/// Struct that's serialized into the sqlxmq own `payload_json` table. -#[derive(Debug, Deserialize, Serialize)] -struct TaskPayload { - id: i32, - input: TaskInput, -} - -pub async fn submit_job( - conn_pool: &Pool, - job_id: i32, - task_input: TaskInput, -) -> Result { - let task_payload = TaskPayload { - id: job_id, - input: task_input, - }; - - let uuid = email_verification_task - .builder() - .set_json(&task_payload) - .map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to submit task with the following [input={:?}] with [error={}]", - task_payload.input, e - ); - - BulkError::Json(e) - })? - .spawn(conn_pool) - .await - .map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to submit task for [bulk_req={}] with [error={}]", - job_id, e - ); - - e - })?; - - Ok(uuid) -} - -/// Arguments to the `#[job]` attribute allow setting default task options. -/// This task tries to verify the given email and inserts the results -/// into the email verification db table -/// NOTE: if EMAIL_TASK_BATCH_SIZE is made greater than 1 this logic -/// will have to be changed to handle a vector outputs from `check_email`. -/// -/// Small note about namings: what sqlxmq calls a "job", we call it a "task". -/// We call a "job" a user bulk request, i.e. a list of "tasks". -/// Please be careful while reading code. -#[job] -pub async fn email_verification_task( - mut current_job: CurrentJob, - // Additional arguments are optional, but can be used to access context - // provided via [`JobRegistry::set_context`]. 
-) -> Result<(), Box> { - let task_payload: TaskPayload = current_job.json()?.ok_or("Got empty task.")?; - let job_id = task_payload.id; - - let mut final_response: Option = None; - - for check_email_input in task_payload.input { - debug!( - target: LOG_TARGET, - "Starting task [email={}] for [job={}] and [uuid={}]", - check_email_input.to_email, - task_payload.id, - current_job.id(), - ); - - let to_email = check_email_input.to_email.clone(); - let response = check_email(check_email_input).await; - - debug!( - target: LOG_TARGET, - "Got task result [email={}] for [job={}] and [uuid={}] with [is_reachable={:?}]", - to_email, - task_payload.id, - current_job.id(), - response.is_reachable, - ); - - let is_reachable = response.is_reachable == Reachable::Unknown; - final_response = Some(response); - // unsuccessful validation continue iteration with next possible smtp port - if is_reachable { - continue; - } - // successful validation attempt complete job break iteration - else { - break; - } - } - - // final response can only be empty if there - // were no validation attempts. This can can - // never occur currently - if let Some(response) = final_response { - // write results and terminate iteration - #[allow(unused_variables)] - let rec = sqlx::query!( - r#" - INSERT INTO email_results (job_id, result) - VALUES ($1, $2) - "#, - job_id, - serde_json::json!(response) - ) - // TODO: This is a simplified solution and will work when - // the job queue and email results tables are in the same - // database. Keeping them in separate database will require - // some custom logic on the job registry side - // https://github.com/Diggsey/sqlxmq/issues/4 - .fetch_optional(current_job.pool()) - .await - .map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to write [email={}] result to db for [job={}] and [uuid={}] with [error={}]", - response.input, - job_id, - current_job.id(), - e - ); - - e - })?; - - debug!( - target: LOG_TARGET, - "Wrote result for [email={}] for [job={}] and [uuid={}]", - response.input, - job_id, - current_job.id(), - ); - } - - current_job.complete().await?; - Ok(()) -} diff --git a/backend/src/http/v0/mod.rs b/backend/src/http/v0/mod.rs index 92bd84116..02927d4ac 100644 --- a/backend/src/http/v0/mod.rs +++ b/backend/src/http/v0/mod.rs @@ -1,2 +1 @@ -pub mod bulk; pub mod check_email; diff --git a/backend/src/http/v1/bulk/post.rs b/backend/src/http/v1/bulk/post.rs index 027376730..7eb6f366c 100644 --- a/backend/src/http/v1/bulk/post.rs +++ b/backend/src/http/v1/bulk/post.rs @@ -115,7 +115,7 @@ async fn create_bulk_request( /// The endpoint accepts list of email address and creates /// a new job to check them. pub fn create_bulk_job( - o: Option, + o: Channel, ) -> impl Filter + Clone { warp::path!("v1" / "bulk") .and(warp::post()) @@ -133,16 +133,7 @@ pub fn create_bulk_job( /// Warp filter that extracts lapin Channel. 
fn with_channel( - o: Option, -) -> impl Filter + Clone { - warp::any().and_then(move || { - let o = o.clone(); - async move { - if let Some(channel) = o { - Ok(channel) - } else { - Err(warp::reject::not_found()) - } - } - }) + channel: Channel, +) -> impl Filter + Clone { + warp::any().map(move || channel.clone()) } diff --git a/backend/src/main.rs b/backend/src/main.rs index 2270f46df..634f74237 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -47,25 +47,18 @@ async fn main() -> Result<(), Box> { let _guard: sentry::ClientInitGuard = setup_sentry(); #[cfg(feature = "worker")] - let backend_name = env::var("RCH_BACKEND_NAME").expect("RCH_BACKEND_NAME is not set"); + { + let backend_name = env::var("RCH_BACKEND_NAME").expect("RCH_BACKEND_NAME is not set"); + let channel = create_channel(&backend_name).await?; + let http_server = run_warp_server(channel.clone()); + try_join!(http_server, run_worker(channel, &backend_name))?; + } - #[cfg(feature = "worker")] - let channel = { Some(create_channel(&backend_name).await?) }; - #[cfg(not(feature = "worker"))] - let channel = None; - - let _http_server = run_warp_server(channel.clone()); - - #[cfg(feature = "worker")] - try_join!( - _http_server, - run_worker( - channel.expect("If worker feature is set, channel is set."), - &backend_name - ) - )?; #[cfg(not(feature = "worker"))] - _http_server.await?; + { + let http_server = run_warp_server(); + http_server.await? + } Ok(()) } diff --git a/backend/src/worker/check_email.rs b/backend/src/worker/check_email.rs index bae27021d..679948b13 100644 --- a/backend/src/worker/check_email.rs +++ b/backend/src/worker/check_email.rs @@ -19,8 +19,13 @@ use check_if_email_exists::{Reachable, LOG_TARGET}; use lapin::message::Delivery; use lapin::{options::*, BasicProperties, Channel}; use serde::{Deserialize, Serialize}; +#[cfg(feature = "postgres")] +use sqlx::{Pool, Postgres}; use tracing::{debug, info}; +#[cfg(feature = "postgres")] +use super::db::save_to_db; + use crate::check::check_email; #[derive(Debug, Deserialize, Serialize)] @@ -45,6 +50,7 @@ struct WebhookOutput { pub async fn process_check_email( channel: &Channel, delivery: Delivery, + #[cfg(feature = "postgres")] conn_pool: Option>, ) -> Result<(), Box> { let payload = serde_json::from_slice::(&delivery.data)?; info!(target: LOG_TARGET, email=?payload.input.to_email, "New job"); @@ -82,6 +88,12 @@ pub async fn process_check_email( let (email, is_reachable) = (output.input.to_owned(), output.is_reachable.clone()); + // Check if we have a DB to save the results to + #[cfg(feature = "postgres")] + if let Some(conn_pool) = conn_pool { + save_to_db(conn_pool, &output).await?; + } + // Check if we have a webhook to send the output to. if let Some(webhook) = payload.webhook { let webhook_output = WebhookOutput { diff --git a/backend/src/worker/db.rs b/backend/src/worker/db.rs new file mode 100644 index 000000000..ceae61645 --- /dev/null +++ b/backend/src/worker/db.rs @@ -0,0 +1,75 @@ +// Reacher - Email Verification +// Copyright (C) 2018-2023 Reacher + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::env; + +use check_if_email_exists::{CheckEmailOutput, LOG_TARGET}; +use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; +use tracing::{debug, info}; + +pub async fn save_to_db( + conn_pool: Pool, + output: &CheckEmailOutput, +) -> Result<(), Box> { + let output_json = serde_json::to_value(output)?; + let is_reachable = format!("{:?}", output.is_reachable); + sqlx::query!( + r#" + INSERT INTO email_results (email, is_reachable, full_result) + VALUES ($1, $2, $3) + RETURNING id + "#, + output.input, + is_reachable, + output_json + ) + .fetch_one(&conn_pool) + .await?; + + debug!(target: LOG_TARGET, email=?output.input, is_reachable=?is_reachable, "Wrote to DB"); + + Ok(()) +} + +/// Create a DB pool. +pub async fn create_db() -> Result>, sqlx::Error> { + let pg_conn = env::var("DATABASE_URL").ok(); + let pg_max_conn = env::var("RCH_DATABASE_MAX_CONNECTIONS").map_or(5, |var| { + var.parse::() + .expect("Environment variable RCH_DATABASE_MAX_CONNECTIONS should parse to u32") + }); + + // create connection pool with database + // connection pool internally the shared db connection + // with arc so it can safely be cloned and shared across threads + let pool = match pg_conn { + Some(pg_conn_str) => { + let pool = PgPoolOptions::new() + .max_connections(pg_max_conn) + .connect(pg_conn_str.as_str()) + .await?; + + sqlx::migrate!("./migrations").run(&pool).await?; + + info!(target: LOG_TARGET, table="email_results", "Connected to DB, Reacher will write results to DB"); + + Some(pool) + } + None => None, + }; + + Ok(pool) +} diff --git a/backend/src/worker/mod.rs b/backend/src/worker/mod.rs index d4e78a0dd..56bab1691 100644 --- a/backend/src/worker/mod.rs +++ b/backend/src/worker/mod.rs @@ -22,8 +22,12 @@ use lapin::{options::*, types::FieldTable, Channel, Connection, ConnectionProper use tracing::{error, info}; pub mod check_email; +#[cfg(feature = "postgres")] +mod db; use check_email::process_check_email; +#[cfg(feature = "postgres")] +use db::create_db; pub async fn create_channel( backend_name: &str, @@ -88,10 +92,18 @@ pub async fn run_worker( ) .await?; + #[cfg(feature = "postgres")] + let conn_pool = create_db().await?; + while let Some(delivery) = consumer.next().await { if let Ok(delivery) = delivery { let channel = channel.clone(); + #[cfg(feature = "postgres")] + let conn_pool = conn_pool.clone(); tokio::spawn(async move { + #[cfg(feature = "postgres")] + let res = process_check_email(&channel, delivery, conn_pool).await; + #[cfg(not(feature = "postgres"))] let res = process_check_email(&channel, delivery).await; if let Err(err) = res { error!(target: LOG_TARGET, error=?err, "Error processing message"); diff --git a/backend/tests/check_email.rs b/backend/tests/check_email.rs index 979047ce9..7dcd832ff 100644 --- a/backend/tests/check_email.rs +++ b/backend/tests/check_email.rs @@ -18,7 +18,7 @@ use std::env; use check_if_email_exists::CheckEmailInput; use reacher_backend::check::REACHER_SECRET_HEADER; -use reacher_backend::http::create_routes; +use reacher_backend::http::create_routes_without_bulk; use warp::http::StatusCode; use warp::test::request; @@ -34,7 +34,7 @@ async fn test_input_foo_bar() { .method("POST") .header(REACHER_SECRET_HEADER, "foobar") .json(&serde_json::from_str::(r#"{"to_email": "foo@bar"}"#).unwrap()) - .reply(&create_routes(None, None)) + 
.reply(&create_routes_without_bulk()) .await; assert_eq!(resp.status(), StatusCode::OK, "{:?}", resp.body()); @@ -50,7 +50,7 @@ async fn test_input_foo_bar_baz() { .method("POST") .header(REACHER_SECRET_HEADER, "foobar") .json(&serde_json::from_str::(r#"{"to_email": "foo@bar.baz"}"#).unwrap()) - .reply(&create_routes(None, None)) + .reply(&create_routes_without_bulk()) .await; assert_eq!(resp.status(), StatusCode::OK, "{:?}", resp.body()); @@ -65,7 +65,7 @@ async fn test_reacher_secret_missing_header() { .path("/v0/check_email") .method("POST") .json(&serde_json::from_str::(r#"{"to_email": "foo@bar.baz"}"#).unwrap()) - .reply(&create_routes(None, None)) + .reply(&create_routes_without_bulk()) .await; assert_eq!(resp.status(), StatusCode::BAD_REQUEST, "{:?}", resp.body()); @@ -81,7 +81,7 @@ async fn test_reacher_secret_wrong_secret() { .method("POST") .header(REACHER_SECRET_HEADER, "barbaz") .json(&serde_json::from_str::(r#"{"to_email": "foo@bar.baz"}"#).unwrap()) - .reply(&create_routes(None, None)) + .reply(&create_routes_without_bulk()) .await; assert_eq!(resp.status(), StatusCode::BAD_REQUEST, "{:?}", resp.body()); @@ -97,7 +97,7 @@ async fn test_reacher_secret_correct_secret() { .method("POST") .header(REACHER_SECRET_HEADER, "foobar") .json(&serde_json::from_str::(r#"{"to_email": "foo@bar"}"#).unwrap()) - .reply(&create_routes(None, None)) + .reply(&create_routes_without_bulk()) .await; assert_eq!(resp.status(), StatusCode::OK, "{:?}", resp.body()); @@ -113,7 +113,7 @@ async fn test_reacher_to_mail_empty() { .method("POST") .header(REACHER_SECRET_HEADER, "foobar") .json(&serde_json::from_str::(r#"{"to_email": ""}"#).unwrap()) - .reply(&create_routes(None, None)) + .reply(&create_routes_without_bulk()) .await; assert_eq!(resp.status(), StatusCode::BAD_REQUEST, "{:?}", resp.body()); @@ -129,7 +129,7 @@ async fn test_reacher_to_mail_missing() { .method("POST") .header(REACHER_SECRET_HEADER, "foobar") .json(&serde_json::from_str::(r#"{}"#).unwrap()) - .reply(&create_routes(None, None)) + .reply(&create_routes_without_bulk()) .await; assert_eq!(resp.status(), StatusCode::BAD_REQUEST, "{:?}", resp.body());
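
Note (illustrative, not part of the diff): the deleted task.rs earlier in this changeset tries each configured SMTP port in order, moves to the next port only while the verification comes back Unknown, and stops at the first conclusive result while always keeping the last response. A minimal sketch of that fallback rule over plain values, assuming a simplified stand-in for the check-if-email-exists Reachable enum and a hypothetical verify closure in place of the real check_email call:

    // Simplified stand-in for check_if_email_exists::Reachable, for illustration only.
    #[derive(Clone, Copy, Debug, PartialEq)]
    enum Reachable { Safe, Risky, Invalid, Unknown }

    // Try each port in order; keep the latest result, but stop as soon as a
    // result is conclusive (anything other than Unknown).
    fn first_conclusive<I>(ports: I, verify: impl Fn(u16) -> Reachable) -> Option<Reachable>
    where
        I: IntoIterator<Item = u16>,
    {
        let mut last = None;
        for port in ports {
            let result = verify(port);
            last = Some(result);
            if result != Reachable::Unknown {
                break;
            }
        }
        last
    }

    fn main() {
        // Hypothetical verifier: only port 587 yields a conclusive answer here.
        let result = first_conclusive([25, 465, 587], |port| {
            if port == 587 { Reachable::Safe } else { Reachable::Unknown }
        });
        println!("{result:?}"); // Some(Safe)
    }
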