diff --git a/event-svc/src/event/mod.rs b/event-svc/src/event/mod.rs
index d1a063c1..db2a0bb1 100644
--- a/event-svc/src/event/mod.rs
+++ b/event-svc/src/event/mod.rs
@@ -6,5 +6,5 @@ mod service;
 mod store;
 mod validator;
 
-pub use service::{BlockStore, DeliverableRequirement, EventService};
+pub use service::{BlockStore, DeliverableRequirement, EventService, UndeliveredEventReview};
 pub use validator::ChainInclusionProvider;
diff --git a/event-svc/src/event/ordering_task.rs b/event-svc/src/event/ordering_task.rs
index 34665df7..aaca4507 100644
--- a/event-svc/src/event/ordering_task.rs
+++ b/event-svc/src/event/ordering_task.rs
@@ -1,4 +1,5 @@
 use std::collections::{HashMap, VecDeque};
+use std::future::Future;
 use std::sync::Arc;
 
 use anyhow::anyhow;
@@ -53,12 +54,14 @@ impl OrderingTask {
         event_access: Arc<EventAccess>,
         max_iterations: usize,
         batch_size: u32,
+        shutdown_signal: Box<dyn Future<Output = ()>>,
     ) -> Result<usize> {
         Self::process_all_undelivered_events_with_tasks(
             event_access,
             max_iterations,
             batch_size,
             UNDELIVERED_EVENTS_STARTUP_TASKS,
+            shutdown_signal,
         )
         .await
     }
@@ -69,6 +72,7 @@ impl OrderingTask {
         max_iterations: usize,
         batch_size: u32,
         num_tasks: u32,
+        shutdown_signal: Box<dyn Future<Output = ()>>,
     ) -> Result<usize> {
         let (tx, rx_inserted) =
             tokio::sync::mpsc::channel::<DiscoveredEvent>(PENDING_EVENTS_CHANNEL_DEPTH);
@@ -83,6 +87,7 @@ impl OrderingTask {
             batch_size,
             tx,
             num_tasks,
+            Box::into_pin(shutdown_signal),
         )
         .await
         {
@@ -653,11 +658,78 @@ impl OrderingState {
         batch_size: u32,
         tx: Sender<DiscoveredEvent>,
         partition_size: u32,
+        shutdown_signal: std::pin::Pin<Box<dyn Future<Output = ()>>>,
     ) -> Result<usize> {
         info!("Attempting to process all undelivered events. This could take some time.");
+        let (rx, tasks) = Self::spawn_tasks_for_undelivered_event_processing(
+            event_access,
+            max_iterations,
+            batch_size,
+            tx,
+            partition_size,
+        );
+
+        tokio::select! {
+            event_cnt = Self::collect_ordering_task_output(rx, tasks) => {
+                Ok(event_cnt)
+            }
+            _ = shutdown_signal => {
+                Err(Error::new_app(anyhow!("Ordering tasks aborted due to shutdown signal")))
+            }
+        }
+    }
 
-        let mut tasks: tokio::task::JoinSet<Result<()>> = tokio::task::JoinSet::new();
-        let (cnt_tx, mut rx) = tokio::sync::mpsc::channel(8);
+    async fn collect_ordering_task_output(
+        mut rx: tokio::sync::mpsc::Receiver<OrderingTaskStatus>,
+        mut tasks: JoinSet<Result<()>>,
+    ) -> usize {
+        let mut event_cnt = 0;
+        while let Some(OrderingTaskStatus {
+            number_discovered,
+            new_highwater,
+            task_id,
+        }) = rx.recv().await
+        {
+            event_cnt += number_discovered;
+            if event_cnt % LOG_EVERY_N_ENTRIES < number_discovered {
+                // these values are useful but can be slightly misleading. the highwater mark will move forward/backward
+                // based on the task reporting, and we're counting the number discovered and sent by the task, even though
+                // the task doing the ordering may discover and order additional events while reviewing the events sent
+                info!(count=%event_cnt, highwater=%new_highwater, %task_id, "Processed undelivered events");
+            }
+        }
+        while let Some(res) = tasks.join_next().await {
+            match res {
+                Ok(v) => match v {
+                    Ok(_) => {
+                        // task finished so nothing to do
+                    }
+                    Err(error) => {
+                        warn!(?error, "event ordering task failed while processing");
+                    }
+                },
+                Err(error) => {
+                    warn!(?error, "event ordering task failed with JoinError");
+                }
+            }
+        }
+        event_cnt
+    }
+
+    /// Process all undelivered events in the database. This is a blocking operation that could take a long time.
+    /// It is intended to be run at startup but could be used on an interval or after some errors to recover.
+    fn spawn_tasks_for_undelivered_event_processing(
+        event_access: Arc<EventAccess>,
+        max_iterations: usize,
+        batch_size: u32,
+        tx: Sender<DiscoveredEvent>,
+        partition_size: u32,
+    ) -> (
+        tokio::sync::mpsc::Receiver<OrderingTaskStatus>,
+        JoinSet<Result<()>>,
+    ) {
+        let mut tasks: JoinSet<Result<()>> = JoinSet::new();
+        let (cnt_tx, rx) = tokio::sync::mpsc::channel(8);
         for task_id in 0..partition_size {
             debug!("starting task {task_id} of {partition_size} to process undelivered events");
             let tx = tx.clone();
@@ -668,10 +740,10 @@ impl OrderingState {
             let mut highwater = 0;
             while iter_cnt < max_iterations {
                 iter_cnt += 1;
-                let (undelivered, new_hw) = event_access
+                let (undelivered, new_highwater) = event_access
                     .undelivered_with_values(highwater, batch_size.into(), partition_size, task_id)
                     .await?;
-                highwater = new_hw;
+                highwater = new_highwater;
                 let found_something = !undelivered.is_empty();
                 let found_everything = undelivered.len() < batch_size as usize;
                 if found_something {
@@ -680,13 +752,13 @@ impl OrderingState {
                     // at the beginning and mark them all delivered, or we find a gap and stop processing and leave them in memory.
                     // In this case, we won't discover them until we start running recon with a peer, so maybe we should drop them
                     // or otherwise mark them ignored somehow. When this function ends, we do drop everything so for now it's probably okay.
-                    let number_processed = OrderingState::process_undelivered_events_batch(
+                    let number_discovered = OrderingState::process_undelivered_events_batch(
                         &event_access,
                         undelivered,
                         &tx,
                     )
                     .await?;
-                    if cnt_tx.send((number_processed, new_hw, task_id)).await.is_err() {
+                    if cnt_tx.send(OrderingTaskStatus { number_discovered, new_highwater, task_id }).await.is_err() {
                         warn!("undelivered task manager not available... exiting task_id={task_id}");
                         return Err(crate::Error::new_fatal(anyhow!("undelivered task manager not available... exiting task_id={task_id}")));
                     }
@@ -702,36 +774,7 @@ impl OrderingState {
                 Ok(())
             });
         }
-        // drop our senders so the background tasks exit without waiting on us
-        drop(cnt_tx);
-        drop(tx);
-
-        let mut event_cnt = 0;
-        while let Some((number_processed, new_hw, task_id)) = rx.recv().await {
-            event_cnt += number_processed;
-            if event_cnt % LOG_EVERY_N_ENTRIES < number_processed {
-                // these values are useful but can be slightly misleading. the highwater mark will move forward/backward
-                // based on the task reporting, and we're counting the number discovered and sent by the task, even though
-                // the task doing the ordering may discover and order additional events while reviewing the events sent
-                info!(count=%event_cnt, highwater=%new_hw, %task_id, "Processed undelivered events");
-            }
-        }
-        while let Some(res) = tasks.join_next().await {
-            match res {
-                Ok(v) => match v {
-                    Ok(_) => {
-                        // task finished so nothing to do
-                    }
-                    Err(error) => {
-                        warn!(?error, "event ordering task failed while processing");
-                    }
-                },
-                Err(error) => {
-                    warn!(?error, "event ordering task failed with JoinError");
-                }
-            }
-        }
-        Ok(event_cnt)
+        (rx, tasks)
     }
 
     async fn process_undelivered_events_batch(
@@ -828,6 +871,13 @@ impl OrderingState {
     }
 }
 
+#[derive(Debug)]
+struct OrderingTaskStatus {
+    task_id: u32,
+    new_highwater: i64,
+    number_discovered: usize,
+}
+
 #[cfg(test)]
 mod test {
     use crate::store::EventInsertable;
@@ -841,6 +891,13 @@ mod test {
     };
 
     use super::*;
+    fn fake_shutdown_signal() -> Box<dyn Future<Output = ()>> {
+        Box::new(async move {
+            loop {
+                tokio::time::sleep(std::time::Duration::from_secs(60)).await
+            }
+        })
+    }
 
     async fn get_n_insertable_events(n: usize) -> Vec<EventInsertable> {
         let mut res = Vec::with_capacity(n);
@@ -861,9 +918,14 @@ mod test {
     #[test(tokio::test)]
     async fn test_undelivered_batch_empty() {
         let pool = SqlitePool::connect_in_memory().await.unwrap();
         let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
-        let processed = OrderingTask::process_all_undelivered_events(event_access, 1, 5)
-            .await
-            .unwrap();
+        let processed = OrderingTask::process_all_undelivered_events(
+            event_access,
+            1,
+            5,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
         assert_eq!(0, processed);
     }
@@ -905,24 +967,39 @@ mod test {
         assert_eq!(1, events.len());
         // we make sure to use 1 task in this test as we want to measure the progress of each iteration
-        let processed =
-            OrderingTask::process_all_undelivered_events_with_tasks(event_access.clone(), 1, 5, 1)
-                .await
-                .unwrap();
+        let processed = OrderingTask::process_all_undelivered_events_with_tasks(
+            event_access.clone(),
+            1,
+            5,
+            1,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
         assert_eq!(5, processed);
         let (_, events) = event_access.new_events_since_value(0, 100).await.unwrap();
         assert_eq!(6, events.len());
 
-        let processed =
-            OrderingTask::process_all_undelivered_events_with_tasks(event_access.clone(), 1, 5, 1)
-                .await
-                .unwrap();
+        let processed = OrderingTask::process_all_undelivered_events_with_tasks(
+            event_access.clone(),
+            1,
+            5,
+            1,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
         assert_eq!(4, processed);
         let (_, events) = event_access.new_events_since_value(0, 100).await.unwrap();
         assert_eq!(10, events.len());
 
-        let processed =
-            OrderingTask::process_all_undelivered_events_with_tasks(event_access.clone(), 1, 5, 1)
-                .await
-                .unwrap();
+        let processed = OrderingTask::process_all_undelivered_events_with_tasks(
+            event_access.clone(),
+            1,
+            5,
+            1,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
         assert_eq!(0, processed);
     }
@@ -940,10 +1017,15 @@ mod test {
         let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(5, event.len());
         // we specify 1 task so we can easily expect how far it gets each run, rather than doing math against the number of spawned tasks
-        let processed =
-            OrderingTask::process_all_undelivered_events_with_tasks(event_access.clone(), 4, 10, 1)
-                .await
-                .unwrap();
+        let processed = OrderingTask::process_all_undelivered_events_with_tasks(
+            event_access.clone(),
+            4,
+            10,
+            1,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
 
         let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(45, event.len());
@@ -960,9 +1042,14 @@ mod test {
         let _new = event_access.insert_many(insertable.iter()).await.unwrap();
         let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(0, event.len());
-        let res = OrderingTask::process_all_undelivered_events(Arc::clone(&event_access), 4, 3)
-            .await
-            .unwrap();
+        let res = OrderingTask::process_all_undelivered_events(
+            Arc::clone(&event_access),
+            4,
+            3,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
 
         assert_eq!(res, 9);
         let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
@@ -983,10 +1070,14 @@ mod test {
         let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(5, event.len());
 
-        let processed =
-            OrderingTask::process_all_undelivered_events(event_access.clone(), 100_000_000, 5)
-                .await
-                .unwrap();
+        let processed = OrderingTask::process_all_undelivered_events(
+            event_access.clone(),
+            100_000_000,
+            5,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
 
         let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(50, event.len());
@@ -1007,10 +1098,14 @@ mod test {
             .await
             .unwrap();
         assert_eq!(1000, event.len());
-        let _res =
-            OrderingTask::process_all_undelivered_events(Arc::clone(&event_access), 100_000_000, 5)
-                .await
-                .unwrap();
+        let _res = OrderingTask::process_all_undelivered_events(
+            Arc::clone(&event_access),
+            100_000_000,
+            5,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
 
         let (_hw, event) = event_access
             .new_events_since_value(0, 100_000)
             .await
             .unwrap();
@@ -1058,6 +1153,7 @@ mod test {
             Arc::clone(&event_access),
             100_000_000,
             100,
+            fake_shutdown_signal(),
         )
         .await
         .unwrap();
@@ -1106,9 +1202,14 @@ mod test {
         let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(5, event.len());
 
-        let processed = OrderingTask::process_all_undelivered_events(event_access.clone(), 1, 100)
-            .await
-            .unwrap();
+        let processed = OrderingTask::process_all_undelivered_events(
+            event_access.clone(),
+            1,
+            100,
+            fake_shutdown_signal(),
+        )
+        .await
+        .unwrap();
 
         let (_hw, cids) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(50, cids.len());
diff --git a/event-svc/src/event/service.rs b/event-svc/src/event/service.rs
index be81cddf..735cb0f1 100644
--- a/event-svc/src/event/service.rs
+++ b/event-svc/src/event/service.rs
@@ -1,5 +1,6 @@
 use std::{
     collections::{BTreeMap, HashMap},
+    future::Future,
     sync::{Arc, Mutex, MutexGuard},
 };
 
@@ -81,6 +82,18 @@ pub enum DeliverableRequirement {
     Lazy,
 }
 
+/// Input to determine behavior related to startup event ordering.
+/// Used at service creation to block until processing completes if requested, or to skip processing entirely.
+pub enum UndeliveredEventReview {
+    /// Do not review all undelivered events in the database when creating the service
+    Skip,
+    /// Review and order all undelivered events before returning. May block for a long time.
+    Process {
+        /// A future that can be used to signal the tasks to stop before they complete
+        shutdown_signal: Box<dyn Future<Output = ()>>,
+    },
+}
+
 impl EventService {
     /// Create a new CeramicEventStore.
     ///
@@ -88,7 +101,7 @@ impl EventService {
     /// processed.
     pub async fn try_new(
         pool: SqlitePool,
-        process_undelivered_events: bool,
+        process_undelivered_events: UndeliveredEventReview,
         validate_events: bool,
         ethereum_rpc_providers: Vec<ChainInclusionProvider>,
     ) -> Result<Self> {
@@ -108,9 +121,13 @@ impl EventService {
             pending_writes: Arc::new(Mutex::new(HashMap::default())),
             event_access,
         };
-        if process_undelivered_events {
-            svc.process_all_undelivered_events().await?;
+        match process_undelivered_events {
+            UndeliveredEventReview::Skip => {}
+            UndeliveredEventReview::Process { shutdown_signal } => {
+                let _num_processed = svc.process_all_undelivered_events(shutdown_signal).await?;
+            }
         }
+
         Ok(svc)
     }
@@ -119,7 +136,7 @@ impl EventService {
     /// in the next pass.. but it's basically same same but different.
     #[allow(dead_code)]
     pub(crate) async fn new_with_event_validation(pool: SqlitePool) -> Result<Self> {
-        Self::try_new(pool, false, true, vec![]).await
+        Self::try_new(pool, UndeliveredEventReview::Skip, true, vec![]).await
     }
 
     /// Currently, we track events when the [`ValidationRequirement`] allows. Right now, this applies to
@@ -165,11 +182,15 @@ impl EventService {
     }
 
     /// Returns the number of undelivered events that were updated
-    async fn process_all_undelivered_events(&self) -> Result<usize> {
+    async fn process_all_undelivered_events(
+        &self,
+        shutdown_signal: Box<dyn Future<Output = ()>>,
+    ) -> Result<usize> {
         OrderingTask::process_all_undelivered_events(
             Arc::clone(&self.event_access),
             MAX_ITERATIONS,
             DELIVERABLE_EVENTS_BATCH_SIZE,
+            shutdown_signal,
         )
         .await
     }
diff --git a/event-svc/src/lib.rs b/event-svc/src/lib.rs
index 75275c45..8d564111 100644
--- a/event-svc/src/lib.rs
+++ b/event-svc/src/lib.rs
@@ -10,6 +10,6 @@ mod tests;
 pub use ceramic_validation::eth_rpc;
 pub use error::Error;
 pub use event::ChainInclusionProvider;
-pub use event::{BlockStore, EventService};
+pub use event::{BlockStore, EventService, UndeliveredEventReview};
 
 pub(crate) type Result<T> = std::result::Result<T, Error>;
diff --git a/event-svc/src/tests/event.rs b/event-svc/src/tests/event.rs
index 105a4e01..5d463371 100644
--- a/event-svc/src/tests/event.rs
+++ b/event-svc/src/tests/event.rs
@@ -1,6 +1,6 @@
 use std::str::FromStr;
 
-use crate::EventService;
+use crate::{EventService, UndeliveredEventReview};
 use anyhow::Error;
 use bytes::Bytes;
 use ceramic_api::{ApiItem, EventService as ApiEventService};
@@ -25,7 +25,7 @@ macro_rules! test_with_sqlite {
         async fn [<$test_name _sqlite>]() {
             let conn = $crate::store::SqlitePool::connect_in_memory().await.unwrap();
-            let store = $crate::EventService::try_new(conn, true, true, vec![]).await.unwrap();
+            let store = $crate::EventService::try_new(conn, UndeliveredEventReview::Skip, true, vec![]).await.unwrap();
             $(
                 for stmt in $sql_stmts {
                     store.pool.run_statement(stmt).await.unwrap();
@@ -664,7 +664,7 @@ where
 #[test(tokio::test)]
 async fn test_conclusion_events_since() -> Result<(), Box<dyn std::error::Error>> {
     let pool = SqlitePool::connect_in_memory().await?;
-    let service = EventService::try_new(pool, false, false, vec![]).await?;
+    let service = EventService::try_new(pool, UndeliveredEventReview::Skip, false, vec![]).await?;
 
     let test_events = generate_chained_events().await;
     ceramic_api::EventService::insert_many(
diff --git a/one/src/daemon.rs b/one/src/daemon.rs
index 45230821..f5ac7d9c 100644
--- a/one/src/daemon.rs
+++ b/one/src/daemon.rs
@@ -411,6 +411,8 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
     let (shutdown_signal_tx, mut shutdown_signal) = broadcast::channel::<()>(1);
     let signals = Signals::new([SIGHUP, SIGTERM, SIGINT, SIGQUIT])?;
     let handle = signals.handle();
+    debug!("starting signal handler task");
+    let signals_handle = tokio::spawn(handle_signals(signals, shutdown_signal_tx));
 
     // Construct sqlite_pool
     let sqlite_pool = opts
@@ -434,8 +436,19 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
     let peer_svc = Arc::new(PeerService::new(sqlite_pool.clone()));
     let interest_svc = Arc::new(InterestService::new(sqlite_pool.clone()));
     let event_validation = opts.event_validation.unwrap_or(true);
+    let mut ss = shutdown_signal.resubscribe();
     let event_svc = Arc::new(
-        EventService::try_new(sqlite_pool.clone(), true, event_validation, rpc_providers).await?,
+        EventService::try_new(
+            sqlite_pool.clone(),
+            ceramic_event_svc::UndeliveredEventReview::Process {
+                shutdown_signal: Box::new(async move {
+                    let _ = ss.recv().await;
+                }),
+            },
+            event_validation,
+            rpc_providers,
+        )
+        .await?,
     );
 
     let network = opts.network.to_network(&opts.local_network_id)?;
@@ -722,9 +735,6 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
         ("/api/v0/".to_string(), kubo_rpc_service),
     );
 
-    debug!("starting signal handler task");
-    let signals_handle = tokio::spawn(handle_signals(signals, shutdown_signal_tx));
-
     // The server task blocks until we are ready to start shutdown
     info!("starting api server at address {}", opts.bind_address);
     hyper::server::Server::try_bind(&opts.bind_address.parse()?)
diff --git a/one/src/migrations.rs b/one/src/migrations.rs
index d6cf3ed8..daa32f37 100644
--- a/one/src/migrations.rs
+++ b/one/src/migrations.rs
@@ -148,7 +148,15 @@ async fn from_ipfs(opts: FromIpfsOpts) -> Result<()> {
     let db_opts: DBOpts = (&opts).into();
     let sqlite_pool = db_opts.get_sqlite_pool(SqliteOpts::default()).await?;
     // TODO: feature flags here? or just remove this entirely when enabling
-    let event_svc = Arc::new(EventService::try_new(sqlite_pool, false, false, vec![]).await?);
+    let event_svc = Arc::new(
+        EventService::try_new(
+            sqlite_pool,
+            ceramic_event_svc::UndeliveredEventReview::Skip,
+            false,
+            vec![],
+        )
+        .await?,
+    );
     let blocks = FSBlockStore {
         input_ipfs_path: opts.input_ipfs_path,
         sharded_paths: !opts.non_sharded_paths,
diff --git a/p2p/tests/node.rs b/p2p/tests/node.rs
index 676a65f1..18ba94f7 100644
--- a/p2p/tests/node.rs
+++ b/p2p/tests/node.rs
@@ -35,8 +35,15 @@ impl TestRunnerBuilder {
         let sql_pool = SqlitePool::connect_in_memory().await.unwrap();
         let peer_svc = Arc::new(ceramic_peer_svc::PeerService::new(sql_pool.clone()));
         let interest_svc = Arc::new(ceramic_interest_svc::InterestService::new(sql_pool.clone()));
-        let event_svc =
-            Arc::new(ceramic_event_svc::EventService::try_new(sql_pool, true, true, vec![]).await?);
+        let event_svc = Arc::new(
+            ceramic_event_svc::EventService::try_new(
+                sql_pool,
+                ceramic_event_svc::UndeliveredEventReview::Skip,
+                true,
+                vec![],
+            )
+            .await?,
+        );
         let mut registry = prometheus_client::registry::Registry::default();
         let metrics = Metrics::register(&mut registry);
diff --git a/peer-svc/src/tests/peer.rs b/peer-svc/src/tests/peer.rs
index 36994e00..18cd0f04 100644
--- a/peer-svc/src/tests/peer.rs
+++ b/peer-svc/src/tests/peer.rs
@@ -41,7 +41,7 @@ pub(crate) fn peer_key_builder() -> Builder {
 // Generate an event for the same network,model,controller,stream
 // The event and height are random when it's None.
-pub(crate) fn random_peer_key<'a>(expiration: Option<u64>) -> PeerKey {
+pub(crate) fn random_peer_key(expiration: Option<u64>) -> PeerKey {
     peer_key_builder()
         .with_expiration(expiration.unwrap_or_else(|| thread_rng().gen()))
         .with_id(&NodeKey::random())
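
For reviewers, the sketch below shows how a caller other than the daemon might drive the new UndeliveredEventReview API. It is illustrative only: build_event_service is a made-up helper, and the ceramic_sql import path and the anyhow error conversion are assumptions, not code from this change.

// Hypothetical caller of the new API; not part of this diff.
use std::sync::Arc;

use ceramic_event_svc::{EventService, UndeliveredEventReview};
use ceramic_sql::sqlite::SqlitePool;
use tokio::sync::broadcast;

async fn build_event_service(
    pool: SqlitePool,
    mut shutdown_rx: broadcast::Receiver<()>,
) -> anyhow::Result<Arc<EventService>> {
    let svc = EventService::try_new(
        pool,
        // Block here until the startup ordering pass finishes, but let a
        // broadcast message (or all senders dropping) abort it early.
        UndeliveredEventReview::Process {
            shutdown_signal: Box::new(async move {
                let _ = shutdown_rx.recv().await;
            }),
        },
        true,   // validate_events
        vec![], // no Ethereum RPC providers in this sketch
    )
    .await?;
    Ok(Arc::new(svc))
}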
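
The ordering task accepts the signal as an unpinned Box<dyn Future<Output = ()>> and pins it with Box::into_pin before racing it against the workers in tokio::select!. A minimal, self-contained model of that pattern (all names here are illustrative, not from the PR):

// Sketch of the cancellation pattern used by OrderingState::process_all_undelivered_events.
use std::future::Future;

async fn run_until_shutdown(
    work: impl Future<Output = usize>,
    shutdown_signal: Box<dyn Future<Output = ()>>,
) -> Result<usize, String> {
    // select! needs a pinned future; Box::into_pin converts Box<dyn Future>
    // into Pin<Box<dyn Future>> without re-allocating.
    let shutdown_signal = Box::into_pin(shutdown_signal);
    tokio::select! {
        n = work => Ok(n),
        _ = shutdown_signal => Err("aborted by shutdown signal".to_string()),
    }
}

#[tokio::main]
async fn main() {
    let res = run_until_shutdown(
        async { 42 },
        // Mirrors fake_shutdown_signal() from the tests: a future that
        // effectively never resolves.
        Box::new(async {
            tokio::time::sleep(std::time::Duration::from_secs(60)).await;
        }),
    )
    .await;
    assert_eq!(res, Ok(42));
}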
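
spawn_tasks_for_undelivered_event_processing plus collect_ordering_task_output is a fan-out/fan-in split: each worker walks its partition reporting progress over an mpsc channel, and the collector sums counts before draining the JoinSet. Note the refactor also removes the explicit drop(cnt_tx)/drop(tx) lines because the original senders now go out of scope when the spawn function returns. A toy version of the shape (Status and the fixed counts are stand-ins for OrderingTaskStatus and the database scan):

// Sketch of the fan-out/fan-in structure; not part of this diff.
use tokio::task::JoinSet;

#[derive(Debug)]
struct Status {
    task_id: u32,
    number_discovered: usize,
}

#[tokio::main]
async fn main() {
    let partition_size = 4u32;
    let (cnt_tx, mut rx) = tokio::sync::mpsc::channel::<Status>(8);
    let mut tasks: JoinSet<Result<(), String>> = JoinSet::new();
    for task_id in 0..partition_size {
        let cnt_tx = cnt_tx.clone();
        tasks.spawn(async move {
            // Stand-in for the undelivered_with_values(highwater, batch, partition, task_id) loop.
            for _batch in 0..3 {
                if cnt_tx.send(Status { task_id, number_discovered: 5 }).await.is_err() {
                    return Err(format!("collector gone, exiting task {task_id}"));
                }
            }
            Ok(())
        });
    }
    // Drop the original sender so rx.recv() ends once every worker's clone is gone.
    drop(cnt_tx);

    let mut total = 0;
    while let Some(status) = rx.recv().await {
        total += status.number_discovered;
    }
    while let Some(res) = tasks.join_next().await {
        res.expect("join error").expect("task failed");
    }
    assert_eq!(total, 60);
}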