From 42d60a3f68bca013b52c22b0fbc230d337071c57 Mon Sep 17 00:00:00 2001
From: David Estes
Date: Fri, 6 Dec 2024 11:48:32 -0700
Subject: [PATCH 1/2] fix: pass shutdown signal to startup ordering

tasks could take hours and it wouldn't shut down on interrupt. if you
killed the process you might leave the lock on the sqlite db and need
fuser to find and kill the pid to release it
---
 event-svc/src/event/mod.rs           |   2 +-
 event-svc/src/event/ordering_task.rs | 239 +++++++++++++++++++--------
 event-svc/src/event/service.rs       |  31 +++-
 event-svc/src/lib.rs                 |   2 +-
 event-svc/src/tests/event.rs         |   6 +-
 one/src/daemon.rs                    |  18 +-
 one/src/migrations.rs                |  10 +-
 p2p/tests/node.rs                    |  11 +-
 8 files changed, 233 insertions(+), 86 deletions(-)

diff --git a/event-svc/src/event/mod.rs b/event-svc/src/event/mod.rs
index d1a063c1..db2a0bb1 100644
--- a/event-svc/src/event/mod.rs
+++ b/event-svc/src/event/mod.rs
@@ -6,5 +6,5 @@ mod service;
 mod store;
 mod validator;
 
-pub use service::{BlockStore, DeliverableRequirement, EventService};
+pub use service::{BlockStore, DeliverableRequirement, EventService, UndeliveredEventReview};
 pub use validator::ChainInclusionProvider;
diff --git a/event-svc/src/event/ordering_task.rs b/event-svc/src/event/ordering_task.rs
index 34665df7..aaca4507 100644
--- a/event-svc/src/event/ordering_task.rs
+++ b/event-svc/src/event/ordering_task.rs
@@ -1,4 +1,5 @@
 use std::collections::{HashMap, VecDeque};
+use std::future::Future;
 use std::sync::Arc;
 
 use anyhow::anyhow;
@@ -53,12 +54,14 @@ impl OrderingTask {
         event_access: Arc<EventAccess>,
         max_iterations: usize,
         batch_size: u32,
+        shutdown_signal: Box<dyn Future<Output = ()>>,
     ) -> Result<usize> {
         Self::process_all_undelivered_events_with_tasks(
             event_access,
             max_iterations,
             batch_size,
             UNDELIVERED_EVENTS_STARTUP_TASKS,
+            shutdown_signal,
         )
         .await
     }
@@ -69,6 +72,7 @@ impl OrderingTask {
         max_iterations: usize,
         batch_size: u32,
         num_tasks: u32,
+        shutdown_signal: Box<dyn Future<Output = ()>>,
     ) -> Result<usize> {
         let (tx, rx_inserted) =
             tokio::sync::mpsc::channel::(PENDING_EVENTS_CHANNEL_DEPTH);
@@ -83,6 +87,7 @@ impl OrderingTask {
                 batch_size,
                 tx,
                 num_tasks,
+                Box::into_pin(shutdown_signal),
             )
             .await
         {
@@ -653,11 +658,78 @@ impl OrderingState {
         batch_size: u32,
         tx: Sender,
         partition_size: u32,
+        shutdown_signal: std::pin::Pin<Box<dyn Future<Output = ()>>>,
     ) -> Result<usize> {
         info!("Attempting to process all undelivered events. This could take some time.");
+        let (rx, tasks) = Self::spawn_tasks_for_undelivered_event_processing(
+            event_access,
+            max_iterations,
+            batch_size,
+            tx,
+            partition_size,
+        );
+
+        tokio::select! {
+            event_cnt = Self::collect_ordering_task_output(rx, tasks) => {
+                Ok(event_cnt)
+            }
+            _ = shutdown_signal => {
+                Err(Error::new_app(anyhow!("Ordering tasks aborted due to shutdown signal")))
+            }
+        }
+    }
+
-        let mut tasks: tokio::task::JoinSet<Result<()>> = tokio::task::JoinSet::new();
-        let (cnt_tx, mut rx) = tokio::sync::mpsc::channel(8);
+    async fn collect_ordering_task_output(
+        mut rx: tokio::sync::mpsc::Receiver<OrderingTaskStatus>,
+        mut tasks: JoinSet<Result<()>>,
+    ) -> usize {
+        let mut event_cnt = 0;
+        while let Some(OrderingTaskStatus {
+            number_discovered,
+            new_highwater,
+            task_id,
+        }) = rx.recv().await
+        {
+            event_cnt += number_discovered;
+            if event_cnt % LOG_EVERY_N_ENTRIES < number_discovered {
+                // these values are useful but can be slightly misleading. the highwater mark will move forward/backward
+                // based on the task reporting, and we're counting the number discovered and sent by the task, even though
+                // the task doing the ordering may discover and order additional events while reviewing the events sent
+                info!(count=%event_cnt, highwater=%new_highwater, %task_id, "Processed undelivered events");
+            }
+        }
+        while let Some(res) = tasks.join_next().await {
+            match res {
+                Ok(v) => match v {
+                    Ok(_) => {
+                        // task finished so nothing to do
+                    }
+                    Err(error) => {
+                        warn!(?error, "event ordering task failed while processing");
+                    }
+                },
+                Err(error) => {
+                    warn!(?error, "event ordering task failed with JoinError");
+                }
+            }
+        }
+        event_cnt
+    }
+
+    /// Process all undelivered events in the database. This is a blocking operation that could take a long time.
+    /// It is intended to be run at startup but could be used on an interval or after some errors to recover.
+    fn spawn_tasks_for_undelivered_event_processing(
+        event_access: Arc<EventAccess>,
+        max_iterations: usize,
+        batch_size: u32,
+        tx: Sender,
+        partition_size: u32,
+    ) -> (
+        tokio::sync::mpsc::Receiver<OrderingTaskStatus>,
+        JoinSet<Result<()>>,
+    ) {
+        let mut tasks: JoinSet<Result<()>> = JoinSet::new();
+        let (cnt_tx, rx) = tokio::sync::mpsc::channel(8);
         for task_id in 0..partition_size {
             debug!("starting task {task_id} of {partition_size} to process undelivered events");
             let tx = tx.clone();
@@ -668,10 +740,10 @@ impl OrderingState {
             let mut highwater = 0;
             while iter_cnt < max_iterations {
                 iter_cnt += 1;
-                let (undelivered, new_hw) = event_access
+                let (undelivered, new_highwater) = event_access
                     .undelivered_with_values(highwater, batch_size.into(), partition_size, task_id)
                     .await?;
-                highwater = new_hw;
+                highwater = new_highwater;
                 let found_something = !undelivered.is_empty();
                 let found_everything = undelivered.len() < batch_size as usize;
                 if found_something {
@@ -680,13 +752,13 @@ impl OrderingState {
                     // at the beginning and mark them all delivered, or we find a gap and stop processing and leave them in memory.
                     // In this case, we won't discover them until we start running recon with a peer, so maybe we should drop them
                     // or otherwise mark them ignored somehow. When this function ends, we do drop everything so for now it's probably okay.
-                    let number_processed = OrderingState::process_undelivered_events_batch(
+                    let number_discovered = OrderingState::process_undelivered_events_batch(
                         &event_access,
                         undelivered,
                         &tx,
                     )
                     .await?;
-                    if cnt_tx.send((number_processed, new_hw, task_id)).await.is_err() {
+                    if cnt_tx.send(OrderingTaskStatus {number_discovered, new_highwater, task_id}).await.is_err() {
                         warn!("undelivered task manager not available... exiting task_id={task_id}");
                         return Err(crate::Error::new_fatal(anyhow!("delivered task manager not available... exiting task_id={task_id}")));
                     }
@@ -702,36 +774,7 @@ impl OrderingState {
                 Ok(())
             });
         }
-        // drop our senders so the background tasks exit without waiting on us
-        drop(cnt_tx);
-        drop(tx);
-
-        let mut event_cnt = 0;
-        while let Some((number_processed, new_hw, task_id)) = rx.recv().await {
-            event_cnt += number_processed;
-            if event_cnt % LOG_EVERY_N_ENTRIES < number_processed {
-                // these values are useful but can be slightly misleading. the highwater mark will move forward/backward
-                // based on the task reporting, and we're counting the number discovered and sent by the task, even though
-                // the task doing the ordering may discover and order additional events while reviewing the events sent
-                info!(count=%event_cnt, highwater=%new_hw, %task_id, "Processed undelivered events");
-            }
-        }
-        while let Some(res) = tasks.join_next().await {
-            match res {
-                Ok(v) => match v {
-                    Ok(_) => {
-                        // task finished so nothing to do
-                    }
-                    Err(error) => {
-                        warn!(?error, "event ordering task failed while processing");
-                    }
-                },
-                Err(error) => {
-                    warn!(?error, "event ordering task failed with JoinError");
-                }
-            }
-        }
-        Ok(event_cnt)
+        (rx, tasks)
     }
 
     async fn process_undelivered_events_batch(
@@ -828,6 +871,13 @@ impl OrderingState {
     }
 }
 
+#[derive(Debug)]
+struct OrderingTaskStatus {
+    task_id: u32,
+    new_highwater: i64,
+    number_discovered: usize,
+}
+
 #[cfg(test)]
 mod test {
     use crate::store::EventInsertable;
@@ -841,6 +891,13 @@ mod test {
     };
 
     use super::*;
+    fn fake_shutdown_signal() -> Box<dyn Future<Output = ()>> {
+        Box::new(async move {
+            loop {
+                tokio::time::sleep(std::time::Duration::from_secs(60)).await
+            }
+        })
+    }
 
     async fn get_n_insertable_events(n: usize) -> Vec<EventInsertable> {
         let mut res = Vec::with_capacity(n);
@@ -861,9 +918,14 @@ mod test {
     async fn test_undelivered_batch_empty() {
         let pool = SqlitePool::connect_in_memory().await.unwrap();
         let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
-        let processed = OrderingTask::process_all_undelivered_events(event_access, 1, 5)
-            .await
-            .unwrap();
+        let processed = OrderingTask::process_all_undelivered_events(
+            event_access,
+            1,
+            5,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
         assert_eq!(0, processed);
     }
 
@@ -905,24 +967,39 @@ mod test {
         let (_, events) = event_access.new_events_since_value(0, 100).await.unwrap();
         assert_eq!(1, events.len());
         // we make sure to use 1 task in this test as we want to measure the progress of each iteration
-        let processed =
-            OrderingTask::process_all_undelivered_events_with_tasks(event_access.clone(), 1, 5, 1)
-                .await
-                .unwrap();
+        let processed = OrderingTask::process_all_undelivered_events_with_tasks(
+            event_access.clone(),
+            1,
+            5,
+            1,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
         assert_eq!(5, processed);
         let (_, events) = event_access.new_events_since_value(0, 100).await.unwrap();
         assert_eq!(6, events.len());
-        let processed =
-            OrderingTask::process_all_undelivered_events_with_tasks(event_access.clone(), 1, 5, 1)
-                .await
-                .unwrap();
+        let processed = OrderingTask::process_all_undelivered_events_with_tasks(
+            event_access.clone(),
+            1,
+            5,
+            1,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
         assert_eq!(4, processed);
         let (_, events) = event_access.new_events_since_value(0, 100).await.unwrap();
         assert_eq!(10, events.len());
-        let processed =
-            OrderingTask::process_all_undelivered_events_with_tasks(event_access.clone(), 1, 5, 1)
-                .await
-                .unwrap();
+        let processed = OrderingTask::process_all_undelivered_events_with_tasks(
+            event_access.clone(),
+            1,
+            5,
+            1,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
         assert_eq!(0, processed);
     }
 
@@ -940,10 +1017,15 @@ mod test {
         let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(5, event.len());
         // we specify 1 task so we can easily expect how far it gets each run, rather than doing math against the number of spawned tasks
-        let processed =
-            OrderingTask::process_all_undelivered_events_with_tasks(event_access.clone(), 4, 10, 1)
-                .await
-                .unwrap();
+        let processed = OrderingTask::process_all_undelivered_events_with_tasks(
+            event_access.clone(),
+            4,
+            10,
+            1,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
 
         let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(45, event.len());
@@ -960,9 +1042,14 @@ mod test {
         let _new = event_access.insert_many(insertable.iter()).await.unwrap();
         let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(0, event.len());
-        let res = OrderingTask::process_all_undelivered_events(Arc::clone(&event_access), 4, 3)
-            .await
-            .unwrap();
+        let res = OrderingTask::process_all_undelivered_events(
+            Arc::clone(&event_access),
+            4,
+            3,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
         assert_eq!(res, 9);
 
         let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
@@ -983,10 +1070,14 @@ mod test {
         let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(5, event.len());
 
-        let processed =
-            OrderingTask::process_all_undelivered_events(event_access.clone(), 100_000_000, 5)
-                .await
-                .unwrap();
+        let processed = OrderingTask::process_all_undelivered_events(
+            event_access.clone(),
+            100_000_000,
+            5,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
 
         let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(50, event.len());
@@ -1007,10 +1098,14 @@ mod test {
             .await
             .unwrap();
         assert_eq!(1000, event.len());
-        let _res =
-            OrderingTask::process_all_undelivered_events(Arc::clone(&event_access), 100_000_000, 5)
-                .await
-                .unwrap();
+        let _res = OrderingTask::process_all_undelivered_events(
+            Arc::clone(&event_access),
+            100_000_000,
+            5,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
 
         let (_hw, event) = event_access
             .new_events_since_value(0, 100_000)
             .await
             .unwrap();
@@ -1058,6 +1153,7 @@ mod test {
             Arc::clone(&event_access),
             100_000_000,
             100,
+            fake_shutdown_signal(),
         )
         .await
         .unwrap();
@@ -1106,9 +1202,14 @@ mod test {
         let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(5, event.len());
 
-        let processed = OrderingTask::process_all_undelivered_events(event_access.clone(), 1, 100)
-            .await
-            .unwrap();
+        let processed = OrderingTask::process_all_undelivered_events(
+            event_access.clone(),
+            1,
+            100,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
 
         let (_hw, cids) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(50, cids.len());
diff --git a/event-svc/src/event/service.rs b/event-svc/src/event/service.rs
index be81cddf..735cb0f1 100644
--- a/event-svc/src/event/service.rs
+++ b/event-svc/src/event/service.rs
@@ -1,5 +1,6 @@
 use std::{
     collections::{BTreeMap, HashMap},
+    future::Future,
     sync::{Arc, Mutex, MutexGuard},
 };
 
@@ -81,6 +82,18 @@ pub enum DeliverableRequirement {
     Lazy,
 }
 
+/// Input to determine behavior related to startup event ordering.
+/// Used at service creation to block until processing completes if requested, or to skip doing any processing.
+pub enum UndeliveredEventReview {
+    /// Do not review all undelivered events in the database when creating the service
+    Skip,
+    /// Review and order all undelivered events before returning. May block for a long time.
+    Process {
+        /// A future that can be used to signal the tasks to stop before they complete
+        shutdown_signal: Box<dyn Future<Output = ()>>,
+    },
+}
+
 impl EventService {
     /// Create a new CeramicEventStore.
     ///
@@ -88,7 +101,7 @@ impl EventService {
     /// When process_undelivered_events is true, all undelivered events in the database will be
     /// processed.
     pub async fn try_new(
         pool: SqlitePool,
-        process_undelivered_events: bool,
+        process_undelivered_events: UndeliveredEventReview,
         validate_events: bool,
         ethereum_rpc_providers: Vec<ChainInclusionProvider>,
     ) -> Result<Self> {
@@ -108,9 +121,13 @@ impl EventService {
             pending_writes: Arc::new(Mutex::new(HashMap::default())),
             event_access,
         };
-        if process_undelivered_events {
-            svc.process_all_undelivered_events().await?;
+        match process_undelivered_events {
+            UndeliveredEventReview::Skip => {}
+            UndeliveredEventReview::Process { shutdown_signal } => {
+                let _num_processed = svc.process_all_undelivered_events(shutdown_signal).await?;
+            }
         }
+
         Ok(svc)
     }
 
@@ -119,7 +136,7 @@ impl EventService {
     /// in the next pass.. but it's basically same same but different.
     #[allow(dead_code)]
     pub(crate) async fn new_with_event_validation(pool: SqlitePool) -> Result<Self> {
-        Self::try_new(pool, false, true, vec![]).await
+        Self::try_new(pool, UndeliveredEventReview::Skip, true, vec![]).await
     }
 
     /// Currently, we track events when the [`ValidationRequirement`] allows. Right now, this applies to
@@ -165,11 +182,15 @@ impl EventService {
     }
 
     /// Returns the number of undelivered events that were updated
-    async fn process_all_undelivered_events(&self) -> Result<usize> {
+    async fn process_all_undelivered_events(
+        &self,
+        shutdown_signal: Box<dyn Future<Output = ()>>,
+    ) -> Result<usize> {
         OrderingTask::process_all_undelivered_events(
             Arc::clone(&self.event_access),
             MAX_ITERATIONS,
             DELIVERABLE_EVENTS_BATCH_SIZE,
+            shutdown_signal,
         )
         .await
     }
diff --git a/event-svc/src/lib.rs b/event-svc/src/lib.rs
index 75275c45..8d564111 100644
--- a/event-svc/src/lib.rs
+++ b/event-svc/src/lib.rs
@@ -10,6 +10,6 @@ mod tests;
 pub use ceramic_validation::eth_rpc;
 pub use error::Error;
 pub use event::ChainInclusionProvider;
-pub use event::{BlockStore, EventService};
+pub use event::{BlockStore, EventService, UndeliveredEventReview};
 
 pub(crate) type Result<T> = std::result::Result<T, Error>;
diff --git a/event-svc/src/tests/event.rs b/event-svc/src/tests/event.rs
index 105a4e01..5d463371 100644
--- a/event-svc/src/tests/event.rs
+++ b/event-svc/src/tests/event.rs
@@ -1,6 +1,6 @@
 use std::str::FromStr;
 
-use crate::EventService;
+use crate::{EventService, UndeliveredEventReview};
 use anyhow::Error;
 use bytes::Bytes;
 use ceramic_api::{ApiItem, EventService as ApiEventService};
@@ -25,7 +25,7 @@ macro_rules! test_with_sqlite {
         async fn [<$test_name _sqlite>]() {
             let conn = $crate::store::SqlitePool::connect_in_memory().await.unwrap();
-            let store = $crate::EventService::try_new(conn, true, true, vec![]).await.unwrap();
+            let store = $crate::EventService::try_new(conn, UndeliveredEventReview::Skip, true, vec![]).await.unwrap();
             $(
                 for stmt in $sql_stmts {
                     store.pool.run_statement(stmt).await.unwrap();
                 }
@@ -664,7 +664,7 @@ where
 
 #[test(tokio::test)]
 async fn test_conclusion_events_since() -> Result<(), Box<dyn std::error::Error>> {
     let pool = SqlitePool::connect_in_memory().await?;
-    let service = EventService::try_new(pool, false, false, vec![]).await?;
+    let service = EventService::try_new(pool, UndeliveredEventReview::Skip, false, vec![]).await?;
 
     let test_events = generate_chained_events().await;
     ceramic_api::EventService::insert_many(
diff --git a/one/src/daemon.rs b/one/src/daemon.rs
index 45230821..f5ac7d9c 100644
--- a/one/src/daemon.rs
+++ b/one/src/daemon.rs
@@ -411,6 +411,8 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
     let (shutdown_signal_tx, mut shutdown_signal) = broadcast::channel::<()>(1);
     let signals = Signals::new([SIGHUP, SIGTERM, SIGINT, SIGQUIT])?;
     let handle = signals.handle();
+    debug!("starting signal handler task");
+    let signals_handle = tokio::spawn(handle_signals(signals, shutdown_signal_tx));
 
     // Construct sqlite_pool
     let sqlite_pool = opts
@@ -434,8 +436,19 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
     let peer_svc = Arc::new(PeerService::new(sqlite_pool.clone()));
     let interest_svc = Arc::new(InterestService::new(sqlite_pool.clone()));
     let event_validation = opts.event_validation.unwrap_or(true);
+    let mut ss = shutdown_signal.resubscribe();
     let event_svc = Arc::new(
-        EventService::try_new(sqlite_pool.clone(), true, event_validation, rpc_providers).await?,
+        EventService::try_new(
+            sqlite_pool.clone(),
+            ceramic_event_svc::UndeliveredEventReview::Process {
+                shutdown_signal: Box::new(async move {
+                    let _ = ss.recv().await;
+                }),
+            },
+            event_validation,
+            rpc_providers,
+        )
+        .await?,
     );
 
     let network = opts.network.to_network(&opts.local_network_id)?;
@@ -722,9 +735,6 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
         ("/api/v0/".to_string(), kubo_rpc_service),
     );
 
-    debug!("starting signal handler task");
-    let signals_handle = tokio::spawn(handle_signals(signals, shutdown_signal_tx));
-
     // The server task blocks until we are ready to start shutdown
     info!("starting api server at address {}", opts.bind_address);
     hyper::server::Server::try_bind(&opts.bind_address.parse()?)
diff --git a/one/src/migrations.rs b/one/src/migrations.rs
index d6cf3ed8..daa32f37 100644
--- a/one/src/migrations.rs
+++ b/one/src/migrations.rs
@@ -148,7 +148,15 @@ async fn from_ipfs(opts: FromIpfsOpts) -> Result<()> {
     let db_opts: DBOpts = (&opts).into();
     let sqlite_pool = db_opts.get_sqlite_pool(SqliteOpts::default()).await?;
     // TODO: feature flags here? or just remove this entirely when enabling
-    let event_svc = Arc::new(EventService::try_new(sqlite_pool, false, false, vec![]).await?);
+    let event_svc = Arc::new(
+        EventService::try_new(
+            sqlite_pool,
+            ceramic_event_svc::UndeliveredEventReview::Skip,
+            false,
+            vec![],
+        )
+        .await?,
+    );
     let blocks = FSBlockStore {
         input_ipfs_path: opts.input_ipfs_path,
         sharded_paths: !opts.non_sharded_paths,
diff --git a/p2p/tests/node.rs b/p2p/tests/node.rs
index 676a65f1..18ba94f7 100644
--- a/p2p/tests/node.rs
+++ b/p2p/tests/node.rs
@@ -35,8 +35,15 @@ impl TestRunnerBuilder {
         let sql_pool = SqlitePool::connect_in_memory().await.unwrap();
         let peer_svc = Arc::new(ceramic_peer_svc::PeerService::new(sql_pool.clone()));
         let interest_svc = Arc::new(ceramic_interest_svc::InterestService::new(sql_pool.clone()));
-        let event_svc =
-            Arc::new(ceramic_event_svc::EventService::try_new(sql_pool, true, true, vec![]).await?);
+        let event_svc = Arc::new(
+            ceramic_event_svc::EventService::try_new(
+                sql_pool,
+                ceramic_event_svc::UndeliveredEventReview::Skip,
+                true,
+                vec![],
+            )
+            .await?,
+        );
         let mut registry = prometheus_client::registry::Registry::default();
         let metrics = Metrics::register(&mut registry);
 
From c6ba08f0e46d4f593b0418f9ae1418530daa1a78 Mon Sep 17 00:00:00 2001
From: David Estes
Date: Fri, 6 Dec 2024 11:48:40 -0700
Subject: [PATCH 2/2] chore: clippy lint

---
 peer-svc/src/tests/peer.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/peer-svc/src/tests/peer.rs b/peer-svc/src/tests/peer.rs
index 36994e00..18cd0f04 100644
--- a/peer-svc/src/tests/peer.rs
+++ b/peer-svc/src/tests/peer.rs
@@ -41,7 +41,7 @@ pub(crate) fn peer_key_builder() -> Builder {
 
 // Generate an event for the same network,model,controller,stream
 // The event and height are random when it's None.
-pub(crate) fn random_peer_key<'a>(expiration: Option) -> PeerKey {
+pub(crate) fn random_peer_key(expiration: Option) -> PeerKey {
    peer_key_builder()
        .with_expiration(expiration.unwrap_or_else(|| thread_rng().gen()))
        .with_id(&NodeKey::random())
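
Note: the core pattern the first patch introduces — racing a long-running startup job against a caller-supplied shutdown future — can be sketched in isolation. The snippet below is a minimal, self-contained illustration and not code from the patch: `run_ordering_work` and `run_with_shutdown` are hypothetical stand-ins for the undelivered-event processing, and a tokio runtime with the `macros`, `rt`, and `time` features is assumed. It mirrors the patch's plumbing of a `Box<dyn Future<Output = ()>>` through `Box::into_pin` into a `tokio::select!`.

```rust
use std::future::Future;
use std::pin::Pin;

// Hypothetical stand-in for the hours-long undelivered-event ordering pass.
async fn run_ordering_work() -> usize {
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    42 // number of events processed
}

// Race the work against a caller-supplied shutdown future, as the patch does:
// the signal arrives boxed, is pinned with `Box::into_pin`, and is then
// polled as one arm of `tokio::select!`.
async fn run_with_shutdown(
    shutdown_signal: Box<dyn Future<Output = ()>>,
) -> Result<usize, String> {
    let shutdown_signal: Pin<Box<dyn Future<Output = ()>>> = Box::into_pin(shutdown_signal);
    tokio::select! {
        n = run_ordering_work() => Ok(n),
        _ = shutdown_signal => Err("ordering aborted by shutdown signal".to_string()),
    }
}

#[tokio::main]
async fn main() {
    // A signal that never fires, like `fake_shutdown_signal` in the patch's tests.
    let never = Box::new(std::future::pending::<()>());
    assert_eq!(run_with_shutdown(never).await, Ok(42));

    // A broadcast channel wired the way daemon.rs wires SIGINT/SIGTERM: the
    // receiver is moved into the boxed future, and a send aborts the work.
    let (tx, mut rx) = tokio::sync::broadcast::channel::<()>(1);
    let signal = Box::new(async move {
        let _ = rx.recv().await;
    });
    tx.send(()).unwrap();
    assert!(run_with_shutdown(signal).await.is_err());
}
```

The `resubscribe()` call in daemon.rs serves the same purpose as creating the second receiver above: the ordering task gets its own handle on the shutdown broadcast without consuming the daemon's receiver.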