From 5f2d21d0073a82ee90cd43a7c36990957525ea29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20H=C3=A4ggblad?= Date: Tue, 12 Nov 2024 17:04:46 +0100 Subject: [PATCH] Background refresh of zknyms --- .../src/controller.rs | 68 ++++++++++++++++--- 1 file changed, 57 insertions(+), 11 deletions(-) diff --git a/nym-vpn-core/crates/nym-vpn-account-controller/src/controller.rs b/nym-vpn-core/crates/nym-vpn-account-controller/src/controller.rs index 4546b0ed55..9aecab29e6 100644 --- a/nym-vpn-core/crates/nym-vpn-account-controller/src/controller.rs +++ b/nym-vpn-core/crates/nym-vpn-account-controller/src/controller.rs @@ -39,6 +39,16 @@ use crate::{ AvailableTicketbooks, }; +// The interval at which we automatically request zk-nyms +const ZK_NYM_AUTOMATIC_REQUEST_INTERVAL: Duration = Duration::from_secs(10 * 60); + +// The maximum number of zk-nym requests that can fail in a row before we disable background +// refresh +const ZK_NYM_MAX_FAILS: u32 = 10; + +// The interval at which we update the account state +const ACCOUNT_UPDATE_INTERVAL: Duration = Duration::from_secs(5 * 60); + pub(crate) type PendingCommands = Arc>>; pub(crate) type DevicesResponse = Arc>>; pub(crate) type AccountSummaryResponse = @@ -68,6 +78,9 @@ where // changed. last_devices: DevicesResponse, + // If we have multiple fails in a row, disable background refresh + zk_nym_fails_in_row: u32, + // Tasks used to poll the status of zk-nyms polling_tasks: JoinSet, @@ -121,6 +134,7 @@ where account_state: SharedAccountState::new(), last_account_summary: Arc::new(tokio::sync::Mutex::new(None)), last_devices: Arc::new(tokio::sync::Mutex::new(None)), + zk_nym_fails_in_row: 0, polling_tasks: JoinSet::new(), command_rx, command_tx, @@ -303,19 +317,30 @@ where // Check the local credential storage to see if we need to request more zk-nyms, the proceed // to request zk-nyms for each ticket type that we need. async fn handle_request_zk_nym(&mut self) -> Result<(), Error> { - let account = self.account_storage.load_account().await?; - let device = self.account_storage.load_device_keys().await?; + if self.shared_state().lock().await.pending_zk_nym { + tracing::info!("zk-nym request already in progress, skipping"); + return Ok(()); + } tracing::info!("Checking which ticket types are running low"); let ticket_types_needed_to_request = self .credential_storage .check_ticket_types_running_low() .await?; + + if ticket_types_needed_to_request.is_empty() { + tracing::info!("No ticket types running low, skipping zk-nym request"); + return Ok(()); + } + tracing::info!( "Ticket types running low: {:?}", ticket_types_needed_to_request ); + let account = self.account_storage.load_account().await?; + let device = self.account_storage.load_device_keys().await?; + // Request zk-nyms for each ticket type that we need let responses = futures::stream::iter(ticket_types_needed_to_request) .filter_map(|ticket_type| { @@ -485,13 +510,21 @@ where PollingResult::Finished(response, ticketbook_type, request_info, request) if response.status == NymVpnZkNymStatus::Active => { - tracing::info!("Polling finished succesfully, importing ticketbook"); - self.import_zk_nym(response, ticketbook_type, *request_info, *request) + let id = response.id.clone(); + tracing::info!("Polling finished succesfully, importing ticketbook: {id}",); + match self + .import_zk_nym(response, ticketbook_type, *request_info, *request) .await - .inspect_err(|err| { + { + Ok(_) => { + tracing::info!("Successfully imported zk-nym: {}", id); + self.zk_nym_fails_in_row = 0; + } + Err(err) => { tracing::error!("Failed to import zk-nym: {:#?}", err); - }) - .ok(); + self.zk_nym_fails_in_row += 1; + } + }; } PollingResult::Finished(response, _, _, _) => { tracing::warn!( @@ -500,12 +533,15 @@ where response.status, ); tracing::warn!("Not importing zk-nym: {}", response.id); + self.zk_nym_fails_in_row += 1; } PollingResult::Timeout(response) => { tracing::info!("Polling task timed out: {:#?}", response); + self.zk_nym_fails_in_row += 1; } PollingResult::Error(error) => { tracing::error!("Polling task failed for {}: {:#?}", error.id, error.error); + self.zk_nym_fails_in_row += 1; } } } @@ -633,14 +669,20 @@ where pub async fn run(mut self) { self.print_info().await; - // Timer to check if any command tasks have finished + // Timer to check if any command tasks have finished. This just needs to be something small + // so that we periodically check the results without interfering with other tasks let mut command_finish_timer = tokio::time::interval(Duration::from_millis(500)); - // Timer to check if any zk-nym polling tasks have finished + // Timer to check if any zk-nym polling tasks have finished. This just needs to be + // something small so that we periodically check the results without interfering with other + // tasks let mut polling_timer = tokio::time::interval(Duration::from_millis(500)); // Timer to periodically refresh the remote account state - let mut update_account_state_timer = tokio::time::interval(Duration::from_secs(5 * 60)); + let mut update_account_state_timer = tokio::time::interval(ACCOUNT_UPDATE_INTERVAL); + + // Timer to periodically check if we need to request more zk-nyms + let mut update_zk_nym_timer = tokio::time::interval(ZK_NYM_AUTOMATIC_REQUEST_INTERVAL); loop { tokio::select! { @@ -665,9 +707,13 @@ where self.update_pending_zk_nym_tasks().await; } // On a timer we want to refresh the account state - _ = update_account_state_timer.tick() => { + _ = update_account_state_timer.tick(), if self.zk_nym_fails_in_row < ZK_NYM_MAX_FAILS => { self.queue_command(AccountCommand::UpdateAccountState); } + // On a timer to check if we need to request more zk-nyms + _ = update_zk_nym_timer.tick() => { + self.queue_command(AccountCommand::RequestZkNym); + } _ = self.cancel_token.cancelled() => { tracing::trace!("Received cancellation signal"); break;