Skip to content

Commit

Permalink
FallbackRwLockWriteGuard works
Browse files Browse the repository at this point in the history
  • Loading branch information
tkporter committed Nov 7, 2023
1 parent 453734a commit 419eceb
Showing 1 changed file with 68 additions and 6 deletions.
74 changes: 68 additions & 6 deletions rust/chains/hyperlane-cosmos/src/providers/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::ops::{Deref, DerefMut};

use async_trait::async_trait;
use cosmrs::proto::cosmos::auth::v1beta1::BaseAccount;
use cosmrs::proto::cosmos::auth::v1beta1::{
Expand All @@ -16,13 +18,13 @@ use cosmrs::proto::cosmwasm::wasm::v1::{
QuerySmartContractStateRequest,
};
use cosmrs::proto::traits::Message;
use tokio::sync::{RwLock, RwLockWriteGuard, TryLockError};

use cosmrs::tx::{self, Fee, MessageExt, SignDoc, SignerInfo};
use cosmrs::{Amount, Coin};
use hyperlane_core::{
ChainCommunicationError, ChainResult, ContractLocator, HyperlaneDomain, H256, U256,
};
use hyperlane_core::{ChainCommunicationError, ChainResult, ContractLocator, H256, U256};
use serde::Serialize;
use tonic::transport::{Channel, Endpoint};

use crate::{signers::Signer, ConnectionConf};
use crate::{verify, HyperlaneCosmosError};
Expand Down Expand Up @@ -81,23 +83,72 @@ pub trait WasmProvider: Send + Sync {
) -> ChainResult<SimulateResponse>;
}

#[derive(Debug)]
struct FallbackRwLock<T: ?Sized>(RwLock<T>);

enum FallbackRwLockWriteGuard<'a, T: ?Sized> {
RwLock(RwLockWriteGuard<'a, T>),
Fallback(Box<T>),
}

impl<T> FallbackRwLock<T> {
fn new(value: T) -> Self {
Self(RwLock::new(value))
}

fn try_write_or(
&self,
fallback: impl FnOnce() -> T,
) -> Result<FallbackRwLockWriteGuard<T>, TryLockError> {
self.0
.try_write()
.map(FallbackRwLockWriteGuard::RwLock)
.or_else(|_| Ok(FallbackRwLockWriteGuard::Fallback(Box::new(fallback()))))
}
}

impl<T> Deref for FallbackRwLockWriteGuard<'_, T> {
type Target = T;

fn deref(&self) -> &Self::Target {
match self {
Self::RwLock(guard) => guard.deref(),
Self::Fallback(value) => value.deref(),
}
}
}

impl<T> DerefMut for FallbackRwLockWriteGuard<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
Self::RwLock(guard) => guard.deref_mut(),
Self::Fallback(value) => value.deref_mut(),
}
}
}

#[derive(Debug)]
/// Cosmwasm GRPC Provider
pub struct WasmGrpcProvider {
conf: ConnectionConf,
_domain: HyperlaneDomain,
address: H256,
signer: Signer,
client: FallbackRwLock<ServiceClient<Channel>>,
grpc_endpoint: Endpoint,
}

impl WasmGrpcProvider {
/// create new Cosmwasm GRPC Provider
pub fn new(conf: ConnectionConf, locator: ContractLocator, signer: Signer) -> Self {
let endpoint = Endpoint::new(conf.get_grpc_url()).unwrap();
let client = FallbackRwLock::new(ServiceClient::new(endpoint.clone().connect_lazy()));

Self {
conf,
_domain: locator.domain.clone(),
address: locator.address,
signer,
client,
grpc_endpoint: endpoint,
}
}

Expand All @@ -110,10 +161,21 @@ impl WasmGrpcProvider {
}
}

impl WasmGrpcProvider {
fn get_client(&self) -> ChainResult<FallbackRwLockWriteGuard<ServiceClient<Channel>>> {
// try to get a write lock on self.client, otherwise create a new client altogether
let endpoint = self.grpc_endpoint.clone();

self.client
.try_write_or(|| ServiceClient::new(Endpoint::new(endpoint).unwrap().connect_lazy()))
.map_err(ChainCommunicationError::from_other)
}
}

#[async_trait]
impl WasmProvider for WasmGrpcProvider {
async fn latest_block_height(&self) -> ChainResult<u64> {
let mut client = ServiceClient::connect(self.get_conn_url()?).await?;
let mut client = self.get_client()?; //ServiceClient::connect(self.get_conn_url()?).await?;
let request = tonic::Request::new(GetLatestBlockRequest {});

let response = client
Expand Down

0 comments on commit 419eceb

Please sign in to comment.