Skip to content

Commit

Permalink
fix event query rpc (#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
seunlanlege authored Mar 11, 2024
1 parent 233c76b commit 24c36d3
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 50 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/docker.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
name: Docker Release

on:
workflow_dispatch:
push:
tags:
- '**[0-9]+.[0-9]+.[0-9]+*'

concurrency:
group: release-${{ github.ref }}
Expand Down
7 changes: 2 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions evm/abi/src/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,11 @@ impl TryFrom<EvmHostEvents> for ismp::events::Event {
timeout_timestamp: resp.res_timeout_timestamp.low_u64(),
gas_limit: resp.res_gaslimit.low_u64(),
})),
EvmHostEvents::PostRequestHandledFilter(handled) =>
Ok(ismp::events::Event::PostRequestHandled(ismp::events::PostRequestHandled {
commitment: handled.commitment.into(),
relayer: handled.relayer.as_bytes().to_vec(),
})),
event => Err(anyhow!("Unsupported event {event:?}")),
}
}
Expand Down
5 changes: 1 addition & 4 deletions modules/client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hyperclient"
version = "0.2.1"
version = "0.2.2"
edition = "2021"
description = "The hyperclient is a library for managing (in-flight) ISMP requests"
repository = "https://github.com/polytope-labs/hyperbridge"
Expand All @@ -18,13 +18,10 @@ hex-literal = { version = "0.4.1" }
serde-wasm-bindgen = { version = "0.6.3", default-features = false }
serde = { version = "1.0.196", features = ["derive"], default-features = false }
wasm-bindgen-futures = "0.4.40"
tiny-keccak = { version = "2.0.2", features = ["keccak"] }
codec = { package = "parity-scale-codec", version = "3.1.3", default-features = false }
async-trait = { version = "0.1.53", default-features = false }
futures = "0.3.30"
wasm-streams = "0.4.0"
tokio = { version = "1.35.1", features = ["macros"] }
reqwest = { version = "0.11.23", features = ["json"] }
wasm-bindgen-test = "0.3.41"
js-sys = "0.3.68"
web-sys = "0.3.68"
Expand Down
30 changes: 14 additions & 16 deletions modules/client/src/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,12 @@ pub async fn timeout_request_stream(
}
};

let response = lambda().await;
match response {
Ok(res) => res,
Err(e) => Some((Err(anyhow!("Encountered an error in stream {e:?}")), state)),
}
lambda().await.unwrap_or_else(|e| {
Some((
Err(anyhow!("Encountered an error in stream {e:?}")),
TimeoutStreamState::End,
))
})
}
});

Expand Down Expand Up @@ -496,16 +497,17 @@ pub async fn request_status_stream(
},
PostStreamState::HyperbridgeFinalized(finalized_height) => {
let res = dest_client.query_request_receipt(hash).await?;
let request_commitment =
hash_request::<Keccak256>(&Request::Post(post.clone()));
if res != H160::zero() {
let latest_height = dest_client.query_latest_block_height().await?;
let meta = dest_client
.query_ismp_event(finalized_height..=latest_height)
.await?
.into_iter()
.find_map(|event| match event.event {
Event::PostRequest(post_event)
if post.source == post_event.source &&
post.nonce == post_event.nonce =>
Event::PostRequestHandled(handled)
if handled.commitment == request_commitment =>
Some(event.meta),
_ => None,
})
Expand Down Expand Up @@ -539,14 +541,10 @@ pub async fn request_status_stream(
}
};

let response = lambda().await;
match response {
Ok(res) => res,
Err(e) => Some((
Err(anyhow!("Encountered an error in stream {e:?}")),
post_request_status,
)),
}
// terminate the stream once an error is encountered
lambda().await.unwrap_or_else(|e| {
Some((Err(anyhow!("Encountered an error in stream {e:?}")), PostStreamState::End))
})
}
});

