From cde89cee9bcd59087f0e94eac002078d007f6352 Mon Sep 17 00:00:00 2001 From: alex179ohm Date: Tue, 19 Feb 2019 13:06:01 +0100 Subject: [PATCH] Add support for bytes messages --- Cargo.toml | 1 - src/codec.rs | 31 +++++++++++++++++-------------- src/conn.rs | 48 +++++++++++++++++++++++------------------------- src/msgs.rs | 16 ++++++++++------ 4 files changed, 50 insertions(+), 46 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 51ad3dd..87f328d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,6 @@ description = """ Rust client for the NSQ realtime message processing system """ license = "MIT" -license-file = "LICENSE" keywords = ["nsq", "queue", "actix", "async", "actor"] categories = ["caching", "asynchronous"] repository = "https://github.com/alex179ohm/nsq-client-rs" diff --git a/src/codec.rs b/src/codec.rs index b4fc292..1aee38d 100644 --- a/src/codec.rs +++ b/src/codec.rs @@ -59,7 +59,7 @@ pub enum Cmd { ResponseError(String), /// Message response. - ResponseMsg(Vec<(i64, u16, String, String)>), + ResponseMsg(Vec<(i64, u16, String, Vec)>), /// A simple Command whitch not sends msg. Command(String), @@ -72,9 +72,9 @@ pub enum Cmd { } /// NSQ codec -pub struct NsqCodec {} +pub struct NsqCodec; -pub fn decode_msg(buf: &mut BytesMut) -> Option<(i64, u16, String, String)> { +pub fn decode_msg(buf: &mut BytesMut) -> Option<(i64, u16, String, Vec)> { if buf.len() < 4 { None } else { @@ -88,15 +88,18 @@ pub fn decode_msg(buf: &mut BytesMut) -> Option<(i64, u16, String, String)> { let _ = cursor.get_i32_be(); let timestamp = cursor.get_i64_be(); let attemps = cursor.get_u16_be(); - if let Ok(id_body) = str::from_utf8(&cursor.bytes()[..size - HEADER_LENGTH - 6]) { - let (id, body) = id_body.split_at(16); - // clean the buffer at frame size - buf.split_to(size+4); - Some((timestamp, attemps, id.to_owned(), body.to_owned())) - } else { - error!("error deconding utf8 message frame"); - None - } + let id_body_bytes = &cursor.bytes()[..size - HEADER_LENGTH - 6]; + let (id_bytes, body_bytes) = id_body_bytes.split_at(16); + let id = match str::from_utf8(id_bytes) { + Ok(s) => s, + Err(e) => { + error!("error deconding utf8 id: {}", e); + return None; + }, + }; + // clean the buffer at frame size + buf.split_to(size+4); + Some((timestamp, attemps, id.to_owned(), Vec::from(body_bytes))) } } } @@ -189,12 +192,12 @@ impl Decoder for NsqCodec { // it's a message } else if frame_type == FRAME_TYPE_MESSAGE { let mut resp_buf = buf.clone(); - let mut msg_buf: Vec<(i64, u16, String, String)> = Vec::new(); + let mut msg_buf: Vec<(i64, u16, String, Vec)> = Vec::new(); let mut need_more = false; loop { if resp_buf.is_empty() { break }; if let Some((ts, at, id, bd)) = decode_msg(&mut resp_buf) { - msg_buf.push((ts, at, id.to_owned(), bd.to_owned())); + msg_buf.push((ts, at, id.to_owned(), bd)); } else { need_more = true; break; diff --git a/src/conn.rs b/src/conn.rs index 3b96556..8585694 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -42,7 +42,7 @@ use crate::commands::{identify, nop, rdy, sub, fin, auth, VERSION}; use crate::config::{Config, NsqdConfig}; use crate::error::Error; use crate::msgs::{ - Auth, Sub, Ready, Cls, + Auth, OnAuth, Sub, Ready, Cls, Resume, NsqBackoff, Fin, Msg, NsqMsg, AddHandler, InFlight}; use crate::auth::AuthResp; @@ -88,7 +88,6 @@ pub struct Connection { addr: String, handlers: Vec>, - info_handler: Box, info_hashmap: FnvHashMap>, topic: String, channel: String, @@ -108,7 +107,6 @@ impl Default for Connection fn default() -> Connection { Connection { handlers: Vec::new(), - info_handler: Box::new(()), info_hashmap: FnvHashMap::default(), topic: String::new(), channel: String::new(), @@ -169,7 +167,6 @@ impl Connection channel: channel.into(), state: ConnState::Neg, handlers: Vec::new(), - info_handler: Box::new(()), info_hashmap: FnvHashMap::default(), addr: addr.into(), rdy: rdy, @@ -179,21 +176,22 @@ impl Connection } } -//impl Connection { -// fn add_in_flight(&mut self, n: u32) { -// self.in_flight += 1; -// if let Some(info) = self.info_handler.downcast_ref::>() { -// match info.do_send(InFlight(self.in_flight)) { -// Ok(_) => { -// info!("inflight sent to handler"); -// }, -// Err(e) => { -// error!("Sending in_flight failed: {}", e); -// } -// } -// } -// } -//} +impl Connection { + fn info_in_flight(&self) { + if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::>()) { + if let Some(handler) = box_handler.downcast_ref::>() { + let _ = handler.do_send(InFlight(self.in_flight)); + } + } + } + fn info_on_auth(&self, resp: AuthResp) { + if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::>()) { + if let Some(handler) = box_handler.downcast_ref::>() { + let _ = handler.do_send(OnAuth(resp)); + } + } + } +} impl Actor for Connection { @@ -267,7 +265,7 @@ impl StreamHandler for Connection return ctx.stop(); } }; - info!("authentication [{}] {:#?}", self.addr, auth_resp); + info!("authenticated [{}] {:#?}", self.addr, auth_resp); ctx.notify(Sub); }, ConnState::Sub => { @@ -290,6 +288,8 @@ impl StreamHandler for Connection timestamp, attemps, id, body, }) { Ok(_s) => { + self.in_flight += 1; + self.info_in_flight(); }, Err(e) => { error!("error sending msg to reader: {}", e) } } @@ -385,9 +385,7 @@ impl Handler for Connection cell.write(fin(&msg.0)); } self.in_flight -= 1; - if let Some(info) = self.info_handler.downcast_ref::>() { - let _ = info.do_send(InFlight(self.in_flight)); - } + self.info_in_flight(); } } @@ -476,9 +474,9 @@ impl Handler> for Connection if msg_id == TypeId::of::>() { self.handlers.push(Box::new(msg.0)); info!("Reader added"); - } else if msg_id == TypeId::of::>() { + } else { self.info_hashmap.insert(msg_id, Box::new(msg.0)); - info!("inflight handler added"); + info!("info handler added"); } } } diff --git a/src/msgs.rs b/src/msgs.rs index 0457610..eb4d7fd 100644 --- a/src/msgs.rs +++ b/src/msgs.rs @@ -1,18 +1,18 @@ // MIT License -// +// // Copyright (c) 2019-2021 Alessandro Cresto Miseroglio // Copyright (c) 2019-2021 Tangram Technologies S.R.L. -// +// // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: -// +// // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. -// +// // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -25,6 +25,7 @@ use actix::prelude::*; use crate::codec::Cmd; use crate::error::Error; +use crate::auth::AuthResp; pub trait NsqMsg: Message + Send + 'static {} @@ -71,7 +72,7 @@ pub struct Msg /// Id of the message pub id: String, /// Data sent by nsqd - pub body: String, + pub body: Vec, } impl Default for Msg { @@ -80,7 +81,7 @@ impl Default for Msg { timestamp: 0, attemps: 0, id: "".to_owned(), - body: "".to_owned(), + body: Vec::new(), } } } @@ -205,6 +206,9 @@ pub struct Reqeue(pub String, u32); #[derive(Message, Clone)] pub struct Touch(pub String); +#[derive(Message, Clone)] +pub struct OnAuth(pub AuthResp); + #[derive(Message)] pub struct Cls;