fix(backend): CSV download retrieves all results (#1362)
* Move file

* Fix compiler

* Update comments
amaury1093 authored Oct 24, 2023
1 parent 1af3919 commit b3670fc
Showing 2 changed files with 254 additions and 257 deletions.
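Per the commit message, the /bulk/{id}/results handler code was moved out of this file; only the CsvWrapper and JobResultCsvResponse types remain here, now marked pub. The second changed file, the destination of the move, is not rendered on this page. Reading the title together with the removed code below, the fix plausibly makes the CSV path fetch every row for a job instead of a single LIMIT/OFFSET page. A minimal sketch of that idea, reusing the email_results query shape from the removed job_result_csv below (an illustration, not the actual patch):

    // Hypothetical sketch: select all result rows for a job in one query,
    // so the CSV download is no longer capped at a single page.
    let query = sqlx::query!(
        r#"SELECT result FROM email_results WHERE job_id = $1 ORDER BY id"#,
        job_id
    );
    // As in the removed code below, fetch via the sqlx::Executor impl on the pool.
    let rows = conn_pool.fetch_all(query).await?;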
@@ -14,53 +14,18 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

-//! This file implements the /bulk/{id}/results endpoints.
-use std::convert::{TryFrom, TryInto};
-
-use check_if_email_exists::LOG_TARGET;
-use csv::WriterBuilder;
-use serde::{Deserialize, Serialize};
-use sqlx::{Executor, Pool, Postgres, Row};
-use warp::Filter;
-
-use super::{
-    db::with_db,
-    error::{BulkError, CsvError},
-};
-
-/// Defines the download format, passed in as a query param.
-#[derive(Serialize, Deserialize)]
-#[serde(rename_all = "lowercase")]
-enum JobResultResponseFormat {
-    Json,
-    Csv,
-}
-
-// `limit` and `offset` are optional in the request; if unspecified, they
-// default to 50 and 0 respectively.
-#[derive(Serialize, Deserialize)]
-struct JobResultRequest {
-    format: Option<JobResultResponseFormat>,
-    limit: Option<u64>,
-    offset: Option<u64>,
-}
-
-#[derive(Serialize, Deserialize)]
-struct JobResultJsonResponse {
-    results: Vec<serde_json::Value>,
-}
+use serde::Serialize;
+use std::convert::TryFrom;

/// Wrapper for serde json value to convert
/// into a csv response
#[derive(Debug)]
-struct CsvWrapper(serde_json::Value);
+pub struct CsvWrapper(pub serde_json::Value);

/// Simplified output of `CheckEmailOutput` struct
/// for csv fields.
#[derive(Debug, Serialize)]
-struct JobResultCsvResponse {
+pub struct JobResultCsvResponse {
    input: String,
    is_reachable: String,
    #[serde(rename = "misc.is_disposable")]
@@ -238,221 +203,3 @@ impl TryFrom<CsvWrapper> for JobResultCsvResponse {
        })
    }
}
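The body of this TryFrom impl is collapsed in the diff view above (old lines 67 through 237 are hidden). Judging from the &'static str error type seen in the removed job_result_csv below, the conversion pulls typed fields out of the wrapped JSON value. A self-contained, illustrative stand-in with only two of the many fields (MiniCsvRow and its field handling are hypothetical, not from this commit):

    use std::convert::TryFrom;

    // Hypothetical stand-in: the real JobResultCsvResponse has many more fields.
    pub struct CsvWrapper(pub serde_json::Value);

    #[derive(Debug, serde::Serialize)]
    struct MiniCsvRow {
        input: String,
        is_reachable: String,
    }

    impl TryFrom<CsvWrapper> for MiniCsvRow {
        // Matches the error type used by the removed job_result_csv below.
        type Error = &'static str;

        fn try_from(value: CsvWrapper) -> Result<Self, Self::Error> {
            let obj = value.0.as_object().ok_or("expected a JSON object")?;
            // Extract one string field from the JSON map, or fail with a static message.
            let get_str = |key: &str| -> Result<String, &'static str> {
                obj.get(key)
                    .and_then(|v| v.as_str())
                    .map(String::from)
                    .ok_or("expected a string field")
            };
            Ok(MiniCsvRow {
                input: get_str("input")?,
                is_reachable: get_str("is_reachable")?,
            })
        }
    }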

-async fn job_result(
-    job_id: i32,
-    conn_pool: Pool<Postgres>,
-    req: JobResultRequest,
-) -> Result<impl warp::Reply, warp::Rejection> {
-    // Throw an error if the job is still running.
-    // Is there a way to combine these two queries into one?
-    let total_records = sqlx::query!(
-        r#"SELECT total_records FROM bulk_jobs WHERE id = $1;"#,
-        job_id
-    )
-    .fetch_one(&conn_pool)
-    .await
-    .map_err(|e| {
-        log::error!(
-            target: LOG_TARGET,
-            "Failed to fetch total_records for [job={}] with [error={}]",
-            job_id,
-            e
-        );
-        BulkError::from(e)
-    })?
-    .total_records;
-    let total_processed = sqlx::query!(
-        r#"SELECT COUNT(*) FROM email_results WHERE job_id = $1;"#,
-        job_id
-    )
-    .fetch_one(&conn_pool)
-    .await
-    .map_err(|e| {
-        log::error!(
-            target: LOG_TARGET,
-            "Failed to get total_processed for [job={}] with [error={}]",
-            job_id,
-            e
-        );
-        BulkError::from(e)
-    })?
-    .count
-    .unwrap_or(0);
-
-    if total_processed < total_records as i64 {
-        return Err(BulkError::JobInProgress.into());
-    }
-
-    let format = req.format.unwrap_or(JobResultResponseFormat::Json);
-    match format {
-        JobResultResponseFormat::Json => {
-            let data = job_result_json(
-                job_id,
-                req.limit.unwrap_or(50),
-                req.offset.unwrap_or(0),
-                conn_pool,
-            )
-            .await?;
-
-            let reply =
-                serde_json::to_vec(&JobResultJsonResponse { results: data }).map_err(|e| {
-                    log::error!(
-                        target: LOG_TARGET,
-                        "Failed to convert json results to string for [job={}] with [error={}]",
-                        job_id,
-                        e
-                    );
-
-                    BulkError::Json(e)
-                })?;
-
-            Ok(warp::reply::with_header(
-                reply,
-                "Content-Type",
-                "application/json",
-            ))
-        }
-        JobResultResponseFormat::Csv => {
-            let data = job_result_csv(
-                job_id,
-                req.limit.unwrap_or(5000),
-                req.offset.unwrap_or(0),
-                conn_pool,
-            )
-            .await?;
-
-            Ok(warp::reply::with_header(data, "Content-Type", "text/csv"))
-        }
-    }
-}
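For illustration, given the route defined at the bottom of this file (v0/bulk/{id}/results) and the serde lowercase rename on the format enum, requests to this handler look like the following (job id 123 is hypothetical):

    GET /v0/bulk/123/results                                  -> JSON, limit 50, offset 0 (defaults)
    GET /v0/bulk/123/results?format=json&limit=100&offset=50
    GET /v0/bulk/123/results?format=csv                       -> CSV, default limit 5000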

-async fn job_result_json(
-    job_id: i32,
-    limit: u64,
-    offset: u64,
-    conn_pool: Pool<Postgres>,
-) -> Result<Vec<serde_json::Value>, warp::Rejection> {
-    let query = sqlx::query!(
-        r#"
-        SELECT result FROM email_results
-        WHERE job_id = $1
-        ORDER BY id
-        LIMIT $2 OFFSET $3
-        "#,
-        job_id,
-        limit as i64,
-        offset as i64
-    );
-
-    let rows: Vec<serde_json::Value> = conn_pool
-        .fetch_all(query)
-        .await
-        .map_err(|e| {
-            log::error!(
-                target: LOG_TARGET,
-                "Failed to get results for [job={}] [limit={}] [offset={}] with [error={}]",
-                job_id,
-                limit,
-                offset,
-                e
-            );
-
-            BulkError::from(e)
-        })?
-        .iter()
-        .map(|row| row.get("result"))
-        .collect();
-
-    Ok(rows)
-}
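The rows land unchanged in the results array of JobResultJsonResponse, so a response body has this shape (addresses and trailing fields here are illustrative and truncated):

    {
        "results": [
            { "input": "someone@example.com", "is_reachable": "safe", ... },
            { "input": "someone.else@example.com", "is_reachable": "unknown", ... }
        ]
    }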

-async fn job_result_csv(
-    job_id: i32,
-    limit: u64,
-    offset: u64,
-    conn_pool: Pool<Postgres>,
-) -> Result<Vec<u8>, warp::Rejection> {
-    let query = sqlx::query!(
-        r#"
-        SELECT result FROM email_results
-        WHERE job_id = $1
-        ORDER BY id
-        LIMIT $2 OFFSET $3
-        "#,
-        job_id,
-        limit as i64,
-        offset as i64
-    );
-
-    let mut wtr = WriterBuilder::new().has_headers(true).from_writer(vec![]);
-
-    for json_value in conn_pool
-        .fetch_all(query)
-        .await
-        .map_err(|e| {
-            log::error!(
-                target: LOG_TARGET,
-                "Failed to get results for [job={}] with [error={}]",
-                job_id,
-                e
-            );
-
-            BulkError::from(e)
-        })?
-        .iter()
-        .map(|row| row.get("result"))
-    {
-        let result_csv: JobResultCsvResponse =
-            CsvWrapper(json_value).try_into().map_err(|e: &'static str| {
-                log::error!(
-                    target: LOG_TARGET,
-                    "Failed to convert json to csv output struct for [job={}] [limit={}] [offset={}] to csv with [error={}]",
-                    job_id,
-                    limit,
-                    offset,
-                    e
-                );
-
-                BulkError::Csv(CsvError::Parse(e))
-            })?;
-        wtr.serialize(result_csv).map_err(|e| {
-            log::error!(
-                target: LOG_TARGET,
-                "Failed to serialize result for [job={}] [limit={}] [offset={}] to csv with [error={}]",
-                job_id,
-                limit,
-                offset,
-                e
-            );
-
-            BulkError::Csv(CsvError::CsvLib(e))
-        })?;
-    }
-
-    let data = wtr.into_inner().map_err(|e| {
-        log::error!(
-            target: LOG_TARGET,
-            "Failed to convert results for [job={}] [limit={}] [offset={}] to csv with [error={}]",
-            job_id,
-            limit,
-            offset,
-            e
-        );
-
-        BulkError::Csv(CsvError::CsvLibWriter(Box::new(e)))
-    })?;
-
-    Ok(data)
-}

-pub fn get_bulk_job_result(
-    o: Option<Pool<Postgres>>,
-) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
-    warp::path!("v0" / "bulk" / i32 / "results")
-        .and(warp::get())
-        .and(with_db(o))
-        .and(warp::query::<JobResultRequest>())
-        .and_then(job_result)
-        // View access logs by setting `RUST_LOG=reacher`.
-        .with(warp::log(LOG_TARGET))
-}
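A minimal wiring sketch for this filter, assuming an async (tokio) context and an already-built Postgres pool named pool (names here are illustrative, not from this commit):

    // Hypothetical: mount the results route and serve it with warp.
    let routes = get_bulk_job_result(Some(pool));
    warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;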