diff --git a/test/integration_tests/src/schema_agreement.cpp b/test/integration_tests/src/schema_agreement.cpp new file mode 100644 index 000000000..ca0384649 --- /dev/null +++ b/test/integration_tests/src/schema_agreement.cpp @@ -0,0 +1,168 @@ +/* + Copyright (c) 2014 DataStax + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#define BOOST_TEST_DYN_LINK +#ifdef STAND_ALONE +# define BOOST_TEST_MODULE cassandra +#endif + +#include "cassandra.h" +#include "test_utils.hpp" +#include "cql_ccm_bridge.hpp" + +#include +#include +#include +#include + +#include + +test_utils::MultipleNodesTest* inst; +struct ClusterInit { + ClusterInit() { + inst = new test_utils::MultipleNodesTest(3, 0); + cass_cluster_set_log_level(inst->cluster, CASS_LOG_DEBUG); + } + + ~ClusterInit() { + delete inst; + } +}; + +BOOST_GLOBAL_FIXTURE(ClusterInit) + +struct SessionInit { + SessionInit() + : session(NULL), + log_counter("", true) { + cass_cluster_set_log_callback(inst->cluster, test_utils::count_message_log_callback, &log_counter); + new_session(); + } + + ~SessionInit() { + close_session(); + } + + void new_session() { + close_session(); + test_utils::CassFuturePtr connect_future(cass_cluster_connect(inst->cluster)); + test_utils::wait_and_check_error(connect_future.get()); + session = cass_future_get_session(connect_future.get()); + } + + void close_session() { + if (session != NULL) { + test_utils::CassFuturePtr close_future(cass_session_close(session)); + cass_future_wait(close_future.get()); + session = NULL; + } + } + + CassSession* session; + test_utils::LogData log_counter; +}; + +BOOST_FIXTURE_TEST_SUITE(schema_agreement, SessionInit) + +// only doing a keyspace for now since there is no difference for types or tables +BOOST_AUTO_TEST_CASE(keyspace_add_drop) +{ + log_counter.reset("Found schema agreement in"); + + // "USE" in fast succession would normally fail on the next node if the previous query did not wait + const std::string use_simple = str(boost::format("USE %s") % test_utils::SIMPLE_KEYSPACE); + test_utils::execute_query(session, str(boost::format(test_utils::CREATE_KEYSPACE_SIMPLE_FORMAT) + % test_utils::SIMPLE_KEYSPACE % 2)); + test_utils::execute_query(session, use_simple); + test_utils::execute_query(session, "USE system"); + test_utils::execute_query(session, str(boost::format("DROP KEYSPACE %s") % test_utils::SIMPLE_KEYSPACE)); + test_utils::CassResultPtr result; + CassError rc = test_utils::execute_query_with_error(session, use_simple, &result); + BOOST_CHECK_EQUAL(rc, CASS_ERROR_SERVER_INVALID_QUERY); + + // close session to flush logger + close_session(); + + BOOST_CHECK_EQUAL(log_counter.message_count, 2); +} + +BOOST_AUTO_TEST_CASE(agreement_node_down) { + log_counter.reset("ControlConnection: Node 127.0.0.3 is down"); + + inst->ccm->stop(3); + + size_t t = 0; + size_t max_tries = 15; + for (; t < max_tries; ++t) { + boost::this_thread::sleep_for(boost::chrono::seconds(1)); + if (log_counter.message_count > 0) break; + } + BOOST_REQUIRE_MESSAGE(t < max_tries, "Timed out waiting for node down log message"); + + log_counter.reset("Found schema agreement in"); + const std::string use_simple = str(boost::format("USE %s") % test_utils::SIMPLE_KEYSPACE); + test_utils::execute_query(session, str(boost::format(test_utils::CREATE_KEYSPACE_SIMPLE_FORMAT) + % test_utils::SIMPLE_KEYSPACE % 2)); + test_utils::execute_query(session, use_simple); + test_utils::execute_query(session, "USE system"); + test_utils::execute_query(session, str(boost::format("DROP KEYSPACE %s") % test_utils::SIMPLE_KEYSPACE)); + test_utils::CassResultPtr result; + CassError rc = test_utils::execute_query_with_error(session, use_simple, &result); + BOOST_CHECK_EQUAL(rc, CASS_ERROR_SERVER_INVALID_QUERY); + + // close session to flush logger + close_session(); + + BOOST_CHECK_EQUAL(log_counter.message_count, 2); + + inst->ccm->start(3); +} + +#define MAX_SCHEMA_AGREEMENT_WAIT_MS 10000 +BOOST_AUTO_TEST_CASE(no_agreement_timeout) { + + test_utils::CassFuturePtr prepared_future( + cass_session_prepare(session, cass_string_init("UPDATE system.peers SET schema_version=? WHERE peer='127.0.0.1'"))); + test_utils::wait_and_check_error(prepared_future.get()); + test_utils::CassPreparedPtr prep = cass_future_get_prepared(prepared_future.get()); + test_utils::CassStatementPtr schema_stmt(cass_prepared_bind(prep.get())); + + log_counter.reset("SchemaChangeHandler: No schema aggreement on live nodes after "); + test_utils::CassStatementPtr create_stmt = + cass_statement_new( + cass_string_init(str(boost::format(test_utils::CREATE_KEYSPACE_SIMPLE_FORMAT) + % test_utils::SIMPLE_KEYSPACE % 2).c_str()), 0); + test_utils::CassFuturePtr create_future(cass_session_execute(session, create_stmt.get())); + + boost::chrono::steady_clock::time_point end = + boost::chrono::steady_clock::now() + boost::chrono::milliseconds(MAX_SCHEMA_AGREEMENT_WAIT_MS + 1000); + // mess with system.peers for more than the wait time + // this is hitting more than the required node, but should cycle around to the one being queried enough + do { + cass_statement_bind_uuid(schema_stmt.get(), 0, test_utils::generate_random_uuid()); + + test_utils::CassFuturePtr future(cass_session_execute(session, schema_stmt.get())); + cass_future_wait(future.get()); + BOOST_REQUIRE_EQUAL(cass_future_error_code(future.get()), CASS_OK); + } while(boost::chrono::steady_clock::now() < end && log_counter.message_count == 0); + + cass_future_wait(create_future.get()); + BOOST_CHECK_EQUAL(cass_future_error_code(create_future.get()), CASS_OK); + close_session(); + BOOST_CHECK_EQUAL(log_counter.message_count, 1); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/integration_tests/src/test_utils.cpp b/test/integration_tests/src/test_utils.cpp index 1c4b02c95..61c573590 100644 --- a/test/integration_tests/src/test_utils.cpp +++ b/test/integration_tests/src/test_utils.cpp @@ -78,6 +78,8 @@ void count_message_log_callback(cass_uint64_t time, if (log_data->output_messages) { fprintf(stderr, "Log: %s\n", str.c_str()); } + boost::lock_guard l(*log_data); + if (log_data->message.empty()) return; if (str.find(log_data->message) != std::string::npos) { log_data->message_count++; } @@ -143,6 +145,7 @@ MultipleNodesTest::MultipleNodesTest(int num_nodes_dc1, int num_nodes_dc2, int p cass_cluster_set_request_timeout(cluster, 10000); cass_cluster_set_num_threads_io(cluster, 2); cass_cluster_set_protocol_version(cluster, protocol_version); + cass_cluster_set_log_level(cluster, CASS_LOG_DEBUG); } MultipleNodesTest::~MultipleNodesTest() { diff --git a/test/integration_tests/src/test_utils.hpp b/test/integration_tests/src/test_utils.hpp index b7e3ae0bd..590c41cb9 100644 --- a/test/integration_tests/src/test_utils.hpp +++ b/test/integration_tests/src/test_utils.hpp @@ -19,11 +19,13 @@ #include #include -#include -#include #include +#include #include -#include +#include +#include +#include +#include #include "cassandra.h" @@ -49,15 +51,23 @@ namespace test_utils { extern const cass_duration_t ONE_MILLISECOND_IN_MICROS; extern const cass_duration_t ONE_SECOND_IN_MICROS; -struct LogData { +struct LogData : public boost::basic_lockable_adapter { LogData(const std::string& message, bool output_messages = false) : message(message) , message_count(0) , output_messages(output_messages) {} - const std::string message; - boost::atomic message_count; + void reset(const std::string& msg) { + boost::lock_guard l(*this); + message = msg; + message_count = 0; + } + + boost::mutex m; + std::string message; + size_t message_count; bool output_messages; + }; void count_message_log_callback(cass_uint64_t time,