Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Index merkle tree insertions just like messages #2886

Merged
16 changes: 10 additions & 6 deletions rust/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ use derive_more::AsRef;
use eyre::Result;
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
run_all, BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, MessageContractSync,
WatermarkContractSync,
run_all, BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync, WatermarkContractSync,
};
use hyperlane_core::{
HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, U256,
};
use hyperlane_core::{HyperlaneDomain, InterchainGasPayment, MerkleTreeInsertion, U256};
use tokio::{
sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
Expand Down Expand Up @@ -50,15 +52,15 @@ pub struct Relayer {
destination_chains: HashSet<HyperlaneDomain>,
#[as_ref]
core: HyperlaneAgentCore,
message_syncs: HashMap<HyperlaneDomain, Arc<MessageContractSync>>,
message_syncs: HashMap<HyperlaneDomain, Arc<SequencedDataContractSync<HyperlaneMessage>>>,
interchain_gas_payment_syncs:
HashMap<HyperlaneDomain, Arc<WatermarkContractSync<InterchainGasPayment>>>,
/// Context data for each (origin, destination) chain pair a message can be
/// sent between
msg_ctxs: HashMap<ContextKey, Arc<MessageContext>>,
prover_syncs: HashMap<HyperlaneDomain, Arc<RwLock<MerkleTreeBuilder>>>,
merkle_tree_hook_syncs:
HashMap<HyperlaneDomain, Arc<WatermarkContractSync<MerkleTreeInsertion>>>,
HashMap<HyperlaneDomain, Arc<SequencedDataContractSync<MerkleTreeInsertion>>>,
dbs: HashMap<HyperlaneDomain, HyperlaneRocksDB>,
whitelist: Arc<MatchingList>,
blacklist: Arc<MatchingList>,
Expand Down Expand Up @@ -314,7 +316,9 @@ impl Relayer {
) -> Instrumented<JoinHandle<eyre::Result<()>>> {
let index_settings = self.as_ref().settings.chains[origin.name()].index.clone();
let contract_sync = self.merkle_tree_hook_syncs.get(origin).unwrap().clone();
let cursor = contract_sync.rate_limited_cursor(index_settings).await;
let cursor = contract_sync
.forward_backward_message_sync_cursor(index_settings)
.await;
tokio::spawn(async move { contract_sync.clone().sync("merkle_tree_hook", cursor).await })
.instrument(info_span!("ContractSync"))
}
Expand Down
19 changes: 9 additions & 10 deletions rust/agents/scraper/src/chain_scraper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use eyre::Result;
use hyperlane_base::settings::IndexSettings;
use hyperlane_core::{
unwrap_or_none_result, BlockInfo, Delivery, HyperlaneDomain, HyperlaneLogStore,
HyperlaneMessage, HyperlaneMessageStore, HyperlaneProvider, HyperlaneWatermarkedLogStore,
InterchainGasPayment, LogMeta, H256,
HyperlaneMessage, HyperlaneProvider, HyperlaneSequenceIndexerStore,
HyperlaneWatermarkedLogStore, InterchainGasPayment, LogMeta, H256,
};
use itertools::Itertools;
use tracing::trace;
Expand Down Expand Up @@ -370,23 +370,22 @@ impl HyperlaneLogStore<InterchainGasPayment> for HyperlaneSqlDb {
}

#[async_trait]
impl HyperlaneMessageStore for HyperlaneSqlDb {
/// Gets a message by nonce.
async fn retrieve_message_by_nonce(&self, nonce: u32) -> Result<Option<HyperlaneMessage>> {
impl HyperlaneSequenceIndexerStore<HyperlaneMessage> for HyperlaneSqlDb {
/// Gets a message by its nonce.
async fn retrieve_by_sequence(&self, sequence: u32) -> Result<Option<HyperlaneMessage>> {
let message = self
.db
.retrieve_message_by_nonce(self.domain().id(), &self.mailbox_address, nonce)
.retrieve_message_by_nonce(self.domain().id(), &self.mailbox_address, sequence)
.await?;
Ok(message)
}

/// Retrieves the block number at which the message with the provided nonce
/// was dispatched.
async fn retrieve_dispatched_block_number(&self, nonce: u32) -> Result<Option<u64>> {
/// Gets the block number at which the log occurred.
async fn retrieve_log_block_number(&self, sequence: u32) -> Result<Option<u64>> {
unwrap_or_none_result!(
tx_id,
self.db
.retrieve_dispatched_tx_id(self.domain().id(), &self.mailbox_address, nonce)
.retrieve_dispatched_tx_id(self.domain().id(), &self.mailbox_address, sequence)
.await?
);
unwrap_or_none_result!(block_id, self.db.retrieve_block_id(tx_id).await?);
Expand Down
8 changes: 5 additions & 3 deletions rust/agents/validator/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
run_all, BaseAgent, CheckpointSyncer, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
WatermarkContractSync,
SequencedDataContractSync,
};

use hyperlane_core::{
Expand All @@ -33,7 +33,7 @@ pub struct Validator {
#[as_ref]
core: HyperlaneAgentCore,
db: HyperlaneRocksDB,
merkle_tree_hook_sync: Arc<WatermarkContractSync<MerkleTreeInsertion>>,
merkle_tree_hook_sync: Arc<SequencedDataContractSync<MerkleTreeInsertion>>,
mailbox: Arc<dyn Mailbox>,
merkle_tree_hook: Arc<dyn MerkleTreeHook>,
validator_announce: Arc<dyn ValidatorAnnounce>,
Expand Down Expand Up @@ -154,7 +154,9 @@ impl Validator {
let index_settings =
self.as_ref().settings.chains[self.origin_chain.name()].index_settings();
let contract_sync = self.merkle_tree_hook_sync.clone();
let cursor = contract_sync.rate_limited_cursor(index_settings).await;
let cursor = contract_sync
.forward_backward_message_sync_cursor(index_settings)
.await;
tokio::spawn(async move { contract_sync.clone().sync("merkle_tree_hook", cursor).await })
.instrument(info_span!("MerkleTreeHookSyncer"))
}
Expand Down
62 changes: 44 additions & 18 deletions rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,26 +104,14 @@ impl MerkleTreeHook for CosmosMerkleTreeHook {
}

/// Gets the current leaf count of the merkle tree
#[instrument(level = "debug", err, ret, skip(self))]
tkporter marked this conversation as resolved.
Show resolved Hide resolved
async fn count(&self, lag: Option<NonZeroU64>) -> ChainResult<u32> {
let payload = merkle_tree_hook::MerkleTreeCountRequest {
count: general::EmptyStruct {},
};

let block_height = get_block_height_for_lag(&self.provider, lag).await?;

let data = self
.provider
.wasm_query(
merkle_tree_hook::MerkleTreeGenericRequest {
merkle_hook: payload,
},
block_height,
)
.await?;
let response: merkle_tree_hook::MerkleTreeCountResponse = serde_json::from_slice(&data)?;

Ok(response.count)
self.count_at_block(block_height).await
}

#[instrument(level = "debug", err, ret, skip(self))]
Expand Down Expand Up @@ -154,24 +142,58 @@ impl MerkleTreeHook for CosmosMerkleTreeHook {
}
}

impl CosmosMerkleTreeHook {
#[instrument(level = "debug", err, ret, skip(self))]
async fn count_at_block(&self, block_height: Option<u64>) -> ChainResult<u32> {
let payload = merkle_tree_hook::MerkleTreeCountRequest {
count: general::EmptyStruct {},
};

let data = self
.provider
.wasm_query(
merkle_tree_hook::MerkleTreeGenericRequest {
merkle_hook: payload,
},
block_height,
)
.await?;
let response: merkle_tree_hook::MerkleTreeCountResponse = serde_json::from_slice(&data)?;

Ok(response.count)
}
}

// ------------------ Indexer ------------------

const EVENT_TYPE: &str = "hpl_hook_merkle::post_dispatch";

#[derive(Debug)]
/// A reference to a MerkleTreeHookIndexer contract on some Cosmos chain
pub struct CosmosMerkleTreeHookIndexer {
/// The CosmosMerkleTreeHook
merkle_tree_hook: CosmosMerkleTreeHook,
/// Cosmwasm indexer instance
indexer: Box<CosmosWasmIndexer>,
}

impl CosmosMerkleTreeHookIndexer {
/// create new Cosmos MerkleTreeHookIndexer agent
pub fn new(conf: ConnectionConf, locator: ContractLocator, reorg_period: u32) -> Self {
let indexer: CosmosWasmIndexer =
CosmosWasmIndexer::new(conf, locator, EVENT_TYPE.to_string(), reorg_period);
pub fn new(
conf: ConnectionConf,
locator: ContractLocator,
signer: Signer,
reorg_period: u32,
) -> Self {
let indexer: CosmosWasmIndexer = CosmosWasmIndexer::new(
conf.clone(),
locator.clone(),
EVENT_TYPE.to_string(),
reorg_period,
);

Self {
merkle_tree_hook: CosmosMerkleTreeHook::new(conf, locator, signer),
indexer: Box::new(indexer),
}
}
Expand Down Expand Up @@ -250,8 +272,12 @@ impl Indexer<MerkleTreeInsertion> for CosmosMerkleTreeHookIndexer {
#[async_trait]
impl SequenceIndexer<MerkleTreeInsertion> for CosmosMerkleTreeHookIndexer {
async fn sequence_and_tip(&self) -> ChainResult<(Option<u32>, u32)> {
// TODO: implement when cosmos scraper support is implemented
let tip = self.get_finalized_block_number().await?;
Ok((None, tip))
let sequence = self
.merkle_tree_hook
.count_at_block(Some(tip.into()))
.await?;

Ok((Some(sequence), tip))
}
}
9 changes: 2 additions & 7 deletions rust/chains/hyperlane-ethereum/src/merkle_tree_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,9 @@ where
M: Middleware + 'static,
{
async fn sequence_and_tip(&self) -> ChainResult<(Option<u32>, u32)> {
// The InterchainGasPaymasterIndexerBuilder must return a `SequenceIndexer` type.
// It's fine if only a blanket implementation is provided for EVM chains, since their
// indexing only uses the `Index` trait, which is a supertrait of `SequenceIndexer`.
// TODO: if `SequenceIndexer` turns out to not depend on `Indexer` at all, then the supertrait
tkporter marked this conversation as resolved.
Show resolved Hide resolved
// dependency could be removed, even if the builder would still need to return a type that is both
// ``SequenceIndexer` and `Indexer`.
let tip = self.get_finalized_block_number().await?;
Ok((None, tip))
let sequence = self.contract.count().block(u64::from(tip)).call().await?;
Ok((Some(sequence), tip))
}
}

Expand Down
2 changes: 1 addition & 1 deletion rust/config/testnet4_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@
"prefix": "dual",
"index": {
"from": 1,
"chunk": 1000
"chunk": 100000
},
"blocks": {
"reorgPeriod": 1
Expand Down
Loading
Loading