From 0ad7d8ef56d53ef3befd59411319599ba443ec12 Mon Sep 17 00:00:00 2001 From: Piotr Heilman Date: Mon, 25 Nov 2024 17:38:43 +0100 Subject: [PATCH] Skip ticks when missed. --- src/task_monitor/tasks/create_batches.rs | 3 ++- src/task_monitor/tasks/delete_identities.rs | 2 ++ src/task_monitor/tasks/insert_identities.rs | 2 ++ src/task_monitor/tasks/process_batches.rs | 7 ++++--- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/task_monitor/tasks/create_batches.rs b/src/task_monitor/tasks/create_batches.rs index f65e14ef..e915c557 100644 --- a/src/task_monitor/tasks/create_batches.rs +++ b/src/task_monitor/tasks/create_batches.rs @@ -7,6 +7,7 @@ use semaphore::poseidon_tree::{Branch, PoseidonHash}; use std::sync::Arc; use std::time::Duration; use tokio::sync::Notify; +use tokio::time::MissedTickBehavior; use tokio::{select, time}; use tracing::instrument; @@ -40,7 +41,7 @@ pub async fn create_batches( // We start a timer and force it to perform one initial tick to avoid an // immediate trigger. let mut timer = time::interval(Duration::from_secs(5)); - timer.tick().await; + timer.set_missed_tick_behavior(MissedTickBehavior::Skip); // When both futures are woken at once, the choice is made // non-deterministically. This could, in the worst case, result in users waiting diff --git a/src/task_monitor/tasks/delete_identities.rs b/src/task_monitor/tasks/delete_identities.rs index 78a4a288..dc289351 100644 --- a/src/task_monitor/tasks/delete_identities.rs +++ b/src/task_monitor/tasks/delete_identities.rs @@ -5,6 +5,7 @@ use std::time::Duration; use anyhow::Context; use chrono::Utc; use tokio::sync::{Mutex, Notify}; +use tokio::time::MissedTickBehavior; use tokio::{select, time}; use tracing::info; @@ -30,6 +31,7 @@ pub async fn delete_identities( .context("Invalid batch deletion timeout duration")?; let mut timer = time::interval(Duration::from_secs(5)); + timer.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { select! { diff --git a/src/task_monitor/tasks/insert_identities.rs b/src/task_monitor/tasks/insert_identities.rs index 0120b41f..b85760d9 100644 --- a/src/task_monitor/tasks/insert_identities.rs +++ b/src/task_monitor/tasks/insert_identities.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::{Mutex, Notify}; +use tokio::time::MissedTickBehavior; use tokio::{select, time}; use tracing::info; @@ -23,6 +24,7 @@ pub async fn insert_identities( info!("Starting insertion processor task."); let mut timer = time::interval(Duration::from_secs(5)); + timer.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { select! { diff --git a/src/task_monitor/tasks/process_batches.rs b/src/task_monitor/tasks/process_batches.rs index 74efffe3..65df7d02 100644 --- a/src/task_monitor/tasks/process_batches.rs +++ b/src/task_monitor/tasks/process_batches.rs @@ -1,12 +1,12 @@ use std::sync::Arc; use std::time::Duration; -use tokio::sync::{mpsc, Notify}; -use tokio::{select, time}; - use crate::app::App; use crate::database::methods::DbMethods as _; use crate::identity::processor::TransactionId; +use tokio::sync::{mpsc, Notify}; +use tokio::time::MissedTickBehavior; +use tokio::{select, time}; pub async fn process_batches( app: Arc, @@ -23,6 +23,7 @@ pub async fn process_batches( tracing::info!("Starting identity processor."); let mut timer = time::interval(Duration::from_secs(5)); + timer.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { // We wait either for a timer tick or a full batch