Skip to content

Commit

Permalink
Improve docs and add request many feature flag
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Oct 31, 2024
1 parent d51c6d0 commit 7456f3b
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ This is a list of the current utilities.

| Module | Description | Docs |
| ------ | --------------------------------------------- | --------------------------- |
| core extras | Set of useful tools for Core NATS | [README.md](core-extras/README.md) |
| nats extra | Set of useful tools for Core NATS | [README.md](nats-extra/README.md) |
4 changes: 4 additions & 0 deletions nats-extra/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@ bytes = "1.8.0"
futures = "0.3.31"
tokio = { version = "1.0", features = ["time"] }
nats_server = { git = "https://github.com/nats-io/nats.rs", package = "nats-server" }

[features]
request_many = []
default = ["request_many"]
Empty file added nats-extra/README.md
Empty file.
1 change: 1 addition & 0 deletions nats-extra/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
#![deny(rustdoc::invalid_rust_codeblocks)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]

#[cfg(feature = "request_many")]
pub mod request_many;
16 changes: 16 additions & 0 deletions nats-extra/src/request_many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//! Request many pattern implementation useful for streaming responses
//! and scatter-gather pattern.
use std::{
pin::Pin,
task::{Context, Poll},
Expand All @@ -21,6 +24,8 @@ use async_nats::{subject::ToSubject, Client, RequestError, Subscriber};
use bytes::Bytes;
use futures::{FutureExt, Stream, StreamExt};

/// Extension trait for [async-nats Client](async_nats::Client) that enables the
/// [client.request_many()](RequestManyExt::request_many), which allows for streaming responses and scatter-gather patterns.
pub trait RequestManyExt {
fn request_many(&self) -> RequestMany;
}
Expand All @@ -31,8 +36,10 @@ impl RequestManyExt for Client {
}
}

/// Predicate function that can be used to pick responses termination.
type SentinelPredicate = Option<Box<dyn Fn(&async_nats::Message) -> bool + 'static>>;

/// A builder for the request many pattern.
pub struct RequestMany {
client: Client,
sentinel: SentinelPredicate,
Expand All @@ -52,26 +59,34 @@ impl RequestMany {
}
}

/// Set the sentinel predicate that will be used to terminate the responses.
pub fn sentinel(mut self, sentinel: impl Fn(&async_nats::Message) -> bool + 'static) -> Self {
self.sentinel = Some(Box::new(sentinel));
self
}

/// Set the maximum time between messages before the responses are terminated.
/// Useful when the number of responses is not known upfront. Can also work in scatter-gather
/// where sentinel or setting max messages would not work.
pub fn stall_wait(mut self, stall_wait: Duration) -> Self {
self.stall_wait = Some(stall_wait);
self
}

/// Set the maximum number of messages to receive before the responses are terminated.
pub fn max_messages(mut self, max_messages: usize) -> Self {
self.max_messags = Some(max_messages);
self
}

/// Set the maximum time to wait for responses before terminating.
/// By default, the client's request timeout is used.
pub fn max_wait(mut self, max_wait: Option<Duration>) -> Self {
self.max_wait = max_wait;
self
}

/// Send a request to the subject and return a stream of responses.
pub async fn send<S: ToSubject>(
self,
subject: S,
Expand Down Expand Up @@ -99,6 +114,7 @@ impl RequestMany {
}
}

/// A stream of responses from a request many pattern.
pub struct Responses {
responses: Subscriber,
messages_received: usize,
Expand Down
1 change: 1 addition & 0 deletions nats-extra/tests/request_many_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(feature = "request_many")]
mod request_many {
use std::time::Duration;

Expand Down

0 comments on commit 7456f3b

Please sign in to comment.