Skip to content

Commit

Permalink
Implement
Browse files Browse the repository at this point in the history
  • Loading branch information
Dzejkop committed Dec 14, 2023
1 parent 4c3ffcf commit f9057ee
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 28 deletions.
2 changes: 1 addition & 1 deletion crates/tx-sitter-client/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub struct GetTxResponse {
pub status: TxStatus,
}

#[derive(Debug, Clone, Serialize, Deserialize, Display)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Display, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
#[strum(serialize_all = "camelCase")]
pub enum TxStatus {
Expand Down
4 changes: 4 additions & 0 deletions crates/tx-sitter-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,8 @@ impl TxSitterClient {

self.json_get(&url).await
}

pub fn rpc_url(&self) -> String {
format!("{}/rpc", self.url.clone())
}
}
8 changes: 5 additions & 3 deletions src/ethereum/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use tracing::instrument;
use url::Url;
pub use write::TxError;

use self::{write::TransactionId, write_oz::WriteProvider};
use self::write::TransactionId;
use self::write_oz::WriteProvider;
use crate::serde_utils::JsonStrWrapper;

pub mod read;
Expand Down Expand Up @@ -57,8 +58,9 @@ impl Ethereum {
);
}

let write_provider: Arc<WriteProvider> =
Arc::new(write_oz::WriteProvider::new(read_provider.clone(), &options.write_options).await?);
let write_provider: Arc<WriteProvider> = Arc::new(
write_oz::WriteProvider::new(read_provider.clone(), &options.write_options).await?,
);

