Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to Rust 2021, and auto-apply clippy lints #148

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ readme = "README.md"
license = "MIT"
description = "memcached client for rust"
keywords = ["memcache", "memcached", "driver", "cache", "database"]
edition = "2018"
edition = "2021"

[features]
default = ["tls"]
Expand Down
114 changes: 66 additions & 48 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ unsafe impl Send for Client {}
fn default_hash_function(key: &str) -> u64 {
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
return hasher.finish();
hasher.finish()
}

pub(crate) fn check_key_len(key: &str) -> Result<(), MemcacheError> {
Expand All @@ -88,7 +88,7 @@ pub(crate) fn check_key_len(key: &str) -> Result<(), MemcacheError> {
impl Client {
#[deprecated(since = "0.10.0", note = "please use `connect` instead")]
pub fn new<C: Connectable>(target: C) -> Result<Self, MemcacheError> {
return Self::connect(target);
Self::connect(target)
}

pub fn builder() -> ClientBuilder {
Expand All @@ -102,7 +102,7 @@ impl Client {
let parsed = Url::parse(url.as_str())?;
let timeout = parsed
.query_pairs()
.find(|&(ref k, ref _v)| k == "connect_timeout")
.find(|(k, _v)| k == "connect_timeout")
.and_then(|(ref _k, ref v)| v.parse::<f64>().ok())
.map(Duration::from_secs_f64);
let builder = r2d2::Pool::builder().max_size(size);
Expand Down Expand Up @@ -131,9 +131,29 @@ impl Client {
Self::builder().add_server(target)?.build()
}

fn get_connection(&self, key: &str) -> Pool<ConnectionManager> {
fn get_connection(&self, key: &str) -> &Pool<ConnectionManager> {
let connections_count = self.connections.len();
return self.connections[(self.hash_function)(key) as usize % connections_count].clone();
&self.connections[(self.hash_function)(key) as usize % connections_count]
}

/// Distributes the input `keys` to the available `connections`.
///
/// This uses the `hash_function` internally, and the returned [`Vec`] matches
/// the available `connections`.
fn distribute_keys<'a>(&self, keys: &[&'a str]) -> Result<Vec<Vec<&'a str>>, MemcacheError> {
for key in keys {
check_key_len(key)?;
}

let connections_count = self.connections.len();
let mut con_keys = Vec::new();
con_keys.resize_with(connections_count, Vec::new);
for key in keys {
let connection_index = (self.hash_function)(key) as usize % connections_count;
con_keys[connection_index].push(*key);
}

Ok(con_keys)
}

/// Set the socket read timeout for TCP connections.
Expand All @@ -145,7 +165,7 @@ impl Client {
/// client.set_read_timeout(Some(::std::time::Duration::from_secs(3))).unwrap();
/// ```
pub fn set_read_timeout(&self, timeout: Option<Duration>) -> Result<(), MemcacheError> {
for conn in self.connections.iter() {
for conn in &self.connections {
let mut conn = conn.get()?;
match **conn {
Protocol::Ascii(ref mut protocol) => protocol.stream().set_read_timeout(timeout)?,
Expand All @@ -164,7 +184,7 @@ impl Client {
/// client.set_write_timeout(Some(::std::time::Duration::from_secs(3))).unwrap();
/// ```
pub fn set_write_timeout(&self, timeout: Option<Duration>) -> Result<(), MemcacheError> {
for conn in self.connections.iter() {
for conn in &self.connections {
let mut conn = conn.get()?;
match **conn {
Protocol::Ascii(ref mut protocol) => protocol.stream().set_write_timeout(timeout)?,
Expand All @@ -184,7 +204,7 @@ impl Client {
/// ```
pub fn version(&self) -> Result<Vec<(String, String)>, MemcacheError> {
let mut result = Vec::with_capacity(self.connections.len());
for connection in self.connections.iter() {
for connection in &self.connections {
let mut connection = connection.get()?;
let url = connection.get_url();
result.push((url, connection.version()?));
Expand All @@ -201,10 +221,10 @@ impl Client {
/// client.flush().unwrap();
/// ```
pub fn flush(&self) -> Result<(), MemcacheError> {
for connection in self.connections.iter() {
for connection in &self.connections {
connection.get()?.flush()?;
}
return Ok(());
Ok(())
}

/// Flush all cache on memcached server with a delay seconds.
Expand All @@ -216,10 +236,10 @@ impl Client {
/// client.flush_with_delay(10).unwrap();
/// ```
pub fn flush_with_delay(&self, delay: u32) -> Result<(), MemcacheError> {
for connection in self.connections.iter() {
for connection in &self.connections {
connection.get()?.flush_with_delay(delay)?;
}
return Ok(());
Ok(())
}

/// Get a key from memcached server.
Expand All @@ -232,7 +252,7 @@ impl Client {
/// ```
pub fn get<V: FromMemcacheValueExt>(&self, key: &str) -> Result<Option<V>, MemcacheError> {
check_key_len(key)?;
return self.get_connection(key).get()?.get(key);
self.get_connection(key).get()?.get(key)
}

/// Get multiple keys from memcached server. Using this function instead of calling `get` multiple times can reduce network workloads.
Expand All @@ -247,23 +267,15 @@ impl Client {
/// assert_eq!(result["foo"], "42");
/// ```
pub fn gets<V: FromMemcacheValueExt>(&self, keys: &[&str]) -> Result<HashMap<String, V>, MemcacheError> {
for key in keys {
check_key_len(key)?;
}
let mut con_keys: HashMap<usize, Vec<&str>> = HashMap::new();
let mut result: HashMap<String, V> = HashMap::new();
let connections_count = self.connections.len();
let distributed_keys = self.distribute_keys(keys)?;

for key in keys {
let connection_index = (self.hash_function)(key) as usize % connections_count;
let array = con_keys.entry(connection_index).or_insert_with(Vec::new);
array.push(key);
}
for (&connection_index, keys) in con_keys.iter() {
let connection = self.connections[connection_index].clone();
result.extend(connection.get()?.gets(keys)?);
let mut result: HashMap<String, V> = HashMap::new();
for (connection, keys) in self.connections.iter().zip(distributed_keys) {
if !keys.is_empty() {
result.extend(connection.get()?.gets(&keys)?);
}
}
return Ok(result);
Ok(result)
}

/// Set a key with associate value into memcached server with expiration seconds.
Expand All @@ -277,7 +289,7 @@ impl Client {
/// ```
pub fn set<V: ToMemcacheValue<Stream>>(&self, key: &str, value: V, expiration: u32) -> Result<(), MemcacheError> {
check_key_len(key)?;
return self.get_connection(key).get()?.set(key, value, expiration);
self.get_connection(key).get()?.set(key, value, expiration)
}

/// Compare and swap a key with the associate value into memcached server with expiration seconds.
Expand Down Expand Up @@ -319,7 +331,7 @@ impl Client {
/// ```
pub fn add<V: ToMemcacheValue<Stream>>(&self, key: &str, value: V, expiration: u32) -> Result<(), MemcacheError> {
check_key_len(key)?;
return self.get_connection(key).get()?.add(key, value, expiration);
self.get_connection(key).get()?.add(key, value, expiration)
}

/// Replace a key with associate value into memcached server with expiration seconds.
Expand All @@ -340,7 +352,7 @@ impl Client {
expiration: u32,
) -> Result<(), MemcacheError> {
check_key_len(key)?;
return self.get_connection(key).get()?.replace(key, value, expiration);
self.get_connection(key).get()?.replace(key, value, expiration)
}

/// Append value to the key.
Expand All @@ -358,7 +370,7 @@ impl Client {
/// ```
pub fn append<V: ToMemcacheValue<Stream>>(&self, key: &str, value: V) -> Result<(), MemcacheError> {
check_key_len(key)?;
return self.get_connection(key).get()?.append(key, value);
self.get_connection(key).get()?.append(key, value)
}

/// Prepend value to the key.
Expand All @@ -376,7 +388,7 @@ impl Client {
/// ```
pub fn prepend<V: ToMemcacheValue<Stream>>(&self, key: &str, value: V) -> Result<(), MemcacheError> {
check_key_len(key)?;
return self.get_connection(key).get()?.prepend(key, value);
self.get_connection(key).get()?.prepend(key, value)
}

/// Delete a key from memcached server.
Expand All @@ -390,7 +402,7 @@ impl Client {
/// ```
pub fn delete(&self, key: &str) -> Result<bool, MemcacheError> {
check_key_len(key)?;
return self.get_connection(key).get()?.delete(key);
self.get_connection(key).get()?.delete(key)
}

/// Increment the value with amount.
Expand All @@ -404,7 +416,7 @@ impl Client {
/// ```
pub fn increment(&self, key: &str, amount: u64) -> Result<u64, MemcacheError> {
check_key_len(key)?;
return self.get_connection(key).get()?.increment(key, amount);
self.get_connection(key).get()?.increment(key, amount)
}

/// Decrement the value with amount.
Expand All @@ -418,7 +430,7 @@ impl Client {
/// ```
pub fn decrement(&self, key: &str, amount: u64) -> Result<u64, MemcacheError> {
check_key_len(key)?;
return self.get_connection(key).get()?.decrement(key, amount);
self.get_connection(key).get()?.decrement(key, amount)
}

/// Set a new expiration time for a exist key.
Expand All @@ -434,7 +446,7 @@ impl Client {
/// ```
pub fn touch(&self, key: &str, expiration: u32) -> Result<bool, MemcacheError> {
check_key_len(key)?;
return self.get_connection(key).get()?.touch(key, expiration);
self.get_connection(key).get()?.touch(key, expiration)
}

/// Get all servers' statistics.
Expand All @@ -452,7 +464,7 @@ impl Client {
let url = connection.get_url();
result.push((url, stats_info));
}
return Ok(result);
Ok(result)
}
}

Expand All @@ -467,6 +479,12 @@ pub struct ClientBuilder {
hash_function: fn(&str) -> u64,
}

impl Default for ClientBuilder {
fn default() -> Self {
Self::new()
}
}

impl ClientBuilder {
/// Create an empty client builder.
pub fn new() -> Self {
Expand All @@ -486,7 +504,7 @@ impl ClientBuilder {
pub fn add_server<C: Connectable>(mut self, target: C) -> Result<Self, MemcacheError> {
let targets = target.get_urls();

if targets.len() == 0 {
if targets.is_empty() {
return Err(MemcacheError::BadURL("No servers specified".to_string()));
}

Expand Down Expand Up @@ -540,7 +558,7 @@ impl ClientBuilder {
pub fn build(self) -> Result<Client, MemcacheError> {
let urls = self.targets;

if urls.len() == 0 {
if urls.is_empty() {
return Err(MemcacheError::BadURL("No servers specified".to_string()));
}

Expand Down Expand Up @@ -572,7 +590,7 @@ impl ClientBuilder {

let connection = builder
.build(ConnectionManager::new(url))
.map_err(|e| MemcacheError::PoolError(e))?;
.map_err(MemcacheError::PoolError)?;

connections.push(connection);
}
Expand Down Expand Up @@ -600,7 +618,7 @@ mod tests {
.unwrap()
.build()
.unwrap();
assert!(client.version().unwrap()[0].1 != "");
assert!(!client.version().unwrap()[0].1.is_empty());
}

#[test]
Expand Down Expand Up @@ -710,14 +728,14 @@ mod tests {
#[test]
fn unix() {
let client = super::Client::connect("memcache:///tmp/memcached.sock").unwrap();
assert!(client.version().unwrap()[0].1 != "");
assert!(!client.version().unwrap()[0].1.is_empty());
}

#[cfg(feature = "tls")]
#[test]
fn ssl_noverify() {
let client = super::Client::connect("memcache+tls://localhost:12350?verify_mode=none").unwrap();
assert!(client.version().unwrap()[0].1 != "");
assert!(!client.version().unwrap()[0].1.is_empty());
}

#[cfg(feature = "tls")]
Expand All @@ -726,22 +744,22 @@ mod tests {
let client =
super::Client::connect("memcache+tls://localhost:12350?ca_path=tests/assets/RUST_MEMCACHE_TEST_CERT.crt")
.unwrap();
assert!(client.version().unwrap()[0].1 != "");
assert!(!client.version().unwrap()[0].1.is_empty());
}

#[cfg(feature = "tls")]
#[test]
fn ssl_client_certs() {
let client = super::Client::connect("memcache+tls://localhost:12351?key_path=tests/assets/client.key&cert_path=tests/assets/client.crt&ca_path=tests/assets/RUST_MEMCACHE_TEST_CERT.crt").unwrap();
assert!(client.version().unwrap()[0].1 != "");
assert!(!client.version().unwrap()[0].1.is_empty());
}

#[test]
fn delete() {
let client = super::Client::connect("memcache://localhost:12345").unwrap();
client.set("an_exists_key", "value", 0).unwrap();
assert_eq!(client.delete("an_exists_key").unwrap(), true);
assert_eq!(client.delete("a_not_exists_key").unwrap(), false);
assert!(client.delete("an_exists_key").unwrap());
assert!(!client.delete("a_not_exists_key").unwrap());
}

#[test]
Expand Down
Loading
Loading