diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index 7359970b5..4496cba28 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -22,7 +22,13 @@ mod shrink; use self::id_data::{IdData, Include}; use self::shrink::{ShrinkMap, ShrinkVec}; -pub(crate) struct Aggregator { +/// Aggregates instrumentation traces and prepares state for the instrument +/// server. +/// +/// The `Aggregator` is responsible for receiving and organizing the +/// instrumentated events and preparing the data to be served to a instrument +/// client. +pub struct Aggregator { /// Channel of incoming events emitted by `TaskLayer`s. events: mpsc::Receiver, @@ -157,7 +163,12 @@ impl Aggregator { } } - pub(crate) async fn run(mut self) { + /// Runs the aggregator. + /// + /// This method will start the aggregator loop and should run as long as + /// the instrument server is running. If the instrument server stops, + /// this future can be aborted. + pub async fn run(mut self) { let mut publish = tokio::time::interval(self.publish_interval); loop { let should_send = tokio::select! { diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index d9d39f53d..8df105cce 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -42,7 +42,7 @@ mod stats; pub(crate) mod sync; mod visitors; -use aggregator::Aggregator; +pub use aggregator::Aggregator; pub use builder::{Builder, ServerAddr}; use callsites::Callsites; use record::Recorder; @@ -941,10 +941,11 @@ impl Server { ) -> Result<(), Box> { let addr = self.addr.clone(); let ServerParts { - instrument_server: service, - aggregator_handle: aggregate, + instrument_server, + aggregator, } = self.into_parts(); - let router = builder.add_service(service); + let aggregate = spawn_named(aggregator.run(), "console::aggregate"); + let router = builder.add_service(instrument_server); let res = match addr { ServerAddr::Tcp(addr) => { let serve = router.serve(addr); @@ -957,17 +958,21 @@ impl Server { spawn_named(serve, "console::serve").await } }; - drop(aggregate); + aggregate.abort(); res?.map_err(Into::into) } - /// Returns the parts needed to spawn a gRPC server and keep the aggregation - /// worker running. + /// Returns the parts needed to spawn a gRPC server and the aggregator that + /// supplies it. /// /// Note that a server spawned in this way will disregard any value set by /// [`Builder::server_addr`], as the user becomes responsible for defining /// the address when calling [`Router::serve`]. /// + /// Additionally, the user of this API must ensure that the [`Aggregator`] + /// is running for as long as the gRPC server is. If the server stops + /// running, the aggregator task can be aborted. + /// /// # Examples /// /// The parts can be used to serve the instrument server together with @@ -984,10 +989,11 @@ impl Server { /// let (console_layer, server) = ConsoleLayer::builder().build(); /// let ServerParts { /// instrument_server, - /// aggregator_handle, + /// aggregator, /// .. /// } = server.into_parts(); /// + /// let aggregator_handle = tokio::spawn(aggregator.run()); /// let router = tonic::transport::Server::builder() /// //.add_service(some_other_service) /// .add_service(instrument_server); @@ -1007,19 +1013,16 @@ impl Server { /// /// [`Router::serve`]: fn@tonic::transport::server::Router::serve pub fn into_parts(mut self) -> ServerParts { - let aggregate = self + let aggregator = self .aggregator .take() .expect("cannot start server multiple times"); - let aggregate = spawn_named(aggregate.run(), "console::aggregate"); - let service = proto::instrument::instrument_server::InstrumentServer::new(self); + let instrument_server = proto::instrument::instrument_server::InstrumentServer::new(self); ServerParts { - instrument_server: service, - aggregator_handle: AggregatorHandle { - join_handle: aggregate, - }, + instrument_server, + aggregator, } } } @@ -1032,8 +1035,9 @@ impl Server { /// The `InstrumentServer` can be used to construct a router which /// can be added to a [`tonic`] gRPC server. /// -/// The [`AggregatorHandle`] can be used to abort the associated aggregator task -/// after the server has been shut down. +/// The `aggregator` is a future which should be running as long as the server is. +/// Generally, this future should be spawned onto an appropriate runtime and then +/// aborted if the server gets shut down. /// /// See the [`Server::into_parts`] documentation for usage. #[non_exhaustive] @@ -1043,10 +1047,17 @@ pub struct ServerParts { /// See the documentation for [`InstrumentServer`] for details. pub instrument_server: InstrumentServer, - /// A handle to the background worker task responsible for aggregating trace data. + /// The aggregator. + /// + /// Responsible for collecting and preparing traces for the instrument server + /// to send its clients. + /// + /// The aggregator should be [`run`] when the instrument server is started. + /// If the server stops running for any reason, the aggregator task can be + /// aborted. /// - /// See the documentation for [`AggregatorHandle`] for details. - pub aggregator_handle: AggregatorHandle, + /// [`run`]: fn@crate::Aggregator::run + pub aggregator: Aggregator, } /// Aggregator handle.