Skip to content
This repository has been archived by the owner on Jun 5, 2024. It is now read-only.

Commit

Permalink
Merge pull request #57 from azuqua/feature/pattern-pubsub
Browse files Browse the repository at this point in the history
Feature/pattern pubsub
  • Loading branch information
alecembke-okta authored Jul 5, 2019
2 parents 44252d9 + e183ce0 commit f95bbe0
Show file tree
Hide file tree
Showing 19 changed files with 848 additions and 178 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fred"
version = "1.0.2"
version = "1.1.0"
authors = ["Alec Embke <aembke@gmail.com>"]
edition = "2018"
description = "A Redis client for Rust built on Futures and Tokio."
Expand All @@ -20,7 +20,7 @@ futures = "0.1"
parking_lot = "0.7.0"
lazy_static = "1.3.0"
bytes = "0.4.11"
redis-protocol = "0.1"
redis-protocol = "0.1.2"
log = "0.4.6"
pretty_env_logger = "0.3"
url = "1.7.2"
Expand All @@ -31,9 +31,9 @@ tokio-timer-patched = "0.1.3"
tokio-core = "0.1.17"
tokio-proto = "0.1.1"
tokio-io = "0.1.12"
rand = "0.3"

[dev-dependencies]
rand = "0.3"
hyper = "0.11"

[lib]
Expand Down
22 changes: 22 additions & 0 deletions src/borrowed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ pub trait RedisClientBorrowed {

fn smembers<K: Into<RedisKey>>(&self, key: K) -> Box<Future<Item=Vec<RedisValue>, Error=RedisError>>;

fn psubscribe<K: Into<MultipleKeys>>(&self, patterns: K) -> Box<Future<Item=Vec<usize>, Error=RedisError>>;

fn punsubscribe<K: Into<MultipleKeys>>(&self, patterns: K) -> Box<Future<Item=Vec<usize>, Error=RedisError>>;

}


Expand Down Expand Up @@ -556,6 +560,24 @@ impl RedisClientBorrowed for RedisClient {
commands::smembers(&self.inner, key)
}

/// Subscribes the client to the given patterns.
///
/// Returns the subscription count for each of the provided patterns.
///
/// <https://redis.io/commands/psubscribe>
fn psubscribe<K: Into<MultipleKeys>>(&self, patterns: K) -> Box<Future<Item=Vec<usize>, Error=RedisError>> {
commands::psubscribe(&self.inner, patterns)
}

/// Unsubscribes the client from the given patterns, or from all of them if none is given.
///
/// Returns the subscription count for each of the provided patterns.
///
/// <https://redis.io/commands/punsubscribe>
fn punsubscribe<K: Into<MultipleKeys>>(&self, patterns: K) -> Box<Future<Item=Vec<usize>, Error=RedisError>> {
commands::punsubscribe(&self.inner, patterns)
}

}


Expand Down
53 changes: 49 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,20 @@ use crate::multiplexer::init;

use std::sync::atomic::AtomicUsize;

use std::borrow::Cow;
use std::mem;

const EMPTY_STR: &'static str = "";
const SPLIT_TIMEOUT_MS: u64 = 30_000;

#[macro_use]
use crate::utils;


#[doc(hidden)]
pub struct RedisClientInner {
/// The client ID as seen by the server.
pub id: RwLock<String>,
/// The state of the underlying connection.
pub state: RwLock<ClientState>,
/// The redis config used for initializing connections.
Expand Down Expand Up @@ -72,7 +79,32 @@ pub struct RedisClientInner {
/// A timer for handling timeouts and reconnection delays.
pub timer: Timer,
/// Command queue buffer size.
pub cmd_buffer_len: Arc<AtomicUsize>
pub cmd_buffer_len: Arc<AtomicUsize>,
/// Number of message redeliveries.
pub redeliver_count: Arc<AtomicUsize>
}

impl RedisClientInner {

pub fn log_client_name(&self, level: log::Level) -> Cow<'static, str> {
if log_enabled!(level) {
Cow::Owned(self.id.read().deref().to_owned())
}else{
Cow::Borrowed(EMPTY_STR)
}
}

pub fn change_client_name(&self, id: String) {
let mut guard = self.id.write();
let mut guard_ref = guard.deref_mut();

mem::replace(guard_ref, id);
}

pub fn client_name(&self) -> String {
self.id.read().deref().clone()
}

}

