From e480b7c2d626ce8f21894f3fefa068f971ce4288 Mon Sep 17 00:00:00 2001 From: alex179ohm Date: Tue, 19 Feb 2019 16:00:36 +0100 Subject: [PATCH] Add OnAuth, OnIdentify, OnBackoff, OnResume msgs --- README.md | 4 +- examples/reader/src/main.rs | 17 +++-- src/codec.rs | 1 - src/conn.rs | 91 ++++++++++++++++++++++++-- src/lib.rs | 2 +- src/msgs.rs | 127 ++++++++++++++++++++++++++++++++++++ 6 files changed, 228 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 906a1f6..839f8e3 100644 --- a/README.md +++ b/README.md @@ -17,8 +17,8 @@ In order to use nsq-client you first need to create a Reader actor which impleme from the connections and then subscribe it to the connections to be able to receive the type of messages you've selected. Available messages are: -- Msg nsqd messages sent to the Connection (routed to your Reader) -- InFlight Connection message sent to the reader every time inflight is increased or decreased +- [Msg](https://docs.rs/nsq-client/0.1.7/nsq_client/struct.Msg.html) nsqd messages sent to the Connection (routed to your Reader) +- [InFlight](https://docs.rs/nsq-client/0.1.7/nsq_client/struct.InFlight.html) Connection message sent to the reader every time inflight is increased or decreased ### Simple Consumer (SUB) ```rust diff --git a/examples/reader/src/main.rs b/examples/reader/src/main.rs index eb2cff1..3aececc 100644 --- a/examples/reader/src/main.rs +++ b/examples/reader/src/main.rs @@ -1,18 +1,18 @@ // MIT License -// +// // Copyright (c) 2019-2021 Alessandro Cresto Miseroglio // Copyright (c) 2019-2021 Tangram Technologies S.R.L. -// +// // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: -// +// // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. -// +// // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -38,12 +38,12 @@ impl Actor for MyReader { fn started(&mut self, ctx: &mut Self::Context) { self.subscribe::(ctx, self.0.clone()); + self.subscribe::(ctx, self.0.clone()); } } impl Handler for MyReader { type Result = (); - // on identify fn handle(&mut self, msg: Msg, _ctx: &mut Self::Context) { println!("MyReader: {:?}", msg); if let Ok(body) = String::from_utf8(msg.body) { @@ -53,6 +53,13 @@ impl Handler for MyReader { } } +impl Handler for MyReader { + type Result = (); + fn handle(&mut self, msg: OnAuth, _: &mut Self::Context) { + println!("authenticated: {:#?}", msg.0); + } +} + fn main() { env_logger::init(); diff --git a/src/codec.rs b/src/codec.rs index 1aee38d..2e9277b 100644 --- a/src/codec.rs +++ b/src/codec.rs @@ -32,7 +32,6 @@ use tokio_io::codec::{Encoder, Decoder}; use log::error; use crate::error::Error; -//use crate::message::Msg; // Header: Size(4-Byte) + FrameType(4-Byte) const HEADER_LENGTH: usize = 8; diff --git a/src/conn.rs b/src/conn.rs index b408b4c..5d8b2ab 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -44,7 +44,7 @@ use crate::error::Error; use crate::msgs::{ Auth, OnAuth, Sub, Ready, Cls, Resume, NsqBackoff, Fin, Msg, - NsqMsg, AddHandler, InFlight}; + NsqMsg, AddHandler, InFlight, OnIdentify, OnClose, OnBackoff, OnResume}; use crate::auth::AuthResp; #[derive(Message, Clone)] @@ -59,6 +59,7 @@ pub enum ConnState { Started, Backoff, Resume, + Closing, Stopped, } @@ -177,17 +178,81 @@ impl Connection } impl Connection { + fn info_in_flight(&self) { if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::>()) { if let Some(handler) = box_handler.downcast_ref::>() { - let _ = handler.do_send(InFlight(self.in_flight)); + match handler.do_send(InFlight(self.in_flight)) { + Ok(_) => {}, + Err(e) => { + error!("sending InFlight: {}", e) + } + } } } } + fn info_on_auth(&self, resp: AuthResp) { if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::>()) { if let Some(handler) = box_handler.downcast_ref::>() { - let _ = handler.do_send(OnAuth(resp)); + match handler.do_send(OnAuth(resp)) { + Ok(_) => {}, + Err(e) => { + error!("sending OnAuth: {}", e); + } + } + } + } + } + + fn info_on_identify(&self, resp: NsqdConfig) { + if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::>()) { + if let Some(handler) = box_handler.downcast_ref::>() { + match handler.do_send(OnIdentify(resp)) { + Ok(_) => {}, + Err(e) => { + error!("sending OnIdentify: {}", e); + } + } + } + } + } + + fn info_on_close(&self, resp: bool) { + if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::>()) { + if let Some(handler) = box_handler.downcast_ref::>() { + match handler.do_send(OnClose(resp)) { + Ok(_) => {}, + Err(e) => { + error!("sending OnClose: {}", e); + } + } + } + } + } + + fn info_on_backoff(&self) { + if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::>()) { + if let Some(handler) = box_handler.downcast_ref::>() { + match handler.do_send(OnBackoff) { + Ok(_) => {}, + Err(e) => { + error!("sending OnBackoff: {}", e); + } + } + } + } + } + + fn info_on_resume(&self) { + if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::>()) { + if let Some(handler) = box_handler.downcast_ref::>() { + match handler.do_send(OnResume) { + Ok(_) => {}, + Err(e) => { + error!("sending OnBackoff: {}", e); + } + } } } } @@ -249,6 +314,7 @@ impl StreamHandler for Connection } }; info!("configuration [{}] {:#?}", self.addr, config); + self.info_on_identify(config.clone()); if config.auth_required { info!("trying authentication [{}]", self.addr); ctx.notify(Auth); @@ -274,7 +340,11 @@ impl StreamHandler for Connection }, ConnState::Ready => { ctx.notify(Ready(self.rdy)); - } + }, + ConnState::Closing => { + self.info_on_close(true); + self.state = ConnState::Stopped; + }, _ => {}, } } @@ -301,6 +371,11 @@ impl StreamHandler for Connection } }, Cmd::ResponseError(s) => { + if self.state == ConnState::Closing { + error!("Closing connection: {}", s); + self.info_on_close(false); + self.state = ConnState::Started; + } error!("failed: {}", s); } Cmd::Command(_) => { @@ -372,7 +447,7 @@ impl Handler for Connection impl Handler for Connection { type Result=(); fn handle(&mut self, _msg: Cls, ctx: &mut Self::Context) { - self.state = ConnState::Stopped; + self.state = ConnState::Closing; ctx.stop(); } } @@ -395,6 +470,10 @@ impl Handler for Connection type Result = (); fn handle(&mut self, msg: Ready, _ctx: &mut Self::Context) { + if self.state != ConnState::Ready { + self.rdy = msg.0; + return + } if let Some(ref mut cell) = self.cell { cell.write(rdy(msg.0)); } @@ -449,6 +528,7 @@ impl Handler for Connection error!("backoff failed: connection dropped [{}]", self.addr); Self::add_stream(once::(Err(Error::NotConnected)), ctx); } + self.info_on_backoff(); } } } @@ -464,6 +544,7 @@ impl Handler for Connection error!("resume failed: connection dropped [{}]", self.addr); Self::add_stream(once::(Err(Error::NotConnected)), ctx); } + self.info_on_resume(); } } diff --git a/src/lib.rs b/src/lib.rs index e733543..02e769f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -94,4 +94,4 @@ pub use config::Config; pub use producer::{Producer}; pub use conn::{Connection}; pub use error::Error; -pub use msgs::{Fin, Msg, Reqeue, Touch, Pub, InFlight}; +pub use msgs::{Fin, Msg, Reqeue, Touch, Pub, InFlight, OnAuth, OnIdentify, Ready}; diff --git a/src/msgs.rs b/src/msgs.rs index eb4d7fd..648f945 100644 --- a/src/msgs.rs +++ b/src/msgs.rs @@ -26,6 +26,7 @@ use actix::prelude::*; use crate::codec::Cmd; use crate::error::Error; use crate::auth::AuthResp; +use crate::config::NsqdConfig; pub trait NsqMsg: Message + Send + 'static {} @@ -206,9 +207,135 @@ pub struct Reqeue(pub String, u32); #[derive(Message, Clone)] pub struct Touch(pub String); +/// Sent by Connection if auth be successful +/// +/// ```no-run +/// use actix::prelude::*; +/// use nsq_client::{Connection, Subscribe, OnAuth}; +/// +/// struct Consumer(pub Addr); +/// +/// impl Actor for Consumer { +/// type Context = Context; +/// fn started(&mut self, ctx: &mut Self::Context) { +/// self.subscribe::(ctx, self.0.clone()); +/// } +/// } +/// +/// impl Handler for Consumer { +/// type Result = (); +/// fn handle(&mut self, msg: OnAuth, ctx: &mut Self::Conetxt) { +/// println!("authenticated: {:?}", msg.0); +/// } +/// } +/// ``` #[derive(Message, Clone)] pub struct OnAuth(pub AuthResp); +/// Sent by Connection after identify succeeds +/// +/// ```no-run +/// use actix::prelude::*; +/// use nsq_client::{Connection, Subscribe, OnIdentify}; +/// +/// struct Consumer(pub Addr); +/// +/// impl Actor for Consumer { +/// type Context = Context; +/// fn started(&mut self, ctx: &mut Self::Context) { +/// self.subscribe::(ctx, self.0.clone()); +/// } +/// } +/// +/// impl Handler for Consumer { +/// type Result = (); +/// fn handle(&mut self, msg: OnIdentify, ctx: &mut Self::Conetxt) { +/// println!("identified: {:?}", msg.0); +/// } +/// } +/// ``` +#[derive(Message, Clone)] +pub struct OnIdentify(pub NsqdConfig); + +/// Sent by Connection after CLS is sent to nsqd +/// +/// ```no-run +/// use actix::prelude::*; +/// use nsq_client::{Connection, Subscribe, OnClose}; +/// +/// struct Consumer(pub Addr); +/// +/// impl Actor for Consumer { +/// type Context = Context; +/// fn started(&mut self, ctx: &mut Self::Context) { +/// self.subscribe::(ctx, self.0.clone()); +/// } +/// } +/// +/// impl Handler for Consumer { +/// type Result = (); +/// fn handle(&mut self, msg: OnClose, ctx: &mut Self::Conetxt) { +/// if msg.0 == true { +/// println!("connection closed"); +/// } else { +/// println!("connection closing failed"); +/// } +/// } +/// } +/// ``` +#[derive(Message, Clone)] +pub struct OnClose(pub bool); + +/// Sent by Connection after Backoff state is activated +/// +/// ```no-run +/// use actix::prelude::*; +/// use nsq_client::{Connection, Subscribe, OnBackoff}; +/// +/// struct Consumer(pub Addr); +/// +/// impl Actor for Consumer { +/// type Context = Context; +/// fn started(&mut self, ctx: &mut Self::Context) { +/// self.subscribe::(ctx, self.0.clone()); +/// } +/// } +/// +/// impl Handler for Consumer { +/// type Result = (); +/// fn handle(&mut self, msg: OnBackoff, ctx: &mut Self::Conetxt) { +/// println!("connection backoff activated"); +/// } +/// } +/// ``` +#[derive(Message, Clone)] +pub struct OnBackoff; + +/// Sent by Connection after Backoff state is terminated +/// +/// ```no-run +/// use actix::prelude::*; +/// use nsq_client::{Connection, Subscribe, OnResume}; +/// +/// struct Consumer(pub Addr); +/// +/// impl Actor for Consumer { +/// type Context = Context; +/// fn started(&mut self, ctx: &mut Self::Context) { +/// self.subscribe::(ctx, self.0.clone()); +/// } +/// } +/// +/// impl Handler for Consumer { +/// type Result = (); +/// fn handle(&mut self, msg: OnResume, ctx: &mut Self::Conetxt) { +/// println!("resuming connection from backoff state"); +/// } +/// } +/// ``` +#[derive(Message, Clone)] +pub struct OnResume; + #[derive(Message)] pub struct Cls;