Skip to content

Commit

Permalink
Fix lag issue with Mailbox message sequence_and_tip (#2879)
Browse files Browse the repository at this point in the history
### Description

The old implementation of the `sequence_and_tip` fn in mailbox.rs used
to look like this, where it'd end up passing the tip as the lag!
```
let sequence = match NonZeroU64::new(tip as u64) {
            None => None,
            Some(n) => Some(self.nonce(Some(n)).await?),
        };
```

Shuffled things around where we correctly just specify the `tip` as the
block number instead of a lag. This means that all functions that were
calling `wasm_query` with the lag needed to start getting the lagged
block num instead

Sometimes I believe this would create a bug where we'd ask for a block
height 1 and get back RPC errors. I believe most of the time we'd ask
for a block height of 0, in which case it'd just act like we were asking
for the tip

### Drive-by changes

n/a

### Related issues

n/a

### Backward compatibility

n/a

### Testing

plan to roll it out once I get an image
  • Loading branch information
tkporter authored Oct 31, 2023
1 parent 7ea79d9 commit 3cf1d56
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 105 deletions.
1 change: 1 addition & 0 deletions rust/chains/hyperlane-cosmos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod providers;
mod routing_ism;
mod signers;
mod trait_builder;
mod utils;
mod validator_announce;

pub use self::{
Expand Down
98 changes: 30 additions & 68 deletions rust/chains/hyperlane-cosmos/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::payloads::mailbox::{
use crate::payloads::{general, mailbox};
use crate::rpc::{CosmosWasmIndexer, WasmIndexer};
use crate::CosmosProvider;
use crate::{signers::Signer, verify, ConnectionConf};
use crate::{signers::Signer, utils::get_block_height_for_lag, verify, ConnectionConf};
use async_trait::async_trait;

use cosmrs::proto::cosmos::base::abci::v1beta1::TxResponse;
Expand Down Expand Up @@ -40,8 +40,7 @@ impl CosmosMailbox {
/// Create a reference to a mailbox at a specific Ethereum address on some
/// chain
pub fn new(conf: ConnectionConf, locator: ContractLocator, signer: Signer) -> Self {
let provider: WasmGrpcProvider =
WasmGrpcProvider::new(conf.clone(), locator.clone(), signer.clone());
let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer.clone());

Self {
_conf: conf,
Expand Down Expand Up @@ -80,22 +79,8 @@ impl Debug for CosmosMailbox {
impl Mailbox for CosmosMailbox {
#[instrument(level = "debug", err, ret, skip(self))]
async fn count(&self, lag: Option<NonZeroU64>) -> ChainResult<u32> {
let payload = mailbox::NonceRequest {
nonce: general::EmptyStruct {},
};

let data = self
.provider
.wasm_query(GeneralMailboxQuery { mailbox: payload }, lag)
.await;

if let Err(e) = data {
warn!("error: {:?}", e);
return Ok(0);
}

let response: mailbox::NonceResponse = serde_json::from_slice(&data?)?;
Ok(response.nonce)
let block_height = get_block_height_for_lag(&self.provider, lag).await?;
self.nonce_at_block(block_height).await
}

#[instrument(level = "debug", err, ret, skip(self))]
Expand Down Expand Up @@ -222,11 +207,29 @@ impl Mailbox for CosmosMailbox {
}
}

impl CosmosMailbox {
#[instrument(level = "debug", err, ret, skip(self))]
async fn nonce_at_block(&self, block_height: Option<u64>) -> ChainResult<u32> {
let payload = mailbox::NonceRequest {
nonce: general::EmptyStruct {},
};

let data = self
.provider
.wasm_query(GeneralMailboxQuery { mailbox: payload }, block_height)
.await?;

let response: mailbox::NonceResponse = serde_json::from_slice(&data)?;

Ok(response.nonce)
}
}

/// Struct that retrieves event data for a Cosmos Mailbox contract
#[derive(Debug)]
pub struct CosmosMailboxIndexer {
mailbox: CosmosMailbox,
indexer: Box<CosmosWasmIndexer>,
provider: Box<WasmGrpcProvider>,
}

impl CosmosMailboxIndexer {
Expand All @@ -238,13 +241,12 @@ impl CosmosMailboxIndexer {
signer: Signer,
event_type: String,
) -> Self {
let indexer: CosmosWasmIndexer =
CosmosWasmIndexer::new(conf.clone(), locator.clone(), event_type.clone());
let provider: WasmGrpcProvider = WasmGrpcProvider::new(conf, locator, signer);
let mailbox = CosmosMailbox::new(conf.clone(), locator.clone(), signer.clone());
let indexer: CosmosWasmIndexer = CosmosWasmIndexer::new(conf, locator, event_type);

Self {
mailbox,
indexer: Box::new(indexer),
provider: Box::new(provider),
}
}

Expand Down Expand Up @@ -280,37 +282,6 @@ impl CosmosMailboxIndexer {
None
}
}

#[instrument(level = "debug", err, ret, skip(self))]
async fn count(&self, lag: Option<NonZeroU64>) -> ChainResult<u32> {
let payload = mailbox::NonceRequest {
nonce: general::EmptyStruct {},
};

let data = self
.provider
.wasm_query(GeneralMailboxQuery { mailbox: payload }, lag)
.await?;
let response: mailbox::NonceResponse = serde_json::from_slice(&data)?;

Ok(response.nonce)
}

#[instrument(level = "debug", err, ret, skip(self))]
async fn nonce(&self, lag: Option<NonZeroU64>) -> ChainResult<u32> {
let payload = mailbox::NonceRequest {
nonce: general::EmptyStruct {},
};

let data = self
.provider
.wasm_query(GeneralMailboxQuery { mailbox: payload }, lag)
.await?;

let response: mailbox::NonceResponse = serde_json::from_slice(&data)?;

Ok(response.nonce)
}
}

#[async_trait]
Expand Down Expand Up @@ -350,29 +321,20 @@ impl Indexer<H256> for CosmosMailboxIndexer {
#[async_trait]
impl SequenceIndexer<H256> for CosmosMailboxIndexer {
async fn sequence_and_tip(&self) -> ChainResult<(Option<u32>, u32)> {
// TODO: implement when cosmos scraper support is implemented
let tip = self.indexer.latest_block_height().await?;

let sequence = match NonZeroU64::new(tip as u64) {
None => None,
Some(n) => Some(self.nonce(Some(n)).await?),
};

Ok((sequence, tip))
// No sequence for message deliveries.
Ok((None, tip))
}
}

#[async_trait]
impl SequenceIndexer<HyperlaneMessage> for CosmosMailboxIndexer {
async fn sequence_and_tip(&self) -> ChainResult<(Option<u32>, u32)> {
// TODO: implement when cosmos scraper support is implemented
let tip = self.indexer.latest_block_height().await?;

let sequence = match NonZeroU64::new(tip as u64) {
None => None,
Some(n) => Some(self.nonce(Some(n)).await?),
};
let sequence = self.mailbox.nonce_at_block(Some(tip.into())).await?;

Ok((sequence, tip))
Ok((Some(sequence), tip))
}
}
13 changes: 10 additions & 3 deletions rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
merkle_tree_hook,
},
rpc::{CosmosWasmIndexer, WasmIndexer},
utils::get_block_height_for_lag,
ConnectionConf, CosmosProvider, Signer,
};

Expand Down Expand Up @@ -75,13 +76,15 @@ impl MerkleTreeHook for CosmosMerkleTreeHook {
tree: general::EmptyStruct {},
};

let block_height = get_block_height_for_lag(&self.provider, lag).await?;

let data = self
.provider
.wasm_query(
merkle_tree_hook::MerkleTreeGenericRequest {
merkle_hook: payload,
},
lag,
block_height,
)
.await?;
let response: merkle_tree_hook::MerkleTreeResponse = serde_json::from_slice(&data)?;
Expand All @@ -108,13 +111,15 @@ impl MerkleTreeHook for CosmosMerkleTreeHook {
count: general::EmptyStruct {},
};

let block_height = get_block_height_for_lag(&self.provider, lag).await?;

let data = self
.provider
.wasm_query(
merkle_tree_hook::MerkleTreeGenericRequest {
merkle_hook: payload,
},
lag,
block_height,
)
.await?;
let response: merkle_tree_hook::MerkleTreeCountResponse = serde_json::from_slice(&data)?;
Expand All @@ -128,13 +133,15 @@ impl MerkleTreeHook for CosmosMerkleTreeHook {
check_point: general::EmptyStruct {},
};

let block_height = get_block_height_for_lag(&self.provider, lag).await?;

let data = self
.provider
.wasm_query(
merkle_tree_hook::MerkleTreeGenericRequest {
merkle_hook: payload,
},
lag,
block_height,
)
.await?;
let response: merkle_tree_hook::CheckPointResponse = serde_json::from_slice(&data)?;
Expand Down
41 changes: 7 additions & 34 deletions rust/chains/hyperlane-cosmos/src/providers/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use hyperlane_core::{
ChainCommunicationError, ChainResult, ContractLocator, HyperlaneDomain, H256, U256,
};
use serde::Serialize;
use std::num::NonZeroU64;

use crate::verify;
use crate::{signers::Signer, ConnectionConf};
Expand All @@ -42,15 +41,15 @@ pub trait WasmProvider: Send + Sync {
async fn wasm_query<T: Serialize + Sync + Send>(
&self,
payload: T,
maybe_lag: Option<NonZeroU64>,
block_height: Option<u64>,
) -> ChainResult<Vec<u8>>;

/// query to specific contract address
async fn wasm_query_to<T: Serialize + Sync + Send>(
&self,
to: String,
payload: T,
maybe_lag: Option<NonZeroU64>,
block_height: Option<u64>,
) -> ChainResult<Vec<u8>>;

/// query account info
Expand Down Expand Up @@ -133,41 +132,19 @@ impl WasmProvider for WasmGrpcProvider {
Ok(height as u64)
}

async fn wasm_query<T>(&self, payload: T, maybe_lag: Option<NonZeroU64>) -> ChainResult<Vec<u8>>
async fn wasm_query<T>(&self, payload: T, block_height: Option<u64>) -> ChainResult<Vec<u8>>
where
T: Serialize + Send + Sync,
{
let mut client = WasmQueryClient::connect(self.get_conn_url()?).await?;

let mut request = tonic::Request::new(QuerySmartContractStateRequest {
address: self.get_contract_addr()?,
query_data: serde_json::to_string(&payload)?.as_bytes().to_vec(),
});

if let Some(lag) = maybe_lag {
let height = self.latest_block_height().await?;
let height = height.saturating_sub(lag.get());

request
.metadata_mut()
.insert("x-cosmos-block-height", height.into());
}

let response = client
.smart_contract_state(request)
self.wasm_query_to(self.get_contract_addr()?, payload, block_height)
.await
.map_err(ChainCommunicationError::from_other)?
.into_inner();

// TODO: handle query to specific block number
Ok(response.data)
}

async fn wasm_query_to<T>(
&self,
to: String,
payload: T,
maybe_lag: Option<NonZeroU64>,
block_height: Option<u64>,
) -> ChainResult<Vec<u8>>
where
T: Serialize + Send + Sync,
Expand All @@ -178,13 +155,10 @@ impl WasmProvider for WasmGrpcProvider {
query_data: serde_json::to_string(&payload)?.as_bytes().to_vec(),
});

if let Some(lag) = maybe_lag {
let height = self.latest_block_height().await?;
let height = height.saturating_sub(lag.get());

if let Some(block_height) = block_height {
request
.metadata_mut()
.insert("x-cosmos-block-height", height.into());
.insert("x-cosmos-block-height", block_height.into());
}

let response = client
Expand All @@ -193,7 +167,6 @@ impl WasmProvider for WasmGrpcProvider {
.map_err(ChainCommunicationError::from_other)?
.into_inner();

// TODO: handle query to specific block number
Ok(response.data)
}

Expand Down
23 changes: 23 additions & 0 deletions rust/chains/hyperlane-cosmos/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::num::NonZeroU64;

use crate::grpc::{WasmGrpcProvider, WasmProvider};
use hyperlane_core::ChainResult;

/// Given a lag, returns the block height at the moment.
/// If the lag is None, a block height of None is given, indicating that the
/// tip directly can be used.
pub(crate) async fn get_block_height_for_lag(
provider: &WasmGrpcProvider,
lag: Option<NonZeroU64>,
) -> ChainResult<Option<u64>> {
let block_height = match lag {
Some(lag) => {
let tip = provider.latest_block_height().await?;
let block_height = tip - lag.get();
Some(block_height)
}
None => None,
};

Ok(block_height)
}

0 comments on commit 3cf1d56

Please sign in to comment.