Skip to content

Commit

Permalink
Fixed/Added load balancing policy integration test
Browse files Browse the repository at this point in the history
- Added a DC aware integration test
- Fix to not assert when the io queue setting is not a pow of 2
- Require boost 1.55 in CMake
- Removed use of cass::Address from policy_tools.hpp in favor of
  using a method from src/testing.hpp
  • Loading branch information
mpenick committed Sep 5, 2014
1 parent ab9bd21 commit 8873d41
Show file tree
Hide file tree
Showing 13 changed files with 104 additions and 55 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ if(WIN32)
set(LIBSSH2_ROOT "${PROJECT_SOURCE_DIR}/lib/libssh2/")
endif()

find_package(Boost 1.41.0 COMPONENTS system chrono thread unit_test_framework date_time program_options)
find_package(Boost 1.55.0 COMPONENTS system chrono thread unit_test_framework date_time program_options)

set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PROJECT_SOURCE_DIR}/test/ccm_bridge/cmake/Modules/")
find_package(LIBSSH2)
Expand Down
9 changes: 9 additions & 0 deletions src/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ inline To copy_cast(const From& from)
memcpy(&to, &from, sizeof(from));
return to;
}

inline size_t next_pow_2(size_t num) {
size_t next = 2;
size_t i = 0;
while (next < num) {
next = 1 << i++;
}
return next;
}

uv_buf_t alloc_buffer(size_t suggested_size);
uv_buf_t alloc_buffer(uv_handle_t* handle, size_t suggested_size);
Expand Down
11 changes: 2 additions & 9 deletions src/metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include "metadata.hpp"

#include "common.hpp"

#include "third_party/boost/boost/functional/hash.hpp"
#include "third_party/boost/boost/algorithm/string.hpp"

Expand All @@ -27,15 +29,6 @@

