Skip to content

Commit

Permalink
Add OnAuth, OnIdentify, OnBackoff, OnResume msgs
Browse files Browse the repository at this point in the history
  • Loading branch information
alex179ohm committed Feb 19, 2019
1 parent 993acdd commit e480b7c
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 14 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ In order to use nsq-client you first need to create a Reader actor which impleme
from the connections and then subscribe it to the connections to be able to receive the type of messages you've selected.

Available messages are:
- Msg nsqd messages sent to the Connection (routed to your Reader)
- InFlight Connection message sent to the reader every time inflight is increased or decreased
- [Msg](https://docs.rs/nsq-client/0.1.7/nsq_client/struct.Msg.html) nsqd messages sent to the Connection (routed to your Reader)
- [InFlight](https://docs.rs/nsq-client/0.1.7/nsq_client/struct.InFlight.html) Connection message sent to the reader every time inflight is increased or decreased

### Simple Consumer (SUB)
```rust
Expand Down
17 changes: 12 additions & 5 deletions examples/reader/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
// MIT License
//
//
// Copyright (c) 2019-2021 Alessandro Cresto Miseroglio <alex179ohm@gmail.com>
// Copyright (c) 2019-2021 Tangram Technologies S.R.L. <https://tngrm.io>
//
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
Expand All @@ -38,12 +38,12 @@ impl Actor for MyReader {

fn started(&mut self, ctx: &mut Self::Context) {
self.subscribe::<Msg>(ctx, self.0.clone());
self.subscribe::<OnAuth>(ctx, self.0.clone());
}
}

impl Handler<Msg> for MyReader {
type Result = ();
// on identify
fn handle(&mut self, msg: Msg, _ctx: &mut Self::Context) {
println!("MyReader: {:?}", msg);
if let Ok(body) = String::from_utf8(msg.body) {
Expand All @@ -53,6 +53,13 @@ impl Handler<Msg> for MyReader {
}
}

impl Handler<OnAuth> for MyReader {
type Result = ();
fn handle(&mut self, msg: OnAuth, _: &mut Self::Context) {
println!("authenticated: {:#?}", msg.0);
}
}


fn main() {
env_logger::init();
Expand Down
1 change: 0 additions & 1 deletion src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use tokio_io::codec::{Encoder, Decoder};
use log::error;

use crate::error::Error;
//use crate::message::Msg;

// Header: Size(4-Byte) + FrameType(4-Byte)
const HEADER_LENGTH: usize = 8;
Expand Down
91 changes: 86 additions & 5 deletions src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::error::Error;
use crate::msgs::{
Auth, OnAuth, Sub, Ready, Cls,
Resume, NsqBackoff, Fin, Msg,
NsqMsg, AddHandler, InFlight};
NsqMsg, AddHandler, InFlight, OnIdentify, OnClose, OnBackoff, OnResume};
use crate::auth::AuthResp;

#[derive(Message, Clone)]
Expand All @@ -59,6 +59,7 @@ pub enum ConnState {
Started,
Backoff,
Resume,
Closing,
Stopped,
}

Expand Down Expand Up @@ -177,17 +178,81 @@ impl Connection
}

impl Connection {

fn info_in_flight(&self) {
if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<InFlight>>()) {
if let Some(handler) = box_handler.downcast_ref::<Recipient<InFlight>>() {
let _ = handler.do_send(InFlight(self.in_flight));
match handler.do_send(InFlight(self.in_flight)) {
Ok(_) => {},
Err(e) => {
error!("sending InFlight: {}", e)
}
}
}
}
}

fn info_on_auth(&self, resp: AuthResp) {
if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<OnAuth>>()) {
if let Some(handler) = box_handler.downcast_ref::<Recipient<OnAuth>>() {
let _ = handler.do_send(OnAuth(resp));
match handler.do_send(OnAuth(resp)) {
Ok(_) => {},
Err(e) => {
error!("sending OnAuth: {}", e);
}
}
}
}
}

fn info_on_identify(&self, resp: NsqdConfig) {
if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<OnIdentify>>()) {
if let Some(handler) = box_handler.downcast_ref::<Recipient<OnIdentify>>() {
match handler.do_send(OnIdentify(resp)) {
Ok(_) => {},
Err(e) => {
error!("sending OnIdentify: {}", e);
}
}
}
}
}

fn info_on_close(&self, resp: bool) {
if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<OnClose>>()) {
if let Some(handler) = box_handler.downcast_ref::<Recipient<OnClose>>() {
match handler.do_send(OnClose(resp)) {
Ok(_) => {},
Err(e) => {
error!("sending OnClose: {}", e);
}
}
}
}
}

fn info_on_backoff(&self) {
if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<OnBackoff>>()) {
if let Some(handler) = box_handler.downcast_ref::<Recipient<OnBackoff>>() {
match handler.do_send(OnBackoff) {
Ok(_) => {},
Err(e) => {
error!("sending OnBackoff: {}", e);
}
}
}
}
}

fn info_on_resume(&self) {
if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<OnResume>>()) {
if let Some(handler) = box_handler.downcast_ref::<Recipient<OnResume>>() {
match handler.do_send(OnResume) {
Ok(_) => {},
Err(e) => {
error!("sending OnBackoff: {}", e);
}
}
}
}
}
Expand Down Expand Up @@ -249,6 +314,7 @@ impl StreamHandler<Cmd, Error> for Connection
}
};
info!("configuration [{}] {:#?}", self.addr, config);
self.info_on_identify(config.clone());
if config.auth_required {
info!("trying authentication [{}]", self.addr);
ctx.notify(Auth);
Expand All @@ -274,7 +340,11 @@ impl StreamHandler<Cmd, Error> for Connection
},
ConnState::Ready => {
ctx.notify(Ready(self.rdy));
}
},
ConnState::Closing => {
self.info_on_close(true);
self.state = ConnState::Stopped;
},
_ => {},
}
}
Expand All @@ -301,6 +371,11 @@ impl StreamHandler<Cmd, Error> for Connection
}
},
Cmd::ResponseError(s) => {
if self.state == ConnState::Closing {
error!("Closing connection: {}", s);
self.info_on_close(false);
self.state = ConnState::Started;
}
error!("failed: {}", s);
}
Cmd::Command(_) => {
Expand Down Expand Up @@ -372,7 +447,7 @@ impl Handler<TcpConnect> for Connection
impl Handler<Cls> for Connection {
type Result=();
fn handle(&mut self, _msg: Cls, ctx: &mut Self::Context) {
self.state = ConnState::Stopped;
self.state = ConnState::Closing;
ctx.stop();
}
}
Expand All @@ -395,6 +470,10 @@ impl Handler<Ready> for Connection
type Result = ();

fn handle(&mut self, msg: Ready, _ctx: &mut Self::Context) {
if self.state != ConnState::Ready {
self.rdy = msg.0;
return
}
if let Some(ref mut cell) = self.cell {
cell.write(rdy(msg.0));
}
Expand Down Expand Up @@ -449,6 +528,7 @@ impl Handler<NsqBackoff> for Connection
error!("backoff failed: connection dropped [{}]", self.addr);
Self::add_stream(once::<Cmd, Error>(Err(Error::NotConnected)), ctx);
}
self.info_on_backoff();
}
}
}
Expand All @@ -464,6 +544,7 @@ impl Handler<Resume> for Connection
error!("resume failed: connection dropped [{}]", self.addr);
Self::add_stream(once::<Cmd, Error>(Err(Error::NotConnected)), ctx);
}
self.info_on_resume();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,4 @@ pub use config::Config;
pub use producer::{Producer};
pub use conn::{Connection};
pub use error::Error;
pub use msgs::{Fin, Msg, Reqeue, Touch, Pub, InFlight};
pub use msgs::{Fin, Msg, Reqeue, Touch, Pub, InFlight, OnAuth, OnIdentify, Ready};
127 changes: 127 additions & 0 deletions src/msgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use actix::prelude::*;
use crate::codec::Cmd;
use crate::error::Error;
use crate::auth::AuthResp;
use crate::config::NsqdConfig;

pub trait NsqMsg: Message<Result = ()> + Send + 'static {}

Expand Down Expand Up @@ -206,9 +207,135 @@ pub struct Reqeue(pub String, u32);
#[derive(Message, Clone)]
pub struct Touch(pub String);

/// Sent by Connection if auth be successful
///
/// ```no-run
/// use actix::prelude::*;
/// use nsq_client::{Connection, Subscribe, OnAuth};
///
/// struct Consumer(pub Addr<Connection>);
///
/// impl Actor for Consumer {
/// type Context = Context<Self>;
/// fn started(&mut self, ctx: &mut Self::Context) {
/// self.subscribe::<OnAuth>(ctx, self.0.clone());
/// }
/// }
///
/// impl Handler<OnAuth> for Consumer {
/// type Result = ();
/// fn handle(&mut self, msg: OnAuth, ctx: &mut Self::Conetxt) {
/// println!("authenticated: {:?}", msg.0);
/// }
/// }
/// ```
#[derive(Message, Clone)]
pub struct OnAuth(pub AuthResp);

/// Sent by Connection after identify succeeds
///
/// ```no-run
/// use actix::prelude::*;
/// use nsq_client::{Connection, Subscribe, OnIdentify};
///
/// struct Consumer(pub Addr<Connection>);
///
/// impl Actor for Consumer {
/// type Context = Context<Self>;
/// fn started(&mut self, ctx: &mut Self::Context) {
/// self.subscribe::<OnIdentify>(ctx, self.0.clone());
/// }
/// }
///
/// impl Handler<OnIdentify> for Consumer {
/// type Result = ();
/// fn handle(&mut self, msg: OnIdentify, ctx: &mut Self::Conetxt) {
/// println!("identified: {:?}", msg.0);
/// }
/// }
/// ```
#[derive(Message, Clone)]
pub struct OnIdentify(pub NsqdConfig);

/// Sent by Connection after CLS is sent to nsqd
///
/// ```no-run
/// use actix::prelude::*;
/// use nsq_client::{Connection, Subscribe, OnClose};
///
/// struct Consumer(pub Addr<Connection>);
///
/// impl Actor for Consumer {
/// type Context = Context<Self>;
/// fn started(&mut self, ctx: &mut Self::Context) {
/// self.subscribe::<OnClose>(ctx, self.0.clone());
/// }
/// }
///
/// impl Handler<OnClose> for Consumer {
/// type Result = ();
/// fn handle(&mut self, msg: OnClose, ctx: &mut Self::Conetxt) {
/// if msg.0 == true {
/// println!("connection closed");
/// } else {
/// println!("connection closing failed");
/// }
/// }
/// }
/// ```
#[derive(Message, Clone)]
pub struct OnClose(pub bool);

/// Sent by Connection after Backoff state is activated
///
/// ```no-run
/// use actix::prelude::*;
/// use nsq_client::{Connection, Subscribe, OnBackoff};
///
/// struct Consumer(pub Addr<Connection>);
///
/// impl Actor for Consumer {
/// type Context = Context<Self>;
/// fn started(&mut self, ctx: &mut Self::Context) {
/// self.subscribe::<OnBackoff>(ctx, self.0.clone());
/// }
/// }
///
/// impl Handler<OnBackoff> for Consumer {
/// type Result = ();
/// fn handle(&mut self, msg: OnBackoff, ctx: &mut Self::Conetxt) {
/// println!("connection backoff activated");
/// }
/// }
/// ```
#[derive(Message, Clone)]
pub struct OnBackoff;

/// Sent by Connection after Backoff state is terminated
///
/// ```no-run
/// use actix::prelude::*;
/// use nsq_client::{Connection, Subscribe, OnResume};
///
/// struct Consumer(pub Addr<Connection>);
///
/// impl Actor for Consumer {
/// type Context = Context<Self>;
/// fn started(&mut self, ctx: &mut Self::Context) {
/// self.subscribe::<OnResume>(ctx, self.0.clone());
/// }
/// }
///
/// impl Handler<OnResume> for Consumer {
/// type Result = ();
/// fn handle(&mut self, msg: OnResume, ctx: &mut Self::Conetxt) {
/// println!("resuming connection from backoff state");
/// }
/// }
/// ```
#[derive(Message, Clone)]
pub struct OnResume;

#[derive(Message)]
pub struct Cls;

Expand Down

0 comments on commit e480b7c

Please sign in to comment.