/// A Redis client struct.
Expand Down Expand Up @@ -105,8 +137,10 @@ impl RedisClient {
let latency = LatencyStats::default();
let req_size = SizeStats::default();
let res_size = SizeStats::default();
let init_id = format!("fred-{}", utils::random_string(10));

let inner = Arc::new(RedisClientInner {
id: RwLock::new(init_id),
config: RwLock::new(config),
policy: RwLock::new(None),
state: RwLock::new(state),
Expand All @@ -120,12 +154,23 @@ impl RedisClient {
req_size_stats: Arc::new(RwLock::new(req_size)),
res_size_stats: Arc::new(RwLock::new(res_size)),
timer: timer.unwrap_or(Timer::default()),
cmd_buffer_len: Arc::new(AtomicUsize::new(0))
cmd_buffer_len: Arc::new(AtomicUsize::new(0)),
redeliver_count: Arc::new(AtomicUsize::new(0))
});

RedisClient { inner }
}

/// Read the number of request redeliveries.
pub fn read_redelivery_count(&self) -> usize {
utils::read_atomic(&self.inner.redeliver_count)
}

/// Read and reset the number of request redeliveries.
pub fn take_redelivery_count(&self) -> usize {
utils::set_atomic(&self.inner.redeliver_count, 0)
}

/// Read the state of the underlying connection.
pub fn state(&self) -> ClientState {
self.inner.state.read().deref().clone()
Expand Down Expand Up @@ -173,7 +218,7 @@ impl RedisClient {
fry!(utils::check_client_state(&self.inner.state, ClientState::Disconnected));
fry!(utils::check_and_set_closed_flag(&self.inner.closed, false));

debug!("Connecting to Redis server.");
debug!("{} Connecting to Redis server.", n!(self.inner));

init::connect(handle, self.inner.clone())
}
Expand All @@ -194,7 +239,7 @@ impl RedisClient {
policy.reset_attempts();
utils::set_reconnect_policy(&self.inner.policy, policy);

debug!("Connecting to Redis server with reconnect policy.");
debug!("{} Connecting to Redis server with reconnect policy.", n!(self.inner));
init::connect(handle, self.inner.clone())
}

Expand Down
68 changes: 57 additions & 11 deletions src/commands.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::types::*;
use crate::protocol::types::RedisCommandKind;
use crate::protocol::types::{RedisCommandKind, ResponseKind};

use futures::{
Future,
Expand All @@ -23,20 +23,20 @@ use std::ops::{
};

use std::hash::Hash;
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};

const ASYNC: &'static str = "ASYNC";

pub fn quit(inner: &Arc<RedisClientInner>) -> Box<Future<Item=(), Error=RedisError>> {
debug!("Closing Redis connection with Quit command.");
debug!("{} Closing Redis connection with Quit command.", n!(inner));

// need to lock the closed flag so any reconnect logic running in another thread doesn't screw this up,
// but we also don't want to hold the lock if the client is connected
let exit_early = {
let mut closed_guard = inner.closed.write();
let mut closed_ref = closed_guard.deref_mut();

debug!("Checking client state in quit command: {:?}", utils::read_client_state(&inner.state));
debug!("{} Checking client state in quit command: {:?}", n!(inner), utils::read_client_state(&inner.state));
if utils::read_client_state(&inner.state) != ClientState::Connected {
if *closed_ref {
// client is already waiting to quit
Expand All @@ -58,7 +58,7 @@ pub fn quit(inner: &Arc<RedisClientInner>) -> Box<Future<Item=(), Error=RedisErr
multiplexer_utils::close_connect_tx(&inner.connect_tx);

if exit_early {
debug!("Exit early in quit command.");
debug!("{} Exit early in quit command.", n!(inner));
utils::future_ok(())
}else{
Box::new(utils::request_response(&inner, || {
Expand Down Expand Up @@ -129,7 +129,7 @@ pub fn set<K: Into<RedisKey>, V: Into<RedisValue>>(inner: &Arc<RedisClientInner>
}

pub fn select(inner: &Arc<RedisClientInner>, db: u8) -> Box<Future<Item=(), Error=RedisError>> {
debug!("Selecting Redis database {}", db);
debug!("{} Selecting Redis database {}", n!(inner), db);

Box::new(utils::request_response(inner, || {
Ok((RedisCommandKind::Select, vec![RedisValue::from(db)]))
Expand Down Expand Up @@ -215,7 +215,7 @@ pub fn subscribe<T: Into<String>>(inner: &Arc<RedisClientInner>, channel: T) ->

pub fn unsubscribe<T: Into<String>>(inner: &Arc<RedisClientInner>, channel: T) -> Box<Future<Item=usize, Error=RedisError>> {
// note: if this ever changes to take in more than one channel then some additional work must be done
// in the multiplexer to associate mutliple responses with a single request
// in the multiplexer to associate multiple responses with a single request
let channel = channel.into();

Box::new(utils::request_response(inner, move || {
Expand Down Expand Up @@ -350,12 +350,13 @@ pub fn decrby<V: Into<RedisValue>, K: Into<RedisKey>>(inner: &Arc<RedisClientInn
}

pub fn ping(inner: &Arc<RedisClientInner>) -> Box<Future<Item=String, Error=RedisError>> {
debug!("Pinging Redis server.");
let inner = inner.clone();
debug!("{} Pinging Redis server.", n!(inner));

Box::new(utils::request_response(inner, move || {
Box::new(utils::request_response(&inner, move || {
Ok((RedisCommandKind::Ping, vec![]))
}).and_then(|frame| {
debug!("Received Redis ping response.");
}).and_then(move |frame| {
debug!("{} Received Redis ping response.", n!(inner));

match protocol_utils::frame_to_single_result(frame) {
Ok(resp) => {
Expand Down Expand Up @@ -452,6 +453,7 @@ pub fn client_getname(inner: &Arc<RedisClientInner>) -> Box<Future<Item=Option<S

pub fn client_setname<V: Into<String>>(inner: &Arc<RedisClientInner>, name: V) -> Box<Future<Item=Option<String>, Error=RedisError>> {
let name = name.into();
inner.change_client_name(name.clone());

Box::new(utils::request_response(inner, move || {
Ok((RedisCommandKind::ClientSetname, vec![name.into()]))
Expand Down Expand Up @@ -1073,4 +1075,48 @@ pub fn smembers<K: Into<RedisKey>> (inner: &Arc<RedisClientInner>, key: K) -> Bo
}).and_then(|frame| {
Ok(protocol_utils::frame_to_results(frame)?)
}))
}

pub fn psubscribe<K: Into<MultipleKeys>>(inner: &Arc<RedisClientInner>, patterns: K) -> Box<Future<Item=Vec<usize>, Error=RedisError>> {
let patterns = patterns.into().inner();

Box::new(utils::request_response(inner, move || {
let mut keys = Vec::with_capacity(patterns.len());

for pattern in patterns.into_iter() {
keys.push(pattern.into());
}

let kind = RedisCommandKind::Psubscribe(ResponseKind::Multiple {
count: keys.len(),
buffer: VecDeque::new()
});

Ok((kind, keys))
}).and_then(|frame| {
let result = protocol_utils::frame_to_results(frame)?;
utils::pattern_pubsub_counts(result)
}))
}

pub fn punsubscribe<K: Into<MultipleKeys>>(inner: &Arc<RedisClientInner>, patterns: K) -> Box<Future<Item=Vec<usize>, Error=RedisError>> {
let patterns = patterns.into().inner();

Box::new(utils::request_response(inner, move || {
let mut keys = Vec::with_capacity(patterns.len());

for pattern in patterns.into_iter() {
keys.push(pattern.into());
}

let kind = RedisCommandKind::Punsubscribe(ResponseKind::Multiple {
count: keys.len(),
buffer: VecDeque::new()
});

Ok((kind, keys))
}).and_then(|frame| {
let result = protocol_utils::frame_to_results(frame)?;
utils::pattern_pubsub_counts(result)
}))
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ extern crate redis_protocol;
extern crate float_cmp;
extern crate tokio_timer_patched as tokio_timer;
extern crate tokio_io;
extern crate rand;

#[macro_use]
extern crate log;
Expand Down
Loading

0 comments on commit f95bbe0

Please sign in to comment.