Skip to content

Commit

Permalink
Background refresh of zknyms
Browse files Browse the repository at this point in the history
  • Loading branch information
octol committed Nov 12, 2024
1 parent 143d84e commit 5f2d21d
Showing 1 changed file with 57 additions and 11 deletions.
68 changes: 57 additions & 11 deletions nym-vpn-core/crates/nym-vpn-account-controller/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ use crate::{
AvailableTicketbooks,
};

// The interval at which we automatically request zk-nyms
const ZK_NYM_AUTOMATIC_REQUEST_INTERVAL: Duration = Duration::from_secs(10 * 60);

// The maximum number of zk-nym requests that can fail in a row before we disable background
// refresh
const ZK_NYM_MAX_FAILS: u32 = 10;

// The interval at which we update the account state
const ACCOUNT_UPDATE_INTERVAL: Duration = Duration::from_secs(5 * 60);

pub(crate) type PendingCommands = Arc<std::sync::Mutex<HashMap<uuid::Uuid, String>>>;
pub(crate) type DevicesResponse = Arc<tokio::sync::Mutex<Option<NymVpnDevicesResponse>>>;
pub(crate) type AccountSummaryResponse =
Expand Down Expand Up @@ -68,6 +78,9 @@ where
// changed.
last_devices: DevicesResponse,

// If we have multiple fails in a row, disable background refresh
zk_nym_fails_in_row: u32,

// Tasks used to poll the status of zk-nyms
polling_tasks: JoinSet<PollingResult>,

Expand Down Expand Up @@ -121,6 +134,7 @@ where
account_state: SharedAccountState::new(),
last_account_summary: Arc::new(tokio::sync::Mutex::new(None)),
last_devices: Arc::new(tokio::sync::Mutex::new(None)),
zk_nym_fails_in_row: 0,
polling_tasks: JoinSet::new(),
command_rx,
command_tx,
Expand Down Expand Up @@ -303,19 +317,30 @@ where
// Check the local credential storage to see if we need to request more zk-nyms, the proceed
// to request zk-nyms for each ticket type that we need.
async fn handle_request_zk_nym(&mut self) -> Result<(), Error> {
let account = self.account_storage.load_account().await?;
let device = self.account_storage.load_device_keys().await?;
if self.shared_state().lock().await.pending_zk_nym {
tracing::info!("zk-nym request already in progress, skipping");
return Ok(());
}

tracing::info!("Checking which ticket types are running low");
let ticket_types_needed_to_request = self
.credential_storage
.check_ticket_types_running_low()
.await?;

if ticket_types_needed_to_request.is_empty() {
tracing::info!("No ticket types running low, skipping zk-nym request");
return Ok(());
}

tracing::info!(
"Ticket types running low: {:?}",
ticket_types_needed_to_request
);

let account = self.account_storage.load_account().await?;
let device = self.account_storage.load_device_keys().await?;

// Request zk-nyms for each ticket type that we need
let responses = futures::stream::iter(ticket_types_needed_to_request)
.filter_map(|ticket_type| {
Expand Down Expand Up @@ -485,13 +510,21 @@ where
PollingResult::Finished(response, ticketbook_type, request_info, request)
if response.status == NymVpnZkNymStatus::Active =>
{
tracing::info!("Polling finished succesfully, importing ticketbook");
self.import_zk_nym(response, ticketbook_type, *request_info, *request)
let id = response.id.clone();
tracing::info!("Polling finished succesfully, importing ticketbook: {id}",);
match self
.import_zk_nym(response, ticketbook_type, *request_info, *request)
.await
.inspect_err(|err| {
{
Ok(_) => {
tracing::info!("Successfully imported zk-nym: {}", id);
self.zk_nym_fails_in_row = 0;
}
Err(err) => {
tracing::error!("Failed to import zk-nym: {:#?}", err);
})
.ok();
self.zk_nym_fails_in_row += 1;
}
};
}
PollingResult::Finished(response, _, _, _) => {
tracing::warn!(
Expand All @@ -500,12 +533,15 @@ where
response.status,
);
tracing::warn!("Not importing zk-nym: {}", response.id);
self.zk_nym_fails_in_row += 1;
}
PollingResult::Timeout(response) => {
tracing::info!("Polling task timed out: {:#?}", response);
self.zk_nym_fails_in_row += 1;
}
PollingResult::Error(error) => {
tracing::error!("Polling task failed for {}: {:#?}", error.id, error.error);
self.zk_nym_fails_in_row += 1;
}
}
}
Expand Down Expand Up @@ -633,14 +669,20 @@ where
pub async fn run(mut self) {
self.print_info().await;

// Timer to check if any command tasks have finished
// Timer to check if any command tasks have finished. This just needs to be something small
// so that we periodically check the results without interfering with other tasks
let mut command_finish_timer = tokio::time::interval(Duration::from_millis(500));

// Timer to check if any zk-nym polling tasks have finished
// Timer to check if any zk-nym polling tasks have finished. This just needs to be
// something small so that we periodically check the results without interfering with other
// tasks
let mut polling_timer = tokio::time::interval(Duration::from_millis(500));

// Timer to periodically refresh the remote account state
let mut update_account_state_timer = tokio::time::interval(Duration::from_secs(5 * 60));
let mut update_account_state_timer = tokio::time::interval(ACCOUNT_UPDATE_INTERVAL);

// Timer to periodically check if we need to request more zk-nyms
let mut update_zk_nym_timer = tokio::time::interval(ZK_NYM_AUTOMATIC_REQUEST_INTERVAL);

loop {
tokio::select! {
Expand All @@ -665,9 +707,13 @@ where
self.update_pending_zk_nym_tasks().await;
}
// On a timer we want to refresh the account state
_ = update_account_state_timer.tick() => {
_ = update_account_state_timer.tick(), if self.zk_nym_fails_in_row < ZK_NYM_MAX_FAILS => {
self.queue_command(AccountCommand::UpdateAccountState);
}
// On a timer to check if we need to request more zk-nyms
_ = update_zk_nym_timer.tick() => {
self.queue_command(AccountCommand::RequestZkNym);
}
_ = self.cancel_token.cancelled() => {
tracing::trace!("Received cancellation signal");
break;
Expand Down

0 comments on commit 5f2d21d

Please sign in to comment.