Skip to content

Commit

Permalink
Added additional integration tests for CPP-145
Browse files Browse the repository at this point in the history
- Added handling for error/invalid responses in SchemaChangeHandler
- Created some utility methods for handling error/invalid responses
  • Loading branch information
mpenick committed Sep 4, 2014
1 parent 60745b0 commit fa95177
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 48 deletions.
6 changes: 1 addition & 5 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,7 @@ void Connection::StartupHandler::on_set(ResponseMessage* response) {
} else if (error->code() == CQL_ERROR_BAD_CREDENTIALS) {
connection_->auth_error_ = error->message();
}
std::ostringstream ss;
ss << "Error response: '" << error->message()
<< "' (0x" << std::hex << std::uppercase << std::setw(8) << std::setfill('0')
<< CASS_ERROR(CASS_ERROR_SOURCE_SERVER, error->code()) << ")";
connection_->notify_error(ss.str());
connection_->notify_error(error_response_message("Error response", error));
break;
}

Expand Down
35 changes: 11 additions & 24 deletions src/control_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,11 @@ void ControlConnection::on_node_refresh(const MultipleRequestHandler::ResponseVe
local_result->decode_first_row();
update_node_info(host, &local_result->first_row());
} else {
logger_->debug("No row found in %s's local system table",
logger_->debug("ControlConnection: No row found in %s's local system table",
connection_->address_string().c_str());
}
} else {
logger_->debug("Host %s from local system table not found",
logger_->debug("ControlConnection: Host %s from local system table not found",
connection_->address_string().c_str());
}
}
Expand Down Expand Up @@ -400,30 +400,17 @@ void ControlConnection::update_node_info(SharedRefPtr<Host> host, const Row* row
}

