diff --git a/README.md b/README.md index dd59f88..84d63bd 100644 --- a/README.md +++ b/README.md @@ -13,4 +13,4 @@ This is a list of the current utilities. | Module | Description | Docs | | ------ | --------------------------------------------- | --------------------------- | -| core extras | Set of useful tools for Core NATS | [README.md](core-extras/README.md) | +| nats extra | Set of useful tools for Core NATS | [README.md](nats-extra/README.md) | diff --git a/nats-extra/Cargo.toml b/nats-extra/Cargo.toml index 1e6d227..3d80f5b 100644 --- a/nats-extra/Cargo.toml +++ b/nats-extra/Cargo.toml @@ -10,3 +10,7 @@ bytes = "1.8.0" futures = "0.3.31" tokio = { version = "1.0", features = ["time"] } nats_server = { git = "https://github.com/nats-io/nats.rs", package = "nats-server" } + +[features] +request_many = [] +default = ["request_many"] diff --git a/nats-extra/README.md b/nats-extra/README.md new file mode 100644 index 0000000..e69de29 diff --git a/nats-extra/src/lib.rs b/nats-extra/src/lib.rs index 3e8f016..af80558 100644 --- a/nats-extra/src/lib.rs +++ b/nats-extra/src/lib.rs @@ -18,4 +18,5 @@ #![deny(rustdoc::invalid_rust_codeblocks)] #![cfg_attr(docsrs, feature(doc_auto_cfg))] +#[cfg(feature = "request_many")] pub mod request_many; diff --git a/nats-extra/src/request_many.rs b/nats-extra/src/request_many.rs index b4a45dd..4623303 100644 --- a/nats-extra/src/request_many.rs +++ b/nats-extra/src/request_many.rs @@ -11,6 +11,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! Request many pattern implementation useful for streaming responses +//! and scatter-gather pattern. + use std::{ pin::Pin, task::{Context, Poll}, @@ -21,6 +24,8 @@ use async_nats::{subject::ToSubject, Client, RequestError, Subscriber}; use bytes::Bytes; use futures::{FutureExt, Stream, StreamExt}; +/// Extension trait for [async-nats Client](async_nats::Client) that enables the +/// [client.request_many()](RequestManyExt::request_many), which allows for streaming responses and scatter-gather patterns. pub trait RequestManyExt { fn request_many(&self) -> RequestMany; } @@ -31,8 +36,10 @@ impl RequestManyExt for Client { } } +/// Predicate function that can be used to pick responses termination. type SentinelPredicate = Option bool + 'static>>; +/// A builder for the request many pattern. pub struct RequestMany { client: Client, sentinel: SentinelPredicate, @@ -52,26 +59,34 @@ impl RequestMany { } } + /// Set the sentinel predicate that will be used to terminate the responses. pub fn sentinel(mut self, sentinel: impl Fn(&async_nats::Message) -> bool + 'static) -> Self { self.sentinel = Some(Box::new(sentinel)); self } + /// Set the maximum time between messages before the responses are terminated. + /// Useful when the number of responses is not known upfront. Can also work in scatter-gather + /// where sentinel or setting max messages would not work. pub fn stall_wait(mut self, stall_wait: Duration) -> Self { self.stall_wait = Some(stall_wait); self } + /// Set the maximum number of messages to receive before the responses are terminated. pub fn max_messages(mut self, max_messages: usize) -> Self { self.max_messags = Some(max_messages); self } + /// Set the maximum time to wait for responses before terminating. + /// By default, the client's request timeout is used. pub fn max_wait(mut self, max_wait: Option) -> Self { self.max_wait = max_wait; self } + /// Send a request to the subject and return a stream of responses. pub async fn send( self, subject: S, @@ -99,6 +114,7 @@ impl RequestMany { } } +/// A stream of responses from a request many pattern. pub struct Responses { responses: Subscriber, messages_received: usize, diff --git a/nats-extra/tests/request_many_tests.rs b/nats-extra/tests/request_many_tests.rs index a409634..f3932ae 100644 --- a/nats-extra/tests/request_many_tests.rs +++ b/nats-extra/tests/request_many_tests.rs @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#[cfg(feature = "request_many")] mod request_many { use std::time::Duration;