Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lag issue with Mailbox message sequence_and_tip #2879

Merged
merged 4 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/chains/hyperlane-cosmos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod providers;
mod routing_ism;
mod signers;
mod trait_builder;
mod utils;
mod validator_announce;

pub use self::{
Expand Down
98 changes: 30 additions & 68 deletions rust/chains/hyperlane-cosmos/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::payloads::mailbox::{
use crate::payloads::{general, mailbox};
use crate::rpc::{CosmosWasmIndexer, WasmIndexer};
use crate::CosmosProvider;
use crate::{signers::Signer, verify, ConnectionConf};
use crate::{signers::Signer, utils::get_block_height_for_lag, verify, ConnectionConf};
use async_trait::async_trait;

use cosmrs::proto::cosmos::base::abci::v1beta1::TxResponse;
Expand Down Expand Up @@ -40,8 +40,7 @@ impl CosmosMailbox {
/// Create a reference to a mailbox at a specific Ethereum address on some
/// chain
pub fn new(conf: ConnectionConf, locator: ContractLocator, signer: Signer) -> Self {
let provider: WasmGrpcProvider =
WasmGrpcProvider::new(conf.clone(), locator.clone(), signer.clone());
let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer.clone());

Self {
_conf: conf,
Expand Down Expand Up @@ -80,22 +79,8 @@ impl Debug for CosmosMailbox {
impl Mailbox for CosmosMailbox {
#[instrument(level = "debug", err, ret, skip(self))]
async fn count(&self, lag: Option<NonZeroU64>) -> ChainResult<u32> {
let payload = mailbox::NonceRequest {
nonce: general::EmptyStruct {},
};

let data = self
.provider
.wasm_query(GeneralMailboxQuery { mailbox: payload }, lag)
.await;

if let Err(e) = data {
warn!("error: {:?}", e);
return Ok(0);
}

let response: mailbox::NonceResponse = serde_json::from_slice(&data?)?;
Ok(response.nonce)
let block_height = get_block_height_for_lag(&self.provider, lag).await?;
self.nonce_at_block(block_height).await
}

#[instrument(level = "debug", err, ret, skip(self))]
Expand Down Expand Up @@ -222,11 +207,29 @@ impl Mailbox for CosmosMailbox {
}
}

impl CosmosMailbox {
#[instrument(level = "debug", err, ret, skip(self))]
async fn nonce_at_block(&self, block_height: Option<u64>) -> ChainResult<u32> {
let payload = mailbox::NonceRequest {
nonce: general::EmptyStruct {},
};

let data = self
.provider
.wasm_query(GeneralMailboxQuery { mailbox: payload }, block_height)
.await?;

let response: mailbox::NonceResponse = serde_json::from_slice(&data)?;

Ok(response.nonce)
}
}

/// Struct that retrieves event data for a Cosmos Mailbox contract
#[derive(Debug)]
pub struct CosmosMailboxIndexer {
mailbox: CosmosMailbox,
indexer: Box<CosmosWasmIndexer>,
provider: Box<WasmGrpcProvider>,
}

impl CosmosMailboxIndexer {
Expand All @@ -238,13 +241,12 @@ impl CosmosMailboxIndexer {
signer: Signer,
event_type: String,
) -> Self {
let indexer: CosmosWasmIndexer =
CosmosWasmIndexer::new(conf.clone(), locator.clone(), event_type.clone());
let provider: WasmGrpcProvider = WasmGrpcProvider::new(conf, locator, signer);
let mailbox = CosmosMailbox::new(conf.clone(), locator.clone(), signer.clone());
let indexer: CosmosWasmIndexer = CosmosWasmIndexer::new(conf, locator, event_type);

Self {
mailbox,
indexer: Box::new(indexer),
provider: Box::new(provider),
}
}

Expand Down Expand Up @@ -280,37 +282,6 @@ impl CosmosMailboxIndexer {
None
}
}

#[instrument(level = "debug", err, ret, skip(self))]
async fn count(&self, lag: Option<NonZeroU64>) -> ChainResult<u32> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

were these not used at all?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no 😛

let payload = mailbox::NonceRequest {
nonce: general::EmptyStruct {},
};

let data = self
.provider
.wasm_query(GeneralMailboxQuery { mailbox: payload }, lag)
.await?;
let response: mailbox::NonceResponse = serde_json::from_slice(&data)?;

Ok(response.nonce)
}

#[instrument(level = "debug", err, ret, skip(self))]
async fn nonce(&self, lag: Option<NonZeroU64>) -> ChainResult<u32> {
let payload = mailbox::NonceRequest {
nonce: general::EmptyStruct {},
};

let data = self
.provider
.wasm_query(GeneralMailboxQuery { mailbox: payload }, lag)
.await?;

let response: mailbox::NonceResponse = serde_json::from_slice(&data)?;

Ok(response.nonce)
}
}

#[async_trait]
Expand Down Expand Up @@ -350,29 +321,20 @@ impl Indexer<H256> for CosmosMailboxIndexer {
#[async_trait]
impl SequenceIndexer<H256> for CosmosMailboxIndexer {
async fn sequence_and_tip(&self) -> ChainResult<(Option<u32>, u32)> {
// TODO: implement when cosmos scraper support is implemented
let tip = self.indexer.latest_block_height().await?;

let sequence = match NonZeroU64::new(tip as u64) {
None => None,
Some(n) => Some(self.nonce(Some(n)).await?),
};

Ok((sequence, tip))
// No sequence for message deliveries.
Ok((None, tip))
}
}

#[async_trait]
impl SequenceIndexer<HyperlaneMessage> for CosmosMailboxIndexer {
async fn sequence_and_tip(&self) -> ChainResult<(Option<u32>, u32)> {
// TODO: implement when cosmos scraper support is implemented
let tip = self.indexer.latest_block_height().await?;

let sequence = match NonZeroU64::new(tip as u64) {
None => None,
Some(n) => Some(self.nonce(Some(n)).await?),
};
let sequence = self.mailbox.nonce_at_block(Some(tip.into())).await?;

Ok((sequence, tip))
Ok((Some(sequence), tip))
}
}
13 changes: 10 additions & 3 deletions rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
merkle_tree_hook,
},
rpc::{CosmosWasmIndexer, WasmIndexer},
utils::get_block_height_for_lag,
ConnectionConf, CosmosProvider, Signer,
};

Expand Down Expand Up @@ -75,13 +76,15 @@ impl MerkleTreeHook for CosmosMerkleTreeHook {
tree: general::EmptyStruct {},
};

let block_height = get_block_height_for_lag(&self.provider, lag).await?;

let data = self
.provider
.wasm_query(
merkle_tree_hook::MerkleTreeGenericRequest {
merkle_hook: payload,
},
lag,
block_height,
)
.await?;
let response: merkle_tree_hook::MerkleTreeResponse = serde_json::from_slice(&data)?;
Expand All @@ -108,13 +111,15 @@ impl MerkleTreeHook for CosmosMerkleTreeHook {
count: general::EmptyStruct {},
};

let block_height = get_block_height_for_lag(&self.provider, lag).await?;

let data = self
.provider
.wasm_query(
merkle_tree_hook::MerkleTreeGenericRequest {
merkle_hook: payload,
},
lag,
block_height,
)
.await?;
let response: merkle_tree_hook::MerkleTreeCountResponse = serde_json::from_slice(&data)?;
Expand All @@ -128,13 +133,15 @@ impl MerkleTreeHook for CosmosMerkleTreeHook {
check_point: general::EmptyStruct {},
};

let block_height = get_block_height_for_lag(&self.provider, lag).await?;

let data = self
.provider
.wasm_query(
merkle_tree_hook::MerkleTreeGenericRequest {
merkle_hook: payload,
},
lag,
block_height,
)
.await?;
let response: merkle_tree_hook::CheckPointResponse = serde_json::from_slice(&data)?;
Expand Down
41 changes: 7 additions & 34 deletions rust/chains/hyperlane-cosmos/src/providers/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use hyperlane_core::{
ChainCommunicationError, ChainResult, ContractLocator, HyperlaneDomain, H256, U256,
};
use serde::Serialize;
use std::num::NonZeroU64;

use crate::verify;
use crate::{signers::Signer, ConnectionConf};
Expand All @@ -42,15 +41,15 @@ pub trait WasmProvider: Send + Sync {
async fn wasm_query<T: Serialize + Sync + Send>(
&self,
payload: T,
maybe_lag: Option<NonZeroU64>,
block_height: Option<u64>,
) -> ChainResult<Vec<u8>>;

/// query to specific contract address
async fn wasm_query_to<T: Serialize + Sync + Send>(
&self,
to: String,
payload: T,
maybe_lag: Option<NonZeroU64>,
block_height: Option<u64>,
) -> ChainResult<Vec<u8>>;

/// query account info
Expand Down Expand Up @@ -133,41 +132,19 @@ impl WasmProvider for WasmGrpcProvider {
Ok(height as u64)
}

async fn wasm_query<T>(&self, payload: T, maybe_lag: Option<NonZeroU64>) -> ChainResult<Vec<u8>>
async fn wasm_query<T>(&self, payload: T, block_height: Option<u64>) -> ChainResult<Vec<u8>>
where
T: Serialize + Send + Sync,
{
let mut client = WasmQueryClient::connect(self.get_conn_url()?).await?;

let mut request = tonic::Request::new(QuerySmartContractStateRequest {
address: self.get_contract_addr()?,
query_data: serde_json::to_string(&payload)?.as_bytes().to_vec(),
Comment on lines -140 to -144
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good spot

});

if let Some(lag) = maybe_lag {
let height = self.latest_block_height().await?;
let height = height.saturating_sub(lag.get());

request
.metadata_mut()
.insert("x-cosmos-block-height", height.into());
}

let response = client
.smart_contract_state(request)
self.wasm_query_to(self.get_contract_addr()?, payload, block_height)
.await
.map_err(ChainCommunicationError::from_other)?
.into_inner();

// TODO: handle query to specific block number
Ok(response.data)
}

async fn wasm_query_to<T>(
&self,
to: String,
payload: T,
maybe_lag: Option<NonZeroU64>,
block_height: Option<u64>,
) -> ChainResult<Vec<u8>>
where
T: Serialize + Send + Sync,
Expand All @@ -178,13 +155,10 @@ impl WasmProvider for WasmGrpcProvider {
query_data: serde_json::to_string(&payload)?.as_bytes().to_vec(),
});

if let Some(lag) = maybe_lag {
let height = self.latest_block_height().await?;
let height = height.saturating_sub(lag.get());

if let Some(block_height) = block_height {
request
.metadata_mut()
.insert("x-cosmos-block-height", height.into());
.insert("x-cosmos-block-height", block_height.into());
}

let response = client
Expand All @@ -193,7 +167,6 @@ impl WasmProvider for WasmGrpcProvider {
.map_err(ChainCommunicationError::from_other)?
.into_inner();

// TODO: handle query to specific block number
Ok(response.data)
}

Expand Down
23 changes: 23 additions & 0 deletions rust/chains/hyperlane-cosmos/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::num::NonZeroU64;

use crate::grpc::{WasmGrpcProvider, WasmProvider};
use hyperlane_core::ChainResult;

/// Given a lag, returns the block height at the moment.
/// If the lag is None, a block height of None is given, indicating that the
/// tip directly can be used.
pub(crate) async fn get_block_height_for_lag(
provider: &WasmGrpcProvider,
lag: Option<NonZeroU64>,
) -> ChainResult<Option<u64>> {
let block_height = match lag {
Some(lag) => {
let tip = provider.latest_block_height().await?;
let block_height = tip - lag.get();
Some(block_height)
}
None => None,
};

Ok(block_height)
}
Loading