bool ControlConnection::handle_query_invalid_response(Response* response) {
if (response->opcode() == CQL_OPCODE_RESULT) {
return false;
}

std::ostringstream ss;
if (response->opcode() == CQL_OPCODE_ERROR) {
ErrorResponse* error
= static_cast<ErrorResponse*>(response);
ss << "Error while querying system tables: '" << error->message()
<< "' (0x" << std::hex << std::uppercase << std::setw(8) << std::setfill('0')
<< CASS_ERROR(CASS_ERROR_SOURCE_SERVER, error->code()) << ")";
logger_->error(ss.str().c_str());
} else {
ss << "Unexpected opcode while querying system tables: " << opcode_to_string(response->opcode());
logger_->error(ss.str().c_str());
}

if (connection_ != NULL) {
connection_->defunct();
if (check_error_or_invalid_response("ControlConnection", CQL_OPCODE_RESULT,
response, logger_)) {
if (connection_ != NULL) {
connection_->defunct();
}
return true;
}

return true;
return false;
}


void ControlConnection::handle_query_failure(CassError code, const std::string& message) {
// TODO(mpenick): This is a placeholder and might not be the right action for
// all error scenarios
Expand Down Expand Up @@ -461,7 +448,7 @@ void ControlConnection::on_connection_event(EventResponse* response) {
if (host) {
session_->on_remove(host);
} else {
logger_->debug("Session: Tried to remove host %s that doesn't exist", address_str.c_str());
logger_->debug("ControlConnection: Tried to remove host %s that doesn't exist", address_str.c_str());
}
break;
}
Expand Down
31 changes: 31 additions & 0 deletions src/error_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

#include "error_response.hpp"

#include "logger.hpp"
#include "serialization.hpp"

#include <iomanip>
#include <sstream>

namespace cass {

bool ErrorResponse::decode(int version, char* buffer, size_t size) {
Expand All @@ -31,4 +35,31 @@ bool ErrorResponse::decode(int version, char* buffer, size_t size) {
return true;
}

std::string error_response_message(const std::string& prefix, ErrorResponse* error) {
std::ostringstream ss;
ss << prefix << ": '" << error->message()
<< "' (0x" << std::hex << std::uppercase << std::setw(8) << std::setfill('0')
<< CASS_ERROR(CASS_ERROR_SOURCE_SERVER, error->code()) << ")";
return ss.str();
}

bool check_error_or_invalid_response(const std::string& prefix, uint8_t expected_opcode,
Response* response, Logger* logger) {
if (response->opcode() == expected_opcode) {
return false;
}

if (response->opcode() == CQL_OPCODE_ERROR) {
std::ostringstream ss;
ss << prefix << ": Error response";
logger->error(error_response_message(ss.str(), static_cast<ErrorResponse*>(response)).c_str());
} else {
std::ostringstream ss;
ss << prefix << ": Unexpected opcode " << opcode_to_string(response->opcode());
logger->error(ss.str().c_str());
}

return true;
}

} // namespace cass
8 changes: 8 additions & 0 deletions src/error_response.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@
#include "constants.hpp"
#include "scoped_ptr.hpp"

#include "third_party/boost/boost/cstdint.hpp"

#include <string.h>
#include <string>

namespace cass {

class Logger;

class ErrorResponse : public Response {
public:
ErrorResponse()
Expand Down Expand Up @@ -64,6 +68,10 @@ class ErrorResponse : public Response {
size_t prepared_id_size_;
};

std::string error_response_message(const std::string& prefix, ErrorResponse* error);
bool check_error_or_invalid_response(const std::string& prefix, uint8_t expected_opcode,
Response* response, Logger* logger);

} // namespace cass

#endif
29 changes: 19 additions & 10 deletions src/schema_change_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "address.hpp"
#include "connection.hpp"
#include "control_connection.hpp"
#include "error_response.hpp"
#include "get_time.hpp"
#include "logger.hpp"
#include "result_iterator.hpp"
Expand All @@ -32,7 +33,6 @@
#define MAX_SCHEMA_AGREEMENT_WAIT_MS 10000
#define RETRY_SCHEMA_AGREEMENT_WAIT_MS 200


namespace cass {

SchemaChangeHandler::SchemaChangeHandler(Connection* connection,
Expand Down Expand Up @@ -67,7 +67,7 @@ bool SchemaChangeHandler::has_schema_agreement(const ResponseVec& responses) {
current_version = boost::string_ref(v->buffer().data(), v->buffer().size());
}
} else {
logger_->debug("No row found in %s's local system table",
logger_->debug("SchemaChangeHandler: No row found in %s's local system table",
connection()->address_string().c_str());
}

Expand Down Expand Up @@ -104,19 +104,29 @@ bool SchemaChangeHandler::has_schema_agreement(const ResponseVec& responses) {
void SchemaChangeHandler::on_set(const ResponseVec& responses) {
elaspsed_ += get_time_since_epoch() - start_;

bool has_error = false;
for (MultipleRequestHandler::ResponseVec::const_iterator it = responses.begin(),
end = responses.end(); it != end; ++it) {
if (check_error_or_invalid_response("SchemaChangeHandler", CQL_OPCODE_RESULT,
*it, logger_)) {
has_error = true;
}
}
if (has_error) return;

if (has_schema_agreement(responses)) {
logger_->debug("Found schema agreement in %llu ms", elaspsed_);
logger_->debug("SchemaChangeHandler: Found schema agreement in %llu ms", elaspsed_);
request_handler_->set_response(request_response_);
return;
} else if (elaspsed_ >= MAX_SCHEMA_AGREEMENT_WAIT_MS) {
logger_->warn("No schema aggreement on live nodes after %llu ms. "
logger_->warn("SchemaChangeHandler: No schema aggreement on live nodes after %llu ms. "
"Schema may not be up-to-date on some nodes.",
elaspsed_);
request_handler_->set_response(request_response_);
return;
}

logger_->debug("Schema still not up-to-date on some live nodes. "
logger_->debug("SchemaChangeHandler: Schema still not up-to-date on some live nodes. "
"Trying again in %d ms", RETRY_SCHEMA_AGREEMENT_WAIT_MS);

// Try again
Expand All @@ -131,20 +141,19 @@ void SchemaChangeHandler::on_set(const ResponseVec& responses) {

void SchemaChangeHandler::on_error(CassError code, const std::string& message) {
std::ostringstream ss;
ss << "An error occured waiting for schema agreement: '" << message
<< "' (0x" << std::hex << std::uppercase << std::setw(8) << std::setfill('0')
<< CASS_ERROR(CASS_ERROR_SOURCE_SERVER, code) << ")";
ss << "SchemaChangeHandler: An error occured waiting for schema agreement: '" << message
<< "' (0x" << std::hex << std::uppercase << std::setw(8) << std::setfill('0') << code << ")";
logger_->error(ss.str().c_str());
request_handler_->set_response(request_response_);
}

void SchemaChangeHandler::on_timeout() {
logger_->error("A timeout occured waiting for schema agreement");
logger_->error("SchemaChangeHandler: A timeout occured waiting for schema agreement");
request_handler_->set_response(request_response_);
}

void SchemaChangeHandler::on_closing() {
logger_->warn("Connection closed while waiting for schema agreement");
logger_->warn("SchemaChangeHandler: Connection closed while waiting for schema agreement");
request_handler_->set_response(request_response_);
}

Expand Down
83 changes: 77 additions & 6 deletions test/integration_tests/src/control_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "test_utils.hpp"

#include <boost/test/unit_test.hpp>
#include <sstream>

struct ControlConnectionTests {
ControlConnectionTests() {}
Expand Down Expand Up @@ -79,9 +80,7 @@ BOOST_AUTO_TEST_CASE(test_node_discovery)
// Only add a single IP
test_utils::initialize_contact_points(cluster.get(), conf.ip_prefix(), 1, 0);

test_utils::CassFuturePtr session_future(cass_cluster_connect(cluster.get()));
test_utils::wait_and_check_error(session_future.get());
test_utils::CassSessionPtr session(cass_future_get_session(session_future.get()));
test_utils::CassSessionPtr session(test_utils::create_session(cluster));

cass::AddressSet addresses;
for (int i = 0; i < 3; ++i) {
Expand Down Expand Up @@ -116,9 +115,7 @@ BOOST_AUTO_TEST_CASE(test_node_discovery_invalid_ips)
// Only add a single valid IP
test_utils::initialize_contact_points(cluster.get(), conf.ip_prefix(), 1, 0);

test_utils::CassFuturePtr session_future(cass_cluster_connect(cluster.get()));
test_utils::wait_and_check_error(session_future.get());
test_utils::CassSessionPtr session(cass_future_get_session(session_future.get()));
test_utils::CassSessionPtr session(test_utils::create_session(cluster));

cass::AddressSet addresses;
for (int i = 0; i < 4; ++i) {
Expand All @@ -134,4 +131,78 @@ BOOST_AUTO_TEST_CASE(test_node_discovery_invalid_ips)
BOOST_CHECK_EQUAL(log_data->message_count, 3);
}

BOOST_AUTO_TEST_CASE(test_node_discovery_no_local_rows)
{
test_utils::CassClusterPtr cluster(cass_cluster_new());

const cql::cql_ccm_bridge_configuration_t& conf = cql::get_ccm_bridge_configuration();
boost::shared_ptr<cql::cql_ccm_bridge_t> ccm = cql::cql_ccm_bridge_t::create(conf, "test", 3, 0);

// Ensure RR policy
cass_cluster_set_load_balance_round_robin(cluster.get());;

// Only add a single valid IP
test_utils::initialize_contact_points(cluster.get(), conf.ip_prefix(), 1, 0);

{
test_utils::CassSessionPtr session(test_utils::create_session(cluster));
test_utils::execute_query(session.get(), "DELETE FROM system.local WHERE key = 'local'");
}

test_utils::CassSessionPtr session(test_utils::create_session(cluster));

cass::AddressSet addresses;
for (int i = 0; i < 3; ++i) {
CassString query = cass_string_init("SELECT * FROM system.schema_keyspaces");
test_utils::CassStatementPtr statement(cass_statement_new(query, 0));
test_utils::CassFuturePtr future(cass_session_execute(session.get(), statement.get()));
addresses.insert(static_cast<cass::ResponseFuture*>(future->from())->get_host_address());
}

BOOST_CHECK(addresses.size() == 3);
}

BOOST_AUTO_TEST_CASE(test_node_discovery_no_rpc_addresss)
{
boost::scoped_ptr<test_utils::LogData> log_data(new test_utils::LogData("No rpc_address for host 127.0.0.2 in system.peers on 127.0.0.1. Ignoring this entry."));

{
test_utils::CassClusterPtr cluster(cass_cluster_new());

const cql::cql_ccm_bridge_configuration_t& conf = cql::get_ccm_bridge_configuration();
boost::shared_ptr<cql::cql_ccm_bridge_t> ccm = cql::cql_ccm_bridge_t::create(conf, "test", 3, 0);

cass_cluster_set_log_callback(cluster.get(), test_utils::count_message_log_callback, log_data.get());

// Ensure RR policy
cass_cluster_set_load_balance_round_robin(cluster.get());;

// Only add a single valid IP
test_utils::initialize_contact_points(cluster.get(), conf.ip_prefix(), 1, 0);

{
test_utils::CassSessionPtr session(test_utils::create_session(cluster));
std::ostringstream ss;
ss << "UPDATE system.peers SET rpc_address = null WHERE peer = '" << conf.ip_prefix() << "2'";
test_utils::execute_query(session.get(), ss.str());
}

test_utils::CassSessionPtr session(test_utils::create_session(cluster));

cass::AddressSet addresses;
for (int i = 0; i < 3; ++i) {
CassString query = cass_string_init("SELECT * FROM system.schema_keyspaces");
test_utils::CassStatementPtr statement(cass_statement_new(query, 0));
test_utils::CassFuturePtr future(cass_session_execute(session.get(), statement.get()));
addresses.insert(static_cast<cass::ResponseFuture*>(future->from())->get_host_address());
}

// This should only contain 2 address because one pee is ignored
BOOST_CHECK(addresses.size() == 2);
}

BOOST_CHECK(log_data->message_count > 0);
}


BOOST_AUTO_TEST_SUITE_END()
10 changes: 9 additions & 1 deletion test/integration_tests/src/test_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ void count_message_log_callback(cass_uint64_t time,
void* data) {
LogData* log_data = reinterpret_cast<LogData*>(data);
std::string str(message.data, message.length);
fprintf(stderr, "Log message: %s\n", str.c_str());
if (log_data->output_messages) {
fprintf(stderr, "Log: %s\n", str.c_str());
}
if (str.find(log_data->message) != std::string::npos) {
log_data->message_count++;
}
Expand Down Expand Up @@ -165,6 +167,12 @@ void initialize_contact_points(CassCluster* cluster, std::string prefix, int num
}
}

CassSessionPtr create_session(CassClusterPtr cluster) {
test_utils::CassFuturePtr session_future(cass_cluster_connect(cluster.get()));
test_utils::wait_and_check_error(session_future.get());
return test_utils::CassSessionPtr(cass_future_get_session(session_future.get()));
}

void execute_query(CassSession* session,
const std::string& query,
CassResultPtr* result,
Expand Down
8 changes: 6 additions & 2 deletions test/integration_tests/src/test_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ extern const cass_duration_t ONE_MILLISECOND_IN_MICROS;
extern const cass_duration_t ONE_SECOND_IN_MICROS;

struct LogData {
LogData(const std::string& message)
LogData(const std::string& message, bool output_messages = false)
: message(message)
, message_count(0) {}
, message_count(0)
, output_messages(output_messages) {}

const std::string message;
boost::atomic<int> message_count;
bool output_messages;
};

void count_message_log_callback(cass_uint64_t time,
Expand Down Expand Up @@ -488,6 +490,8 @@ void initialize_contact_points(CassCluster* cluster, std::string prefix, int num

const char* get_value_type(CassValueType type);

CassSessionPtr create_session(CassClusterPtr cluster);

CassError execute_query_with_error(CassSession* session,
const std::string& query,
CassResultPtr* result = NULL,
Expand Down

0 comments on commit fa95177

Please sign in to comment.