Skip to content

Commit

Permalink
Merge pull request #2 from worldcoin/osiris/broadcast-logs
Browse files Browse the repository at this point in the history
Osiris/broadcast logs
  • Loading branch information
0xOsiris authored Oct 22, 2024
2 parents cac412d + 27ca3d7 commit 6b7c4af
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ alloy = { version = "0.5", features = [
"signer-mnemonic",
"signers",
] }
ethers-core = "*"
alloy-signer-local = { version = "0.5" }
futures = "0.3"
hex = "0.4"
Expand Down
71 changes: 60 additions & 11 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@ use std::sync::Arc;

use abi::IStateBridge::IStateBridgeInstance;
use alloy::network::EthereumWallet;
use alloy::providers::{Provider as _, ProviderBuilder};
use alloy::primitives::U256;
use alloy::providers::{Provider, ProviderBuilder};
use alloy::rpc::types::Filter;
use alloy::signers::local::MnemonicBuilder;
use alloy::sol_types::SolEvent;
use alloy_signer_local::coins_bip39::English;
use clap::Parser;
use config::{NetworkType, WalletConfig};
use eyre::eyre::Result;
use eyre::eyre::{eyre, Result};
use futures::StreamExt;
use relay::signer::{AlloySigner, Signer};
use relay::{EVMRelay, Relayer};
use relay::signer::{AlloySigner, Signer, TxSitterSigner};
use relay::{EVMRelay, Relay, Relayer};
use telemetry_batteries::metrics::statsd::StatsdBattery;
use telemetry_batteries::tracing::datadog::DatadogBattery;
use telemetry_batteries::tracing::TracingShutdownHandle;
use tokio::task::JoinSet;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

Expand Down Expand Up @@ -116,13 +118,44 @@ pub async fn run(config: Config) -> Result<()> {
.await?;

tracing::info!(chain_id, latest_block_number, "Starting ingestion");
scanner
.root_stream()
.for_each(|x| async move {
println!("{:#?}", x);
})
.await;

let (tx, _) = tokio::sync::broadcast::channel::<U256>(1000);
let relayers = init_relays(config)?;
let mut joinset = JoinSet::new();
for relay in relayers {
let tx = tx.clone();
joinset.spawn(async move {
relay.subscribe_roots(tx.subscribe()).await.map_err(|e| {
tracing::error!(?e, "Error subscribing to roots");
eyre!(e)
})?;
Ok::<(), eyre::Report>(())
});
}

let scanner_fut = async {
scanner
.root_stream()
.for_each(|event| {
let tx = tx.clone();
async move {
let field = event.postRoot;
if let Err(e) = tx.send(field) {
tracing::error!(?e, "Error sending root");
}
}
})
.await;
};

tokio::select! {
_ = scanner_fut => {
tracing::error!("Scanner task failed");
}
_ = joinset.join_all() => {
tracing::error!("Relayer task failed");
}
}
Ok(())
}

Expand Down Expand Up @@ -158,7 +191,23 @@ fn init_relays(cfg: Config) -> Result<Vec<Relayer>> {
n.provider.rpc_endpoint.clone(),
)));
}
_ => unimplemented!(),
WalletConfig::TxSitter {
url,
address: _,
gas_limit,
} => {
let signer = TxSitterSigner::new(
url.as_str(),
n.state_bridge_address,
*gas_limit,
);

relayers.push(Relayer::Evm(EVMRelay::new(
Signer::TxSitter(signer),
n.world_id_address,
n.provider.rpc_endpoint.clone(),
)));
}
};
}
_ => {}
Expand Down
8 changes: 4 additions & 4 deletions src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use alloy::providers::ProviderBuilder;
use eyre::Result;
use semaphore::Field;
use signer::{RelaySigner, Signer};
use tokio::sync::mpsc::Receiver;
use tokio::sync::broadcast::Receiver;
use url::Url;

use crate::abi::IBridgedWorldID::IBridgedWorldIDInstance;
Expand All @@ -31,7 +31,6 @@ impl Relay for Relayer {
}
}

#[derive(Debug, Clone)]
pub struct EVMRelay {
pub signer: Signer,
pub world_id_address: Address,
Expand Down Expand Up @@ -59,14 +58,15 @@ impl Relay for EVMRelay {
self.world_id_address,
l2_provider,
));
while let Some(field) = rx.recv().await {

loop {
let field = rx.recv().await?;
let world_id = world_id_instance.clone();
let latest = world_id.latestRoot().call().await?._0;
if latest != field {
self.signer.propagate_root().await?;
}
}
Ok(())
}
}

