Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PYTHON-539 Timeouts in ResponseFuture should notify the conviction policy #1127

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4383,7 +4383,9 @@ def _on_timeout(self, _attempts=0):
host = str(connection.endpoint) if connection else 'unknown'
errors = {host: "Request timed out while waiting for schema agreement. See Session.execute[_async](timeout) and Cluster.max_schema_agreement_wait."}

self._set_final_exception(OperationTimedOut(errors, self._current_host))
final_exc = OperationTimedOut(errors, self._current_host)
self.session.cluster.signal_connection_failure(self._current_host, final_exc, is_host_addition=False)
self._set_final_exception(final_exc)

def _on_speculative_execute(self):
self._timer = None
Expand Down
51 changes: 47 additions & 4 deletions tests/unit/test_response_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
from mock import Mock, MagicMock, ANY

from cassandra import ConsistencyLevel, Unavailable, SchemaTargetType, SchemaChangeType, OperationTimedOut
from cassandra.cluster import Session, ResponseFuture, NoHostAvailable, ProtocolVersion
from cassandra.connection import Connection, ConnectionException
from cassandra.cluster import Session, ResponseFuture, NoHostAvailable, ProtocolVersion, Cluster
from cassandra.connection import Connection, ConnectionException, DefaultEndPoint
from cassandra.protocol import (ReadTimeoutErrorMessage, WriteTimeoutErrorMessage,
UnavailableErrorMessage, ResultMessage, QueryMessage,
OverloadedErrorMessage, IsBootstrappingErrorMessage,
PreparedQueryNotFound, PrepareMessage,
RESULT_KIND_ROWS, RESULT_KIND_SET_KEYSPACE,
RESULT_KIND_SCHEMA_CHANGE, RESULT_KIND_PREPARED,
ProtocolHandler)
from cassandra.policies import RetryPolicy
from cassandra.pool import NoConnectionsAvailable
from cassandra.policies import RetryPolicy, ConvictionPolicy
from cassandra.pool import NoConnectionsAvailable, Host
from cassandra.query import SimpleStatement


Expand Down Expand Up @@ -160,6 +160,49 @@ def test_heartbeat_defunct_deadlock(self):
rf._on_timeout()
self.assertRaisesRegexp(OperationTimedOut, "Connection defunct by heartbeat", rf.result)

def test_timeout_updates_conviction_policy(self):
"""
PYTHON-539

Timeouts from ResponseFuture should notify the host's conviction policy, giving
the driver a mechanism to take action on repeated/systemic timeouts.
"""
conviction_policy = MagicMock(spec=ConvictionPolicy)
conviction_policy.add_failure.return_value = False

host = Host(DefaultEndPoint("ip1"), lambda h: conviction_policy)

connection = MagicMock(spec=Connection)
connection._requests = {1:False}
connection.orphaned_request_ids = set()
connection.orphaned_threshold = 2

pool = Mock()
pool.is_shutdown = False
pool.borrow_connection.return_value = [connection, 1]

session = self.make_basic_session()
session.cluster._default_load_balancing_policy.make_query_plan.return_value = [host]
session._pools.get.return_value = pool

# An extra bit of connective tissue. session.cluster is a mock but we want to use the
# actual impl in Cluster in order to get into the host (and from there to the conviction
# policy). As of this writing Cluster.signal_connection_failure is effectively static
# if the return value from add_failure() on the conviction poilcy is false so we can
# create this linkage _using the actual impl in Cluster_ via a mock side_effect.
def foo(*args, **kwargs):
Cluster.signal_connection_failure(Cluster(), *args, **kwargs)
session.cluster.signal_connection_failure.side_effect = foo

query = SimpleStatement("SELECT * FROM foo")
message = QueryMessage(query=query, consistency_level=ConsistencyLevel.ONE)

rf = ResponseFuture(session, message, query, 1)
rf.send_request()
rf._on_timeout()

host.conviction_policy.add_failure.assert_called_once()

def test_read_timeout_error_message(self):
session = self.make_session()
query = SimpleStatement("SELECT * FROM foo")
Expand Down