Skip to content

Commit

Permalink
Add support for bytes messages
Browse files Browse the repository at this point in the history
  • Loading branch information
alex179ohm committed Feb 19, 2019
1 parent 794e9d4 commit cde89ce
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 46 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ description = """
Rust client for the NSQ realtime message processing system
"""
license = "MIT"
license-file = "LICENSE"
keywords = ["nsq", "queue", "actix", "async", "actor"]
categories = ["caching", "asynchronous"]
repository = "https://github.com/alex179ohm/nsq-client-rs"
Expand Down
31 changes: 17 additions & 14 deletions src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub enum Cmd {
ResponseError(String),

/// Message response.
ResponseMsg(Vec<(i64, u16, String, String)>),
ResponseMsg(Vec<(i64, u16, String, Vec<u8>)>),

/// A simple Command whitch not sends msg.
Command(String),
Expand All @@ -72,9 +72,9 @@ pub enum Cmd {
}

/// NSQ codec
pub struct NsqCodec {}
pub struct NsqCodec;

pub fn decode_msg(buf: &mut BytesMut) -> Option<(i64, u16, String, String)> {
pub fn decode_msg(buf: &mut BytesMut) -> Option<(i64, u16, String, Vec<u8>)> {
if buf.len() < 4 {
None
} else {
Expand All @@ -88,15 +88,18 @@ pub fn decode_msg(buf: &mut BytesMut) -> Option<(i64, u16, String, String)> {
let _ = cursor.get_i32_be();
let timestamp = cursor.get_i64_be();
let attemps = cursor.get_u16_be();
if let Ok(id_body) = str::from_utf8(&cursor.bytes()[..size - HEADER_LENGTH - 6]) {
let (id, body) = id_body.split_at(16);
// clean the buffer at frame size
buf.split_to(size+4);
Some((timestamp, attemps, id.to_owned(), body.to_owned()))
} else {
error!("error deconding utf8 message frame");
None
}
let id_body_bytes = &cursor.bytes()[..size - HEADER_LENGTH - 6];
let (id_bytes, body_bytes) = id_body_bytes.split_at(16);
let id = match str::from_utf8(id_bytes) {
Ok(s) => s,
Err(e) => {
error!("error deconding utf8 id: {}", e);
return None;
},
};
// clean the buffer at frame size
buf.split_to(size+4);
Some((timestamp, attemps, id.to_owned(), Vec::from(body_bytes)))
}
}
}
Expand Down Expand Up @@ -189,12 +192,12 @@ impl Decoder for NsqCodec {
// it's a message
} else if frame_type == FRAME_TYPE_MESSAGE {
let mut resp_buf = buf.clone();
let mut msg_buf: Vec<(i64, u16, String, String)> = Vec::new();
let mut msg_buf: Vec<(i64, u16, String, Vec<u8>)> = Vec::new();
let mut need_more = false;
loop {
if resp_buf.is_empty() { break };
if let Some((ts, at, id, bd)) = decode_msg(&mut resp_buf) {
msg_buf.push((ts, at, id.to_owned(), bd.to_owned()));
msg_buf.push((ts, at, id.to_owned(), bd));
} else {
need_more = true;
break;
Expand Down
48 changes: 23 additions & 25 deletions src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::commands::{identify, nop, rdy, sub, fin, auth, VERSION};
use crate::config::{Config, NsqdConfig};
use crate::error::Error;
use crate::msgs::{
Auth, Sub, Ready, Cls,
Auth, OnAuth, Sub, Ready, Cls,
Resume, NsqBackoff, Fin, Msg,
NsqMsg, AddHandler, InFlight};
use crate::auth::AuthResp;
Expand Down Expand Up @@ -88,7 +88,6 @@ pub struct Connection
{
addr: String,
handlers: Vec<Box<Any>>,
info_handler: Box<Any>,
info_hashmap: FnvHashMap<TypeId, Box<Any>>,
topic: String,
channel: String,
Expand All @@ -108,7 +107,6 @@ impl Default for Connection
fn default() -> Connection {
Connection {
handlers: Vec::new(),
info_handler: Box::new(()),
info_hashmap: FnvHashMap::default(),
topic: String::new(),
channel: String::new(),
Expand Down Expand Up @@ -169,7 +167,6 @@ impl Connection
channel: channel.into(),
state: ConnState::Neg,
handlers: Vec::new(),
info_handler: Box::new(()),
info_hashmap: FnvHashMap::default(),
addr: addr.into(),
rdy: rdy,
Expand All @@ -179,21 +176,22 @@ impl Connection
}
}

//impl Connection {
// fn add_in_flight(&mut self, n: u32) {
// self.in_flight += 1;
// if let Some(info) = self.info_handler.downcast_ref::<Recipient<InFlight>>() {
// match info.do_send(InFlight(self.in_flight)) {
// Ok(_) => {
// info!("inflight sent to handler");
// },
// Err(e) => {
// error!("Sending in_flight failed: {}", e);
// }
// }
// }
// }
//}
impl Connection {
fn info_in_flight(&self) {
if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<InFlight>>()) {
if let Some(handler) = box_handler.downcast_ref::<Recipient<InFlight>>() {
let _ = handler.do_send(InFlight(self.in_flight));
}
}
}
fn info_on_auth(&self, resp: AuthResp) {
if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<OnAuth>>()) {
if let Some(handler) = box_handler.downcast_ref::<Recipient<OnAuth>>() {
let _ = handler.do_send(OnAuth(resp));
}
}
}
}

impl Actor for Connection
{
Expand Down Expand Up @@ -267,7 +265,7 @@ impl StreamHandler<Cmd, Error> for Connection
return ctx.stop();
}
};
info!("authentication [{}] {:#?}", self.addr, auth_resp);
info!("authenticated [{}] {:#?}", self.addr, auth_resp);
ctx.notify(Sub);
},
ConnState::Sub => {
Expand All @@ -290,6 +288,8 @@ impl StreamHandler<Cmd, Error> for Connection
timestamp, attemps, id, body,
}) {
Ok(_s) => {
self.in_flight += 1;
self.info_in_flight();
},
Err(e) => { error!("error sending msg to reader: {}", e) }
}
Expand Down Expand Up @@ -385,9 +385,7 @@ impl Handler<Fin> for Connection
cell.write(fin(&msg.0));
}
self.in_flight -= 1;
if let Some(info) = self.info_handler.downcast_ref::<Recipient<InFlight>>() {
let _ = info.do_send(InFlight(self.in_flight));
}
self.info_in_flight();
}
}

