Skip to content

Commit

Permalink
feat: ensure correct process termination (using task_scope)
Browse files Browse the repository at this point in the history
  • Loading branch information
msgmaxim committed Oct 11, 2023
1 parent 30409ef commit fd10133
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 86 deletions.
71 changes: 43 additions & 28 deletions api/bin/chainflip-ingress-egress-tracker/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use chainflip_engine::settings::WsHttpEndpoints;
use futures::FutureExt;
use jsonrpsee::{core::Error, server::ServerBuilder, RpcModule};
use std::{env, io::Write, net::SocketAddr, path::PathBuf};
use tracing::log;
use utilities::task_scope;

mod witnessing;

Expand All @@ -12,16 +14,18 @@ pub struct DepositTrackerSettings {
state_chain_ws_endpoint: String,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
async fn start(
scope: &task_scope::Scope<'_, anyhow::Error>,
settings: DepositTrackerSettings,
) -> anyhow::Result<()> {
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.expect("setting default subscriber failed");
let server = ServerBuilder::default().build("0.0.0.0:13337".parse::<SocketAddr>()?).await?;
let mut module = RpcModule::new(());

let btc_tracker = witnessing::btc::start().await;
let btc_tracker = witnessing::btc::start(scope).await;

module.register_async_method("status", move |arguments, _context| {
let btc_tracker = btc_tracker.clone();
Expand All @@ -40,29 +44,7 @@ async fn main() -> anyhow::Result<()> {
let (witness_sender, _) =
tokio::sync::broadcast::channel::<state_chain_runtime::RuntimeCall>(EVENT_BUFFER_SIZE);

// Temporary hack: we don't actually use eth key, but the current witnesser is
// expecting a path with a valid key, so we create a temporary dummy key file here:
let mut eth_key_temp_file = tempfile::NamedTempFile::new()?;
eth_key_temp_file
.write_all(b"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
.unwrap();
let eth_key_path = eth_key_temp_file.path();

let eth_ws_endpoint = env::var("ETH_WS_ENDPOINT").unwrap_or("ws://localhost:8546".to_string());
let eth_http_endpoint =
env::var("ETH_HTTP_ENDPOINT").unwrap_or("http://localhost:8545".to_string());
let sc_ws_endpoint = env::var("SC_WS_ENDPOINT").unwrap_or("ws://localhost:9944".to_string());

let settings = DepositTrackerSettings {
eth_node: WsHttpEndpoints {
ws_endpoint: eth_ws_endpoint.into(),
http_endpoint: eth_http_endpoint.into(),
},
eth_key_path: eth_key_path.into(),
state_chain_ws_endpoint: sc_ws_endpoint,
};

witnessing::start(settings, witness_sender.clone());
witnessing::start(scope, settings, witness_sender.clone()).await?;

module.register_subscription(
"subscribe_witnessing",
Expand All @@ -86,7 +68,40 @@ async fn main() -> anyhow::Result<()> {

let addr = server.local_addr()?;
log::info!("Listening on http://{}", addr);
let serverhandle = Box::pin(server.start(module)?.stopped());
let _ = serverhandle.await;

scope.spawn(async {
server.start(module)?.stopped().await;
// If the server stops for some reason, we return
// error to terminate other tasks and the process.
Err(anyhow::anyhow!("RPC server stopped"))
});

Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Temporary hack: we don't actually use eth key, but the current witnesser is
// expecting a path with a valid key, so we create a temporary dummy key file here:
let mut eth_key_temp_file = tempfile::NamedTempFile::new()?;
eth_key_temp_file
.write_all(b"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
.unwrap();
let eth_key_path = eth_key_temp_file.path();

let eth_ws_endpoint = env::var("ETH_WS_ENDPOINT").unwrap_or("ws://localhost:8546".to_string());
let eth_http_endpoint =
env::var("ETH_HTTP_ENDPOINT").unwrap_or("http://localhost:8545".to_string());
let sc_ws_endpoint = env::var("SC_WS_ENDPOINT").unwrap_or("ws://localhost:9944".to_string());

let settings = DepositTrackerSettings {
eth_node: WsHttpEndpoints {
ws_endpoint: eth_ws_endpoint.into(),
http_endpoint: eth_http_endpoint.into(),
},
eth_key_path: eth_key_path.into(),
state_chain_ws_endpoint: sc_ws_endpoint,
};

task_scope::task_scope(|scope| async move { start(scope, settings).await }.boxed()).await
}
95 changes: 40 additions & 55 deletions api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use chainflip_engine::{
},
witness::common::epoch_source::EpochSource,
};
use futures::FutureExt;
use sp_core::H160;
use utilities::task_scope;

Expand Down Expand Up @@ -73,62 +72,48 @@ async fn get_env_parameters(state_chain_client: &StateChainClient<()>) -> Enviro
}
}

pub(super) fn start(
pub(super) async fn start(
scope: &task_scope::Scope<'_, anyhow::Error>,
settings: DepositTrackerSettings,
witness_sender: tokio::sync::broadcast::Sender<state_chain_runtime::RuntimeCall>,
) {
tokio::spawn(async {
// TODO: ensure that if this panics, the whole process exists (probably by moving
// task scope to main)
task_scope::task_scope(|scope| {
) -> anyhow::Result<()> {
let (state_chain_stream, state_chain_client) = {
state_chain_observer::client::StateChainClient::connect_without_account(
scope,
&settings.state_chain_ws_endpoint,
)
.await?
};

let env_params = get_env_parameters(&state_chain_client).await;

let epoch_source =
EpochSource::builder(scope, state_chain_stream.clone(), state_chain_client.clone()).await;

let witness_call = {
let witness_sender = witness_sender.clone();
move |call: state_chain_runtime::RuntimeCall, _epoch_index| {
let witness_sender = witness_sender.clone();
async move {
let (state_chain_stream, state_chain_client) = {
state_chain_observer::client::StateChainClient::connect_without_account(
scope,
&settings.state_chain_ws_endpoint,
)
.await?
};

let env_params = get_env_parameters(&state_chain_client).await;

let epoch_source = EpochSource::builder(
scope,
state_chain_stream.clone(),
state_chain_client.clone(),
)
.await;

let witness_call = {
let witness_sender = witness_sender.clone();
move |call: state_chain_runtime::RuntimeCall, _epoch_index| {
let witness_sender = witness_sender.clone();
async move {
// Send may fail if there aren't any subscribers,
// but it is safe to ignore the error.
if let Ok(n) = witness_sender.send(call) {
tracing::info!("Broadcasting witnesser call to {} subscribers", n);
}
}
}
};

eth::start(
scope,
state_chain_client.clone(),
state_chain_stream.clone(),
settings,
env_params,
epoch_source.clone(),
witness_call,
)
.await?;

Ok(())
// Send may fail if there aren't any subscribers,
// but it is safe to ignore the error.
if let Ok(n) = witness_sender.send(call) {
tracing::info!("Broadcasting witnesser call to {} subscribers", n);
}
}
.boxed()
})
.await
.unwrap()
});
}
};

eth::start(
scope,
state_chain_client.clone(),
state_chain_stream.clone(),
settings,
env_params,
epoch_source.clone(),
witness_call,
)
.await?;

Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use anyhow::anyhow;
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tracing::{error, info};
use utilities::task_scope;

type TxHash = String;
type BlockHash = String;
Expand Down Expand Up @@ -292,10 +293,9 @@ impl BtcTracker {
}
}

pub async fn start() -> BtcTracker {
pub async fn start(scope: &task_scope::Scope<'_, anyhow::Error>) -> BtcTracker {
let cache: Arc<Mutex<Cache>> = Default::default();
// TODO: spawn using task_scope
tokio::task::spawn({
scope.spawn({
let cache = cache.clone();
async move {
let mut interval = tokio::time::interval(Duration::from_secs(REFRESH_INTERVAL));
Expand Down

0 comments on commit fd10133

Please sign in to comment.