diff --git a/test/unit_tests/src/load_balancing.cpp b/test/unit_tests/src/load_balancing.cpp index 11358882c..0912635ce 100644 --- a/test/unit_tests/src/load_balancing.cpp +++ b/test/unit_tests/src/load_balancing.cpp @@ -32,19 +32,167 @@ using std::string; const string LOCAL_DC = "local"; const string REMOTE_DC = "remote"; -void populate_hosts(size_t count, const std::string& rack, - const std::string& dc, cass::HostMap* hosts) { +#define VECTOR_FROM(t, a) std::vector(a, a + sizeof(a)/sizeof(a[0])) + +cass::Address addr_for_sequence(size_t i) { cass::Address addr("0.0.0.0", 9042); - size_t first = hosts->size() + 1; - for (size_t i = first; i < first+count; ++i) { - addr.addr_in()->sin_addr.s_addr = i; + addr.addr_in()->sin_addr.s_addr = i; + return addr; +} + +cass::SharedRefPtr host_for_addr(const cass::Address addr, + const std::string& rack = "rack", + const std::string& dc = "dc") { cass::Host* host = new cass::Host(addr, false); host->set_up(); host->set_rack_and_dc(rack, dc); - (*hosts)[addr] = cass::SharedRefPtr(host); + return cass::SharedRefPtr(host); +} + +void populate_hosts(size_t count, const std::string& rack, + const std::string& dc, cass::HostMap* hosts) { + cass::Address addr; + size_t first = hosts->size() + 1; + for (size_t i = first; i < first+count; ++i) { + addr = addr_for_sequence(i); + (*hosts)[addr] = host_for_addr(addr, rack, dc); } } +void verify_sequence(cass::QueryPlan* qp, const std::vector& sequence) { + cass::Address expected("0.0.0.0", 9042); + cass::Address received; + for (std::vector::const_iterator it = sequence.begin(); + it!= sequence.end(); + ++it) { + BOOST_REQUIRE(qp->compute_next(&received)); + expected.addr_in()->sin_addr.s_addr = *it; + BOOST_CHECK_EQUAL(expected, received); + } + BOOST_CHECK(!qp->compute_next(&received)); +} + +BOOST_AUTO_TEST_SUITE(round_robin_lb) + +BOOST_AUTO_TEST_CASE(simple) { + cass::HostMap hosts; + populate_hosts(2, "rack", "dc", &hosts); + + cass::RoundRobinPolicy policy; + policy.init(hosts); + + // start on first elem + boost::scoped_ptr qp(policy.new_query_plan()); + const size_t seq1[] = {1, 2}; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq1)); + + // rotate starting element + boost::scoped_ptr qp2(policy.new_query_plan()); + const size_t seq2[] = {2, 1}; + verify_sequence(qp2.get(), VECTOR_FROM(size_t, seq2)); + + // back around + boost::scoped_ptr qp3(policy.new_query_plan()); + verify_sequence(qp3.get(), VECTOR_FROM(size_t, seq1)); +} + +BOOST_AUTO_TEST_CASE(on_add) +{ + cass::HostMap hosts; + populate_hosts(2, "rack", "dc", &hosts); + + cass::RoundRobinPolicy policy; + policy.init(hosts); + + // baseline + boost::scoped_ptr qp(policy.new_query_plan()); + const size_t seq1[] = {1, 2}; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq1)); + + const size_t seq_new = 5; + cass::Address addr_new = addr_for_sequence(seq_new); + cass::SharedRefPtr host = host_for_addr(addr_new); + policy.on_add(host); + + boost::scoped_ptr qp2(policy.new_query_plan()); + const size_t seq2[] = {2, seq_new, 1}; + verify_sequence(qp2.get(), VECTOR_FROM(size_t, seq2)); +} + +BOOST_AUTO_TEST_CASE(on_remove) +{ + cass::HostMap hosts; + populate_hosts(3, "rack", "dc", &hosts); + + cass::RoundRobinPolicy policy; + policy.init(hosts); + + boost::scoped_ptr qp(policy.new_query_plan()); + cass::SharedRefPtr host = hosts.begin()->second; + policy.on_remove(host); + + boost::scoped_ptr qp2(policy.new_query_plan()); + + // first query plan has it + // (note: not manipulating Host::state_ for dynamic removal) + const size_t seq1[] = {1, 2, 3}; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq1)); + + // second one does not + const size_t seq2[] = {3, 2}; + verify_sequence(qp2.get(), VECTOR_FROM(size_t, seq2)); +} + +BOOST_AUTO_TEST_CASE(on_down_on_up) +{ + cass::HostMap hosts; + populate_hosts(3, "rack", "dc", &hosts); + + cass::RoundRobinPolicy policy; + policy.init(hosts); + + boost::scoped_ptr qp_before1(policy.new_query_plan()); + boost::scoped_ptr qp_before2(policy.new_query_plan()); + cass::SharedRefPtr host = hosts.begin()->second; + policy.on_down(host); + + // 'before' qp both have the down host + // Ahead of set_down, it will be returned + { + const size_t seq[] = {1, 2, 3}; + verify_sequence(qp_before1.get(), VECTOR_FROM(size_t, seq)); + } + + host->set_down(); + // Following set_down, it is dynamically excluded + { + const size_t seq[] = {2, 3}; + verify_sequence(qp_before2.get(), VECTOR_FROM(size_t, seq)); + } + + // host is added to the list, but not 'up' + policy.on_up(host); + + boost::scoped_ptr qp_after1(policy.new_query_plan()); + boost::scoped_ptr qp_after2(policy.new_query_plan()); + + // 1 is dynamically excluded from plan + { + const size_t seq[] = {2, 3}; + verify_sequence(qp_after1.get(), VECTOR_FROM(size_t, seq)); + } + + host->set_up(); + + // now included + { + const size_t seq[] = {2, 3, 1}; + verify_sequence(qp_after2.get(), VECTOR_FROM(size_t, seq)); + } +} + +BOOST_AUTO_TEST_SUITE_END() + BOOST_AUTO_TEST_SUITE(dc_aware_lb) void test_dc_aware_policy(size_t local_count, size_t remote_count) { @@ -57,16 +205,9 @@ void test_dc_aware_policy(size_t local_count, size_t remote_count) { const size_t total_hosts = local_count + remote_count; boost::scoped_ptr qp(policy.new_query_plan()); - cass::Address expected("0.0.0.0", 9042); - cass::Address received; - - for (size_t i = 0; i < total_hosts; ++i) { - BOOST_REQUIRE(qp->compute_next(&received)); - ++expected.addr_in()->sin_addr.s_addr; - BOOST_CHECK_EQUAL(expected, received); - } - - BOOST_CHECK(!qp->compute_next(&received)); + std::vector seq(total_hosts); + for (size_t i = 0; i < total_hosts; ++i) seq[i] = i + 1; + verify_sequence(qp.get(), seq); } BOOST_AUTO_TEST_CASE(simple) { @@ -83,33 +224,22 @@ BOOST_AUTO_TEST_CASE(some_dc_local_unspecified) populate_hosts(total_hosts, "rack", LOCAL_DC, &hosts); cass::Host* h = hosts.begin()->second.get(); h->set_rack_and_dc("", ""); - cass::Address addr_no_dc = h->address(); cass::DCAwarePolicy policy(LOCAL_DC); policy.init(hosts); boost::scoped_ptr qp(policy.new_query_plan()); - cass::Address received; - for (size_t i = 0; i < total_hosts - 1; ++i) { - BOOST_REQUIRE(qp->compute_next(&received)); - BOOST_CHECK_NE(received, addr_no_dc); - } - - BOOST_REQUIRE(qp->compute_next(&received)); - BOOST_CHECK_EQUAL(received, addr_no_dc); - - BOOST_CHECK(!qp->compute_next(&received)); + const size_t seq[] = {2, 3, 1}; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); } BOOST_AUTO_TEST_CASE(single_local_down) { - const size_t local_count = 3; cass::HostMap hosts; - populate_hosts(local_count, "rack", LOCAL_DC, &hosts); + populate_hosts(3, "rack", LOCAL_DC, &hosts); cass::SharedRefPtr target_host = hosts.begin()->second; populate_hosts(1, "rack", REMOTE_DC, &hosts); - const size_t total_hosts = local_count + 1; cass::DCAwarePolicy policy(LOCAL_DC); policy.init(hosts); @@ -119,18 +249,15 @@ BOOST_AUTO_TEST_CASE(single_local_down) policy.on_down(target_host); boost::scoped_ptr qp_after(policy.new_query_plan());// should not have down host ptr in plan - cass::Address received; - for (size_t i = 0; i < total_hosts - 1; ++i) { - BOOST_REQUIRE(qp_before->compute_next(&received)); - BOOST_CHECK_NE(received, target_host->address()); + { + const size_t seq[] = {2, 3, 4}; + verify_sequence(qp_before.get(), VECTOR_FROM(size_t, seq)); } - BOOST_CHECK(!qp_before->compute_next(&received)); - for (size_t i = 0; i < total_hosts - 1; ++i) { - BOOST_REQUIRE(qp_after->compute_next(&received)); - BOOST_CHECK_NE(received, target_host->address()); + { + const size_t seq[] = {3, 2, 4};// local dc wrapped before remote offered + verify_sequence(qp_after.get(), VECTOR_FROM(size_t, seq)); } - BOOST_CHECK(!qp_after->compute_next(&received)); } BOOST_AUTO_TEST_CASE(all_local_removed_returned) @@ -139,7 +266,6 @@ BOOST_AUTO_TEST_CASE(all_local_removed_returned) populate_hosts(1, "rack", LOCAL_DC, &hosts); cass::SharedRefPtr target_host = hosts.begin()->second; populate_hosts(1, "rack", REMOTE_DC, &hosts); - const size_t total_hosts = 2; cass::DCAwarePolicy policy(LOCAL_DC); policy.init(hosts); @@ -149,31 +275,21 @@ BOOST_AUTO_TEST_CASE(all_local_removed_returned) policy.on_down(target_host); boost::scoped_ptr qp_after(policy.new_query_plan());// should not have down host ptr in plan - cass::Address received; - for (size_t i = 0; i < total_hosts - 1; ++i) { - BOOST_REQUIRE(qp_before->compute_next(&received)); - BOOST_CHECK_NE(received, target_host->address()); - } - BOOST_CHECK(!qp_before->compute_next(&received)); - - for (size_t i = 0; i < total_hosts - 1; ++i) { - BOOST_REQUIRE(qp_after->compute_next(&received)); - BOOST_CHECK_NE(received, target_host->address()); + { + const size_t seq[] = {2}; + verify_sequence(qp_before.get(), VECTOR_FROM(size_t, seq)); + verify_sequence(qp_after.get(), VECTOR_FROM(size_t, seq)); } - BOOST_CHECK(!qp_after->compute_next(&received)); target_host->set_up(); policy.on_up(target_host); // make sure we get the local node first after on_up boost::scoped_ptr qp(policy.new_query_plan()); - cass::Address expected("0.0.0.0", 9042); - for (size_t i = 0; i < total_hosts; ++i) { - BOOST_REQUIRE(qp->compute_next(&received)); - ++expected.addr_in()->sin_addr.s_addr; - BOOST_CHECK_EQUAL(expected, received); + { + const size_t seq[] = {1, 2}; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); } - BOOST_CHECK(!qp_before->compute_next(&received)); } BOOST_AUTO_TEST_CASE(remote_removed_returned) @@ -181,7 +297,6 @@ BOOST_AUTO_TEST_CASE(remote_removed_returned) cass::HostMap hosts; populate_hosts(1, "rack", LOCAL_DC, &hosts); populate_hosts(1, "rack", REMOTE_DC, &hosts); - const size_t total_hosts = 2; cass::Address target_addr("2.0.0.0", 9042); cass::SharedRefPtr target_host = hosts[target_addr]; @@ -193,31 +308,21 @@ BOOST_AUTO_TEST_CASE(remote_removed_returned) policy.on_down(target_host); boost::scoped_ptr qp_after(policy.new_query_plan());// should not have down host ptr in plan - cass::Address received; - for (size_t i = 0; i < total_hosts - 1; ++i) { - BOOST_REQUIRE(qp_before->compute_next(&received)); - BOOST_CHECK_NE(received, target_host->address()); - } - BOOST_CHECK(!qp_before->compute_next(&received)); - - for (size_t i = 0; i < total_hosts - 1; ++i) { - BOOST_REQUIRE(qp_after->compute_next(&received)); - BOOST_CHECK_NE(received, target_host->address()); + { + const size_t seq[] = {1}; + verify_sequence(qp_before.get(), VECTOR_FROM(size_t, seq)); + verify_sequence(qp_after.get(), VECTOR_FROM(size_t, seq)); } - BOOST_CHECK(!qp_after->compute_next(&received)); target_host->set_up(); policy.on_up(target_host); // make sure we get both nodes, correct order after boost::scoped_ptr qp(policy.new_query_plan()); - cass::Address expected("0.0.0.0", 9042); - for (size_t i = 0; i < total_hosts; ++i) { - BOOST_REQUIRE(qp->compute_next(&received)); - ++expected.addr_in()->sin_addr.s_addr; - BOOST_CHECK_EQUAL(expected, received); + { + const size_t seq[] = {1, 2}; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); } - BOOST_CHECK(!qp_before->compute_next(&received)); } BOOST_AUTO_TEST_SUITE_END()