Skip to content

Commit

Permalink
Merge pull request #627 from worldcoin/dcbuild3r/add-insertion-timest…
Browse files Browse the repository at this point in the history
…amp-persistence
  • Loading branch information
dcbuild3r authored Oct 5, 2023
2 parents c43ce78 + fdeeedf commit 8297bea
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 12 deletions.
6 changes: 6 additions & 0 deletions schemas/database/010_latest_insertion_timestamp.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE latest_insertion_timestamp (
Lock char(1) NOT NULL DEFAULT 'X',
insertion_timestamp TIMESTAMPTZ,
constraint PK_T2 PRIMARY KEY (Lock),
constraint CK_T2_Locked CHECK (Lock='X')
);
47 changes: 47 additions & 0 deletions src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,19 @@ impl Database {
}))
}

pub async fn get_latest_insertion_timestamp(&self) -> Result<Option<DateTime<Utc>>, Error> {
let query = sqlx::query(
r#"
SELECT insertion_timestamp
FROM latest_insertion_timestamp
WHERE Lock = 'X';"#,
);

let row = self.pool.fetch_optional(query).await?;

Ok(row.map(|r| r.get::<DateTime<Utc>, _>(0)))
}

pub async fn count_unprocessed_identities(&self) -> Result<i32, Error> {
let query = sqlx::query(
r#"
Expand Down Expand Up @@ -537,6 +550,24 @@ impl Database {
}
}

pub async fn update_latest_insertion_timestamp(
&self,
insertion_timestamp: DateTime<Utc>,
) -> Result<(), Error> {
let query = sqlx::query(
r#"
INSERT INTO latest_insertion_timestamp (Lock, insertion_timestamp)
VALUES ('X', $1)
ON CONFLICT (Lock)
DO UPDATE SET insertion_timestamp = EXCLUDED.insertion_timestamp;
"#,
)
.bind(insertion_timestamp);

self.pool.execute(query).await?;
Ok(())
}

pub async fn update_latest_deletion(
&self,
deletion_timestamp: DateTime<Utc>,
Expand Down Expand Up @@ -1141,6 +1172,22 @@ mod test {
Ok(())
}

#[tokio::test]
async fn test_update_insertion_timestamp() -> anyhow::Result<()> {
let (db, _db_container) = setup_db().await?;

let insertion_timestamp = Utc::now();

db.update_latest_insertion_timestamp(insertion_timestamp)
.await?;

let latest_insertion_timestamp = db.get_latest_insertion_timestamp().await?.unwrap();

assert!(latest_insertion_timestamp.timestamp() - insertion_timestamp.timestamp() <= 1);

Ok(())
}

#[tokio::test]
async fn test_insert_deletion() -> anyhow::Result<()> {
let (db, _db_container) = setup_db().await?;
Expand Down
27 changes: 15 additions & 12 deletions src/task_monitor/tasks/process_identities.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::time::Duration;

use anyhow::{Context, Result as AnyhowResult};
use chrono::{DateTime, Utc};
use ethers::types::U256;
use ruint::Uint;
use semaphore::merkle_tree::Proof;
Expand Down Expand Up @@ -86,7 +87,10 @@ async fn process_identities(
// inserted. If we have an incomplete batch but are within a small delta of the
// tick happening anyway in the wake branch, we insert the current
// (possibly-incomplete) batch anyway.
let mut last_batch_time: SystemTime = SystemTime::now();
let mut last_batch_time: DateTime<Utc> = database
.get_latest_insertion_timestamp()
.await?
.unwrap_or(Utc::now());

loop {
// We ping-pong between two cases for being woken. This ensures that there is a
Expand Down Expand Up @@ -120,7 +124,8 @@ async fn process_identities(
&updates,
).await?;

last_batch_time = SystemTime::now();
last_batch_time = Utc::now();
database.update_latest_insertion_timestamp(last_batch_time).await?;

// Also wake up if woken up due to a tick
wake_up_notify.notify_one();
Expand All @@ -135,15 +140,12 @@ async fn process_identities(
// We unconditionally convert `u64 -> i64` as numbers should
// always be small. If the numbers are not always small then
// we _want_ to panic as something is horribly broken.
let current_time = SystemTime::now();
let diff_secs = if let Ok(diff) = current_time.duration_since(last_batch_time) {
diff.as_secs()
} else {
warn!("Identity committer thinks that the last batch is in the future.");
continue
};
let current_time = Utc::now();
let diff_secs = current_time - last_batch_time;
#[allow(clippy::cast_sign_loss)]
let diff_secs_u64 = diff_secs.num_seconds() as u64;
let should_process_anyway =
timeout_secs.abs_diff(diff_secs) <= DEBOUNCE_THRESHOLD_SECS;
timeout_secs.abs_diff(diff_secs_u64) <= DEBOUNCE_THRESHOLD_SECS;

let next_update = batching_tree.peek_next_updates(1);
if next_update.is_empty() {
Expand Down Expand Up @@ -185,7 +187,8 @@ async fn process_identities(
// we don't trigger again until either we get a full batch
// or the timer ticks.
timer.reset();
last_batch_time = SystemTime::now();
last_batch_time = Utc::now();
database.update_latest_insertion_timestamp(last_batch_time).await?;

// We want to check if there's a full batch available immediately
wake_up_notify.notify_one();
Expand Down

0 comments on commit 8297bea

Please sign in to comment.