Expand Down
8 changes: 4 additions & 4 deletions modules/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ pub async fn query_response_status(
/// calldata: Vec<u8>,
/// }
///
/// // An error was encountered in the stream, errors are recoverable so it's safe to continue
/// polling. interface Error {
/// // An error was encountered in the stream, the stream will come to an end.
/// interface Error {
/// kind: "Error";
/// // error description
/// description: string
Expand Down Expand Up @@ -247,8 +247,8 @@ pub async fn timeout_post_request(
/// kind: "Timeout";
/// }
///
/// // An error was encountered in the stream, errors are recoverable so it's safe to continue
/// polling. interface Error {
/// // An error was encountered in the stream, the stream will come to an end.
/// interface Error {
/// kind: "Error";
/// // error description
/// description: string
Expand Down
10 changes: 2 additions & 8 deletions modules/client/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use alloc::collections::BTreeMap;
use anyhow::anyhow;
use codec::Encode;
use core::pin::Pin;
use ethers::types::H160;
use ethers::{types::H160, utils::keccak256};
use futures::Stream;
use ismp::{consensus::ConsensusStateId, host::StateMachine};
use serde::{Deserialize, Serialize};
Expand All @@ -30,13 +30,7 @@ pub struct KeccakHasher;
impl Hasher for KeccakHasher {
type Output = H256;
fn hash(s: &[u8]) -> Self::Output {
use tiny_keccak::Hasher;

let mut keccak = tiny_keccak::Keccak::v256();
let mut output = H256::default();
keccak.update(s);
keccak.finalize(&mut output[..]);
output
keccak256(s).into()
}
}

Expand Down
11 changes: 6 additions & 5 deletions modules/client/tests/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ async fn subscribe_to_request_status() -> Result<(), anyhow::Error> {
};

let hyperbrige_config = SubstrateConfig {
// rpc_url: "wss://hyperbridge-gargantua-rpc.blockops.network:443".to_string(),
rpc_url: "ws://127.0.0.1:9944".to_string(),
rpc_url: "wss://hyperbridge-gargantua-rpc.blockops.network:443".to_string(),
// rpc_url: "ws://127.0.0.1:9944".to_string(),
consensus_state_id: *b"PARA",
hash_algo: HashAlgorithm::Keccak,
};
Expand Down Expand Up @@ -162,7 +162,8 @@ async fn subscribe_to_request_status() -> Result<(), anyhow::Error> {
tracing::info!("Got Status {:?}", status);
},
Err(e) => {
tracing::info!("Error: {e:?}")
tracing::info!("Error: {e:?}");
Err(e)?
},
}
}
Expand Down Expand Up @@ -195,8 +196,8 @@ async fn test_timeout_request() -> Result<(), anyhow::Error> {
};

let hyperbrige_config = SubstrateConfig {
// rpc_url: "wss://hyperbridge-gargantua-rpc.blockops.network:443".to_string(),
rpc_url: "ws://127.0.0.1:9944".to_string(),
rpc_url: "wss://hyperbridge-gargantua-rpc.blockops.network:443".to_string(),
// rpc_url: "ws://127.0.0.1:9944".to_string(),
consensus_state_id: *b"PARA",
hash_algo: HashAlgorithm::Keccak,
};
Expand Down
13 changes: 13 additions & 0 deletions modules/ismp/core/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use crate::{
consensus::StateMachineId,
router::{Get, Post, PostResponse},
};
use alloc::vec::Vec;
use codec::{Decode, Encode};
use primitive_types::H256;
use scale_info::TypeInfo;

/// Emitted when a state machine is successfully updated to a new height after the challenge period
Expand All @@ -17,6 +19,15 @@ pub struct StateMachineUpdated {
pub latest_height: u64,
}

/// Emitted when a post request is successfully handled.
#[derive(Clone, Debug, TypeInfo, Encode, Decode, serde::Deserialize, serde::Serialize)]
pub struct PostRequestHandled {
/// The commitment to the request
pub commitment: H256,
/// The address of the relayer responsible for relaying the request
pub relayer: Vec<u8>,
}

/// This represents events that should be emitted by ismp-rs wrappers
#[derive(Clone, Debug, TypeInfo, Encode, Decode, serde::Deserialize, serde::Serialize)]
pub enum Event {
Expand All @@ -29,4 +40,6 @@ pub enum Event {
PostResponse(PostResponse),
/// An event that is emitted when a get request is dispatched
GetRequest(Get),
/// Emitted when a post request is handled
PostRequestHandled(PostRequestHandled),
}
10 changes: 4 additions & 6 deletions modules/ismp/pallet/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,9 @@ where
.map_err(|e| runtime_error_into_rpc_error(e.to_string()))?
.ok_or_else(|| runtime_error_into_rpc_error("Invalid block number or hash provided"))?;

let mut api = self.client.runtime_api();
api.register_extension(OffchainDbExt::new(self.offchain_db.clone()));

while header.number() >= from_block.number() {
let mut api = self.client.runtime_api();
api.register_extension(OffchainDbExt::new(self.offchain_db.clone()));
let at = header.hash();

let mut request_commitments = vec![];
Expand Down Expand Up @@ -432,10 +431,9 @@ where
.map_err(|e| runtime_error_into_rpc_error(e.to_string()))?
.ok_or_else(|| runtime_error_into_rpc_error("Invalid block number or hash provided"))?;

let mut api = self.client.runtime_api();
api.register_extension(OffchainDbExt::new(self.offchain_db.clone()));

while header.number() >= from_block.number() {
let mut api = self.client.runtime_api();
api.register_extension(OffchainDbExt::new(self.offchain_db.clone()));
let at = header.hash();

let block_events = api.block_events_with_metadata(at).map_err(|e| {
Expand Down
2 changes: 1 addition & 1 deletion parachain/node/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hyperbridge"
version = "0.3.6"
version = "0.3.7"
authors = ["Polytope Labs <hello@polytope.technology>"]
description = "The Hyperbridge coprocessor node"
repository = "https://github.com/polytope-labs/hyperbridge"
Expand Down

0 comments on commit 24c36d3

Please sign in to comment.