Skip to content
This repository has been archived by the owner on Jun 5, 2024. It is now read-only.

Commit

Permalink
Merge pull request #59 from azuqua/fix/elasticache-individual-hash-slots
Browse files Browse the repository at this point in the history
Fix/elasticache individual hash slots
  • Loading branch information
alecembke-okta authored Aug 16, 2019
2 parents c4d0b9c + 19ca608 commit 813c5ab
Show file tree
Hide file tree
Showing 2 changed files with 356 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/protocol/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub use redis_protocol::{
use crate::multiplexer::types::SplitCommand;
use futures::sync::mpsc::UnboundedSender;
use std::collections::VecDeque;
use std::rc::Rc;

#[derive(Clone)]
pub enum ResponseKind {
Expand Down
365 changes: 355 additions & 10 deletions src/protocol/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,31 @@ pub fn parse_cluster_nodes(status: String) -> Result<HashMap<String, Vec<SlotRan
for slot in parts[8..].iter() {
let inner_parts: Vec<&str> = slot.split("-").collect();

if inner_parts.len() < 2 {
if inner_parts.len() == 1 {
// looking at an individual slot

slots.push(SlotRange {
start: inner_parts[0].parse::<u16>()?,
end: inner_parts[0].parse::<u16>()?,
server: server.to_owned(),
id: id.clone(),
slaves: None
});
}else if inner_parts.len() == 2 {
// looking at a slot range

slots.push(SlotRange {
start: inner_parts[0].parse::<u16>()?,
end: inner_parts[1].parse::<u16>()?,
server: server.to_owned(),
id: id.clone(),
slaves: None
});
}else{
return Err(RedisError::new(
RedisErrorKind::ProtocolError, format!("Invalid cluster node hash slot range {}.", slot)
RedisErrorKind::ProtocolError, format!("Invalid redis hash slot range: {}", slot)
));
}

slots.push(SlotRange {
start: inner_parts[0].parse::<u16>()?,
end: inner_parts[1].parse::<u16>()?,
server: server.to_owned(),
id: id.clone(),
slaves: None
});
}

out.insert(server.clone(), slots);
Expand Down Expand Up @@ -414,6 +426,339 @@ mod tests {
use crate::protocol::types::*;
use std::collections::HashMap;

#[test]
fn should_parse_cluster_node_status_individual_slot() {
let status = "2edc9a62355eacff9376c4e09643e2c932b0356a foo.use2.cache.amazonaws.com:6379@1122 master - 0 1565908731456 2950 connected 1242-1696 8195-8245 8247-8423 10923-12287
db2fd89f83daa5fe49110ef760794f9ccee07d06 bar.use2.cache.amazonaws.com:6379@1122 master - 0 1565908731000 2952 connected 332-1241 8152-8194 8424-8439 9203-10112 12288-12346 12576-12685
d9aeabb1525e5656c98545a0ed42c8c99bbacae1 baz.use2.cache.amazonaws.com:6379@1122 master - 0 1565908729402 2956 connected 1697 1815-2291 3657-4089 5861-6770 7531-7713 13154-13197
5671f02def98d0279224f717aba0f95874e5fb89 wibble.use2.cache.amazonaws.com:6379@1122 master - 0 1565908728391 2953 connected 7900-8125 12427 13198-13760 15126-16383
0b1923e386f6f6f3adc1b6deb250ef08f937e9b5 wobble.use2.cache.amazonaws.com:6379@1122 master - 0 1565908731000 2954 connected 5462-5860 6771-7382 8133-8151 10113-10922 12686-12893
1c5d99e3d6fca2090d0903d61d4e51594f6dcc05 qux.use2.cache.amazonaws.com:6379@1122 master - 0 1565908732462 2949 connected 2292-3656 7383-7530 8896-9202 12347-12426 12428-12575
b8553a4fae8ae99fca716d423b14875ebb10fefe quux.use2.cache.amazonaws.com:6379@1122 master - 0 1565908730439 2951 connected 8246 8440-8895 12919-13144 13761-15125
4a58ba550f37208c9a9909986ce808cdb058e31f quuz.use2.cache.amazonaws.com:6379@1122 myself,master - 0 1565908730000 2955 connected 0-331 1698-1814 4090-5461 7714-7899 8126-8132 12894-12918 13145-13153";

let mut expected: HashMap<String, Vec<SlotRange>> = HashMap::new();
expected.insert("foo.use2.cache.amazonaws.com:6379".into(), vec![
SlotRange {
start: 1242,
end: 1696,
server: "foo.use2.cache.amazonaws.com:6379".into(),
id: "2edc9a62355eacff9376c4e09643e2c932b0356a".into(),
slaves: None,
},
SlotRange {
start: 8195,
end: 8245,
server: "foo.use2.cache.amazonaws.com:6379".into(),
id: "2edc9a62355eacff9376c4e09643e2c932b0356a".into(),
slaves: None,
},
SlotRange {
start: 8247,
end: 8423,
server: "foo.use2.cache.amazonaws.com:6379".into(),
id: "2edc9a62355eacff9376c4e09643e2c932b0356a".into(),
slaves: None,
},
SlotRange {
start: 10923,
end: 12287,
server: "foo.use2.cache.amazonaws.com:6379".into(),
id: "2edc9a62355eacff9376c4e09643e2c932b0356a".into(),
slaves: None,
}
]);
expected.insert("bar.use2.cache.amazonaws.com:6379".into(), vec![
SlotRange {
start: 332,
end: 1241,
server: "bar.use2.cache.amazonaws.com:6379".into(),
id: "db2fd89f83daa5fe49110ef760794f9ccee07d06".into(),
slaves: None,
},
SlotRange {
start: 8152,
end: 8194,
server: "bar.use2.cache.amazonaws.com:6379".into(),
id: "db2fd89f83daa5fe49110ef760794f9ccee07d06".into(),
slaves: None,
},
SlotRange {
start: 8424,
end: 8439,
server: "bar.use2.cache.amazonaws.com:6379".into(),
id: "db2fd89f83daa5fe49110ef760794f9ccee07d06".into(),
slaves: None,
},
SlotRange {
start: 9203,
end: 10112,
server: "bar.use2.cache.amazonaws.com:6379".into(),
id: "db2fd89f83daa5fe49110ef760794f9ccee07d06".into(),
slaves: None,
},
SlotRange {
start: 12288,
end: 12346,
server: "bar.use2.cache.amazonaws.com:6379".into(),
id: "db2fd89f83daa5fe49110ef760794f9ccee07d06".into(),
slaves: None,
},
SlotRange {
start: 12576,
end: 12685,
server: "bar.use2.cache.amazonaws.com:6379".into(),
id: "db2fd89f83daa5fe49110ef760794f9ccee07d06".into(),
slaves: None,
}
]);
expected.insert("baz.use2.cache.amazonaws.com:6379".into(), vec![
SlotRange {
start: 1697,
end: 1697,
server: "baz.use2.cache.amazonaws.com:6379".into(),
id: "d9aeabb1525e5656c98545a0ed42c8c99bbacae1".into(),
slaves: None,
},
SlotRange {
start: 1815,
end: 2291,
server: "baz.use2.cache.amazonaws.com:6379".into(),
id: "d9aeabb1525e5656c98545a0ed42c8c99bbacae1".into(),
slaves: None,
},
SlotRange {
start: 3657,
end: 4089,
server: "baz.use2.cache.amazonaws.com:6379".into(),
id: "d9aeabb1525e5656c98545a0ed42c8c99bbacae1".into(),
slaves: None,
},
SlotRange {
start: 5861,
end: 6770,
server: "baz.use2.cache.amazonaws.com:6379".into(),
id: "d9aeabb1525e5656c98545a0ed42c8c99bbacae1".into(),
slaves: None,
},
SlotRange {
start: 7531,
end: 7713,
server: "baz.use2.cache.amazonaws.com:6379".into(),
id: "d9aeabb1525e5656c98545a0ed42c8c99bbacae1".into(),
slaves: None,
},
SlotRange {
start: 13154,
end: 13197,
server: "baz.use2.cache.amazonaws.com:6379".into(),
id: "d9aeabb1525e5656c98545a0ed42c8c99bbacae1".into(),
slaves: None,
}
]);
expected.insert("wibble.use2.cache.amazonaws.com:6379".into(), vec![
SlotRange {
start: 7900,
end: 8125,
server: "wibble.use2.cache.amazonaws.com:6379".into(),
id: "5671f02def98d0279224f717aba0f95874e5fb89".into(),
slaves: None,
},
SlotRange {
start: 12427,
end: 12427,
server: "wibble.use2.cache.amazonaws.com:6379".into(),
id: "5671f02def98d0279224f717aba0f95874e5fb89".into(),
slaves: None,
},
SlotRange {
start: 13198,
end: 13760,
server: "wibble.use2.cache.amazonaws.com:6379".into(),
id: "5671f02def98d0279224f717aba0f95874e5fb89".into(),
slaves: None,
},
SlotRange {
start: 15126,
end: 16383,
server: "wibble.use2.cache.amazonaws.com:6379".into(),
id: "5671f02def98d0279224f717aba0f95874e5fb89".into(),
slaves: None,
}
]);
expected.insert("wobble.use2.cache.amazonaws.com:6379".into(), vec![
SlotRange {
start: 5462,
end: 5860,
server: "wobble.use2.cache.amazonaws.com:6379".into(),
id: "0b1923e386f6f6f3adc1b6deb250ef08f937e9b5".into(),
slaves: None,
},
SlotRange {
start: 6771,
end: 7382,
server: "wobble.use2.cache.amazonaws.com:6379".into(),
id: "0b1923e386f6f6f3adc1b6deb250ef08f937e9b5".into(),
slaves: None,
},
SlotRange {
start: 8133,
end: 8151,
server: "wobble.use2.cache.amazonaws.com:6379".into(),
id: "0b1923e386f6f6f3adc1b6deb250ef08f937e9b5".into(),
slaves: None,
},
SlotRange {
start: 10113,
end: 10922,
server: "wobble.use2.cache.amazonaws.com:6379".into(),
id: "0b1923e386f6f6f3adc1b6deb250ef08f937e9b5".into(),
slaves: None,
},
SlotRange {
start: 12686,
end: 12893,
server: "wobble.use2.cache.amazonaws.com:6379".into(),
id: "0b1923e386f6f6f3adc1b6deb250ef08f937e9b5".into(),
slaves: None,
}
]);
expected.insert("qux.use2.cache.amazonaws.com:6379".into(), vec![
SlotRange {
start: 2292,
end: 3656,
server: "qux.use2.cache.amazonaws.com:6379".into(),
id: "1c5d99e3d6fca2090d0903d61d4e51594f6dcc05".into(),
slaves: None,
},
SlotRange {
start: 7383,
end: 7530,
server: "qux.use2.cache.amazonaws.com:6379".into(),
id: "1c5d99e3d6fca2090d0903d61d4e51594f6dcc05".into(),
slaves: None,
},
SlotRange {
start: 8896,
end: 9202,
server: "qux.use2.cache.amazonaws.com:6379".into(),
id: "1c5d99e3d6fca2090d0903d61d4e51594f6dcc05".into(),
slaves: None,
},
SlotRange {
start: 12347,
end: 12426,
server: "qux.use2.cache.amazonaws.com:6379".into(),
id: "1c5d99e3d6fca2090d0903d61d4e51594f6dcc05".into(),
slaves: None,
},
SlotRange {
start: 12428,
end: 12575,
server: "qux.use2.cache.amazonaws.com:6379".into(),
id: "1c5d99e3d6fca2090d0903d61d4e51594f6dcc05".into(),
slaves: None,
}
]);
expected.insert("quux.use2.cache.amazonaws.com:6379".into(), vec![
SlotRange {
start: 8246,
end: 8246,
server: "quux.use2.cache.amazonaws.com:6379".into(),
id: "b8553a4fae8ae99fca716d423b14875ebb10fefe".into(),
slaves: None,
},
SlotRange {
start: 8440,
end: 8895,
server: "quux.use2.cache.amazonaws.com:6379".into(),
id: "b8553a4fae8ae99fca716d423b14875ebb10fefe".into(),
slaves: None,
},
SlotRange {
start: 12919,
end: 13144,
server: "quux.use2.cache.amazonaws.com:6379".into(),
id: "b8553a4fae8ae99fca716d423b14875ebb10fefe".into(),
slaves: None,
},
SlotRange {
start: 13761,
end: 15125,
server: "quux.use2.cache.amazonaws.com:6379".into(),
id: "b8553a4fae8ae99fca716d423b14875ebb10fefe".into(),
slaves: None,
}
]);
expected.insert("quuz.use2.cache.amazonaws.com:6379".into(), vec![
SlotRange {
start: 0,
end: 331,
server: "quuz.use2.cache.amazonaws.com:6379".into(),
id: "4a58ba550f37208c9a9909986ce808cdb058e31f".into(),
slaves: None,
},
SlotRange {
start: 1698,
end: 1814,
server: "quuz.use2.cache.amazonaws.com:6379".into(),
id: "4a58ba550f37208c9a9909986ce808cdb058e31f".into(),
slaves: None,
},
SlotRange {
start: 4090,
end: 5461,
server: "quuz.use2.cache.amazonaws.com:6379".into(),
id: "4a58ba550f37208c9a9909986ce808cdb058e31f".into(),
slaves: None,
},
SlotRange {
start: 7714,
end: 7899,
server: "quuz.use2.cache.amazonaws.com:6379".into(),
id: "4a58ba550f37208c9a9909986ce808cdb058e31f".into(),
slaves: None,
},
SlotRange {
start: 8126,
end: 8132,
server: "quuz.use2.cache.amazonaws.com:6379".into(),
id: "4a58ba550f37208c9a9909986ce808cdb058e31f".into(),
slaves: None,
},
SlotRange {
start: 12894,
end: 12918,
server: "quuz.use2.cache.amazonaws.com:6379".into(),
id: "4a58ba550f37208c9a9909986ce808cdb058e31f".into(),
slaves: None,
},
SlotRange {
start: 13145,
end: 13153,
server: "quuz.use2.cache.amazonaws.com:6379".into(),
id: "4a58ba550f37208c9a9909986ce808cdb058e31f".into(),
slaves: None,
}
]);

let actual = match parse_cluster_nodes(status.to_owned()) {
Ok(h) => h,
Err(e) => panic!("{}", e)
};
assert_eq!(actual, expected);

let cache = ClusterKeyCache::new(Some(status.to_owned())).expect("Failed to build cluster cache");
let slot = cache.get_server(8246).unwrap();
assert_eq!(slot.server, "quux.use2.cache.amazonaws.com:6379".to_owned());
let slot = cache.get_server(1697).unwrap();
assert_eq!(slot.server, "baz.use2.cache.amazonaws.com:6379".to_owned());
let slot = cache.get_server(12427).unwrap();
assert_eq!(slot.server, "wibble.use2.cache.amazonaws.com:6379".to_owned());
let slot = cache.get_server(8445).unwrap();
assert_eq!(slot.server, "quux.use2.cache.amazonaws.com:6379".to_owned());
}

#[test]
fn should_parse_cluster_node_status() {
let status = "07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 4 connected
Expand Down

0 comments on commit 813c5ab

Please sign in to comment.