From 419ecebf478bca9b26efbcc72cce91ed173da00d Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Tue, 7 Nov 2023 11:09:58 +0000 Subject: [PATCH] FallbackRwLockWriteGuard works --- .../hyperlane-cosmos/src/providers/grpc.rs | 74 +++++++++++++++++-- 1 file changed, 68 insertions(+), 6 deletions(-) diff --git a/rust/chains/hyperlane-cosmos/src/providers/grpc.rs b/rust/chains/hyperlane-cosmos/src/providers/grpc.rs index 021033e0b2..e000253348 100644 --- a/rust/chains/hyperlane-cosmos/src/providers/grpc.rs +++ b/rust/chains/hyperlane-cosmos/src/providers/grpc.rs @@ -1,3 +1,5 @@ +use std::ops::{Deref, DerefMut}; + use async_trait::async_trait; use cosmrs::proto::cosmos::auth::v1beta1::BaseAccount; use cosmrs::proto::cosmos::auth::v1beta1::{ @@ -16,13 +18,13 @@ use cosmrs::proto::cosmwasm::wasm::v1::{ QuerySmartContractStateRequest, }; use cosmrs::proto::traits::Message; +use tokio::sync::{RwLock, RwLockWriteGuard, TryLockError}; use cosmrs::tx::{self, Fee, MessageExt, SignDoc, SignerInfo}; use cosmrs::{Amount, Coin}; -use hyperlane_core::{ - ChainCommunicationError, ChainResult, ContractLocator, HyperlaneDomain, H256, U256, -}; +use hyperlane_core::{ChainCommunicationError, ChainResult, ContractLocator, H256, U256}; use serde::Serialize; +use tonic::transport::{Channel, Endpoint}; use crate::{signers::Signer, ConnectionConf}; use crate::{verify, HyperlaneCosmosError}; @@ -81,23 +83,72 @@ pub trait WasmProvider: Send + Sync { ) -> ChainResult; } +#[derive(Debug)] +struct FallbackRwLock(RwLock); + +enum FallbackRwLockWriteGuard<'a, T: ?Sized> { + RwLock(RwLockWriteGuard<'a, T>), + Fallback(Box), +} + +impl FallbackRwLock { + fn new(value: T) -> Self { + Self(RwLock::new(value)) + } + + fn try_write_or( + &self, + fallback: impl FnOnce() -> T, + ) -> Result, TryLockError> { + self.0 + .try_write() + .map(FallbackRwLockWriteGuard::RwLock) + .or_else(|_| Ok(FallbackRwLockWriteGuard::Fallback(Box::new(fallback())))) + } +} + +impl Deref for FallbackRwLockWriteGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + match self { + Self::RwLock(guard) => guard.deref(), + Self::Fallback(value) => value.deref(), + } + } +} + +impl DerefMut for FallbackRwLockWriteGuard<'_, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + match self { + Self::RwLock(guard) => guard.deref_mut(), + Self::Fallback(value) => value.deref_mut(), + } + } +} + #[derive(Debug)] /// Cosmwasm GRPC Provider pub struct WasmGrpcProvider { conf: ConnectionConf, - _domain: HyperlaneDomain, address: H256, signer: Signer, + client: FallbackRwLock>, + grpc_endpoint: Endpoint, } impl WasmGrpcProvider { /// create new Cosmwasm GRPC Provider pub fn new(conf: ConnectionConf, locator: ContractLocator, signer: Signer) -> Self { + let endpoint = Endpoint::new(conf.get_grpc_url()).unwrap(); + let client = FallbackRwLock::new(ServiceClient::new(endpoint.clone().connect_lazy())); + Self { conf, - _domain: locator.domain.clone(), address: locator.address, signer, + client, + grpc_endpoint: endpoint, } } @@ -110,10 +161,21 @@ impl WasmGrpcProvider { } } +impl WasmGrpcProvider { + fn get_client(&self) -> ChainResult>> { + // try to get a write lock on self.client, otherwise create a new client altogether + let endpoint = self.grpc_endpoint.clone(); + + self.client + .try_write_or(|| ServiceClient::new(Endpoint::new(endpoint).unwrap().connect_lazy())) + .map_err(ChainCommunicationError::from_other) + } +} + #[async_trait] impl WasmProvider for WasmGrpcProvider { async fn latest_block_height(&self) -> ChainResult { - let mut client = ServiceClient::connect(self.get_conn_url()?).await?; + let mut client = self.get_client()?; //ServiceClient::connect(self.get_conn_url()?).await?; let request = tonic::Request::new(GetLatestBlockRequest {}); let response = client