Skip to content

Commit

Permalink
Schema agreement tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aholmberg committed Sep 6, 2014
1 parent 8873d41 commit c2d5188
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 6 deletions.
168 changes: 168 additions & 0 deletions test/integration_tests/src/schema_agreement.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
Copyright (c) 2014 DataStax
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#define BOOST_TEST_DYN_LINK
#ifdef STAND_ALONE
# define BOOST_TEST_MODULE cassandra
#endif

#include "cassandra.h"
#include "test_utils.hpp"
#include "cql_ccm_bridge.hpp"

#include <boost/format.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/test/unit_test.hpp>
#include <boost/thread.hpp>

#include <algorithm>

test_utils::MultipleNodesTest* inst;
struct ClusterInit {
ClusterInit() {
inst = new test_utils::MultipleNodesTest(3, 0);
cass_cluster_set_log_level(inst->cluster, CASS_LOG_DEBUG);
}

~ClusterInit() {
delete inst;
}
};

BOOST_GLOBAL_FIXTURE(ClusterInit)

struct SessionInit {
SessionInit()
: session(NULL),
log_counter("", true) {
cass_cluster_set_log_callback(inst->cluster, test_utils::count_message_log_callback, &log_counter);
new_session();
}

~SessionInit() {
close_session();
}

void new_session() {
close_session();
test_utils::CassFuturePtr connect_future(cass_cluster_connect(inst->cluster));
test_utils::wait_and_check_error(connect_future.get());
session = cass_future_get_session(connect_future.get());
}

void close_session() {
if (session != NULL) {
test_utils::CassFuturePtr close_future(cass_session_close(session));
cass_future_wait(close_future.get());
session = NULL;
}
}

CassSession* session;
test_utils::LogData log_counter;
};

BOOST_FIXTURE_TEST_SUITE(schema_agreement, SessionInit)

// only doing a keyspace for now since there is no difference for types or tables
BOOST_AUTO_TEST_CASE(keyspace_add_drop)
{
log_counter.reset("Found schema agreement in");

// "USE" in fast succession would normally fail on the next node if the previous query did not wait
const std::string use_simple = str(boost::format("USE %s") % test_utils::SIMPLE_KEYSPACE);
test_utils::execute_query(session, str(boost::format(test_utils::CREATE_KEYSPACE_SIMPLE_FORMAT)
% test_utils::SIMPLE_KEYSPACE % 2));
test_utils::execute_query(session, use_simple);
test_utils::execute_query(session, "USE system");
test_utils::execute_query(session, str(boost::format("DROP KEYSPACE %s") % test_utils::SIMPLE_KEYSPACE));
test_utils::CassResultPtr result;
CassError rc = test_utils::execute_query_with_error(session, use_simple, &result);
BOOST_CHECK_EQUAL(rc, CASS_ERROR_SERVER_INVALID_QUERY);

// close session to flush logger
close_session();

BOOST_CHECK_EQUAL(log_counter.message_count, 2);
}

BOOST_AUTO_TEST_CASE(agreement_node_down) {
log_counter.reset("ControlConnection: Node 127.0.0.3 is down");

inst->ccm->stop(3);

size_t t = 0;
size_t max_tries = 15;
for (; t < max_tries; ++t) {
boost::this_thread::sleep_for(boost::chrono::seconds(1));
if (log_counter.message_count > 0) break;
}
BOOST_REQUIRE_MESSAGE(t < max_tries, "Timed out waiting for node down log message");

log_counter.reset("Found schema agreement in");
const std::string use_simple = str(boost::format("USE %s") % test_utils::SIMPLE_KEYSPACE);
test_utils::execute_query(session, str(boost::format(test_utils::CREATE_KEYSPACE_SIMPLE_FORMAT)
% test_utils::SIMPLE_KEYSPACE % 2));
test_utils::execute_query(session, use_simple);
test_utils::execute_query(session, "USE system");
test_utils::execute_query(session, str(boost::format("DROP KEYSPACE %s") % test_utils::SIMPLE_KEYSPACE));
test_utils::CassResultPtr result;
CassError rc = test_utils::execute_query_with_error(session, use_simple, &result);
BOOST_CHECK_EQUAL(rc, CASS_ERROR_SERVER_INVALID_QUERY);

// close session to flush logger
close_session();

BOOST_CHECK_EQUAL(log_counter.message_count, 2);

inst->ccm->start(3);
}

