diff --git a/schemas/database/010_latest_insertion_timestamp.sql b/schemas/database/010_latest_insertion_timestamp.sql new file mode 100644 index 00000000..52a091da --- /dev/null +++ b/schemas/database/010_latest_insertion_timestamp.sql @@ -0,0 +1,6 @@ +CREATE TABLE latest_insertion_timestamp ( + Lock char(1) NOT NULL DEFAULT 'X', + insertion_timestamp TIMESTAMPTZ, + constraint PK_T2 PRIMARY KEY (Lock), + constraint CK_T2_Locked CHECK (Lock='X') +); \ No newline at end of file diff --git a/src/database/mod.rs b/src/database/mod.rs index e02597bc..7a4a4982 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -366,6 +366,19 @@ impl Database { })) } + pub async fn get_latest_insertion_timestamp(&self) -> Result>, Error> { + let query = sqlx::query( + r#" + SELECT insertion_timestamp + FROM latest_insertion_timestamp + WHERE Lock = 'X';"#, + ); + + let row = self.pool.fetch_optional(query).await?; + + Ok(row.map(|r| r.get::, _>(0))) + } + pub async fn count_unprocessed_identities(&self) -> Result { let query = sqlx::query( r#" @@ -537,6 +550,24 @@ impl Database { } } + pub async fn update_latest_insertion_timestamp( + &self, + insertion_timestamp: DateTime, + ) -> Result<(), Error> { + let query = sqlx::query( + r#" + INSERT INTO latest_insertion_timestamp (Lock, insertion_timestamp) + VALUES ('X', $1) + ON CONFLICT (Lock) + DO UPDATE SET insertion_timestamp = EXCLUDED.insertion_timestamp; + "#, + ) + .bind(insertion_timestamp); + + self.pool.execute(query).await?; + Ok(()) + } + pub async fn update_latest_deletion( &self, deletion_timestamp: DateTime, @@ -1141,6 +1172,22 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_update_insertion_timestamp() -> anyhow::Result<()> { + let (db, _db_container) = setup_db().await?; + + let insertion_timestamp = Utc::now(); + + db.update_latest_insertion_timestamp(insertion_timestamp) + .await?; + + let latest_insertion_timestamp = db.get_latest_insertion_timestamp().await?.unwrap(); + + assert!(latest_insertion_timestamp.timestamp() - insertion_timestamp.timestamp() <= 1); + + Ok(()) + } + #[tokio::test] async fn test_insert_deletion() -> anyhow::Result<()> { let (db, _db_container) = setup_db().await?; diff --git a/src/task_monitor/tasks/process_identities.rs b/src/task_monitor/tasks/process_identities.rs index 929440e1..4cf925ab 100644 --- a/src/task_monitor/tasks/process_identities.rs +++ b/src/task_monitor/tasks/process_identities.rs @@ -1,7 +1,8 @@ use std::sync::Arc; -use std::time::{Duration, SystemTime}; +use std::time::Duration; use anyhow::{Context, Result as AnyhowResult}; +use chrono::{DateTime, Utc}; use ethers::types::U256; use ruint::Uint; use semaphore::merkle_tree::Proof; @@ -86,7 +87,10 @@ async fn process_identities( // inserted. If we have an incomplete batch but are within a small delta of the // tick happening anyway in the wake branch, we insert the current // (possibly-incomplete) batch anyway. - let mut last_batch_time: SystemTime = SystemTime::now(); + let mut last_batch_time: DateTime = database + .get_latest_insertion_timestamp() + .await? + .unwrap_or(Utc::now()); loop { // We ping-pong between two cases for being woken. This ensures that there is a @@ -120,7 +124,8 @@ async fn process_identities( &updates, ).await?; - last_batch_time = SystemTime::now(); + last_batch_time = Utc::now(); + database.update_latest_insertion_timestamp(last_batch_time).await?; // Also wake up if woken up due to a tick wake_up_notify.notify_one(); @@ -135,15 +140,12 @@ async fn process_identities( // We unconditionally convert `u64 -> i64` as numbers should // always be small. If the numbers are not always small then // we _want_ to panic as something is horribly broken. - let current_time = SystemTime::now(); - let diff_secs = if let Ok(diff) = current_time.duration_since(last_batch_time) { - diff.as_secs() - } else { - warn!("Identity committer thinks that the last batch is in the future."); - continue - }; + let current_time = Utc::now(); + let diff_secs = current_time - last_batch_time; + #[allow(clippy::cast_sign_loss)] + let diff_secs_u64 = diff_secs.num_seconds() as u64; let should_process_anyway = - timeout_secs.abs_diff(diff_secs) <= DEBOUNCE_THRESHOLD_SECS; + timeout_secs.abs_diff(diff_secs_u64) <= DEBOUNCE_THRESHOLD_SECS; let next_update = batching_tree.peek_next_updates(1); if next_update.is_empty() { @@ -185,7 +187,8 @@ async fn process_identities( // we don't trigger again until either we get a full batch // or the timer ticks. timer.reset(); - last_batch_time = SystemTime::now(); + last_batch_time = Utc::now(); + database.update_latest_insertion_timestamp(last_batch_time).await?; // We want to check if there's a full batch available immediately wake_up_notify.notify_one();