diff --git a/rust/agents/validator/src/submit.rs b/rust/agents/validator/src/submit.rs index c80d590ef8..3de0a798af 100644 --- a/rust/agents/validator/src/submit.rs +++ b/rust/agents/validator/src/submit.rs @@ -167,7 +167,7 @@ impl ValidatorSubmitter { // // tree.index() will panic if the tree is empty, so we use tree.count() instead // and convert the correctness_checkpoint.index to a count by adding 1. - while correctness_checkpoint.index + 1 > tree.count() as u32 { + while tree.count() as u32 <= correctness_checkpoint.index { if let Some(insertion) = self .message_db .retrieve_merkle_tree_insertion_by_leaf_index(&(tree.count() as u32))? @@ -239,6 +239,8 @@ impl ValidatorSubmitter { &self, checkpoints: Vec, ) -> Result<()> { + let last_checkpoint = checkpoints.as_slice()[checkpoints.len() - 1]; + for queued_checkpoint in checkpoints { let existing = self .checkpoint_syncer @@ -260,9 +262,15 @@ impl ValidatorSubmitter { "Signed and submitted checkpoint" ); + // TODO: move these into S3 implementations // small sleep before signing next checkpoint to avoid rate limiting sleep(Duration::from_millis(100)).await; } + + self.checkpoint_syncer + .update_latest_index(last_checkpoint.index) + .await?; + Ok(()) } } diff --git a/rust/hyperlane-base/src/traits/checkpoint_syncer.rs b/rust/hyperlane-base/src/traits/checkpoint_syncer.rs index e017b3f5aa..abec982c7d 100644 --- a/rust/hyperlane-base/src/traits/checkpoint_syncer.rs +++ b/rust/hyperlane-base/src/traits/checkpoint_syncer.rs @@ -10,6 +10,16 @@ use hyperlane_core::{SignedAnnouncement, SignedCheckpointWithMessageId}; pub trait CheckpointSyncer: Debug + Send + Sync { /// Read the highest index of this Syncer async fn latest_index(&self) -> Result>; + /// Writes the highest index of this Syncer + async fn write_latest_index(&self, index: u32) -> Result<()>; + /// Update the latest index of this syncer if necessary + async fn update_latest_index(&self, index: u32) -> Result<()> { + let curr = self.latest_index().await?.unwrap_or(0); + if index > curr { + self.write_latest_index(index).await?; + } + Ok(()) + } /// Attempt to fetch the signed (checkpoint, messageId) tuple at this index async fn fetch_checkpoint(&self, index: u32) -> Result>; /// Write the signed (checkpoint, messageId) tuple to this syncer diff --git a/rust/hyperlane-base/src/types/local_storage.rs b/rust/hyperlane-base/src/types/local_storage.rs index 76c02a38cd..38397c1bf6 100644 --- a/rust/hyperlane-base/src/types/local_storage.rs +++ b/rust/hyperlane-base/src/types/local_storage.rs @@ -62,6 +62,14 @@ impl CheckpointSyncer for LocalStorage { } } + async fn write_latest_index(&self, index: u32) -> Result<()> { + let path = self.latest_index_file_path(); + tokio::fs::write(&path, index.to_string()) + .await + .with_context(|| format!("Writing index to {path:?}"))?; + Ok(()) + } + async fn fetch_checkpoint(&self, index: u32) -> Result> { let Ok(data) = tokio::fs::read(self.checkpoint_file_path(index)).await else { return Ok(None); diff --git a/rust/hyperlane-base/src/types/s3_storage.rs b/rust/hyperlane-base/src/types/s3_storage.rs index cfce7ff3e5..2d09a18a86 100644 --- a/rust/hyperlane-base/src/types/s3_storage.rs +++ b/rust/hyperlane-base/src/types/s3_storage.rs @@ -131,7 +131,7 @@ impl S3Storage { format!("checkpoint_{index}_with_id.json") } - fn index_key() -> String { + fn latest_index_key() -> String { "checkpoint_latest_index.json".to_owned() } @@ -144,7 +144,7 @@ impl S3Storage { impl CheckpointSyncer for S3Storage { async fn latest_index(&self) -> Result> { let ret = self - .anonymously_read_from_bucket(S3Storage::index_key()) + .anonymously_read_from_bucket(S3Storage::latest_index_key()) .await? .map(|data| serde_json::from_slice(&data)) .transpose() @@ -159,6 +159,13 @@ impl CheckpointSyncer for S3Storage { ret } + async fn write_latest_index(&self, index: u32) -> Result<()> { + let serialized_index = serde_json::to_string(&index)?; + self.write_to_bucket(S3Storage::latest_index_key(), &serialized_index) + .await?; + Ok(()) + } + async fn fetch_checkpoint(&self, index: u32) -> Result> { self.anonymously_read_from_bucket(S3Storage::checkpoint_key(index)) .await?