Skip to content

Commit

Permalink
Merge pull request #138 from cloudblue/feature/LITE-27792
Browse files Browse the repository at this point in the history
LITE-27792 Support for logging of timed out PG and MySQL queries
  • Loading branch information
maxipavlovic authored Jul 4, 2023
2 parents b993d76 + 7fffd70 commit 48ae6d7
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 21 deletions.
7 changes: 7 additions & 0 deletions dj_cqrs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,10 @@ class SignalType:
DEFAULT_REPLICA_MAX_RETRIES = 30
DEFAULT_REPLICA_RETRY_DELAY = 2 # seconds
DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE = 1000

DB_VENDOR_PG = 'postgresql'
DB_VENDOR_MYSQL = 'mysql'
SUPPORTED_TIMEOUT_DB_VENDORS = {DB_VENDOR_MYSQL, DB_VENDOR_PG}

PG_TIMEOUT_FLAG = 'statement timeout'
MYSQL_TIMEOUT_ERROR_CODE = 3024
25 changes: 4 additions & 21 deletions dj_cqrs/controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
from django.db import Error, close_old_connections, transaction

from dj_cqrs.constants import SignalType
from dj_cqrs.logger import log_timed_out_queries
from dj_cqrs.registries import ReplicaRegistry
from dj_cqrs.utils import apply_query_timeouts


logger = logging.getLogger('django-cqrs')
Expand Down Expand Up @@ -66,7 +68,7 @@ def route_signal_to_replica_model(
is_meta_supported = model_cls.CQRS_META
try:
if db_is_needed:
_apply_query_timeouts(model_cls)
apply_query_timeouts(model_cls)

with transaction.atomic(savepoint=False) if db_is_needed else ExitStack():
if signal_type == SignalType.DELETE:
Expand Down Expand Up @@ -101,23 +103,4 @@ def route_signal_to_replica_model(
),
)


def _apply_query_timeouts(model_cls): # pragma: no cover
query_timeout = int(settings.CQRS['replica'].get('CQRS_QUERY_TIMEOUT', 0))
if query_timeout <= 0:
return

model_db = model_cls._default_manager.db
conn = transaction.get_connection(using=model_db)
conn_vendor = getattr(conn, 'vendor', '')

if conn_vendor not in {'postgresql', 'mysql'}:
return

if conn_vendor == 'postgresql':
statement = 'SET statement_timeout TO %s'
else:
statement = 'SET SESSION MAX_EXECUTION_TIME=%s'

with conn.cursor() as cursor:
cursor.execute(statement, params=(query_timeout,))
log_timed_out_queries(e, model_cls)
60 changes: 60 additions & 0 deletions dj_cqrs/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import logging

from django.conf import settings
from django.db import OperationalError, transaction

from dj_cqrs.constants import (
DB_VENDOR_MYSQL,
DB_VENDOR_PG,
MYSQL_TIMEOUT_ERROR_CODE,
PG_TIMEOUT_FLAG,
SUPPORTED_TIMEOUT_DB_VENDORS,
)


def install_last_query_capturer(model_cls):
    """Attach a ``_LastQueryCaptureWrapper`` to the DB connection of *model_cls*.

    Idempotent: if a capturer is already installed on the connection's
    ``execute_wrappers`` chain, no second one is added.
    """
    connection = _connection(model_cls)
    if _get_last_query_capturer(connection) is None:
        connection.execute_wrappers.append(_LastQueryCaptureWrapper())


def log_timed_out_queries(error, model_cls):  # pragma: no cover
    """Log the last captured SQL statement when *error* is a DB timeout error.

    Only acts when the ``CQRS_LOG_TIMED_OUT_QUERIES`` replica setting is truthy,
    *error* is an ``OperationalError`` with args, and the connection vendor is
    one of the supported databases (PostgreSQL / MySQL). The query text is read
    from the ``_LastQueryCaptureWrapper`` installed by
    ``install_last_query_capturer``.
    """
    replica_settings = settings.CQRS['replica']
    if not bool(replica_settings.get('CQRS_LOG_TIMED_OUT_QUERIES', False)):
        return
    if not isinstance(error, OperationalError) or not error.args:
        return

    conn = _connection(model_cls)
    vendor = getattr(conn, 'vendor', '')
    if vendor not in SUPPORTED_TIMEOUT_DB_VENDORS:
        return

    first_arg = error.args[0]
    if vendor == DB_VENDOR_MYSQL:
        # MySQL signals "max statement execution time exceeded" via error code 3024.
        is_timeout_error = first_arg == MYSQL_TIMEOUT_ERROR_CODE
    else:
        # PostgreSQL reports timeouts as a textual "statement timeout" message.
        is_timeout_error = isinstance(first_arg, str) and PG_TIMEOUT_FLAG in first_arg
    if not is_timeout_error:
        return

    query = getattr(_get_last_query_capturer(conn), 'query', None)
    if query:
        logger_name = replica_settings.get('CQRS_QUERY_LOGGER', '') or 'django-cqrs'
        logging.getLogger(logger_name).error('Timed out query:\n%s', query)