Ok(Self {
read_provider: Arc::new(read_provider),
Expand Down
9 changes: 5 additions & 4 deletions src/ethereum/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::error::Error;
use std::fmt;

use async_trait::async_trait;
use ethers::providers::ProviderError;
use ethers::types::transaction::eip2718::TypedTransaction;
use ethers::types::{Address, TransactionReceipt, H256};
use ethers::types::{TransactionReceipt, H256};
use thiserror::Error;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -35,7 +33,7 @@ pub enum TxError {
SendTimeout,

#[error("Error sending transaction: {0}")]
Send(Box<dyn Error + Send + Sync + 'static>),
Send(anyhow::Error),

#[error("Timeout while waiting for confirmations")]
ConfirmationTimeout,
Expand All @@ -51,4 +49,7 @@ pub enum TxError {

#[error("Error parsing transaction id: {0}")]
Parse(Box<dyn Error + Send + Sync + 'static>),

#[error("{0}")]
Other(anyhow::Error),
}
5 changes: 3 additions & 2 deletions src/ethereum/write_oz/inner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use ethers::types::{transaction::eip2718::TypedTransaction, H160, H256};
use ethers::types::transaction::eip2718::TypedTransaction;
use ethers::types::H256;

use crate::ethereum::write::TransactionId;
use crate::ethereum::TxError;
Expand All @@ -18,5 +19,5 @@ pub trait Inner: Send + Sync + 'static {

pub struct TransactionResult {
pub transaction_id: String,
pub hash: Option<H256>,
pub hash: Option<H256>,
}
2 changes: 1 addition & 1 deletion src/ethereum/write_oz/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl WriteProvider {
let inner: Arc<dyn Inner> = match options {
ParsedOptions::Oz(oz_options) => Arc::new(OzRelay::new(&oz_options).await?),
ParsedOptions::TxSitter(tx_sitter_options) => {
Arc::new(TxSitter::new(&tx_sitter_options.tx_sitter_url))
Arc::new(TxSitter::new(tx_sitter_options.tx_sitter_url))
}
};

Expand Down
6 changes: 3 additions & 3 deletions src/ethereum/write_oz/openzeppelin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl OzRelay {
loop {
let transaction = self.query(id).await.map_err(|error| {
error!(?error, "Failed to get transaction status");
TxError::Send(Box::new(error))
TxError::Send(error.into())
})?;

let status = transaction.status;
Expand Down Expand Up @@ -145,7 +145,7 @@ impl OzRelay {

let existing_transactions = self.list_recent_transactions().await.map_err(|e| {
error!(?e, "error occurred");
TxError::Send(Box::new(e))
TxError::Send(e.into())
})?;

let existing_transaction =
Expand Down Expand Up @@ -186,7 +186,7 @@ impl OzRelay {
})?
.map_err(|error| {
error!(?error, "Failed to send transaction");
TxError::Send(Box::new(error))
TxError::Send(error.into())
})?;

info!(?tx_id, "Transaction submitted to OZ Relay");
Expand Down
88 changes: 74 additions & 14 deletions src/ethereum/write_oz/tx_sitter.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use std::time::Duration;

use anyhow::Context;
use async_trait::async_trait;
use ethers::types::transaction::eip2718::TypedTransaction;
use tx_sitter_client::data::SendTxRequest;
use tx_sitter_client::data::{SendTxRequest, TransactionPriority, TxStatus};
use tx_sitter_client::TxSitterClient;

use super::inner::{Inner, TransactionResult};
use crate::ethereum::write::TransactionId;
use crate::ethereum::TxError;

const MINING_TIMEOUT: Duration = Duration::from_secs(60);

pub struct TxSitter {
client: TxSitterClient,
}
Expand All @@ -18,32 +22,88 @@ impl TxSitter {
client: TxSitterClient::new(url),
}
}

pub async fn mine_transaction_inner(
&self,
tx_id: TransactionId,
) -> Result<TransactionResult, TxError> {
loop {
let tx = self.client.get_tx(&tx_id.0).await.map_err(TxError::Send)?;

if tx.status == TxStatus::Mined || tx.status == TxStatus::Finalized {
return Ok(TransactionResult {
transaction_id: tx.tx_id,
hash: Some(
tx.tx_hash
.context("Missing hash on a mined tx")
.map_err(TxError::Send)?,
),
});
}

tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
}

#[async_trait]
impl Inner for TxSitter {
async fn send_transaction(
&self,
tx: TypedTransaction,
only_once: bool,
_only_once: bool,
) -> Result<TransactionId, TxError> {
let x = self.client.send_tx(&SendTxRequest {
to: *tx.to_addr().context("Tx receiver must be an address")?,
value: *tx.value().context("Missing tx value")?,
data: todo!(),
gas_limit: todo!(),
priority: todo!(),
tx_id: todo!(),
}).await.unwrap();

todo!()
// TODO: Handle only_once
let tx = self
.client
.send_tx(&SendTxRequest {
to: *tx
.to_addr()
.context("Tx receiver must be an address")
.map_err(TxError::Send)?,
value: *tx
.value()
.context("Missing tx value")
.map_err(TxError::Send)?,
data: tx.data().cloned(),
gas_limit: *tx
.gas()
.context("Missing tx gas limit")
.map_err(TxError::Send)?,
priority: TransactionPriority::Regular,
tx_id: None,
})
.await
.map_err(TxError::Send)?;

Ok(TransactionId(tx.tx_id))
}

async fn fetch_pending_transactions(&self) -> Result<Vec<TransactionId>, TxError> {
todo!()
let unsent_txs = self
.client
.get_txs(Some(TxStatus::Unsent))
.await
.map_err(TxError::Send)?;

let pending_txs = self
.client
.get_txs(Some(TxStatus::Pending))
.await
.map_err(TxError::Send)?;

let mut txs = vec![];

for tx in unsent_txs.into_iter().chain(pending_txs) {
txs.push(TransactionId(tx.tx_id));
}

Ok(txs)
}

async fn mine_transaction(&self, tx: TransactionId) -> Result<TransactionResult, TxError> {
todo!()
tokio::time::timeout(MINING_TIMEOUT, self.mine_transaction_inner(tx))
.await
.map_err(|_| TxError::ConfirmationTimeout)?
}
}

0 comments on commit f9057ee

Please sign in to comment.