diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index 96e911d6a8..93ac88a8bb 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -9,10 +9,12 @@ use derive_more::AsRef; use eyre::Result; use hyperlane_base::{ db::{HyperlaneRocksDB, DB}, - run_all, BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, MessageContractSync, - WatermarkContractSync, + run_all, BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, + SequencedDataContractSync, WatermarkContractSync, +}; +use hyperlane_core::{ + HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, U256, }; -use hyperlane_core::{HyperlaneDomain, InterchainGasPayment, MerkleTreeInsertion, U256}; use tokio::{ sync::{ mpsc::{self, UnboundedReceiver, UnboundedSender}, @@ -50,7 +52,7 @@ pub struct Relayer { destination_chains: HashSet, #[as_ref] core: HyperlaneAgentCore, - message_syncs: HashMap>, + message_syncs: HashMap>>, interchain_gas_payment_syncs: HashMap>>, /// Context data for each (origin, destination) chain pair a message can be @@ -58,7 +60,7 @@ pub struct Relayer { msg_ctxs: HashMap>, prover_syncs: HashMap>>, merkle_tree_hook_syncs: - HashMap>>, + HashMap>>, dbs: HashMap, whitelist: Arc, blacklist: Arc, @@ -314,7 +316,9 @@ impl Relayer { ) -> Instrumented>> { let index_settings = self.as_ref().settings.chains[origin.name()].index.clone(); let contract_sync = self.merkle_tree_hook_syncs.get(origin).unwrap().clone(); - let cursor = contract_sync.rate_limited_cursor(index_settings).await; + let cursor = contract_sync + .forward_backward_message_sync_cursor(index_settings) + .await; tokio::spawn(async move { contract_sync.clone().sync("merkle_tree_hook", cursor).await }) .instrument(info_span!("ContractSync")) } diff --git a/rust/agents/scraper/src/chain_scraper/mod.rs b/rust/agents/scraper/src/chain_scraper/mod.rs index 09294905a6..78410277d3 100644 --- a/rust/agents/scraper/src/chain_scraper/mod.rs +++ b/rust/agents/scraper/src/chain_scraper/mod.rs @@ -8,8 +8,8 @@ use eyre::Result; use hyperlane_base::settings::IndexSettings; use hyperlane_core::{ unwrap_or_none_result, BlockInfo, Delivery, HyperlaneDomain, HyperlaneLogStore, - HyperlaneMessage, HyperlaneMessageStore, HyperlaneProvider, HyperlaneWatermarkedLogStore, - InterchainGasPayment, LogMeta, H256, + HyperlaneMessage, HyperlaneProvider, HyperlaneSequenceIndexerStore, + HyperlaneWatermarkedLogStore, InterchainGasPayment, LogMeta, H256, }; use itertools::Itertools; use tracing::trace; @@ -370,23 +370,22 @@ impl HyperlaneLogStore for HyperlaneSqlDb { } #[async_trait] -impl HyperlaneMessageStore for HyperlaneSqlDb { - /// Gets a message by nonce. - async fn retrieve_message_by_nonce(&self, nonce: u32) -> Result> { +impl HyperlaneSequenceIndexerStore for HyperlaneSqlDb { + /// Gets a message by its nonce. + async fn retrieve_by_sequence(&self, sequence: u32) -> Result> { let message = self .db - .retrieve_message_by_nonce(self.domain().id(), &self.mailbox_address, nonce) + .retrieve_message_by_nonce(self.domain().id(), &self.mailbox_address, sequence) .await?; Ok(message) } - /// Retrieves the block number at which the message with the provided nonce - /// was dispatched. - async fn retrieve_dispatched_block_number(&self, nonce: u32) -> Result> { + /// Gets the block number at which the log occurred. + async fn retrieve_log_block_number(&self, sequence: u32) -> Result> { unwrap_or_none_result!( tx_id, self.db - .retrieve_dispatched_tx_id(self.domain().id(), &self.mailbox_address, nonce) + .retrieve_dispatched_tx_id(self.domain().id(), &self.mailbox_address, sequence) .await? ); unwrap_or_none_result!(block_id, self.db.retrieve_block_id(tx_id).await?); diff --git a/rust/agents/validator/src/validator.rs b/rust/agents/validator/src/validator.rs index bc9ba325b4..96feb97b8f 100644 --- a/rust/agents/validator/src/validator.rs +++ b/rust/agents/validator/src/validator.rs @@ -11,7 +11,7 @@ use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument use hyperlane_base::{ db::{HyperlaneRocksDB, DB}, run_all, BaseAgent, CheckpointSyncer, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, - WatermarkContractSync, + SequencedDataContractSync, }; use hyperlane_core::{ @@ -33,7 +33,7 @@ pub struct Validator { #[as_ref] core: HyperlaneAgentCore, db: HyperlaneRocksDB, - merkle_tree_hook_sync: Arc>, + merkle_tree_hook_sync: Arc>, mailbox: Arc, merkle_tree_hook: Arc, validator_announce: Arc, @@ -154,7 +154,9 @@ impl Validator { let index_settings = self.as_ref().settings.chains[self.origin_chain.name()].index_settings(); let contract_sync = self.merkle_tree_hook_sync.clone(); - let cursor = contract_sync.rate_limited_cursor(index_settings).await; + let cursor = contract_sync + .forward_backward_message_sync_cursor(index_settings) + .await; tokio::spawn(async move { contract_sync.clone().sync("merkle_tree_hook", cursor).await }) .instrument(info_span!("MerkleTreeHookSyncer")) } diff --git a/rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs b/rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs index 17be845421..47447e6a8e 100644 --- a/rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs +++ b/rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs @@ -104,7 +104,6 @@ impl MerkleTreeHook for CosmosMerkleTreeHook { } /// Gets the current leaf count of the merkle tree - #[instrument(level = "debug", err, ret, skip(self))] async fn count(&self, lag: Option) -> ChainResult { let payload = merkle_tree_hook::MerkleTreeCountRequest { count: general::EmptyStruct {}, @@ -112,18 +111,7 @@ impl MerkleTreeHook for CosmosMerkleTreeHook { let block_height = get_block_height_for_lag(&self.provider, lag).await?; - let data = self - .provider - .wasm_query( - merkle_tree_hook::MerkleTreeGenericRequest { - merkle_hook: payload, - }, - block_height, - ) - .await?; - let response: merkle_tree_hook::MerkleTreeCountResponse = serde_json::from_slice(&data)?; - - Ok(response.count) + self.count_at_block(block_height).await } #[instrument(level = "debug", err, ret, skip(self))] @@ -154,6 +142,28 @@ impl MerkleTreeHook for CosmosMerkleTreeHook { } } +impl CosmosMerkleTreeHook { + #[instrument(level = "debug", err, ret, skip(self))] + async fn count_at_block(&self, block_height: Option) -> ChainResult { + let payload = merkle_tree_hook::MerkleTreeCountRequest { + count: general::EmptyStruct {}, + }; + + let data = self + .provider + .wasm_query( + merkle_tree_hook::MerkleTreeGenericRequest { + merkle_hook: payload, + }, + block_height, + ) + .await?; + let response: merkle_tree_hook::MerkleTreeCountResponse = serde_json::from_slice(&data)?; + + Ok(response.count) + } +} + // ------------------ Indexer ------------------ const EVENT_TYPE: &str = "hpl_hook_merkle::post_dispatch"; @@ -161,17 +171,29 @@ const EVENT_TYPE: &str = "hpl_hook_merkle::post_dispatch"; #[derive(Debug)] /// A reference to a MerkleTreeHookIndexer contract on some Cosmos chain pub struct CosmosMerkleTreeHookIndexer { + /// The CosmosMerkleTreeHook + merkle_tree_hook: CosmosMerkleTreeHook, /// Cosmwasm indexer instance indexer: Box, } impl CosmosMerkleTreeHookIndexer { /// create new Cosmos MerkleTreeHookIndexer agent - pub fn new(conf: ConnectionConf, locator: ContractLocator, reorg_period: u32) -> Self { - let indexer: CosmosWasmIndexer = - CosmosWasmIndexer::new(conf, locator, EVENT_TYPE.to_string(), reorg_period); + pub fn new( + conf: ConnectionConf, + locator: ContractLocator, + signer: Signer, + reorg_period: u32, + ) -> Self { + let indexer: CosmosWasmIndexer = CosmosWasmIndexer::new( + conf.clone(), + locator.clone(), + EVENT_TYPE.to_string(), + reorg_period, + ); Self { + merkle_tree_hook: CosmosMerkleTreeHook::new(conf, locator, signer), indexer: Box::new(indexer), } } @@ -250,8 +272,12 @@ impl Indexer for CosmosMerkleTreeHookIndexer { #[async_trait] impl SequenceIndexer for CosmosMerkleTreeHookIndexer { async fn sequence_and_tip(&self) -> ChainResult<(Option, u32)> { - // TODO: implement when cosmos scraper support is implemented let tip = self.get_finalized_block_number().await?; - Ok((None, tip)) + let sequence = self + .merkle_tree_hook + .count_at_block(Some(tip.into())) + .await?; + + Ok((Some(sequence), tip)) } } diff --git a/rust/chains/hyperlane-ethereum/src/merkle_tree_hook.rs b/rust/chains/hyperlane-ethereum/src/merkle_tree_hook.rs index 3881b03250..1beffed5bc 100644 --- a/rust/chains/hyperlane-ethereum/src/merkle_tree_hook.rs +++ b/rust/chains/hyperlane-ethereum/src/merkle_tree_hook.rs @@ -130,7 +130,7 @@ where Ok(logs) } - #[instrument(level = "debug", err, ret, skip(self))] + #[instrument(level = "debug", err, skip(self))] async fn get_finalized_block_number(&self) -> ChainResult { Ok(self .provider @@ -147,15 +147,13 @@ impl SequenceIndexer for EthereumMerkleTreeHookIndexer ChainResult<(Option, u32)> { - // The InterchainGasPaymasterIndexerBuilder must return a `SequenceIndexer` type. - // It's fine if only a blanket implementation is provided for EVM chains, since their - // indexing only uses the `Index` trait, which is a supertrait of `SequenceIndexer`. - // TODO: if `SequenceIndexer` turns out to not depend on `Indexer` at all, then the supertrait - // dependency could be removed, even if the builder would still need to return a type that is both - // ``SequenceIndexer` and `Indexer`. let tip = self.get_finalized_block_number().await?; - Ok((None, tip)) + let sequence = self.contract.count().block(u64::from(tip)).call().await?; + Ok((Some(sequence), tip)) } } diff --git a/rust/config/testnet4_config.json b/rust/config/testnet4_config.json index 6679cc0056..12621e6035 100644 --- a/rust/config/testnet4_config.json +++ b/rust/config/testnet4_config.json @@ -1028,7 +1028,7 @@ "prefix": "dual", "index": { "from": 1, - "chunk": 1000 + "chunk": 100000 }, "blocks": { "reorgPeriod": 1 diff --git a/rust/hyperlane-base/src/contract_sync/cursor.rs b/rust/hyperlane-base/src/contract_sync/cursor.rs index 14cff9b02a..cbbd393dbd 100644 --- a/rust/hyperlane-base/src/contract_sync/cursor.rs +++ b/rust/hyperlane-base/src/contract_sync/cursor.rs @@ -10,9 +10,9 @@ use async_trait::async_trait; use derive_new::new; use eyre::Result; use hyperlane_core::{ - ChainCommunicationError, ChainResult, ContractSyncCursor, CursorAction, HyperlaneMessage, - HyperlaneMessageStore, HyperlaneWatermarkedLogStore, IndexMode, Indexer, LogMeta, - SequenceIndexer, + ChainCommunicationError, ChainResult, ContractSyncCursor, CursorAction, + HyperlaneSequenceIndexerStore, HyperlaneWatermarkedLogStore, IndexMode, Indexer, LogMeta, + SequenceIndexer, Sequenced, }; use tokio::time::sleep; use tracing::{debug, warn}; @@ -25,11 +25,11 @@ const ETA_TIME_WINDOW: f64 = 2. * 60.; const MAX_SEQUENCE_RANGE: u32 = 100; /// A struct that holds the data needed for forwards and backwards -/// message sync cursors. +/// sequence sync cursors. #[derive(Debug, new)] -pub(crate) struct MessageSyncCursor { - indexer: Arc>, - db: Arc, +pub(crate) struct SequenceSyncCursor { + indexer: Arc>, + db: Arc>, sync_state: SyncState, } @@ -129,41 +129,34 @@ impl SyncState { } } -impl MessageSyncCursor { - async fn retrieve_message_by_nonce(&self, nonce: u32) -> Option { - if let Ok(Some(message)) = self.db.retrieve_message_by_nonce(nonce).await { - Some(message) - } else { - None - } +impl SequenceSyncCursor { + async fn retrieve_by_sequence(&self, sequence: u32) -> Option { + self.db.retrieve_by_sequence(sequence).await.ok().flatten() } - async fn retrieve_dispatched_block_number(&self, nonce: u32) -> Option { - if let Ok(Some(block_number)) = self.db.retrieve_dispatched_block_number(nonce).await { - Some(u32::try_from(block_number).unwrap()) - } else { - None - } + async fn retrieve_log_block_number(&self, sequence: u32) -> Option { + self.db + .retrieve_log_block_number(sequence) + .await + .ok() + .flatten() + .map(|num| u32::try_from(num).unwrap()) } - async fn update( - &mut self, - logs: Vec<(HyperlaneMessage, LogMeta)>, - prev_sequence: u32, - ) -> Result<()> { - // If we found messages, but did *not* find the message we were looking for, - // we need to rewind to the block at which we found the last message. + async fn update(&mut self, logs: Vec<(T, LogMeta)>, prev_sequence: u32) -> Result<()> { + // If we found logs, but did *not* find the log we were looking for, + // we need to rewind to the block at which we found the last log. if !logs.is_empty() && !logs .iter() - .any(|m| m.0.nonce == self.sync_state.next_sequence) + .any(|m| m.0.sequence() == self.sync_state.next_sequence) { - warn!(next_nonce=?self.sync_state.next_sequence, "Target nonce not found, rewinding"); - // If the previous nonce has been synced, rewind to the block number + warn!(next_sequence=?self.sync_state.next_sequence, "Target sequence not found, rewinding"); + // If the previous sequence has been synced, rewind to the block number // at which it was dispatched. Otherwise, rewind all the way back to the start block. - if let Some(block_number) = self.retrieve_dispatched_block_number(prev_sequence).await { + if let Some(block_number) = self.retrieve_log_block_number(prev_sequence).await { self.sync_state.next_block = block_number; - warn!(block_number, "Rewound to previous known message"); + warn!(block_number, "Rewound to previous known sequenced log"); } else { self.sync_state.next_block = self.sync_state.start_block; } @@ -174,15 +167,15 @@ impl MessageSyncCursor { } } -/// A MessageSyncCursor that syncs forwards in perpetuity. -pub(crate) struct ForwardMessageSyncCursor { - cursor: MessageSyncCursor, +/// A SequenceSyncCursor that syncs forwards in perpetuity. +pub(crate) struct ForwardSequenceSyncCursor { + cursor: SequenceSyncCursor, } -impl ForwardMessageSyncCursor { +impl ForwardSequenceSyncCursor { pub fn new( - indexer: Arc>, - db: Arc, + indexer: Arc>, + db: Arc>, chunk_size: u32, start_block: u32, next_block: u32, @@ -190,7 +183,7 @@ impl ForwardMessageSyncCursor { next_sequence: u32, ) -> Self { Self { - cursor: MessageSyncCursor::new( + cursor: SequenceSyncCursor::new( indexer, db, SyncState::new( @@ -206,17 +199,17 @@ impl ForwardMessageSyncCursor { } async fn get_next_range(&mut self) -> ChainResult>> { - // Check if any new messages have been inserted into the DB, + // Check if any new logs have been inserted into the DB, // and update the cursor accordingly. while self .cursor - .retrieve_message_by_nonce(self.cursor.sync_state.next_sequence) + .retrieve_by_sequence(self.cursor.sync_state.next_sequence) .await .is_some() { if let Some(block_number) = self .cursor - .retrieve_dispatched_block_number(self.cursor.sync_state.next_sequence) + .retrieve_log_block_number(self.cursor.sync_state.next_sequence) .await { debug!(next_block = block_number, "Fast forwarding next block"); @@ -224,8 +217,8 @@ impl ForwardMessageSyncCursor { self.cursor.sync_state.next_block = block_number; } debug!( - next_nonce = self.cursor.sync_state.next_sequence + 1, - "Fast forwarding next nonce" + next_sequence = self.cursor.sync_state.next_sequence + 1, + "Fast forwarding next sequence" ); self.cursor.sync_state.next_sequence += 1; } @@ -236,7 +229,7 @@ impl ForwardMessageSyncCursor { let cursor_count = self.cursor.sync_state.next_sequence; Ok(match cursor_count.cmp(&mailbox_count) { Ordering::Equal => { - // We are synced up to the latest nonce so we don't need to index anything. + // We are synced up to the latest sequence so we don't need to index anything. // We update our next block number accordingly. self.cursor.sync_state.next_block = tip; None @@ -259,7 +252,7 @@ impl ForwardMessageSyncCursor { } #[async_trait] -impl ContractSyncCursor for ForwardMessageSyncCursor { +impl ContractSyncCursor for ForwardSequenceSyncCursor { async fn next_action(&mut self) -> ChainResult<(CursorAction, Duration)> { // TODO: Fix ETA calculation let eta = Duration::from_secs(0); @@ -278,29 +271,29 @@ impl ContractSyncCursor for ForwardMessageSyncCursor { /// If the previous block has been synced, rewind to the block number /// at which it was dispatched. /// Otherwise, rewind all the way back to the start block. - async fn update(&mut self, logs: Vec<(HyperlaneMessage, LogMeta)>) -> Result<()> { - let prev_nonce = self.cursor.sync_state.next_sequence.saturating_sub(1); - // We may wind up having re-indexed messages that are previous to the nonce that we are looking for. - // We should not consider these messages when checking for continuity errors. + async fn update(&mut self, logs: Vec<(T, LogMeta)>) -> Result<()> { + let prev_sequence = self.cursor.sync_state.next_sequence.saturating_sub(1); + // We may wind up having re-indexed logs that are previous to the sequence that we are looking for. + // We should not consider these logs when checking for continuity errors. let filtered_logs = logs .into_iter() - .filter(|m| m.0.nonce >= self.cursor.sync_state.next_sequence) + .filter(|m| m.0.sequence() >= self.cursor.sync_state.next_sequence) .collect(); - self.cursor.update(filtered_logs, prev_nonce).await + self.cursor.update(filtered_logs, prev_sequence).await } } -/// A MessageSyncCursor that syncs backwards to sequence (nonce) zero. -pub(crate) struct BackwardMessageSyncCursor { - cursor: MessageSyncCursor, +/// A SequenceSyncCursor that syncs backwards to sequence zero. +pub(crate) struct BackwardSequenceSyncCursor { + cursor: SequenceSyncCursor, synced: bool, } -impl BackwardMessageSyncCursor { +impl BackwardSequenceSyncCursor { #[allow(clippy::too_many_arguments)] pub fn new( - indexer: Arc>, - db: Arc, + indexer: Arc>, + db: Arc>, chunk_size: u32, start_block: u32, next_block: u32, @@ -309,7 +302,7 @@ impl BackwardMessageSyncCursor { synced: bool, ) -> Self { Self { - cursor: MessageSyncCursor::new( + cursor: SequenceSyncCursor::new( indexer, db, SyncState::new( @@ -326,12 +319,12 @@ impl BackwardMessageSyncCursor { } async fn get_next_range(&mut self) -> ChainResult>> { - // Check if any new messages have been inserted into the DB, + // Check if any new logs have been inserted into the DB, // and update the cursor accordingly. while !self.synced { if self .cursor - .retrieve_message_by_nonce(self.cursor.sync_state.next_sequence) + .retrieve_by_sequence(self.cursor.sync_state.next_sequence) .await .is_none() { @@ -345,7 +338,7 @@ impl BackwardMessageSyncCursor { if let Some(block_number) = self .cursor - .retrieve_dispatched_block_number(self.cursor.sync_state.next_sequence) + .retrieve_log_block_number(self.cursor.sync_state.next_sequence) .await { // It's possible that eth_getLogs dropped logs from this block, therefore we cannot do block_number - 1. @@ -367,13 +360,13 @@ impl BackwardMessageSyncCursor { /// If the previous block has been synced, rewind to the block number /// at which it was dispatched. /// Otherwise, rewind all the way back to the start block. - async fn update(&mut self, logs: Vec<(HyperlaneMessage, LogMeta)>) -> Result<()> { + async fn update(&mut self, logs: Vec<(T, LogMeta)>) -> Result<()> { let prev_sequence = self.cursor.sync_state.next_sequence.saturating_add(1); - // We may wind up having re-indexed messages that are previous to the sequence (nonce) that we are looking for. - // We should not consider these messages when checking for continuity errors. + // We may wind up having re-indexed logs that are previous to the sequence that we are looking for. + // We should not consider these logs when checking for continuity errors. let filtered_logs = logs .into_iter() - .filter(|m| m.0.nonce <= self.cursor.sync_state.next_sequence) + .filter(|m| m.0.sequence() <= self.cursor.sync_state.next_sequence) .collect(); self.cursor.update(filtered_logs, prev_sequence).await } @@ -385,43 +378,43 @@ pub enum SyncDirection { Backward, } -/// A MessageSyncCursor that syncs forwards in perpetuity. -pub(crate) struct ForwardBackwardMessageSyncCursor { - forward: ForwardMessageSyncCursor, - backward: BackwardMessageSyncCursor, +/// A SequenceSyncCursor that syncs forwards in perpetuity. +pub(crate) struct ForwardBackwardSequenceSyncCursor { + forward: ForwardSequenceSyncCursor, + backward: BackwardSequenceSyncCursor, direction: SyncDirection, } -impl ForwardBackwardMessageSyncCursor { +impl ForwardBackwardSequenceSyncCursor { /// Construct a new contract sync helper. pub async fn new( - indexer: Arc>, - db: Arc, + indexer: Arc>, + db: Arc>, chunk_size: u32, mode: IndexMode, ) -> Result { - let (count, tip) = indexer.sequence_and_tip().await?; - let count = count.ok_or(ChainCommunicationError::from_other_str( - "Failed to query message count", + let (sequence, tip) = indexer.sequence_and_tip().await?; + let sequence = sequence.ok_or(ChainCommunicationError::from_other_str( + "Failed to query sequence", ))?; - let forward_cursor = ForwardMessageSyncCursor::new( + let forward_cursor = ForwardSequenceSyncCursor::new( indexer.clone(), db.clone(), chunk_size, tip, tip, mode, - count, + sequence, ); - let backward_cursor = BackwardMessageSyncCursor::new( + let backward_cursor = BackwardSequenceSyncCursor::new( indexer.clone(), db.clone(), chunk_size, tip, tip, mode, - count.saturating_sub(1), - count == 0, + sequence.saturating_sub(1), + sequence == 0, ); Ok(Self { forward: forward_cursor, @@ -432,7 +425,7 @@ impl ForwardBackwardMessageSyncCursor { } #[async_trait] -impl ContractSyncCursor for ForwardBackwardMessageSyncCursor { +impl ContractSyncCursor for ForwardBackwardSequenceSyncCursor { async fn next_action(&mut self) -> ChainResult<(CursorAction, Duration)> { // TODO: Proper ETA for backwards sync let eta = Duration::from_secs(0); @@ -454,7 +447,7 @@ impl ContractSyncCursor for ForwardBackwardMessageSyncCursor { self.forward.cursor.sync_state.next_block.saturating_sub(1) } - async fn update(&mut self, logs: Vec<(HyperlaneMessage, LogMeta)>) -> Result<()> { + async fn update(&mut self, logs: Vec<(T, LogMeta)>) -> Result<()> { match self.direction { SyncDirection::Forward => self.forward.update(logs).await, SyncDirection::Backward => self.backward.update(logs).await, diff --git a/rust/hyperlane-base/src/contract_sync/mod.rs b/rust/hyperlane-base/src/contract_sync/mod.rs index d569d2c14f..3968ad9f57 100644 --- a/rust/hyperlane-base/src/contract_sync/mod.rs +++ b/rust/hyperlane-base/src/contract_sync/mod.rs @@ -4,8 +4,8 @@ use cursor::*; use derive_new::new; use hyperlane_core::{ utils::fmt_sync_time, ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneLogStore, - HyperlaneMessage, HyperlaneMessageStore, HyperlaneWatermarkedLogStore, Indexer, - SequenceIndexer, + HyperlaneSequenceIndexerStore, HyperlaneWatermarkedLogStore, Indexer, SequenceIndexer, + Sequenced, }; pub use metrics::ContractSyncMetrics; use tokio::time::sleep; @@ -121,20 +121,17 @@ where } } -/// A ContractSync for syncing messages using a MessageSyncCursor -pub type MessageContractSync = ContractSync< - HyperlaneMessage, - Arc, - Arc>, ->; -impl MessageContractSync { +/// A ContractSync for syncing messages using a SequenceSyncCursor +pub type SequencedDataContractSync = + ContractSync>, Arc>>; +impl SequencedDataContractSync { /// Returns a new cursor to be used for syncing dispatched messages from the indexer pub async fn forward_message_sync_cursor( &self, index_settings: IndexSettings, next_nonce: u32, - ) -> Box> { - Box::new(ForwardMessageSyncCursor::new( + ) -> Box> { + Box::new(ForwardSequenceSyncCursor::new( self.indexer.clone(), self.db.clone(), index_settings.chunk_size, @@ -149,9 +146,9 @@ impl MessageContractSync { pub async fn forward_backward_message_sync_cursor( &self, index_settings: IndexSettings, - ) -> Box> { + ) -> Box> { Box::new( - ForwardBackwardMessageSyncCursor::new( + ForwardBackwardSequenceSyncCursor::new( self.indexer.clone(), self.db.clone(), index_settings.chunk_size, diff --git a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs index f01a12a166..807645beb0 100644 --- a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs +++ b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs @@ -4,9 +4,9 @@ use paste::paste; use tracing::{debug, instrument, trace}; use hyperlane_core::{ - GasPaymentKey, HyperlaneDomain, HyperlaneLogStore, HyperlaneMessage, HyperlaneMessageStore, - HyperlaneWatermarkedLogStore, InterchainGasExpenditure, InterchainGasPayment, - InterchainGasPaymentMeta, LogMeta, MerkleTreeInsertion, H256, + GasPaymentKey, HyperlaneDomain, HyperlaneLogStore, HyperlaneMessage, + HyperlaneSequenceIndexerStore, HyperlaneWatermarkedLogStore, InterchainGasExpenditure, + InterchainGasPayment, InterchainGasPaymentMeta, LogMeta, MerkleTreeInsertion, H256, }; use super::{ @@ -28,6 +28,8 @@ const PENDING_MESSAGE_RETRY_COUNT_FOR_MESSAGE_ID: &str = "pending_message_retry_count_for_message_id_"; const MERKLE_TREE_INSERTION: &str = "merkle_tree_insertion_"; const MERKLE_LEAF_INDEX_BY_MESSAGE_ID: &str = "merkle_leaf_index_by_message_id_"; +const MERKLE_TREE_INSERTION_BLOCK_NUMBER_BY_LEAF_INDEX: &str = + "merkle_tree_insertion_block_number_by_leaf_index_"; const LATEST_INDEXED_GAS_PAYMENT_BLOCK: &str = "latest_indexed_gas_payment_block"; type DbResult = std::result::Result; @@ -137,7 +139,11 @@ impl HyperlaneRocksDB { } /// Store the merkle tree insertion event, and also store a mapping from message_id to leaf_index - pub fn process_tree_insertion(&self, insertion: &MerkleTreeInsertion) -> DbResult { + pub fn process_tree_insertion( + &self, + insertion: &MerkleTreeInsertion, + insertion_block_number: u64, + ) -> DbResult { if let Ok(Some(_)) = self.retrieve_merkle_tree_insertion_by_leaf_index(&insertion.index()) { debug!(insertion=?insertion, "Tree insertion already stored in db"); return Ok(false); @@ -149,6 +155,11 @@ impl HyperlaneRocksDB { self.store_merkle_tree_insertion_by_leaf_index(&insertion.index(), insertion)?; self.store_merkle_leaf_index_by_message_id(&insertion.message_id(), &insertion.index())?; + + self.store_merkle_tree_insertion_block_number_by_leaf_index( + &insertion.index(), + &insertion_block_number, + )?; // Return true to indicate the tree insertion was processed Ok(true) } @@ -260,8 +271,8 @@ impl HyperlaneLogStore for HyperlaneRocksDB { #[instrument(skip_all)] async fn store_logs(&self, leaves: &[(MerkleTreeInsertion, LogMeta)]) -> Result { let mut insertions = 0; - for (insertion, _meta) in leaves { - if self.process_tree_insertion(insertion)? { + for (insertion, meta) in leaves { + if self.process_tree_insertion(insertion, meta.block_number)? { insertions += 1; } } @@ -270,16 +281,31 @@ impl HyperlaneLogStore for HyperlaneRocksDB { } #[async_trait] -impl HyperlaneMessageStore for HyperlaneRocksDB { - /// Gets a message by nonce. - async fn retrieve_message_by_nonce(&self, nonce: u32) -> Result> { - let message = self.retrieve_message_by_nonce(nonce)?; +impl HyperlaneSequenceIndexerStore for HyperlaneRocksDB { + /// Gets data by its sequence. + async fn retrieve_by_sequence(&self, sequence: u32) -> Result> { + let message = self.retrieve_message_by_nonce(sequence)?; Ok(message) } - /// Retrieve dispatched block number by message nonce - async fn retrieve_dispatched_block_number(&self, nonce: u32) -> Result> { - let number = self.retrieve_dispatched_block_number_by_nonce(&nonce)?; + /// Gets the block number at which the log occurred. + async fn retrieve_log_block_number(&self, sequence: u32) -> Result> { + let number = self.retrieve_dispatched_block_number_by_nonce(&sequence)?; + Ok(number) + } +} + +#[async_trait] +impl HyperlaneSequenceIndexerStore for HyperlaneRocksDB { + /// Gets data by its sequence. + async fn retrieve_by_sequence(&self, sequence: u32) -> Result> { + let insertion = self.retrieve_merkle_tree_insertion_by_leaf_index(&sequence)?; + Ok(insertion) + } + + /// Gets the block number at which the log occurred. + async fn retrieve_log_block_number(&self, sequence: u32) -> Result> { + let number = self.retrieve_merkle_tree_insertion_block_number_by_leaf_index(&sequence)?; Ok(number) } } @@ -357,3 +383,10 @@ make_store_and_retrieve!( H256, u32 ); +make_store_and_retrieve!( + pub, + merkle_tree_insertion_block_number_by_leaf_index, + MERKLE_TREE_INSERTION_BLOCK_NUMBER_BY_LEAF_INDEX, + u32, + u64 +); diff --git a/rust/hyperlane-base/src/settings/base.rs b/rust/hyperlane-base/src/settings/base.rs index efdda4e834..135d80ea0b 100644 --- a/rust/hyperlane-base/src/settings/base.rs +++ b/rust/hyperlane-base/src/settings/base.rs @@ -3,14 +3,15 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc}; use eyre::{eyre, Context, Result}; use futures_util::future::try_join_all; use hyperlane_core::{ - Delivery, HyperlaneChain, HyperlaneDomain, HyperlaneMessageStore, HyperlaneProvider, - HyperlaneWatermarkedLogStore, InterchainGasPaymaster, InterchainGasPayment, Mailbox, - MerkleTreeHook, MerkleTreeInsertion, MultisigIsm, ValidatorAnnounce, H256, + Delivery, HyperlaneChain, HyperlaneDomain, HyperlaneMessage, HyperlaneProvider, + HyperlaneSequenceIndexerStore, HyperlaneWatermarkedLogStore, InterchainGasPaymaster, + InterchainGasPayment, Mailbox, MerkleTreeHook, MerkleTreeInsertion, MultisigIsm, + ValidatorAnnounce, H256, }; use crate::{ settings::{chains::ChainConf, trace::TracingConfig}, - ContractSync, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, MessageContractSync, + ContractSync, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, SequencedDataContractSync, WatermarkContractSync, }; @@ -183,7 +184,7 @@ impl Settings { build_contract_fns!(build_validator_announce, build_validator_announces -> dyn ValidatorAnnounce); build_contract_fns!(build_provider, build_providers -> dyn HyperlaneProvider); build_indexer_fns!(build_delivery_indexer, build_delivery_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); - build_indexer_fns!(build_message_indexer, build_message_indexers -> dyn HyperlaneMessageStore, MessageContractSync); + build_indexer_fns!(build_message_indexer, build_message_indexers -> dyn HyperlaneSequenceIndexerStore, SequencedDataContractSync); build_indexer_fns!(build_interchain_gas_payment_indexer, build_interchain_gas_payment_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); - build_indexer_fns!(build_merkle_tree_hook_indexer, build_merkle_tree_hook_indexers -> dyn HyperlaneWatermarkedLogStore, WatermarkContractSync); + build_indexer_fns!(build_merkle_tree_hook_indexer, build_merkle_tree_hook_indexers -> dyn HyperlaneSequenceIndexerStore, SequencedDataContractSync); } diff --git a/rust/hyperlane-base/src/settings/chains.rs b/rust/hyperlane-base/src/settings/chains.rs index f974622612..6764b4b4f9 100644 --- a/rust/hyperlane-base/src/settings/chains.rs +++ b/rust/hyperlane-base/src/settings/chains.rs @@ -387,9 +387,12 @@ impl ChainConf { Ok(indexer as Box>) } ChainConnectionConf::Cosmos(conf) => { + let signer = self.cosmos_signer().await.context(ctx)?; let indexer = Box::new(h_cosmos::CosmosMerkleTreeHookIndexer::new( conf.clone(), locator, + // TODO: remove signer requirement entirely + signer.unwrap().clone(), self.reorg_period, )); Ok(indexer as Box>) diff --git a/rust/hyperlane-core/src/traits/db.rs b/rust/hyperlane-core/src/traits/db.rs index 50f9960e04..3fbebf52db 100644 --- a/rust/hyperlane-core/src/traits/db.rs +++ b/rust/hyperlane-core/src/traits/db.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use auto_impl::auto_impl; use eyre::Result; -use crate::{HyperlaneMessage, LogMeta}; +use crate::LogMeta; /// Interface for a HyperlaneLogStore that ingests logs. #[async_trait] @@ -15,14 +15,26 @@ pub trait HyperlaneLogStore: Send + Sync + Debug { async fn store_logs(&self, logs: &[(T, LogMeta)]) -> Result; } -/// Extension of HyperlaneLogStore trait that supports getting the block number at which a known message was dispatched. +/// A sequence is a monotonically increasing number that is incremented every time a message ID is indexed. +/// E.g. for Mailbox indexing, this is equal to the message nonce, and for merkle tree hook indexing, this +/// is equal to the leaf index. +pub trait Sequenced: 'static + Send + Sync { + /// The sequence of this sequenced type. + fn sequence(&self) -> u32; +} + +/// Extension of HyperlaneLogStore trait that supports indexed sequenced data. #[async_trait] #[auto_impl(&, Box, Arc)] -pub trait HyperlaneMessageStore: HyperlaneLogStore { - /// Gets a message by nonce. - async fn retrieve_message_by_nonce(&self, nonce: u32) -> Result>; - /// Gets the block number at which a message was dispatched. - async fn retrieve_dispatched_block_number(&self, nonce: u32) -> Result>; +pub trait HyperlaneSequenceIndexerStore: HyperlaneLogStore +where + T: Send + Sync, +{ + /// Gets data by its sequence. + async fn retrieve_by_sequence(&self, sequence: u32) -> Result>; + + /// Gets the block number at which the log occurred. + async fn retrieve_log_block_number(&self, nonce: u32) -> Result>; } /// Extension of HyperlaneLogStore trait that supports a high watermark for the highest indexed block number. @@ -31,6 +43,7 @@ pub trait HyperlaneMessageStore: HyperlaneLogStore { pub trait HyperlaneWatermarkedLogStore: HyperlaneLogStore { /// Gets the block number high watermark async fn retrieve_high_watermark(&self) -> Result>; + /// Stores the block number high watermark async fn store_high_watermark(&self, block_number: u32) -> Result<()>; } diff --git a/rust/hyperlane-core/src/types/merkle_tree.rs b/rust/hyperlane-core/src/types/merkle_tree.rs index 1a5a4058f0..2e9020043d 100644 --- a/rust/hyperlane-core/src/types/merkle_tree.rs +++ b/rust/hyperlane-core/src/types/merkle_tree.rs @@ -1,7 +1,7 @@ use derive_new::new; use std::io::{Read, Write}; -use crate::{Decode, Encode, HyperlaneProtocolError, H256}; +use crate::{Decode, Encode, HyperlaneProtocolError, Sequenced, H256}; /// Merkle Tree Hook insertion event #[derive(Debug, Copy, Clone, new)] @@ -22,6 +22,12 @@ impl MerkleTreeInsertion { } } +impl Sequenced for MerkleTreeInsertion { + fn sequence(&self) -> u32 { + self.leaf_index + } +} + impl Encode for MerkleTreeInsertion { fn write_to(&self, writer: &mut W) -> std::io::Result where diff --git a/rust/hyperlane-core/src/types/message.rs b/rust/hyperlane-core/src/types/message.rs index 1faec8436d..4ebaffc1cd 100644 --- a/rust/hyperlane-core/src/types/message.rs +++ b/rust/hyperlane-core/src/types/message.rs @@ -2,7 +2,7 @@ use sha3::{digest::Update, Digest, Keccak256}; use std::fmt::{Debug, Display, Formatter}; use crate::utils::{fmt_address_for_domain, fmt_domain}; -use crate::{Decode, Encode, HyperlaneProtocolError, H256}; +use crate::{Decode, Encode, HyperlaneProtocolError, Sequenced, H256}; const HYPERLANE_MESSAGE_PREFIX_LEN: usize = 77; @@ -39,6 +39,12 @@ pub struct HyperlaneMessage { pub body: Vec, } +impl Sequenced for HyperlaneMessage { + fn sequence(&self) -> u32 { + self.nonce + } +} + impl Debug for HyperlaneMessage { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(