#define MAX_SCHEMA_AGREEMENT_WAIT_MS 10000
BOOST_AUTO_TEST_CASE(no_agreement_timeout) {

test_utils::CassFuturePtr prepared_future(
cass_session_prepare(session, cass_string_init("UPDATE system.peers SET schema_version=? WHERE peer='127.0.0.1'")));
test_utils::wait_and_check_error(prepared_future.get());
test_utils::CassPreparedPtr prep = cass_future_get_prepared(prepared_future.get());
test_utils::CassStatementPtr schema_stmt(cass_prepared_bind(prep.get()));

log_counter.reset("SchemaChangeHandler: No schema aggreement on live nodes after ");
test_utils::CassStatementPtr create_stmt =
cass_statement_new(
cass_string_init(str(boost::format(test_utils::CREATE_KEYSPACE_SIMPLE_FORMAT)
% test_utils::SIMPLE_KEYSPACE % 2).c_str()), 0);
test_utils::CassFuturePtr create_future(cass_session_execute(session, create_stmt.get()));

boost::chrono::steady_clock::time_point end =
boost::chrono::steady_clock::now() + boost::chrono::milliseconds(MAX_SCHEMA_AGREEMENT_WAIT_MS + 1000);
// mess with system.peers for more than the wait time
// this is hitting more than the required node, but should cycle around to the one being queried enough
do {
cass_statement_bind_uuid(schema_stmt.get(), 0, test_utils::generate_random_uuid());

test_utils::CassFuturePtr future(cass_session_execute(session, schema_stmt.get()));
cass_future_wait(future.get());
BOOST_REQUIRE_EQUAL(cass_future_error_code(future.get()), CASS_OK);
} while(boost::chrono::steady_clock::now() < end && log_counter.message_count == 0);

cass_future_wait(create_future.get());
BOOST_CHECK_EQUAL(cass_future_error_code(create_future.get()), CASS_OK);
close_session();
BOOST_CHECK_EQUAL(log_counter.message_count, 1);
}

BOOST_AUTO_TEST_SUITE_END()
3 changes: 3 additions & 0 deletions test/integration_tests/src/test_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ void count_message_log_callback(cass_uint64_t time,
if (log_data->output_messages) {
fprintf(stderr, "Log: %s\n", str.c_str());
}
boost::lock_guard<LogData> l(*log_data);
if (log_data->message.empty()) return;
if (str.find(log_data->message) != std::string::npos) {
log_data->message_count++;
}
Expand Down Expand Up @@ -143,6 +145,7 @@ MultipleNodesTest::MultipleNodesTest(int num_nodes_dc1, int num_nodes_dc2, int p
cass_cluster_set_request_timeout(cluster, 10000);
cass_cluster_set_num_threads_io(cluster, 2);
cass_cluster_set_protocol_version(cluster, protocol_version);
cass_cluster_set_log_level(cluster, CASS_LOG_DEBUG);
}

MultipleNodesTest::~MultipleNodesTest() {
Expand Down
22 changes: 16 additions & 6 deletions test/integration_tests/src/test_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
#include <string>
#include <limits>

#include <boost/asio/ip/address.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/asio/ip/address.hpp>
#include <boost/chrono.hpp>
#include <boost/atomic.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/lockable_adapter.hpp>
#include <boost/thread/lock_guard.hpp>

#include "cassandra.h"

Expand All @@ -49,15 +51,23 @@ namespace test_utils {
extern const cass_duration_t ONE_MILLISECOND_IN_MICROS;
extern const cass_duration_t ONE_SECOND_IN_MICROS;

struct LogData {
struct LogData : public boost::basic_lockable_adapter<boost::mutex> {
LogData(const std::string& message, bool output_messages = false)
: message(message)
, message_count(0)
, output_messages(output_messages) {}

const std::string message;
boost::atomic<int> message_count;
void reset(const std::string& msg) {
boost::lock_guard<LogData> l(*this);
message = msg;
message_count = 0;
}

boost::mutex m;
std::string message;
size_t message_count;
bool output_messages;

};

void count_message_log_callback(cass_uint64_t time,
Expand Down

0 comments on commit c2d5188

Please sign in to comment.