diff --git a/bolt-sidecar/src/api/commitments/firewall_stream.rs b/bolt-sidecar/src/api/commitments/firewall_recv.rs
similarity index 72%
rename from bolt-sidecar/src/api/commitments/firewall_stream.rs
rename to bolt-sidecar/src/api/commitments/firewall_recv.rs
index e983b71b..fb1ee914 100644
--- a/bolt-sidecar/src/api/commitments/firewall_stream.rs
+++ b/bolt-sidecar/src/api/commitments/firewall_recv.rs
@@ -1,6 +1,8 @@
 use futures::stream::{FuturesUnordered, SplitSink, SplitStream};
 use futures::{FutureExt, SinkExt, StreamExt};
 use std::collections::VecDeque;
+use std::fmt::Debug;
+use std::fmt::{self, Formatter};
 use std::task::Poll;
 use std::time::Duration;
 use std::{future::Future, pin::Pin};
@@ -25,8 +27,10 @@ use super::spec::CommitmentError;
 /// The interval at which to send ping messages from connected clients.
 const PING_INTERVAL: Duration = Duration::from_secs(30);
 
-#[allow(dead_code)]
-pub struct CommitmentsFirewallStream {
+/// A [CommitmentsFirewallRecv] connects to multiple firewalled websocket RPC servers and
+/// forwards [CommitmentEvent]s to a single receiver, returned upon calling the
+/// [CommitmentsFirewallRecv::run] method.
+pub struct CommitmentsFirewallRecv {
     /// The operator's private key to sign authentication requests when opening websocket
     /// connections with RPCs.
     operator_private_key: EcdsaSecretKeyWrapper,
@@ -38,8 +42,18 @@ pub struct CommitmentsFirewallStream {
     signal: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
 }
 
-#[allow(dead_code)]
-impl CommitmentsFirewallStream {
+impl Debug for CommitmentsFirewallRecv {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("CommitmentsFirewallRecv")
+            .field("operator_private_key", &"********")
+            .field("chain", &self.chain)
+            .field("urls", &self.urls)
+            .finish()
+    }
+}
+
+impl CommitmentsFirewallRecv {
+    /// Creates a new instance of the commitments firewall receiver.
     pub fn new(operator_private_key: EcdsaSecretKeyWrapper, chain: Chain, urls: Vec<Url>) -> Self {
         Self {
             operator_private_key,
@@ -51,14 +65,16 @@ impl CommitmentsFirewallStream {
         }
     }
 
+    /// Runs the [CommitmentsFirewallRecv] and returns a receiver for incoming commitment
+    /// events.
     pub async fn run(&self) -> mpsc::Receiver<CommitmentEvent> {
-        // Create a Multiple-Producer-Single-Consumer (MPSC) channel to receive messages from the
-        // websocket servers on a single stream.
+        // mpsc channel where every websocket connection will send commitment events over its own
+        // tx to a single receiver.
         let (api_event_tx, api_events_rx) = mpsc::channel(self.urls.len() * 2);
         let (ping_tx, ping_rx) = broadcast::channel::<()>(1);
 
-        // Create a task to send pings to the server at regular intervals
-        let _ = tokio::spawn(async move {
+        // a task to send pings to all open server connections at regular intervals
+        tokio::spawn(async move {
             let ping_interval = tokio::time::interval(PING_INTERVAL);
             tokio::pin!(ping_interval);
 
@@ -93,25 +109,26 @@ async fn handle_connection(
     api_events_tx: mpsc::Sender<CommitmentEvent>,
     ping_rx: broadcast::Receiver<()>,
 ) {
-    let ws_config = WebSocketConfig {
-        max_message_size: Some(1 << 10), // 64KB
-        ..Default::default()
-    };
+    // TODO: fill this
+    let ws_config = WebSocketConfig { ..Default::default() };
 
     loop {
         match connect_async_with_config(url.to_string(), Some(ws_config), false).await {
-            Ok((stream, _response)) => {
-                info!(?url, "opened websocket connection");
-                let (write, read) = stream.split();
+            Ok((stream, response)) => {
+                info!(?url, ?response, "opened websocket connection");
+                let (write_sink, read_stream) = stream.split();
 
-                let message_processer = MessageProcesser::new(
+                // For each opened connection, create a new commitment request processor
+                // to handle incoming commitment requests.
+                let commitment_request_processor = CommitmentRequestProcessor::new(
                     url.clone(),
                     api_events_tx.clone(),
-                    write,
-                    read,
+                    write_sink,
+                    read_stream,
                     ping_rx.resubscribe(),
                 );
 
-                message_processer.await
+                // Run the commitment request processor until it exits; the surrounding
+                // loop then takes care of reconnecting.
+                commitment_request_processor.await
             }
             Err(e) => {
                 error!(?e, ?url, "failed to connect");
@@ -119,17 +136,29 @@ async fn handle_connection(
             }
         }
 
         // Reconnect on failure
-        println!("Reconnecting to {}", url);
+        // TODO: add backoff
+        warn!(?url, "connection lost. reconnecting...");
         tokio::time::sleep(Duration::from_secs(5)).await;
     }
 }
 
-struct MessageProcesser {
+/// The [CommitmentRequestProcessor] handles incoming commitment requests over a websocket
+/// connection, and forwards them to the [CommitmentEvent] tx channel for processing.
+struct CommitmentRequestProcessor {
+    /// The URL of the connected websocket server.
     url: Url,
-    tx: mpsc::Sender<CommitmentEvent>,
-    write: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
-    read: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
+    /// The channel to send commitment events to be processed.
+    api_events_tx: mpsc::Sender<CommitmentEvent>,
+    /// The websocket writer sink.
+    write_sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
+    /// The websocket reader stream.
+    read_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
+    /// The receiver for keep-alive ping messages.
     ping_rx: broadcast::Receiver<()>,
+    /// The collection of pending commitment request responses, sent with [api_events_tx].
+    /// NOTE: Is there a better way to avoid this monster type?
+    /// SAFETY: the `poll` implementation of this struct promptly handles these responses and
+    /// ensures this vector doesn't grow indefinitely.
     pending_commitment_responses: FuturesUnordered<
         Pin<
             Box<
                 dyn Future<Output = Result<Result<SignedCommitment, CommitmentError>, oneshot::error::RecvError>>
                     + Send,
             >,
         >,
     >,
+    /// The collection of outgoing messages to be sent to the connected websocket server.
     outgoing_messages: VecDeque<Message>,
 }
 
-impl MessageProcesser {
+impl CommitmentRequestProcessor {
     pub fn new(
         url: Url,
         tx: mpsc::Sender<CommitmentEvent>,
         write: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
         read: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
         ping_rx: broadcast::Receiver<()>,
     ) -> Self {
         Self {
             url,
-            tx,
-            write,
-            read,
+            api_events_tx: tx,
+            write_sink: write,
+            read_stream: read,
             ping_rx,
             pending_commitment_responses: FuturesUnordered::new(),
             outgoing_messages: VecDeque::new(),
@@ -165,7 +195,7 @@ impl MessageProcesser {
     }
 }
 
-impl Future for MessageProcesser {
+impl Future for CommitmentRequestProcessor {
     type Output = ();
 
     fn poll(
@@ -178,42 +208,52 @@ impl Future for MessageProcesser {
         loop {
             let mut progress = false;
 
-            // 1. Handle incoming WebSocket messages
-            match this.read.poll_next_unpin(cx) {
-                Poll::Ready(Some(message)) => {
-                    progress = true;
+            // 1. Handle incoming websocket messages from the read stream.
+            while let Poll::Ready(maybe_message) = this.read_stream.poll_next_unpin(cx) {
+                progress = true;
 
-                    match message {
-                        Ok(Message::Text(_text)) => {
-                            let (tx, rx) = oneshot::channel();
-                            let request = CommitmentRequest::Inclusion(InclusionRequest::default());
-                            let event = CommitmentEvent { request, response: tx };
-                            if let Err(err) = this.tx.try_send(event) {
-                                error!(?err, "failed to forward commitment event to channel");
-                            }
+                match maybe_message {
+                    Some(message_res) => {
+                        match message_res {
+                            Ok(Message::Text(_text)) => {
+                                // Create the channel to send and receive the commitment response
+                                let (tx, rx) = oneshot::channel();
 
-                            // Add the receiver's future to the FuturesUnordered
-                            this.pending_commitment_responses.push(rx.boxed());
-                        }
-                        Ok(Message::Close(_)) => {
-                            warn!(?rpc_url, "websocket connection closed by server");
-                            return Poll::Ready(());
-                        }
-                        Err(e) => {
-                            error!(?e, ?rpc_url, "error reading message from websocket connection");
-                            return Poll::Ready(());
+                                // TODO: parse the text into a commitment request
+                                let request =
+                                    CommitmentRequest::Inclusion(InclusionRequest::default());
+                                let event = CommitmentEvent { request, response: tx };
+
+                                if let Err(err) = this.api_events_tx.try_send(event) {
+                                    error!(?err, "failed to forward commitment event to channel");
+                                }
+
+                                // Add the pending response to this buffer for later processing
+                                this.pending_commitment_responses.push(rx.boxed());
+                            }
+                            Ok(Message::Close(_)) => {
+                                warn!(?rpc_url, "websocket connection closed by server");
+                                return Poll::Ready(());
+                            }
+                            Err(e) => {
+                                error!(
                                     ?e,
                                     ?rpc_url,
                                     "error reading message from websocket connection"
                                 );
                                 return Poll::Ready(());
                             }
+                            _ => {} // Ignore non-text messages
                         }
-                        _ => {} // Ignore non-text messages
+                    }
+                    None => {
+                        warn!("websocket connection with {} closed by server", rpc_url);
+                        return Poll::Ready(());
                     }
                 }
-                Poll::Ready(None) => {
-                    warn!("websocket connection with {} closed by server", rpc_url);
-                    return Poll::Ready(());
-                }
-                _ => {}
             }
 
-            // 2. Handle commitment responses
+            // 2. Handle commitment request responses after they've been processed.
             while let Poll::Ready(Some(response)) =
                 this.pending_commitment_responses.poll_next_unpin(cx)
             {
@@ -221,8 +261,10 @@
                 match response {
                     Ok(commitment_result) => {
                         if let Ok(commitment) = commitment_result {
+                            // TODO: check whether this format is correct + handle errors.
                             let message =
                                 Message::text(serde_json::to_string(&commitment).unwrap());
+                            // Add the message to the outgoing messages queue
                             this.outgoing_messages.push_back(message);
                         }
                     }
 
-            // 3. Handle ping messages
-            match this.ping_rx.try_recv() {
-                Ok(_) => {
-                    progress = true;
-                    this.outgoing_messages.push_back(Message::Ping(vec![8, 0, 1, 7]));
-                }
-                Err(TryRecvError::Closed) => {
-                    error!("ping channel for keep-alive messages closed");
-                    return Poll::Ready(());
-                }
-                Err(TryRecvError::Lagged(lost)) => {
-                    error!("ping channel for keep-alives lagged by {} messages", lost)
-                }
-                _ => {}
-            }
-
-            // 4. Process outgoing messages
+            // 3. Process outgoing messages
             while let Some(message) = this.outgoing_messages.pop_front() {
-                match this.write.poll_ready_unpin(cx) {
+                // Check if the write sink is able to receive data.
+                match this.write_sink.poll_ready_unpin(cx) {
                     Poll::Ready(Ok(())) => {
                         progress = true;
-                        if let Err(e) = this.write.start_send_unpin(message) {
+                        // Try to send the message to the sink, for later flushing.
+                        if let Err(e) = this.write_sink.start_send_unpin(message) {
                             error!(?e, ?rpc_url, "failed to send message to websocket connection");
                             // NOTE: Should we return here?
                             // return Poll::Ready(());
@@ -273,8 +301,8 @@
                 }
             }
 
-            // 5. Ensure the write sink is flushed
-            match this.write.poll_flush_unpin(cx) {
+            // 4. Ensure the write sink is flushed so that messages are sent to the server.
+            match this.write_sink.poll_flush_unpin(cx) {
                 Poll::Ready(Ok(())) => {
                     progress = true;
                 }
@@ -286,6 +314,22 @@
                 _ => {}
             }
 
+            // 5. Handle ping messages
+            match this.ping_rx.try_recv() {
+                Ok(_) => {
+                    progress = true;
+                    this.outgoing_messages.push_back(Message::Ping(vec![8, 0, 1, 7]));
+                }
+                Err(TryRecvError::Closed) => {
+                    error!("ping channel for keep-alive messages closed");
+                    return Poll::Ready(());
+                }
+                Err(TryRecvError::Lagged(lost)) => {
+                    error!("ping channel for keep-alives lagged by {} messages", lost)
+                }
+                _ => {}
+            }
+
             if !progress {
                 return Poll::Pending;
             }
@@ -311,7 +355,7 @@ mod tests {
     use tokio::sync::broadcast;
 
     use crate::{
-        api::commitments::firewall_stream::CommitmentsFirewallStream,
+        api::commitments::firewall_recv::CommitmentsFirewallRecv,
         common::secrets::EcdsaSecretKeyWrapper, config::chain::Chain,
     };
@@ -332,7 +376,7 @@
         println!("Server 2 running on port: {}", port_2);
         println!("Waiting for 5 seconds before shutting down the servers...");
 
-        let stream = CommitmentsFirewallStream::new(
+        let stream = CommitmentsFirewallRecv::new(
            operator_private_key,
            Chain::Holesky,
            vec![
diff --git a/bolt-sidecar/src/api/commitments/mod.rs b/bolt-sidecar/src/api/commitments/mod.rs
index bdfe57ef..3bc52d95 100644
--- a/bolt-sidecar/src/api/commitments/mod.rs
+++ b/bolt-sidecar/src/api/commitments/mod.rs
@@ -1,5 +1,5 @@
-/// The commitments-API stream handler.
-pub mod firewall_stream;
+/// The commitments-API firewall receiver.
+pub mod firewall_recv;
 /// The commitments-API request handlers.
 mod handlers;
 /// The commitments-API headers and constants.
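Aside: `CommitmentRequestProcessor::poll` above is a hand-rolled event loop. Every incoming request pushes a boxed `oneshot` receiver into `pending_commitment_responses` (a `FuturesUnordered`), and the loop drains whatever is ready on each wakeup, which is what keeps that buffer from growing indefinitely. Below is a minimal, self-contained sketch of that pattern; all names (`ResponseDrain`, the `String` payload) are illustrative and not part of this diff:

```rust
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use tokio::sync::oneshot;

/// A future that drains boxed oneshot receivers, mirroring how the processor
/// polls `pending_commitment_responses` inside its `loop`.
struct ResponseDrain {
    pending: FuturesUnordered<
        Pin<Box<dyn Future<Output = Result<String, oneshot::error::RecvError>> + Send>>,
    >,
}

impl Future for ResponseDrain {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // `FuturesUnordered` is `Unpin`, so the whole struct is too.
        let this = self.get_mut();

        // Drain every response that is ready; receivers that are still pending
        // re-register their wakers, so we are polled again when one resolves.
        while let Poll::Ready(Some(res)) = this.pending.poll_next_unpin(cx) {
            match res {
                Ok(msg) => println!("commitment response: {msg}"),
                Err(e) => eprintln!("response sender dropped: {e}"),
            }
        }

        if this.pending.is_empty() { Poll::Ready(()) } else { Poll::Pending }
    }
}

#[tokio::main]
async fn main() {
    let drain = ResponseDrain { pending: FuturesUnordered::new() };

    // Emulate one in-flight request: the consumer answers over the oneshot.
    let (tx, rx) = oneshot::channel();
    drain.pending.push(rx.boxed());
    tx.send("inclusion accepted".to_string()).unwrap();

    drain.await;
}
```

The real `poll` additionally tracks a `progress` flag so it only returns `Poll::Pending` once a full pass over all five steps makes no headway; the sketch sidesteps that by draining in a single pass.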
diff --git a/bolt-sidecar/src/driver.rs b/bolt-sidecar/src/driver.rs index 381a2b72..70102a7b 100644 --- a/bolt-sidecar/src/driver.rs +++ b/bolt-sidecar/src/driver.rs @@ -18,7 +18,7 @@ use crate::{ api::{ builder::{start_builder_proxy_server, BuilderProxyConfig}, commitments::{ - firewall_stream::CommitmentsFirewallStream, + firewall_recv::CommitmentsFirewallRecv, server::{CommitmentEvent, CommitmentsApiServer}, spec::CommitmentError, }, @@ -230,15 +230,12 @@ impl SidecarDriver { let api_events_rx = if let Some(port) = opts.commitment_opts.port { // start the commitments api server - let api_addr = format!( - "0.0.0.0:{}", - opts.commitment_opts.port.expect("commitments port must be provided") - ); + let api_addr = format!("0.0.0.0:{}", port); let (api_events_tx, api_events_rx) = mpsc::channel(1024); CommitmentsApiServer::new(api_addr).run(api_events_tx, opts.limits).await; api_events_rx } else { - CommitmentsFirewallStream::new( + CommitmentsFirewallRecv::new( opts.commitment_opts.operator_private_key.clone(), opts.chain.chain, opts.commitment_opts
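For context on how the two branches above converge: both the `CommitmentsApiServer` path and the `CommitmentsFirewallRecv` path hand the driver a single `mpsc::Receiver<CommitmentEvent>`, which it consumes in its event loop. Below is a hedged sketch of that consuming side, using stand-in types since the real `CommitmentEvent` and handling logic live elsewhere in the sidecar:

```rust
use tokio::sync::{mpsc, oneshot};

/// Stand-in for the sidecar's `CommitmentEvent` (illustrative only).
struct CommitmentEvent {
    request: String,
    response: oneshot::Sender<Result<String, String>>,
}

/// The shape of the driver loop: one receiver, fed either by the HTTP API
/// server or by `CommitmentsFirewallRecv`, handled uniformly.
async fn drive(mut api_events_rx: mpsc::Receiver<CommitmentEvent>) {
    while let Some(event) = api_events_rx.recv().await {
        // Decide on the commitment and answer through the oneshot channel.
        // On the firewall path, this resolves one of the futures buffered in
        // `pending_commitment_responses`, which then queues the websocket reply.
        let _ = event.response.send(Ok(format!("signed: {}", event.request)));
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    let driver = tokio::spawn(drive(rx));

    let (res_tx, res_rx) = oneshot::channel();
    tx.send(CommitmentEvent { request: "inclusion".into(), response: res_tx })
        .await
        .unwrap();
    println!("{:?}", res_rx.await.unwrap());

    drop(tx); // closing the channel lets the driver loop exit
    driver.await.unwrap();
}
```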