Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix validator checkpoint syncer latest index #2858

Merged
merged 6 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion rust/agents/validator/src/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl ValidatorSubmitter {
//
// tree.index() will panic if the tree is empty, so we use tree.count() instead
// and convert the correctness_checkpoint.index to a count by adding 1.
while correctness_checkpoint.index + 1 > tree.count() as u32 {
while tree.count() as u32 <= correctness_checkpoint.index {
if let Some(insertion) = self
.message_db
.retrieve_merkle_tree_insertion_by_leaf_index(&(tree.count() as u32))?
Expand Down Expand Up @@ -239,6 +239,8 @@ impl ValidatorSubmitter {
&self,
checkpoints: Vec<CheckpointWithMessageId>,
) -> Result<()> {
let last_checkpoint = checkpoints.as_slice()[checkpoints.len() - 1];
nambrot marked this conversation as resolved.
Show resolved Hide resolved

for queued_checkpoint in checkpoints {
let existing = self
.checkpoint_syncer
Expand All @@ -261,9 +263,15 @@ impl ValidatorSubmitter {
"Signed and submitted checkpoint"
);

// TODO: move these into S3 implementations
// small sleep before signing next checkpoint to avoid rate limiting
sleep(Duration::from_millis(100)).await;
}

self.checkpoint_syncer
.update_latest_index(last_checkpoint.index)
.await?;

Ok(())
}
}
Expand Down
10 changes: 10 additions & 0 deletions rust/hyperlane-base/src/traits/checkpoint_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@ use hyperlane_core::{SignedAnnouncement, SignedCheckpointWithMessageId};
pub trait CheckpointSyncer: Debug + Send + Sync {
/// Read the highest index of this Syncer
async fn latest_index(&self) -> Result<Option<u32>>;
/// Writes the highest index of this Syncer
async fn write_latest_index(&self, index: u32) -> Result<()>;
/// Update the latest index of this syncer if necessary
async fn update_latest_index(&self, index: u32) -> Result<()> {
let curr = self.latest_index().await?.unwrap_or(0);
if index > curr {
self.write_latest_index(index).await?;
}
Ok(())
}
/// Attempt to fetch the signed (checkpoint, messageId) tuple at this index
async fn fetch_checkpoint(&self, index: u32) -> Result<Option<SignedCheckpointWithMessageId>>;
/// Write the signed (checkpoint, messageId) tuple to this syncer
Expand Down
8 changes: 8 additions & 0 deletions rust/hyperlane-base/src/types/local_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ impl CheckpointSyncer for LocalStorage {
}
}

async fn write_latest_index(&self, index: u32) -> Result<()> {
let path = self.latest_index_file_path();
tokio::fs::write(&path, index.to_string())
.await
.with_context(|| format!("Writing index to {path:?}"))?;
Ok(())
}

yorhodes marked this conversation as resolved.
Show resolved Hide resolved
async fn fetch_checkpoint(&self, index: u32) -> Result<Option<SignedCheckpointWithMessageId>> {
let Ok(data) = tokio::fs::read(self.checkpoint_file_path(index)).await else {
return Ok(None);
Expand Down
11 changes: 9 additions & 2 deletions rust/hyperlane-base/src/types/s3_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl S3Storage {
format!("checkpoint_{index}_with_id.json")
}

fn index_key() -> String {
fn latest_index_key() -> String {
"checkpoint_latest_index.json".to_owned()
}

Expand All @@ -144,7 +144,7 @@ impl S3Storage {
impl CheckpointSyncer for S3Storage {
async fn latest_index(&self) -> Result<Option<u32>> {
let ret = self
.anonymously_read_from_bucket(S3Storage::index_key())
.anonymously_read_from_bucket(S3Storage::latest_index_key())
.await?
.map(|data| serde_json::from_slice(&data))
.transpose()
Expand All @@ -159,6 +159,13 @@ impl CheckpointSyncer for S3Storage {
ret
}

async fn write_latest_index(&self, index: u32) -> Result<()> {
let serialized_index = serde_json::to_string(&index)?;
self.write_to_bucket(S3Storage::latest_index_key(), &serialized_index)
.await?;
Ok(())
}

async fn fetch_checkpoint(&self, index: u32) -> Result<Option<SignedCheckpointWithMessageId>> {
self.anonymously_read_from_bucket(S3Storage::checkpoint_key(index))
.await?
Expand Down
Loading