From 5febfbfc5f4bce20160a8d6f4e1473587a1dd5e6 Mon Sep 17 00:00:00 2001 From: Yuji Nishida Date: Fri, 27 May 2022 13:58:27 +0000 Subject: [PATCH 1/4] Applied consistent hashing as a hash function instead of using default hasher to get connection index from a key. --- src/client.rs | 141 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 137 insertions(+), 4 deletions(-) diff --git a/src/client.rs b/src/client.rs index 9b0cb19..19be260 100644 --- a/src/client.rs +++ b/src/client.rs @@ -50,6 +50,18 @@ impl Connectable for Vec<&str> { pub struct Client { connections: Vec>, pub hash_function: fn(&str) -> u64, + continuum: Continuum +} + +#[derive(Clone)] +struct VNode { + position: u64, + connection_index: usize +} + +#[derive(Clone)] +pub struct Continuum { + vnodes : Vec } unsafe impl Send for Client {} @@ -60,6 +72,10 @@ fn default_hash_function(key: &str) -> u64 { return hasher.finish(); } +fn empty_hash_function(_key: &str) -> u64 { + return 0; +} + pub(crate) fn check_key_len(key: &str) -> Result<(), MemcacheError> { if key.len() > 250 { Err(ClientError::KeyTooLong)? @@ -67,32 +83,80 @@ pub(crate) fn check_key_len(key: &str) -> Result<(), MemcacheError> { Ok(()) } +impl Continuum { + fn bsearch_continuum_index(&self, hash: u64) -> usize { + let mut left=0; + let mut right=self.vnodes.len(); + let mut middle; + while left < right { + middle = (left+right)/2; + if self.vnodes[middle].position < hash { + left = middle+1; + }else{ + right = middle; + } + } + left // This can be continuum.vnodes.len() + } + + fn add_vnode(&mut self, vnodename: String, idx: usize) { + let hash = default_hash_function(&vnodename); + if self.vnodes.len() == 0 { + self.vnodes.push( VNode{ position:hash, connection_index:idx }); + }else{ + let cont_idx = self.bsearch_continuum_index(hash); + if cont_idx == self.vnodes.len() { + self.vnodes.push(VNode{ position:hash, connection_index:idx }); + } else { + self.vnodes.insert(cont_idx, VNode{ position:hash, connection_index:idx }); + } + } + } + + fn add_vnodes(&mut self, hostname: String, idx: usize) { + for n in 0..160 { + let vnodename = format!("{}-{}#{}",hostname,idx,n); + self.add_vnode(vnodename,idx); + } + } +} + impl Client { #[deprecated(since = "0.10.0", note = "please use `connect` instead")] pub fn new(target: C) -> Result { return Self::connect(target); } + fn get_connections_index(&self, hash: u64) -> usize { + let mut idx = self.continuum.bsearch_continuum_index(hash); + if idx == self.continuum.vnodes.len() { idx = 0; } + self.continuum.vnodes[idx].connection_index + } + pub fn with_pool_size(target: C, size: u32) -> Result { let urls = target.get_urls(); let mut connections = vec![]; + let mut continuum = Continuum{ vnodes: Vec::new() }; for url in urls { let parsed = Url::parse(url.as_str())?; let pool = r2d2::Pool::builder() .max_size(size) .build(ConnectionManager::new(parsed))?; connections.push(pool); + continuum.add_vnodes(url.to_string(),connections.len()-1); } Ok(Client { connections, - hash_function: default_hash_function, + hash_function: empty_hash_function, + continuum: continuum, }) } pub fn with_pool(pool: Pool) -> Result { Ok(Client { connections: vec![pool], - hash_function: default_hash_function, + hash_function: empty_hash_function, + continuum: Continuum{ vnodes: Vec::new() } }) } @@ -102,7 +166,13 @@ impl Client { fn get_connection(&self, key: &str) -> Pool { let connections_count = self.connections.len(); - return self.connections[(self.hash_function)(key) as usize % connections_count].clone(); + // XXX: This may not be deterministic so ideally need to hash the whole fuction itself + if self.hash_function as isize == empty_hash_function as isize { + let connection_index = self.get_connections_index(default_hash_function(key)); + self.connections[connection_index as usize].clone() + } else { + self.connections[(self.hash_function)(key) as usize % connections_count].clone() + } } /// Set the socket read timeout for TCP connections. @@ -224,10 +294,17 @@ impl Client { let connections_count = self.connections.len(); for key in keys { - let connection_index = (self.hash_function)(key) as usize % connections_count; + let connection_index; + // XXX: This may not be deterministic so ideally need to hash the whole fuction itself + if self.hash_function as isize == empty_hash_function as isize { + connection_index = self.get_connections_index(default_hash_function(key)); + } else { + connection_index = (self.hash_function)(key) as usize % connections_count; + } let array = con_keys.entry(connection_index).or_insert_with(Vec::new); array.push(key); } + for (&connection_index, keys) in con_keys.iter() { let connection = self.connections[connection_index].clone(); result.extend(connection.get()?.gets(keys)?); @@ -472,4 +549,60 @@ mod tests { client.set("counter", 321, 0).unwrap(); assert_eq!(client.increment("counter", 123).unwrap(), 444); } + + #[test] + fn test_bsearch() { + let mut continuum = super::Continuum{vnodes: Vec::new()}; + + continuum.vnodes.push(super::VNode{position:10,connection_index:1}); + continuum.vnodes.push(super::VNode{position:20,connection_index:2}); + continuum.vnodes.push(super::VNode{position:30,connection_index:3}); + continuum.vnodes.push(super::VNode{position:40,connection_index:2}); + + assert_eq!(0, continuum.bsearch_continuum_index(5)); + assert_eq!(1, continuum.vnodes[continuum.bsearch_continuum_index(5)].connection_index); + assert_eq!(1, continuum.bsearch_continuum_index(15)); + assert_eq!(2, continuum.vnodes[continuum.bsearch_continuum_index(15)].connection_index); + assert_eq!(2, continuum.bsearch_continuum_index(30)); + assert_eq!(3, continuum.vnodes[continuum.bsearch_continuum_index(30)].connection_index); + assert_eq!(3, continuum.bsearch_continuum_index(31)); + assert_eq!(2, continuum.vnodes[continuum.bsearch_continuum_index(31)].connection_index); + + assert_eq!(4, continuum.bsearch_continuum_index(41)); + assert_eq!(4, continuum.bsearch_continuum_index(65535)); + + continuum.vnodes.insert(0,super::VNode{position:1, connection_index:3}); + continuum.vnodes.insert(2,super::VNode{position:19, connection_index:1}); + continuum.vnodes.insert(6,super::VNode{position:50, connection_index:4}); + + assert_eq!(0, continuum.bsearch_continuum_index(0)); + assert_eq!(3, continuum.vnodes[continuum.bsearch_continuum_index(0)].connection_index); + assert_eq!(2, continuum.bsearch_continuum_index(15)); + assert_eq!(1, continuum.vnodes[continuum.bsearch_continuum_index(15)].connection_index); + assert_eq!(5, continuum.bsearch_continuum_index(31)); + assert_eq!(2, continuum.vnodes[continuum.bsearch_continuum_index(31)].connection_index); + assert_eq!(6, continuum.bsearch_continuum_index(41)); + assert_eq!(4, continuum.vnodes[continuum.bsearch_continuum_index(41)].connection_index); + + assert_eq!(7, continuum.bsearch_continuum_index(51)); + } + + #[test] + fn test_key_distribution() { + let mut servers = Vec::new(); + for i in 0..5 { + servers.push(format!("memcache://localhost:{}",12345+i)); + } + let client = super::Client::connect(servers).unwrap(); + let mut map = super::HashMap::::new(); + for i in 1..10000 { + let key = super::default_hash_function(&format!("key{}",i)); + let idx = client.get_connections_index(key); + *map.entry(idx).or_insert(1) += 1; + } + for (_k,v) in map { + // Each server should contain at least 75% of expected # of keys + assert!( ((10000/5) as f64) * 0.75 < v as f64 ); + } + } } From 65c7156f906b9da85907999593f91bf4e05e6140 Mon Sep 17 00:00:00 2001 From: AN Long Date: Thu, 9 Nov 2023 21:13:25 +0800 Subject: [PATCH 2/4] Update src/client.rs --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index 19be260..7353c92 100644 --- a/src/client.rs +++ b/src/client.rs @@ -166,7 +166,7 @@ impl Client { fn get_connection(&self, key: &str) -> Pool { let connections_count = self.connections.len(); - // XXX: This may not be deterministic so ideally need to hash the whole fuction itself + // XXX: This may not be deterministic so ideally need to hash the whole function itself if self.hash_function as isize == empty_hash_function as isize { let connection_index = self.get_connections_index(default_hash_function(key)); self.connections[connection_index as usize].clone() From 167e695f8c9dfbaea0b66e99ce3ba99b8d230368 Mon Sep 17 00:00:00 2001 From: AN Long Date: Thu, 9 Nov 2023 21:13:30 +0800 Subject: [PATCH 3/4] Update src/client.rs --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index 7353c92..8176c87 100644 --- a/src/client.rs +++ b/src/client.rs @@ -295,7 +295,7 @@ impl Client { for key in keys { let connection_index; - // XXX: This may not be deterministic so ideally need to hash the whole fuction itself + // XXX: This may not be deterministic so ideally need to hash the whole function itself if self.hash_function as isize == empty_hash_function as isize { connection_index = self.get_connections_index(default_hash_function(key)); } else { From 6f411ca4d44b30fafb9a061bba52c369b682ccce Mon Sep 17 00:00:00 2001 From: AN Long Date: Thu, 9 Nov 2023 21:38:42 +0800 Subject: [PATCH 4/4] format the codes --- src/client.rs | 146 ++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 107 insertions(+), 39 deletions(-) diff --git a/src/client.rs b/src/client.rs index 8176c87..fede63d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -50,18 +50,18 @@ impl Connectable for Vec<&str> { pub struct Client { connections: Vec>, pub hash_function: fn(&str) -> u64, - continuum: Continuum + continuum: Continuum, } #[derive(Clone)] struct VNode { position: u64, - connection_index: usize + connection_index: usize, } #[derive(Clone)] pub struct Continuum { - vnodes : Vec + vnodes: Vec, } unsafe impl Send for Client {} @@ -85,14 +85,14 @@ pub(crate) fn check_key_len(key: &str) -> Result<(), MemcacheError> { impl Continuum { fn bsearch_continuum_index(&self, hash: u64) -> usize { - let mut left=0; - let mut right=self.vnodes.len(); + let mut left = 0; + let mut right = self.vnodes.len(); let mut middle; while left < right { - middle = (left+right)/2; + middle = (left + right) / 2; if self.vnodes[middle].position < hash { - left = middle+1; - }else{ + left = middle + 1; + } else { right = middle; } } @@ -102,21 +102,33 @@ impl Continuum { fn add_vnode(&mut self, vnodename: String, idx: usize) { let hash = default_hash_function(&vnodename); if self.vnodes.len() == 0 { - self.vnodes.push( VNode{ position:hash, connection_index:idx }); - }else{ + self.vnodes.push(VNode { + position: hash, + connection_index: idx, + }); + } else { let cont_idx = self.bsearch_continuum_index(hash); if cont_idx == self.vnodes.len() { - self.vnodes.push(VNode{ position:hash, connection_index:idx }); + self.vnodes.push(VNode { + position: hash, + connection_index: idx, + }); } else { - self.vnodes.insert(cont_idx, VNode{ position:hash, connection_index:idx }); + self.vnodes.insert( + cont_idx, + VNode { + position: hash, + connection_index: idx, + }, + ); } } } fn add_vnodes(&mut self, hostname: String, idx: usize) { for n in 0..160 { - let vnodename = format!("{}-{}#{}",hostname,idx,n); - self.add_vnode(vnodename,idx); + let vnodename = format!("{}-{}#{}", hostname, idx, n); + self.add_vnode(vnodename, idx); } } } @@ -129,21 +141,23 @@ impl Client { fn get_connections_index(&self, hash: u64) -> usize { let mut idx = self.continuum.bsearch_continuum_index(hash); - if idx == self.continuum.vnodes.len() { idx = 0; } + if idx == self.continuum.vnodes.len() { + idx = 0; + } self.continuum.vnodes[idx].connection_index } pub fn with_pool_size(target: C, size: u32) -> Result { let urls = target.get_urls(); let mut connections = vec![]; - let mut continuum = Continuum{ vnodes: Vec::new() }; + let mut continuum = Continuum { vnodes: Vec::new() }; for url in urls { let parsed = Url::parse(url.as_str())?; let pool = r2d2::Pool::builder() .max_size(size) .build(ConnectionManager::new(parsed))?; connections.push(pool); - continuum.add_vnodes(url.to_string(),connections.len()-1); + continuum.add_vnodes(url.to_string(), connections.len() - 1); } Ok(Client { connections, @@ -156,7 +170,7 @@ impl Client { Ok(Client { connections: vec![pool], hash_function: empty_hash_function, - continuum: Continuum{ vnodes: Vec::new() } + continuum: Continuum { vnodes: Vec::new() }, }) } @@ -552,37 +566,91 @@ mod tests { #[test] fn test_bsearch() { - let mut continuum = super::Continuum{vnodes: Vec::new()}; - - continuum.vnodes.push(super::VNode{position:10,connection_index:1}); - continuum.vnodes.push(super::VNode{position:20,connection_index:2}); - continuum.vnodes.push(super::VNode{position:30,connection_index:3}); - continuum.vnodes.push(super::VNode{position:40,connection_index:2}); + let mut continuum = super::Continuum { vnodes: Vec::new() }; + + continuum.vnodes.push(super::VNode { + position: 10, + connection_index: 1, + }); + continuum.vnodes.push(super::VNode { + position: 20, + connection_index: 2, + }); + continuum.vnodes.push(super::VNode { + position: 30, + connection_index: 3, + }); + continuum.vnodes.push(super::VNode { + position: 40, + connection_index: 2, + }); assert_eq!(0, continuum.bsearch_continuum_index(5)); - assert_eq!(1, continuum.vnodes[continuum.bsearch_continuum_index(5)].connection_index); + assert_eq!( + 1, + continuum.vnodes[continuum.bsearch_continuum_index(5)].connection_index + ); assert_eq!(1, continuum.bsearch_continuum_index(15)); - assert_eq!(2, continuum.vnodes[continuum.bsearch_continuum_index(15)].connection_index); + assert_eq!( + 2, + continuum.vnodes[continuum.bsearch_continuum_index(15)].connection_index + ); assert_eq!(2, continuum.bsearch_continuum_index(30)); - assert_eq!(3, continuum.vnodes[continuum.bsearch_continuum_index(30)].connection_index); + assert_eq!( + 3, + continuum.vnodes[continuum.bsearch_continuum_index(30)].connection_index + ); assert_eq!(3, continuum.bsearch_continuum_index(31)); - assert_eq!(2, continuum.vnodes[continuum.bsearch_continuum_index(31)].connection_index); + assert_eq!( + 2, + continuum.vnodes[continuum.bsearch_continuum_index(31)].connection_index + ); assert_eq!(4, continuum.bsearch_continuum_index(41)); assert_eq!(4, continuum.bsearch_continuum_index(65535)); - continuum.vnodes.insert(0,super::VNode{position:1, connection_index:3}); - continuum.vnodes.insert(2,super::VNode{position:19, connection_index:1}); - continuum.vnodes.insert(6,super::VNode{position:50, connection_index:4}); + continuum.vnodes.insert( + 0, + super::VNode { + position: 1, + connection_index: 3, + }, + ); + continuum.vnodes.insert( + 2, + super::VNode { + position: 19, + connection_index: 1, + }, + ); + continuum.vnodes.insert( + 6, + super::VNode { + position: 50, + connection_index: 4, + }, + ); assert_eq!(0, continuum.bsearch_continuum_index(0)); - assert_eq!(3, continuum.vnodes[continuum.bsearch_continuum_index(0)].connection_index); + assert_eq!( + 3, + continuum.vnodes[continuum.bsearch_continuum_index(0)].connection_index + ); assert_eq!(2, continuum.bsearch_continuum_index(15)); - assert_eq!(1, continuum.vnodes[continuum.bsearch_continuum_index(15)].connection_index); + assert_eq!( + 1, + continuum.vnodes[continuum.bsearch_continuum_index(15)].connection_index + ); assert_eq!(5, continuum.bsearch_continuum_index(31)); - assert_eq!(2, continuum.vnodes[continuum.bsearch_continuum_index(31)].connection_index); + assert_eq!( + 2, + continuum.vnodes[continuum.bsearch_continuum_index(31)].connection_index + ); assert_eq!(6, continuum.bsearch_continuum_index(41)); - assert_eq!(4, continuum.vnodes[continuum.bsearch_continuum_index(41)].connection_index); + assert_eq!( + 4, + continuum.vnodes[continuum.bsearch_continuum_index(41)].connection_index + ); assert_eq!(7, continuum.bsearch_continuum_index(51)); } @@ -591,18 +659,18 @@ mod tests { fn test_key_distribution() { let mut servers = Vec::new(); for i in 0..5 { - servers.push(format!("memcache://localhost:{}",12345+i)); + servers.push(format!("memcache://localhost:{}", 12345 + i)); } let client = super::Client::connect(servers).unwrap(); let mut map = super::HashMap::::new(); for i in 1..10000 { - let key = super::default_hash_function(&format!("key{}",i)); + let key = super::default_hash_function(&format!("key{}", i)); let idx = client.get_connections_index(key); *map.entry(idx).or_insert(1) += 1; } - for (_k,v) in map { + for (_k, v) in map { // Each server should contain at least 75% of expected # of keys - assert!( ((10000/5) as f64) * 0.75 < v as f64 ); + assert!(((10000 / 5) as f64) * 0.75 < v as f64); } } }