diff --git a/README.md b/README.md index 46f76da..8ba3254 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ Django CQRS =========== -![pyversions](https://img.shields.io/pypi/pyversions/django-cqrs.svg) [![PyPi Status](https://img.shields.io/pypi/v/django-cqrs.svg)](https://pypi.org/project/django-cqrs/)) [![Docs](https://readthedocs.org/projects/django-cqrs/badge/?version=latest)](https://readthedocs.org/projects/django-cqrs) [![Coverage](https://sonarcloud.io/api/project_badges/measure?project=django-cqrs&metric=coverage)](https://sonarcloud.io/dashboard?id=django-cqrs) +![pyversions](https://img.shields.io/pypi/pyversions/django-cqrs.svg) [![PyPi Status](https://img.shields.io/pypi/v/django-cqrs.svg)](https://pypi.org/project/django-cqrs/) [![Docs](https://readthedocs.org/projects/django-cqrs/badge/?version=latest)](https://readthedocs.org/projects/django-cqrs) [![Coverage](https://sonarcloud.io/api/project_badges/measure?project=django-cqrs&metric=coverage)](https://sonarcloud.io/dashboard?id=django-cqrs) [![Build Status](https://travis-ci.org/cloudblue/django-cqrs.svg?branch=master)](https://travis-ci.org/cloudblue/django-cqrs) [![PyPI status](https://img.shields.io/pypi/status/django-cqrs.svg)](https://pypi.python.org/pypi/django-cqrs/) [![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=django-cqrs&metric=alert_status)](https://sonarcloud.io/dashboard?id=django-cqrs) `django-cqrs` is an Django application, that implements CQRS data synchronisation between several Django microservices. diff --git a/dj_cqrs/__init__.py b/dj_cqrs/__init__.py index e62f5b5..57e6ded 100644 --- a/dj_cqrs/__init__.py +++ b/dj_cqrs/__init__.py @@ -1 +1,3 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. + +default_app_config = 'dj_cqrs.apps.CQRSConfig' # pragma: no cover diff --git a/dj_cqrs/_validation.py b/dj_cqrs/_validation.py new file mode 100644 index 0000000..464ed09 --- /dev/null +++ b/dj_cqrs/_validation.py @@ -0,0 +1,181 @@ +# Copyright © 2021 Ingram Micro Inc. All rights reserved. + +import logging + +from dj_cqrs.constants import ( + DEFAULT_MASTER_AUTO_UPDATE_FIELDS, + DEFAULT_MASTER_MESSAGE_TTL, + DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE, + DEFAULT_REPLICA_MAX_RETRIES, + DEFAULT_REPLICA_RETRY_DELAY, +) +from dj_cqrs.registries import MasterRegistry, ReplicaRegistry +from dj_cqrs.transport import BaseTransport + +from django.utils.module_loading import import_string + + +logger = logging.getLogger('django-cqrs') + + +def validate_settings(settings): + is_master = bool(MasterRegistry.models) + is_replica = bool(ReplicaRegistry.models) + if (not is_master) and (not is_replica): # pragma: no cover + return + + assert hasattr(settings, 'CQRS'), 'CQRS configuration must be set in Django project settings.' + + cqrs_settings = settings.CQRS + assert isinstance(cqrs_settings, dict), 'CQRS configuration must be dict.' + + _validate_transport(cqrs_settings) + + if is_master or ('master' in cqrs_settings): + _validate_master(cqrs_settings) + + if is_replica or ('replica' in cqrs_settings): + _validate_replica(cqrs_settings) + + +def _validate_transport(cqrs_settings): + transport_cls_location = cqrs_settings.get('transport') + if not transport_cls_location: + raise AssertionError('CQRS transport is not set.') + + transport = import_string(transport_cls_location) + if not issubclass(transport, BaseTransport): + raise AssertionError( + 'CQRS transport must be inherited from `dj_cqrs.transport.BaseTransport`.', + ) + + +def _validate_master(cqrs_settings): + default_master_settings = { + 'master': { + 'CQRS_AUTO_UPDATE_FIELDS': DEFAULT_MASTER_AUTO_UPDATE_FIELDS, + 'CQRS_MESSAGE_TTL': DEFAULT_MASTER_MESSAGE_TTL, + 'correlation_function': None, + }, + } + + if 'master' not in cqrs_settings: + cqrs_settings.update(default_master_settings) + return + + master_settings = cqrs_settings['master'] + assert isinstance(master_settings, dict), 'CQRS master configuration must be dict.' + + _validate_master_auto_update_fields(master_settings) + _validate_master_message_ttl(master_settings) + _validate_master_correlation_func(master_settings) + + +def _validate_master_auto_update_fields(master_settings): + if 'CQRS_AUTO_UPDATE_FIELDS' in master_settings: + assert isinstance(master_settings['CQRS_AUTO_UPDATE_FIELDS'], bool), ( + 'CQRS master CQRS_AUTO_UPDATE_FIELDS must be bool.' + ) + else: + master_settings['CQRS_AUTO_UPDATE_FIELDS'] = DEFAULT_MASTER_AUTO_UPDATE_FIELDS + + +def _validate_master_message_ttl(master_settings): + if 'CQRS_MESSAGE_TTL' in master_settings: + min_message_ttl = 1 + message_ttl = master_settings['CQRS_MESSAGE_TTL'] + if (message_ttl is not None) and ( + not isinstance(message_ttl, int) or message_ttl < min_message_ttl + ): + # No error is raised for backward compatibility + # TODO: raise error in 2.0.0 + logger.warning( + 'Settings CQRS_MESSAGE_TTL=%s is invalid, using default %s.', + message_ttl, DEFAULT_MASTER_MESSAGE_TTL, + ) + master_settings['CQRS_MESSAGE_TTL'] = DEFAULT_MASTER_MESSAGE_TTL + else: + master_settings['CQRS_MESSAGE_TTL'] = DEFAULT_MASTER_MESSAGE_TTL + + +def _validate_master_correlation_func(master_settings): + correlation_func = master_settings.get('correlation_function') + if not correlation_func: + master_settings['correlation_function'] = None + elif not callable(correlation_func): + raise AssertionError('CQRS master correlation_function must be callable.') + + +def _validate_replica(cqrs_settings): + queue = cqrs_settings.get('queue') + assert queue, 'CQRS queue is not set.' + assert isinstance(queue, str), 'CQRS queue must be string.' + + default_replica_settings = { + 'replica': { + 'CQRS_MAX_RETRIES': DEFAULT_REPLICA_MAX_RETRIES, + 'CQRS_RETRY_DELAY': DEFAULT_REPLICA_RETRY_DELAY, + 'delay_queue_max_size': DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE, + }, + } + + if 'replica' not in cqrs_settings: + cqrs_settings.update(default_replica_settings) + return + + replica_settings = cqrs_settings['replica'] + assert isinstance(replica_settings, dict), 'CQRS replica configuration must be dict.' + + _validate_replica_max_retries(replica_settings) + _validate_replica_retry_delay(replica_settings) + _validate_replica_delay_queue_max_size(replica_settings) + + +def _validate_replica_max_retries(replica_settings): + if 'CQRS_MAX_RETRIES' in replica_settings: + min_retries = 0 + max_retries = replica_settings['CQRS_MAX_RETRIES'] + if (max_retries is not None) and ( + not isinstance(max_retries, int) or max_retries < min_retries + ): + # No error is raised for backward compatibility + # TODO: raise error in 2.0.0 + logger.warning( + 'Replica setting CQRS_MAX_RETRIES=%s is invalid, using default %s.', + max_retries, DEFAULT_REPLICA_MAX_RETRIES, + ) + replica_settings['CQRS_MAX_RETRIES'] = DEFAULT_REPLICA_MAX_RETRIES + else: + replica_settings['CQRS_MAX_RETRIES'] = DEFAULT_REPLICA_MAX_RETRIES + + +def _validate_replica_retry_delay(replica_settings): + min_retry_delay = 0 + retry_delay = replica_settings.get('CQRS_RETRY_DELAY') + if 'CQRS_RETRY_DELAY' not in replica_settings: + replica_settings['CQRS_RETRY_DELAY'] = DEFAULT_REPLICA_RETRY_DELAY + elif not isinstance(retry_delay, int) or retry_delay < min_retry_delay: + # No error is raised for backward compatibility + # TODO: raise error in 2.0.0 + logger.warning( + 'Replica setting CQRS_RETRY_DELAY=%s is invalid, using default %s.', + retry_delay, DEFAULT_REPLICA_RETRY_DELAY, + ) + replica_settings['CQRS_RETRY_DELAY'] = DEFAULT_REPLICA_RETRY_DELAY + + +def _validate_replica_delay_queue_max_size(replica_settings): + min_qsize = 0 + max_qsize = replica_settings.get('delay_queue_max_size') + if 'delay_queue_max_size' not in replica_settings: + max_qsize = DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE + elif (max_qsize is not None) and (not isinstance(max_qsize, int) or max_qsize <= min_qsize): + # No error is raised for backward compatibility + # TODO: raise error in 2.0.0 + logger.warning( + 'Settings delay_queue_max_size=%s is invalid, using default %s.', + max_qsize, DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE, + ) + max_qsize = DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE + + replica_settings['delay_queue_max_size'] = max_qsize diff --git a/dj_cqrs/apps.py b/dj_cqrs/apps.py new file mode 100644 index 0000000..bd0536c --- /dev/null +++ b/dj_cqrs/apps.py @@ -0,0 +1,13 @@ +# Copyright © 2021 Ingram Micro Inc. All rights reserved. + +from dj_cqrs._validation import validate_settings + +from django.apps import AppConfig +from django.conf import settings + + +class CQRSConfig(AppConfig): + name = 'dj_cqrs' + + def ready(self): + validate_settings(settings) diff --git a/dj_cqrs/constants.py b/dj_cqrs/constants.py index 127d8af..7637887 100644 --- a/dj_cqrs/constants.py +++ b/dj_cqrs/constants.py @@ -22,7 +22,10 @@ class SignalType: NO_QUEUE = 'None' DEFAULT_DEAD_MESSAGE_TTL = 864000 # 10 days -DEFAULT_DELAY_QUEUE_MAX_SIZE = 1000 -DEFAULT_CQRS_MESSAGE_TTL = 86400 # 1 day -DEFAULT_CQRS_MAX_RETRIES = 30 -DEFAULT_CQRS_RETRY_DELAY = 2 # seconds + +DEFAULT_MASTER_AUTO_UPDATE_FIELDS = False +DEFAULT_MASTER_MESSAGE_TTL = 86400 # 1 day + +DEFAULT_REPLICA_MAX_RETRIES = 30 +DEFAULT_REPLICA_RETRY_DELAY = 2 # seconds +DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE = 1000 diff --git a/dj_cqrs/correlation.py b/dj_cqrs/correlation.py index 907527a..a5f9388 100644 --- a/dj_cqrs/correlation.py +++ b/dj_cqrs/correlation.py @@ -3,11 +3,6 @@ from django.conf import settings -_correlation_function = getattr(settings, 'CQRS', {}).get('master', {}).get('correlation_function') -if _correlation_function and (not callable(_correlation_function)): - raise AttributeError('CQRS correlation_function must be callable.') - - def get_correlation_id(signal_type, cqrs_id, instance_pk, queue): """ :param signal_type: Type of the signal for this message. @@ -18,5 +13,6 @@ def get_correlation_id(signal_type, cqrs_id, instance_pk, queue): :param queue: Queue to synchronize, defaults to None :type queue: str, optional """ - if _correlation_function: - return _correlation_function(signal_type, cqrs_id, instance_pk, queue) + correlation_func = settings.CQRS.get('master', {}).get('correlation_function') + if correlation_func: + return correlation_func(signal_type, cqrs_id, instance_pk, queue) diff --git a/dj_cqrs/dataclasses.py b/dj_cqrs/dataclasses.py index dfcb823..6c7ae73 100644 --- a/dj_cqrs/dataclasses.py +++ b/dj_cqrs/dataclasses.py @@ -3,7 +3,7 @@ from dateutil.parser import parse as dateutil_parse from dj_cqrs.correlation import get_correlation_id -from dj_cqrs.utils import get_expires_datetime +from dj_cqrs.utils import get_message_expiration_dt from django.utils import timezone @@ -74,7 +74,7 @@ def from_message(cls, dct): expires = dateutil_parse(dct['expires']) else: # Backward compatibility for old messages otherwise they are infinite by default. - expires = get_expires_datetime() + expires = get_message_expiration_dt() return cls( dct['signal_type'], diff --git a/dj_cqrs/management/commands/cqrs_dead_letters.py b/dj_cqrs/management/commands/cqrs_dead_letters.py index ad74f5b..e12841c 100644 --- a/dj_cqrs/management/commands/cqrs_dead_letters.py +++ b/dj_cqrs/management/commands/cqrs_dead_letters.py @@ -4,7 +4,7 @@ from dj_cqrs.registries import ReplicaRegistry from dj_cqrs.transport import current_transport from dj_cqrs.transport.rabbit_mq import RabbitMQTransport -from dj_cqrs.utils import get_expires_datetime +from dj_cqrs.utils import get_message_expiration_dt from django.core.management.base import BaseCommand, CommandError @@ -105,7 +105,7 @@ def handle_retry(self, channel, consumer_generator, dead_letters_count): dct['retries'] = 0 if dct.get('expires'): # Message could expire already - expires = get_expires_datetime() + expires = get_message_expiration_dt() dct['expires'] = expires.replace(microsecond=0).isoformat() payload = TransportPayload.from_message(dct) payload.is_requeue = True diff --git a/dj_cqrs/mixins.py b/dj_cqrs/mixins.py index ca616af..a4cfb92 100644 --- a/dj_cqrs/mixins.py +++ b/dj_cqrs/mixins.py @@ -4,8 +4,6 @@ from dj_cqrs.constants import ( ALL_BASIC_FIELDS, - DEFAULT_CQRS_MAX_RETRIES, - DEFAULT_CQRS_RETRY_DELAY, FIELDS_TRACKER_FIELD_NAME, TRACKED_FIELDS_ATTR_NAME, ) @@ -128,9 +126,7 @@ def save_tracked_fields(self): @property def _update_cqrs_fields_default(self): - return bool( - getattr(settings, 'CQRS', {}).get('master', {}).get('CQRS_AUTO_UPDATE_FIELDS', False), - ) + return settings.CQRS['master']['CQRS_AUTO_UPDATE_FIELDS'] def to_cqrs_dict(self, using=None, sync=False): """CQRS serialization for transport payload. @@ -425,20 +421,11 @@ def should_retry_cqrs(current_retry, exception=None): :return: True if message should be retried, False otherwise. :rtype: bool """ - replica_settings = settings.CQRS.get('replica', {}) - if 'CQRS_MAX_RETRIES' in replica_settings and replica_settings['CQRS_MAX_RETRIES'] is None: + max_retries = settings.CQRS['replica']['CQRS_MAX_RETRIES'] + if max_retries is None: # Infinite return True - min_value = 0 - max_retries = replica_settings.get('CQRS_MAX_RETRIES', DEFAULT_CQRS_MAX_RETRIES) - if not isinstance(max_retries, int) or max_retries < min_value: - logger.warning( - "Replica setting CQRS_MAX_RETRIES=%s is invalid, using default %s", - max_retries, DEFAULT_CQRS_MAX_RETRIES, - ) - max_retries = DEFAULT_CQRS_MAX_RETRIES - return current_retry < max_retries @staticmethod @@ -450,18 +437,4 @@ def get_cqrs_retry_delay(current_retry): :return: Delay in seconds. :rtype: int """ - retry_delay = ( - settings.CQRS - .get('replica', {}) - .get('CQRS_RETRY_DELAY', DEFAULT_CQRS_RETRY_DELAY) - ) - - min_value = 1 - if not isinstance(retry_delay, int) or retry_delay < min_value: - logger.warning( - "Replica setting CQRS_RETRY_DELAY=%s is invalid, using default %s", - retry_delay, DEFAULT_CQRS_RETRY_DELAY, - ) - retry_delay = DEFAULT_CQRS_RETRY_DELAY - - return retry_delay + return settings.CQRS['replica']['CQRS_RETRY_DELAY'] diff --git a/dj_cqrs/signals.py b/dj_cqrs/signals.py index c6acfba..8ec9466 100644 --- a/dj_cqrs/signals.py +++ b/dj_cqrs/signals.py @@ -3,7 +3,7 @@ from dj_cqrs.constants import SignalType from dj_cqrs.controller import producer from dj_cqrs.dataclasses import TransportPayload -from dj_cqrs.utils import get_expires_datetime +from dj_cqrs.utils import get_message_expiration_dt from django.db import models, transaction from django.dispatch import Signal @@ -74,7 +74,7 @@ def post_save(cls, sender, **kwargs): instance.pk, queue, previous_data, - expires=get_expires_datetime(), + expires=get_message_expiration_dt(), ) producer.produce(payload) @@ -114,7 +114,7 @@ def post_delete(cls, sender, **kwargs): sender.CQRS_ID, instance_data, instance.pk, - expires=get_expires_datetime(), + expires=get_message_expiration_dt(), ) # Delete is always in transaction! transaction.on_commit(lambda: producer.produce(payload)) diff --git a/dj_cqrs/transport/__init__.py b/dj_cqrs/transport/__init__.py index 6dc7c27..296222e 100644 --- a/dj_cqrs/transport/__init__.py +++ b/dj_cqrs/transport/__init__.py @@ -8,18 +8,10 @@ from django.utils.module_loading import import_string -transport_cls_location = getattr(settings, 'CQRS', {}).get('transport') -if not transport_cls_location: - raise AttributeError('CQRS transport is not set.') - try: - current_transport = import_string(transport_cls_location) - - if not issubclass(current_transport, BaseTransport): - raise ValueError - -except (ImportError, ValueError): - raise ImportError('Bad CQRS transport class.') + current_transport = import_string(settings.CQRS['transport']) +except (AttributeError, ImportError, KeyError): + current_transport = None __all__ = ['BaseTransport', 'KombuTransport', 'RabbitMQTransport', current_transport] diff --git a/dj_cqrs/transport/rabbit_mq.py b/dj_cqrs/transport/rabbit_mq.py index 57f0168..35e4bab 100644 --- a/dj_cqrs/transport/rabbit_mq.py +++ b/dj_cqrs/transport/rabbit_mq.py @@ -13,7 +13,7 @@ from dj_cqrs.registries import ReplicaRegistry from dj_cqrs.transport import BaseTransport from dj_cqrs.transport.mixins import LoggingMixin -from dj_cqrs.utils import get_delay_queue_max_size, get_prefetch_count +from dj_cqrs.utils import get_delay_queue_max_size, get_messages_prefetch_count_per_worker from django.conf import settings from django.utils import timezone @@ -305,7 +305,7 @@ def _create_connection(cls, host, port, creds, exchange): ), ) channel = connection.channel() - channel.basic_qos(prefetch_count=get_prefetch_count()) + channel.basic_qos(prefetch_count=get_messages_prefetch_count_per_worker()) cls._declare_exchange(channel, exchange) return connection, channel @@ -363,7 +363,7 @@ def _get_consumer_settings(): logger.warning( "The 'consumer_prefetch_count' setting is ignored for RabbitMQTransport.", ) - prefetch_count = get_prefetch_count() + prefetch_count = get_messages_prefetch_count_per_worker() return ( queue_name, diff --git a/dj_cqrs/utils.py b/dj_cqrs/utils.py index efc07fe..fe02160 100644 --- a/dj_cqrs/utils.py +++ b/dj_cqrs/utils.py @@ -3,8 +3,6 @@ import logging from datetime import timedelta -from dj_cqrs.constants import DEFAULT_CQRS_MESSAGE_TTL, DEFAULT_DELAY_QUEUE_MAX_SIZE - from django.conf import settings from django.utils import timezone @@ -12,58 +10,41 @@ logger = logging.getLogger('django-cqrs') -def get_expires_datetime(): +def get_message_expiration_dt(): """Calculates when message should expire. - :return: datetime instance, None if infinite - :rtype: datetime.datetime + :return: Expiration datetime or None if infinite + :rtype: datetime.datetime or None """ - master_settings = settings.CQRS.get('master', {}) - if 'CQRS_MESSAGE_TTL' in master_settings and master_settings['CQRS_MESSAGE_TTL'] is None: + message_ttl = settings.CQRS['master']['CQRS_MESSAGE_TTL'] + if message_ttl is None: # Infinite return - min_message_ttl = 1 - message_ttl = master_settings.get('CQRS_MESSAGE_TTL', DEFAULT_CQRS_MESSAGE_TTL) - if not isinstance(message_ttl, int) or message_ttl < min_message_ttl: - logger.warning( - "Settings CQRS_MESSAGE_TTL=%s is invalid, using default %s.", - message_ttl, DEFAULT_CQRS_MESSAGE_TTL, - ) - message_ttl = DEFAULT_CQRS_MESSAGE_TTL - return timezone.now() + timedelta(seconds=message_ttl) def get_delay_queue_max_size(): - """Returns delay queue "waiting" messages number. + """Returns max allowed number of "waiting" messages in the delay queue. - :return: integer instance, None if infinite + :return: Positive integer number or None if infinite :rtype: int """ - replica_settings = settings.CQRS.get('replica', {}) - max_size = DEFAULT_DELAY_QUEUE_MAX_SIZE - if 'delay_queue_max_size' in replica_settings: - max_size = replica_settings['delay_queue_max_size'] + if 'replica' not in settings.CQRS: + return None - if max_size is not None and max_size <= 0: - logger.warning( - "Settings delay_queue_max_size=%s is invalid, using default %s.", - max_size, DEFAULT_DELAY_QUEUE_MAX_SIZE, - ) - max_size = DEFAULT_DELAY_QUEUE_MAX_SIZE - return max_size + return settings.CQRS['replica']['delay_queue_max_size'] -def get_prefetch_count(): - """Returns per worker consuming (unacked) messages number limit. +def get_messages_prefetch_count_per_worker(): + """Returns max allowed number of unacked messages, that can be consumed by a single worker. - :return: integer instance, 0 if infinite + :return: Positive integer number or 0 if infinite :rtype: int """ delay_queue_max_size = get_delay_queue_max_size() - prefetch_count = 0 # Infinite - if delay_queue_max_size is not None: - # 1 message is in progress, others could be delayed - prefetch_count = delay_queue_max_size + 1 - return prefetch_count + if delay_queue_max_size is None: + # Infinite + return 0 + + return delay_queue_max_size + 1 diff --git a/docs/lifecycle.rst b/docs/lifecycle.rst index 66b81e9..5ce9f0f 100644 --- a/docs/lifecycle.rst +++ b/docs/lifecycle.rst @@ -52,15 +52,15 @@ Message assumed as failed when a consumer raises an exception or returns negativ Retrying -------- -+----------------------+----------+-----------------------------------------------------------------------------+ -| Name | Default | Description | -+======================+==========+=============================================================================+ -| CQRS_MAX_RETRIES | 30 | Maximum number of retry attempts. Infinite if *None*, 0 to disable retries. | -+----------------------+----------+-----------------------------------------------------------------------------+ -| CQRS_RETRY_DELAY | 2 | Constant delay in **seconds** between message failure and requeueing. | -+----------------------+----------+-----------------------------------------------------------------------------+ -| delay_queue_max_size | 1000 | Maximum number of delayed messages per worker. Infinite if *None*. | -+----------------------+----------+-----------------------------------------------------------------------------+ ++---------------------------+----------+-----------------------------------------------------------------------------+ +| Name | Default | Description | ++===========================+==========+=============================================================================+ +| CQRS_MAX_RETRIES | 30 | Maximum number of retry attempts. Infinite if *None*, 0 to disable retries. | ++---------------------------+----------+-----------------------------------------------------------------------------+ +| CQRS_RETRY_DELAY | 2 | Constant delay in **seconds** between message failure and requeueing. | ++---------------------------+----------+-----------------------------------------------------------------------------+ +| CQRS_DELAY_QUEUE_MAX_SIZE | 1000 | Maximum number of delayed messages per worker. Infinite if *None*. | ++---------------------------+----------+-----------------------------------------------------------------------------+ .. code-block:: python @@ -71,7 +71,7 @@ Retrying 'replica': { 'CQRS_MAX_RETRIES': 30, # attempts 'CQRS_RETRY_DELAY': 2, # seconds - 'delay_queue_max_size': 1000, + 'CQRS_DELAY_QUEUE_MAX_SIZE': 1000, }, } diff --git a/tests/test_correlation.py b/tests/test_correlation.py index 0e5a16b..9843d40 100644 --- a/tests/test_correlation.py +++ b/tests/test_correlation.py @@ -4,26 +4,11 @@ from dj_cqrs.correlation import get_correlation_id -import pytest - def test_default_correlation(): assert get_correlation_id(None, None, None, None) is None -def test_wrong_correlation_type_in_settings(settings): - previous_cqrs_settings = settings.CQRS - settings.CQRS = {'master': {'correlation_function': 1}} - - with pytest.raises(AttributeError) as e: - reload(import_module('dj_cqrs.correlation')) - - assert str(e.value) == 'CQRS correlation_function must be callable.' - - settings.CQRS = previous_cqrs_settings - reload(import_module('dj_cqrs.correlation')) - - def test_custom_correlation(settings): previous_cqrs_settings = settings.CQRS settings.CQRS = {'master': {'correlation_function': lambda *args: '1q2w3e'}} diff --git a/tests/test_master/test_mixin.py b/tests/test_master/test_mixin.py index 35489cf..5913448 100644 --- a/tests/test_master/test_mixin.py +++ b/tests/test_master/test_mixin.py @@ -3,7 +3,12 @@ from time import sleep from uuid import uuid4 -from dj_cqrs.constants import FIELDS_TRACKER_FIELD_NAME, SignalType +from dj_cqrs.constants import ( + DEFAULT_MASTER_AUTO_UPDATE_FIELDS, + DEFAULT_MASTER_MESSAGE_TTL, + FIELDS_TRACKER_FIELD_NAME, + SignalType, +) from dj_cqrs.metas import MasterMeta from django.contrib.contenttypes.models import ContentType @@ -865,7 +870,9 @@ def test_save_update_fields_no_cqrs_fields_global_flag_changed(mocker, settings) settings.CQRS = { 'transport': 'tests.dj.transport.TransportStub', 'master': { - 'CQRS_AUTO_UPDATE_FIELDS': True, + 'CQRS_AUTO_UPDATE_FIELDS': not DEFAULT_MASTER_AUTO_UPDATE_FIELDS, + 'CQRS_MESSAGE_TTL': DEFAULT_MASTER_MESSAGE_TTL, + 'correlation_function': None, }, } instance.name = 'New' diff --git a/tests/test_replica/test_mixin.py b/tests/test_replica/test_mixin.py index 90c79ee..3aa3bfe 100644 --- a/tests/test_replica/test_mixin.py +++ b/tests/test_replica/test_mixin.py @@ -502,9 +502,7 @@ def test_nodb(mocker): 'cqrs_max_retries, current_retry, expected_result', [ (5, 0, True), (5, 5, False), - (-1, 0, True), # For invalid cqrs_max_retries=30 - (-1, 30, False), - ('test', 9, True), + (-1, 0, False), (0, 0, False), # Disabled (None, 10000, True), # Infinite ], @@ -517,29 +515,11 @@ def test_should_retry_cqrs(settings, cqrs_max_retries, current_retry, expected_r assert result is expected_result -@pytest.mark.parametrize( - 'current_retry, expected_result', [(0, True), (30, False)], -) -def test_should_retry_cqrs_no_setting_field(settings, current_retry, expected_result): - settings.CQRS['replica'].pop('CQRS_MAX_RETRIES', None) - - result = models.BasicFieldsModelRef.should_retry_cqrs(current_retry) - - assert result is expected_result +@pytest.mark.parametrize('retry_delay', (0, 5)) +@pytest.mark.parametrize('current_retry', (0, 1)) +def test_get_cqrs_retry_delay(settings, retry_delay, current_retry): + settings.CQRS['replica']['CQRS_RETRY_DELAY'] = retry_delay + result = models.BasicFieldsModelRef.get_cqrs_retry_delay(current_retry=current_retry) -@pytest.mark.parametrize( - 'cqrs_retry_delay, expected_result', [ - (5, 5), - (-1, 2), # For invalid CQRS_RETRY_DELAY=2 - (0, 2), - ('test', 2), - (None, 2), - ], -) -def test_get_cqrs_retry_delay(settings, cqrs_retry_delay, expected_result): - settings.CQRS['replica']['CQRS_RETRY_DELAY'] = cqrs_retry_delay - - result = models.BasicFieldsModelRef.get_cqrs_retry_delay(current_retry=0) - - assert result is expected_result + assert result is retry_delay diff --git a/tests/test_transport/test_base.py b/tests/test_transport/test_base.py index dee84e0..434be71 100644 --- a/tests/test_transport/test_base.py +++ b/tests/test_transport/test_base.py @@ -1,43 +1,10 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. -from importlib import import_module, reload - from dj_cqrs.transport.base import BaseTransport import pytest -def test_no_transport_setting(settings): - settings.CQRS = {} - - with pytest.raises(AttributeError) as e: - reload(import_module('dj_cqrs.transport')) - - assert str(e.value) == 'CQRS transport is not set.' - - -def test_bad_transport_setting(settings): - settings.CQRS = {'transport': '1221'} - - with pytest.raises(ImportError) as e: - reload(import_module('dj_cqrs.transport')) - - assert str(e.value) == 'Bad CQRS transport class.' - - -class NoneBaseTransportCls(object): - pass - - -def test_not_inherited_from_base_transport(settings): - settings.CQRS = {'transport': 'tests.test_transport.test_base.NoneBaseTransportCls'} - - with pytest.raises(ImportError) as e: - reload(import_module('dj_cqrs.transport')) - - assert str(e.value) == 'Bad CQRS transport class.' - - def test_base_transport_consume(): with pytest.raises(NotImplementedError): BaseTransport.consume(None) diff --git a/tests/test_transport/test_kombu.py b/tests/test_transport/test_kombu.py index 9b35852..58eaa8f 100644 --- a/tests/test_transport/test_kombu.py +++ b/tests/test_transport/test_kombu.py @@ -3,7 +3,9 @@ import logging from importlib import import_module, reload -from dj_cqrs.constants import SignalType +from dj_cqrs.constants import ( + DEFAULT_MASTER_AUTO_UPDATE_FIELDS, DEFAULT_MASTER_MESSAGE_TTL, SignalType, +) from dj_cqrs.dataclasses import TransportPayload from dj_cqrs.registries import ReplicaRegistry from dj_cqrs.transport.kombu import KombuTransport, _KombuConsumer @@ -79,6 +81,11 @@ def kombu_transport(settings): settings.CQRS = { 'transport': 'dj_cqrs.transport.kombu.KombuTransport', 'queue': 'replica', + 'master': { + 'CQRS_AUTO_UPDATE_FIELDS': DEFAULT_MASTER_AUTO_UPDATE_FIELDS, + 'CQRS_MESSAGE_TTL': DEFAULT_MASTER_MESSAGE_TTL, + 'correlation_function': None, + }, } module = reload(import_module('dj_cqrs.transport')) yield module.current_transport diff --git a/tests/test_transport/test_rabbit_mq.py b/tests/test_transport/test_rabbit_mq.py index 5c1e050..0e18e32 100644 --- a/tests/test_transport/test_rabbit_mq.py +++ b/tests/test_transport/test_rabbit_mq.py @@ -4,7 +4,14 @@ from datetime import datetime, timedelta, timezone from importlib import import_module, reload -from dj_cqrs.constants import SignalType +from dj_cqrs.constants import ( + DEFAULT_MASTER_AUTO_UPDATE_FIELDS, + DEFAULT_MASTER_MESSAGE_TTL, + DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE, + DEFAULT_REPLICA_MAX_RETRIES, + DEFAULT_REPLICA_RETRY_DELAY, + SignalType, +) from dj_cqrs.dataclasses import TransportPayload from dj_cqrs.delay import DelayMessage, DelayQueue from dj_cqrs.transport.rabbit_mq import RabbitMQTransport @@ -147,6 +154,16 @@ def rabbit_transport(settings): settings.CQRS = { 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport', 'queue': 'replica', + 'master': { + 'CQRS_AUTO_UPDATE_FIELDS': DEFAULT_MASTER_AUTO_UPDATE_FIELDS, + 'CQRS_MESSAGE_TTL': DEFAULT_MASTER_MESSAGE_TTL, + 'correlation_function': None, + }, + 'replica': { + 'CQRS_MAX_RETRIES': DEFAULT_REPLICA_MAX_RETRIES, + 'CQRS_RETRY_DELAY': DEFAULT_REPLICA_RETRY_DELAY, + 'delay_queue_max_size': DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE, + }, } module = reload(import_module('dj_cqrs.transport')) yield module.current_transport diff --git a/tests/test_utils.py b/tests/test_utils.py index c6c1c34..b61abf2 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -2,50 +2,51 @@ from datetime import datetime, timedelta, timezone -from dj_cqrs.utils import get_expires_datetime +from dj_cqrs.utils import ( + get_delay_queue_max_size, get_message_expiration_dt, get_messages_prefetch_count_per_worker, +) -import pytest - -def test_get_expires_datetime(mocker, settings): +def test_get_message_expiration_dt_fixed(mocker, settings): settings.CQRS['master']['CQRS_MESSAGE_TTL'] = 3600 fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc) mocker.patch('django.utils.timezone.now', return_value=fake_now) - result = get_expires_datetime() + result = get_message_expiration_dt() expected_result = fake_now + timedelta(seconds=3600) assert result == expected_result -def test_get_expires_datetime_no_setting_field(mocker, settings): - settings.CQRS['master'].pop('CQRS_MESSAGE_TTL', None) +def test_get_message_expiration_dt_infinite(mocker, settings): + settings.CQRS['master']['CQRS_MESSAGE_TTL'] = None fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc) mocker.patch('django.utils.timezone.now', return_value=fake_now) - result = get_expires_datetime() + result = get_message_expiration_dt() - expected_result = fake_now + timedelta(seconds=86400) - assert result == expected_result + assert result is None -@pytest.mark.parametrize('cqrs_message_ttl', [-1, 0, 'test']) -def test_get_expires_datetime_invalid_filed(cqrs_message_ttl, mocker, settings): - settings.CQRS['master']['CQRS_MESSAGE_TTL'] = cqrs_message_ttl - fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc) - mocker.patch('django.utils.timezone.now', return_value=fake_now) +def test_get_delay_queue_max_size_master(settings): + del settings.CQRS['replica'] - result = get_expires_datetime() + assert get_delay_queue_max_size() is None - expected_result = fake_now + timedelta(seconds=86400) - assert result == expected_result +def test_get_delay_queue_max_size_replica(settings): + settings.CQRS['replica']['delay_queue_max_size'] = 4 -def test_get_expires_datetime_infinite(mocker, settings): - settings.CQRS['master']['CQRS_MESSAGE_TTL'] = None - fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc) - mocker.patch('django.utils.timezone.now', return_value=fake_now) + assert get_delay_queue_max_size() == 4 - result = get_expires_datetime() - assert result is None +def test_get_messaged_prefetch_count_per_worker_no_delay_queue(settings): + settings.CQRS['replica']['delay_queue_max_size'] = None + + assert get_messages_prefetch_count_per_worker() == 0 + + +def test_get_messaged_prefetch_count_per_worker_with_delay_queue(settings): + settings.CQRS['replica']['delay_queue_max_size'] = 4 + + assert get_messages_prefetch_count_per_worker() == 5 diff --git a/tests/test_validation.py b/tests/test_validation.py new file mode 100644 index 0000000..de80936 --- /dev/null +++ b/tests/test_validation.py @@ -0,0 +1,285 @@ +# Copyright © 2021 Ingram Micro Inc. All rights reserved. + +from unittest.mock import MagicMock + +from dj_cqrs._validation import validate_settings + +import pytest + + +def test_full_configuration(): + def f(*a): + pass + + settings = MagicMock(CQRS={ + 'queue': 'start', + + 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport', + 'host': 'host', + 'port': 1234, + 'user': 'user', + 'password': 'pswd', + + 'master': { + 'CQRS_AUTO_UPDATE_FIELDS': True, + 'CQRS_MESSAGE_TTL': 10, + 'correlation_function': f, + }, + + 'replica': { + 'CQRS_MAX_RETRIES': 5, + 'CQRS_RETRY_DELAY': 4, + 'delay_queue_max_size': 2, + }, + }) + + validate_settings(settings) + + +def test_configuration_does_not_exist(): + with pytest.raises(AssertionError) as e: + validate_settings({}) + + assert str(e.value) == 'CQRS configuration must be set in Django project settings.' + + +@pytest.mark.parametrize('value', ([], 'settings')) +def test_configuration_has_wrong_type(value): + with pytest.raises(AssertionError) as e: + validate_settings(MagicMock(CQRS=value)) + + assert str(e.value) == 'CQRS configuration must be dict.' + + +def test_transport_is_not_set(): + with pytest.raises(AssertionError) as e: + validate_settings(MagicMock(CQRS={})) + + assert str(e.value) == 'CQRS transport is not set.' + + +def test_transport_is_not_importable(): + with pytest.raises(ImportError): + validate_settings(MagicMock(CQRS={'transport': 'abc'})) + + +def test_transport_has_wrong_inheritance(): + with pytest.raises(AssertionError) as e: + validate_settings(MagicMock(CQRS={'transport': 'dj_cqrs.dataclasses.TransportPayload'})) + + assert str(e.value) == ( + 'CQRS transport must be inherited from `dj_cqrs.transport.BaseTransport`.' + ) + + +@pytest.fixture +def cqrs_settings(): + return MagicMock(CQRS={ + 'transport': 'dj_cqrs.transport.mock.TransportMock', + 'queue': 'replica', + }) + + +def test_master_configuration_not_set(cqrs_settings): + validate_settings(cqrs_settings) + + assert cqrs_settings.CQRS['master'] == { + 'CQRS_AUTO_UPDATE_FIELDS': False, + 'CQRS_MESSAGE_TTL': 86400, + 'correlation_function': None, + } + + +@pytest.mark.parametrize('value', ([], 'settings', None)) +def test_master_configuration_has_wrong_type(cqrs_settings, value): + cqrs_settings.CQRS['master'] = value + + with pytest.raises(AssertionError) as e: + validate_settings(cqrs_settings) + + assert str(e.value) == 'CQRS master configuration must be dict.' + + +def test_master_configuration_is_empty(cqrs_settings): + cqrs_settings.CQRS['master'] = {} + + validate_settings(cqrs_settings) + + assert cqrs_settings.CQRS['master'] == { + 'CQRS_AUTO_UPDATE_FIELDS': False, + 'CQRS_MESSAGE_TTL': 86400, + 'correlation_function': None, + } + + +@pytest.mark.parametrize('value', (None, 'true', 1)) +def test_master_auto_update_fields_has_wrong_type(cqrs_settings, value): + cqrs_settings.CQRS['master'] = {'CQRS_AUTO_UPDATE_FIELDS': value} + + with pytest.raises(AssertionError) as e: + validate_settings(cqrs_settings) + + assert str(e.value) == 'CQRS master CQRS_AUTO_UPDATE_FIELDS must be bool.' + + +def test_master_message_ttl_is_none(cqrs_settings): + cqrs_settings.CQRS['master'] = { + 'CQRS_AUTO_UPDATE_FIELDS': True, + 'CQRS_MESSAGE_TTL': None, + } + + validate_settings(cqrs_settings) + + assert cqrs_settings.CQRS['master'] == { + 'CQRS_AUTO_UPDATE_FIELDS': True, + 'CQRS_MESSAGE_TTL': None, + 'correlation_function': None, + } + + +@pytest.mark.parametrize('value', ({}, 1.23, -2, 0)) +def test_master_message_ttl_has_wrong_type_or_invalid_value(value, cqrs_settings, caplog): + cqrs_settings.CQRS['master'] = {'CQRS_MESSAGE_TTL': value} + + validate_settings(cqrs_settings) + + assert cqrs_settings.CQRS['master'] == { + 'CQRS_AUTO_UPDATE_FIELDS': False, + 'CQRS_MESSAGE_TTL': 86400, + 'correlation_function': None, + } + assert caplog.record_tuples + + +def test_master_correlation_func_is_not_callable(cqrs_settings): + cqrs_settings.CQRS['master'] = {'correlation_function': 'x'} + + with pytest.raises(AssertionError) as e: + validate_settings(cqrs_settings) + + assert str(e.value) == 'CQRS master correlation_function must be callable.' + + +def test_master_correlation_func_is_callable(cqrs_settings): + cqrs_settings.CQRS['master'] = {'correlation_function': lambda: 1} + + validate_settings(cqrs_settings) + + assert cqrs_settings.CQRS['master']['correlation_function']() == 1 + + +def test_replica_configuration_not_set(cqrs_settings): + validate_settings(cqrs_settings) + + assert cqrs_settings.CQRS['replica'] == { + 'CQRS_MAX_RETRIES': 30, + 'CQRS_RETRY_DELAY': 2, + 'delay_queue_max_size': 1000, + } + + +@pytest.mark.parametrize('value', ([], 'settings', None)) +def test_replica_configuration_has_wrong_type(cqrs_settings, value): + cqrs_settings.CQRS['replica'] = value + + with pytest.raises(AssertionError) as e: + validate_settings(cqrs_settings) + + assert str(e.value) == 'CQRS replica configuration must be dict.' + + +def test_replica_configuration_is_empty(cqrs_settings): + cqrs_settings.CQRS['replica'] = {} + + validate_settings(cqrs_settings) + + assert cqrs_settings.CQRS['replica'] == { + 'CQRS_MAX_RETRIES': 30, + 'CQRS_RETRY_DELAY': 2, + 'delay_queue_max_size': 1000, + } + + +def test_replica_max_retries_is_none(cqrs_settings): + cqrs_settings.CQRS['replica'] = {'CQRS_MAX_RETRIES': None} + + validate_settings(cqrs_settings) + + assert cqrs_settings.CQRS['replica']['CQRS_MAX_RETRIES'] is None + + +@pytest.mark.parametrize('value', ({}, 1.23, -2)) +def test_replica_max_retries_has_wrong_type_or_invalid_value(value, cqrs_settings, caplog): + cqrs_settings.CQRS['replica'] = { + 'CQRS_MAX_RETRIES': value, + 'CQRS_RETRY_DELAY': 10, + 'delay_queue_max_size': value, + } + + validate_settings(cqrs_settings) + + assert cqrs_settings.CQRS['replica'] == { + 'CQRS_MAX_RETRIES': 30, + 'CQRS_RETRY_DELAY': 10, + 'delay_queue_max_size': 1000, + } + assert caplog.record_tuples + + +def test_replica_retry_delay_is_none(cqrs_settings): + cqrs_settings.CQRS['replica'] = {'CQRS_RETRY_DELAY': None} + + validate_settings(cqrs_settings) + + assert cqrs_settings.CQRS['replica']['CQRS_RETRY_DELAY'] == 2 + + +@pytest.mark.parametrize('value', ({}, 1.23, -2)) +def test_replica_retry_delay_has_wrong_type_or_invalid_value(value, cqrs_settings, caplog): + cqrs_settings.CQRS['replica'] = { + 'CQRS_MAX_RETRIES': 0, + 'CQRS_RETRY_DELAY': value, + 'delay_queue_max_size': 1, + } + + validate_settings(cqrs_settings) + + assert cqrs_settings.CQRS['replica'] == { + 'CQRS_MAX_RETRIES': 0, + 'CQRS_RETRY_DELAY': 2, + 'delay_queue_max_size': 1, + } + assert caplog.record_tuples + + +def test_replica_delay_queue_max_size_is_none(cqrs_settings): + cqrs_settings.CQRS['replica'] = {'delay_queue_max_size': None} + + validate_settings(cqrs_settings) + + assert cqrs_settings.CQRS['replica']['delay_queue_max_size'] is None + + +@pytest.mark.parametrize('value', ({}, 1.23, -2)) +def test_replica_delay_queue_max_size_has_wrong_type_or_invalid_value(value, cqrs_settings, caplog): + cqrs_settings.CQRS['replica'] = { + 'CQRS_RETRY_DELAY': 0, + 'delay_queue_max_size': value, + } + + validate_settings(cqrs_settings) + + assert cqrs_settings.CQRS['replica'] == { + 'CQRS_MAX_RETRIES': 30, + 'CQRS_RETRY_DELAY': 0, + 'delay_queue_max_size': 1000, + } + assert caplog.record_tuples + + +def test_replica_delay_queue_max_size_deprecated_parameter(cqrs_settings): + cqrs_settings.CQRS['replica'] = {'delay_queue_max_size': 200} + + validate_settings(cqrs_settings) + + assert cqrs_settings.CQRS['replica']['delay_queue_max_size'] == 200