Skip to content

Commit

Permalink
fix(module_manager): make tokio select biased
Browse files Browse the repository at this point in the history
  • Loading branch information
banditopazzo committed Jul 3, 2024
1 parent 4444b59 commit 748cd3a
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 16 deletions.
1 change: 1 addition & 0 deletions src/pulsard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub async fn pulsar_daemon_run(

log::info!("Terminating Pulsar Daemon...");
for module in pulsar_daemon.modules().await {
log::info!("Terminating {} module...", module.name);
if let Err(err) = pulsar_daemon.stop(module.name.clone()).await {
log::warn!(
"Module {} didn't respond to shutdown signal. Forcing shutdown.\n{:?}",
Expand Down
40 changes: 24 additions & 16 deletions src/pulsard/module_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,29 @@ async fn run_module_loop<T: PulsarModule>(

loop {
tokio::select! {
// Futures need to polled in a specific order
biased;
// Shutdown
r = rx_shutdown.recv() => {
T::graceful_stop(state).await?;
return r
},
// Stop event receiver
_ = rx_stop_event_recv.recv() => {
// Drop the Event Receiver and replace it with None
rx_event = None
}
// Stop config receiver
_ = rx_stop_cfg_recv.recv() => {
// Drop the Configuration Receiver and replace it with None
rx_config = None
}
// Extra action
t_output = T::trigger(&mut extension) => {
let t_output = t_output?;
T::action(&t_output, &config, &mut state, ctx).await?
}
// New config
rx_config = async {
match &mut rx_config {
Some(rx_config) => {
Expand All @@ -393,10 +416,7 @@ async fn run_module_loop<T: PulsarModule>(
config = T::Config::try_from(&rx_config.borrow())?;
T::on_config_change(&config, &mut state, ctx).await?;
}
_ = rx_stop_cfg_recv.recv() => {
// Drop the Configuration Receiver and replace it with None
rx_config = None
}
// Incoming event
event = async {
match &mut rx_event {
Some(rx_event) => pulsar_core::pdk::receive_from_broadcast(rx_event, ctx.module_name()).await,
Expand All @@ -406,18 +426,6 @@ async fn run_module_loop<T: PulsarModule>(
let event = event.expect("no more events");
T::on_event(&event, &config, &mut state, ctx).await?;
}
_ = rx_stop_event_recv.recv() => {
// Drop the Event Receiver and replace it with None
rx_event = None
}
t_output = T::trigger(&mut extension) => {
let t_output = t_output?;
T::action(&t_output, &config, &mut state, ctx).await?
}
r = rx_shutdown.recv() => {
T::graceful_stop(state).await?;
return r
},
}
}
}

0 comments on commit 748cd3a

Please sign in to comment.