Skip to content

Commit

Permalink
remove exit-future (#5183)
Browse files Browse the repository at this point in the history
* remove exit-future usage,

as it is non maintained, and replace with async-channel which is already in the repo.

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into remove-exit-future

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into remove-exit-future
  • Loading branch information
jxs authored Feb 27, 2024
1 parent abd9965 commit 65c4ff0
Show file tree
Hide file tree
Showing 18 changed files with 171 additions and 193 deletions.
256 changes: 117 additions & 139 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ edition = "2021"
[workspace.dependencies]
anyhow = "1"
arbitrary = { version = "1", features = ["derive"] }
async-channel = "1.9.0"
bincode = "1"
bitvec = "1"
byteorder = "1"
Expand Down
1 change: 0 additions & 1 deletion beacon_node/beacon_chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ execution_layer = { workspace = true }
sensitive_url = { workspace = true }
superstruct = { workspace = true }
hex = { workspace = true }
exit-future = { workspace = true }
oneshot_broadcast = { path = "../../common/oneshot_broadcast/" }
slog-term = { workspace = true }
slog-async = { workspace = true }
Expand Down
1 change: 0 additions & 1 deletion beacon_node/execution_layer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ kzg = { workspace = true }
state_processing = { workspace = true }
superstruct = { workspace = true }
lru = { workspace = true }
exit-future = { workspace = true }
tree_hash = { workspace = true }
tree_hash_derive = { workspace = true }
parking_lot = { workspace = true }
Expand Down
3 changes: 1 addition & 2 deletions beacon_node/lighthouse_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = { workspace = true }

[dependencies]
async-channel = { workspace = true }
discv5 = { workspace = true }
unsigned-varint = { version = "0.6", features = ["codec"] }
ssz_types = { workspace = true }
Expand Down Expand Up @@ -55,7 +56,6 @@ hex_fmt = "0.3.0"
instant = "0.1.12"
quick-protobuf = "0.8"
void = "1.0.2"
async-channel = "1.9.0"
asynchronous-codec = "0.7.0"
base64 = "0.21.5"
libp2p-mplex = "0.41"
Expand All @@ -70,7 +70,6 @@ features = ["identify", "yamux", "noise", "dns", "tcp", "tokio", "plaintext", "s
slog-term = { workspace = true }
slog-async = { workspace = true }
tempfile = { workspace = true }
exit-future = { workspace = true }
quickcheck = { workspace = true }
quickcheck_macros = { workspace = true }
async-std = { version = "1.6.3", features = ["unstable"] }
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct Libp2pInstance(
LibP2PService<ReqId, E>,
#[allow(dead_code)]
// This field is managed for lifetime purposes may not be used directly, hence the `#[allow(dead_code)]` attribute.
exit_future::Signal,
async_channel::Sender<()>,
);

impl std::ops::Deref for Libp2pInstance {
Expand Down Expand Up @@ -110,7 +110,7 @@ pub async fn build_libp2p_instance(
let config = build_config(boot_nodes);
// launch libp2p service

let (signal, exit) = exit_future::signal();
let (signal, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(rt, exit, log.clone(), shutdown_tx);
let libp2p_context = lighthouse_network::Context {
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ edition = { workspace = true }
sloggers = { workspace = true }
genesis = { workspace = true }
matches = "0.1.8"
exit-future = { workspace = true }
slog-term = { workspace = true }
slog-async = { workspace = true }
eth2 = { workspace = true }

[dependencies]
async-channel = { workspace = true }
anyhow = { workspace = true }
beacon_chain = { workspace = true }
store = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ mod tests {

let runtime = Arc::new(Runtime::new().unwrap());

let (signal, exit) = exit_future::signal();
let (signal, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(
Arc::downgrade(&runtime),
Expand Down Expand Up @@ -139,7 +139,7 @@ mod tests {

// Build network service.
let (mut network_service, network_globals, _network_senders) = runtime.block_on(async {
let (_, exit) = exit_future::signal();
let (_, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(
Arc::downgrade(&runtime),
Expand Down
2 changes: 1 addition & 1 deletion common/task_executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = { workspace = true }

[dependencies]
async-channel = { workspace = true }
tokio = { workspace = true }
slog = { workspace = true }
futures = { workspace = true }
exit-future = { workspace = true }
lazy_static = { workspace = true }
lighthouse_metrics = { workspace = true }
sloggers = { workspace = true }
61 changes: 32 additions & 29 deletions common/task_executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub struct TaskExecutor {
/// The handle to the runtime on which tasks are spawned
handle_provider: HandleProvider,
/// The receiver exit future which on receiving shuts down the task
exit: exit_future::Exit,
exit: async_channel::Receiver<()>,
/// Sender given to tasks, so that if they encounter a state in which execution cannot
/// continue they can request that everything shuts down.
///
Expand All @@ -93,7 +93,7 @@ impl TaskExecutor {
/// crate).
pub fn new<T: Into<HandleProvider>>(
handle: T,
exit: exit_future::Exit,
exit: async_channel::Receiver<()>,
log: slog::Logger,
signal_tx: Sender<ShutdownReason>,
) -> Self {
Expand Down Expand Up @@ -159,8 +159,8 @@ impl TaskExecutor {

/// Spawn a future on the tokio runtime.
///
/// The future is wrapped in an `exit_future::Exit`. The task is cancelled when the corresponding
/// exit_future `Signal` is fired/dropped.
/// The future is wrapped in an `async-channel::Receiver`. The task is cancelled when the corresponding
/// Sender is dropped.
///
/// The future is monitored via another spawned future to ensure that it doesn't panic. In case
/// of a panic, the executor will be shut down via `self.signal_tx`.
Expand All @@ -172,9 +172,9 @@ impl TaskExecutor {
}
}

/// Spawn a future on the tokio runtime. This function does not wrap the task in an `exit_future::Exit`
/// Spawn a future on the tokio runtime. This function does not wrap the task in an `async-channel::Receiver`
/// like [spawn](#method.spawn).
/// The caller of this function is responsible for wrapping up the task with an `exit_future::Exit` to
/// The caller of this function is responsible for wrapping up the task with an `async-channel::Receiver` to
/// ensure that the task gets canceled appropriately.
/// This function generates prometheus metrics on number of tasks and task duration.
///
Expand Down Expand Up @@ -213,40 +213,39 @@ impl TaskExecutor {
}
}

/// Spawn a future on the tokio runtime wrapped in an `exit_future::Exit` returning an optional
/// Spawn a future on the tokio runtime wrapped in an `async-channel::Receiver` returning an optional
/// join handle to the future.
/// The task is canceled when the corresponding exit_future `Signal` is fired/dropped.
/// The task is canceled when the corresponding async-channel is dropped.
///
/// This function generates prometheus metrics on number of tasks and task duration.
pub fn spawn_handle<R: Send + 'static>(
&self,
task: impl Future<Output = R> + Send + 'static,
name: &'static str,
) -> Option<tokio::task::JoinHandle<Option<R>>> {
let exit = self.exit.clone();
let exit = self.exit();
let log = self.log.clone();

if let Some(int_gauge) = metrics::get_int_gauge(&metrics::ASYNC_TASKS_COUNT, &[name]) {
// Task is shutdown before it completes if `exit` receives
let int_gauge_1 = int_gauge.clone();
let future = future::select(Box::pin(task), exit).then(move |either| {
let result = match either {
future::Either::Left((value, _)) => {
trace!(log, "Async task completed"; "task" => name);
Some(value)
}
future::Either::Right(_) => {
debug!(log, "Async task shutdown, exit received"; "task" => name);
None
}
};
int_gauge_1.dec();
futures::future::ready(result)
});

int_gauge.inc();
if let Some(handle) = self.handle() {
Some(handle.spawn(future))
Some(handle.spawn(async move {
futures::pin_mut!(exit);
let result = match future::select(Box::pin(task), exit).await {
future::Either::Left((value, _)) => {
trace!(log, "Async task completed"; "task" => name);
Some(value)
}
future::Either::Right(_) => {
debug!(log, "Async task shutdown, exit received"; "task" => name);
None
}
};
int_gauge_1.dec();
result
}))
} else {
debug!(self.log, "Couldn't spawn task. Runtime shutting down");
None
Expand Down Expand Up @@ -324,7 +323,7 @@ impl TaskExecutor {
metrics::inc_gauge_vec(&metrics::BLOCK_ON_TASKS_COUNT, &[name]);
let log = self.log.clone();
let handle = self.handle()?;
let exit = self.exit.clone();
let exit = self.exit();

debug!(
log,
Expand Down Expand Up @@ -362,9 +361,13 @@ impl TaskExecutor {
self.handle_provider.handle()
}

/// Returns a copy of the `exit_future::Exit`.
pub fn exit(&self) -> exit_future::Exit {
self.exit.clone()
/// Returns a future that completes when `async-channel::Sender` is dropped or () is sent,
/// which translates to the exit signal being triggered.
pub fn exit(&self) -> impl Future<Output = ()> {
let exit = self.exit.clone();
async move {
let _ = exit.recv().await;
}
}

/// Get a channel to request shutting down.
Expand Down
4 changes: 2 additions & 2 deletions common/task_executor/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::runtime;
/// This struct should never be used in production, only testing.
pub struct TestRuntime {
runtime: Option<Arc<tokio::runtime::Runtime>>,
_runtime_shutdown: exit_future::Signal,
_runtime_shutdown: async_channel::Sender<()>,
pub task_executor: TaskExecutor,
pub log: Logger,
}
Expand All @@ -24,7 +24,7 @@ impl Default for TestRuntime {
/// called *outside* any existing runtime, create a new `Runtime` and keep it alive until the
/// `Self` is dropped.
fn default() -> Self {
let (runtime_shutdown, exit) = exit_future::signal();
let (runtime_shutdown, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let log = null_logger().unwrap();

Expand Down
2 changes: 1 addition & 1 deletion lighthouse/environment/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = { workspace = true }

[dependencies]
async-channel = { workspace = true }
tokio = { workspace = true }
slog = { workspace = true }
sloggers = { workspace = true }
Expand All @@ -17,7 +18,6 @@ slog-term = { workspace = true }
slog-async = { workspace = true }
futures = { workspace = true }
slog-json = "2.3.0"
exit-future = { workspace = true }
serde = { workspace = true }

[target.'cfg(not(target_family = "unix"))'.dependencies]
Expand Down
8 changes: 4 additions & 4 deletions lighthouse/environment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ impl<E: EthSpec> EnvironmentBuilder<E> {

/// Consumes the builder, returning an `Environment`.
pub fn build(self) -> Result<Environment<E>, String> {
let (signal, exit) = exit_future::signal();
let (signal, exit) = async_channel::bounded(1);
let (signal_tx, signal_rx) = channel(1);
Ok(Environment {
runtime: self
Expand All @@ -370,8 +370,8 @@ pub struct Environment<E: EthSpec> {
signal_rx: Option<Receiver<ShutdownReason>>,
/// Sender to request shutting down.
signal_tx: Sender<ShutdownReason>,
signal: Option<exit_future::Signal>,
exit: exit_future::Exit,
signal: Option<async_channel::Sender<()>>,
exit: async_channel::Receiver<()>,
log: Logger,
sse_logging_components: Option<SSELoggingComponents>,
eth_spec_instance: E,
Expand Down Expand Up @@ -543,7 +543,7 @@ impl<E: EthSpec> Environment<E> {
/// Fire exit signal which shuts down all spawned services
pub fn fire_signal(&mut self) {
if let Some(signal) = self.signal.take() {
let _ = signal.fire();
drop(signal);
}
}

Expand Down
4 changes: 2 additions & 2 deletions testing/execution_engine_integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ version = "0.1.0"
edition = { workspace = true }

[dependencies]
async-channel = { workspace = true }
tempfile = { workspace = true }
serde_json = { workspace = true }
task_executor = { workspace = true }
tokio = { workspace = true }
futures = { workspace = true }
exit-future = { workspace = true }
environment = { workspace = true }
execution_layer = { workspace = true }
sensitive_url = { workspace = true }
Expand All @@ -24,4 +24,4 @@ fork_choice = { workspace = true }
logging = { workspace = true }

[features]
portable = ["types/portable"]
portable = ["types/portable"]
4 changes: 2 additions & 2 deletions testing/execution_engine_integration/src/test_rig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct TestRig<E, T: EthSpec = MainnetEthSpec> {
ee_a: ExecutionPair<E, T>,
ee_b: ExecutionPair<E, T>,
spec: ChainSpec,
_runtime_shutdown: exit_future::Signal,
_runtime_shutdown: async_channel::Sender<()>,
}

/// Import a private key into the execution engine and unlock it so that we can
Expand Down Expand Up @@ -111,7 +111,7 @@ impl<E: GenericExecutionEngine> TestRig<E> {
.build()
.unwrap(),
);
let (runtime_shutdown, exit) = exit_future::signal();
let (runtime_shutdown, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx);
let mut spec = TEST_FORK.make_genesis_spec(MainnetEthSpec::default_spec());
Expand Down
2 changes: 1 addition & 1 deletion testing/web3signer_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = { workspace = true }
[dependencies]

[dev-dependencies]
async-channel = { workspace = true }
eth2_keystore = { workspace = true }
types = { workspace = true }
tempfile = { workspace = true }
Expand All @@ -17,7 +18,6 @@ url = { workspace = true }
validator_client = { workspace = true }
slot_clock = { workspace = true }
futures = { workspace = true }
exit-future = { workspace = true }
task_executor = { workspace = true }
environment = { workspace = true }
account_utils = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions testing/web3signer_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ mod tests {
validator_store: Arc<ValidatorStore<TestingSlotClock, E>>,
_validator_dir: TempDir,
runtime: Arc<tokio::runtime::Runtime>,
_runtime_shutdown: exit_future::Signal,
_runtime_shutdown: async_channel::Sender<()>,
using_web3signer: bool,
}

Expand Down Expand Up @@ -340,7 +340,7 @@ mod tests {
.build()
.unwrap(),
);
let (runtime_shutdown, exit) = exit_future::signal();
let (runtime_shutdown, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor =
TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx);
Expand Down
1 change: 0 additions & 1 deletion validator_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ directory = { workspace = true }
lockfile = { workspace = true }
environment = { workspace = true }
parking_lot = { workspace = true }
exit-future = { workspace = true }
filesystem = { workspace = true }
hex = { workspace = true }
deposit_contract = { workspace = true }
Expand Down

0 comments on commit 65c4ff0

Please sign in to comment.