Skip to content

Commit

Permalink
Merge branch 'v3' into nambrot/repro-only-merkle-tree
Browse files Browse the repository at this point in the history
  • Loading branch information
aroralanuk authored Nov 3, 2023
2 parents 614da35 + 136e7c8 commit b6c33c7
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 3 deletions.
10 changes: 9 additions & 1 deletion rust/agents/validator/src/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl ValidatorSubmitter {
//
// tree.index() will panic if the tree is empty, so we use tree.count() instead
// and convert the correctness_checkpoint.index to a count by adding 1.
while correctness_checkpoint.index + 1 > tree.count() as u32 {
while tree.count() as u32 <= correctness_checkpoint.index {
if let Some(insertion) = self
.message_db
.retrieve_merkle_tree_insertion_by_leaf_index(&(tree.count() as u32))?
Expand Down Expand Up @@ -239,6 +239,8 @@ impl ValidatorSubmitter {
&self,
checkpoints: Vec<CheckpointWithMessageId>,
) -> Result<()> {
let last_checkpoint = checkpoints.as_slice()[checkpoints.len() - 1];

for queued_checkpoint in checkpoints {
let existing = self
.checkpoint_syncer
Expand All @@ -260,9 +262,15 @@ impl ValidatorSubmitter {
"Signed and submitted checkpoint"
);

// TODO: move these into S3 implementations
// small sleep before signing next checkpoint to avoid rate limiting
sleep(Duration::from_millis(100)).await;
}

self.checkpoint_syncer
.update_latest_index(last_checkpoint.index)
.await?;

Ok(())
}
}
Expand Down
10 changes: 10 additions & 0 deletions rust/hyperlane-base/src/traits/checkpoint_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@ use hyperlane_core::{SignedAnnouncement, SignedCheckpointWithMessageId};
pub trait CheckpointSyncer: Debug + Send + Sync {
/// Read the highest index of this Syncer
async fn latest_index(&self) -> Result<Option<u32>>;
/// Writes the highest index of this Syncer
async fn write_latest_index(&self, index: u32) -> Result<()>;
/// Update the latest index of this syncer if necessary
async fn update_latest_index(&self, index: u32) -> Result<()> {
let curr = self.latest_index().await?.unwrap_or(0);
if index > curr {
self.write_latest_index(index).await?;
}
Ok(())
}
/// Attempt to fetch the signed (checkpoint, messageId) tuple at this index
async fn fetch_checkpoint(&self, index: u32) -> Result<Option<SignedCheckpointWithMessageId>>;
/// Write the signed (checkpoint, messageId) tuple to this syncer
Expand Down
8 changes: 8 additions & 0 deletions rust/hyperlane-base/src/types/local_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ impl CheckpointSyncer for LocalStorage {
}
}

async fn write_latest_index(&self, index: u32) -> Result<()> {
let path = self.latest_index_file_path();
tokio::fs::write(&path, index.to_string())
.await
.with_context(|| format!("Writing index to {path:?}"))?;
Ok(())
}

async fn fetch_checkpoint(&self, index: u32) -> Result<Option<SignedCheckpointWithMessageId>> {
let Ok(data) = tokio::fs::read(self.checkpoint_file_path(index)).await else {
return Ok(None);
Expand Down
11 changes: 9 additions & 2 deletions rust/hyperlane-base/src/types/s3_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl S3Storage {
format!("checkpoint_{index}_with_id.json")
}

fn index_key() -> String {
fn latest_index_key() -> String {
"checkpoint_latest_index.json".to_owned()
}

Expand All @@ -144,7 +144,7 @@ impl S3Storage {
impl CheckpointSyncer for S3Storage {
async fn latest_index(&self) -> Result<Option<u32>> {
let ret = self
.anonymously_read_from_bucket(S3Storage::index_key())
.anonymously_read_from_bucket(S3Storage::latest_index_key())
.await?
.map(|data| serde_json::from_slice(&data))
.transpose()
Expand All @@ -159,6 +159,13 @@ impl CheckpointSyncer for S3Storage {
ret
}

async fn write_latest_index(&self, index: u32) -> Result<()> {
let serialized_index = serde_json::to_string(&index)?;
self.write_to_bucket(S3Storage::latest_index_key(), &serialized_index)
.await?;
Ok(())
}

async fn fetch_checkpoint(&self, index: u32) -> Result<Option<SignedCheckpointWithMessageId>> {
self.anonymously_read_from_bucket(S3Storage::checkpoint_key(index))
.await?
Expand Down

0 comments on commit b6c33c7

Please sign in to comment.