From 6be9085b70d1f9fd4df3b5668183a9b5839d6ca1 Mon Sep 17 00:00:00 2001 From: Dzejkop Date: Wed, 28 Feb 2024 16:46:49 +0100 Subject: [PATCH 1/3] Fix missing message --- src/utils/aws.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/utils/aws.rs b/src/utils/aws.rs index b388dde..75d9870 100644 --- a/src/utils/aws.rs +++ b/src/utils/aws.rs @@ -63,17 +63,17 @@ pub async fn sqs_dequeue( Ok(messages) } -#[tracing::instrument(skip(client, message))] +#[tracing::instrument(skip(client, payload))] pub async fn sqs_enqueue( client: &aws_sdk_sqs::Client, queue_url: &str, message_group_id: &str, - message: T, + payload: T, ) -> eyre::Result<()> where T: Serialize + Debug, { - let body = serde_json::to_string(&message) + let body = serde_json::to_string(&payload) .wrap_err("Failed to serialize message")?; let message_attributes = construct_message_attributes()?; @@ -87,7 +87,7 @@ where .send() .await?; - tracing::info!(?send_message_output, ?message, "Enqueued message"); + tracing::info!(?send_message_output, ?payload, "Enqueued message"); Ok(()) } From 827b0dcd473d2c5221f149a3ee16754cfecd25f8 Mon Sep 17 00:00:00 2001 From: Dzejkop Date: Wed, 28 Feb 2024 16:49:47 +0100 Subject: [PATCH 2/3] Adjust traces --- src/coordinator.rs | 169 ++++++++++++++++++++++++--------------------- src/participant.rs | 119 +++++++++++++++++-------------- 2 files changed, 159 insertions(+), 129 deletions(-) diff --git a/src/coordinator.rs b/src/coordinator.rs index a8d5aad..38c602c 100644 --- a/src/coordinator.rs +++ b/src/coordinator.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use std::time::Duration; +use aws_sdk_sqs::types::Message; use eyre::{Context, ContextCompat}; use futures::stream::FuturesUnordered; use futures::{future, StreamExt}; @@ -11,7 +12,6 @@ use tokio::net::TcpStream; use tokio::sync::mpsc::Receiver; use tokio::sync::{mpsc, Mutex}; use tokio::task::JoinHandle; -use tracing::instrument; use crate::bits::Bits; use crate::config::CoordinatorConfig; @@ -90,47 +90,52 @@ impl Coordinator { self: Arc, ) -> Result<(), eyre::Error> { loop { - self.process_uniqueness_check_queue().await?; + let messages = sqs_dequeue( + &self.sqs_client, + &self.config.queues.queries_queue_url, + ) + .await?; + + for message in messages { + self.handle_uniqueness_check(message).await?; + } } } - #[tracing::instrument(skip(self))] - pub async fn process_uniqueness_check_queue(&self) -> eyre::Result<()> { - let messages = sqs_dequeue( - &self.sqs_client, - &self.config.queues.queries_queue_url, - ) - .await?; + #[tracing::instrument(skip(self, message))] + pub async fn handle_uniqueness_check( + &self, + message: Message, + ) -> eyre::Result<()> { + tracing::debug!(?message, "Handling message"); + + let receipt_handle = message + .receipt_handle + .context("Missing receipt handle in message")?; + + if let Some(message_attributes) = &message.message_attributes { + utils::aws::trace_from_message_attributes( + message_attributes, + &receipt_handle, + )?; + } else { + tracing::warn!( + ?receipt_handle, + "SQS message missing message attributes" + ); + } - for message in messages { - let receipt_handle = message - .receipt_handle - .context("Missing receipt handle in message")?; - - if let Some(message_attributes) = &message.message_attributes { - utils::aws::trace_from_message_attributes( - message_attributes, - &receipt_handle, - )?; - } else { - tracing::warn!( - ?receipt_handle, - "SQS message missing message attributes" - ); - } + let body = message.body.context("Missing message body")?; - let body = message.body.context("Missing message body")?; + let UniquenessCheckRequest { + plain_code: template, + signup_id, + } = serde_json::from_str(&body).context("Failed to parse message")?; - let UniquenessCheckRequest { - plain_code: template, - signup_id, - } = serde_json::from_str(&body) - .context("Failed to parse message")?; + // Process the query + self.uniqueness_check(receipt_handle, template, signup_id) + .await?; - // Process the query - self.uniqueness_check(receipt_handle, template, signup_id) - .await?; - } Ok(()) } @@ -445,58 +450,68 @@ impl Coordinator { async fn handle_db_sync(self: Arc) -> eyre::Result<()> { loop { - self.db_sync().await?; + let messages = sqs_dequeue( + &self.sqs_client, + &self.config.queues.db_sync_queue_url, + ) + .await?; + + if messages.is_empty() { + tokio::time::sleep(IDLE_SLEEP_TIME).await; + return Ok(()); + } + + for message in messages { + self.db_sync(message).await?; + } } } - #[instrument(skip(self))] - async fn db_sync(&self) -> eyre::Result<()> { - let messages = sqs_dequeue( - &self.sqs_client, - &self.config.queues.db_sync_queue_url, - ) - .await?; - - if messages.is_empty() { - tokio::time::sleep(IDLE_SLEEP_TIME).await; - return Ok(()); + #[tracing::instrument(skip(self, message))] + async fn db_sync(&self, message: Message) -> eyre::Result<()> { + let receipt_handle = message + .receipt_handle + .context("Missing receipt handle in message")?; + + if let Some(message_attributes) = &message.message_attributes { + utils::aws::trace_from_message_attributes( + message_attributes, + &receipt_handle, + )?; + } else { + tracing::warn!( + ?receipt_handle, + "SQS message missing message attributes" + ); } - for message in messages { - let body = message.body.context("Missing message body")?; - let receipt_handle = message - .receipt_handle - .context("Missing receipt handle in message")?; + let body = message.body.context("Missing message body")?; - let items = if let Ok(items) = - serde_json::from_str::>(&body) - { - items - } else { - tracing::error!( - ?receipt_handle, - "Failed to parse message body" - ); - continue; - }; + let items = if let Ok(items) = + serde_json::from_str::>(&body) + { + items + } else { + tracing::error!(?receipt_handle, "Failed to parse message body"); + return Ok(()); + }; - let masks: Vec<_> = - items.into_iter().map(|item| (item.id, item.mask)).collect(); + let masks: Vec<_> = + items.into_iter().map(|item| (item.id, item.mask)).collect(); - tracing::info!( - num_new_masks = masks.len(), - "Inserting masks into database" - ); + tracing::info!( + num_new_masks = masks.len(), + "Inserting masks into database" + ); - self.database.insert_masks(&masks).await?; + self.database.insert_masks(&masks).await?; - sqs_delete_message( - &self.sqs_client, - &self.config.queues.db_sync_queue_url, - receipt_handle, - ) - .await?; - } + sqs_delete_message( + &self.sqs_client, + &self.config.queues.db_sync_queue_url, + receipt_handle, + ) + .await?; Ok(()) } diff --git a/src/participant.rs b/src/participant.rs index d6aa776..34bb718 100644 --- a/src/participant.rs +++ b/src/participant.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use std::time::Duration; +use aws_sdk_sqs::types::Message; use distance::Template; use eyre::ContextCompat; use futures::stream::FuturesUnordered; @@ -17,6 +18,7 @@ use tracing::instrument; use crate::config::ParticipantConfig; use crate::db::Db; use crate::distance::{self, encode, DistanceEngine, EncodedBits}; +use crate::utils; use crate::utils::aws::{ sqs_client_from_config, sqs_delete_message, sqs_dequeue, }; @@ -61,7 +63,7 @@ impl Participant { let mut tasks = FuturesUnordered::new(); - tasks.push(tokio::spawn(self.clone().handle_uniqueness_check())); + tasks.push(tokio::spawn(self.clone().handle_uniqueness_checks())); tasks.push(tokio::spawn(self.clone().handle_db_sync())); while let Some(result) = tasks.next().await { @@ -71,16 +73,19 @@ impl Participant { Ok(()) } - async fn handle_uniqueness_check(self: Arc) -> eyre::Result<()> { + async fn handle_uniqueness_checks(self: Arc) -> eyre::Result<()> { loop { - self.process_uniqueness_check_stream().await?; + let stream = self.listener.accept().await?.0; + self.handle_uniqueness_check(stream).await?; } } #[tracing::instrument(skip(self))] - async fn process_uniqueness_check_stream(&self) -> eyre::Result<()> { - let mut stream = - tokio::io::BufWriter::new(self.listener.accept().await?.0); + async fn handle_uniqueness_check( + &self, + stream: TcpStream, + ) -> eyre::Result<()> { + let mut stream = tokio::io::BufWriter::new(stream); // Process the trace and span ids to correlate traces between services self.handle_traces_payload(&mut stream).await?; @@ -163,59 +168,69 @@ impl Participant { async fn handle_db_sync(self: Arc) -> eyre::Result<()> { loop { - self.db_sync().await?; + let messages = sqs_dequeue( + &self.sqs_client, + &self.config.queues.db_sync_queue_url, + ) + .await?; + + if messages.is_empty() { + tokio::time::sleep(IDLE_SLEEP_TIME).await; + return Ok(()); + } + + for message in messages { + self.db_sync(message).await?; + } } } - #[tracing::instrument(skip(self))] - async fn db_sync(&self) -> eyre::Result<()> { - let messages = sqs_dequeue( - &self.sqs_client, - &self.config.queues.db_sync_queue_url, - ) - .await?; + #[tracing::instrument(skip(self, message))] + async fn db_sync(&self, message: Message) -> eyre::Result<()> { + let receipt_handle = message + .receipt_handle + .context("Missing receipt handle in message")?; + + if let Some(message_attributes) = &message.message_attributes { + utils::aws::trace_from_message_attributes( + message_attributes, + &receipt_handle, + )?; + } else { + tracing::warn!( + ?receipt_handle, + "SQS message missing message attributes" + ); + } - if messages.is_empty() { - tokio::time::sleep(IDLE_SLEEP_TIME).await; + let body = message.body.context("Missing message body")?; + + let items = if let Ok(items) = + serde_json::from_str::>(&body) + { + items + } else { + tracing::error!(?receipt_handle, "Failed to parse message body"); return Ok(()); - } + }; - for message in messages { - let body = message.body.context("Missing message body")?; - let receipt_handle = message - .receipt_handle - .context("Missing receipt handle in message")?; - - let items = if let Ok(items) = - serde_json::from_str::>(&body) - { - items - } else { - tracing::error!( - ?receipt_handle, - "Failed to parse message body" - ); - continue; - }; - - let shares: Vec<_> = items - .into_iter() - .map(|item| (item.id, item.share)) - .collect(); - - tracing::info!( - num_new_shares = shares.len(), - "Inserting shares into database" - ); - self.database.insert_shares(&shares).await?; + let shares: Vec<_> = items + .into_iter() + .map(|item| (item.id, item.share)) + .collect(); - sqs_delete_message( - &self.sqs_client, - &self.config.queues.db_sync_queue_url, - receipt_handle, - ) - .await?; - } + tracing::info!( + num_new_shares = shares.len(), + "Inserting shares into database" + ); + self.database.insert_shares(&shares).await?; + + sqs_delete_message( + &self.sqs_client, + &self.config.queues.db_sync_queue_url, + receipt_handle, + ) + .await?; Ok(()) } From 36b3edc7cf1a117a9761fbd83c917dd02fdf342d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Tr=C4=85d?= Date: Wed, 28 Feb 2024 17:45:21 +0100 Subject: [PATCH 3/3] Offset sequence by 1 (#66) * Offset sequence by 1 * Fix db sync --- bin/e2e/e2e.rs | 10 +++------ src/coordinator.rs | 54 ++++++---------------------------------------- src/db.rs | 32 +++++++++++++-------------- src/distance.rs | 4 ++-- src/participant.rs | 1 - 5 files changed, 27 insertions(+), 74 deletions(-) diff --git a/bin/e2e/e2e.rs b/bin/e2e/e2e.rs index f177076..7898edb 100644 --- a/bin/e2e/e2e.rs +++ b/bin/e2e/e2e.rs @@ -112,7 +112,7 @@ async fn main() -> eyre::Result<()> { let masks = db.fetch_masks(0).await?; - (masks.len() as u64).checked_sub(1) + masks.len() as u64 }; let participant_db_sync_queues = vec![ @@ -169,18 +169,14 @@ async fn main() -> eyre::Result<()> { ); } } else { - if let Some(id) = next_serial_id.as_mut() { - *id += 1; - } else { - next_serial_id = Some(0); - } + next_serial_id += 1; common::seed_db_sync( &sqs_client, &config.db_sync.coordinator_db_sync_queue, &participant_db_sync_queues, template, - next_serial_id.context("Could not get next serial id")?, + next_serial_id, ) .await?; diff --git a/src/coordinator.rs b/src/coordinator.rs index 38c602c..f2a5cd1 100644 --- a/src/coordinator.rs +++ b/src/coordinator.rs @@ -424,10 +424,7 @@ impl Coordinator { tracing::info!(?matches, "Matches found"); } - // Latest serial id is the last id shared across all nodes - // so we need to subtract 1 from the counter - let latest_serial_id: Option = (i as u64).checked_sub(1); - let distance_results = DistanceResults::new(latest_serial_id, matches); + let distance_results = DistanceResults::new(i as u64, matches); Ok(distance_results) } @@ -458,7 +455,6 @@ impl Coordinator { if messages.is_empty() { tokio::time::sleep(IDLE_SLEEP_TIME).await; - return Ok(()); } for message in messages { @@ -531,49 +527,11 @@ pub struct UniquenessCheckRequest { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct UniquenessCheckResult { - #[serde(with = "some_or_minus_one")] - pub serial_id: Option, + pub serial_id: u64, pub matches: Vec, pub signup_id: String, } -mod some_or_minus_one { - use serde::Deserialize; - - pub fn serialize( - value: &Option, - serializer: S, - ) -> Result - where - S: serde::Serializer, - { - if let Some(value) = value { - serializer.serialize_u64(*value) - } else { - serializer.serialize_i64(-1) - } - } - - pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> - where - D: serde::Deserializer<'de>, - { - let value = i64::deserialize(deserializer)?; - - if value < -1 { - return Err(serde::de::Error::custom( - "value must be -1 or greater", - )); - } - - if value == -1 { - Ok(None) - } else { - Ok(Some(value as u64)) - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -603,7 +561,7 @@ mod tests { #[test] fn result_serialization() { let output = UniquenessCheckResult { - serial_id: Some(1), + serial_id: 1, matches: vec![Distance::new(0, 0.5), Distance::new(1, 0.2)], signup_id: "signup_id".to_string(), }; @@ -636,16 +594,16 @@ mod tests { } #[test] - fn result_serialization_no_serial_id() { + fn result_serialization_zero_serial_id() { let output = UniquenessCheckResult { - serial_id: None, + serial_id: 0, matches: vec![Distance::new(0, 0.5), Distance::new(1, 0.2)], signup_id: "signup_id".to_string(), }; const EXPECTED: &str = indoc::indoc! {r#" { - "serial_id": -1, + "serial_id": 0, "matches": [ { "distance": 0.5, diff --git a/src/db.rs b/src/db.rs index 1901df6..5c262c9 100644 --- a/src/db.rs +++ b/src/db.rs @@ -42,7 +42,7 @@ impl Db { r#" SELECT id, mask FROM masks - WHERE id >= $1 + WHERE id > $1 ORDER BY id ASC "#, ) @@ -50,7 +50,7 @@ impl Db { .fetch_all(&self.pool) .await?; - Ok(filter_sequential_items(masks, id as i64)) + Ok(filter_sequential_items(masks, 1 + id as i64)) } #[tracing::instrument(skip(self))] @@ -90,7 +90,7 @@ impl Db { r#" SELECT id, share FROM shares - WHERE id >= $1 + WHERE id > $1 ORDER BY id ASC "#, ) @@ -98,7 +98,7 @@ impl Db { .fetch_all(&self.pool) .await?; - Ok(filter_sequential_items(shares, id as i64)) + Ok(filter_sequential_items(shares, 1 + id as i64)) } #[tracing::instrument(skip(self))] @@ -195,7 +195,7 @@ mod tests { let mut rng = thread_rng(); - let masks = vec![(0, rng.gen::()), (1, rng.gen::())]; + let masks = vec![(1, rng.gen::()), (2, rng.gen::())]; db.insert_masks(&masks).await?; @@ -215,7 +215,7 @@ mod tests { let mut rng = thread_rng(); - let masks = vec![(0, rng.gen::()), (1, rng.gen::())]; + let masks = vec![(1, rng.gen::()), (2, rng.gen::())]; db.insert_masks(&masks).await?; @@ -244,7 +244,7 @@ mod tests { let mut rng = thread_rng(); let shares = - vec![(0, rng.gen::()), (1, rng.gen::())]; + vec![(1, rng.gen::()), (2, rng.gen::())]; db.insert_shares(&shares).await?; @@ -265,7 +265,7 @@ mod tests { let mut rng = thread_rng(); let shares = - vec![(0, rng.gen::()), (1, rng.gen::())]; + vec![(1, rng.gen::()), (2, rng.gen::())]; db.insert_shares(&shares).await?; @@ -283,11 +283,11 @@ mod tests { let mut rng = thread_rng(); let shares = vec![ - (0, rng.gen::()), (1, rng.gen::()), - (4, rng.gen::()), + (2, rng.gen::()), (5, rng.gen::()), - (7, rng.gen::()), + (6, rng.gen::()), + (8, rng.gen::()), ]; db.insert_shares(&shares).await?; @@ -308,11 +308,11 @@ mod tests { let mut rng = thread_rng(); let masks = vec![ - (0, rng.gen::()), (1, rng.gen::()), (2, rng.gen::()), (3, rng.gen::()), - (5, rng.gen::()), + (4, rng.gen::()), + (6, rng.gen::()), ]; db.insert_masks(&masks).await?; @@ -335,11 +335,11 @@ mod tests { let mut rng = thread_rng(); let masks = vec![ - (0, rng.gen::()), (1, rng.gen::()), (2, rng.gen::()), (3, rng.gen::()), - (5, rng.gen::()), + (4, rng.gen::()), + (6, rng.gen::()), ]; db.insert_masks(&masks).await?; @@ -359,11 +359,11 @@ mod tests { let mut rng = thread_rng(); let masks = vec![ - (1, rng.gen::()), (2, rng.gen::()), (3, rng.gen::()), (4, rng.gen::()), (5, rng.gen::()), + (6, rng.gen::()), ]; db.insert_masks(&masks).await?; diff --git a/src/distance.rs b/src/distance.rs index b70cbb2..17c85ef 100644 --- a/src/distance.rs +++ b/src/distance.rs @@ -82,13 +82,13 @@ impl PartialEq for Distance { #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct DistanceResults { /// The lowest serial id known across all nodes - pub serial_id: Option, + pub serial_id: u64, /// The distances to the query pub matches: Vec, } impl DistanceResults { - pub fn new(serial_id: Option, matches: Vec) -> Self { + pub fn new(serial_id: u64, matches: Vec) -> Self { Self { serial_id, matches } } } diff --git a/src/participant.rs b/src/participant.rs index 34bb718..9f7af8c 100644 --- a/src/participant.rs +++ b/src/participant.rs @@ -176,7 +176,6 @@ impl Participant { if messages.is_empty() { tokio::time::sleep(IDLE_SLEEP_TIME).await; - return Ok(()); } for message in messages {