Expand Down Expand Up @@ -476,9 +474,9 @@ impl<M: NsqMsg> Handler<AddHandler<M>> for Connection
if msg_id == TypeId::of::<Recipient<Msg>>() {
self.handlers.push(Box::new(msg.0));
info!("Reader added");
} else if msg_id == TypeId::of::<Recipient<InFlight>>() {
} else {
self.info_hashmap.insert(msg_id, Box::new(msg.0));
info!("inflight handler added");
info!("info handler added");
}
}
}
Expand Down
16 changes: 10 additions & 6 deletions src/msgs.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
// MIT License
//
//
// Copyright (c) 2019-2021 Alessandro Cresto Miseroglio <alex179ohm@gmail.com>
// Copyright (c) 2019-2021 Tangram Technologies S.R.L. <https://tngrm.io>
//
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
Expand All @@ -25,6 +25,7 @@ use actix::prelude::*;

use crate::codec::Cmd;
use crate::error::Error;
use crate::auth::AuthResp;

pub trait NsqMsg: Message<Result = ()> + Send + 'static {}

Expand Down Expand Up @@ -71,7 +72,7 @@ pub struct Msg
/// Id of the message
pub id: String,
/// Data sent by nsqd
pub body: String,
pub body: Vec<u8>,
}

impl Default for Msg {
Expand All @@ -80,7 +81,7 @@ impl Default for Msg {
timestamp: 0,
attemps: 0,
id: "".to_owned(),
body: "".to_owned(),
body: Vec::new(),
}
}
}
Expand Down Expand Up @@ -205,6 +206,9 @@ pub struct Reqeue(pub String, u32);
#[derive(Message, Clone)]
pub struct Touch(pub String);

#[derive(Message, Clone)]
pub struct OnAuth(pub AuthResp);

#[derive(Message)]
pub struct Cls;

Expand Down

0 comments on commit cde89ce

Please sign in to comment.