Skip to content

Commit

Permalink
Added RR LBP tests. Updated DCA tests to be more concise.
Browse files Browse the repository at this point in the history
  • Loading branch information
aholmberg committed Sep 4, 2014
1 parent 82c3de1 commit 88448a9
Showing 1 changed file with 180 additions and 75 deletions.
255 changes: 180 additions & 75 deletions test/unit_tests/src/load_balancing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,167 @@ using std::string;
const string LOCAL_DC = "local";
const string REMOTE_DC = "remote";

void populate_hosts(size_t count, const std::string& rack,
const std::string& dc, cass::HostMap* hosts) {
#define VECTOR_FROM(t, a) std::vector<t>(a, a + sizeof(a)/sizeof(a[0]))

cass::Address addr_for_sequence(size_t i) {
cass::Address addr("0.0.0.0", 9042);
size_t first = hosts->size() + 1;
for (size_t i = first; i < first+count; ++i) {
addr.addr_in()->sin_addr.s_addr = i;
addr.addr_in()->sin_addr.s_addr = i;
return addr;
}

cass::SharedRefPtr<cass::Host> host_for_addr(const cass::Address addr,
const std::string& rack = "rack",
const std::string& dc = "dc") {
cass::Host* host = new cass::Host(addr, false);
host->set_up();
host->set_rack_and_dc(rack, dc);
(*hosts)[addr] = cass::SharedRefPtr<cass::Host>(host);
return cass::SharedRefPtr<cass::Host>(host);
}

void populate_hosts(size_t count, const std::string& rack,
const std::string& dc, cass::HostMap* hosts) {
cass::Address addr;
size_t first = hosts->size() + 1;
for (size_t i = first; i < first+count; ++i) {
addr = addr_for_sequence(i);
(*hosts)[addr] = host_for_addr(addr, rack, dc);
}
}

void verify_sequence(cass::QueryPlan* qp, const std::vector<size_t>& sequence) {
cass::Address expected("0.0.0.0", 9042);
cass::Address received;
for (std::vector<size_t>::const_iterator it = sequence.begin();
it!= sequence.end();
++it) {
BOOST_REQUIRE(qp->compute_next(&received));
expected.addr_in()->sin_addr.s_addr = *it;
BOOST_CHECK_EQUAL(expected, received);
}
BOOST_CHECK(!qp->compute_next(&received));
}

BOOST_AUTO_TEST_SUITE(round_robin_lb)

BOOST_AUTO_TEST_CASE(simple) {
cass::HostMap hosts;
populate_hosts(2, "rack", "dc", &hosts);

cass::RoundRobinPolicy policy;
policy.init(hosts);

// start on first elem
boost::scoped_ptr<cass::QueryPlan> qp(policy.new_query_plan());
const size_t seq1[] = {1, 2};
verify_sequence(qp.get(), VECTOR_FROM(size_t, seq1));

// rotate starting element
boost::scoped_ptr<cass::QueryPlan> qp2(policy.new_query_plan());
const size_t seq2[] = {2, 1};
verify_sequence(qp2.get(), VECTOR_FROM(size_t, seq2));

// back around
boost::scoped_ptr<cass::QueryPlan> qp3(policy.new_query_plan());
verify_sequence(qp3.get(), VECTOR_FROM(size_t, seq1));
}

BOOST_AUTO_TEST_CASE(on_add)
{
cass::HostMap hosts;
populate_hosts(2, "rack", "dc", &hosts);

cass::RoundRobinPolicy policy;
policy.init(hosts);

// baseline
boost::scoped_ptr<cass::QueryPlan> qp(policy.new_query_plan());
const size_t seq1[] = {1, 2};
verify_sequence(qp.get(), VECTOR_FROM(size_t, seq1));

const size_t seq_new = 5;
cass::Address addr_new = addr_for_sequence(seq_new);
cass::SharedRefPtr<cass::Host> host = host_for_addr(addr_new);
policy.on_add(host);

boost::scoped_ptr<cass::QueryPlan> qp2(policy.new_query_plan());
const size_t seq2[] = {2, seq_new, 1};
verify_sequence(qp2.get(), VECTOR_FROM(size_t, seq2));
}

BOOST_AUTO_TEST_CASE(on_remove)
{
cass::HostMap hosts;
populate_hosts(3, "rack", "dc", &hosts);

cass::RoundRobinPolicy policy;
policy.init(hosts);

boost::scoped_ptr<cass::QueryPlan> qp(policy.new_query_plan());
cass::SharedRefPtr<cass::Host> host = hosts.begin()->second;
policy.on_remove(host);

boost::scoped_ptr<cass::QueryPlan> qp2(policy.new_query_plan());

// first query plan has it
// (note: not manipulating Host::state_ for dynamic removal)
const size_t seq1[] = {1, 2, 3};
verify_sequence(qp.get(), VECTOR_FROM(size_t, seq1));

// second one does not
const size_t seq2[] = {3, 2};
verify_sequence(qp2.get(), VECTOR_FROM(size_t, seq2));
}

BOOST_AUTO_TEST_CASE(on_down_on_up)
{
cass::HostMap hosts;
populate_hosts(3, "rack", "dc", &hosts);

cass::RoundRobinPolicy policy;
policy.init(hosts);

boost::scoped_ptr<cass::QueryPlan> qp_before1(policy.new_query_plan());
boost::scoped_ptr<cass::QueryPlan> qp_before2(policy.new_query_plan());
cass::SharedRefPtr<cass::Host> host = hosts.begin()->second;
policy.on_down(host);

// 'before' qp both have the down host
// Ahead of set_down, it will be returned
{
const size_t seq[] = {1, 2, 3};
verify_sequence(qp_before1.get(), VECTOR_FROM(size_t, seq));
}

host->set_down();
// Following set_down, it is dynamically excluded
{
const size_t seq[] = {2, 3};
verify_sequence(qp_before2.get(), VECTOR_FROM(size_t, seq));
}

// host is added to the list, but not 'up'
policy.on_up(host);

boost::scoped_ptr<cass::QueryPlan> qp_after1(policy.new_query_plan());
boost::scoped_ptr<cass::QueryPlan> qp_after2(policy.new_query_plan());

// 1 is dynamically excluded from plan
{
const size_t seq[] = {2, 3};
verify_sequence(qp_after1.get(), VECTOR_FROM(size_t, seq));
}

host->set_up();

// now included
{
const size_t seq[] = {2, 3, 1};
verify_sequence(qp_after2.get(), VECTOR_FROM(size_t, seq));
}
}

BOOST_AUTO_TEST_SUITE_END()

BOOST_AUTO_TEST_SUITE(dc_aware_lb)

void test_dc_aware_policy(size_t local_count, size_t remote_count) {
Expand All @@ -57,16 +205,9 @@ void test_dc_aware_policy(size_t local_count, size_t remote_count) {
const size_t total_hosts = local_count + remote_count;

boost::scoped_ptr<cass::QueryPlan> qp(policy.new_query_plan());
cass::Address expected("0.0.0.0", 9042);
cass::Address received;

for (size_t i = 0; i < total_hosts; ++i) {
BOOST_REQUIRE(qp->compute_next(&received));
++expected.addr_in()->sin_addr.s_addr;
BOOST_CHECK_EQUAL(expected, received);
}

BOOST_CHECK(!qp->compute_next(&received));
std::vector<size_t> seq(total_hosts);
for (size_t i = 0; i < total_hosts; ++i) seq[i] = i + 1;
verify_sequence(qp.get(), seq);
}

BOOST_AUTO_TEST_CASE(simple) {
Expand All @@ -83,33 +224,22 @@ BOOST_AUTO_TEST_CASE(some_dc_local_unspecified)
populate_hosts(total_hosts, "rack", LOCAL_DC, &hosts);
cass::Host* h = hosts.begin()->second.get();
h->set_rack_and_dc("", "");
cass::Address addr_no_dc = h->address();

cass::DCAwarePolicy policy(LOCAL_DC);
policy.init(hosts);

boost::scoped_ptr<cass::QueryPlan> qp(policy.new_query_plan());

cass::Address received;
for (size_t i = 0; i < total_hosts - 1; ++i) {
BOOST_REQUIRE(qp->compute_next(&received));
BOOST_CHECK_NE(received, addr_no_dc);
}

BOOST_REQUIRE(qp->compute_next(&received));
BOOST_CHECK_EQUAL(received, addr_no_dc);

BOOST_CHECK(!qp->compute_next(&received));
const size_t seq[] = {2, 3, 1};
verify_sequence(qp.get(), VECTOR_FROM(size_t, seq));
}

BOOST_AUTO_TEST_CASE(single_local_down)
{
const size_t local_count = 3;
cass::HostMap hosts;
populate_hosts(local_count, "rack", LOCAL_DC, &hosts);
populate_hosts(3, "rack", LOCAL_DC, &hosts);
cass::SharedRefPtr<cass::Host> target_host = hosts.begin()->second;
populate_hosts(1, "rack", REMOTE_DC, &hosts);
const size_t total_hosts = local_count + 1;

cass::DCAwarePolicy policy(LOCAL_DC);
policy.init(hosts);
Expand All @@ -119,18 +249,15 @@ BOOST_AUTO_TEST_CASE(single_local_down)
policy.on_down(target_host);
boost::scoped_ptr<cass::QueryPlan> qp_after(policy.new_query_plan());// should not have down host ptr in plan

cass::Address received;
for (size_t i = 0; i < total_hosts - 1; ++i) {
BOOST_REQUIRE(qp_before->compute_next(&received));
BOOST_CHECK_NE(received, target_host->address());
{
const size_t seq[] = {2, 3, 4};
verify_sequence(qp_before.get(), VECTOR_FROM(size_t, seq));
}
BOOST_CHECK(!qp_before->compute_next(&received));

for (size_t i = 0; i < total_hosts - 1; ++i) {
BOOST_REQUIRE(qp_after->compute_next(&received));
BOOST_CHECK_NE(received, target_host->address());
{
const size_t seq[] = {3, 2, 4};// local dc wrapped before remote offered
verify_sequence(qp_after.get(), VECTOR_FROM(size_t, seq));
}
BOOST_CHECK(!qp_after->compute_next(&received));
}

BOOST_AUTO_TEST_CASE(all_local_removed_returned)
Expand All @@ -139,7 +266,6 @@ BOOST_AUTO_TEST_CASE(all_local_removed_returned)
populate_hosts(1, "rack", LOCAL_DC, &hosts);
cass::SharedRefPtr<cass::Host> target_host = hosts.begin()->second;
populate_hosts(1, "rack", REMOTE_DC, &hosts);
const size_t total_hosts = 2;

cass::DCAwarePolicy policy(LOCAL_DC);
policy.init(hosts);
Expand All @@ -149,39 +275,28 @@ BOOST_AUTO_TEST_CASE(all_local_removed_returned)
policy.on_down(target_host);
boost::scoped_ptr<cass::QueryPlan> qp_after(policy.new_query_plan());// should not have down host ptr in plan

cass::Address received;
for (size_t i = 0; i < total_hosts - 1; ++i) {
BOOST_REQUIRE(qp_before->compute_next(&received));
BOOST_CHECK_NE(received, target_host->address());
}
BOOST_CHECK(!qp_before->compute_next(&received));

for (size_t i = 0; i < total_hosts - 1; ++i) {
BOOST_REQUIRE(qp_after->compute_next(&received));
BOOST_CHECK_NE(received, target_host->address());
{
const size_t seq[] = {2};
verify_sequence(qp_before.get(), VECTOR_FROM(size_t, seq));
verify_sequence(qp_after.get(), VECTOR_FROM(size_t, seq));
}
BOOST_CHECK(!qp_after->compute_next(&received));

target_host->set_up();
policy.on_up(target_host);

// make sure we get the local node first after on_up
boost::scoped_ptr<cass::QueryPlan> qp(policy.new_query_plan());
cass::Address expected("0.0.0.0", 9042);
for (size_t i = 0; i < total_hosts; ++i) {
BOOST_REQUIRE(qp->compute_next(&received));
++expected.addr_in()->sin_addr.s_addr;
BOOST_CHECK_EQUAL(expected, received);
{
const size_t seq[] = {1, 2};
verify_sequence(qp.get(), VECTOR_FROM(size_t, seq));
}
BOOST_CHECK(!qp_before->compute_next(&received));
}

BOOST_AUTO_TEST_CASE(remote_removed_returned)
{
cass::HostMap hosts;
populate_hosts(1, "rack", LOCAL_DC, &hosts);
populate_hosts(1, "rack", REMOTE_DC, &hosts);
const size_t total_hosts = 2;
cass::Address target_addr("2.0.0.0", 9042);
cass::SharedRefPtr<cass::Host> target_host = hosts[target_addr];

Expand All @@ -193,31 +308,21 @@ BOOST_AUTO_TEST_CASE(remote_removed_returned)
policy.on_down(target_host);
boost::scoped_ptr<cass::QueryPlan> qp_after(policy.new_query_plan());// should not have down host ptr in plan

cass::Address received;
for (size_t i = 0; i < total_hosts - 1; ++i) {
BOOST_REQUIRE(qp_before->compute_next(&received));
BOOST_CHECK_NE(received, target_host->address());
}
BOOST_CHECK(!qp_before->compute_next(&received));

for (size_t i = 0; i < total_hosts - 1; ++i) {
BOOST_REQUIRE(qp_after->compute_next(&received));
BOOST_CHECK_NE(received, target_host->address());
{
const size_t seq[] = {1};
verify_sequence(qp_before.get(), VECTOR_FROM(size_t, seq));
verify_sequence(qp_after.get(), VECTOR_FROM(size_t, seq));
}
BOOST_CHECK(!qp_after->compute_next(&received));

target_host->set_up();
policy.on_up(target_host);

// make sure we get both nodes, correct order after
boost::scoped_ptr<cass::QueryPlan> qp(policy.new_query_plan());
cass::Address expected("0.0.0.0", 9042);
for (size_t i = 0; i < total_hosts; ++i) {
BOOST_REQUIRE(qp->compute_next(&received));
++expected.addr_in()->sin_addr.s_addr;
BOOST_CHECK_EQUAL(expected, received);
{
const size_t seq[] = {1, 2};
verify_sequence(qp.get(), VECTOR_FROM(size_t, seq));
}
BOOST_CHECK(!qp_before->compute_next(&received));
}

BOOST_AUTO_TEST_SUITE_END()
Expand Down

0 comments on commit 88448a9

Please sign in to comment.