From 044a2a04d50f58ad7b8a9e74503efa70b127cff0 Mon Sep 17 00:00:00 2001
From: thedevbirb
Date: Thu, 12 Dec 2024 15:32:39 +0100
Subject: [PATCH] fix(sidecar): CommitmentProcessor endless loop; tests; cleanup

---
 .../src/api/commitments/firewall_recv.rs  | 247 ++++++++++++------
 bolt-sidecar/src/primitives/commitment.rs |   7 +
 2 files changed, 172 insertions(+), 82 deletions(-)

diff --git a/bolt-sidecar/src/api/commitments/firewall_recv.rs b/bolt-sidecar/src/api/commitments/firewall_recv.rs
index 229a495d..356e792d 100644
--- a/bolt-sidecar/src/api/commitments/firewall_recv.rs
+++ b/bolt-sidecar/src/api/commitments/firewall_recv.rs
@@ -12,7 +12,7 @@ use tokio::sync::{broadcast, mpsc, oneshot};
 use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
 use tokio_tungstenite::tungstenite::Message;
 use tokio_tungstenite::{connect_async_with_config, MaybeTlsStream, WebSocketStream};
-use tracing::{error, info, warn};
+use tracing::{error, info, trace, warn};

 use reqwest::Url;

@@ -25,21 +25,27 @@ use super::server::CommitmentEvent;
 use super::spec::CommitmentError;

 /// The interval at which to send ping messages from connected clients.
+#[cfg(test)]
+const PING_INTERVAL: Duration = Duration::from_secs(3);
+#[cfg(not(test))]
 const PING_INTERVAL: Duration = Duration::from_secs(30);

+type ShutdownSignal = Pin<Box<dyn Future<Output = ()> + Send>>;
+
 /// A [CommitmentsFirewallRecv] connects to multiple firewall-ed websocket RPC servers and
 /// forwards [CommitmentEvent]s to a single receiver, returned upon calling the
 /// [CommitmentsFirewallRecv::run] method.
 pub struct CommitmentsFirewallRecv {
     /// The operator's private key to sign authentication requests when opening websocket
     /// connections with RPCs.
+    #[allow(dead_code)]
     operator_private_key: EcdsaSecretKeyWrapper,
     /// The chain ID of the chain the sidecar is running. Used for authentication purposes.
     chain: Chain,
     /// The URLs of the websocket servers to connect to.
     urls: Vec<Url>,
     /// The shutdown signal.
-    signal: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
+    signal: Option<ShutdownSignal>,
 }

 impl Debug for CommitmentsFirewallRecv {
@@ -65,13 +71,20 @@ impl CommitmentsFirewallRecv {
         }
     }

+    /// Sets the shutdown signal for closing the open connections.
+    pub fn with_shutdown(mut self, signal: ShutdownSignal) -> Self {
+        self.signal = Some(signal);
+        self
+    }
+
     /// Runs the [CommitmentsFirewallRecv] and returns a receiver for incoming commitment
     /// events.
-    pub async fn run(&self) -> mpsc::Receiver<CommitmentEvent> {
+    pub async fn run(mut self) -> mpsc::Receiver<CommitmentEvent> {
         // mpsc channel where every websocket connection will send commitment events over its own
         // tx to a single receiver.
let (api_event_tx, api_events_rx) = mpsc::channel(self.urls.len() * 2);
         let (ping_tx, ping_rx) = broadcast::channel::<()>(1);
+        let (shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);

         // a task to send pings to open connections to the servers at regular intervals
         tokio::spawn(async move {
@@ -86,12 +99,23 @@ impl CommitmentsFirewallRecv {
             }
         });

+        let signal = self.signal.take();
+        if let Some(signal) = signal {
+            tokio::spawn(async move {
+                signal.await;
+                if let Err(err) = shutdown_tx.send(()) {
+                    error!(?err, "failed to send shutdown signal");
+                }
+            });
+        }
+
         for url in &self.urls {
             let url = url.clone();
             let api_events_tx = api_event_tx.clone();
             let ping_rx = ping_rx.resubscribe();
+            let shutdown_rx = shutdown_rx.resubscribe();
             tokio::spawn(async move {
-                handle_connection(url, api_events_tx, ping_rx).await;
+                handle_connection(url, api_events_tx, ping_rx, shutdown_rx).await;
             });
         }

@@ -108,31 +132,40 @@ async fn handle_connection(
     url: Url,
     api_events_tx: mpsc::Sender<CommitmentEvent>,
     ping_rx: broadcast::Receiver<()>,
+    shutdown_rx: broadcast::Receiver<()>,
 ) {
     // TODO: fill this
     let ws_config = WebSocketConfig { ..Default::default() };

     loop {
-        match connect_async_with_config(url.to_string(), Some(ws_config), false).await {
-            Ok((stream, response)) => {
-                info!(?url, ?response, "opened websocket connection");
-                let (write_sink, read_stream) = stream.split();
-
-                // For each opened connection, create a new commitment processor
-                // able to handle incoming message requests.
-                let commitment_request_processor = CommitmentRequestProcessor::new(
-                    url.clone(),
-                    api_events_tx.clone(),
-                    write_sink,
-                    read_stream,
-                    ping_rx.resubscribe(),
-                );
-                // Run the commitment processor indefinitely, reconnecting on failure.
-                commitment_request_processor.await
-            }
-            Err(e) => {
-                error!(?e, ?url, "failed to connect");
-            }
+        let should_retry =
+            match connect_async_with_config(url.to_string(), Some(ws_config), false).await {
+                Ok((stream, response)) => {
+                    info!(?url, ?response, "opened websocket connection");
+                    let (write_sink, read_stream) = stream.split();
+
+                    // For each opened connection, create a new commitment processor
+                    // able to handle incoming message requests.
+                    let commitment_request_processor = CommitmentRequestProcessor::new(
+                        url.clone(),
+                        api_events_tx.clone(),
+                        write_sink,
+                        read_stream,
+                        ping_rx.resubscribe(),
+                        shutdown_rx.resubscribe(),
+                    );
+                    // Run the commitment processor to completion; it resolves with a flag
+                    // telling us whether to reconnect.
+                    commitment_request_processor.await
+                }
+                Err(e) => {
+                    error!(?e, ?url, "failed to connect");
+                    true
+                }
+            };
+
+        if !should_retry {
+            warn!("shutting down connection to {}", url);
+            break;
+        }

         // Reconnect on failure
@@ -142,6 +175,9 @@ async fn handle_connection(
     }
 }

+type PendingCommitmentResult = dyn Future<Output = Result<Result<SignedCommitment, CommitmentError>, oneshot::error::RecvError>>
+    + Send;
+
 /// The [CommitmentRequestProcessor] handles incoming commitment requests over the websocket
 /// connection, and forwards them to the [CommitmentEvent] tx channel for processing.
 struct CommitmentRequestProcessor {
@@ -155,22 +191,13 @@ struct CommitmentRequestProcessor {
     /// The URL of the connected websocket server.
     url: Url,
     /// The API events sender.
     api_events_tx: mpsc::Sender<CommitmentEvent>,
     /// The websocket writer sink.
     write_sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
     /// The stream to receive messages from the connected websocket server.
     read_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
     /// The receiver for keep-alive ping messages.
     ping_rx: broadcast::Receiver<()>,
+    /// The receiver for shutdown signals.
+    shutdown_rx: broadcast::Receiver<()>,
     /// The collection of pending commitment request responses, sent with [api_events_tx].
     /// NOTE: Is there a better way to avoid this monster type?
/// SAFETY: the `poll` implementation of this struct promptly handles these responses and
     /// ensures this vector doesn't grow indefinitely.
-    pending_commitment_responses: FuturesUnordered<
-        Pin<
-            Box<
-                dyn Future<
-                        Output = Result<
-                            Result<SignedCommitment, CommitmentError>,
-                            oneshot::error::RecvError,
-                        >,
-                    > + Send,
-            >,
-        >,
-    >,
+    pending_commitment_responses: FuturesUnordered<Pin<Box<PendingCommitmentResult>>>,
     /// The collection of outgoing messages to be sent to the connected websocket server.
     outgoing_messages: VecDeque<Message>,
 }

@@ -182,6 +209,7 @@ impl CommitmentRequestProcessor {
         write: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
         read: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
         ping_rx: broadcast::Receiver<()>,
+        shutdown_rx: broadcast::Receiver<()>,
     ) -> Self {
         Self {
             url,
@@ -189,6 +217,7 @@ impl CommitmentRequestProcessor {
             write_sink: write,
             read_stream: read,
             ping_rx,
+            shutdown_rx,
             pending_commitment_responses: FuturesUnordered::new(),
             outgoing_messages: VecDeque::new(),
         }
@@ -196,7 +225,8 @@ impl CommitmentRequestProcessor {
 }

 impl Future for CommitmentRequestProcessor {
-    type Output = ();
+    // The output of this future is a boolean indicating whether reconnection is needed.
+    type Output = bool;

     fn poll(
         self: Pin<&mut Self>,
@@ -215,7 +245,12 @@
             match maybe_message {
                 Some(message_res) => {
                     match message_res {
-                        Ok(Message::Text(_text)) => {
+                        Ok(Message::Text(text)) => {
+                            trace!(
+                                ?rpc_url,
+                                text,
+                                "received text message from websocket connection"
+                            );
                             // Create the channel to send and receive the commitment response
                             let (tx, rx) = oneshot::channel();
@@ -233,22 +268,22 @@
                         }
                         Ok(Message::Close(_)) => {
                             warn!(?rpc_url, "websocket connection closed by server");
-                            return Poll::Ready(());
+                            return Poll::Ready(true);
                         }
+                        Ok(_) => {} // ignore other message types
                         Err(e) => {
                             error!(
                                 ?e,
                                 ?rpc_url,
                                 "error reading message from websocket connection"
                             );
-                            return Poll::Ready(());
+                            return Poll::Ready(true);
                         }
-                        _ => {} // Ignore non-text messages
                     }
                 }
                 None => {
                     warn!("websocket connection with {} closed by server", rpc_url);
-                    return Poll::Ready(());
+                    return Poll::Ready(true);
                 }
             }
         }
@@ -261,6 +296,7 @@
             match response {
                 Ok(commitment_result) => {
                     if let Ok(commitment) = commitment_result {
+                        trace!(?rpc_url, ?commitment, "received commitment response");
                         // TODO: check whether this format is correct + handle errors.
                         let message =
                             Message::text(serde_json::to_string(&commitment).unwrap());
@@ -302,16 +338,13 @@
         }

         // 4. Ensure the write sink is flushed so that messages are sent to the caller server.
-        match this.write_sink.poll_flush_unpin(cx) {
-            Poll::Ready(Ok(())) => {
-                progress = true;
-            }
-            Poll::Ready(Err(e)) => {
-                error!(?e, "failed to flush websocket write sink");
-                // NOTE: Should we return here?
-                // return Poll::Ready(());
-            }
-            _ => {}
+        //
+        // NOTE: A successful flush (`Poll::Ready(Ok(()))`) is deliberately not counted as
+        // "progress": flushing an empty sink always succeeds immediately, so counting it
+        // would make this loop run indefinitely.
+        if let Poll::Ready(Err(e)) = this.write_sink.poll_flush_unpin(cx) {
+            error!(?e, "failed to flush websocket write sink");
+            // NOTE: Should we return here?
+            // return Poll::Ready(());
         }

         // 5.
Handle ping messages @@ -322,7 +355,7 @@ impl Future for CommitmentRequestProcessor { } Err(TryRecvError::Closed) => { error!("ping channel for keep-alive messages closed"); - return Poll::Ready(()); + return Poll::Ready(false); } Err(TryRecvError::Lagged(lost)) => { error!("ping channel for keep-alives lagged by {} messages", lost) @@ -330,6 +363,19 @@ impl Future for CommitmentRequestProcessor { _ => {} } + // 6. Handle shutdown signals + match this.shutdown_rx.try_recv() { + Ok(_) => { + info!("received shutdown signal. closing websocket connection..."); + return Poll::Ready(false); + } + Err(TryRecvError::Closed) => { + error!("shutdown channel closed"); + return Poll::Ready(false); + } + _ => {} + } + if !progress { return Poll::Pending; } @@ -341,6 +387,7 @@ impl Future for CommitmentRequestProcessor { mod tests { use std::{borrow::Cow, net::SocketAddr, ops::ControlFlow, time::Duration}; + use alloy::primitives::Signature; use axum::{ extract::{ ws::{CloseFrame, Message, WebSocket}, @@ -351,30 +398,40 @@ mod tests { Router, }; use axum_extra::{headers, TypedHeader}; - use futures::{SinkExt, StreamExt}; + use futures::{FutureExt, SinkExt, StreamExt}; use tokio::sync::broadcast; + use tracing::{debug, error, info, warn}; use crate::{ api::commitments::firewall_recv::CommitmentsFirewallRecv, - common::secrets::EcdsaSecretKeyWrapper, config::chain::Chain, + common::secrets::EcdsaSecretKeyWrapper, + config::chain::Chain, + primitives::commitment::{InclusionCommitment, SignedCommitment}, }; const FIREWALL_STREAM_PATH: &str = "/api/v1/firewall_stream"; #[tokio::test] async fn test_firewall_rpc_stream_ws() { + let _ = tracing_subscriber::fmt::try_init(); + + // Shutdown servers after closing connections so we can test both types of shutdowns + const CONNECTIONS_SHUTDOWN_IN_SECS: u64 = 5; + const SERVERS_SHUTDOWN_IN_SECS: u64 = 7; + let operator_private_key = EcdsaSecretKeyWrapper::random(); // Create a Single-Producer-Multiple-Consumer (SPMC) channel via a broadcast that sends a shutdown // signal to all websocket servers. 
-        let (shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);
+        let (shutdown_servers_tx, shutdown_servers_rx) = broadcast::channel::<()>(1);

-        let port_1 = create_websocket_server(shutdown_tx.subscribe()).await;
-        let port_2 = create_websocket_server(shutdown_rx).await;
+        let (shutdown_connections_tx, mut shutdown_connections_rx) = broadcast::channel::<()>(1);

-        println!("Server 1 running on port: {}", port_1);
-        println!("Server 2 running on port: {}", port_2);
-        println!("Waiting for 5 seconds before shutting down the servers...");
+        let port_1 = create_websocket_server(shutdown_servers_rx.resubscribe()).await;
+        let port_2 = create_websocket_server(shutdown_servers_rx.resubscribe()).await;
+
+        info!("Server 1 running on port: {}", port_1);
+        info!("Server 2 running on port: {}", port_2);

         let stream = CommitmentsFirewallRecv::new(
             operator_private_key,
@@ -383,16 +440,38 @@ async fn test_firewall_rpc_stream_ws() {
             format!("ws://127.0.0.1:{}{}", port_1, FIREWALL_STREAM_PATH).parse().unwrap(),
             format!("ws://127.0.0.1:{}{}", port_2, FIREWALL_STREAM_PATH).parse().unwrap(),
         ],
-        );
+        )
+        .with_shutdown(async move { shutdown_connections_rx.recv().await.unwrap() }.boxed());
+
+        let mut api_events_rx = stream.run().await;
+
+        info!("Waiting for {CONNECTIONS_SHUTDOWN_IN_SECS} seconds before shutting down the connections...");
+        info!("Waiting for {SERVERS_SHUTDOWN_IN_SECS} seconds before shutting down the servers...");

-        let _ = tokio::time::timeout(Duration::from_secs(5), async {
-            stream.run().await;
+        let _ = tokio::time::timeout(Duration::from_secs(CONNECTIONS_SHUTDOWN_IN_SECS), async {
+            loop {
+                if let Some(event) = api_events_rx.recv().await {
+                    info!("Received commitment event: {:?}", event);
+                    let req = event.request.as_inclusion_request().unwrap().clone();
+                    let dummy_signed_commitment = SignedCommitment::Inclusion(
+                        InclusionCommitment::new_unchecked(req, Signature::test_signature()),
+                    );
+                    event.response.send(Ok(dummy_signed_commitment)).unwrap();
+                }
+            }
         })
         .await;

-        // Shutdown the servers
-        println!("Shutting down the servers...");
-        shutdown_tx.send(()).unwrap();
+        info!("Shutting down the connections...");
+        shutdown_connections_tx.send(()).unwrap();
+
+        tokio::time::sleep(Duration::from_secs(
+            SERVERS_SHUTDOWN_IN_SECS - CONNECTIONS_SHUTDOWN_IN_SECS,
+        ))
+        .await;
+
+        info!("Shutting down the servers...");
+        shutdown_servers_tx.send(()).unwrap();
     }

     async fn create_websocket_server(mut shutdown_rx: broadcast::Receiver<()>) -> u16 {
@@ -432,7 +511,7 @@
         } else {
             String::from("Unknown user agent")
         };
-        println!("`{user_agent}` at {addr} connected.");
+        info!("`{user_agent}` at {addr} connected.");
         // finalize the upgrade process by returning upgrade callback.
         // we can customize the callback by sending additional info such as address.
         ws.on_upgrade(move |socket| handle_socket(socket, addr))
@@ -445,7 +524,7 @@
     async fn handle_socket(mut socket: WebSocket, who: SocketAddr) {
         // send a ping just to kick things off and get a response
         if socket.send(Message::Ping(vec![1, 2, 3])).await.is_ok() {
-            println!("Pinged {who}...");
+            info!("Pinged {who}...");
         }

         // By splitting socket we can send and receive at the same time. In this example we will send
@@ -457,14 +536,17 @@
         let n_msg = 20;
         for i in 0..n_msg {
             // In case of any websocket error, we exit.
-            if sender.send(Message::Text(format!("Server message {i} ..."))).await.is_err() {
+            if let Err(err) =
+                sender.send(Message::Text(format!("Server message {i} ..."))).await
+            {
+                error!("Error sending message {i}: {err}");
                 return i;
             }

-            tokio::time::sleep(std::time::Duration::from_millis(300)).await;
+            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
         }

-        println!("Sending close to {who}...");
+        info!("Sending close to {who}...");
         if let Err(e) = sender
             .send(Message::Close(Some(CloseFrame {
                 code: axum::extract::ws::close_code::NORMAL,
@@ -472,7 +554,7 @@
             })))
             .await
         {
-            println!("Could not send Close due to {e}, probably it is ok?");
+            warn!("Could not send Close due to {e}, probably it is ok?");
         }
         n_msg
     });
@@ -494,14 +576,14 @@
         tokio::select! {
             rv_a = (&mut send_task) => {
                 match rv_a {
-                    Ok(a) => println!("{a} messages sent to {who}"),
+                    Ok(_) => {},
                     Err(a) => println!("Error sending messages {a:?}")
                 }
                 recv_task.abort();
             },
             rv_b = (&mut recv_task) => {
                 match rv_b {
-                    Ok(b) => println!("Received {b} messages"),
+                    Ok(_) => {},
                     Err(b) => println!("Error receiving messages {b:?}")
                 }
                 send_task.abort();
             }
         }

         // returning from the handler closes the websocket connection
-        println!("Websocket context {who} destroyed");
+        info!("Websocket context {who} destroyed");
     }

     /// helper to print contents of messages to stdout. Has special treatment for Close.
     fn process_message(msg: Message, who: SocketAddr) -> ControlFlow<(), ()> {
+        #[allow(clippy::match_same_arms)]
         match msg {
             Message::Text(t) => {
-                println!(">>> {who} sent str: {t:?}");
+                debug!(">>> {who} sent str: {t:?}");
             }
             Message::Binary(d) => {
-                println!(">>> {} sent {} bytes: {:?}", who, d.len(), d);
+                debug!(">>> {} sent {} bytes: {:?}", who, d.len(), d);
             }
             Message::Close(c) => {
                 if let Some(cf) = c {
@@ -533,14 +616,14 @@
                 return ControlFlow::Break(());
             }

-            Message::Pong(v) => {
-                println!(">>> {who} sent pong with {v:?}");
+            Message::Pong(_) => {
+                // println!(">>> {who} sent pong with {v:?}");
             }
             // You should never need to manually handle Message::Ping, as axum's websocket library
             // will do so for you automagically by replying with Pong and copying the v according to
             // spec. But if you need the contents of the pings you can see them here.
-            Message::Ping(v) => {
-                println!(">>> {who} sent ping with {v:?}");
+            Message::Ping(_) => {
+                // println!(">>> {who} sent ping with {v:?}");
             }
         }
         ControlFlow::Continue(())
diff --git a/bolt-sidecar/src/primitives/commitment.rs b/bolt-sidecar/src/primitives/commitment.rs
index 50e6b8ea..bcdb246f 100644
--- a/bolt-sidecar/src/primitives/commitment.rs
+++ b/bolt-sidecar/src/primitives/commitment.rs
@@ -41,6 +41,13 @@ pub struct InclusionCommitment {
     signature: Signature,
 }

+impl InclusionCommitment {
+    /// Creates a new unchecked inclusion commitment.
+    pub fn new_unchecked(request: InclusionRequest, signature: Signature) -> Self {
+        Self { request, signature }
+    }
+}
+
 impl From<SignedCommitment> for InclusionCommitment {
     fn from(commitment: SignedCommitment) -> Self {
         match commitment {
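
Note on the endless-loop fix: `CommitmentRequestProcessor::poll` is a hand-rolled event loop that only re-runs its body while at least one sub-task made real progress, and the bug was that a successful flush of an (often empty) write sink was counted as progress, so `Poll::Pending` was never reached. Below is a minimal, self-contained sketch of that pattern; `ProgressLoop` and its job queue are illustrative stand-ins, not the sidecar's actual types.

use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Stand-in for a processor that owns several pollable sub-tasks.
struct ProgressLoop {
    jobs: VecDeque<u32>,
}

impl Future for ProgressLoop {
    // Like the patched processor: `true` means "reconnect", `false` means clean shutdown.
    type Output = bool;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<bool> {
        loop {
            let mut progress = false;

            // Real work (reading a message, draining a pending response) counts as progress.
            if self.jobs.pop_front().is_some() {
                progress = true;
            }

            // A flush-like step completes with `Poll::Ready(Ok(()))` even when there is
            // nothing to flush; counting *that* as progress is what kept the original
            // loop spinning forever.

            if self.jobs.is_empty() {
                return Poll::Ready(false); // nothing left to do: clean exit
            }
            if !progress {
                // No sub-task advanced: yield to the executor. In the real processor the
                // inner streams, sinks and channels have registered wakers with `_cx` by now.
                return Poll::Pending;
            }
        }
    }
}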
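For reference, a sketch of how a caller wires up the new `with_shutdown` builder, adapted from `test_firewall_rpc_stream_ws` above. The surrounding function, the Ctrl-C trigger, and the `Chain::Holesky` choice are illustrative assumptions (the chain argument to `new` is not shown in this patch's hunks), not part of the change itself.

use futures::FutureExt;
use reqwest::Url;
use tokio::sync::broadcast;

use crate::{
    api::commitments::firewall_recv::CommitmentsFirewallRecv,
    common::secrets::EcdsaSecretKeyWrapper, config::chain::Chain,
};

// Hypothetical caller; key, chain and URLs come from configuration in practice.
async fn receive_commitments(operator_key: EcdsaSecretKeyWrapper, urls: Vec<Url>) {
    let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(1);

    // Trigger the shutdown signal on Ctrl-C (requires tokio's "signal" feature).
    tokio::spawn(async move {
        let _ = tokio::signal::ctrl_c().await;
        let _ = shutdown_tx.send(());
    });

    // `.boxed()` pins the async block into the `ShutdownSignal` alias expected by
    // `with_shutdown`; `run` consumes the builder and returns the single mpsc
    // receiver that every websocket connection feeds into.
    let mut api_events_rx = CommitmentsFirewallRecv::new(operator_key, Chain::Holesky, urls)
        .with_shutdown(async move { let _ = shutdown_rx.recv().await; }.boxed())
        .run()
        .await;

    while let Some(event) = api_events_rx.recv().await {
        // Sign or reject the commitment request, replying over `event.response`.
        let _ = event;
    }
}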