namespace {

size_t next_pow_2(size_t num) {
size_t next = 2;
size_t i = 0;
while (next < num) {
next = 1 << i++;
}
return next;
}

struct ToLowerIterator {
public:
typedef boost::string_ref::value_type value_type;
Expand Down
7 changes: 3 additions & 4 deletions src/mpmc_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

#include <assert.h>

#include "common.hpp"

#include "third_party/boost/boost/atomic.hpp"
#include "third_party/boost/boost/type_traits/alignment_of.hpp"
#include "third_party/boost/boost/aligned_storage.hpp"
Expand All @@ -36,14 +38,11 @@ class MPMCQueue {
typedef T EntryType;

MPMCQueue(size_t size)
: size_(size)
: size_(next_pow_2(size))
, mask_(size - 1)
, buffer_(reinterpret_cast<Node*>(new AlignedNode[size_]))
, head_seq_(0)
, tail_seq_(0) {
// make sure it's a power of 2
assert((size_ != 0) && ((size_ & (~size_ + 1)) == size_));

// populate the sequence initial values
for (size_t i = 0; i < size_; ++i) {
buffer_[i].seq.store(i, boost::memory_order_relaxed);
Expand Down
9 changes: 4 additions & 5 deletions src/spsc_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

#include <assert.h>

#include "common.hpp"

#include "third_party/boost/boost/atomic.hpp"
#include "third_party/boost/boost/type_traits/alignment_of.hpp"
#include "third_party/boost/boost/aligned_storage.hpp"
Expand All @@ -41,16 +43,13 @@ class SPSCQueue {
typedef T EntryType;

SPSCQueue(size_t size)
: _size(size)
: _size(next_pow_2(size))
, _mask(size - 1)
, _buffer(reinterpret_cast<T*>(
// need one extra element for a guard
new SPSCQueueAlignedEntry[_size + 1]))
, _head(0)
, _tail(0) {
// make sure it's a power of 2
assert((_size != 0) && ((_size & (~_size + 1)) == _size));
}
, _tail(0) {}

~SPSCQueue() { delete[] _buffer; }

Expand Down
2 changes: 1 addition & 1 deletion test/ccm_bridge/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ set(Boost_USE_MULTITHREADED ON)
# needed by boost to link dynamically
add_definitions(-DBOOST_ALL_DYN_LINK)

find_package(Boost 1.41.0 COMPONENTS system date_time REQUIRED)
find_package(Boost 1.55.0 COMPONENTS system date_time REQUIRED)
set(LIBRARIES ${LIBRARIES} ${Boost_LIBRARIES})
set(INCLUDES ${INCLUDES} ${Boost_INCLUDE_DIRS})

Expand Down
2 changes: 1 addition & 1 deletion test/integration_tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 2.6.4)

find_package(Boost 1.41.0 COMPONENTS system chrono thread unit_test_framework date_time program_options REQUIRED)
find_package(Boost 1.55.0 COMPONENTS system chrono thread unit_test_framework date_time program_options REQUIRED)

set(Boost_USE_STATIC_LIBS OFF)
set(Boost_USE_STATIC_RUNTIME OFF)
Expand Down
74 changes: 56 additions & 18 deletions test/integration_tests/src/load_balancing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,42 +32,80 @@

#include "cql_ccm_bridge.hpp"

struct LoadBalancingTests : test_utils::SingleSessionTest {
LoadBalancingTests() : SingleSessionTest(3, 0) {}
struct LoadBalancingTests {
};

BOOST_FIXTURE_TEST_SUITE(load_balancing, LoadBalancingTests)

BOOST_AUTO_TEST_CASE(test_round_robin)
{
PolicyTool policy_tool;
policy_tool.create_schema(session, 1);
test_utils::CassClusterPtr cluster(cass_cluster_new());

const cql::cql_ccm_bridge_configuration_t& conf = cql::get_ccm_bridge_configuration();
boost::shared_ptr<cql::cql_ccm_bridge_t> ccm = cql::cql_ccm_bridge_t::create(conf, "test", 3, 0);

cass_cluster_set_load_balance_round_robin(cluster.get());;

policy_tool.init(session, 12, CASS_CONSISTENCY_ONE);
policy_tool.query(session, 12, CASS_CONSISTENCY_ONE);
test_utils::initialize_contact_points(cluster.get(), conf.ip_prefix(), 1, 0);

cass::Address host1;
BOOST_REQUIRE(cass::Address::from_string(conf.ip_prefix() + "1", 9042, &host1));
test_utils::CassSessionPtr session(test_utils::create_session(cluster.get()));

cass::Address host2;
BOOST_REQUIRE(cass::Address::from_string(conf.ip_prefix() + "2", 9042, &host2));
PolicyTool policy_tool;
policy_tool.create_schema(session.get(), 1);

policy_tool.init(session.get(), 12, CASS_CONSISTENCY_ONE);
policy_tool.query(session.get(), 12, CASS_CONSISTENCY_ONE);

cass::Address host3;
BOOST_REQUIRE(cass::Address::from_string(conf.ip_prefix() + "3", 9042, &host3));
std::string host1(conf.ip_prefix() + "1");
std::string host2(conf.ip_prefix() + "2");
std::string host3(conf.ip_prefix() + "3");

policy_tool.assert_queried(host1, 4);
policy_tool.assert_queried(host2, 4);
policy_tool.assert_queried(host3, 4);

policy_tool.reset_coordinators();
ccm->decommission(1);
ccm->stop(1);

policy_tool.query(session, 12, CASS_CONSISTENCY_ONE);
policy_tool.query(session.get(), 12, CASS_CONSISTENCY_ONE);

// TODO(mpenick): This currently wrong, because we don't have a state listener interface
// and control connection to remove the host from the load balancing policy
policy_tool.assert_queried(host2, 8);
policy_tool.assert_queried(host3, 4);
policy_tool.assert_queried(host2, 6);
policy_tool.assert_queried(host3, 6);
}

BOOST_AUTO_TEST_CASE(test_dc_aware)
{
test_utils::CassClusterPtr cluster(cass_cluster_new());

const cql::cql_ccm_bridge_configuration_t& conf = cql::get_ccm_bridge_configuration();
boost::shared_ptr<cql::cql_ccm_bridge_t> ccm = cql::cql_ccm_bridge_t::create(conf, "test", 2, 1);

cass_cluster_set_load_balance_dc_aware(cluster.get(), "dc1");

test_utils::initialize_contact_points(cluster.get(), conf.ip_prefix(), 1, 0);

test_utils::CassSessionPtr session(test_utils::create_session(cluster.get()));

PolicyTool policy_tool;
policy_tool.create_schema(session.get(), 2, 1);

policy_tool.init(session.get(), 12, CASS_CONSISTENCY_EACH_QUORUM);
policy_tool.query(session.get(), 12, CASS_CONSISTENCY_ONE);

std::string host1(conf.ip_prefix() + "1");
std::string host2(conf.ip_prefix() + "2");
std::string host3(conf.ip_prefix() + "3");

policy_tool.assert_queried(host1, 6);
policy_tool.assert_queried(host2, 6);

policy_tool.reset_coordinators();
ccm->stop(1);
ccm->stop(2);

policy_tool.query(session.get(), 12, CASS_CONSISTENCY_ONE);

policy_tool.assert_queried(host3, 12);
}

BOOST_AUTO_TEST_SUITE_END()
27 changes: 17 additions & 10 deletions test/integration_tests/src/policy_tools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@
#include <boost/thread.hpp>
#include <boost/format.hpp>

#include "testing.hpp"
#include "policy_tools.hpp"
#include "future.hpp"
#include "types.hpp"
#include "request_handler.hpp"

void PolicyTool::show_coordinators() // show what queries went to what nodes IP.
{
for(std::map<cass::Address, int>::const_iterator p = coordinators.begin(); p != coordinators.end(); ++p) {
std::cout << p->first.to_string() << " : " << p->second << std::endl;
for(std::map<std::string, int>::const_iterator p = coordinators.begin(); p != coordinators.end(); ++p) {
std::cout << p->first << " : " << p->second << std::endl;
}
}

Expand All @@ -45,6 +43,15 @@ void PolicyTool::create_schema(CassSession* session, int replicationFactor)
str(boost::format("CREATE TABLE %s (k int PRIMARY KEY, i int)") % test_utils::SIMPLE_TABLE));
}

void PolicyTool::create_schema(CassSession* session, int dc1, int dc2)
{
test_utils::execute_query(session,
str(boost::format(test_utils::CREATE_KEYSPACE_NETWORK_FORMAT) % test_utils::SIMPLE_KEYSPACE % dc1 % dc2));
test_utils::execute_query(session, str(boost::format("USE %s") % test_utils::SIMPLE_KEYSPACE));
test_utils::execute_query(session,
str(boost::format("CREATE TABLE %s (k int PRIMARY KEY, i int)") % test_utils::SIMPLE_TABLE));
}

void PolicyTool::init(CassSession* session, int n, CassConsistency cl, bool batch)
{
std::string query = str(boost::format("INSERT INTO %s (k, i) VALUES (0, 0)") % test_utils::SIMPLE_TABLE);
Expand Down Expand Up @@ -91,7 +98,7 @@ CassError PolicyTool::init_return_error(CassSession* session, int n, CassConsist
return CASS_OK;
}

void PolicyTool::add_coordinator(cass::Address address)
void PolicyTool::add_coordinator(std::string address)
{
if(coordinators.count(address) == 0) {
coordinators.insert(std::make_pair(address, 1));
Expand All @@ -100,7 +107,7 @@ void PolicyTool::add_coordinator(cass::Address address)
}
}

void PolicyTool::assert_queried(cass::Address address, int n)
void PolicyTool::assert_queried(std::string address, int n)
{
if(coordinators.count(address) != 0) {
int c = coordinators[address];
Expand All @@ -110,7 +117,7 @@ void PolicyTool::assert_queried(cass::Address address, int n)
}
}

void PolicyTool::assertQueriedAtLeast(cass::Address address, int n)
void PolicyTool::assertQueriedAtLeast(std::string address, int n)
{
int queried = coordinators[address];
BOOST_REQUIRE(queried >= n);
Expand All @@ -126,7 +133,7 @@ void PolicyTool::query(CassSession* session, int n, CassConsistency cl)
cass_statement_set_consistency(statement.get(), cl);
test_utils::CassFuturePtr future(cass_session_execute(session, statement.get()));
test_utils::wait_and_check_error(future.get());
add_coordinator(static_cast<cass::ResponseFuture*>(future->from())->get_host_address());
add_coordinator(cass::get_host_from_future(future.get()));
}
}

Expand All @@ -143,7 +150,7 @@ CassError PolicyTool::query_return_error(CassSession* session, int n, CassConsis
if (rc != CASS_OK) {
return rc;
}
add_coordinator(static_cast<cass::ResponseFuture*>(future->from())->get_host_address());
add_coordinator(cass::get_host_from_future(future.get()));
}
return CASS_OK;
}
12 changes: 7 additions & 5 deletions test/integration_tests/src/policy_tools.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

#pragma once

#include "address.hpp"
#include "test_utils.hpp"

struct PolicyTool {

std::map<cass::Address, int> coordinators;
std::map<std::string, int> coordinators;

void show_coordinators(); // show what queries went to what node IP.
void reset_coordinators();
Expand All @@ -39,11 +38,14 @@ struct PolicyTool {
void create_schema(CassSession* session,
int replicationFactor);

void add_coordinator(cass::Address address);
void create_schema(CassSession* session,
int dc1, int dc2);

void add_coordinator(std::string address);

void assert_queried(cass::Address address, int n);
void assert_queried(std::string address, int n);

void assertQueriedAtLeast(cass::Address address, int n);
void assertQueriedAtLeast(std::string address, int n);

void query(CassSession* session, int n, CassConsistency cl);

Expand Down
1 change: 1 addition & 0 deletions test/integration_tests/src/test_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ const char* get_value_type(CassValueType type) {

//-----------------------------------------------------------------------------------
const std::string CREATE_KEYSPACE_SIMPLE_FORMAT = "CREATE KEYSPACE %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : %s }";
const std::string CREATE_KEYSPACE_NETWORK_FORMAT = "CREATE KEYSPACE %s WITH replication = { 'class' : 'NetworkTopologyStrategy', 'dc1' : %d, 'dc2' : %d }";
const std::string CREATE_KEYSPACE_GENERIC_FORMAT = "CREATE KEYSPACE {0} WITH replication = { 'class' : '{1}', {2} }";
const std::string SIMPLE_KEYSPACE = "ks";
const std::string SIMPLE_TABLE = "test";
Expand Down
1 change: 1 addition & 0 deletions test/integration_tests/src/test_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ extern const char* CREATE_TABLE_TIME_SERIES;
extern const char* CREATE_TABLE_SIMPLE;

extern const std::string CREATE_KEYSPACE_SIMPLE_FORMAT;
extern const std::string CREATE_KEYSPACE_NETWORK_FORMAT;
extern const std::string CREATE_KEYSPACE_GENERIC_FORMAT;
extern const std::string SIMPLE_KEYSPACE;
extern const std::string SIMPLE_TABLE;
Expand Down
2 changes: 1 addition & 1 deletion test/unit_tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 2.6.4)

find_package(Boost 1.41.0 COMPONENTS system chrono thread unit_test_framework date_time program_options REQUIRED)
find_package(Boost 1.55.0 COMPONENTS system chrono thread unit_test_framework date_time program_options REQUIRED)

set(Boost_USE_STATIC_LIBS OFF)
set(Boost_USE_STATIC_RUNTIME OFF)
Expand Down

0 comments on commit 8873d41

Please sign in to comment.