Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(backend): CSV download retrieves all results #1362

Merged
merged 5 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,53 +14,18 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! This file implements the /bulk/{id}/results endpoints.

use std::convert::{TryFrom, TryInto};

use check_if_email_exists::LOG_TARGET;
use csv::WriterBuilder;
use serde::{Deserialize, Serialize};
use sqlx::{Executor, Pool, Postgres, Row};
use warp::Filter;

use super::{
db::with_db,
error::{BulkError, CsvError},
};

/// Defines the download format, passed in as a query param.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
enum JobResultResponseFormat {
Json,
Csv,
}

// limit and offset are optional in the request
// if they are unspecified their default values
// are 50 and 0 respectively
#[derive(Serialize, Deserialize)]
struct JobResultRequest {
format: Option<JobResultResponseFormat>,
limit: Option<u64>,
offset: Option<u64>,
}

#[derive(Serialize, Deserialize)]
struct JobResultJsonResponse {
results: Vec<serde_json::Value>,
}
use serde::Serialize;
use std::convert::TryFrom;

/// Wrapper for serde json value to convert
/// into a csv response
#[derive(Debug)]
struct CsvWrapper(serde_json::Value);
pub struct CsvWrapper(pub serde_json::Value);

/// Simplified output of `CheckEmailOutput` struct
/// for csv fields.
#[derive(Debug, Serialize)]
struct JobResultCsvResponse {
pub struct JobResultCsvResponse {
input: String,
is_reachable: String,
#[serde(rename = "misc.is_disposable")]
Expand Down Expand Up @@ -238,221 +203,3 @@ impl TryFrom<CsvWrapper> for JobResultCsvResponse {
})
}
}

async fn job_result(
job_id: i32,
conn_pool: Pool<Postgres>,
req: JobResultRequest,
) -> Result<impl warp::Reply, warp::Rejection> {
// Throw an error if the job is still running.
// Is there a way to combine these 2 requests in one?
let total_records = sqlx::query!(
r#"SELECT total_records FROM bulk_jobs WHERE id = $1;"#,
job_id
)
.fetch_one(&conn_pool)
.await
.map_err(|e| {
log::error!(
target: LOG_TARGET,
"Failed to fetch total_records for [job={}] with [error={}]",
job_id,
e
);
BulkError::from(e)
})?
.total_records;
let total_processed = sqlx::query!(
r#"SELECT COUNT(*) FROM email_results WHERE job_id = $1;"#,
job_id
)
.fetch_one(&conn_pool)
.await
.map_err(|e| {
log::error!(
target: LOG_TARGET,
"Failed to get total_processed for [job={}] with [error={}]",
job_id,
e
);
BulkError::from(e)
})?
.count
.unwrap_or(0);

if total_processed < total_records as i64 {
return Err(BulkError::JobInProgress.into());
}

let format = req.format.unwrap_or(JobResultResponseFormat::Json);
match format {
JobResultResponseFormat::Json => {
let data = job_result_json(
job_id,
req.limit.unwrap_or(50),
req.offset.unwrap_or(0),
conn_pool,
)
.await?;

let reply =
serde_json::to_vec(&JobResultJsonResponse { results: data }).map_err(|e| {
log::error!(
target: LOG_TARGET,
"Failed to convert json results to string for [job={}] with [error={}]",
job_id,
e
);

BulkError::Json(e)
})?;

Ok(warp::reply::with_header(
reply,
"Content-Type",
"application/json",
))
}
JobResultResponseFormat::Csv => {
let data = job_result_csv(
job_id,
req.limit.unwrap_or(5000),
req.offset.unwrap_or(0),
conn_pool,
)
.await?;

Ok(warp::reply::with_header(data, "Content-Type", "text/csv"))
}
}
}

async fn job_result_json(
job_id: i32,
limit: u64,
offset: u64,
conn_pool: Pool<Postgres>,
) -> Result<Vec<serde_json::Value>, warp::Rejection> {
let query = sqlx::query!(
r#"
SELECT result FROM email_results
WHERE job_id = $1
ORDER BY id
LIMIT $2 OFFSET $3
"#,
job_id,
limit as i64,
offset as i64
);

let rows: Vec<serde_json::Value> = conn_pool
.fetch_all(query)
.await
.map_err(|e| {
log::error!(
target: LOG_TARGET,
"Failed to get results for [job={}] [limit={}] [offset={}] with [error={}]",
job_id,
limit,
offset,
e
);

BulkError::from(e)
})?
.iter()
.map(|row| row.get("result"))
.collect();

Ok(rows)
}

async fn job_result_csv(
job_id: i32,
limit: u64,
offset: u64,
conn_pool: Pool<Postgres>,
) -> Result<Vec<u8>, warp::Rejection> {
let query = sqlx::query!(
r#"
SELECT result FROM email_results
WHERE job_id = $1
ORDER BY id
LIMIT $2 OFFSET $3
"#,
job_id,
limit as i64,
offset as i64
);

let mut wtr = WriterBuilder::new().has_headers(true).from_writer(vec![]);

for json_value in conn_pool
.fetch_all(query)
.await
.map_err(|e| {
log::error!(
target: LOG_TARGET,
"Failed to get results for [job={}] with [error={}]",
job_id,
e
);

BulkError::from(e)
})?
.iter()
.map(|row| row.get("result"))
{
let result_csv: JobResultCsvResponse = CsvWrapper(json_value).try_into().map_err(|e: &'static str| {
log::error!(
target: LOG_TARGET,
"Failed to convert json to csv output struct for [job={}] [limit={}] [offset={}] to csv with [error={}]",
job_id,
limit,
offset,
e
);

BulkError::Csv(CsvError::Parse(e))
})?;
wtr.serialize(result_csv).map_err(|e| {
log::error!(
target: LOG_TARGET,
"Failed to serialize result for [job={}] [limit={}] [offset={}] to csv with [error={}]",
job_id,
limit,
offset,
e
);

BulkError::Csv(CsvError::CsvLib(e))
})?;
}

let data = wtr.into_inner().map_err(|e| {
log::error!(
target: LOG_TARGET,
"Failed to convert results for [job={}] [limit={}] [offset={}] to csv with [error={}]",
job_id,
limit,
offset,
e
);

BulkError::Csv(CsvError::CsvLibWriter(Box::new(e)))
})?;

Ok(data)
}

pub fn get_bulk_job_result(
o: Option<Pool<Postgres>>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path!("v0" / "bulk" / i32 / "results")
.and(warp::get())
.and(with_db(o))
.and(warp::query::<JobResultRequest>())
.and_then(job_result)
// View access logs by setting `RUST_LOG=reacher`.
.with(warp::log(LOG_TARGET))
}
Loading
Loading