class _LastQueryCaptureWrapper:
def __init__(self):
self.query = None

def __call__(self, execute, sql, params, many, context):
try:
execute(sql, params, many, context)
finally:
self.query = sql


def _get_last_query_capturer(conn):
    """Return the ``_LastQueryCaptureWrapper`` installed on *conn*, or None."""
    for wrapper in conn.execute_wrappers:
        if isinstance(wrapper, _LastQueryCaptureWrapper):
            return wrapper
    return None


def _connection(model_cls):
    """Return the DB connection used by *model_cls*'s default manager."""
    db_alias = model_cls._default_manager.db
    return transaction.get_connection(using=db_alias)
26 changes: 26 additions & 0 deletions dj_cqrs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
from uuid import UUID

from django.conf import settings
from django.db import transaction
from django.utils import timezone

from dj_cqrs.constants import DB_VENDOR_PG, SUPPORTED_TIMEOUT_DB_VENDORS
from dj_cqrs.logger import install_last_query_capturer


logger = logging.getLogger('django-cqrs')

Expand Down Expand Up @@ -54,3 +58,25 @@ def get_messages_prefetch_count_per_worker():

def get_json_valid_value(value):
return str(value) if isinstance(value, (date, datetime, UUID)) else value


def apply_query_timeouts(model_cls):  # pragma: no cover
    """Apply the ``CQRS_QUERY_TIMEOUT`` replica setting to the model's DB session.

    No-op when the setting is missing/non-positive or the connection vendor is
    not one of the supported databases. For supported vendors the session-level
    statement timeout is set (value presumably in milliseconds for both
    PostgreSQL and MySQL — confirm against vendor docs) and the last-query
    capturer is installed so timed-out statements can be logged later.
    """
    timeout = int(settings.CQRS['replica'].get('CQRS_QUERY_TIMEOUT', 0))
    if timeout <= 0:
        return

    conn = transaction.get_connection(using=model_cls._default_manager.db)
    vendor = getattr(conn, 'vendor', '')
    if vendor not in SUPPORTED_TIMEOUT_DB_VENDORS:
        return

    statement = (
        'SET statement_timeout TO %s'
        if vendor == DB_VENDOR_PG
        else 'SET SESSION MAX_EXECUTION_TIME=%s'
    )
    with conn.cursor() as cursor:
        cursor.execute(statement, params=(timeout,))

    install_last_query_capturer(model_cls)
2 changes: 2 additions & 0 deletions integration_tests/replica_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
'delay_queue_max_size': 50,
'dead_letter_queue': 'dead_letter_replica',
'dead_message_ttl': 5,
'CQRS_QUERY_TIMEOUT': 10000,
'CQRS_LOG_TIMED_OUT_QUERIES': 1,
},
}

Expand Down
146 changes: 146 additions & 0 deletions tests/test_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import logging

import pytest
from django.db import (
DatabaseError,
IntegrityError,
OperationalError,
connection,
)

from dj_cqrs.logger import (
_LastQueryCaptureWrapper,
install_last_query_capturer,
log_timed_out_queries,
)
from tests.dj_replica import models


@pytest.mark.django_db(transaction=True)
def test_install_last_query_capturer():
    """Installing twice must not stack duplicate wrappers, and the wrapper
    must record the last executed SQL statement."""
    install_last_query_capturer(models.AuthorRef)
    install_last_query_capturer(models.AuthorRef)  # second call is a no-op

    wrappers = connection.execute_wrappers
    assert len(wrappers) == 1
    assert isinstance(wrappers[0], _LastQueryCaptureWrapper)

    with connection.cursor() as cursor:
        cursor.execute('SELECT 1')

    assert wrappers[0].query == 'SELECT 1'

    # Clean up so later tests see a pristine connection.
    connection.execute_wrappers.pop()


def test_log_timed_out_queries_not_supported(caplog):
    """Passing no error at all must be a silent no-op with no log output."""
    result = log_timed_out_queries(None, None)

    assert result is None
    assert caplog.record_tuples == []


@pytest.mark.parametrize(
    'error',
    [
        IntegrityError('some error'),  # wrong exception class
        DatabaseError(),               # wrong class and no args
        OperationalError(),            # right class but no args
    ],
)
def test_log_timed_out_queries_other_error(error, settings, caplog):
    """Non-timeout errors must never be logged even when logging is enabled.

    Bug fix: the flag lives inside ``settings.CQRS['replica']`` (that is what
    ``log_timed_out_queries`` reads), not as a top-level Django setting, so the
    original ``settings.CQRS_LOG_TIMED_OUT_QUERIES = 1`` assignment had no
    effect and the test passed without actually enabling logging.
    """
    settings.CQRS['replica']['CQRS_LOG_TIMED_OUT_QUERIES'] = 1

    assert log_timed_out_queries(error, None) is None
    assert not caplog.record_tuples


