Skip to content

Commit

Permalink
Add Documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
alex179ohm committed Feb 18, 2019
1 parent bfeb9e5 commit 9eff24b
Show file tree
Hide file tree
Showing 9 changed files with 336 additions and 66 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "nsq-client"
version = "0.1.2"
version = "0.1.3"
authors = ["Alessandro Cresto Miseroglio <alex179ohm@gmail.com>"]
description = """
Rust client for the NSQ realtime message processing system"
Expand Down
24 changes: 20 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,24 @@ Sponsored by <a href="https://tngrm.io"><img src="https://tngrm.io/static/img/tn
---
A [Actix](https://actix.rs/) based client implementation for the [NSQ](https://nsq.io) realtime message processing system.
Nsq-client it's designed to support by default multiple Readers for Multiple Connections, readers are routed per single connection by a round robin algorithm.
---
## Usage

## Examples
- [Simple Processing Message](https://github.com/alex179ohm/nsq-client-rs/tree/master/examples/reader)
- [Simple Producer](https://github.com/alex179ohm/nsq-client-rs/tree/master/examples/producer)
To use nsq-client, add this to your Cargo.toml:
```toml
[dependencies]
actix = "0.7"
nsq-client = "0.1.2"
```
### Create your first consumer
In order to use nsq-client you first need to create a Reader actor which implement Handler for the type of messages you want to receive
from the connections and then subscribe it to the connections to be able to receive the type of messages you've selected.

### Simple Reader (SUB)
Available messages are:
- Msg nsqd messages sent to the Connection (routed to your Reader)
- InFlight Connection message sent to the reader every time inflight is increased or decreased

### Simple Consumer (SUB)
```rust
extern crate nsqueue;
extern crate actix;
Expand Down Expand Up @@ -69,6 +81,10 @@ $ cargo run

[![asciicast](https://asciinema.org/a/8dZ5QgjN3WCwDhgU8mAX9BMsR.svg)](https://asciinema.org/a/8dZ5QgjN3WCwDhgU8mAX9BMsR)

## Examples
- [Simple Processing Message](https://github.com/alex179ohm/nsq-client-rs/tree/master/examples/reader)
- [Simple Producer](https://github.com/alex179ohm/nsq-client-rs/tree/master/examples/producer)

### ToDo
- [ ] Discovery
- [ ] TLS
Expand Down
2 changes: 1 addition & 1 deletion examples/reader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl Handler<Msg> for MyReader {
fn main() {
env_logger::init();
let sys = System::new("nsq-consumer");
let config = Config::default().client_id("consumer".to_string());
let config = Config::new().client_id("consumer");
let c = Supervisor::start(|_| Connection::new(
"test", // topic
"test", //channel
Expand Down
2 changes: 1 addition & 1 deletion src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
// SOFTWARE.

//! And implementation of the NSQ protocol,
//! Source: https://github.com/alex179ohm/nsqueue/blob/master/src/codec.rs
//! Source: https://github.com/alex179ohm/nsq-client-rs/blob/master/src/codec.rs
use std::io::{self, Cursor};
use std::str;
Expand Down
157 changes: 120 additions & 37 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,107 @@
use serde_derive::{Serialize, Deserialize};

/// Configuration sent to nsqd to properly config the [Connection](struct.Connection.html)
///
/// # Examples
///```
/// use nsq_client::{Connection, Config};
///
/// fn main() {
/// let sys = System::new("consumer");
/// let config = Config::new().client_id("consumer").user_agent("node-1");
/// Supervisor::start(|_| Connection::new(
/// "test",
/// "test",
/// "0.0.0.0:4150",
/// Some(config),
/// None,
/// None,
/// ));
/// sys.run();
/// }
///```
///
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Config {
// Identifiers sent to nsqd representing this client (consumer specific)
/// Identifiers sent to nsqd representing this client (consumer specific)
///
/// Default: **hostname** where connection is started
pub client_id: Option<String>,
// hostname where client is deployed.

/// Hostname where client is deployed.
///
/// Default: **hostname** where connection is started
pub hostname: Option<String>,

// enable feature_negotiation
/// Enable feature_negotiation
///
/// Default: **true**
pub feature_negotiation: bool,

// Duration of time between heartbeats (milliseconds).
// Valid values:
// -1 disables heartbeats
// 1000 <= heartbeat_interval <= configured_max
/// Duration of time between heartbeats (milliseconds).
///
/// Valid values:
/// * -1 disables heartbeats
/// * 1000 <= heartbeat_interval <= configured_max
///
/// Default: **30000**
pub heartbeat_interval: i64,

// Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
// Valid values:
// -1 disable output buffer
// 64 <= output_buffer_size <= configured_max
/// Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
///
/// Valid values:
/// * -1 disable output buffer
/// * 64 <= output_buffer_size <= configured_max
///
/// Default: **16384**
pub output_buffer_size: u64,

// The timeout after which data nsqd has buffered will be flushed to this client.
// valid values:
// -1 disable buffer timeout
// 1ms <= output_buffer_timeout <= configured_max
/// The timeout after which data nsqd has buffered will be flushed to this client.
///
/// Valid values:
/// * -1 disable buffer timeout
/// * 1ms <= output_buffer_timeout <= configured_max
///
/// Default: **250**
pub output_buffer_timeout: u32,

// Enable TLS negotiation
/// Enable TLS negotiation
///
/// Default: **false** (Not implemented)
pub tls_v1: bool,

// Enable snappy compression.
/// Enable snappy compression.
///
/// Default: **false** (Not implemented)
pub snappy: bool,

// Enable deflate compression.
/// Enable deflate compression.
///
/// Default: **false** (Not implemented)
pub deflate: bool,
// Configure deflate compression level.
// Valid range:
// 1 <= deflate_level <= configured_max
/// Configure deflate compression level.
///
/// Valid range:
/// * 1 <= deflate_level <= configured_max
///
/// Default: **6**
pub deflate_level: u16,

// Integer percentage to sample the channel.
// Deliver a perventage of all messages received to this connection.
/// Integer percentage to sample the channel.
///
/// Deliver a perventage of all messages received to this connection.
///
/// Default: **0**
pub sample_rate: u16,

// String indentifying the agent for this connection.
/// String indentifying the agent for this connection.
///
/// Default: **hostname** where connection is started
pub user_agent: String,

// Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
/// Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
///
/// Default: **0**
pub message_timeout: u32,

}
Expand All @@ -65,7 +118,6 @@ impl Default for Config {
snappy: false,
feature_negotiation: true,
heartbeat_interval: 30000,
//heartbeat_interval: 2000,
message_timeout: 0,
output_buffer_size: 16384,
output_buffer_timeout: 250,
Expand Down Expand Up @@ -94,27 +146,58 @@ pub struct NsqdConfig {

#[allow(dead_code)]
impl Config {
/// Create default [Config](struct.Config.html)
/// ```
/// use nsq_client::{Config};
///
/// fn main() {
/// let config = Config::new();
/// assert_eq!(config, Config::default());
/// }
/// ```
pub fn new() -> Config {
Config{ ..Default::default() }
}

pub fn client_id(mut self, client_id: String) -> Self {
self.client_id = Some(client_id);
self
}

pub fn hostname(mut self, hostname: String) -> Self {
self.hostname = Some(hostname);
/// Change [client_id](struct.Config.html#structfield.client_id)
/// ```
/// use nsq_client::Config;
///
/// fn main() {
/// let config = Config::new().client_id("consumer");
/// assert_eq!(config.client_id, Some("consumer".to_owned()));
/// }
/// ```
pub fn client_id<S: Into<String>>(mut self, client_id: S) -> Self {
self.client_id = Some(client_id.into());
self
}

pub fn user_agent(mut self, user_agent: String) -> Self {
self.user_agent = user_agent;
/// Change [hostname](struct.Config.html#structfield.hostname)
/// ```
/// use nsq_client::Config;
///
/// fn main() {
/// let config = Config::new().hostname("node-1");
/// assert_eq!(config.hostname, Some("node-1".to_owned()));
/// }
/// ```
pub fn hostname<S: Into<String>>(mut self, hostname: S) -> Self {
self.hostname = Some(hostname.into());
self
}

pub fn snappy(mut self, snappy: bool) -> Self {
self.snappy = snappy;
/// Change [user_agent](struct.Config.html#structfield.user_agent)
/// ```
/// use nsq_client::Config;
///
/// fn main() {
/// let config = Config::new().user_agent("consumer-1");
/// assert_eq!(config.user_agent, Some("consumer-1".to_owned()));
/// }
/// ```
pub fn user_agent<S: Into<String>>(mut self, user_agent: S) -> Self {
self.user_agent = user_agent.into();
self
}
}
37 changes: 30 additions & 7 deletions src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use crate::msgs::{
Auth, Sub, Ready, Cls,
Resume, NsqBackoff, Fin, Msg,
NsqMsg, AddHandler, InFlight};
//use crate::consumer_srvc::ConsumerService;

#[derive(Message, Clone)]
pub struct TcpConnect(pub String);
Expand All @@ -62,6 +61,28 @@ pub enum ConnState {
Stopped,
}

/// Tcp Connection to NSQ system.
///
/// Tries to connect to nsqd early as started:
///
/// # Examples
/// ```
/// use actix::prelude::*;
/// use nsq_client::Connection;
///
/// fn main() {
/// let sys = System::new("consumer");
/// Supervisor::start(|_| Connection::new(
/// "test", // <- topic
/// "test", // <- channel
/// "0.0.0.0:4150", // <- nsqd tcp address
/// None, // <- config (Optional)
/// None, // <- secret used by Auth
/// Some(1) // <- RDY setting for the Connection
/// ));
/// sys.run();
/// }
/// ```
pub struct Connection
{
addr: String,
Expand Down Expand Up @@ -106,6 +127,14 @@ impl Default for Connection

impl Connection
{
/// Return a Tcp Connection to nsqd.
///
/// * `topic` - Topic String
/// * `channel` - Channel String
/// * `addr` - Tcp address of nsqd
/// * `config` - Optional [`Config`]
/// * `secret` - Optional String used to autenticate to nsqd
/// * `rdy` - Optional initial RDY setting
pub fn new<S: Into<String>>(
topic: S,
channel: S,
Expand Down Expand Up @@ -160,12 +189,6 @@ impl Actor for Connection
}
}

//impl Connection {
// fn add_inflight_handler(&mut self, handler: Recipient<InFlight>) {
// self.info_hashmap.insert(TypeId::of::<InFlight>(), Box::new(handler));
// }
//}
//
impl actix::io::WriteHandler<io::Error> for Connection
{
fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running {
Expand Down
15 changes: 9 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,22 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

//! Nsq-client is a actix based implementation of nsq protocol.
//! Nsq-client is the [actix](https://actix.rs) based client implementation of the nsq protocol.
//!
//! This crate is intended as a swiss-knife base implementation for more
//! complex nsq client applications, it supports even single or multiple connections, single or
//! multiple async readers.
//!
//! Due the actors's actix model, readers and connections are distinct entities witch communicate
//! Due the actors model, readers and connections are distinct entities witch communicate
//! each other throught messages, so one reader could receive messages from multiple connections and multiple
//! connections could easily share multiple readers.
//! connections could easily send messages to multiple readers.
//!
//!
//! # Example
//! # Examples
//! ```
//! use actix::prelude::*;
//! use nsq_client::{Connection, Msg, Subscribe, Fin};
//!
//! struct MyReader{
//! conn: Arc<Addr<Connection>>,
//! };
Expand Down Expand Up @@ -84,10 +87,10 @@ mod producer;
mod conn;
mod subscribe;

pub use commands::{fin, req, touch};
//pub use commands::{fin, req, touch};
pub use subscribe::{Subscribe};
pub use config::Config;
pub use producer::{Producer};
pub use conn::{Connection};
pub use error::Error;
pub use msgs::{Fin, Msg, Reqeue, Touch, Conn, Pub, InFlight};
pub use msgs::{Fin, Msg, Reqeue, Touch, Pub, InFlight};
Loading

0 comments on commit 9eff24b

Please sign in to comment.