chore(sidecar): refactor and cleanup of message processor
thedevbirb committed Dec 12, 2024
1 parent 83cc8d6 commit e17add1
Showing 3 changed files with 129 additions and 88 deletions.
206 changes: 125 additions & 81 deletions bolt-sidecar/src/api/commitments/{firewall_stream.rs → firewall_recv.rs}
@@ -1,6 +1,8 @@
use futures::stream::{FuturesUnordered, SplitSink, SplitStream};
use futures::{FutureExt, SinkExt, StreamExt};
use std::collections::VecDeque;
use std::fmt::Debug;
use std::fmt::{self, Formatter};
use std::task::Poll;
use std::time::Duration;
use std::{future::Future, pin::Pin};
@@ -25,8 +27,10 @@ use super::spec::CommitmentError;
/// The interval at which to send ping messages from connected clients.
const PING_INTERVAL: Duration = Duration::from_secs(30);

#[allow(dead_code)]
pub struct CommitmentsFirewallStream {
/// A [CommitmentsFirewallRecv] connects to multiple firewalled websocket RPC servers and
/// forwards [CommitmentEvent]s to a single receiver, returned upon calling the
/// [CommitmentsFirewallRecv::run] method.
pub struct CommitmentsFirewallRecv {
/// The operator's private key to sign authentication requests when opening websocket
/// connections with RPCs.
operator_private_key: EcdsaSecretKeyWrapper,
@@ -38,8 +42,18 @@ pub struct CommitmentsFirewallStream {
signal: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
}

#[allow(dead_code)]
impl CommitmentsFirewallStream {
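// Manual Debug impl so the operator's private key is never printed in logs.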
impl Debug for CommitmentsFirewallRecv {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("CommitmentsFirewallStream")
.field("operator_private_key", &"********")
.field("chain", &self.chain)
.field("urls", &self.urls)
.finish()
}
}

impl CommitmentsFirewallRecv {
/// Creates a new instance of the commitments firewall stream.
pub fn new(operator_private_key: EcdsaSecretKeyWrapper, chain: Chain, urls: Vec<Url>) -> Self {
Self {
operator_private_key,
@@ -51,14 +65,16 @@ impl CommitmentsFirewallStream {
}
}

/// Runs the [CommitmentsFirewallRecv] and returns a receiver for incoming commitment
/// events.
pub async fn run(&self) -> mpsc::Receiver<CommitmentEvent> {
// Create a Multiple-Producer-Single-Consumer (MPSC) channel to receive messages from the
// websocket servers on a single stream.
// mpsc channel where every websocket connection will send commitment events over its own
// tx to a single receiver.
let (api_event_tx, api_events_rx) = mpsc::channel(self.urls.len() * 2);
let (ping_tx, ping_rx) = broadcast::channel::<()>(1);
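// Note: each connection handler gets its own receiver via `resubscribe()`,
// so a single ping tick fans out to every open connection.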

// Create a task to send pings to the server at regular intervals
let _ = tokio::spawn(async move {
// a task to send pings over the open connections to the servers at regular intervals
tokio::spawn(async move {
let ping_interval = tokio::time::interval(PING_INTERVAL);
tokio::pin!(ping_interval);

@@ -93,43 +109,56 @@ async fn handle_connection(
api_events_tx: mpsc::Sender<CommitmentEvent>,
ping_rx: broadcast::Receiver<()>,
) {
let ws_config = WebSocketConfig {
max_message_size: Some(1 << 10), // 64KB
..Default::default()
};
// TODO: fill this
let ws_config = WebSocketConfig { ..Default::default() };
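// Sketch for the TODO above (illustrative values, not set in this commit):
// let ws_config = WebSocketConfig {
//     max_message_size: Some(64 << 10), // 64 KiB
//     ..Default::default()
// };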

loop {
match connect_async_with_config(url.to_string(), Some(ws_config), false).await {
Ok((stream, _response)) => {
info!(?url, "opened websocket connection");
let (write, read) = stream.split();
Ok((stream, response)) => {
info!(?url, ?response, "opened websocket connection");
let (write_sink, read_stream) = stream.split();

let message_processer = MessageProcesser::new(
// For each opened connection, create a new commitment processor
// able to handle incoming commitment requests.
let commitment_request_processor = CommitmentRequestProcessor::new(
url.clone(),
api_events_tx.clone(),
write,
read,
write_sink,
read_stream,
ping_rx.resubscribe(),
);
message_processer.await
// Run the commitment processor until the connection drops; the outer loop reconnects.
commitment_request_processor.await
}
Err(e) => {
error!(?e, ?url, "failed to connect");
}
}

// Reconnect on failure
println!("Reconnecting to {}", url);
// TODO: add backoff
warn!(?url, "connection lost. reconnecting...");
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
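On the backoff TODO above, a capped exponential backoff between reconnect attempts is the usual fix. A minimal sketch of the reconnect loop, with illustrative constants that are not part of this commit:

let mut backoff = Duration::from_secs(1);
loop {
    match connect_async_with_config(url.to_string(), Some(ws_config), false).await {
        Ok((stream, _response)) => {
            backoff = Duration::from_secs(1); // reset after a successful connection
            // ... split the stream and run the CommitmentRequestProcessor as above ...
        }
        Err(e) => error!(?e, ?url, "failed to connect"),
    }
    warn!(?url, "connection lost. reconnecting...");
    tokio::time::sleep(backoff).await;
    backoff = (backoff * 2).min(Duration::from_secs(60)); // cap the delay at 60s
}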

struct MessageProcesser {
/// The [CommitmentRequestProcessor] handles incoming commitment requests over the websocket
/// connection, and forwards them to the [CommitmentEvent] tx channel for processing.
struct CommitmentRequestProcessor {
/// The URL of the connected websocket server.
url: Url,
tx: mpsc::Sender<CommitmentEvent>,
write: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
read: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
/// The channel to send commitment events to be processed.
api_events_tx: mpsc::Sender<CommitmentEvent>,
/// The websocket writer sink.
write_sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
/// The websocket reader stream.
read_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
/// The receiver for keep-alive ping messages.
ping_rx: broadcast::Receiver<()>,
/// The collection of pending responses to commitment requests forwarded over [api_events_tx].
/// NOTE: Is there a better way to avoid this monster type?
/// SAFETY: the `poll` implementation of this struct promptly handles these responses and
/// ensures this vector doesn't grow indefinitely.
pending_commitment_responses: FuturesUnordered<
Pin<
Box<
@@ -142,10 +171,11 @@ struct MessageProcesser {
>,
>,
>,
/// The collection of outgoing messages to be sent to the connected websocket server.
outgoing_messages: VecDeque<Message>,
}
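On the "monster type" note above, one common fix is a boxed-future type alias. A sketch, assuming the Output type truncated from this hunk is spelled `ResponseResult` (a stand-in name, not from the source):

// `ResponseResult` stands in for the Output type elided in the hunk above.
type PendingCommitmentResponse = Pin<Box<dyn Future<Output = ResponseResult> + Send>>;

// The field then reads:
// pending_commitment_responses: FuturesUnordered<PendingCommitmentResponse>,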

impl MessageProcesser {
impl CommitmentRequestProcessor {
pub fn new(
url: Url,
tx: mpsc::Sender<CommitmentEvent>,
@@ -155,17 +185,17 @@ impl MessageProcesser {
) -> Self {
Self {
url,
tx,
write,
read,
api_events_tx: tx,
write_sink: write,
read_stream: read,
ping_rx,
pending_commitment_responses: FuturesUnordered::new(),
outgoing_messages: VecDeque::new(),
}
}
}

impl Future for MessageProcesser {
impl Future for CommitmentRequestProcessor {
type Output = ();

fn poll(
@@ -178,51 +208,63 @@ impl Future for MessageProcesser {
loop {
let mut progress = false;
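// If a full pass over all sub-tasks makes no progress, we return Poll::Pending
// at the bottom of the loop; the wakers registered by the inner poll_* calls
// will wake this future again.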

// 1. Handle incoming WebSocket messages
match this.read.poll_next_unpin(cx) {
Poll::Ready(Some(message)) => {
progress = true;
// 1. Handle incoming websocket messages from the read stream.
while let Poll::Ready(maybe_message) = this.read_stream.poll_next_unpin(cx) {
progress = true;

match message {
Ok(Message::Text(_text)) => {
let (tx, rx) = oneshot::channel();
let request = CommitmentRequest::Inclusion(InclusionRequest::default());
let event = CommitmentEvent { request, response: tx };
if let Err(err) = this.tx.try_send(event) {
error!(?err, "failed to forward commitment event to channel");
}
match maybe_message {
Some(message_res) => {
match message_res {
Ok(Message::Text(_text)) => {
// Create the channel to send and receive the commitment response
let (tx, rx) = oneshot::channel();

// Add the receiver's future to the FuturesUnordered
this.pending_commitment_responses.push(rx.boxed());
}
Ok(Message::Close(_)) => {
warn!(?rpc_url, "websocket connection closed by server");
return Poll::Ready(());
}
Err(e) => {
error!(?e, ?rpc_url, "error reading message from websocket connection");
return Poll::Ready(());
// TODO: parse the text into a commitment request
let request =
CommitmentRequest::Inclusion(InclusionRequest::default());
let event = CommitmentEvent { request, response: tx };

if let Err(err) = this.api_events_tx.try_send(event) {
error!(?err, "failed to forward commitment event to channel");
}

// add the pending response to this buffer for later processing
this.pending_commitment_responses.push(rx.boxed());
}
Ok(Message::Close(_)) => {
warn!(?rpc_url, "websocket connection closed by server");
return Poll::Ready(());
}
Err(e) => {
error!(
?e,
?rpc_url,
"error reading message from websocket connection"
);
return Poll::Ready(());
}
_ => {} // Ignore non-text messages
}
_ => {} // Ignore non-text messages
}
None => {
warn!("websocket connection with {} closed by server", rpc_url);
return Poll::Ready(());
}
}
Poll::Ready(None) => {
warn!("websocket connection with {} closed by server", rpc_url);
return Poll::Ready(());
}
_ => {}
}

// 2. Handle commitment responses
// 2. Handle commitment request responses after they've been processed.
while let Poll::Ready(Some(response)) =
this.pending_commitment_responses.poll_next_unpin(cx)
{
progress = true;
match response {
Ok(commitment_result) => {
if let Ok(commitment) = commitment_result {
// TODO: check whether this format is correct + handle errors.
let message =
Message::text(serde_json::to_string(&commitment).unwrap());
// Add the message to the outgoing messages queue
this.outgoing_messages.push_back(message);
}
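// A hedged alternative to the `unwrap()` above (sketch for the TODO):
// match serde_json::to_string(&commitment) {
//     Ok(json) => this.outgoing_messages.push_back(Message::text(json)),
//     Err(e) => error!(?e, "failed to serialize commitment response"),
// }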
}
@@ -232,29 +274,15 @@ impl Future for MessageProcesser {
}
}

// 3. Handle ping messages
match this.ping_rx.try_recv() {
Ok(_) => {
progress = true;
this.outgoing_messages.push_back(Message::Ping(vec![8, 0, 1, 7]));
}
Err(TryRecvError::Closed) => {
error!("ping channel for keep-alive messages closed");
return Poll::Ready(());
}
Err(TryRecvError::Lagged(lost)) => {
error!("ping channel for keep-alives lagged by {} messages", lost)
}
_ => {}
}

// 4. Process outgoing messages
// 3. Process outgoing messages
while let Some(message) = this.outgoing_messages.pop_front() {
match this.write.poll_ready_unpin(cx) {
// Check if the write sink is able to receive data.
match this.write_sink.poll_ready_unpin(cx) {
Poll::Ready(Ok(())) => {
progress = true;

if let Err(e) = this.write.start_send_unpin(message) {
// Try to send the message to the sink, for later flushing.
if let Err(e) = this.write_sink.start_send_unpin(message) {
error!(?e, ?rpc_url, "failed to send message to websocket connection");
// NOTE: Should we return here?
// return Poll::Ready(());
@@ -273,8 +301,8 @@ impl Future for MessageProcesser {
}
}

// 5. Ensure the write sink is flushed
match this.write.poll_flush_unpin(cx) {
// 4. Ensure the write sink is flushed so that messages are sent to the connected server.
match this.write_sink.poll_flush_unpin(cx) {
Poll::Ready(Ok(())) => {
progress = true;
}
@@ -286,6 +314,22 @@ impl Future for MessageProcesser {
_ => {}
}

// 5. Handle ping messages
match this.ping_rx.try_recv() {
Ok(_) => {
progress = true;
this.outgoing_messages.push_back(Message::Ping(vec![8, 0, 1, 7]));
}
Err(TryRecvError::Closed) => {
error!("ping channel for keep-alive messages closed");
return Poll::Ready(());
}
Err(TryRecvError::Lagged(lost)) => {
error!("ping channel for keep-alives lagged by {} messages", lost)
}
_ => {}
}

if !progress {
return Poll::Pending;
}
@@ -311,7 +355,7 @@ mod tests {
use tokio::sync::broadcast;

use crate::{
api::commitments::firewall_stream::CommitmentsFirewallStream,
api::commitments::firewall_recv::CommitmentsFirewallRecv,
common::secrets::EcdsaSecretKeyWrapper, config::chain::Chain,
};

@@ -332,7 +376,7 @@ mod tests {
println!("Server 2 running on port: {}", port_2);
println!("Waiting for 5 seconds before shutting down the servers...");

let stream = CommitmentsFirewallStream::new(
let stream = CommitmentsFirewallRecv::new(
operator_private_key,
Chain::Holesky,
vec![
2 changes: 1 addition & 1 deletion bolt-sidecar/src/api/commitments/mod.rs
@@ -1,5 +1,5 @@
/// The commitments-API stream handler.
pub mod firewall_stream;
pub mod firewall_recv;
/// The commitments-API request handlers.
mod handlers;
/// The commitments-API headers and constants.
9 changes: 3 additions & 6 deletions bolt-sidecar/src/driver.rs
@@ -18,7 +18,7 @@ use crate::{
api::{
builder::{start_builder_proxy_server, BuilderProxyConfig},
commitments::{
firewall_stream::CommitmentsFirewallStream,
firewall_recv::CommitmentsFirewallRecv,
server::{CommitmentEvent, CommitmentsApiServer},
spec::CommitmentError,
},
@@ -230,15 +230,12 @@ impl<C: StateFetcher, ECDSA: SignerECDSA> SidecarDriver<C, ECDSA> {

let api_events_rx = if let Some(port) = opts.commitment_opts.port {
// start the commitments api server
let api_addr = format!(
"0.0.0.0:{}",
opts.commitment_opts.port.expect("commitments port must be provided")
);
let api_addr = format!("0.0.0.0:{}", port);
let (api_events_tx, api_events_rx) = mpsc::channel(1024);
CommitmentsApiServer::new(api_addr).run(api_events_tx, opts.limits).await;
api_events_rx
} else {
CommitmentsFirewallStream::new(
CommitmentsFirewallRecv::new(
opts.commitment_opts.operator_private_key.clone(),
opts.chain.chain,
opts.commitment_opts