@pytest.mark.django_db(transaction=True)
@pytest.mark.parametrize(
    'engine, error, l_name, records',
    [
        # Unsupported vendor: nothing is ever logged.
        ('sqlite', None, None, []),
        # PG timeout message, default logger name.
        (
            'postgres',
            OperationalError('canceling statement due to statement timeout'),
            None,
            [
                (
                    'django-cqrs',
                    logging.ERROR,
                    'Timed out query:\nSELECT 1',
                )
            ],
        ),
        # PG timeout message, custom logger via CQRS_QUERY_LOGGER.
        (
            'postgres',
            OperationalError('canceling statement due to statement timeout'),
            'long-query',
            [
                (
                    'long-query',
                    logging.ERROR,
                    'Timed out query:\nSELECT 1',
                )
            ],
        ),
        # Other PG operational errors are ignored.
        (
            'postgres',
            OperationalError('could not connect to server'),
            None,
            [],
        ),
        (
            'postgres',
            OperationalError(125, 'Some error'),
            None,
            [],
        ),
        # MySQL timeout error code 3024, default logger.
        (
            'mysql',
            OperationalError(3024),
            None,
            [
                (
                    'django-cqrs',
                    logging.ERROR,
                    'Timed out query:\nSELECT 1',
                )
            ],
        ),
        # MySQL timeout with message and custom logger.
        (
            'mysql',
            OperationalError(
                3024, 'Query exec was interrupted, max statement execution time exceeded'
            ),
            'long-query-1',
            [
                (
                    'long-query-1',
                    logging.ERROR,
                    'Timed out query:\nSELECT 1',
                )
            ],
        ),
        # Other MySQL operational errors are ignored.
        (
            'mysql',
            OperationalError(1040, 'Too many connections'),
            None,
            [],
        ),
    ],
)
def test_log_timed_out_queries_vendor_errors(settings, engine, l_name, error, records, caplog):
    """Only genuine vendor timeout errors produce a log record, and it goes
    to the configured logger.

    NOTE(review): renamed from ``test_apply_query_timeouts`` — this test
    exercises ``log_timed_out_queries``, and the old name collided with the
    real ``test_apply_query_timeouts`` in tests/test_utils.py.
    """
    if settings.DB_ENGINE != engine:
        # Scenario targets a different DB backend than the current test run.
        return

    settings.CQRS['replica']['CQRS_LOG_TIMED_OUT_QUERIES'] = True
    settings.CQRS['replica']['CQRS_QUERY_LOGGER'] = l_name

    model_cls = models.BasicFieldsModelRef
    install_last_query_capturer(model_cls)

    with connection.cursor() as c:
        c.execute('SELECT 1')

    assert log_timed_out_queries(error, model_cls) is None
    assert caplog.record_tuples == records

    # Clean up the wrapper so other tests see a pristine connection.
    connection.execute_wrappers.pop()
23 changes: 23 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@
timedelta,
timezone,
)
from unittest.mock import patch
from uuid import UUID

import pytest

from dj_cqrs.utils import (
apply_query_timeouts,
get_delay_queue_max_size,
get_json_valid_value,
get_message_expiration_dt,
get_messages_prefetch_count_per_worker,
)
from tests.dj_replica import models


def test_get_message_expiration_dt_fixed(mocker, settings):
Expand Down Expand Up @@ -86,3 +89,23 @@ def test_get_messaged_prefetch_count_per_worker_with_delay_queue(settings):
)
def test_get_json_valid_value(value, result):
assert get_json_valid_value(value) == result


@pytest.mark.django_db
@pytest.mark.parametrize(
    'engine, p_count',
    [
        ('sqlite', 0),  # unsupported vendor: returns before installing the capturer
        ('postgres', 1),
        ('mysql', 1),
    ],
)
def test_apply_query_timeouts(settings, engine, p_count):
    """Supported vendors get a session timeout plus the last-query capturer;
    unsupported vendors are left untouched."""
    if settings.DB_ENGINE != engine:
        return

    settings.CQRS['replica']['CQRS_QUERY_TIMEOUT'] = 1

    with patch('dj_cqrs.utils.install_last_query_capturer') as capturer_mock:
        assert apply_query_timeouts(models.BasicFieldsModelRef) is None

    assert capturer_mock.call_count == p_count

0 comments on commit 48ae6d7

Please sign in to comment.