From 8873d41787caea1c7d725e61479ea7d7385b85f0 Mon Sep 17 00:00:00 2001 From: mpenick Date: Fri, 5 Sep 2014 16:03:58 -0700 Subject: [PATCH] Fixed/Added load balancing policy integration test - Added a DC aware integration test - Fix to not assert when the io queue setting is not a pow of 2 - Require boost 1.55 in CMake - Removed use of cass::Address from policy_tools.hpp in favor of using a method from src/testing.hpp --- CMakeLists.txt | 2 +- src/common.hpp | 9 +++ src/metadata.cpp | 11 +-- src/mpmc_queue.hpp | 7 +- src/spsc_queue.hpp | 9 +-- test/ccm_bridge/CMakeLists.txt | 2 +- test/integration_tests/CMakeLists.txt | 2 +- test/integration_tests/src/load_balancing.cpp | 74 ++++++++++++++----- test/integration_tests/src/policy_tools.cpp | 27 ++++--- test/integration_tests/src/policy_tools.hpp | 12 +-- test/integration_tests/src/test_utils.cpp | 1 + test/integration_tests/src/test_utils.hpp | 1 + test/unit_tests/CMakeLists.txt | 2 +- 13 files changed, 104 insertions(+), 55 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 21933576b..beda20f2e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -253,7 +253,7 @@ if(WIN32) set(LIBSSH2_ROOT "${PROJECT_SOURCE_DIR}/lib/libssh2/") endif() -find_package(Boost 1.41.0 COMPONENTS system chrono thread unit_test_framework date_time program_options) +find_package(Boost 1.55.0 COMPONENTS system chrono thread unit_test_framework date_time program_options) set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PROJECT_SOURCE_DIR}/test/ccm_bridge/cmake/Modules/") find_package(LIBSSH2) diff --git a/src/common.hpp b/src/common.hpp index 525e536f7..19681da9d 100644 --- a/src/common.hpp +++ b/src/common.hpp @@ -43,6 +43,15 @@ inline To copy_cast(const From& from) memcpy(&to, &from, sizeof(from)); return to; } + +inline size_t next_pow_2(size_t num) { + size_t next = 2; + size_t i = 0; + while (next < num) { + next = 1 << i++; + } + return next; +} uv_buf_t alloc_buffer(size_t suggested_size); uv_buf_t alloc_buffer(uv_handle_t* handle, size_t suggested_size); diff --git a/src/metadata.cpp b/src/metadata.cpp index f9854fe24..2133d8c99 100644 --- a/src/metadata.cpp +++ b/src/metadata.cpp @@ -16,6 +16,8 @@ #include "metadata.hpp" +#include "common.hpp" + #include "third_party/boost/boost/functional/hash.hpp" #include "third_party/boost/boost/algorithm/string.hpp" @@ -27,15 +29,6 @@ namespace { -size_t next_pow_2(size_t num) { - size_t next = 2; - size_t i = 0; - while (next < num) { - next = 1 << i++; - } - return next; -} - struct ToLowerIterator { public: typedef boost::string_ref::value_type value_type; diff --git a/src/mpmc_queue.hpp b/src/mpmc_queue.hpp index fb2581f81..38df10dd1 100644 --- a/src/mpmc_queue.hpp +++ b/src/mpmc_queue.hpp @@ -24,6 +24,8 @@ #include +#include "common.hpp" + #include "third_party/boost/boost/atomic.hpp" #include "third_party/boost/boost/type_traits/alignment_of.hpp" #include "third_party/boost/boost/aligned_storage.hpp" @@ -36,14 +38,11 @@ class MPMCQueue { typedef T EntryType; MPMCQueue(size_t size) - : size_(size) + : size_(next_pow_2(size)) , mask_(size - 1) , buffer_(reinterpret_cast(new AlignedNode[size_])) , head_seq_(0) , tail_seq_(0) { - // make sure it's a power of 2 - assert((size_ != 0) && ((size_ & (~size_ + 1)) == size_)); - // populate the sequence initial values for (size_t i = 0; i < size_; ++i) { buffer_[i].seq.store(i, boost::memory_order_relaxed); diff --git a/src/spsc_queue.hpp b/src/spsc_queue.hpp index a08dd41a4..d6d9af0a3 100644 --- a/src/spsc_queue.hpp +++ b/src/spsc_queue.hpp @@ -29,6 +29,8 @@ #include +#include "common.hpp" + #include "third_party/boost/boost/atomic.hpp" #include "third_party/boost/boost/type_traits/alignment_of.hpp" #include "third_party/boost/boost/aligned_storage.hpp" @@ -41,16 +43,13 @@ class SPSCQueue { typedef T EntryType; SPSCQueue(size_t size) - : _size(size) + : _size(next_pow_2(size)) , _mask(size - 1) , _buffer(reinterpret_cast( // need one extra element for a guard new SPSCQueueAlignedEntry[_size + 1])) , _head(0) - , _tail(0) { - // make sure it's a power of 2 - assert((_size != 0) && ((_size & (~_size + 1)) == _size)); - } + , _tail(0) {} ~SPSCQueue() { delete[] _buffer; } diff --git a/test/ccm_bridge/CMakeLists.txt b/test/ccm_bridge/CMakeLists.txt index 503c51a89..a66c59c59 100644 --- a/test/ccm_bridge/CMakeLists.txt +++ b/test/ccm_bridge/CMakeLists.txt @@ -17,7 +17,7 @@ set(Boost_USE_MULTITHREADED ON) # needed by boost to link dynamically add_definitions(-DBOOST_ALL_DYN_LINK) -find_package(Boost 1.41.0 COMPONENTS system date_time REQUIRED) +find_package(Boost 1.55.0 COMPONENTS system date_time REQUIRED) set(LIBRARIES ${LIBRARIES} ${Boost_LIBRARIES}) set(INCLUDES ${INCLUDES} ${Boost_INCLUDE_DIRS}) diff --git a/test/integration_tests/CMakeLists.txt b/test/integration_tests/CMakeLists.txt index 75f3494b3..0172796af 100644 --- a/test/integration_tests/CMakeLists.txt +++ b/test/integration_tests/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 2.6.4) -find_package(Boost 1.41.0 COMPONENTS system chrono thread unit_test_framework date_time program_options REQUIRED) +find_package(Boost 1.55.0 COMPONENTS system chrono thread unit_test_framework date_time program_options REQUIRED) set(Boost_USE_STATIC_LIBS OFF) set(Boost_USE_STATIC_RUNTIME OFF) diff --git a/test/integration_tests/src/load_balancing.cpp b/test/integration_tests/src/load_balancing.cpp index 5b1f2c68b..05c68fcae 100644 --- a/test/integration_tests/src/load_balancing.cpp +++ b/test/integration_tests/src/load_balancing.cpp @@ -32,42 +32,80 @@ #include "cql_ccm_bridge.hpp" -struct LoadBalancingTests : test_utils::SingleSessionTest { - LoadBalancingTests() : SingleSessionTest(3, 0) {} +struct LoadBalancingTests { }; BOOST_FIXTURE_TEST_SUITE(load_balancing, LoadBalancingTests) BOOST_AUTO_TEST_CASE(test_round_robin) { - PolicyTool policy_tool; - policy_tool.create_schema(session, 1); + test_utils::CassClusterPtr cluster(cass_cluster_new()); + + const cql::cql_ccm_bridge_configuration_t& conf = cql::get_ccm_bridge_configuration(); + boost::shared_ptr ccm = cql::cql_ccm_bridge_t::create(conf, "test", 3, 0); + + cass_cluster_set_load_balance_round_robin(cluster.get());; - policy_tool.init(session, 12, CASS_CONSISTENCY_ONE); - policy_tool.query(session, 12, CASS_CONSISTENCY_ONE); + test_utils::initialize_contact_points(cluster.get(), conf.ip_prefix(), 1, 0); - cass::Address host1; - BOOST_REQUIRE(cass::Address::from_string(conf.ip_prefix() + "1", 9042, &host1)); + test_utils::CassSessionPtr session(test_utils::create_session(cluster.get())); - cass::Address host2; - BOOST_REQUIRE(cass::Address::from_string(conf.ip_prefix() + "2", 9042, &host2)); + PolicyTool policy_tool; + policy_tool.create_schema(session.get(), 1); + + policy_tool.init(session.get(), 12, CASS_CONSISTENCY_ONE); + policy_tool.query(session.get(), 12, CASS_CONSISTENCY_ONE); - cass::Address host3; - BOOST_REQUIRE(cass::Address::from_string(conf.ip_prefix() + "3", 9042, &host3)); + std::string host1(conf.ip_prefix() + "1"); + std::string host2(conf.ip_prefix() + "2"); + std::string host3(conf.ip_prefix() + "3"); policy_tool.assert_queried(host1, 4); policy_tool.assert_queried(host2, 4); policy_tool.assert_queried(host3, 4); policy_tool.reset_coordinators(); - ccm->decommission(1); + ccm->stop(1); - policy_tool.query(session, 12, CASS_CONSISTENCY_ONE); + policy_tool.query(session.get(), 12, CASS_CONSISTENCY_ONE); - // TODO(mpenick): This currently wrong, because we don't have a state listener interface - // and control connection to remove the host from the load balancing policy - policy_tool.assert_queried(host2, 8); - policy_tool.assert_queried(host3, 4); + policy_tool.assert_queried(host2, 6); + policy_tool.assert_queried(host3, 6); +} + +BOOST_AUTO_TEST_CASE(test_dc_aware) +{ + test_utils::CassClusterPtr cluster(cass_cluster_new()); + + const cql::cql_ccm_bridge_configuration_t& conf = cql::get_ccm_bridge_configuration(); + boost::shared_ptr ccm = cql::cql_ccm_bridge_t::create(conf, "test", 2, 1); + + cass_cluster_set_load_balance_dc_aware(cluster.get(), "dc1"); + + test_utils::initialize_contact_points(cluster.get(), conf.ip_prefix(), 1, 0); + + test_utils::CassSessionPtr session(test_utils::create_session(cluster.get())); + + PolicyTool policy_tool; + policy_tool.create_schema(session.get(), 2, 1); + + policy_tool.init(session.get(), 12, CASS_CONSISTENCY_EACH_QUORUM); + policy_tool.query(session.get(), 12, CASS_CONSISTENCY_ONE); + + std::string host1(conf.ip_prefix() + "1"); + std::string host2(conf.ip_prefix() + "2"); + std::string host3(conf.ip_prefix() + "3"); + + policy_tool.assert_queried(host1, 6); + policy_tool.assert_queried(host2, 6); + + policy_tool.reset_coordinators(); + ccm->stop(1); + ccm->stop(2); + + policy_tool.query(session.get(), 12, CASS_CONSISTENCY_ONE); + + policy_tool.assert_queried(host3, 12); } BOOST_AUTO_TEST_SUITE_END() diff --git a/test/integration_tests/src/policy_tools.cpp b/test/integration_tests/src/policy_tools.cpp index 03e5f922d..ac16eb416 100644 --- a/test/integration_tests/src/policy_tools.cpp +++ b/test/integration_tests/src/policy_tools.cpp @@ -19,15 +19,13 @@ #include #include +#include "testing.hpp" #include "policy_tools.hpp" -#include "future.hpp" -#include "types.hpp" -#include "request_handler.hpp" void PolicyTool::show_coordinators() // show what queries went to what nodes IP. { - for(std::map::const_iterator p = coordinators.begin(); p != coordinators.end(); ++p) { - std::cout << p->first.to_string() << " : " << p->second << std::endl; + for(std::map::const_iterator p = coordinators.begin(); p != coordinators.end(); ++p) { + std::cout << p->first << " : " << p->second << std::endl; } } @@ -45,6 +43,15 @@ void PolicyTool::create_schema(CassSession* session, int replicationFactor) str(boost::format("CREATE TABLE %s (k int PRIMARY KEY, i int)") % test_utils::SIMPLE_TABLE)); } +void PolicyTool::create_schema(CassSession* session, int dc1, int dc2) +{ + test_utils::execute_query(session, + str(boost::format(test_utils::CREATE_KEYSPACE_NETWORK_FORMAT) % test_utils::SIMPLE_KEYSPACE % dc1 % dc2)); + test_utils::execute_query(session, str(boost::format("USE %s") % test_utils::SIMPLE_KEYSPACE)); + test_utils::execute_query(session, + str(boost::format("CREATE TABLE %s (k int PRIMARY KEY, i int)") % test_utils::SIMPLE_TABLE)); +} + void PolicyTool::init(CassSession* session, int n, CassConsistency cl, bool batch) { std::string query = str(boost::format("INSERT INTO %s (k, i) VALUES (0, 0)") % test_utils::SIMPLE_TABLE); @@ -91,7 +98,7 @@ CassError PolicyTool::init_return_error(CassSession* session, int n, CassConsist return CASS_OK; } -void PolicyTool::add_coordinator(cass::Address address) +void PolicyTool::add_coordinator(std::string address) { if(coordinators.count(address) == 0) { coordinators.insert(std::make_pair(address, 1)); @@ -100,7 +107,7 @@ void PolicyTool::add_coordinator(cass::Address address) } } -void PolicyTool::assert_queried(cass::Address address, int n) +void PolicyTool::assert_queried(std::string address, int n) { if(coordinators.count(address) != 0) { int c = coordinators[address]; @@ -110,7 +117,7 @@ void PolicyTool::assert_queried(cass::Address address, int n) } } -void PolicyTool::assertQueriedAtLeast(cass::Address address, int n) +void PolicyTool::assertQueriedAtLeast(std::string address, int n) { int queried = coordinators[address]; BOOST_REQUIRE(queried >= n); @@ -126,7 +133,7 @@ void PolicyTool::query(CassSession* session, int n, CassConsistency cl) cass_statement_set_consistency(statement.get(), cl); test_utils::CassFuturePtr future(cass_session_execute(session, statement.get())); test_utils::wait_and_check_error(future.get()); - add_coordinator(static_cast(future->from())->get_host_address()); + add_coordinator(cass::get_host_from_future(future.get())); } } @@ -143,7 +150,7 @@ CassError PolicyTool::query_return_error(CassSession* session, int n, CassConsis if (rc != CASS_OK) { return rc; } - add_coordinator(static_cast(future->from())->get_host_address()); + add_coordinator(cass::get_host_from_future(future.get())); } return CASS_OK; } diff --git a/test/integration_tests/src/policy_tools.hpp b/test/integration_tests/src/policy_tools.hpp index 9df149c69..49a2ac016 100644 --- a/test/integration_tests/src/policy_tools.hpp +++ b/test/integration_tests/src/policy_tools.hpp @@ -16,12 +16,11 @@ #pragma once -#include "address.hpp" #include "test_utils.hpp" struct PolicyTool { - std::map coordinators; + std::map coordinators; void show_coordinators(); // show what queries went to what node IP. void reset_coordinators(); @@ -39,11 +38,14 @@ struct PolicyTool { void create_schema(CassSession* session, int replicationFactor); - void add_coordinator(cass::Address address); + void create_schema(CassSession* session, + int dc1, int dc2); + + void add_coordinator(std::string address); - void assert_queried(cass::Address address, int n); + void assert_queried(std::string address, int n); - void assertQueriedAtLeast(cass::Address address, int n); + void assertQueriedAtLeast(std::string address, int n); void query(CassSession* session, int n, CassConsistency cl); diff --git a/test/integration_tests/src/test_utils.cpp b/test/integration_tests/src/test_utils.cpp index 4b991c1b6..1c4b02c95 100644 --- a/test/integration_tests/src/test_utils.cpp +++ b/test/integration_tests/src/test_utils.cpp @@ -113,6 +113,7 @@ const char* get_value_type(CassValueType type) { //----------------------------------------------------------------------------------- const std::string CREATE_KEYSPACE_SIMPLE_FORMAT = "CREATE KEYSPACE %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : %s }"; +const std::string CREATE_KEYSPACE_NETWORK_FORMAT = "CREATE KEYSPACE %s WITH replication = { 'class' : 'NetworkTopologyStrategy', 'dc1' : %d, 'dc2' : %d }"; const std::string CREATE_KEYSPACE_GENERIC_FORMAT = "CREATE KEYSPACE {0} WITH replication = { 'class' : '{1}', {2} }"; const std::string SIMPLE_KEYSPACE = "ks"; const std::string SIMPLE_TABLE = "test"; diff --git a/test/integration_tests/src/test_utils.hpp b/test/integration_tests/src/test_utils.hpp index 9df1d5d17..b7e3ae0bd 100644 --- a/test/integration_tests/src/test_utils.hpp +++ b/test/integration_tests/src/test_utils.hpp @@ -551,6 +551,7 @@ extern const char* CREATE_TABLE_TIME_SERIES; extern const char* CREATE_TABLE_SIMPLE; extern const std::string CREATE_KEYSPACE_SIMPLE_FORMAT; +extern const std::string CREATE_KEYSPACE_NETWORK_FORMAT; extern const std::string CREATE_KEYSPACE_GENERIC_FORMAT; extern const std::string SIMPLE_KEYSPACE; extern const std::string SIMPLE_TABLE; diff --git a/test/unit_tests/CMakeLists.txt b/test/unit_tests/CMakeLists.txt index e8a746800..72110149f 100644 --- a/test/unit_tests/CMakeLists.txt +++ b/test/unit_tests/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 2.6.4) -find_package(Boost 1.41.0 COMPONENTS system chrono thread unit_test_framework date_time program_options REQUIRED) +find_package(Boost 1.55.0 COMPONENTS system chrono thread unit_test_framework date_time program_options REQUIRED) set(Boost_USE_STATIC_LIBS OFF) set(Boost_USE_STATIC_RUNTIME OFF)