From c750ac49f6f8f586bdf2bebc272e7b40fc3e2632 Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Thu, 2 Nov 2023 11:14:49 +0000 Subject: [PATCH 01/15] sequence_and_tip hack --- rust/chains/hyperlane-cosmos/src/mailbox.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/rust/chains/hyperlane-cosmos/src/mailbox.rs b/rust/chains/hyperlane-cosmos/src/mailbox.rs index d14e28f32c..3a51b9cc67 100644 --- a/rust/chains/hyperlane-cosmos/src/mailbox.rs +++ b/rust/chains/hyperlane-cosmos/src/mailbox.rs @@ -338,6 +338,16 @@ impl SequenceIndexer for CosmosMailboxIndexer { async fn sequence_and_tip(&self) -> ChainResult<(Option, u32)> { let tip = Indexer::::get_finalized_block_number(&self).await?; + // As a hack to ensure we don't get a bunch errors for querying in the + // future if an RPC URL has poor load balancing, we just return the + // current tip minus 1. + // We often would get errors where we query the tip and try to get the + // nonce at that tip, and then the same RPC provider complains we're asking + // for state in the future. + // TODO: RPC retrying to fix this. + + let tip = tip - 1; + let sequence = self.mailbox.nonce_at_block(Some(tip.into())).await?; Ok((Some(sequence), tip)) From f3682809bfb8dfc2fb9bb751839f367a7dd82c2f Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Thu, 2 Nov 2023 12:11:53 +0000 Subject: [PATCH 02/15] Wip --- .../relayer/src/merkle_tree/processor.rs | 8 +++ rust/agents/relayer/src/relayer.rs | 1 + .../hyperlane-cosmos/src/merkle_tree_hook.rs | 51 ++++++++++++------- .../src/merkle_tree_hook.rs | 9 +--- .../src/contract_sync/cursor.rs | 51 +++++++++++-------- rust/hyperlane-base/src/contract_sync/mod.rs | 6 +-- .../src/db/rocks/hyperlane_db.rs | 30 +++++++++-- rust/hyperlane-base/src/settings/base.rs | 9 ++-- rust/hyperlane-base/src/settings/chains.rs | 3 ++ rust/hyperlane-core/src/traits/db.rs | 22 +++++++- 10 files changed, 134 insertions(+), 56 deletions(-) diff --git a/rust/agents/relayer/src/merkle_tree/processor.rs b/rust/agents/relayer/src/merkle_tree/processor.rs index 873c52ab97..9dda060244 100644 --- a/rust/agents/relayer/src/merkle_tree/processor.rs +++ b/rust/agents/relayer/src/merkle_tree/processor.rs @@ -47,6 +47,14 @@ impl ProcessorExt for MerkleTreeProcessor { /// One round of processing, extracted from infinite work loop for /// testing purposes. async fn tick(&mut self) -> Result<()> { + let insertion_123 = self.db.retrieve_merkle_tree_insertion_by_leaf_index(&123)?; + + let insertion_124 = self.db.retrieve_merkle_tree_insertion_by_leaf_index(&124)?; + + let insertion_125 = self.db.retrieve_merkle_tree_insertion_by_leaf_index(&125)?; + + tracing::warn!(?insertion_123, ?insertion_124, ?insertion_125, domain=?self.domain(), "insertions bls"); + if let Some(insertion) = self.next_unprocessed_leaf()? { // Feed the message to the prover sync self.prover_sync diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index 96e911d6a8..24c93977df 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -315,6 +315,7 @@ impl Relayer { let index_settings = self.as_ref().settings.chains[origin.name()].index.clone(); let contract_sync = self.merkle_tree_hook_syncs.get(origin).unwrap().clone(); let cursor = contract_sync.rate_limited_cursor(index_settings).await; + // let cursor = contract_sync.forward_backward_message_sync_cursor(index_settings).await; tokio::spawn(async move { contract_sync.clone().sync("merkle_tree_hook", cursor).await }) .instrument(info_span!("ContractSync")) } diff --git a/rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs b/rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs index 17be845421..0aa9d90a54 100644 --- a/rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs +++ b/rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs @@ -104,7 +104,6 @@ impl MerkleTreeHook for CosmosMerkleTreeHook { } /// Gets the current leaf count of the merkle tree - #[instrument(level = "debug", err, ret, skip(self))] async fn count(&self, lag: Option) -> ChainResult { let payload = merkle_tree_hook::MerkleTreeCountRequest { count: general::EmptyStruct {}, @@ -112,18 +111,7 @@ impl MerkleTreeHook for CosmosMerkleTreeHook { let block_height = get_block_height_for_lag(&self.provider, lag).await?; - let data = self - .provider - .wasm_query( - merkle_tree_hook::MerkleTreeGenericRequest { - merkle_hook: payload, - }, - block_height, - ) - .await?; - let response: merkle_tree_hook::MerkleTreeCountResponse = serde_json::from_slice(&data)?; - - Ok(response.count) + self.count_at_block(block_height).await } #[instrument(level = "debug", err, ret, skip(self))] @@ -154,6 +142,28 @@ impl MerkleTreeHook for CosmosMerkleTreeHook { } } +impl CosmosMerkleTreeHook { + #[instrument(level = "debug", err, ret, skip(self))] + async fn count_at_block(&self, block_height: Option) -> ChainResult { + let payload = merkle_tree_hook::MerkleTreeCountRequest { + count: general::EmptyStruct {}, + }; + + let data = self + .provider + .wasm_query( + merkle_tree_hook::MerkleTreeGenericRequest { + merkle_hook: payload, + }, + block_height, + ) + .await?; + let response: merkle_tree_hook::MerkleTreeCountResponse = serde_json::from_slice(&data)?; + + Ok(response.count) + } +} + // ------------------ Indexer ------------------ const EVENT_TYPE: &str = "hpl_hook_merkle::post_dispatch"; @@ -161,17 +171,20 @@ const EVENT_TYPE: &str = "hpl_hook_merkle::post_dispatch"; #[derive(Debug)] /// A reference to a MerkleTreeHookIndexer contract on some Cosmos chain pub struct CosmosMerkleTreeHookIndexer { + /// The CosmosMerkleTreeHook + merkle_tree_hook: CosmosMerkleTreeHook, /// Cosmwasm indexer instance indexer: Box, } impl CosmosMerkleTreeHookIndexer { /// create new Cosmos MerkleTreeHookIndexer agent - pub fn new(conf: ConnectionConf, locator: ContractLocator, reorg_period: u32) -> Self { + pub fn new(conf: ConnectionConf, locator: ContractLocator, signer: Signer, reorg_period: u32) -> Self { let indexer: CosmosWasmIndexer = - CosmosWasmIndexer::new(conf, locator, EVENT_TYPE.to_string(), reorg_period); + CosmosWasmIndexer::new(conf.clone(), locator.clone(), EVENT_TYPE.to_string(), reorg_period); Self { + merkle_tree_hook: CosmosMerkleTreeHook::new(conf, locator, signer), indexer: Box::new(indexer), } } @@ -250,8 +263,12 @@ impl Indexer for CosmosMerkleTreeHookIndexer { #[async_trait] impl SequenceIndexer for CosmosMerkleTreeHookIndexer { async fn sequence_and_tip(&self) -> ChainResult<(Option, u32)> { - // TODO: implement when cosmos scraper support is implemented let tip = self.get_finalized_block_number().await?; - Ok((None, tip)) + let sequence = self + .merkle_tree_hook + .count_at_block(Some(tip.into())) + .await?; + + Ok((Some(sequence), tip)) } } diff --git a/rust/chains/hyperlane-ethereum/src/merkle_tree_hook.rs b/rust/chains/hyperlane-ethereum/src/merkle_tree_hook.rs index 3881b03250..2df7dbe251 100644 --- a/rust/chains/hyperlane-ethereum/src/merkle_tree_hook.rs +++ b/rust/chains/hyperlane-ethereum/src/merkle_tree_hook.rs @@ -148,14 +148,9 @@ where M: Middleware + 'static, { async fn sequence_and_tip(&self) -> ChainResult<(Option, u32)> { - // The InterchainGasPaymasterIndexerBuilder must return a `SequenceIndexer` type. - // It's fine if only a blanket implementation is provided for EVM chains, since their - // indexing only uses the `Index` trait, which is a supertrait of `SequenceIndexer`. - // TODO: if `SequenceIndexer` turns out to not depend on `Indexer` at all, then the supertrait - // dependency could be removed, even if the builder would still need to return a type that is both - // ``SequenceIndexer` and `Indexer`. let tip = self.get_finalized_block_number().await?; - Ok((None, tip)) + let sequence = self.contract.count().block(u64::from(tip)).call().await?; + Ok((Some(sequence), tip)) } } diff --git a/rust/hyperlane-base/src/contract_sync/cursor.rs b/rust/hyperlane-base/src/contract_sync/cursor.rs index 14cff9b02a..923990bd3d 100644 --- a/rust/hyperlane-base/src/contract_sync/cursor.rs +++ b/rust/hyperlane-base/src/contract_sync/cursor.rs @@ -11,8 +11,8 @@ use derive_new::new; use eyre::Result; use hyperlane_core::{ ChainCommunicationError, ChainResult, ContractSyncCursor, CursorAction, HyperlaneMessage, - HyperlaneMessageStore, HyperlaneWatermarkedLogStore, IndexMode, Indexer, LogMeta, - SequenceIndexer, + HyperlaneMessageIdIndexerStore, HyperlaneMessageStore, HyperlaneWatermarkedLogStore, IndexMode, + Indexer, LogMeta, SequenceIndexer, }; use tokio::time::sleep; use tracing::{debug, warn}; @@ -29,7 +29,7 @@ const MAX_SEQUENCE_RANGE: u32 = 100; #[derive(Debug, new)] pub(crate) struct MessageSyncCursor { indexer: Arc>, - db: Arc, + db: Arc>, sync_state: SyncState, } @@ -130,16 +130,16 @@ impl SyncState { } impl MessageSyncCursor { - async fn retrieve_message_by_nonce(&self, nonce: u32) -> Option { - if let Ok(Some(message)) = self.db.retrieve_message_by_nonce(nonce).await { + async fn retrieve_message_id_by_sequence(&self, sequence: u32) -> Option { + if let Ok(Some(message)) = self.db.retrieve_message_id_by_sequence(sequence).await { Some(message) } else { None } } - async fn retrieve_dispatched_block_number(&self, nonce: u32) -> Option { - if let Ok(Some(block_number)) = self.db.retrieve_dispatched_block_number(nonce).await { + async fn retrieve_log_block_number(&self, sequence: u32) -> Option { + if let Ok(Some(block_number)) = self.db.retrieve_log_block_number(sequence).await { Some(u32::try_from(block_number).unwrap()) } else { None @@ -161,7 +161,7 @@ impl MessageSyncCursor { warn!(next_nonce=?self.sync_state.next_sequence, "Target nonce not found, rewinding"); // If the previous nonce has been synced, rewind to the block number // at which it was dispatched. Otherwise, rewind all the way back to the start block. - if let Some(block_number) = self.retrieve_dispatched_block_number(prev_sequence).await { + if let Some(block_number) = self.retrieve_log_block_number(prev_sequence).await { self.sync_state.next_block = block_number; warn!(block_number, "Rewound to previous known message"); } else { @@ -182,7 +182,7 @@ pub(crate) struct ForwardMessageSyncCursor { impl ForwardMessageSyncCursor { pub fn new( indexer: Arc>, - db: Arc, + db: Arc>, chunk_size: u32, start_block: u32, next_block: u32, @@ -210,13 +210,13 @@ impl ForwardMessageSyncCursor { // and update the cursor accordingly. while self .cursor - .retrieve_message_by_nonce(self.cursor.sync_state.next_sequence) + .retrieve_message_id_by_sequence(self.cursor.sync_state.next_sequence) .await .is_some() { if let Some(block_number) = self .cursor - .retrieve_dispatched_block_number(self.cursor.sync_state.next_sequence) + .retrieve_log_block_number(self.cursor.sync_state.next_sequence) .await { debug!(next_block = block_number, "Fast forwarding next block"); @@ -300,7 +300,7 @@ impl BackwardMessageSyncCursor { #[allow(clippy::too_many_arguments)] pub fn new( indexer: Arc>, - db: Arc, + db: Arc>, chunk_size: u32, start_block: u32, next_block: u32, @@ -331,7 +331,7 @@ impl BackwardMessageSyncCursor { while !self.synced { if self .cursor - .retrieve_message_by_nonce(self.cursor.sync_state.next_sequence) + .retrieve_message_id_by_sequence(self.cursor.sync_state.next_sequence) .await .is_none() { @@ -345,7 +345,7 @@ impl BackwardMessageSyncCursor { if let Some(block_number) = self .cursor - .retrieve_dispatched_block_number(self.cursor.sync_state.next_sequence) + .retrieve_log_block_number(self.cursor.sync_state.next_sequence) .await { // It's possible that eth_getLogs dropped logs from this block, therefore we cannot do block_number - 1. @@ -396,7 +396,7 @@ impl ForwardBackwardMessageSyncCursor { /// Construct a new contract sync helper. pub async fn new( indexer: Arc>, - db: Arc, + db: Arc>, chunk_size: u32, mode: IndexMode, ) -> Result { @@ -442,12 +442,21 @@ impl ContractSyncCursor for ForwardBackwardMessageSyncCursor { return Ok((CursorAction::Query(forward_range), eta)); } - if let Some(backward_range) = self.backward.get_next_range().await? { - self.direction = SyncDirection::Backward; - return Ok((CursorAction::Query(backward_range), eta)); - } - // TODO: Define the sleep time from interval flag - return Ok((CursorAction::Sleep(Duration::from_secs(5)), eta)); + todo!() + // // TODO: Proper ETA for backwards sync + // let eta = Duration::from_secs(0); + // // Prioritize forward syncing over backward syncing. + // if let Some(forward_range) = self.forward.get_next_range().await? { + // self.direction = SyncDirection::Forward; + // return Ok((CursorAction::Query(forward_range), eta)); + // } + + // if let Some(backward_range) = self.backward.get_next_range().await? { + // self.direction = SyncDirection::Backward; + // return Ok((CursorAction::Query(backward_range), eta)); + // } + // // TODO: Define the sleep time from interval flag + // return Ok((CursorAction::Sleep(Duration::from_secs(5)), eta)); } fn latest_block(&self) -> u32 { diff --git a/rust/hyperlane-base/src/contract_sync/mod.rs b/rust/hyperlane-base/src/contract_sync/mod.rs index d569d2c14f..981166096a 100644 --- a/rust/hyperlane-base/src/contract_sync/mod.rs +++ b/rust/hyperlane-base/src/contract_sync/mod.rs @@ -4,8 +4,8 @@ use cursor::*; use derive_new::new; use hyperlane_core::{ utils::fmt_sync_time, ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneLogStore, - HyperlaneMessage, HyperlaneMessageStore, HyperlaneWatermarkedLogStore, Indexer, - SequenceIndexer, + HyperlaneMessage, HyperlaneMessageIdIndexerStore, HyperlaneMessageStore, + HyperlaneWatermarkedLogStore, Indexer, SequenceIndexer, }; pub use metrics::ContractSyncMetrics; use tokio::time::sleep; @@ -124,7 +124,7 @@ where /// A ContractSync for syncing messages using a MessageSyncCursor pub type MessageContractSync = ContractSync< HyperlaneMessage, - Arc, + Arc>, Arc>, >; impl MessageContractSync { diff --git a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs index f01a12a166..d70f00a6fd 100644 --- a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs +++ b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs @@ -4,9 +4,10 @@ use paste::paste; use tracing::{debug, instrument, trace}; use hyperlane_core::{ - GasPaymentKey, HyperlaneDomain, HyperlaneLogStore, HyperlaneMessage, HyperlaneMessageStore, - HyperlaneWatermarkedLogStore, InterchainGasExpenditure, InterchainGasPayment, - InterchainGasPaymentMeta, LogMeta, MerkleTreeInsertion, H256, + GasPaymentKey, HyperlaneDomain, HyperlaneLogStore, HyperlaneMessage, + HyperlaneMessageIdIndexerStore, HyperlaneMessageStore, HyperlaneWatermarkedLogStore, + InterchainGasExpenditure, InterchainGasPayment, InterchainGasPaymentMeta, LogMeta, + MerkleTreeInsertion, H256, }; use super::{ @@ -284,6 +285,29 @@ impl HyperlaneMessageStore for HyperlaneRocksDB { } } +/// TODO +#[async_trait] +impl HyperlaneMessageIdIndexerStore for HyperlaneRocksDB { + /// Gets a message ID by its sequence. + /// A sequence is a monotonically increasing number that is incremented every time a message ID is indexed. + /// E.g. for Mailbox indexing, this is equal to the message nonce, and for merkle tree hook indexing, this + /// is equal to the leaf index. + async fn retrieve_message_id_by_sequence( + &self, + sequence: u32, + ) -> Result> { + let message = self.retrieve_message_by_nonce(sequence)?; + // Ok(message.map(|m| m.id())) + Ok(message) + } + + /// Gets the block number at which the log occurred. + async fn retrieve_log_block_number(&self, sequence: u32) -> Result> { + let number = self.retrieve_dispatched_block_number_by_nonce(&sequence)?; + Ok(number) + } +} + /// Note that for legacy reasons this watermark may be shared across multiple cursors, some of which may not have anything to do with gas payments /// The high watermark cursor is relatively conservative in writing block numbers, so this shouldn't result in any events being missed. #[async_trait] diff --git a/rust/hyperlane-base/src/settings/base.rs b/rust/hyperlane-base/src/settings/base.rs index efdda4e834..92922cc372 100644 --- a/rust/hyperlane-base/src/settings/base.rs +++ b/rust/hyperlane-base/src/settings/base.rs @@ -3,9 +3,10 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc}; use eyre::{eyre, Context, Result}; use futures_util::future::try_join_all; use hyperlane_core::{ - Delivery, HyperlaneChain, HyperlaneDomain, HyperlaneMessageStore, HyperlaneProvider, - HyperlaneWatermarkedLogStore, InterchainGasPaymaster, InterchainGasPayment, Mailbox, - MerkleTreeHook, MerkleTreeInsertion, MultisigIsm, ValidatorAnnounce, H256, + Delivery, HyperlaneChain, HyperlaneDomain, HyperlaneMessage, HyperlaneMessageIdIndexerStore, + HyperlaneMessageStore, HyperlaneProvider, HyperlaneWatermarkedLogStore, InterchainGasPaymaster, + InterchainGasPayment, Mailbox, MerkleTreeHook, MerkleTreeInsertion, MultisigIsm, + ValidatorAnnounce, H256, }; use crate::{ @@ -183,7 +184,7 @@ impl Settings { build_contract_fns!(build_validator_announce, build_validator_announces -> dyn ValidatorAnnounce); build_contract_fns!(build_provider, build_providers -> dyn HyperlaneProvider); build_indexer_fns!(build_delivery_indexer, build_delivery_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); - build_indexer_fns!(build_message_indexer, build_message_indexers -> dyn HyperlaneMessageStore, MessageContractSync); + build_indexer_fns!(build_message_indexer, build_message_indexers -> dyn HyperlaneMessageIdIndexerStore, MessageContractSync); build_indexer_fns!(build_interchain_gas_payment_indexer, build_interchain_gas_payment_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); build_indexer_fns!(build_merkle_tree_hook_indexer, build_merkle_tree_hook_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); } diff --git a/rust/hyperlane-base/src/settings/chains.rs b/rust/hyperlane-base/src/settings/chains.rs index f974622612..6764b4b4f9 100644 --- a/rust/hyperlane-base/src/settings/chains.rs +++ b/rust/hyperlane-base/src/settings/chains.rs @@ -387,9 +387,12 @@ impl ChainConf { Ok(indexer as Box>) } ChainConnectionConf::Cosmos(conf) => { + let signer = self.cosmos_signer().await.context(ctx)?; let indexer = Box::new(h_cosmos::CosmosMerkleTreeHookIndexer::new( conf.clone(), locator, + // TODO: remove signer requirement entirely + signer.unwrap().clone(), self.reorg_period, )); Ok(indexer as Box>) diff --git a/rust/hyperlane-core/src/traits/db.rs b/rust/hyperlane-core/src/traits/db.rs index 50f9960e04..a9f654dbb5 100644 --- a/rust/hyperlane-core/src/traits/db.rs +++ b/rust/hyperlane-core/src/traits/db.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use auto_impl::auto_impl; use eyre::Result; -use crate::{HyperlaneMessage, LogMeta}; +use crate::{HyperlaneMessage, LogMeta, H256}; /// Interface for a HyperlaneLogStore that ingests logs. #[async_trait] @@ -25,6 +25,26 @@ pub trait HyperlaneMessageStore: HyperlaneLogStore { async fn retrieve_dispatched_block_number(&self, nonce: u32) -> Result>; } +/// TODO +#[async_trait] +#[auto_impl(&, Box, Arc)] +pub trait HyperlaneMessageIdIndexerStore: HyperlaneLogStore +where + T: Send + Sync, +{ + /// Gets a message ID by its sequence. + /// A sequence is a monotonically increasing number that is incremented every time a message ID is indexed. + /// E.g. for Mailbox indexing, this is equal to the message nonce, and for merkle tree hook indexing, this + /// is equal to the leaf index. + async fn retrieve_message_id_by_sequence( + &self, + sequence: u32, + ) -> Result>; + + /// Gets the block number at which the log occurred. + async fn retrieve_log_block_number(&self, nonce: u32) -> Result>; +} + /// Extension of HyperlaneLogStore trait that supports a high watermark for the highest indexed block number. #[async_trait] #[auto_impl(&, Box, Arc)] From ae49469f03bfe131980b8e67ad3937735f9abdf8 Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Thu, 2 Nov 2023 12:20:49 +0000 Subject: [PATCH 03/15] move to H256 return type --- rust/agents/scraper/src/chain_scraper/mod.rs | 32 +++++++++++++++++-- .../src/contract_sync/cursor.rs | 6 ++-- .../src/db/rocks/hyperlane_db.rs | 8 ++--- rust/hyperlane-core/src/traits/db.rs | 5 +-- 4 files changed, 36 insertions(+), 15 deletions(-) diff --git a/rust/agents/scraper/src/chain_scraper/mod.rs b/rust/agents/scraper/src/chain_scraper/mod.rs index 09294905a6..0bf397722d 100644 --- a/rust/agents/scraper/src/chain_scraper/mod.rs +++ b/rust/agents/scraper/src/chain_scraper/mod.rs @@ -8,8 +8,8 @@ use eyre::Result; use hyperlane_base::settings::IndexSettings; use hyperlane_core::{ unwrap_or_none_result, BlockInfo, Delivery, HyperlaneDomain, HyperlaneLogStore, - HyperlaneMessage, HyperlaneMessageStore, HyperlaneProvider, HyperlaneWatermarkedLogStore, - InterchainGasPayment, LogMeta, H256, + HyperlaneMessage, HyperlaneMessageIdIndexerStore, HyperlaneMessageStore, HyperlaneProvider, + HyperlaneWatermarkedLogStore, InterchainGasPayment, LogMeta, H256, }; use itertools::Itertools; use tracing::trace; @@ -394,6 +394,34 @@ impl HyperlaneMessageStore for HyperlaneSqlDb { } } +/// TODO +#[async_trait] +impl HyperlaneMessageIdIndexerStore for HyperlaneSqlDb { + /// Gets a message ID by its sequence. + /// A sequence is a monotonically increasing number that is incremented every time a message ID is indexed. + /// E.g. for Mailbox indexing, this is equal to the message nonce, and for merkle tree hook indexing, this + /// is equal to the leaf index. + async fn retrieve_message_id_by_sequence(&self, sequence: u32) -> Result> { + let message = self + .db + .retrieve_message_by_nonce(self.domain().id(), &self.mailbox_address, sequence) + .await?; + Ok(message.map(|m| m.id())) + } + + /// Gets the block number at which the log occurred. + async fn retrieve_log_block_number(&self, sequence: u32) -> Result> { + unwrap_or_none_result!( + tx_id, + self.db + .retrieve_dispatched_tx_id(self.domain().id(), &self.mailbox_address, sequence) + .await? + ); + unwrap_or_none_result!(block_id, self.db.retrieve_block_id(tx_id).await?); + Ok(self.db.retrieve_block_number(block_id).await?) + } +} + #[async_trait] impl HyperlaneWatermarkedLogStore for HyperlaneSqlDb where diff --git a/rust/hyperlane-base/src/contract_sync/cursor.rs b/rust/hyperlane-base/src/contract_sync/cursor.rs index 923990bd3d..27c0cf5f70 100644 --- a/rust/hyperlane-base/src/contract_sync/cursor.rs +++ b/rust/hyperlane-base/src/contract_sync/cursor.rs @@ -11,8 +11,8 @@ use derive_new::new; use eyre::Result; use hyperlane_core::{ ChainCommunicationError, ChainResult, ContractSyncCursor, CursorAction, HyperlaneMessage, - HyperlaneMessageIdIndexerStore, HyperlaneMessageStore, HyperlaneWatermarkedLogStore, IndexMode, - Indexer, LogMeta, SequenceIndexer, + HyperlaneMessageIdIndexerStore, HyperlaneWatermarkedLogStore, IndexMode, Indexer, LogMeta, + SequenceIndexer, H256, }; use tokio::time::sleep; use tracing::{debug, warn}; @@ -130,7 +130,7 @@ impl SyncState { } impl MessageSyncCursor { - async fn retrieve_message_id_by_sequence(&self, sequence: u32) -> Option { + async fn retrieve_message_id_by_sequence(&self, sequence: u32) -> Option { if let Ok(Some(message)) = self.db.retrieve_message_id_by_sequence(sequence).await { Some(message) } else { diff --git a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs index d70f00a6fd..abfec65e89 100644 --- a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs +++ b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs @@ -292,13 +292,9 @@ impl HyperlaneMessageIdIndexerStore for HyperlaneRocksDB { /// A sequence is a monotonically increasing number that is incremented every time a message ID is indexed. /// E.g. for Mailbox indexing, this is equal to the message nonce, and for merkle tree hook indexing, this /// is equal to the leaf index. - async fn retrieve_message_id_by_sequence( - &self, - sequence: u32, - ) -> Result> { + async fn retrieve_message_id_by_sequence(&self, sequence: u32) -> Result> { let message = self.retrieve_message_by_nonce(sequence)?; - // Ok(message.map(|m| m.id())) - Ok(message) + Ok(message.map(|m| m.id())) } /// Gets the block number at which the log occurred. diff --git a/rust/hyperlane-core/src/traits/db.rs b/rust/hyperlane-core/src/traits/db.rs index a9f654dbb5..2099008a74 100644 --- a/rust/hyperlane-core/src/traits/db.rs +++ b/rust/hyperlane-core/src/traits/db.rs @@ -36,10 +36,7 @@ where /// A sequence is a monotonically increasing number that is incremented every time a message ID is indexed. /// E.g. for Mailbox indexing, this is equal to the message nonce, and for merkle tree hook indexing, this /// is equal to the leaf index. - async fn retrieve_message_id_by_sequence( - &self, - sequence: u32, - ) -> Result>; + async fn retrieve_message_id_by_sequence(&self, sequence: u32) -> Result>; /// Gets the block number at which the log occurred. async fn retrieve_log_block_number(&self, nonce: u32) -> Result>; From e77b965cbdb7acbce66178ea44da5916ca782e02 Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Thu, 2 Nov 2023 16:01:17 +0000 Subject: [PATCH 04/15] fmt --- .../hyperlane-cosmos/src/merkle_tree_hook.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs b/rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs index 0aa9d90a54..47447e6a8e 100644 --- a/rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs +++ b/rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs @@ -179,9 +179,18 @@ pub struct CosmosMerkleTreeHookIndexer { impl CosmosMerkleTreeHookIndexer { /// create new Cosmos MerkleTreeHookIndexer agent - pub fn new(conf: ConnectionConf, locator: ContractLocator, signer: Signer, reorg_period: u32) -> Self { - let indexer: CosmosWasmIndexer = - CosmosWasmIndexer::new(conf.clone(), locator.clone(), EVENT_TYPE.to_string(), reorg_period); + pub fn new( + conf: ConnectionConf, + locator: ContractLocator, + signer: Signer, + reorg_period: u32, + ) -> Self { + let indexer: CosmosWasmIndexer = CosmosWasmIndexer::new( + conf.clone(), + locator.clone(), + EVENT_TYPE.to_string(), + reorg_period, + ); Self { merkle_tree_hook: CosmosMerkleTreeHook::new(conf, locator, signer), From 5cdbe568e3ae7099e596e302b76c46d35bce655a Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Thu, 2 Nov 2023 12:59:08 +0000 Subject: [PATCH 05/15] wip generics --- rust/agents/relayer/src/relayer.rs | 6 +- .../src/contract_sync/cursor.rs | 77 ++++++++----------- rust/hyperlane-base/src/contract_sync/mod.rs | 17 ++-- .../src/db/rocks/hyperlane_db.rs | 28 +++++++ rust/hyperlane-base/src/settings/base.rs | 8 +- 5 files changed, 75 insertions(+), 61 deletions(-) diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index 24c93977df..e73d583633 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -314,8 +314,10 @@ impl Relayer { ) -> Instrumented>> { let index_settings = self.as_ref().settings.chains[origin.name()].index.clone(); let contract_sync = self.merkle_tree_hook_syncs.get(origin).unwrap().clone(); - let cursor = contract_sync.rate_limited_cursor(index_settings).await; - // let cursor = contract_sync.forward_backward_message_sync_cursor(index_settings).await; + // let cursor = contract_sync.rate_limited_cursor(index_settings).await; + let cursor = contract_sync + .forward_backward_message_sync_cursor(index_settings) + .await; tokio::spawn(async move { contract_sync.clone().sync("merkle_tree_hook", cursor).await }) .instrument(info_span!("ContractSync")) } diff --git a/rust/hyperlane-base/src/contract_sync/cursor.rs b/rust/hyperlane-base/src/contract_sync/cursor.rs index 27c0cf5f70..2bbb26a28c 100644 --- a/rust/hyperlane-base/src/contract_sync/cursor.rs +++ b/rust/hyperlane-base/src/contract_sync/cursor.rs @@ -27,9 +27,9 @@ const MAX_SEQUENCE_RANGE: u32 = 100; /// A struct that holds the data needed for forwards and backwards /// message sync cursors. #[derive(Debug, new)] -pub(crate) struct MessageSyncCursor { - indexer: Arc>, - db: Arc>, +pub(crate) struct MessageSyncCursor { + indexer: Arc>, + db: Arc>, sync_state: SyncState, } @@ -129,7 +129,7 @@ impl SyncState { } } -impl MessageSyncCursor { +impl MessageSyncCursor { async fn retrieve_message_id_by_sequence(&self, sequence: u32) -> Option { if let Ok(Some(message)) = self.db.retrieve_message_id_by_sequence(sequence).await { Some(message) @@ -146,11 +146,7 @@ impl MessageSyncCursor { } } - async fn update( - &mut self, - logs: Vec<(HyperlaneMessage, LogMeta)>, - prev_sequence: u32, - ) -> Result<()> { + async fn update(&mut self, logs: Vec<(T, LogMeta)>, prev_sequence: u32) -> Result<()> { // If we found messages, but did *not* find the message we were looking for, // we need to rewind to the block at which we found the last message. if !logs.is_empty() @@ -175,14 +171,14 @@ impl MessageSyncCursor { } /// A MessageSyncCursor that syncs forwards in perpetuity. -pub(crate) struct ForwardMessageSyncCursor { - cursor: MessageSyncCursor, +pub(crate) struct ForwardMessageSyncCursor { + cursor: MessageSyncCursor, } -impl ForwardMessageSyncCursor { +impl ForwardMessageSyncCursor { pub fn new( - indexer: Arc>, - db: Arc>, + indexer: Arc>, + db: Arc>, chunk_size: u32, start_block: u32, next_block: u32, @@ -259,7 +255,7 @@ impl ForwardMessageSyncCursor { } #[async_trait] -impl ContractSyncCursor for ForwardMessageSyncCursor { +impl ContractSyncCursor for ForwardMessageSyncCursor { async fn next_action(&mut self) -> ChainResult<(CursorAction, Duration)> { // TODO: Fix ETA calculation let eta = Duration::from_secs(0); @@ -278,7 +274,7 @@ impl ContractSyncCursor for ForwardMessageSyncCursor { /// If the previous block has been synced, rewind to the block number /// at which it was dispatched. /// Otherwise, rewind all the way back to the start block. - async fn update(&mut self, logs: Vec<(HyperlaneMessage, LogMeta)>) -> Result<()> { + async fn update(&mut self, logs: Vec<(T, LogMeta)>) -> Result<()> { let prev_nonce = self.cursor.sync_state.next_sequence.saturating_sub(1); // We may wind up having re-indexed messages that are previous to the nonce that we are looking for. // We should not consider these messages when checking for continuity errors. @@ -291,16 +287,16 @@ impl ContractSyncCursor for ForwardMessageSyncCursor { } /// A MessageSyncCursor that syncs backwards to sequence (nonce) zero. -pub(crate) struct BackwardMessageSyncCursor { - cursor: MessageSyncCursor, +pub(crate) struct BackwardMessageSyncCursor { + cursor: MessageSyncCursor, synced: bool, } -impl BackwardMessageSyncCursor { +impl BackwardMessageSyncCursor { #[allow(clippy::too_many_arguments)] pub fn new( - indexer: Arc>, - db: Arc>, + indexer: Arc>, + db: Arc>, chunk_size: u32, start_block: u32, next_block: u32, @@ -367,7 +363,7 @@ impl BackwardMessageSyncCursor { /// If the previous block has been synced, rewind to the block number /// at which it was dispatched. /// Otherwise, rewind all the way back to the start block. - async fn update(&mut self, logs: Vec<(HyperlaneMessage, LogMeta)>) -> Result<()> { + async fn update(&mut self, logs: Vec<(T, LogMeta)>) -> Result<()> { let prev_sequence = self.cursor.sync_state.next_sequence.saturating_add(1); // We may wind up having re-indexed messages that are previous to the sequence (nonce) that we are looking for. // We should not consider these messages when checking for continuity errors. @@ -386,17 +382,17 @@ pub enum SyncDirection { } /// A MessageSyncCursor that syncs forwards in perpetuity. -pub(crate) struct ForwardBackwardMessageSyncCursor { - forward: ForwardMessageSyncCursor, - backward: BackwardMessageSyncCursor, +pub(crate) struct ForwardBackwardMessageSyncCursor { + forward: ForwardMessageSyncCursor, + backward: BackwardMessageSyncCursor, direction: SyncDirection, } -impl ForwardBackwardMessageSyncCursor { +impl ForwardBackwardMessageSyncCursor { /// Construct a new contract sync helper. pub async fn new( - indexer: Arc>, - db: Arc>, + indexer: Arc>, + db: Arc>, chunk_size: u32, mode: IndexMode, ) -> Result { @@ -432,7 +428,7 @@ impl ForwardBackwardMessageSyncCursor { } #[async_trait] -impl ContractSyncCursor for ForwardBackwardMessageSyncCursor { +impl ContractSyncCursor for ForwardBackwardMessageSyncCursor { async fn next_action(&mut self) -> ChainResult<(CursorAction, Duration)> { // TODO: Proper ETA for backwards sync let eta = Duration::from_secs(0); @@ -442,28 +438,19 @@ impl ContractSyncCursor for ForwardBackwardMessageSyncCursor { return Ok((CursorAction::Query(forward_range), eta)); } - todo!() - // // TODO: Proper ETA for backwards sync - // let eta = Duration::from_secs(0); - // // Prioritize forward syncing over backward syncing. - // if let Some(forward_range) = self.forward.get_next_range().await? { - // self.direction = SyncDirection::Forward; - // return Ok((CursorAction::Query(forward_range), eta)); - // } - - // if let Some(backward_range) = self.backward.get_next_range().await? { - // self.direction = SyncDirection::Backward; - // return Ok((CursorAction::Query(backward_range), eta)); - // } - // // TODO: Define the sleep time from interval flag - // return Ok((CursorAction::Sleep(Duration::from_secs(5)), eta)); + if let Some(backward_range) = self.backward.get_next_range().await? { + self.direction = SyncDirection::Backward; + return Ok((CursorAction::Query(backward_range), eta)); + } + // TODO: Define the sleep time from interval flag + return Ok((CursorAction::Sleep(Duration::from_secs(5)), eta)); } fn latest_block(&self) -> u32 { self.forward.cursor.sync_state.next_block.saturating_sub(1) } - async fn update(&mut self, logs: Vec<(HyperlaneMessage, LogMeta)>) -> Result<()> { + async fn update(&mut self, logs: Vec<(T, LogMeta)>) -> Result<()> { match self.direction { SyncDirection::Forward => self.forward.update(logs).await, SyncDirection::Backward => self.backward.update(logs).await, diff --git a/rust/hyperlane-base/src/contract_sync/mod.rs b/rust/hyperlane-base/src/contract_sync/mod.rs index 981166096a..11cd77bccc 100644 --- a/rust/hyperlane-base/src/contract_sync/mod.rs +++ b/rust/hyperlane-base/src/contract_sync/mod.rs @@ -4,8 +4,8 @@ use cursor::*; use derive_new::new; use hyperlane_core::{ utils::fmt_sync_time, ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneLogStore, - HyperlaneMessage, HyperlaneMessageIdIndexerStore, HyperlaneMessageStore, - HyperlaneWatermarkedLogStore, Indexer, SequenceIndexer, + HyperlaneMessage, HyperlaneMessageIdIndexerStore, HyperlaneWatermarkedLogStore, Indexer, + SequenceIndexer, }; pub use metrics::ContractSyncMetrics; use tokio::time::sleep; @@ -122,18 +122,15 @@ where } /// A ContractSync for syncing messages using a MessageSyncCursor -pub type MessageContractSync = ContractSync< - HyperlaneMessage, - Arc>, - Arc>, ->; -impl MessageContractSync { +pub type MessageContractSync = + ContractSync>, Arc>>; +impl MessageContractSync { /// Returns a new cursor to be used for syncing dispatched messages from the indexer pub async fn forward_message_sync_cursor( &self, index_settings: IndexSettings, next_nonce: u32, - ) -> Box> { + ) -> Box> { Box::new(ForwardMessageSyncCursor::new( self.indexer.clone(), self.db.clone(), @@ -149,7 +146,7 @@ impl MessageContractSync { pub async fn forward_backward_message_sync_cursor( &self, index_settings: IndexSettings, - ) -> Box> { + ) -> Box> { Box::new( ForwardBackwardMessageSyncCursor::new( self.indexer.clone(), diff --git a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs index abfec65e89..ab570ab011 100644 --- a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs +++ b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs @@ -29,6 +29,8 @@ const PENDING_MESSAGE_RETRY_COUNT_FOR_MESSAGE_ID: &str = "pending_message_retry_count_for_message_id_"; const MERKLE_TREE_INSERTION: &str = "merkle_tree_insertion_"; const MERKLE_LEAF_INDEX_BY_MESSAGE_ID: &str = "merkle_leaf_index_by_message_id_"; +const MERKLE_TREE_INSERTION_BLOCK_NUMBER_BY_LEAF_INDEX: &str = + "merkle_tree_insertion_block_number_by_leaf_index_"; const LATEST_INDEXED_GAS_PAYMENT_BLOCK: &str = "latest_indexed_gas_payment_block"; type DbResult = std::result::Result; @@ -304,6 +306,25 @@ impl HyperlaneMessageIdIndexerStore for HyperlaneRocksDB { } } +/// TODO +#[async_trait] +impl HyperlaneMessageIdIndexerStore for HyperlaneRocksDB { + /// Gets a message ID by its sequence. + /// A sequence is a monotonically increasing number that is incremented every time a message ID is indexed. + /// E.g. for Mailbox indexing, this is equal to the message nonce, and for merkle tree hook indexing, this + /// is equal to the leaf index. + async fn retrieve_message_id_by_sequence(&self, sequence: u32) -> Result> { + let insertion = self.retrieve_merkle_tree_insertion_by_leaf_index(&sequence)?; + Ok(insertion.map(|i| i.message_id())) + } + + /// Gets the block number at which the log occurred. + async fn retrieve_log_block_number(&self, sequence: u32) -> Result> { + let number = self.retrieve_merkle_tree_insertion_block_number_by_leaf_index(&sequence)?; + Ok(number) + } +} + /// Note that for legacy reasons this watermark may be shared across multiple cursors, some of which may not have anything to do with gas payments /// The high watermark cursor is relatively conservative in writing block numbers, so this shouldn't result in any events being missed. #[async_trait] @@ -377,3 +398,10 @@ make_store_and_retrieve!( H256, u32 ); +make_store_and_retrieve!( + pub, + merkle_tree_insertion_block_number_by_leaf_index, + MERKLE_TREE_INSERTION_BLOCK_NUMBER_BY_LEAF_INDEX, + u32, + u64 +); diff --git a/rust/hyperlane-base/src/settings/base.rs b/rust/hyperlane-base/src/settings/base.rs index 92922cc372..6a7da44111 100644 --- a/rust/hyperlane-base/src/settings/base.rs +++ b/rust/hyperlane-base/src/settings/base.rs @@ -4,9 +4,8 @@ use eyre::{eyre, Context, Result}; use futures_util::future::try_join_all; use hyperlane_core::{ Delivery, HyperlaneChain, HyperlaneDomain, HyperlaneMessage, HyperlaneMessageIdIndexerStore, - HyperlaneMessageStore, HyperlaneProvider, HyperlaneWatermarkedLogStore, InterchainGasPaymaster, - InterchainGasPayment, Mailbox, MerkleTreeHook, MerkleTreeInsertion, MultisigIsm, - ValidatorAnnounce, H256, + HyperlaneProvider, HyperlaneWatermarkedLogStore, InterchainGasPaymaster, InterchainGasPayment, + Mailbox, MerkleTreeHook, MerkleTreeInsertion, MultisigIsm, ValidatorAnnounce, H256, }; use crate::{ @@ -184,7 +183,8 @@ impl Settings { build_contract_fns!(build_validator_announce, build_validator_announces -> dyn ValidatorAnnounce); build_contract_fns!(build_provider, build_providers -> dyn HyperlaneProvider); build_indexer_fns!(build_delivery_indexer, build_delivery_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); - build_indexer_fns!(build_message_indexer, build_message_indexers -> dyn HyperlaneMessageIdIndexerStore, MessageContractSync); + build_indexer_fns!(build_message_indexer, build_message_indexers -> dyn HyperlaneMessageIdIndexerStore, MessageContractSync); build_indexer_fns!(build_interchain_gas_payment_indexer, build_interchain_gas_payment_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); build_indexer_fns!(build_merkle_tree_hook_indexer, build_merkle_tree_hook_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); + // build_indexer_fns!(build_merkle_tree_hook_indexer, build_merkle_tree_hook_indexers -> dyn HyperlaneMessageIdIndexerStore, WatermarkContractSync); } From 0b1ff2c6e5049d59bd920470af347fabfce4c893 Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Thu, 2 Nov 2023 16:16:21 +0000 Subject: [PATCH 06/15] all done woo --- rust/agents/relayer/src/relayer.rs | 9 +++--- rust/agents/validator/src/validator.rs | 8 ++++-- .../src/contract_sync/cursor.rs | 28 +++++++++---------- rust/hyperlane-base/src/contract_sync/mod.rs | 8 +++--- rust/hyperlane-base/src/settings/base.rs | 2 +- rust/hyperlane-core/src/traits/db.rs | 6 ++++ rust/hyperlane-core/src/types/merkle_tree.rs | 8 +++++- rust/hyperlane-core/src/types/message.rs | 8 +++++- 8 files changed, 49 insertions(+), 28 deletions(-) diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index e73d583633..a074f7ef9b 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -12,7 +12,9 @@ use hyperlane_base::{ run_all, BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, MessageContractSync, WatermarkContractSync, }; -use hyperlane_core::{HyperlaneDomain, InterchainGasPayment, MerkleTreeInsertion, U256}; +use hyperlane_core::{ + HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, U256, +}; use tokio::{ sync::{ mpsc::{self, UnboundedReceiver, UnboundedSender}, @@ -50,15 +52,14 @@ pub struct Relayer { destination_chains: HashSet, #[as_ref] core: HyperlaneAgentCore, - message_syncs: HashMap>, + message_syncs: HashMap>>, interchain_gas_payment_syncs: HashMap>>, /// Context data for each (origin, destination) chain pair a message can be /// sent between msg_ctxs: HashMap>, prover_syncs: HashMap>>, - merkle_tree_hook_syncs: - HashMap>>, + merkle_tree_hook_syncs: HashMap>>, dbs: HashMap, whitelist: Arc, blacklist: Arc, diff --git a/rust/agents/validator/src/validator.rs b/rust/agents/validator/src/validator.rs index bc9ba325b4..70cc9a9a69 100644 --- a/rust/agents/validator/src/validator.rs +++ b/rust/agents/validator/src/validator.rs @@ -11,7 +11,7 @@ use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument use hyperlane_base::{ db::{HyperlaneRocksDB, DB}, run_all, BaseAgent, CheckpointSyncer, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, - WatermarkContractSync, + MessageContractSync, }; use hyperlane_core::{ @@ -33,7 +33,7 @@ pub struct Validator { #[as_ref] core: HyperlaneAgentCore, db: HyperlaneRocksDB, - merkle_tree_hook_sync: Arc>, + merkle_tree_hook_sync: Arc>, mailbox: Arc, merkle_tree_hook: Arc, validator_announce: Arc, @@ -154,7 +154,9 @@ impl Validator { let index_settings = self.as_ref().settings.chains[self.origin_chain.name()].index_settings(); let contract_sync = self.merkle_tree_hook_sync.clone(); - let cursor = contract_sync.rate_limited_cursor(index_settings).await; + let cursor = contract_sync + .forward_backward_message_sync_cursor(index_settings) + .await; tokio::spawn(async move { contract_sync.clone().sync("merkle_tree_hook", cursor).await }) .instrument(info_span!("MerkleTreeHookSyncer")) } diff --git a/rust/hyperlane-base/src/contract_sync/cursor.rs b/rust/hyperlane-base/src/contract_sync/cursor.rs index 2bbb26a28c..728eaf4a0f 100644 --- a/rust/hyperlane-base/src/contract_sync/cursor.rs +++ b/rust/hyperlane-base/src/contract_sync/cursor.rs @@ -10,9 +10,9 @@ use async_trait::async_trait; use derive_new::new; use eyre::Result; use hyperlane_core::{ - ChainCommunicationError, ChainResult, ContractSyncCursor, CursorAction, HyperlaneMessage, + ChainCommunicationError, ChainResult, ContractSyncCursor, CursorAction, HyperlaneMessageIdIndexerStore, HyperlaneWatermarkedLogStore, IndexMode, Indexer, LogMeta, - SequenceIndexer, H256, + SequenceIndexer, Sequenced, H256, }; use tokio::time::sleep; use tracing::{debug, warn}; @@ -129,7 +129,7 @@ impl SyncState { } } -impl MessageSyncCursor { +impl MessageSyncCursor { async fn retrieve_message_id_by_sequence(&self, sequence: u32) -> Option { if let Ok(Some(message)) = self.db.retrieve_message_id_by_sequence(sequence).await { Some(message) @@ -152,7 +152,7 @@ impl MessageSyncCursor { if !logs.is_empty() && !logs .iter() - .any(|m| m.0.nonce == self.sync_state.next_sequence) + .any(|m| m.0.sequence() == self.sync_state.next_sequence) { warn!(next_nonce=?self.sync_state.next_sequence, "Target nonce not found, rewinding"); // If the previous nonce has been synced, rewind to the block number @@ -171,11 +171,11 @@ impl MessageSyncCursor { } /// A MessageSyncCursor that syncs forwards in perpetuity. -pub(crate) struct ForwardMessageSyncCursor { +pub(crate) struct ForwardMessageSyncCursor { cursor: MessageSyncCursor, } -impl ForwardMessageSyncCursor { +impl ForwardMessageSyncCursor { pub fn new( indexer: Arc>, db: Arc>, @@ -255,7 +255,7 @@ impl ForwardMessageSyncCursor { } #[async_trait] -impl ContractSyncCursor for ForwardMessageSyncCursor { +impl ContractSyncCursor for ForwardMessageSyncCursor { async fn next_action(&mut self) -> ChainResult<(CursorAction, Duration)> { // TODO: Fix ETA calculation let eta = Duration::from_secs(0); @@ -280,19 +280,19 @@ impl ContractSyncCursor for ForwardMessageSyncCursor { // We should not consider these messages when checking for continuity errors. let filtered_logs = logs .into_iter() - .filter(|m| m.0.nonce >= self.cursor.sync_state.next_sequence) + .filter(|m| m.0.sequence() >= self.cursor.sync_state.next_sequence) .collect(); self.cursor.update(filtered_logs, prev_nonce).await } } /// A MessageSyncCursor that syncs backwards to sequence (nonce) zero. -pub(crate) struct BackwardMessageSyncCursor { +pub(crate) struct BackwardMessageSyncCursor { cursor: MessageSyncCursor, synced: bool, } -impl BackwardMessageSyncCursor { +impl BackwardMessageSyncCursor { #[allow(clippy::too_many_arguments)] pub fn new( indexer: Arc>, @@ -369,7 +369,7 @@ impl BackwardMessageSyncCursor { // We should not consider these messages when checking for continuity errors. let filtered_logs = logs .into_iter() - .filter(|m| m.0.nonce <= self.cursor.sync_state.next_sequence) + .filter(|m| m.0.sequence() <= self.cursor.sync_state.next_sequence) .collect(); self.cursor.update(filtered_logs, prev_sequence).await } @@ -382,13 +382,13 @@ pub enum SyncDirection { } /// A MessageSyncCursor that syncs forwards in perpetuity. -pub(crate) struct ForwardBackwardMessageSyncCursor { +pub(crate) struct ForwardBackwardMessageSyncCursor { forward: ForwardMessageSyncCursor, backward: BackwardMessageSyncCursor, direction: SyncDirection, } -impl ForwardBackwardMessageSyncCursor { +impl ForwardBackwardMessageSyncCursor { /// Construct a new contract sync helper. pub async fn new( indexer: Arc>, @@ -428,7 +428,7 @@ impl ForwardBackwardMessageSyncCursor { } #[async_trait] -impl ContractSyncCursor for ForwardBackwardMessageSyncCursor { +impl ContractSyncCursor for ForwardBackwardMessageSyncCursor { async fn next_action(&mut self) -> ChainResult<(CursorAction, Duration)> { // TODO: Proper ETA for backwards sync let eta = Duration::from_secs(0); diff --git a/rust/hyperlane-base/src/contract_sync/mod.rs b/rust/hyperlane-base/src/contract_sync/mod.rs index 11cd77bccc..5fb05da0d7 100644 --- a/rust/hyperlane-base/src/contract_sync/mod.rs +++ b/rust/hyperlane-base/src/contract_sync/mod.rs @@ -4,8 +4,8 @@ use cursor::*; use derive_new::new; use hyperlane_core::{ utils::fmt_sync_time, ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneLogStore, - HyperlaneMessage, HyperlaneMessageIdIndexerStore, HyperlaneWatermarkedLogStore, Indexer, - SequenceIndexer, + HyperlaneMessageIdIndexerStore, HyperlaneWatermarkedLogStore, Indexer, SequenceIndexer, + Sequenced, }; pub use metrics::ContractSyncMetrics; use tokio::time::sleep; @@ -122,9 +122,9 @@ where } /// A ContractSync for syncing messages using a MessageSyncCursor -pub type MessageContractSync = +pub type MessageContractSync = ContractSync>, Arc>>; -impl MessageContractSync { +impl MessageContractSync { /// Returns a new cursor to be used for syncing dispatched messages from the indexer pub async fn forward_message_sync_cursor( &self, diff --git a/rust/hyperlane-base/src/settings/base.rs b/rust/hyperlane-base/src/settings/base.rs index 6a7da44111..2c7b69d3a2 100644 --- a/rust/hyperlane-base/src/settings/base.rs +++ b/rust/hyperlane-base/src/settings/base.rs @@ -185,6 +185,6 @@ impl Settings { build_indexer_fns!(build_delivery_indexer, build_delivery_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); build_indexer_fns!(build_message_indexer, build_message_indexers -> dyn HyperlaneMessageIdIndexerStore, MessageContractSync); build_indexer_fns!(build_interchain_gas_payment_indexer, build_interchain_gas_payment_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); - build_indexer_fns!(build_merkle_tree_hook_indexer, build_merkle_tree_hook_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); + build_indexer_fns!(build_merkle_tree_hook_indexer, build_merkle_tree_hook_indexers -> dyn HyperlaneMessageIdIndexerStore, MessageContractSync); // build_indexer_fns!(build_merkle_tree_hook_indexer, build_merkle_tree_hook_indexers -> dyn HyperlaneMessageIdIndexerStore, WatermarkContractSync); } diff --git a/rust/hyperlane-core/src/traits/db.rs b/rust/hyperlane-core/src/traits/db.rs index 2099008a74..80fd702e32 100644 --- a/rust/hyperlane-core/src/traits/db.rs +++ b/rust/hyperlane-core/src/traits/db.rs @@ -25,6 +25,12 @@ pub trait HyperlaneMessageStore: HyperlaneLogStore { async fn retrieve_dispatched_block_number(&self, nonce: u32) -> Result>; } +/// TODO +pub trait Sequenced: 'static + Send + Sync { + /// TODO + fn sequence(&self) -> u32; +} + /// TODO #[async_trait] #[auto_impl(&, Box, Arc)] diff --git a/rust/hyperlane-core/src/types/merkle_tree.rs b/rust/hyperlane-core/src/types/merkle_tree.rs index 1a5a4058f0..2e9020043d 100644 --- a/rust/hyperlane-core/src/types/merkle_tree.rs +++ b/rust/hyperlane-core/src/types/merkle_tree.rs @@ -1,7 +1,7 @@ use derive_new::new; use std::io::{Read, Write}; -use crate::{Decode, Encode, HyperlaneProtocolError, H256}; +use crate::{Decode, Encode, HyperlaneProtocolError, Sequenced, H256}; /// Merkle Tree Hook insertion event #[derive(Debug, Copy, Clone, new)] @@ -22,6 +22,12 @@ impl MerkleTreeInsertion { } } +impl Sequenced for MerkleTreeInsertion { + fn sequence(&self) -> u32 { + self.leaf_index + } +} + impl Encode for MerkleTreeInsertion { fn write_to(&self, writer: &mut W) -> std::io::Result where diff --git a/rust/hyperlane-core/src/types/message.rs b/rust/hyperlane-core/src/types/message.rs index 1faec8436d..4ebaffc1cd 100644 --- a/rust/hyperlane-core/src/types/message.rs +++ b/rust/hyperlane-core/src/types/message.rs @@ -2,7 +2,7 @@ use sha3::{digest::Update, Digest, Keccak256}; use std::fmt::{Debug, Display, Formatter}; use crate::utils::{fmt_address_for_domain, fmt_domain}; -use crate::{Decode, Encode, HyperlaneProtocolError, H256}; +use crate::{Decode, Encode, HyperlaneProtocolError, Sequenced, H256}; const HYPERLANE_MESSAGE_PREFIX_LEN: usize = 77; @@ -39,6 +39,12 @@ pub struct HyperlaneMessage { pub body: Vec, } +impl Sequenced for HyperlaneMessage { + fn sequence(&self) -> u32 { + self.nonce + } +} + impl Debug for HyperlaneMessage { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( From 75df560a035b4114f378ed0a40fd5ced7ece2518 Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Thu, 2 Nov 2023 16:18:20 +0000 Subject: [PATCH 07/15] rm some unnecessary type boundaries --- rust/hyperlane-base/src/contract_sync/cursor.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/hyperlane-base/src/contract_sync/cursor.rs b/rust/hyperlane-base/src/contract_sync/cursor.rs index 728eaf4a0f..39d1f108aa 100644 --- a/rust/hyperlane-base/src/contract_sync/cursor.rs +++ b/rust/hyperlane-base/src/contract_sync/cursor.rs @@ -171,7 +171,7 @@ impl MessageSyncCursor { } /// A MessageSyncCursor that syncs forwards in perpetuity. -pub(crate) struct ForwardMessageSyncCursor { +pub(crate) struct ForwardMessageSyncCursor { cursor: MessageSyncCursor, } @@ -287,7 +287,7 @@ impl ContractSyncCursor for ForwardMessageSyncCursor { } /// A MessageSyncCursor that syncs backwards to sequence (nonce) zero. -pub(crate) struct BackwardMessageSyncCursor { +pub(crate) struct BackwardMessageSyncCursor { cursor: MessageSyncCursor, synced: bool, } @@ -382,7 +382,7 @@ pub enum SyncDirection { } /// A MessageSyncCursor that syncs forwards in perpetuity. -pub(crate) struct ForwardBackwardMessageSyncCursor { +pub(crate) struct ForwardBackwardMessageSyncCursor { forward: ForwardMessageSyncCursor, backward: BackwardMessageSyncCursor, direction: SyncDirection, From 1efd69f3c076fccba5c28348934c9e26e7fa8e44 Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Thu, 2 Nov 2023 16:22:45 +0000 Subject: [PATCH 08/15] rm HyperlaneMessageStore --- rust/agents/scraper/src/chain_scraper/mod.rs | 27 +------------------ .../src/db/rocks/hyperlane_db.rs | 20 ++------------ rust/hyperlane-core/src/traits/db.rs | 13 ++------- 3 files changed, 5 insertions(+), 55 deletions(-) diff --git a/rust/agents/scraper/src/chain_scraper/mod.rs b/rust/agents/scraper/src/chain_scraper/mod.rs index 0bf397722d..c82091c740 100644 --- a/rust/agents/scraper/src/chain_scraper/mod.rs +++ b/rust/agents/scraper/src/chain_scraper/mod.rs @@ -8,7 +8,7 @@ use eyre::Result; use hyperlane_base::settings::IndexSettings; use hyperlane_core::{ unwrap_or_none_result, BlockInfo, Delivery, HyperlaneDomain, HyperlaneLogStore, - HyperlaneMessage, HyperlaneMessageIdIndexerStore, HyperlaneMessageStore, HyperlaneProvider, + HyperlaneMessage, HyperlaneMessageIdIndexerStore, HyperlaneProvider, HyperlaneWatermarkedLogStore, InterchainGasPayment, LogMeta, H256, }; use itertools::Itertools; @@ -369,31 +369,6 @@ impl HyperlaneLogStore for HyperlaneSqlDb { } } -#[async_trait] -impl HyperlaneMessageStore for HyperlaneSqlDb { - /// Gets a message by nonce. - async fn retrieve_message_by_nonce(&self, nonce: u32) -> Result> { - let message = self - .db - .retrieve_message_by_nonce(self.domain().id(), &self.mailbox_address, nonce) - .await?; - Ok(message) - } - - /// Retrieves the block number at which the message with the provided nonce - /// was dispatched. - async fn retrieve_dispatched_block_number(&self, nonce: u32) -> Result> { - unwrap_or_none_result!( - tx_id, - self.db - .retrieve_dispatched_tx_id(self.domain().id(), &self.mailbox_address, nonce) - .await? - ); - unwrap_or_none_result!(block_id, self.db.retrieve_block_id(tx_id).await?); - Ok(self.db.retrieve_block_number(block_id).await?) - } -} - /// TODO #[async_trait] impl HyperlaneMessageIdIndexerStore for HyperlaneSqlDb { diff --git a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs index ab570ab011..e6475a5aa8 100644 --- a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs +++ b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs @@ -5,9 +5,8 @@ use tracing::{debug, instrument, trace}; use hyperlane_core::{ GasPaymentKey, HyperlaneDomain, HyperlaneLogStore, HyperlaneMessage, - HyperlaneMessageIdIndexerStore, HyperlaneMessageStore, HyperlaneWatermarkedLogStore, - InterchainGasExpenditure, InterchainGasPayment, InterchainGasPaymentMeta, LogMeta, - MerkleTreeInsertion, H256, + HyperlaneMessageIdIndexerStore, HyperlaneWatermarkedLogStore, InterchainGasExpenditure, + InterchainGasPayment, InterchainGasPaymentMeta, LogMeta, MerkleTreeInsertion, H256, }; use super::{ @@ -272,21 +271,6 @@ impl HyperlaneLogStore for HyperlaneRocksDB { } } -#[async_trait] -impl HyperlaneMessageStore for HyperlaneRocksDB { - /// Gets a message by nonce. - async fn retrieve_message_by_nonce(&self, nonce: u32) -> Result> { - let message = self.retrieve_message_by_nonce(nonce)?; - Ok(message) - } - - /// Retrieve dispatched block number by message nonce - async fn retrieve_dispatched_block_number(&self, nonce: u32) -> Result> { - let number = self.retrieve_dispatched_block_number_by_nonce(&nonce)?; - Ok(number) - } -} - /// TODO #[async_trait] impl HyperlaneMessageIdIndexerStore for HyperlaneRocksDB { diff --git a/rust/hyperlane-core/src/traits/db.rs b/rust/hyperlane-core/src/traits/db.rs index 80fd702e32..d4c58ac788 100644 --- a/rust/hyperlane-core/src/traits/db.rs +++ b/rust/hyperlane-core/src/traits/db.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use auto_impl::auto_impl; use eyre::Result; -use crate::{HyperlaneMessage, LogMeta, H256}; +use crate::{LogMeta, H256}; /// Interface for a HyperlaneLogStore that ingests logs. #[async_trait] @@ -15,16 +15,6 @@ pub trait HyperlaneLogStore: Send + Sync + Debug { async fn store_logs(&self, logs: &[(T, LogMeta)]) -> Result; } -/// Extension of HyperlaneLogStore trait that supports getting the block number at which a known message was dispatched. -#[async_trait] -#[auto_impl(&, Box, Arc)] -pub trait HyperlaneMessageStore: HyperlaneLogStore { - /// Gets a message by nonce. - async fn retrieve_message_by_nonce(&self, nonce: u32) -> Result>; - /// Gets the block number at which a message was dispatched. - async fn retrieve_dispatched_block_number(&self, nonce: u32) -> Result>; -} - /// TODO pub trait Sequenced: 'static + Send + Sync { /// TODO @@ -32,6 +22,7 @@ pub trait Sequenced: 'static + Send + Sync { } /// TODO +/// Extension of HyperlaneLogStore trait that supports getting the block number at which a known message was dispatched. #[async_trait] #[auto_impl(&, Box, Arc)] pub trait HyperlaneMessageIdIndexerStore: HyperlaneLogStore From a425d9994d2d8b8cf77b00e4e76c4d9d6cb9f594 Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Thu, 2 Nov 2023 16:32:16 +0000 Subject: [PATCH 09/15] Rename HyperlaneMessageIdIndexerStore -> HyperlaneSequencedDataIndexerStore, just have it retrieve itself --- rust/agents/scraper/src/chain_scraper/mod.rs | 14 +++----- .../src/contract_sync/cursor.rs | 35 +++++++++---------- rust/hyperlane-base/src/contract_sync/mod.rs | 4 +-- .../src/db/rocks/hyperlane_db.rs | 16 ++++----- rust/hyperlane-base/src/settings/base.rs | 13 +++---- rust/hyperlane-core/src/traits/db.rs | 21 ++++++----- 6 files changed, 47 insertions(+), 56 deletions(-) diff --git a/rust/agents/scraper/src/chain_scraper/mod.rs b/rust/agents/scraper/src/chain_scraper/mod.rs index c82091c740..670eae0c78 100644 --- a/rust/agents/scraper/src/chain_scraper/mod.rs +++ b/rust/agents/scraper/src/chain_scraper/mod.rs @@ -8,7 +8,7 @@ use eyre::Result; use hyperlane_base::settings::IndexSettings; use hyperlane_core::{ unwrap_or_none_result, BlockInfo, Delivery, HyperlaneDomain, HyperlaneLogStore, - HyperlaneMessage, HyperlaneMessageIdIndexerStore, HyperlaneProvider, + HyperlaneMessage, HyperlaneProvider, HyperlaneSequencedDataIndexerStore, HyperlaneWatermarkedLogStore, InterchainGasPayment, LogMeta, H256, }; use itertools::Itertools; @@ -369,19 +369,15 @@ impl HyperlaneLogStore for HyperlaneSqlDb { } } -/// TODO #[async_trait] -impl HyperlaneMessageIdIndexerStore for HyperlaneSqlDb { - /// Gets a message ID by its sequence. - /// A sequence is a monotonically increasing number that is incremented every time a message ID is indexed. - /// E.g. for Mailbox indexing, this is equal to the message nonce, and for merkle tree hook indexing, this - /// is equal to the leaf index. - async fn retrieve_message_id_by_sequence(&self, sequence: u32) -> Result> { +impl HyperlaneSequencedDataIndexerStore for HyperlaneSqlDb { + /// Gets a message by its nonce. + async fn retrieve_by_sequence(&self, sequence: u32) -> Result> { let message = self .db .retrieve_message_by_nonce(self.domain().id(), &self.mailbox_address, sequence) .await?; - Ok(message.map(|m| m.id())) + Ok(message) } /// Gets the block number at which the log occurred. diff --git a/rust/hyperlane-base/src/contract_sync/cursor.rs b/rust/hyperlane-base/src/contract_sync/cursor.rs index 39d1f108aa..327506b51a 100644 --- a/rust/hyperlane-base/src/contract_sync/cursor.rs +++ b/rust/hyperlane-base/src/contract_sync/cursor.rs @@ -11,8 +11,8 @@ use derive_new::new; use eyre::Result; use hyperlane_core::{ ChainCommunicationError, ChainResult, ContractSyncCursor, CursorAction, - HyperlaneMessageIdIndexerStore, HyperlaneWatermarkedLogStore, IndexMode, Indexer, LogMeta, - SequenceIndexer, Sequenced, H256, + HyperlaneSequencedDataIndexerStore, HyperlaneWatermarkedLogStore, IndexMode, Indexer, LogMeta, + SequenceIndexer, Sequenced, }; use tokio::time::sleep; use tracing::{debug, warn}; @@ -29,7 +29,7 @@ const MAX_SEQUENCE_RANGE: u32 = 100; #[derive(Debug, new)] pub(crate) struct MessageSyncCursor { indexer: Arc>, - db: Arc>, + db: Arc>, sync_state: SyncState, } @@ -130,20 +130,17 @@ impl SyncState { } impl MessageSyncCursor { - async fn retrieve_message_id_by_sequence(&self, sequence: u32) -> Option { - if let Ok(Some(message)) = self.db.retrieve_message_id_by_sequence(sequence).await { - Some(message) - } else { - None - } + async fn retrieve_by_sequence(&self, sequence: u32) -> Option { + self.db.retrieve_by_sequence(sequence).await.ok().flatten() } async fn retrieve_log_block_number(&self, sequence: u32) -> Option { - if let Ok(Some(block_number)) = self.db.retrieve_log_block_number(sequence).await { - Some(u32::try_from(block_number).unwrap()) - } else { - None - } + self.db + .retrieve_log_block_number(sequence) + .await + .ok() + .flatten() + .map(|num| u32::try_from(num).unwrap()) } async fn update(&mut self, logs: Vec<(T, LogMeta)>, prev_sequence: u32) -> Result<()> { @@ -178,7 +175,7 @@ pub(crate) struct ForwardMessageSyncCursor { impl ForwardMessageSyncCursor { pub fn new( indexer: Arc>, - db: Arc>, + db: Arc>, chunk_size: u32, start_block: u32, next_block: u32, @@ -206,7 +203,7 @@ impl ForwardMessageSyncCursor { // and update the cursor accordingly. while self .cursor - .retrieve_message_id_by_sequence(self.cursor.sync_state.next_sequence) + .retrieve_by_sequence(self.cursor.sync_state.next_sequence) .await .is_some() { @@ -296,7 +293,7 @@ impl BackwardMessageSyncCursor { #[allow(clippy::too_many_arguments)] pub fn new( indexer: Arc>, - db: Arc>, + db: Arc>, chunk_size: u32, start_block: u32, next_block: u32, @@ -327,7 +324,7 @@ impl BackwardMessageSyncCursor { while !self.synced { if self .cursor - .retrieve_message_id_by_sequence(self.cursor.sync_state.next_sequence) + .retrieve_by_sequence(self.cursor.sync_state.next_sequence) .await .is_none() { @@ -392,7 +389,7 @@ impl ForwardBackwardMessageSyncCursor { /// Construct a new contract sync helper. pub async fn new( indexer: Arc>, - db: Arc>, + db: Arc>, chunk_size: u32, mode: IndexMode, ) -> Result { diff --git a/rust/hyperlane-base/src/contract_sync/mod.rs b/rust/hyperlane-base/src/contract_sync/mod.rs index 5fb05da0d7..9559c2081c 100644 --- a/rust/hyperlane-base/src/contract_sync/mod.rs +++ b/rust/hyperlane-base/src/contract_sync/mod.rs @@ -4,7 +4,7 @@ use cursor::*; use derive_new::new; use hyperlane_core::{ utils::fmt_sync_time, ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneLogStore, - HyperlaneMessageIdIndexerStore, HyperlaneWatermarkedLogStore, Indexer, SequenceIndexer, + HyperlaneSequencedDataIndexerStore, HyperlaneWatermarkedLogStore, Indexer, SequenceIndexer, Sequenced, }; pub use metrics::ContractSyncMetrics; @@ -123,7 +123,7 @@ where /// A ContractSync for syncing messages using a MessageSyncCursor pub type MessageContractSync = - ContractSync>, Arc>>; + ContractSync>, Arc>>; impl MessageContractSync { /// Returns a new cursor to be used for syncing dispatched messages from the indexer pub async fn forward_message_sync_cursor( diff --git a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs index e6475a5aa8..78c54a2a3a 100644 --- a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs +++ b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs @@ -5,7 +5,7 @@ use tracing::{debug, instrument, trace}; use hyperlane_core::{ GasPaymentKey, HyperlaneDomain, HyperlaneLogStore, HyperlaneMessage, - HyperlaneMessageIdIndexerStore, HyperlaneWatermarkedLogStore, InterchainGasExpenditure, + HyperlaneSequencedDataIndexerStore, HyperlaneWatermarkedLogStore, InterchainGasExpenditure, InterchainGasPayment, InterchainGasPaymentMeta, LogMeta, MerkleTreeInsertion, H256, }; @@ -271,16 +271,15 @@ impl HyperlaneLogStore for HyperlaneRocksDB { } } -/// TODO #[async_trait] -impl HyperlaneMessageIdIndexerStore for HyperlaneRocksDB { +impl HyperlaneSequencedDataIndexerStore for HyperlaneRocksDB { /// Gets a message ID by its sequence. /// A sequence is a monotonically increasing number that is incremented every time a message ID is indexed. /// E.g. for Mailbox indexing, this is equal to the message nonce, and for merkle tree hook indexing, this /// is equal to the leaf index. - async fn retrieve_message_id_by_sequence(&self, sequence: u32) -> Result> { + async fn retrieve_by_sequence(&self, sequence: u32) -> Result> { let message = self.retrieve_message_by_nonce(sequence)?; - Ok(message.map(|m| m.id())) + Ok(message) } /// Gets the block number at which the log occurred. @@ -290,16 +289,15 @@ impl HyperlaneMessageIdIndexerStore for HyperlaneRocksDB { } } -/// TODO #[async_trait] -impl HyperlaneMessageIdIndexerStore for HyperlaneRocksDB { +impl HyperlaneSequencedDataIndexerStore for HyperlaneRocksDB { /// Gets a message ID by its sequence. /// A sequence is a monotonically increasing number that is incremented every time a message ID is indexed. /// E.g. for Mailbox indexing, this is equal to the message nonce, and for merkle tree hook indexing, this /// is equal to the leaf index. - async fn retrieve_message_id_by_sequence(&self, sequence: u32) -> Result> { + async fn retrieve_by_sequence(&self, sequence: u32) -> Result> { let insertion = self.retrieve_merkle_tree_insertion_by_leaf_index(&sequence)?; - Ok(insertion.map(|i| i.message_id())) + Ok(insertion) } /// Gets the block number at which the log occurred. diff --git a/rust/hyperlane-base/src/settings/base.rs b/rust/hyperlane-base/src/settings/base.rs index 2c7b69d3a2..65f9c52686 100644 --- a/rust/hyperlane-base/src/settings/base.rs +++ b/rust/hyperlane-base/src/settings/base.rs @@ -3,9 +3,10 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc}; use eyre::{eyre, Context, Result}; use futures_util::future::try_join_all; use hyperlane_core::{ - Delivery, HyperlaneChain, HyperlaneDomain, HyperlaneMessage, HyperlaneMessageIdIndexerStore, - HyperlaneProvider, HyperlaneWatermarkedLogStore, InterchainGasPaymaster, InterchainGasPayment, - Mailbox, MerkleTreeHook, MerkleTreeInsertion, MultisigIsm, ValidatorAnnounce, H256, + Delivery, HyperlaneChain, HyperlaneDomain, HyperlaneMessage, HyperlaneProvider, + HyperlaneSequencedDataIndexerStore, HyperlaneWatermarkedLogStore, InterchainGasPaymaster, + InterchainGasPayment, Mailbox, MerkleTreeHook, MerkleTreeInsertion, MultisigIsm, + ValidatorAnnounce, H256, }; use crate::{ @@ -183,8 +184,8 @@ impl Settings { build_contract_fns!(build_validator_announce, build_validator_announces -> dyn ValidatorAnnounce); build_contract_fns!(build_provider, build_providers -> dyn HyperlaneProvider); build_indexer_fns!(build_delivery_indexer, build_delivery_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); - build_indexer_fns!(build_message_indexer, build_message_indexers -> dyn HyperlaneMessageIdIndexerStore, MessageContractSync); + build_indexer_fns!(build_message_indexer, build_message_indexers -> dyn HyperlaneSequencedDataIndexerStore, MessageContractSync); build_indexer_fns!(build_interchain_gas_payment_indexer, build_interchain_gas_payment_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); - build_indexer_fns!(build_merkle_tree_hook_indexer, build_merkle_tree_hook_indexers -> dyn HyperlaneMessageIdIndexerStore, MessageContractSync); - // build_indexer_fns!(build_merkle_tree_hook_indexer, build_merkle_tree_hook_indexers -> dyn HyperlaneMessageIdIndexerStore, WatermarkContractSync); + build_indexer_fns!(build_merkle_tree_hook_indexer, build_merkle_tree_hook_indexers -> dyn HyperlaneSequencedDataIndexerStore, MessageContractSync); + // build_indexer_fns!(build_merkle_tree_hook_indexer, build_merkle_tree_hook_indexers -> dyn HyperlaneSequencedDataIndexerStore, WatermarkContractSync); } diff --git a/rust/hyperlane-core/src/traits/db.rs b/rust/hyperlane-core/src/traits/db.rs index d4c58ac788..b151cf13cf 100644 --- a/rust/hyperlane-core/src/traits/db.rs +++ b/rust/hyperlane-core/src/traits/db.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use auto_impl::auto_impl; use eyre::Result; -use crate::{LogMeta, H256}; +use crate::LogMeta; /// Interface for a HyperlaneLogStore that ingests logs. #[async_trait] @@ -15,25 +15,23 @@ pub trait HyperlaneLogStore: Send + Sync + Debug { async fn store_logs(&self, logs: &[(T, LogMeta)]) -> Result; } -/// TODO +/// A sequence is a monotonically increasing number that is incremented every time a message ID is indexed. +/// E.g. for Mailbox indexing, this is equal to the message nonce, and for merkle tree hook indexing, this +/// is equal to the leaf index. pub trait Sequenced: 'static + Send + Sync { - /// TODO + /// The sequence of this sequenced type. fn sequence(&self) -> u32; } -/// TODO -/// Extension of HyperlaneLogStore trait that supports getting the block number at which a known message was dispatched. +/// Extension of HyperlaneLogStore trait that supports indexed sequenced data. #[async_trait] #[auto_impl(&, Box, Arc)] -pub trait HyperlaneMessageIdIndexerStore: HyperlaneLogStore +pub trait HyperlaneSequencedDataIndexerStore: HyperlaneLogStore where T: Send + Sync, { - /// Gets a message ID by its sequence. - /// A sequence is a monotonically increasing number that is incremented every time a message ID is indexed. - /// E.g. for Mailbox indexing, this is equal to the message nonce, and for merkle tree hook indexing, this - /// is equal to the leaf index. - async fn retrieve_message_id_by_sequence(&self, sequence: u32) -> Result>; + /// Gets data by its sequence. + async fn retrieve_by_sequence(&self, sequence: u32) -> Result>; /// Gets the block number at which the log occurred. async fn retrieve_log_block_number(&self, nonce: u32) -> Result>; @@ -45,6 +43,7 @@ where pub trait HyperlaneWatermarkedLogStore: HyperlaneLogStore { /// Gets the block number high watermark async fn retrieve_high_watermark(&self) -> Result>; + /// Stores the block number high watermark async fn store_high_watermark(&self, block_number: u32) -> Result<()>; } From e50652cc5f24b50bfddf2b08dae837bcf3cd171e Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Thu, 2 Nov 2023 16:38:16 +0000 Subject: [PATCH 10/15] Rename MessageSyncCursor -> SequenceSyncCursor --- .../src/contract_sync/cursor.rs | 88 +++++++++---------- rust/hyperlane-base/src/contract_sync/mod.rs | 6 +- 2 files changed, 47 insertions(+), 47 deletions(-) diff --git a/rust/hyperlane-base/src/contract_sync/cursor.rs b/rust/hyperlane-base/src/contract_sync/cursor.rs index 327506b51a..a682810e80 100644 --- a/rust/hyperlane-base/src/contract_sync/cursor.rs +++ b/rust/hyperlane-base/src/contract_sync/cursor.rs @@ -25,9 +25,9 @@ const ETA_TIME_WINDOW: f64 = 2. * 60.; const MAX_SEQUENCE_RANGE: u32 = 100; /// A struct that holds the data needed for forwards and backwards -/// message sync cursors. +/// sequence sync cursors. #[derive(Debug, new)] -pub(crate) struct MessageSyncCursor { +pub(crate) struct SequenceSyncCursor { indexer: Arc>, db: Arc>, sync_state: SyncState, @@ -129,7 +129,7 @@ impl SyncState { } } -impl MessageSyncCursor { +impl SequenceSyncCursor { async fn retrieve_by_sequence(&self, sequence: u32) -> Option { self.db.retrieve_by_sequence(sequence).await.ok().flatten() } @@ -144,19 +144,19 @@ impl MessageSyncCursor { } async fn update(&mut self, logs: Vec<(T, LogMeta)>, prev_sequence: u32) -> Result<()> { - // If we found messages, but did *not* find the message we were looking for, - // we need to rewind to the block at which we found the last message. + // If we found logs, but did *not* find the log we were looking for, + // we need to rewind to the block at which we found the last log. if !logs.is_empty() && !logs .iter() .any(|m| m.0.sequence() == self.sync_state.next_sequence) { - warn!(next_nonce=?self.sync_state.next_sequence, "Target nonce not found, rewinding"); - // If the previous nonce has been synced, rewind to the block number + warn!(next_sequence=?self.sync_state.next_sequence, "Target sequence not found, rewinding"); + // If the previous sequence has been synced, rewind to the block number // at which it was dispatched. Otherwise, rewind all the way back to the start block. if let Some(block_number) = self.retrieve_log_block_number(prev_sequence).await { self.sync_state.next_block = block_number; - warn!(block_number, "Rewound to previous known message"); + warn!(block_number, "Rewound to previous known sequenced log"); } else { self.sync_state.next_block = self.sync_state.start_block; } @@ -167,12 +167,12 @@ impl MessageSyncCursor { } } -/// A MessageSyncCursor that syncs forwards in perpetuity. -pub(crate) struct ForwardMessageSyncCursor { - cursor: MessageSyncCursor, +/// A SequenceSyncCursor that syncs forwards in perpetuity. +pub(crate) struct ForwardSequenceSyncCursor { + cursor: SequenceSyncCursor, } -impl ForwardMessageSyncCursor { +impl ForwardSequenceSyncCursor { pub fn new( indexer: Arc>, db: Arc>, @@ -183,7 +183,7 @@ impl ForwardMessageSyncCursor { next_sequence: u32, ) -> Self { Self { - cursor: MessageSyncCursor::new( + cursor: SequenceSyncCursor::new( indexer, db, SyncState::new( @@ -199,7 +199,7 @@ impl ForwardMessageSyncCursor { } async fn get_next_range(&mut self) -> ChainResult>> { - // Check if any new messages have been inserted into the DB, + // Check if any new logs have been inserted into the DB, // and update the cursor accordingly. while self .cursor @@ -217,8 +217,8 @@ impl ForwardMessageSyncCursor { self.cursor.sync_state.next_block = block_number; } debug!( - next_nonce = self.cursor.sync_state.next_sequence + 1, - "Fast forwarding next nonce" + next_sequence = self.cursor.sync_state.next_sequence + 1, + "Fast forwarding next sequence" ); self.cursor.sync_state.next_sequence += 1; } @@ -229,7 +229,7 @@ impl ForwardMessageSyncCursor { let cursor_count = self.cursor.sync_state.next_sequence; Ok(match cursor_count.cmp(&mailbox_count) { Ordering::Equal => { - // We are synced up to the latest nonce so we don't need to index anything. + // We are synced up to the latest sequence so we don't need to index anything. // We update our next block number accordingly. self.cursor.sync_state.next_block = tip; None @@ -252,7 +252,7 @@ impl ForwardMessageSyncCursor { } #[async_trait] -impl ContractSyncCursor for ForwardMessageSyncCursor { +impl ContractSyncCursor for ForwardSequenceSyncCursor { async fn next_action(&mut self) -> ChainResult<(CursorAction, Duration)> { // TODO: Fix ETA calculation let eta = Duration::from_secs(0); @@ -272,24 +272,24 @@ impl ContractSyncCursor for ForwardMessageSyncCursor { /// at which it was dispatched. /// Otherwise, rewind all the way back to the start block. async fn update(&mut self, logs: Vec<(T, LogMeta)>) -> Result<()> { - let prev_nonce = self.cursor.sync_state.next_sequence.saturating_sub(1); - // We may wind up having re-indexed messages that are previous to the nonce that we are looking for. - // We should not consider these messages when checking for continuity errors. + let prev_sequence = self.cursor.sync_state.next_sequence.saturating_sub(1); + // We may wind up having re-indexed logs that are previous to the sequence that we are looking for. + // We should not consider these logs when checking for continuity errors. let filtered_logs = logs .into_iter() .filter(|m| m.0.sequence() >= self.cursor.sync_state.next_sequence) .collect(); - self.cursor.update(filtered_logs, prev_nonce).await + self.cursor.update(filtered_logs, prev_sequence).await } } -/// A MessageSyncCursor that syncs backwards to sequence (nonce) zero. -pub(crate) struct BackwardMessageSyncCursor { - cursor: MessageSyncCursor, +/// A SequenceSyncCursor that syncs backwards to sequence zero. +pub(crate) struct BackwardSequenceSyncCursor { + cursor: SequenceSyncCursor, synced: bool, } -impl BackwardMessageSyncCursor { +impl BackwardSequenceSyncCursor { #[allow(clippy::too_many_arguments)] pub fn new( indexer: Arc>, @@ -302,7 +302,7 @@ impl BackwardMessageSyncCursor { synced: bool, ) -> Self { Self { - cursor: MessageSyncCursor::new( + cursor: SequenceSyncCursor::new( indexer, db, SyncState::new( @@ -319,7 +319,7 @@ impl BackwardMessageSyncCursor { } async fn get_next_range(&mut self) -> ChainResult>> { - // Check if any new messages have been inserted into the DB, + // Check if any new logs have been inserted into the DB, // and update the cursor accordingly. while !self.synced { if self @@ -362,8 +362,8 @@ impl BackwardMessageSyncCursor { /// Otherwise, rewind all the way back to the start block. async fn update(&mut self, logs: Vec<(T, LogMeta)>) -> Result<()> { let prev_sequence = self.cursor.sync_state.next_sequence.saturating_add(1); - // We may wind up having re-indexed messages that are previous to the sequence (nonce) that we are looking for. - // We should not consider these messages when checking for continuity errors. + // We may wind up having re-indexed logs that are previous to the sequence that we are looking for. + // We should not consider these logs when checking for continuity errors. let filtered_logs = logs .into_iter() .filter(|m| m.0.sequence() <= self.cursor.sync_state.next_sequence) @@ -378,14 +378,14 @@ pub enum SyncDirection { Backward, } -/// A MessageSyncCursor that syncs forwards in perpetuity. -pub(crate) struct ForwardBackwardMessageSyncCursor { - forward: ForwardMessageSyncCursor, - backward: BackwardMessageSyncCursor, +/// A SequenceSyncCursor that syncs forwards in perpetuity. +pub(crate) struct ForwardBackwardSequenceSyncCursor { + forward: ForwardSequenceSyncCursor, + backward: BackwardSequenceSyncCursor, direction: SyncDirection, } -impl ForwardBackwardMessageSyncCursor { +impl ForwardBackwardSequenceSyncCursor { /// Construct a new contract sync helper. pub async fn new( indexer: Arc>, @@ -393,28 +393,28 @@ impl ForwardBackwardMessageSyncCursor { chunk_size: u32, mode: IndexMode, ) -> Result { - let (count, tip) = indexer.sequence_and_tip().await?; - let count = count.ok_or(ChainCommunicationError::from_other_str( - "Failed to query message count", + let (sequence, tip) = indexer.sequence_and_tip().await?; + let sequence = sequence.ok_or(ChainCommunicationError::from_other_str( + "Failed to query sequence", ))?; - let forward_cursor = ForwardMessageSyncCursor::new( + let forward_cursor = ForwardSequenceSyncCursor::new( indexer.clone(), db.clone(), chunk_size, tip, tip, mode, - count, + sequence, ); - let backward_cursor = BackwardMessageSyncCursor::new( + let backward_cursor = BackwardSequenceSyncCursor::new( indexer.clone(), db.clone(), chunk_size, tip, tip, mode, - count.saturating_sub(1), - count == 0, + sequence.saturating_sub(1), + sequence == 0, ); Ok(Self { forward: forward_cursor, @@ -425,7 +425,7 @@ impl ForwardBackwardMessageSyncCursor { } #[async_trait] -impl ContractSyncCursor for ForwardBackwardMessageSyncCursor { +impl ContractSyncCursor for ForwardBackwardSequenceSyncCursor { async fn next_action(&mut self) -> ChainResult<(CursorAction, Duration)> { // TODO: Proper ETA for backwards sync let eta = Duration::from_secs(0); diff --git a/rust/hyperlane-base/src/contract_sync/mod.rs b/rust/hyperlane-base/src/contract_sync/mod.rs index 9559c2081c..0487a1c1ab 100644 --- a/rust/hyperlane-base/src/contract_sync/mod.rs +++ b/rust/hyperlane-base/src/contract_sync/mod.rs @@ -121,7 +121,7 @@ where } } -/// A ContractSync for syncing messages using a MessageSyncCursor +/// A ContractSync for syncing messages using a SequenceSyncCursor pub type MessageContractSync = ContractSync>, Arc>>; impl MessageContractSync { @@ -131,7 +131,7 @@ impl MessageContractSync { index_settings: IndexSettings, next_nonce: u32, ) -> Box> { - Box::new(ForwardMessageSyncCursor::new( + Box::new(ForwardSequenceSyncCursor::new( self.indexer.clone(), self.db.clone(), index_settings.chunk_size, @@ -148,7 +148,7 @@ impl MessageContractSync { index_settings: IndexSettings, ) -> Box> { Box::new( - ForwardBackwardMessageSyncCursor::new( + ForwardBackwardSequenceSyncCursor::new( self.indexer.clone(), self.db.clone(), index_settings.chunk_size, From c216b35f5dfdc4fd307ae21a450209ba30ed978b Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Thu, 2 Nov 2023 16:39:45 +0000 Subject: [PATCH 11/15] more renamez --- rust/agents/scraper/src/chain_scraper/mod.rs | 4 ++-- rust/hyperlane-base/src/contract_sync/cursor.rs | 10 +++++----- rust/hyperlane-base/src/contract_sync/mod.rs | 4 ++-- rust/hyperlane-base/src/db/rocks/hyperlane_db.rs | 6 +++--- rust/hyperlane-base/src/settings/base.rs | 7 +++---- rust/hyperlane-core/src/traits/db.rs | 2 +- 6 files changed, 16 insertions(+), 17 deletions(-) diff --git a/rust/agents/scraper/src/chain_scraper/mod.rs b/rust/agents/scraper/src/chain_scraper/mod.rs index 670eae0c78..78410277d3 100644 --- a/rust/agents/scraper/src/chain_scraper/mod.rs +++ b/rust/agents/scraper/src/chain_scraper/mod.rs @@ -8,7 +8,7 @@ use eyre::Result; use hyperlane_base::settings::IndexSettings; use hyperlane_core::{ unwrap_or_none_result, BlockInfo, Delivery, HyperlaneDomain, HyperlaneLogStore, - HyperlaneMessage, HyperlaneProvider, HyperlaneSequencedDataIndexerStore, + HyperlaneMessage, HyperlaneProvider, HyperlaneSequenceIndexerStore, HyperlaneWatermarkedLogStore, InterchainGasPayment, LogMeta, H256, }; use itertools::Itertools; @@ -370,7 +370,7 @@ impl HyperlaneLogStore for HyperlaneSqlDb { } #[async_trait] -impl HyperlaneSequencedDataIndexerStore for HyperlaneSqlDb { +impl HyperlaneSequenceIndexerStore for HyperlaneSqlDb { /// Gets a message by its nonce. async fn retrieve_by_sequence(&self, sequence: u32) -> Result> { let message = self diff --git a/rust/hyperlane-base/src/contract_sync/cursor.rs b/rust/hyperlane-base/src/contract_sync/cursor.rs index a682810e80..cbbd393dbd 100644 --- a/rust/hyperlane-base/src/contract_sync/cursor.rs +++ b/rust/hyperlane-base/src/contract_sync/cursor.rs @@ -11,7 +11,7 @@ use derive_new::new; use eyre::Result; use hyperlane_core::{ ChainCommunicationError, ChainResult, ContractSyncCursor, CursorAction, - HyperlaneSequencedDataIndexerStore, HyperlaneWatermarkedLogStore, IndexMode, Indexer, LogMeta, + HyperlaneSequenceIndexerStore, HyperlaneWatermarkedLogStore, IndexMode, Indexer, LogMeta, SequenceIndexer, Sequenced, }; use tokio::time::sleep; @@ -29,7 +29,7 @@ const MAX_SEQUENCE_RANGE: u32 = 100; #[derive(Debug, new)] pub(crate) struct SequenceSyncCursor { indexer: Arc>, - db: Arc>, + db: Arc>, sync_state: SyncState, } @@ -175,7 +175,7 @@ pub(crate) struct ForwardSequenceSyncCursor { impl ForwardSequenceSyncCursor { pub fn new( indexer: Arc>, - db: Arc>, + db: Arc>, chunk_size: u32, start_block: u32, next_block: u32, @@ -293,7 +293,7 @@ impl BackwardSequenceSyncCursor { #[allow(clippy::too_many_arguments)] pub fn new( indexer: Arc>, - db: Arc>, + db: Arc>, chunk_size: u32, start_block: u32, next_block: u32, @@ -389,7 +389,7 @@ impl ForwardBackwardSequenceSyncCursor { /// Construct a new contract sync helper. pub async fn new( indexer: Arc>, - db: Arc>, + db: Arc>, chunk_size: u32, mode: IndexMode, ) -> Result { diff --git a/rust/hyperlane-base/src/contract_sync/mod.rs b/rust/hyperlane-base/src/contract_sync/mod.rs index 0487a1c1ab..568c4cebf7 100644 --- a/rust/hyperlane-base/src/contract_sync/mod.rs +++ b/rust/hyperlane-base/src/contract_sync/mod.rs @@ -4,7 +4,7 @@ use cursor::*; use derive_new::new; use hyperlane_core::{ utils::fmt_sync_time, ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneLogStore, - HyperlaneSequencedDataIndexerStore, HyperlaneWatermarkedLogStore, Indexer, SequenceIndexer, + HyperlaneSequenceIndexerStore, HyperlaneWatermarkedLogStore, Indexer, SequenceIndexer, Sequenced, }; pub use metrics::ContractSyncMetrics; @@ -123,7 +123,7 @@ where /// A ContractSync for syncing messages using a SequenceSyncCursor pub type MessageContractSync = - ContractSync>, Arc>>; + ContractSync>, Arc>>; impl MessageContractSync { /// Returns a new cursor to be used for syncing dispatched messages from the indexer pub async fn forward_message_sync_cursor( diff --git a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs index 78c54a2a3a..e412849e93 100644 --- a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs +++ b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs @@ -5,7 +5,7 @@ use tracing::{debug, instrument, trace}; use hyperlane_core::{ GasPaymentKey, HyperlaneDomain, HyperlaneLogStore, HyperlaneMessage, - HyperlaneSequencedDataIndexerStore, HyperlaneWatermarkedLogStore, InterchainGasExpenditure, + HyperlaneSequenceIndexerStore, HyperlaneWatermarkedLogStore, InterchainGasExpenditure, InterchainGasPayment, InterchainGasPaymentMeta, LogMeta, MerkleTreeInsertion, H256, }; @@ -272,7 +272,7 @@ impl HyperlaneLogStore for HyperlaneRocksDB { } #[async_trait] -impl HyperlaneSequencedDataIndexerStore for HyperlaneRocksDB { +impl HyperlaneSequenceIndexerStore for HyperlaneRocksDB { /// Gets a message ID by its sequence. /// A sequence is a monotonically increasing number that is incremented every time a message ID is indexed. /// E.g. for Mailbox indexing, this is equal to the message nonce, and for merkle tree hook indexing, this @@ -290,7 +290,7 @@ impl HyperlaneSequencedDataIndexerStore for HyperlaneRocksDB { } #[async_trait] -impl HyperlaneSequencedDataIndexerStore for HyperlaneRocksDB { +impl HyperlaneSequenceIndexerStore for HyperlaneRocksDB { /// Gets a message ID by its sequence. /// A sequence is a monotonically increasing number that is incremented every time a message ID is indexed. /// E.g. for Mailbox indexing, this is equal to the message nonce, and for merkle tree hook indexing, this diff --git a/rust/hyperlane-base/src/settings/base.rs b/rust/hyperlane-base/src/settings/base.rs index 65f9c52686..1603b26912 100644 --- a/rust/hyperlane-base/src/settings/base.rs +++ b/rust/hyperlane-base/src/settings/base.rs @@ -4,7 +4,7 @@ use eyre::{eyre, Context, Result}; use futures_util::future::try_join_all; use hyperlane_core::{ Delivery, HyperlaneChain, HyperlaneDomain, HyperlaneMessage, HyperlaneProvider, - HyperlaneSequencedDataIndexerStore, HyperlaneWatermarkedLogStore, InterchainGasPaymaster, + HyperlaneSequenceIndexerStore, HyperlaneWatermarkedLogStore, InterchainGasPaymaster, InterchainGasPayment, Mailbox, MerkleTreeHook, MerkleTreeInsertion, MultisigIsm, ValidatorAnnounce, H256, }; @@ -184,8 +184,7 @@ impl Settings { build_contract_fns!(build_validator_announce, build_validator_announces -> dyn ValidatorAnnounce); build_contract_fns!(build_provider, build_providers -> dyn HyperlaneProvider); build_indexer_fns!(build_delivery_indexer, build_delivery_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); - build_indexer_fns!(build_message_indexer, build_message_indexers -> dyn HyperlaneSequencedDataIndexerStore, MessageContractSync); + build_indexer_fns!(build_message_indexer, build_message_indexers -> dyn HyperlaneSequenceIndexerStore, MessageContractSync); build_indexer_fns!(build_interchain_gas_payment_indexer, build_interchain_gas_payment_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); - build_indexer_fns!(build_merkle_tree_hook_indexer, build_merkle_tree_hook_indexers -> dyn HyperlaneSequencedDataIndexerStore, MessageContractSync); - // build_indexer_fns!(build_merkle_tree_hook_indexer, build_merkle_tree_hook_indexers -> dyn HyperlaneSequencedDataIndexerStore, WatermarkContractSync); + build_indexer_fns!(build_merkle_tree_hook_indexer, build_merkle_tree_hook_indexers -> dyn HyperlaneSequenceIndexerStore, MessageContractSync); } diff --git a/rust/hyperlane-core/src/traits/db.rs b/rust/hyperlane-core/src/traits/db.rs index b151cf13cf..3fbebf52db 100644 --- a/rust/hyperlane-core/src/traits/db.rs +++ b/rust/hyperlane-core/src/traits/db.rs @@ -26,7 +26,7 @@ pub trait Sequenced: 'static + Send + Sync { /// Extension of HyperlaneLogStore trait that supports indexed sequenced data. #[async_trait] #[auto_impl(&, Box, Arc)] -pub trait HyperlaneSequencedDataIndexerStore: HyperlaneLogStore +pub trait HyperlaneSequenceIndexerStore: HyperlaneLogStore where T: Send + Sync, { From db6da267eac6b4314ec45d501d9d42f36d362822 Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Thu, 2 Nov 2023 16:47:12 +0000 Subject: [PATCH 12/15] Cleanup --- rust/agents/relayer/src/merkle_tree/processor.rs | 8 -------- rust/agents/relayer/src/relayer.rs | 1 - rust/chains/hyperlane-cosmos/src/mailbox.rs | 10 ---------- rust/hyperlane-base/src/db/rocks/hyperlane_db.rs | 10 ++-------- 4 files changed, 2 insertions(+), 27 deletions(-) diff --git a/rust/agents/relayer/src/merkle_tree/processor.rs b/rust/agents/relayer/src/merkle_tree/processor.rs index 9dda060244..873c52ab97 100644 --- a/rust/agents/relayer/src/merkle_tree/processor.rs +++ b/rust/agents/relayer/src/merkle_tree/processor.rs @@ -47,14 +47,6 @@ impl ProcessorExt for MerkleTreeProcessor { /// One round of processing, extracted from infinite work loop for /// testing purposes. async fn tick(&mut self) -> Result<()> { - let insertion_123 = self.db.retrieve_merkle_tree_insertion_by_leaf_index(&123)?; - - let insertion_124 = self.db.retrieve_merkle_tree_insertion_by_leaf_index(&124)?; - - let insertion_125 = self.db.retrieve_merkle_tree_insertion_by_leaf_index(&125)?; - - tracing::warn!(?insertion_123, ?insertion_124, ?insertion_125, domain=?self.domain(), "insertions bls"); - if let Some(insertion) = self.next_unprocessed_leaf()? { // Feed the message to the prover sync self.prover_sync diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index a074f7ef9b..c5e920faf4 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -315,7 +315,6 @@ impl Relayer { ) -> Instrumented>> { let index_settings = self.as_ref().settings.chains[origin.name()].index.clone(); let contract_sync = self.merkle_tree_hook_syncs.get(origin).unwrap().clone(); - // let cursor = contract_sync.rate_limited_cursor(index_settings).await; let cursor = contract_sync .forward_backward_message_sync_cursor(index_settings) .await; diff --git a/rust/chains/hyperlane-cosmos/src/mailbox.rs b/rust/chains/hyperlane-cosmos/src/mailbox.rs index 3a51b9cc67..d14e28f32c 100644 --- a/rust/chains/hyperlane-cosmos/src/mailbox.rs +++ b/rust/chains/hyperlane-cosmos/src/mailbox.rs @@ -338,16 +338,6 @@ impl SequenceIndexer for CosmosMailboxIndexer { async fn sequence_and_tip(&self) -> ChainResult<(Option, u32)> { let tip = Indexer::::get_finalized_block_number(&self).await?; - // As a hack to ensure we don't get a bunch errors for querying in the - // future if an RPC URL has poor load balancing, we just return the - // current tip minus 1. - // We often would get errors where we query the tip and try to get the - // nonce at that tip, and then the same RPC provider complains we're asking - // for state in the future. - // TODO: RPC retrying to fix this. - - let tip = tip - 1; - let sequence = self.mailbox.nonce_at_block(Some(tip.into())).await?; Ok((Some(sequence), tip)) diff --git a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs index e412849e93..a04d4034ea 100644 --- a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs +++ b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs @@ -273,10 +273,7 @@ impl HyperlaneLogStore for HyperlaneRocksDB { #[async_trait] impl HyperlaneSequenceIndexerStore for HyperlaneRocksDB { - /// Gets a message ID by its sequence. - /// A sequence is a monotonically increasing number that is incremented every time a message ID is indexed. - /// E.g. for Mailbox indexing, this is equal to the message nonce, and for merkle tree hook indexing, this - /// is equal to the leaf index. + /// Gets data by its sequence. async fn retrieve_by_sequence(&self, sequence: u32) -> Result> { let message = self.retrieve_message_by_nonce(sequence)?; Ok(message) @@ -291,10 +288,7 @@ impl HyperlaneSequenceIndexerStore for HyperlaneRocksDB { #[async_trait] impl HyperlaneSequenceIndexerStore for HyperlaneRocksDB { - /// Gets a message ID by its sequence. - /// A sequence is a monotonically increasing number that is incremented every time a message ID is indexed. - /// E.g. for Mailbox indexing, this is equal to the message nonce, and for merkle tree hook indexing, this - /// is equal to the leaf index. + /// Gets data by its sequence. async fn retrieve_by_sequence(&self, sequence: u32) -> Result> { let insertion = self.retrieve_merkle_tree_insertion_by_leaf_index(&sequence)?; Ok(insertion) From 3424a1fa86afc18d046520c632bc3504679ec3e6 Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Thu, 2 Nov 2023 17:01:46 +0000 Subject: [PATCH 13/15] Rename MessageContractSync -> SequencedDataContractSync --- rust/agents/relayer/src/relayer.rs | 9 +++++---- rust/agents/validator/src/validator.rs | 4 ++-- rust/hyperlane-base/src/contract_sync/mod.rs | 4 ++-- rust/hyperlane-base/src/settings/base.rs | 6 +++--- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index c5e920faf4..93ac88a8bb 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -9,8 +9,8 @@ use derive_more::AsRef; use eyre::Result; use hyperlane_base::{ db::{HyperlaneRocksDB, DB}, - run_all, BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, MessageContractSync, - WatermarkContractSync, + run_all, BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, + SequencedDataContractSync, WatermarkContractSync, }; use hyperlane_core::{ HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, U256, @@ -52,14 +52,15 @@ pub struct Relayer { destination_chains: HashSet, #[as_ref] core: HyperlaneAgentCore, - message_syncs: HashMap>>, + message_syncs: HashMap>>, interchain_gas_payment_syncs: HashMap>>, /// Context data for each (origin, destination) chain pair a message can be /// sent between msg_ctxs: HashMap>, prover_syncs: HashMap>>, - merkle_tree_hook_syncs: HashMap>>, + merkle_tree_hook_syncs: + HashMap>>, dbs: HashMap, whitelist: Arc, blacklist: Arc, diff --git a/rust/agents/validator/src/validator.rs b/rust/agents/validator/src/validator.rs index 70cc9a9a69..96feb97b8f 100644 --- a/rust/agents/validator/src/validator.rs +++ b/rust/agents/validator/src/validator.rs @@ -11,7 +11,7 @@ use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument use hyperlane_base::{ db::{HyperlaneRocksDB, DB}, run_all, BaseAgent, CheckpointSyncer, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, - MessageContractSync, + SequencedDataContractSync, }; use hyperlane_core::{ @@ -33,7 +33,7 @@ pub struct Validator { #[as_ref] core: HyperlaneAgentCore, db: HyperlaneRocksDB, - merkle_tree_hook_sync: Arc>, + merkle_tree_hook_sync: Arc>, mailbox: Arc, merkle_tree_hook: Arc, validator_announce: Arc, diff --git a/rust/hyperlane-base/src/contract_sync/mod.rs b/rust/hyperlane-base/src/contract_sync/mod.rs index 568c4cebf7..3968ad9f57 100644 --- a/rust/hyperlane-base/src/contract_sync/mod.rs +++ b/rust/hyperlane-base/src/contract_sync/mod.rs @@ -122,9 +122,9 @@ where } /// A ContractSync for syncing messages using a SequenceSyncCursor -pub type MessageContractSync = +pub type SequencedDataContractSync = ContractSync>, Arc>>; -impl MessageContractSync { +impl SequencedDataContractSync { /// Returns a new cursor to be used for syncing dispatched messages from the indexer pub async fn forward_message_sync_cursor( &self, diff --git a/rust/hyperlane-base/src/settings/base.rs b/rust/hyperlane-base/src/settings/base.rs index 1603b26912..135d80ea0b 100644 --- a/rust/hyperlane-base/src/settings/base.rs +++ b/rust/hyperlane-base/src/settings/base.rs @@ -11,7 +11,7 @@ use hyperlane_core::{ use crate::{ settings::{chains::ChainConf, trace::TracingConfig}, - ContractSync, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, MessageContractSync, + ContractSync, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, SequencedDataContractSync, WatermarkContractSync, }; @@ -184,7 +184,7 @@ impl Settings { build_contract_fns!(build_validator_announce, build_validator_announces -> dyn ValidatorAnnounce); build_contract_fns!(build_provider, build_providers -> dyn HyperlaneProvider); build_indexer_fns!(build_delivery_indexer, build_delivery_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); - build_indexer_fns!(build_message_indexer, build_message_indexers -> dyn HyperlaneSequenceIndexerStore, MessageContractSync); + build_indexer_fns!(build_message_indexer, build_message_indexers -> dyn HyperlaneSequenceIndexerStore, SequencedDataContractSync); build_indexer_fns!(build_interchain_gas_payment_indexer, build_interchain_gas_payment_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); - build_indexer_fns!(build_merkle_tree_hook_indexer, build_merkle_tree_hook_indexers -> dyn HyperlaneSequenceIndexerStore, MessageContractSync); + build_indexer_fns!(build_merkle_tree_hook_indexer, build_merkle_tree_hook_indexers -> dyn HyperlaneSequenceIndexerStore, SequencedDataContractSync); } From 50ec4976af4756a5cdc21c55ae69414385323922 Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Thu, 2 Nov 2023 17:11:42 +0000 Subject: [PATCH 14/15] Higher chunk for neutrontestnet --- rust/config/testnet4_config.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/config/testnet4_config.json b/rust/config/testnet4_config.json index 6679cc0056..12621e6035 100644 --- a/rust/config/testnet4_config.json +++ b/rust/config/testnet4_config.json @@ -1028,7 +1028,7 @@ "prefix": "dual", "index": { "from": 1, - "chunk": 1000 + "chunk": 100000 }, "blocks": { "reorgPeriod": 1 From d7c5497f3045a27d2ef99cd7ea6f650c9a3c6ea0 Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Thu, 2 Nov 2023 17:50:31 +0000 Subject: [PATCH 15/15] store merkle tree insertion block nums --- .../hyperlane-ethereum/src/merkle_tree_hook.rs | 5 ++++- rust/hyperlane-base/src/db/rocks/hyperlane_db.rs | 15 ++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/rust/chains/hyperlane-ethereum/src/merkle_tree_hook.rs b/rust/chains/hyperlane-ethereum/src/merkle_tree_hook.rs index 2df7dbe251..1beffed5bc 100644 --- a/rust/chains/hyperlane-ethereum/src/merkle_tree_hook.rs +++ b/rust/chains/hyperlane-ethereum/src/merkle_tree_hook.rs @@ -130,7 +130,7 @@ where Ok(logs) } - #[instrument(level = "debug", err, ret, skip(self))] + #[instrument(level = "debug", err, skip(self))] async fn get_finalized_block_number(&self) -> ChainResult { Ok(self .provider @@ -147,6 +147,9 @@ impl SequenceIndexer for EthereumMerkleTreeHookIndexer ChainResult<(Option, u32)> { let tip = self.get_finalized_block_number().await?; let sequence = self.contract.count().block(u64::from(tip)).call().await?; diff --git a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs index a04d4034ea..807645beb0 100644 --- a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs +++ b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs @@ -139,7 +139,11 @@ impl HyperlaneRocksDB { } /// Store the merkle tree insertion event, and also store a mapping from message_id to leaf_index - pub fn process_tree_insertion(&self, insertion: &MerkleTreeInsertion) -> DbResult { + pub fn process_tree_insertion( + &self, + insertion: &MerkleTreeInsertion, + insertion_block_number: u64, + ) -> DbResult { if let Ok(Some(_)) = self.retrieve_merkle_tree_insertion_by_leaf_index(&insertion.index()) { debug!(insertion=?insertion, "Tree insertion already stored in db"); return Ok(false); @@ -151,6 +155,11 @@ impl HyperlaneRocksDB { self.store_merkle_tree_insertion_by_leaf_index(&insertion.index(), insertion)?; self.store_merkle_leaf_index_by_message_id(&insertion.message_id(), &insertion.index())?; + + self.store_merkle_tree_insertion_block_number_by_leaf_index( + &insertion.index(), + &insertion_block_number, + )?; // Return true to indicate the tree insertion was processed Ok(true) } @@ -262,8 +271,8 @@ impl HyperlaneLogStore for HyperlaneRocksDB { #[instrument(skip_all)] async fn store_logs(&self, leaves: &[(MerkleTreeInsertion, LogMeta)]) -> Result { let mut insertions = 0; - for (insertion, _meta) in leaves { - if self.process_tree_insertion(insertion)? { + for (insertion, meta) in leaves { + if self.process_tree_insertion(insertion, meta.block_number)? { insertions += 1; } }