From 9eff24bc6860990d04f3629329c973ccd32cda34 Mon Sep 17 00:00:00 2001 From: alex179ohm Date: Mon, 18 Feb 2019 13:26:21 +0100 Subject: [PATCH] Add Documentation --- Cargo.toml | 2 +- README.md | 24 +++++- examples/reader/src/main.rs | 2 +- src/codec.rs | 2 +- src/config.rs | 157 +++++++++++++++++++++++++++--------- src/conn.rs | 37 +++++++-- src/lib.rs | 15 ++-- src/msgs.rs | 130 ++++++++++++++++++++++++++++- src/subscribe.rs | 33 ++++++-- 9 files changed, 336 insertions(+), 66 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 52af3f4..f9dc905 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nsq-client" -version = "0.1.2" +version = "0.1.3" authors = ["Alessandro Cresto Miseroglio "] description = """ Rust client for the NSQ realtime message processing system" diff --git a/README.md b/README.md index f089029..016368d 100644 --- a/README.md +++ b/README.md @@ -3,12 +3,24 @@ Sponsored by for MyReader { fn main() { env_logger::init(); let sys = System::new("nsq-consumer"); - let config = Config::default().client_id("consumer".to_string()); + let config = Config::new().client_id("consumer"); let c = Supervisor::start(|_| Connection::new( "test", // topic "test", //channel diff --git a/src/codec.rs b/src/codec.rs index 70271ce..b4fc292 100644 --- a/src/codec.rs +++ b/src/codec.rs @@ -22,7 +22,7 @@ // SOFTWARE. //! And implementation of the NSQ protocol, -//! Source: https://github.com/alex179ohm/nsqueue/blob/master/src/codec.rs +//! Source: https://github.com/alex179ohm/nsq-client-rs/blob/master/src/codec.rs use std::io::{self, Cursor}; use std::str; diff --git a/src/config.rs b/src/config.rs index 904a16d..a326298 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,54 +1,107 @@ use serde_derive::{Serialize, Deserialize}; +/// Configuration sent to nsqd to properly config the [Connection](struct.Connection.html) +/// +/// # Examples +///``` +/// use nsq_client::{Connection, Config}; +/// +/// fn main() { +/// let sys = System::new("consumer"); +/// let config = Config::new().client_id("consumer").user_agent("node-1"); +/// Supervisor::start(|_| Connection::new( +/// "test", +/// "test", +/// "0.0.0.0:4150", +/// Some(config), +/// None, +/// None, +/// )); +/// sys.run(); +/// } +///``` +/// #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct Config { - // Identifiers sent to nsqd representing this client (consumer specific) + /// Identifiers sent to nsqd representing this client (consumer specific) + /// + /// Default: **hostname** where connection is started pub client_id: Option, - // hostname where client is deployed. + + /// Hostname where client is deployed. + /// + /// Default: **hostname** where connection is started pub hostname: Option, - // enable feature_negotiation + /// Enable feature_negotiation + /// + /// Default: **true** pub feature_negotiation: bool, - // Duration of time between heartbeats (milliseconds). - // Valid values: - // -1 disables heartbeats - // 1000 <= heartbeat_interval <= configured_max + /// Duration of time between heartbeats (milliseconds). + /// + /// Valid values: + /// * -1 disables heartbeats + /// * 1000 <= heartbeat_interval <= configured_max + /// + /// Default: **30000** pub heartbeat_interval: i64, - // Size of the buffer (in bytes) used by nsqd for buffering writes to this connection - // Valid values: - // -1 disable output buffer - // 64 <= output_buffer_size <= configured_max + /// Size of the buffer (in bytes) used by nsqd for buffering writes to this connection + /// + /// Valid values: + /// * -1 disable output buffer + /// * 64 <= output_buffer_size <= configured_max + /// + /// Default: **16384** pub output_buffer_size: u64, - // The timeout after which data nsqd has buffered will be flushed to this client. - // valid values: - // -1 disable buffer timeout - // 1ms <= output_buffer_timeout <= configured_max + /// The timeout after which data nsqd has buffered will be flushed to this client. + /// + /// Valid values: + /// * -1 disable buffer timeout + /// * 1ms <= output_buffer_timeout <= configured_max + /// + /// Default: **250** pub output_buffer_timeout: u32, - // Enable TLS negotiation + /// Enable TLS negotiation + /// + /// Default: **false** (Not implemented) pub tls_v1: bool, - // Enable snappy compression. + /// Enable snappy compression. + /// + /// Default: **false** (Not implemented) pub snappy: bool, - // Enable deflate compression. + /// Enable deflate compression. + /// + /// Default: **false** (Not implemented) pub deflate: bool, - // Configure deflate compression level. - // Valid range: - // 1 <= deflate_level <= configured_max + /// Configure deflate compression level. + /// + /// Valid range: + /// * 1 <= deflate_level <= configured_max + /// + /// Default: **6** pub deflate_level: u16, - // Integer percentage to sample the channel. - // Deliver a perventage of all messages received to this connection. + /// Integer percentage to sample the channel. + /// + /// Deliver a perventage of all messages received to this connection. + /// + /// Default: **0** pub sample_rate: u16, - // String indentifying the agent for this connection. + /// String indentifying the agent for this connection. + /// + /// Default: **hostname** where connection is started pub user_agent: String, - // Timeout used by nsqd before flushing buffered writes (set to 0 to disable). + /// Timeout used by nsqd before flushing buffered writes (set to 0 to disable). + /// + /// Default: **0** pub message_timeout: u32, } @@ -65,7 +118,6 @@ impl Default for Config { snappy: false, feature_negotiation: true, heartbeat_interval: 30000, - //heartbeat_interval: 2000, message_timeout: 0, output_buffer_size: 16384, output_buffer_timeout: 250, @@ -94,27 +146,58 @@ pub struct NsqdConfig { #[allow(dead_code)] impl Config { + /// Create default [Config](struct.Config.html) + /// ``` + /// use nsq_client::{Config}; + /// + /// fn main() { + /// let config = Config::new(); + /// assert_eq!(config, Config::default()); + /// } + /// ``` pub fn new() -> Config { Config{ ..Default::default() } } - pub fn client_id(mut self, client_id: String) -> Self { - self.client_id = Some(client_id); - self - } - - pub fn hostname(mut self, hostname: String) -> Self { - self.hostname = Some(hostname); + /// Change [client_id](struct.Config.html#structfield.client_id) + /// ``` + /// use nsq_client::Config; + /// + /// fn main() { + /// let config = Config::new().client_id("consumer"); + /// assert_eq!(config.client_id, Some("consumer".to_owned())); + /// } + /// ``` + pub fn client_id>(mut self, client_id: S) -> Self { + self.client_id = Some(client_id.into()); self } - pub fn user_agent(mut self, user_agent: String) -> Self { - self.user_agent = user_agent; + /// Change [hostname](struct.Config.html#structfield.hostname) + /// ``` + /// use nsq_client::Config; + /// + /// fn main() { + /// let config = Config::new().hostname("node-1"); + /// assert_eq!(config.hostname, Some("node-1".to_owned())); + /// } + /// ``` + pub fn hostname>(mut self, hostname: S) -> Self { + self.hostname = Some(hostname.into()); self } - pub fn snappy(mut self, snappy: bool) -> Self { - self.snappy = snappy; + /// Change [user_agent](struct.Config.html#structfield.user_agent) + /// ``` + /// use nsq_client::Config; + /// + /// fn main() { + /// let config = Config::new().user_agent("consumer-1"); + /// assert_eq!(config.user_agent, Some("consumer-1".to_owned())); + /// } + /// ``` + pub fn user_agent>(mut self, user_agent: S) -> Self { + self.user_agent = user_agent.into(); self } } diff --git a/src/conn.rs b/src/conn.rs index d8787bc..6044d69 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -45,7 +45,6 @@ use crate::msgs::{ Auth, Sub, Ready, Cls, Resume, NsqBackoff, Fin, Msg, NsqMsg, AddHandler, InFlight}; -//use crate::consumer_srvc::ConsumerService; #[derive(Message, Clone)] pub struct TcpConnect(pub String); @@ -62,6 +61,28 @@ pub enum ConnState { Stopped, } +/// Tcp Connection to NSQ system. +/// +/// Tries to connect to nsqd early as started: +/// +/// # Examples +/// ``` +/// use actix::prelude::*; +/// use nsq_client::Connection; +/// +/// fn main() { +/// let sys = System::new("consumer"); +/// Supervisor::start(|_| Connection::new( +/// "test", // <- topic +/// "test", // <- channel +/// "0.0.0.0:4150", // <- nsqd tcp address +/// None, // <- config (Optional) +/// None, // <- secret used by Auth +/// Some(1) // <- RDY setting for the Connection +/// )); +/// sys.run(); +/// } +/// ``` pub struct Connection { addr: String, @@ -106,6 +127,14 @@ impl Default for Connection impl Connection { + /// Return a Tcp Connection to nsqd. + /// + /// * `topic` - Topic String + /// * `channel` - Channel String + /// * `addr` - Tcp address of nsqd + /// * `config` - Optional [`Config`] + /// * `secret` - Optional String used to autenticate to nsqd + /// * `rdy` - Optional initial RDY setting pub fn new>( topic: S, channel: S, @@ -160,12 +189,6 @@ impl Actor for Connection } } -//impl Connection { -// fn add_inflight_handler(&mut self, handler: Recipient) { -// self.info_hashmap.insert(TypeId::of::(), Box::new(handler)); -// } -//} -// impl actix::io::WriteHandler for Connection { fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running { diff --git a/src/lib.rs b/src/lib.rs index ecc925c..4b652a6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,19 +21,22 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. -//! Nsq-client is a actix based implementation of nsq protocol. +//! Nsq-client is the [actix](https://actix.rs) based client implementation of the nsq protocol. //! //! This crate is intended as a swiss-knife base implementation for more //! complex nsq client applications, it supports even single or multiple connections, single or //! multiple async readers. //! -//! Due the actors's actix model, readers and connections are distinct entities witch communicate +//! Due the actors model, readers and connections are distinct entities witch communicate //! each other throught messages, so one reader could receive messages from multiple connections and multiple -//! connections could easily share multiple readers. +//! connections could easily send messages to multiple readers. //! //! -//! # Example +//! # Examples //! ``` +//! use actix::prelude::*; +//! use nsq_client::{Connection, Msg, Subscribe, Fin}; +//! //! struct MyReader{ //! conn: Arc>, //! }; @@ -84,10 +87,10 @@ mod producer; mod conn; mod subscribe; -pub use commands::{fin, req, touch}; +//pub use commands::{fin, req, touch}; pub use subscribe::{Subscribe}; pub use config::Config; pub use producer::{Producer}; pub use conn::{Connection}; pub use error::Error; -pub use msgs::{Fin, Msg, Reqeue, Touch, Conn, Pub, InFlight}; +pub use msgs::{Fin, Msg, Reqeue, Touch, Pub, InFlight}; diff --git a/src/msgs.rs b/src/msgs.rs index c774802..2bab486 100644 --- a/src/msgs.rs +++ b/src/msgs.rs @@ -25,7 +25,7 @@ use actix::prelude::*; use crate::codec::Cmd; use crate::error::Error; -use crate::conn::Connection; +//use crate::conn::Connection; pub trait NsqMsg: Message + Send + 'static {} @@ -37,15 +37,44 @@ where #[derive(Message)] pub struct AddHandler(pub Recipient); -#[derive(Message)] -pub struct Conn(pub Addr); - +//#[derive(Message)] +//pub struct Conn(pub Addr); + +/// Message sent by nsqd +/// +/// ## Example +/// ```rust +/// struct Consumer(Addr); +/// +/// impl Actor for Consumer { +/// type Context = Context; +/// fn handle(&mut self, ctx: &mut Self::Context) { +/// self.subscribe::(ctx, self.0.clone()); +/// } +/// } +/// +/// fn Handler for Consumer { +/// type Result = (); +/// fn handle(&mut self, msg: Msg, _: &mut Self::Context) { +/// println!("timestamp: {}", msg.timestamp); +/// println!("attemps: {}", msg.attemps); +/// println!("id: {}", msg.id); +/// println!("data: {}", msg.body); +/// println!("msg debug: {:?}", msg); +/// } +/// } +/// +/// ``` #[derive(Clone, Debug, Message)] pub struct Msg { + /// Timestamp of the message pub timestamp: i64, + /// Number of attemps reader tried to process the message pub attemps: u16, + /// Id of the message pub id: String, + /// Data sent by nsqd pub body: String, } @@ -60,6 +89,7 @@ impl Default for Msg { } } +/// Sent by [Connection](struct.Connection.html) every time in_fligth is increased or decreased #[derive(Message)] pub struct InFlight(pub u32); @@ -69,6 +99,19 @@ pub struct Auth; #[derive(Message)] pub struct Sub; +/// Allows Consumer to change Connection/s RDY +/// +/// # Examples +/// ```rust +/// struct Consumer(Addr); +/// +/// impl Actor for Consumer { +/// type Context = Context; +/// fn started(&mut self, _: &mut Self::Context) { +/// self.conn.do_send(Ready(3)); +/// } +/// } +/// ``` #[derive(Message)] pub struct Ready(pub u32); @@ -78,12 +121,91 @@ pub struct NsqBackoff; #[derive(Message)] pub struct Resume; +/// Send FIN command to nsqd +/// +/// Args: +/// * id - id of the message +/// +/// # Examples +/// ``` +/// use actix::prelude::*; +/// use nsq_client::{Connection, Subscribe, Msg, Fin}; +/// +/// struct Consumer(pub Addr); +/// +/// impl Actor for Consumer { +/// type Context = Context; +/// fn started(&mut self, ctx: &mut Self::Context) { +/// self.subscribe::(ctx, self.0.clone()); +/// } +/// } +/// +/// impl Handler for Consumer { +/// type Result = (); +/// fn handle(&mut self, msg: Msg, ctx: &mut Self::Conetxt) { +/// self.0.do_send(Fin(msg.id)); +/// } +/// } +/// ``` #[derive(Message, Clone)] pub struct Fin(pub String); +/// Send REQ command to nsqd +/// +/// Args: +/// * id - id of the message +/// * timeout - time spent before message is re-sent by nsqd, 0 will not defer requeuing +/// +/// # Examples +/// ``` +/// use actix::prelude::*; +/// use nsq_client::{Connection, Subscribe, Requeue, Fin}; +/// +/// struct Consumer(pub Addr); +/// +/// impl Actor for Consumer { +/// type Context = Context; +/// fn started(&mut self, ctx: &mut Self::Context) { +/// self.subscribe::(ctx, self.0.clone()); +/// } +/// } +/// +/// impl Handler for Consumer { +/// type Result = (); +/// fn handle(&mut self, msg: Msg, ctx: &mut Self::Conetxt) { +/// self.0.do_send(Requeue(msg.id, 2)); +/// } +/// } +/// ``` #[derive(Message, Clone)] pub struct Reqeue(pub String, u32); +/// Send TOUCH command to nsqd (reset timeout for and in-flight message) +/// +/// Args: +/// * id - id of the message +/// +/// # Examples +/// ``` +/// use actix::prelude::*; +/// use nsq_client::{Connection, Subscribe, Touch, Fin}; +/// +/// struct Consumer(pub Addr); +/// +/// impl Actor for Consumer { +/// type Context = Context; +/// fn started(&mut self, ctx: &mut Self::Context) { +/// self.subscribe::(ctx, self.0.clone()); +/// } +/// } +/// +/// impl Handler for Consumer { +/// type Result = (); +/// fn handle(&mut self, msg: Msg, ctx: &mut Self::Conetxt) { +/// self.0.do_send(Touch(msg.id)); +/// } +/// } +/// ``` #[derive(Message, Clone)] pub struct Touch(pub String); diff --git a/src/subscribe.rs b/src/subscribe.rs index 82a0cda..22ee189 100644 --- a/src/subscribe.rs +++ b/src/subscribe.rs @@ -21,19 +21,42 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. -//use std::collections::HashMap; -//use std::any::Any; use std::sync::Arc; use actix::prelude::*; use actix::dev::ToEnvelope; -//use actix::sync::SyncContext; -//use log::info; use crate::msgs::{NsqMsg, AddHandler}; use crate::conn::Connection; -//use crate::config::Config; +/// Allows differents consumers to subscribe to the desired msgs sent by connections. +/// +/// ### Example +/// ```rust +/// struct Consumer(pub Addr); +/// +/// impl Actor for Consumer { +/// type Context = Context; +/// fn started(&mut self, ctx: &mut Self::Context) { +/// self.subscribe::(ctx, self.0.clone()); +/// self.subsctibe::(ctx, self.0.clone()); +/// } +/// } +/// +/// impl Handler for Consumer { +/// type Result = (); +/// fn handle(&mut self, msg: Msg, _: &mut Self::Context) { +/// // process Msg +/// } +/// } +/// +/// impl Handler for Consumer { +/// type Result = (); +/// fn handle(&mut self, msg: InFligth, _: &mut Self::Context) { +/// // do something every time in_fligth is increased or decreased +/// } +/// } +/// ``` pub trait Subscribe where Self: Actor,