Expand Down
107 changes: 98 additions & 9 deletions src/relay/signer.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,45 @@
use alloy::network::{Ethereum, EthereumWallet};
use alloy::primitives::{bytes, Address, Bytes};
use alloy::providers::fillers::{
BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill,
NonceFiller, WalletFiller,
};
use alloy::providers::RootProvider;
use alloy::providers::{Identity, RootProvider};
use alloy::transports::http::Http;
use eyre::eyre::Result;
use ethers_core::types::U256;
use eyre::eyre::{eyre, Result};
use tracing::{error, info};
use tx_sitter_client::data::{SendTxRequest, TransactionPriority, TxStatus};
use tx_sitter_client::TxSitterClient;

use crate::abi::IStateBridge::IStateBridgeInstance;

pub trait RelaySigner {
#[allow(async_fn_in_trait)]
/// "propogateRoot()" Selector
pub static PROPOGATE_ROOT_SELECTOR: Bytes = bytes!("21823a11");

pub(crate) trait RelaySigner {
/// Propogate a new Root to the State Bridge for the given network.
async fn propagate_root(&self) -> Result<()>;
}

#[derive(Debug, Clone)]
pub enum Signer {
Alloy(AlloySigner),
TxSitter, // TODO: Implement this
TxSitter(TxSitterSigner),
}

impl RelaySigner for Signer {
async fn propagate_root(&self) -> Result<()> {
match self {
Signer::Alloy(signer) => signer.propagate_root().await,
Signer::TxSitter => unimplemented!(),
Signer::TxSitter(signer) => signer.propagate_root().await,
}
}
}

pub type AlloySignerProvider = FillProvider<
JoinFill<
JoinFill<
alloy::providers::Identity,
Identity,
JoinFill<
GasFiller,
JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>,
Expand All @@ -47,7 +52,6 @@ pub type AlloySignerProvider = FillProvider<
Ethereum,
>;

#[derive(Debug, Clone)]
pub struct AlloySigner {
pub(crate) state_bridge_instance:
IStateBridgeInstance<Http<reqwest::Client>, AlloySignerProvider>,
Expand Down Expand Up @@ -83,3 +87,88 @@ impl RelaySigner for AlloySigner {
Ok(())
}
}

pub struct TxSitterSigner {
tx_sitter: TxSitterClient,
state_bridge_address: Address,
gas_limit: Option<u64>,
}

impl TxSitterSigner {
pub fn new(
url: &str,
state_bridge_address: Address,
gas_limit: Option<u64>,
) -> Self {
let tx_sitter = TxSitterClient::new(url);
Self {
tx_sitter,
state_bridge_address,
gas_limit,
}
}
}

impl RelaySigner for TxSitterSigner {
async fn propagate_root(&self) -> Result<()> {
let ethers_selector = ethers_core::types::Bytes::from_static(
PROPOGATE_ROOT_SELECTOR.as_ref(),
);
let ethers_address = ethers_core::types::Address::from_slice(
self.state_bridge_address.as_ref(),
);
let send_tx = SendTxRequest {
to: ethers_address,
data: Some(ethers_selector),
gas_limit: self.gas_limit.map(U256::from).unwrap_or_default(),
priority: TransactionPriority::Fast,
value: U256::zero(),
tx_id: None,
};

let resp = self.tx_sitter.send_tx(&send_tx).await.map_err(|e| {
eyre!(
"Failed to send root propogation transaction to tx sitter: {}",
e
)
})?;

info!(
tx_id = &resp.tx_id,
"Successfully sent root propogation transaction to tx sitter"
);
let timeout = std::time::Duration::from_secs(120); // TODO: Should be configurable?
let backoff = std::time::Duration::from_secs(12);
let start = std::time::Instant::now();
loop {
let tx_response =
self.tx_sitter.get_tx(&resp.tx_id).await.map_err(|e| {
eyre!("Failed to get tx status from tx sitter: {}", e)
})?;

match tx_response.status {
Some(TxStatus::Mined) | Some(TxStatus::Finalized) => {
info!(
tx_id = &resp.tx_id,
"Root propogation transaction mined"
);
break;
}
_ => {
info!(
tx_id = &resp.tx_id,
"Root propogation transaction not yet mined"
);
}
}

if start.elapsed() > timeout {
return Err(eyre!("Root propogation transaction timed out"));
}

std::thread::sleep(backoff);
}

Ok(())
}
}

0 comments on commit 6b7c4af

Please sign in to comment.