From fa95177b1f6fc9b4f21716686710e97134422039 Mon Sep 17 00:00:00 2001 From: mpenick Date: Thu, 4 Sep 2014 08:52:51 -0700 Subject: [PATCH] Added additional integration tests for CPP-145 - Added handling for error/invalid responses in SchemaChangeHandler - Created some utility methods for handling error/invalid responses --- src/connection.cpp | 6 +- src/control_connection.cpp | 35 +++----- src/error_response.cpp | 31 +++++++ src/error_response.hpp | 8 ++ src/schema_change_handler.cpp | 29 ++++--- .../src/control_connection.cpp | 83 +++++++++++++++++-- test/integration_tests/src/test_utils.cpp | 10 ++- test/integration_tests/src/test_utils.hpp | 8 +- 8 files changed, 162 insertions(+), 48 deletions(-) diff --git a/src/connection.cpp b/src/connection.cpp index 0bd5c5b34..a5748d228 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -57,11 +57,7 @@ void Connection::StartupHandler::on_set(ResponseMessage* response) { } else if (error->code() == CQL_ERROR_BAD_CREDENTIALS) { connection_->auth_error_ = error->message(); } - std::ostringstream ss; - ss << "Error response: '" << error->message() - << "' (0x" << std::hex << std::uppercase << std::setw(8) << std::setfill('0') - << CASS_ERROR(CASS_ERROR_SOURCE_SERVER, error->code()) << ")"; - connection_->notify_error(ss.str()); + connection_->notify_error(error_response_message("Error response", error)); break; } diff --git a/src/control_connection.cpp b/src/control_connection.cpp index ea3b731cf..7325dde7b 100644 --- a/src/control_connection.cpp +++ b/src/control_connection.cpp @@ -222,11 +222,11 @@ void ControlConnection::on_node_refresh(const MultipleRequestHandler::ResponseVe local_result->decode_first_row(); update_node_info(host, &local_result->first_row()); } else { - logger_->debug("No row found in %s's local system table", + logger_->debug("ControlConnection: No row found in %s's local system table", connection_->address_string().c_str()); } } else { - logger_->debug("Host %s from local system table not found", + logger_->debug("ControlConnection: Host %s from local system table not found", connection_->address_string().c_str()); } } @@ -400,30 +400,17 @@ void ControlConnection::update_node_info(SharedRefPtr host, const Row* row } bool ControlConnection::handle_query_invalid_response(Response* response) { - if (response->opcode() == CQL_OPCODE_RESULT) { - return false; - } - - std::ostringstream ss; - if (response->opcode() == CQL_OPCODE_ERROR) { - ErrorResponse* error - = static_cast(response); - ss << "Error while querying system tables: '" << error->message() - << "' (0x" << std::hex << std::uppercase << std::setw(8) << std::setfill('0') - << CASS_ERROR(CASS_ERROR_SOURCE_SERVER, error->code()) << ")"; - logger_->error(ss.str().c_str()); - } else { - ss << "Unexpected opcode while querying system tables: " << opcode_to_string(response->opcode()); - logger_->error(ss.str().c_str()); - } - - if (connection_ != NULL) { - connection_->defunct(); + if (check_error_or_invalid_response("ControlConnection", CQL_OPCODE_RESULT, + response, logger_)) { + if (connection_ != NULL) { + connection_->defunct(); + } + return true; } - - return true; + return false; } + void ControlConnection::handle_query_failure(CassError code, const std::string& message) { // TODO(mpenick): This is a placeholder and might not be the right action for // all error scenarios @@ -461,7 +448,7 @@ void ControlConnection::on_connection_event(EventResponse* response) { if (host) { session_->on_remove(host); } else { - logger_->debug("Session: Tried to remove host %s that doesn't exist", address_str.c_str()); + logger_->debug("ControlConnection: Tried to remove host %s that doesn't exist", address_str.c_str()); } break; } diff --git a/src/error_response.cpp b/src/error_response.cpp index f8a4e837a..d775d6ca3 100644 --- a/src/error_response.cpp +++ b/src/error_response.cpp @@ -16,8 +16,12 @@ #include "error_response.hpp" +#include "logger.hpp" #include "serialization.hpp" +#include +#include + namespace cass { bool ErrorResponse::decode(int version, char* buffer, size_t size) { @@ -31,4 +35,31 @@ bool ErrorResponse::decode(int version, char* buffer, size_t size) { return true; } +std::string error_response_message(const std::string& prefix, ErrorResponse* error) { + std::ostringstream ss; + ss << prefix << ": '" << error->message() + << "' (0x" << std::hex << std::uppercase << std::setw(8) << std::setfill('0') + << CASS_ERROR(CASS_ERROR_SOURCE_SERVER, error->code()) << ")"; + return ss.str(); +} + +bool check_error_or_invalid_response(const std::string& prefix, uint8_t expected_opcode, + Response* response, Logger* logger) { + if (response->opcode() == expected_opcode) { + return false; + } + + if (response->opcode() == CQL_OPCODE_ERROR) { + std::ostringstream ss; + ss << prefix << ": Error response"; + logger->error(error_response_message(ss.str(), static_cast(response)).c_str()); + } else { + std::ostringstream ss; + ss << prefix << ": Unexpected opcode " << opcode_to_string(response->opcode()); + logger->error(ss.str().c_str()); + } + + return true; +} + } // namespace cass diff --git a/src/error_response.hpp b/src/error_response.hpp index d0aabf37c..1de077297 100644 --- a/src/error_response.hpp +++ b/src/error_response.hpp @@ -21,11 +21,15 @@ #include "constants.hpp" #include "scoped_ptr.hpp" +#include "third_party/boost/boost/cstdint.hpp" + #include #include namespace cass { +class Logger; + class ErrorResponse : public Response { public: ErrorResponse() @@ -64,6 +68,10 @@ class ErrorResponse : public Response { size_t prepared_id_size_; }; +std::string error_response_message(const std::string& prefix, ErrorResponse* error); +bool check_error_or_invalid_response(const std::string& prefix, uint8_t expected_opcode, + Response* response, Logger* logger); + } // namespace cass #endif diff --git a/src/schema_change_handler.cpp b/src/schema_change_handler.cpp index 5ecb3b002..58ba7e65a 100644 --- a/src/schema_change_handler.cpp +++ b/src/schema_change_handler.cpp @@ -19,6 +19,7 @@ #include "address.hpp" #include "connection.hpp" #include "control_connection.hpp" +#include "error_response.hpp" #include "get_time.hpp" #include "logger.hpp" #include "result_iterator.hpp" @@ -32,7 +33,6 @@ #define MAX_SCHEMA_AGREEMENT_WAIT_MS 10000 #define RETRY_SCHEMA_AGREEMENT_WAIT_MS 200 - namespace cass { SchemaChangeHandler::SchemaChangeHandler(Connection* connection, @@ -67,7 +67,7 @@ bool SchemaChangeHandler::has_schema_agreement(const ResponseVec& responses) { current_version = boost::string_ref(v->buffer().data(), v->buffer().size()); } } else { - logger_->debug("No row found in %s's local system table", + logger_->debug("SchemaChangeHandler: No row found in %s's local system table", connection()->address_string().c_str()); } @@ -104,19 +104,29 @@ bool SchemaChangeHandler::has_schema_agreement(const ResponseVec& responses) { void SchemaChangeHandler::on_set(const ResponseVec& responses) { elaspsed_ += get_time_since_epoch() - start_; + bool has_error = false; + for (MultipleRequestHandler::ResponseVec::const_iterator it = responses.begin(), + end = responses.end(); it != end; ++it) { + if (check_error_or_invalid_response("SchemaChangeHandler", CQL_OPCODE_RESULT, + *it, logger_)) { + has_error = true; + } + } + if (has_error) return; + if (has_schema_agreement(responses)) { - logger_->debug("Found schema agreement in %llu ms", elaspsed_); + logger_->debug("SchemaChangeHandler: Found schema agreement in %llu ms", elaspsed_); request_handler_->set_response(request_response_); return; } else if (elaspsed_ >= MAX_SCHEMA_AGREEMENT_WAIT_MS) { - logger_->warn("No schema aggreement on live nodes after %llu ms. " + logger_->warn("SchemaChangeHandler: No schema aggreement on live nodes after %llu ms. " "Schema may not be up-to-date on some nodes.", elaspsed_); request_handler_->set_response(request_response_); return; } - logger_->debug("Schema still not up-to-date on some live nodes. " + logger_->debug("SchemaChangeHandler: Schema still not up-to-date on some live nodes. " "Trying again in %d ms", RETRY_SCHEMA_AGREEMENT_WAIT_MS); // Try again @@ -131,20 +141,19 @@ void SchemaChangeHandler::on_set(const ResponseVec& responses) { void SchemaChangeHandler::on_error(CassError code, const std::string& message) { std::ostringstream ss; - ss << "An error occured waiting for schema agreement: '" << message - << "' (0x" << std::hex << std::uppercase << std::setw(8) << std::setfill('0') - << CASS_ERROR(CASS_ERROR_SOURCE_SERVER, code) << ")"; + ss << "SchemaChangeHandler: An error occured waiting for schema agreement: '" << message + << "' (0x" << std::hex << std::uppercase << std::setw(8) << std::setfill('0') << code << ")"; logger_->error(ss.str().c_str()); request_handler_->set_response(request_response_); } void SchemaChangeHandler::on_timeout() { - logger_->error("A timeout occured waiting for schema agreement"); + logger_->error("SchemaChangeHandler: A timeout occured waiting for schema agreement"); request_handler_->set_response(request_response_); } void SchemaChangeHandler::on_closing() { - logger_->warn("Connection closed while waiting for schema agreement"); + logger_->warn("SchemaChangeHandler: Connection closed while waiting for schema agreement"); request_handler_->set_response(request_response_); } diff --git a/test/integration_tests/src/control_connection.cpp b/test/integration_tests/src/control_connection.cpp index 9c3b52c43..a1988b51c 100644 --- a/test/integration_tests/src/control_connection.cpp +++ b/test/integration_tests/src/control_connection.cpp @@ -27,6 +27,7 @@ #include "test_utils.hpp" #include +#include struct ControlConnectionTests { ControlConnectionTests() {} @@ -79,9 +80,7 @@ BOOST_AUTO_TEST_CASE(test_node_discovery) // Only add a single IP test_utils::initialize_contact_points(cluster.get(), conf.ip_prefix(), 1, 0); - test_utils::CassFuturePtr session_future(cass_cluster_connect(cluster.get())); - test_utils::wait_and_check_error(session_future.get()); - test_utils::CassSessionPtr session(cass_future_get_session(session_future.get())); + test_utils::CassSessionPtr session(test_utils::create_session(cluster)); cass::AddressSet addresses; for (int i = 0; i < 3; ++i) { @@ -116,9 +115,7 @@ BOOST_AUTO_TEST_CASE(test_node_discovery_invalid_ips) // Only add a single valid IP test_utils::initialize_contact_points(cluster.get(), conf.ip_prefix(), 1, 0); - test_utils::CassFuturePtr session_future(cass_cluster_connect(cluster.get())); - test_utils::wait_and_check_error(session_future.get()); - test_utils::CassSessionPtr session(cass_future_get_session(session_future.get())); + test_utils::CassSessionPtr session(test_utils::create_session(cluster)); cass::AddressSet addresses; for (int i = 0; i < 4; ++i) { @@ -134,4 +131,78 @@ BOOST_AUTO_TEST_CASE(test_node_discovery_invalid_ips) BOOST_CHECK_EQUAL(log_data->message_count, 3); } +BOOST_AUTO_TEST_CASE(test_node_discovery_no_local_rows) +{ + test_utils::CassClusterPtr cluster(cass_cluster_new()); + + const cql::cql_ccm_bridge_configuration_t& conf = cql::get_ccm_bridge_configuration(); + boost::shared_ptr ccm = cql::cql_ccm_bridge_t::create(conf, "test", 3, 0); + + // Ensure RR policy + cass_cluster_set_load_balance_round_robin(cluster.get());; + + // Only add a single valid IP + test_utils::initialize_contact_points(cluster.get(), conf.ip_prefix(), 1, 0); + + { + test_utils::CassSessionPtr session(test_utils::create_session(cluster)); + test_utils::execute_query(session.get(), "DELETE FROM system.local WHERE key = 'local'"); + } + + test_utils::CassSessionPtr session(test_utils::create_session(cluster)); + + cass::AddressSet addresses; + for (int i = 0; i < 3; ++i) { + CassString query = cass_string_init("SELECT * FROM system.schema_keyspaces"); + test_utils::CassStatementPtr statement(cass_statement_new(query, 0)); + test_utils::CassFuturePtr future(cass_session_execute(session.get(), statement.get())); + addresses.insert(static_cast(future->from())->get_host_address()); + } + + BOOST_CHECK(addresses.size() == 3); +} + +BOOST_AUTO_TEST_CASE(test_node_discovery_no_rpc_addresss) +{ + boost::scoped_ptr log_data(new test_utils::LogData("No rpc_address for host 127.0.0.2 in system.peers on 127.0.0.1. Ignoring this entry.")); + + { + test_utils::CassClusterPtr cluster(cass_cluster_new()); + + const cql::cql_ccm_bridge_configuration_t& conf = cql::get_ccm_bridge_configuration(); + boost::shared_ptr ccm = cql::cql_ccm_bridge_t::create(conf, "test", 3, 0); + + cass_cluster_set_log_callback(cluster.get(), test_utils::count_message_log_callback, log_data.get()); + + // Ensure RR policy + cass_cluster_set_load_balance_round_robin(cluster.get());; + + // Only add a single valid IP + test_utils::initialize_contact_points(cluster.get(), conf.ip_prefix(), 1, 0); + + { + test_utils::CassSessionPtr session(test_utils::create_session(cluster)); + std::ostringstream ss; + ss << "UPDATE system.peers SET rpc_address = null WHERE peer = '" << conf.ip_prefix() << "2'"; + test_utils::execute_query(session.get(), ss.str()); + } + + test_utils::CassSessionPtr session(test_utils::create_session(cluster)); + + cass::AddressSet addresses; + for (int i = 0; i < 3; ++i) { + CassString query = cass_string_init("SELECT * FROM system.schema_keyspaces"); + test_utils::CassStatementPtr statement(cass_statement_new(query, 0)); + test_utils::CassFuturePtr future(cass_session_execute(session.get(), statement.get())); + addresses.insert(static_cast(future->from())->get_host_address()); + } + + // This should only contain 2 address because one pee is ignored + BOOST_CHECK(addresses.size() == 2); + } + + BOOST_CHECK(log_data->message_count > 0); +} + + BOOST_AUTO_TEST_SUITE_END() diff --git a/test/integration_tests/src/test_utils.cpp b/test/integration_tests/src/test_utils.cpp index 6fe40dc10..36c559f38 100644 --- a/test/integration_tests/src/test_utils.cpp +++ b/test/integration_tests/src/test_utils.cpp @@ -75,7 +75,9 @@ void count_message_log_callback(cass_uint64_t time, void* data) { LogData* log_data = reinterpret_cast(data); std::string str(message.data, message.length); - fprintf(stderr, "Log message: %s\n", str.c_str()); + if (log_data->output_messages) { + fprintf(stderr, "Log: %s\n", str.c_str()); + } if (str.find(log_data->message) != std::string::npos) { log_data->message_count++; } @@ -165,6 +167,12 @@ void initialize_contact_points(CassCluster* cluster, std::string prefix, int num } } +CassSessionPtr create_session(CassClusterPtr cluster) { + test_utils::CassFuturePtr session_future(cass_cluster_connect(cluster.get())); + test_utils::wait_and_check_error(session_future.get()); + return test_utils::CassSessionPtr(cass_future_get_session(session_future.get())); +} + void execute_query(CassSession* session, const std::string& query, CassResultPtr* result, diff --git a/test/integration_tests/src/test_utils.hpp b/test/integration_tests/src/test_utils.hpp index 7ab16b992..63a804597 100644 --- a/test/integration_tests/src/test_utils.hpp +++ b/test/integration_tests/src/test_utils.hpp @@ -50,12 +50,14 @@ extern const cass_duration_t ONE_MILLISECOND_IN_MICROS; extern const cass_duration_t ONE_SECOND_IN_MICROS; struct LogData { - LogData(const std::string& message) + LogData(const std::string& message, bool output_messages = false) : message(message) - , message_count(0) {} + , message_count(0) + , output_messages(output_messages) {} const std::string message; boost::atomic message_count; + bool output_messages; }; void count_message_log_callback(cass_uint64_t time, @@ -488,6 +490,8 @@ void initialize_contact_points(CassCluster* cluster, std::string prefix, int num const char* get_value_type(CassValueType type); +CassSessionPtr create_session(CassClusterPtr cluster); + CassError execute_query_with_error(CassSession* session, const std::string& query, CassResultPtr* result = NULL,