diff --git a/.gitignore b/.gitignore index 14300ab..dd7467a 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,10 @@ dist/ tests/reports/ .coverage + +*.dump + +htmlcov +.devcontainer + +docs/_build \ No newline at end of file diff --git a/README.md b/README.md index 0e49e79..265903d 100644 --- a/README.md +++ b/README.md @@ -157,7 +157,7 @@ kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_master --cqrs-id= Development =========== -1. Python 3.5+ +1. Python 3.6+ 0. Install dependencies `requirements/dev.txt` Testing @@ -165,7 +165,7 @@ Testing Unit testing ------ -1. Python 3.5+ +1. Python 3.6+ 0. Install dependencies `requirements/test.txt` 0. `export PYTHONPATH=/your/path/to/django-cqrs/` diff --git a/dj_cqrs/constants.py b/dj_cqrs/constants.py index eddeec5..96e4302 100644 --- a/dj_cqrs/constants.py +++ b/dj_cqrs/constants.py @@ -1,13 +1,22 @@ +# Copyright © 2020 Ingram Micro Inc. All rights reserved. + ALL_BASIC_FIELDS = '__all__' +FIELDS_TRACKER_FIELD_NAME = '__fields_tracker' +TRACKED_FIELDS_ATTR_NAME = '__tracked_fields' -# Copyright © 2020 Ingram Micro Inc. All rights reserved. 
class SignalType: - DELETE = 'DELETE' + """Type of signal that generates this event.""" + SAVE = 'SAVE' + """The master model has been saved.""" + + DELETE = 'DELETE' + """The master model has been deleted.""" SYNC = 'SYNC' + """The master model needs synchronization.""" NO_QUEUE = 'None' diff --git a/dj_cqrs/controller/consumer.py b/dj_cqrs/controller/consumer.py index 576e403..e0d8a19 100644 --- a/dj_cqrs/controller/consumer.py +++ b/dj_cqrs/controller/consumer.py @@ -8,7 +8,7 @@ from dj_cqrs.registries import ReplicaRegistry -logger = logging.getLogger() +logger = logging.getLogger('django-cqrs') def consume(payload): @@ -18,10 +18,11 @@ def consume(payload): """ return route_signal_to_replica_model( payload.signal_type, payload.cqrs_id, payload.instance_data, + previous_data=payload.previous_data, ) -def route_signal_to_replica_model(signal_type, cqrs_id, instance_data): +def route_signal_to_replica_model(signal_type, cqrs_id, instance_data, previous_data=None): """ Routes signal to model method to create/update/delete replica instance. :param dj_cqrs.constants.SignalType signal_type: Consumed signal type. 
@@ -37,11 +38,15 @@ def route_signal_to_replica_model(signal_type, cqrs_id, instance_data): elif signal_type == SignalType.SAVE: with transaction.atomic(): - return model_cls.cqrs_save(instance_data) + return model_cls.cqrs_save(instance_data, previous_data=previous_data) elif signal_type == SignalType.SYNC: with transaction.atomic(): - return model_cls.cqrs_save(instance_data, sync=True) + return model_cls.cqrs_save( + instance_data, + previous_data=previous_data, + sync=True, + ) else: logger.error('Bad signal type "{}" for CQRS_ID "{}".'.format(signal_type, cqrs_id)) diff --git a/dj_cqrs/dataclasses.py b/dj_cqrs/dataclasses.py index e6b59e6..3d6b9cd 100644 --- a/dj_cqrs/dataclasses.py +++ b/dj_cqrs/dataclasses.py @@ -2,12 +2,31 @@ class TransportPayload: - def __init__(self, signal_type, cqrs_id, instance_data, instance_pk, queue=None): + """Transport message payload. + + :param signal_type: Type of the signal for this message. + :type signal_type: dj_cqrs.constants.SignalType + :param cqrs_id: The unique CQRS identifier of the model. + :type cqrs_id: str + :param instance_data: Serialized data of the instance that + generates the event. + :type instance_data: dict + :param instance_pk: Primary key of the instance + :param queue: Queue to synchronize, defaults to None + :type queue: str, optional + :param previous_data: Previous values for fields tracked for changes, + defaults to None + :type previous_data: dict, optional + """ + + def __init__(self, signal_type, cqrs_id, instance_data, instance_pk, queue=None, + previous_data=None): self.__signal_type = signal_type self.__cqrs_id = cqrs_id self.__instance_data = instance_data self.__instance_pk = instance_pk self.__queue = queue + self.__previous_data = previous_data @property def signal_type(self): @@ -29,10 +48,21 @@ def pk(self): def queue(self): return self.__queue + @property + def previous_data(self): + return self.__previous_data + def to_dict(self): + """ + Return the payload as a dictionary. 
+ + :return: This payload. + :rtype: dict + """ return { 'signal_type': self.__signal_type, 'cqrs_id': self.__cqrs_id, 'instance_data': self.__instance_data, + 'previous_data': self.__previous_data, 'instance_pk': self.__instance_pk, } diff --git a/dj_cqrs/managers.py b/dj_cqrs/managers.py index bf56283..33d4759 100644 --- a/dj_cqrs/managers.py +++ b/dj_cqrs/managers.py @@ -1,7 +1,6 @@ # Copyright © 2020 Ingram Micro Inc. All rights reserved. import logging -from itertools import chain from django.core.exceptions import ValidationError from django.db import Error, IntegrityError, transaction @@ -9,7 +8,7 @@ from django.utils import timezone -logger = logging.getLogger() +logger = logging.getLogger('django-cqrs') class MasterManager(Manager): @@ -29,16 +28,18 @@ def bulk_update(self, queryset, **kwargs): class ReplicaManager(Manager): - def save_instance(self, master_data, sync=False): + def save_instance(self, master_data, previous_data=None, sync=False): """ This method saves (creates or updates) model instance from CQRS master instance data. :param dict master_data: CQRS master instance data. + :param dict previous_data: Previous values for tracked fields. :param bool sync: Sync package flag. :return: Model instance. 
:rtype: django.db.models.Model """ mapped_data = self._map_save_data(master_data) - + mapped_previous_data = self._map_previous_data(previous_data) \ + if previous_data else None if mapped_data: pk_name = self._get_model_pk_name() pk_value = mapped_data[pk_name] @@ -47,20 +48,34 @@ def save_instance(self, master_data, sync=False): instance = self.model._default_manager.filter(**f_kwargs).first() if instance: - return self.update_instance(instance, mapped_data, sync=sync) + return self.update_instance( + instance, + mapped_data, + previous_data=mapped_previous_data, + sync=sync, + ) - return self.create_instance(mapped_data, sync=sync) + return self.create_instance( + mapped_data, + previous_data=mapped_previous_data, + sync=sync, + ) - def create_instance(self, mapped_data, sync=False): + def create_instance(self, mapped_data, previous_data=None, sync=False): """ This method creates model instance from mapped CQRS master instance data. :param dict mapped_data: Mapped CQRS master instance data. + :param dict previous_data: Previous values for tracked fields. :param bool sync: Sync package flag. :return: ReplicaMixin model instance. :rtype: django.db.models.Model """ try: - return self.model.cqrs_create(sync, **mapped_data) + return self.model.cqrs_create( + sync, + mapped_data, + previous_data=previous_data, + ) except (Error, ValidationError) as e: pk_value = mapped_data[self._get_model_pk_name()] if isinstance(e, IntegrityError): @@ -76,11 +91,12 @@ def create_instance(self, mapped_data, sync=False): ), ) - def update_instance(self, instance, mapped_data, sync=False): + def update_instance(self, instance, mapped_data, previous_data=None, sync=False): """ This method updates model instance from mapped CQRS master instance data. :param django.db.models.Model instance: ReplicaMixin model instance. :param dict mapped_data: Mapped CQRS master instance data. + :param dict previous_data: Previous values for tracked fields. :param bool sync: Sync package flag. 
:return: ReplicaMixin model instance. :rtype: django.db.models.Model @@ -128,7 +144,11 @@ def update_instance(self, instance, mapped_data, sync=False): )) try: - return instance.cqrs_update(sync, **mapped_data) + return instance.cqrs_update( + sync, + mapped_data, + previous_data=previous_data, + ) except (Error, ValidationError) as e: logger.error( '{}\nCQRS update error: pk = {}, cqrs_revision = {} ({}).'.format( @@ -160,6 +180,20 @@ def delete_instance(self, master_data): return False + def _map_previous_data(self, previous_data): + if self.model.CQRS_MAPPING is None: + return previous_data + + mapped_previous_data = {} + + for master_name, replica_name in self.model.CQRS_MAPPING.items(): + if master_name not in previous_data: + continue + + mapped_previous_data[replica_name] = previous_data[master_name] + mapped_previous_data = self._remove_excessive_data(mapped_previous_data) + return mapped_previous_data + def _map_save_data(self, master_data): if not self._cqrs_fields_are_filled(master_data): return @@ -200,14 +234,16 @@ def _make_initial_mapping(self, master_data): def _remove_excessive_data(self, data): opts = self.model._meta - possible_field_names = {f.name for f in chain(opts.concrete_fields, opts.private_fields)} + possible_field_names = { + f.name for f in opts.fields + } return {k: v for k, v in data.items() if k in possible_field_names} def _all_required_fields_are_filled(self, mapped_data): opts = self.model._meta required_field_names = { - f.name for f in chain(opts.concrete_fields, opts.private_fields) if not f.null + f.name for f in opts.fields if not f.null } if not (required_field_names - set(mapped_data.keys())): return True diff --git a/dj_cqrs/metas.py b/dj_cqrs/metas.py index fe14735..f972420 100644 --- a/dj_cqrs/metas.py +++ b/dj_cqrs/metas.py @@ -1,19 +1,19 @@ # Copyright © 2020 Ingram Micro Inc. All rights reserved. 
-from itertools import chain - from django.db.models import base from dj_cqrs.constants import ALL_BASIC_FIELDS from dj_cqrs.registries import MasterRegistry, ReplicaRegistry from dj_cqrs.signals import MasterSignals +from dj_cqrs.tracker import CQRSTracker class MasterMeta(base.ModelBase): - def __new__(mcs, *args): - model_cls = super(MasterMeta, mcs).__new__(mcs, *args) + def __new__(mcs, name, bases, attrs, **kwargs): + + model_cls = super(MasterMeta, mcs).__new__(mcs, name, bases, attrs, **kwargs) - if args[0] != 'MasterMixin': + if name != 'MasterMixin': mcs.register(model_cls) return model_cls @@ -22,6 +22,11 @@ def __new__(mcs, *args): def register(model_cls): _MetaUtils.check_cqrs_id(model_cls) MasterMeta._check_correct_configuration(model_cls) + + if model_cls.CQRS_TRACKED_FIELDS is not None: + MasterMeta._check_cqrs_tracked_fields(model_cls) + CQRSTracker.add_to_model(model_cls) + if model_cls.CQRS_SERIALIZER is None: MasterMeta._check_cqrs_fields(model_cls) @@ -29,6 +34,26 @@ def register(model_cls): MasterSignals.register_model(model_cls) return model_cls + @staticmethod + def _check_cqrs_tracked_fields(model_cls): + """Check that the CQRS_TRACKED_FIELDS has correct configuration. + + :param dj_cqrs.mixins.MasterMixin model_cls: CQRS Master Model. + :raises: AssertionError + """ + tracked_fields = model_cls.CQRS_TRACKED_FIELDS + if isinstance(tracked_fields, (list, tuple)): + _MetaUtils._check_no_duplicate_names( + model_cls, + tracked_fields, + 'CQRS_TRACKED_FIELDS', + ) + _MetaUtils._check_unexisting_names(model_cls, tracked_fields, 'CQRS_TRACKED_FIELDS') + return + + assert isinstance(tracked_fields, str) and tracked_fields == ALL_BASIC_FIELDS, \ + "Model {}: Invalid configuration for CQRS_TRACKED_FIELDS".format(model_cls.__name__) + @staticmethod def _check_correct_configuration(model_cls): """ Check that model has correct CQRS configuration. 
@@ -39,7 +64,7 @@ def _check_correct_configuration(model_cls): if model_cls.CQRS_FIELDS != ALL_BASIC_FIELDS: assert model_cls.CQRS_SERIALIZER is None, \ "Model {}: CQRS_FIELDS can't be set together with CQRS_SERIALIZER.".format( - model_cls, + model_cls.__name__, ) @staticmethod @@ -61,7 +86,6 @@ def __new__(mcs, *args): if args[0] != 'ReplicaMixin': _MetaUtils.check_cqrs_id(model_cls) ReplicaMeta._check_cqrs_mapping(model_cls) - ReplicaRegistry.register_model(model_cls) return model_cls @@ -102,7 +126,7 @@ def _check_unexisting_names(model_cls, cqrs_field_names, cqrs_attr): opts = model_cls._meta model_name = model_cls.__name__ - model_field_names = {f.name for f in chain(opts.concrete_fields, opts.private_fields)} + model_field_names = {f.name for f in opts.fields} assert not set(cqrs_field_names) - model_field_names, \ '{} field is not correctly set for model {}.'.format(cqrs_attr, model_name) diff --git a/dj_cqrs/mixins.py b/dj_cqrs/mixins.py index ce60bc8..0976d40 100644 --- a/dj_cqrs/mixins.py +++ b/dj_cqrs/mixins.py @@ -1,27 +1,57 @@ # Copyright © 2020 Ingram Micro Inc. All rights reserved. -from itertools import chain - import six from django.db.models import DateField, DateTimeField, F, IntegerField, Manager, Model from django.db.models.expressions import CombinedExpression from django.utils.module_loading import import_string -from dj_cqrs.constants import ALL_BASIC_FIELDS +from dj_cqrs.constants import ( + ALL_BASIC_FIELDS, + FIELDS_TRACKER_FIELD_NAME, + TRACKED_FIELDS_ATTR_NAME, +) from dj_cqrs.managers import MasterManager, ReplicaManager from dj_cqrs.metas import MasterMeta, ReplicaMeta from dj_cqrs.signals import MasterSignals, post_bulk_create, post_update class RawMasterMixin(Model): + + """Base class for MasterMixin. 
**Users shouldn't use this + class directly.**""" + CQRS_ID = None + """Unique CQRS identifier for all microservices.""" + CQRS_PRODUCE = True + """If false, no cqrs data is sent through the transport.""" CQRS_FIELDS = ALL_BASIC_FIELDS - CQRS_SERIALIZER = None # It must be None or string, like: 'path.to.serializer' + """ + List of fields to include in the CQRS payload. + You can also set the fields attribute to the special value '__all__' + to indicate that all fields in the model should be used. + """ + + CQRS_SERIALIZER = None + """ + Optional serializer used to create the instance representation. + Must be expressed as a module dotted path string like + `mymodule.serializers.MasterModelSerializer`. + """ + + CQRS_TRACKED_FIELDS = None + """ + List of fields of the main model for which you want to track the changes + and send the previous values via transport. You can also set the field + attribute to the special value "__all__" to indicate that all fields in + the model must be used. + """ objects = Manager() + cqrs = MasterManager() + """Manager that adds needed CQRS queryset methods.""" cqrs_revision = IntegerField( default=0, help_text="This field must be incremented on any model update. " @@ -38,18 +68,51 @@ class Meta: def save(self, *args, **kwargs): if not self._state.adding: self.cqrs_revision = F('cqrs_revision') + 1 + self._save_tracked_fields() return super(RawMasterMixin, self).save(*args, **kwargs) + def _save_tracked_fields(self): + if hasattr(self, FIELDS_TRACKER_FIELD_NAME): + tracker = getattr(self, FIELDS_TRACKER_FIELD_NAME) + setattr(self, TRACKED_FIELDS_ATTR_NAME, tracker.changed()) + def to_cqrs_dict(self, using=None): - """ CQRS serialization for transport payload. """ + """CQRS serialization for transport payload. + + :param using: The using argument can be used to force the database + to use, defaults to None + :type using: str, optional + :return: The serialized instance data. 
+ :rtype: dict + """ if self.CQRS_SERIALIZER: data = self._class_serialization(using) else: + self._refresh_f_expr_values(using) data = self._common_serialization(using) return data + def get_tracked_fields_data(self): + """CQRS serialization for tracked fields to include + in the transport payload. + + :return: Previous values for tracked fields. + :rtype: dict + """ + return getattr(self, TRACKED_FIELDS_ATTR_NAME, None) + def cqrs_sync(self, using=None, queue=None): - """ Manual instance synchronization. """ + """Manual instance synchronization. + + :param using: The using argument can be used to force the database + to use, defaults to None + :type using: str, optional + :param queue: Syncing can be executed just for a single queue, defaults to None + (all queues) + :type queue: str, optional + :return: True if instance can be synced, False otherwise. + :rtype: bool + """ if self._state.adding: return False @@ -77,7 +140,15 @@ def is_sync_instance(self): @classmethod def relate_cqrs_serialization(cls, queryset): """ - :param django.db.models.QuerySet queryset: + This method should be overridden to optimize database access + for example using `select_related` and `prefetch_related` + when related models must be included into the master model + representation. + + :param queryset: The initial queryset. + :type queryset: django.db.models.QuerySet + :return: The optimized queryset. + :rtype: django.db.models.QuerySet """ return queryset @@ -86,6 +157,7 @@ def call_post_bulk_create(cls, instances, using=None): """ Post bulk create signal caller (django doesn't support it by default). .. code-block:: python + # On PostgreSQL instances = model.objects.bulk_create(instances) model.call_post_bulk_create(instances) @@ -97,6 +169,7 @@ def call_post_update(cls, instances, using=None): """ Post bulk update signal caller (django doesn't support it by default). .. 
code-block:: python + # Used automatically by cqrs.bulk_update() qs = model.objects.filter(k1=v1) model.cqrs.bulk_update(qs, k2=v2) @@ -104,9 +177,6 @@ def call_post_update(cls, instances, using=None): post_update.send(cls, instances=instances, using=using) def _common_serialization(self, using): - if isinstance(self.cqrs_revision, CombinedExpression): - self.refresh_from_db(fields=('cqrs_revision',), using=using) - opts = self._meta if isinstance(self.CQRS_FIELDS, six.string_types) and self.CQRS_FIELDS == ALL_BASIC_FIELDS: @@ -115,7 +185,7 @@ def _common_serialization(self, using): included_fields = self.CQRS_FIELDS data = {} - for f in chain(opts.concrete_fields, opts.private_fields): + for f in opts.fields: if included_fields and (f.name not in included_fields): continue @@ -145,6 +215,29 @@ def _class_serialization(self, using): return data + def _refresh_f_expr_values(self, using): + opts = self._meta + fields_to_refresh = [] + if isinstance(self.cqrs_revision, CombinedExpression): + fields_to_refresh.append('cqrs_revision') + + if isinstance(self.CQRS_FIELDS, six.string_types) and self.CQRS_FIELDS == ALL_BASIC_FIELDS: + included_fields = None + else: + included_fields = self.CQRS_FIELDS + + for f in opts.fields: + if included_fields and (f.name not in included_fields): + continue + + value = f.value_from_object(self) + + if value is not None and isinstance(value, CombinedExpression): + fields_to_refresh.append(f.name) + + if fields_to_refresh: + self.refresh_from_db(fields=fields_to_refresh) + @property def _cqrs_serializer_cls(self): """ Serialization class loader. """ @@ -162,14 +255,6 @@ def _cqrs_serializer_cls(self): class MasterMixin(six.with_metaclass(MasterMeta, RawMasterMixin)): """ Mixin for the master CQRS model, that will send data updates to it's replicas. - - CQRS_ID - Unique CQRS identifier for all microservices. - CQRS_FIELDS - Fields, that need to by synchronized between microservices. 
- CQRS_SERIALIZER - Serializer, that overrides common serialization logic. - Can't be set together with non-default CQRS_FIELDS. - DRF serializers are only supported now: Serializer(instance).data - - cqrs - Manager, that adds needed CQRS queryset methods. """ class Meta: abstract = True @@ -179,17 +264,20 @@ class ReplicaMixin(six.with_metaclass(ReplicaMeta, Model)): """ Mixin for the replica CQRS model, that will receive data updates from master. Models, using this mixin should be readonly, but this is not enforced (f.e. for admin). - - CQRS_ID - Unique CQRS identifier for all microservices. - CQRS_MAPPING - Mapping of master data field name to replica model field name. - CQRS_CUSTOM_SERIALIZATION - To override default data check. """ CQRS_ID = None + """Unique CQRS identifier for all microservices.""" + CQRS_MAPPING = None + """Mapping of master data field name to replica model field name.""" + CQRS_CUSTOM_SERIALIZATION = False + """Set it to True to skip default data check.""" objects = Manager() + cqrs = ReplicaManager() + """Manager that adds needed CQRS queryset methods.""" cqrs_revision = IntegerField() cqrs_updated = DateTimeField() @@ -198,35 +286,38 @@ class Meta: abstract = True @classmethod - def cqrs_save(cls, master_data, sync=False): + def cqrs_save(cls, master_data, previous_data=None, sync=False): """ This method saves (creates or updates) model instance from CQRS master instance data. This method must not be overridden. Otherwise, sync checks need to be implemented manually. :param dict master_data: CQRS master instance data. + :param dict previous_data: Previous values for tracked fields. :param bool sync: Sync package flag. :return: Model instance. 
:rtype: django.db.models.Model """ - return cls.cqrs.save_instance(master_data, sync) + return cls.cqrs.save_instance(master_data, previous_data, sync) @classmethod - def cqrs_create(cls, sync, **mapped_data): + def cqrs_create(cls, sync, mapped_data, previous_data=None): """ This method creates model instance from CQRS mapped instance data. It must be overridden by replicas of master models with custom serialization. - :param dict mapped_data: CQRS mapped instance data. :param bool sync: Sync package flag. + :param dict mapped_data: CQRS mapped instance data. + :param dict previous_data: Previous mapped values for tracked fields. :return: Model instance. :rtype: django.db.models.Model """ return cls._default_manager.create(**mapped_data) - def cqrs_update(self, sync, **mapped_data): + def cqrs_update(self, sync, mapped_data, previous_data=None): """ This method updates model instance from CQRS mapped instance data. It must be overridden by replicas of master models with custom serialization. - :param dict mapped_data: CQRS mapped instance data. :param bool sync: Sync package flag. + :param dict mapped_data: CQRS mapped instance data. + :param dict previous_data: Previous mapped values for tracked fields. :return: Model instance. :rtype: django.db.models.Model """ diff --git a/dj_cqrs/models.py b/dj_cqrs/models.py deleted file mode 100644 index e62f5b5..0000000 --- a/dj_cqrs/models.py +++ /dev/null @@ -1 +0,0 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. diff --git a/dj_cqrs/registries.py b/dj_cqrs/registries.py index f8eb2dd..9a8a60b 100644 --- a/dj_cqrs/registries.py +++ b/dj_cqrs/registries.py @@ -4,7 +4,7 @@ from django.conf import settings -logger = logging.getLogger() +logger = logging.getLogger('django-cqrs') class RegistryMixin: @@ -17,6 +17,15 @@ def register_model(cls, model_cls): @classmethod def get_model_by_cqrs_id(cls, cqrs_id): + """ + Returns the model class given its CQRS_ID. 
+ + :param cqrs_id: The CQRS_ID of the model to be retrieved. + :type cqrs_id: str + :return: The model that correspond to the given CQRS_ID or None if it + has not been registered. + :rtype: django.db.models.Model + """ if cqrs_id in cls.models: return cls.models[cqrs_id] diff --git a/dj_cqrs/signals.py b/dj_cqrs/signals.py index 84821e4..0a805cc 100644 --- a/dj_cqrs/signals.py +++ b/dj_cqrs/signals.py @@ -8,8 +8,18 @@ from dj_cqrs.constants import SignalType from dj_cqrs.dataclasses import TransportPayload + post_bulk_create = Signal(providing_args=['instances', 'using']) +""" +Signal sent after a bulk create. +See dj_cqrs.mixins.RawMasterMixin.call_post_bulk_create. +""" + post_update = Signal(providing_args=['instances', 'using']) +""" +Signal sent after a bulk update. +See dj_cqrs.mixins.RawMasterMixin.call_post_update. +""" class MasterSignals: @@ -17,8 +27,12 @@ class MasterSignals: @classmethod def register_model(cls, model_cls): """ - :param dj_cqrs.mixins.MasterMixin model_cls: Class inherited from CQRS MasterMixin. + Registers signals for a model. + + :param model_cls: Model class inherited from CQRS MasterMixin. + :type model_cls: dj_cqrs.mixins.MasterMixin """ + models.signals.post_save.connect(cls.post_save, sender=model_cls) models.signals.post_delete.connect(cls.post_delete, sender=model_cls) @@ -44,10 +58,11 @@ def post_save(cls, sender, **kwargs): if not transaction.get_connection(using).in_atomic_block: instance_data = instance.to_cqrs_dict(using) - + previous_data = instance.get_tracked_fields_data() signal_type = SignalType.SYNC if sync else SignalType.SAVE payload = TransportPayload( signal_type, sender.CQRS_ID, instance_data, instance.pk, queue, + previous_data, ) producer.produce(payload) diff --git a/dj_cqrs/tracker.py b/dj_cqrs/tracker.py new file mode 100644 index 0000000..c24ad59 --- /dev/null +++ b/dj_cqrs/tracker.py @@ -0,0 +1,50 @@ +# Copyright © 2020 Ingram Micro Inc. All rights reserved. 
+ +from model_utils import FieldTracker +from model_utils.tracker import FieldInstanceTracker + +from dj_cqrs.constants import ALL_BASIC_FIELDS, FIELDS_TRACKER_FIELD_NAME + + +class _CQRSTrackerInstance(FieldInstanceTracker): + + def __init__(self, instance, fields, field_map): + super().__init__(instance, fields, field_map) + self._attr_to_field_map = { + f.attname: f.name + for f in instance._meta.concrete_fields if f.is_relation + } + + def changed(self): + changed_fields = super().changed() + return { + self._attr_to_field_map.get(k, k): v + for k, v in changed_fields.items() + } + + +class CQRSTracker(FieldTracker): + + tracker_class = _CQRSTrackerInstance + + @classmethod + def add_to_model(cls, model_cls): + """ + Add the CQRSTracker to a model. + + :param model_cls: the model class to which add the CQRSTracker. + :type model_cls: django.db.models.Model + """ + opts = model_cls._meta + fields_to_track = [] + declared = model_cls.CQRS_TRACKED_FIELDS + + for field in opts.concrete_fields: + if declared == ALL_BASIC_FIELDS or field.name in declared: + fields_to_track.append( + field.attname if field.is_relation else field.name + ) + + tracker = cls(fields=fields_to_track) + model_cls.add_to_class(FIELDS_TRACKER_FIELD_NAME, tracker) + tracker.finalize_class(model_cls) diff --git a/dj_cqrs/transport/base.py b/dj_cqrs/transport/base.py index 0b6ffed..e66abc0 100644 --- a/dj_cqrs/transport/base.py +++ b/dj_cqrs/transport/base.py @@ -6,20 +6,27 @@ class BaseTransport: CQRS pattern can be implemented over any transport (AMQP, HTTP, etc.) All transports need to inherit from this base class. Transport must be set in Django settings: + + .. code-block:: python + CQRS = { 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport', } """ + consumers = {} @staticmethod def produce(payload): - """ Send data from master model to replicas. + """ + Send data from master model to replicas. 
- :param dj_cqrs.dataclasses.TransportPayload payload: Transport payload from master model. + :param payload: Transport payload from master model. + :type payload: dj_cqrs.dataclasses.TransportPayload """ raise NotImplementedError @staticmethod def consume(*args, **kwargs): + """Receive data from master model.""" raise NotImplementedError diff --git a/dj_cqrs/transport/rabbit_mq.py b/dj_cqrs/transport/rabbit_mq.py index b3f1000..9c2df21 100644 --- a/dj_cqrs/transport/rabbit_mq.py +++ b/dj_cqrs/transport/rabbit_mq.py @@ -14,7 +14,7 @@ from dj_cqrs.registries import ReplicaRegistry from dj_cqrs.transport import BaseTransport -logger = logging.getLogger() +logger = logging.getLogger('django-cqrs') class RabbitMQTransport(BaseTransport): @@ -80,6 +80,7 @@ def _consume_message(cls, ch, method, properties, body): payload = TransportPayload( dct['signal_type'], dct['cqrs_id'], dct['instance_data'], dct.get('instance_pk'), + previous_data=dct.get('previous_data'), ) cls._log_consumed(payload) diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 0000000..d4bb2cb --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,20 @@ +# Minimal makefile for Sphinx documentation +# + +# You can set these variables from the command line, and also +# from the environment for the first two. +SPHINXOPTS ?= +SPHINXBUILD ?= sphinx-build +SOURCEDIR = . +BUILDDIR = _build + +# Put it first so that "make" without argument is like "make help". +help: + @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) + +.PHONY: help Makefile + +# Catch-all target: route all unknown targets to Sphinx using the new +# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). 
+%: Makefile + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/docs/conf.py b/docs/conf.py new file mode 100644 index 0000000..2ccaf09 --- /dev/null +++ b/docs/conf.py @@ -0,0 +1,58 @@ +import os +import sys +from datetime import datetime + +from setuptools_scm import get_version + +import django + + +sys.path.insert(0, os.path.abspath('..')) +os.environ['DJANGO_SETTINGS_MODULE'] = 'tests.dj.settings' +django.setup() + + +# -- Project information ----------------------------------------------------- + +project = 'django-cqrs' +copyright = '{}, CloudBlue Inc.'.format(datetime.now().year) +author = 'CloudBlue' + +# The full version, including alpha/beta/rc tags +release = get_version(root='..', relative_to=__file__) + + +# -- General configuration --------------------------------------------------- + +# Add any Sphinx extension module names here, as strings. They can be +# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom +# ones. +extensions = [ + 'sphinx.ext.autodoc', + 'sphinx.ext.autosectionlabel', + 'sphinx_copybutton', +] + +autosectionlabel_prefix_document = True +autodoc_member_order = 'bysource' + +# Add any paths that contain templates here, relative to this directory. +templates_path = ['_templates'] + +# List of patterns, relative to source directory, that match files and +# directories to ignore when looking for source files. +# This pattern also affects html_static_path and html_extra_path. +exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store'] + + +# -- Options for HTML output ------------------------------------------------- + +# The theme to use for HTML and HTML Help pages. See the documentation for +# a list of builtin themes. +# +html_theme = 'sphinx_rtd_theme' + +# Add any paths that contain custom static files (such as style sheets) here, +# relative to this directory. They are copied after the builtin static files, +# so a file named "default.css" will overwrite the builtin "default.css". 
+html_static_path = ['_static'] diff --git a/docs/custom_serialization.rst b/docs/custom_serialization.rst new file mode 100644 index 0000000..0860c2b --- /dev/null +++ b/docs/custom_serialization.rst @@ -0,0 +1,75 @@ +Custom serialization +==================== + +By default, `django-cqrs` serializes all the fields declared for the +master model or the subset specified by the ``CQRS_FIELDS`` attribute. + +Sometimes you want to customize how the master model will be serialized, for example +including some other fields from related models. + +.. warning:: + + When there are master models with related entities in CQRS_SERIALIZER, it's important to have operations + within atomic transactions. CQRS sync will happen on transaction commit. Please, avoid saving master model + within transaction more than once to reduce syncing and potential racing on replica side. + Updating of related model won't trigger CQRS automatic synchronization for master model. + This needs to be done manually. + +Master service +-------------- + +In this case you can control how an instance of the master model is serialized providing +a serializer class to be used for that: + +.. code-block:: python + + + class MyMasterModel(MasterMixin): + CQRS_ID = 'my_model' + CQRS_SERIALIZER = 'mymodule.serializers.MyMasterModelSerializer' + + @classmethod + def relate_cqrs_serialization(cls, queryset): + # Optimize related models fetching here + return queryset + +If you would like to serialize fields from related models, you can optimize +database access overriding the ``relate_cqrs_serialization`` method using the +`select_related `_ +and `prefetch_related `_ methods of the +`QuerySet `_ object. + +Replica service +--------------- + +If you provide a serializer to customize serialization, you must handle +deserialization yourself for the replica model. + +.. code-block:: python + + + class MyReplicaModel(ReplicaMixin): + CQRS_ID = 'my_model' + CQRS_CUSTOM_SERIALIZATION = True # bypass default deserialization. 
+ + @classmethod + def cqrs_create(cls, sync, mapped_data, previous_data=None): + # Custom deserialization logic here + pass + + def cqrs_update(self, sync, mapped_data, previous_data=None): + # Custom deserialization logic here + pass + +.. note:: + + A serializer class must follow these rules: + + * The constructor must accept the model instance as the only positional argument + * Must have a ``data`` property that returns a python dictionary as the instance + representation. + + If your service exposes a RESTful API written using + `Django REST framework `_ + you can use your model serializers out of the box also for CQRS serialization. + diff --git a/docs/getting_started.rst b/docs/getting_started.rst new file mode 100644 index 0000000..ffd6a71 --- /dev/null +++ b/docs/getting_started.rst @@ -0,0 +1,186 @@ +*************** +Getting started +*************** + +.. note:: + + This guide assumes that you have at least a single instance of `RabbitMQ `_ + up and running. + + + +Requirements +============ + +`django-cqrs` works with Python 3.6 or later and has the following dependencies: + + * Django >= 1.11.20 + * pika 1.1.0 + * ujson 3.0.0 + * django-model-utils 4.0.0 + + +Install +======= + +`django-cqrs` can be installed from pypi.org with pip: + +.. code-block:: shell + + $ pip install django-cqrs + + + +Master service +============== + +Configure master service +------------------------ + +Add dj_cqrs to Django ``INSTALLED_APPS``: + +.. code-block:: python + + INSTALLED_APPS = [ + ... + 'dj_cqrs', + ... + ] + + +and add the `django-cqrs` configuration: + +.. code-block:: python + + CQRS = { + 'transport': 'dj_cqrs.transport.RabbitMQTransport', + 'host': RABBITMQ_HOST, + 'port': RABBITMQ_PORT, + 'user': RABBITMQ_USERNAME, + 'password': RABBITMQ_PASSWORD, + } + + +Setup master models +------------------- + +To setup master models add the ``dj_cqrs.mixins.MasterMixin`` to your model. + +For example: + +.. 
code-block:: python + + from django.db import models + + from dj_cqrs.mixins import MasterMixin + + + class MyMasterModel(MasterMixin, models.Model): + + CQRS_ID = 'my_model' # each model must have its unique CQRS_ID + + my_field = models.CharField(max_length=100) + .... + + +Create and run migrations for master +------------------------------------ + +Since the ``MasterMixin`` adds the ``cqrs_revision`` and ``cqrs_updated`` fields +to the model, you must create a new migration for it: + +.. code-block:: shell + + $ ./manage.py makemigrations + $ ./manage.py migrate + + +Run your django application +--------------------------- + + +.. code-block:: shell + + $ ./manage.py runserver + + + + +Replica service +=============== + +Configure replica service +------------------------- + +Add dj_cqrs to Django ``INSTALLED_APPS``: + +.. code-block:: python + + INSTALLED_APPS = [ + ... + 'dj_cqrs', + ... + ] + + +and add the `django-cqrs` configuration: + +.. code-block:: python + :emphasize-lines: 3 + + CQRS = { + 'transport': 'dj_cqrs.transport.RabbitMQTransport', + 'queue': 'my_replica', # Each replica service must have a unique queue. + 'host': RABBITMQ_HOST, + 'port': RABBITMQ_PORT, + 'user': RABBITMQ_USERNAME, + 'password': RABBITMQ_PASSWORD, + } + + +Setup replica models +-------------------- + +To setup replica models add the ``dj_cqrs.mixins.ReplicaMixin`` to each model. + +For example: + +.. code-block:: python + + from django.db import models + + from dj_cqrs.mixins import ReplicaMixin + + + class MyReplicaModel(ReplicaMixin, models.Model): + + CQRS_ID = 'my_model' + + my_field = models.CharField(max_length=100) + .... + + +Create and run migrations for replica +------------------------------------- + +Since the ``ReplicaMixin`` adds the ``cqrs_revision`` and ``cqrs_updated`` fields +to the model, you must create a new migration for it: + +.. 
code-block:: shell + + $ ./manage.py makemigrations + $ ./manage.py migrate + + +Run consumer process +-------------------- + +.. code-block:: shell + + $ ./manage.py cqrs_consume -w 2 + + +And that's all! + +Now every time you modify your master model, changes are replicated to +all the services that have a replica model with the same CQRS_ID. diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..585cfde --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,46 @@ +.. django-cqrs documentation master file, created by + sphinx-quickstart on Tue Jul 28 09:05:03 2020. + You can adapt this file completely to your liking, but it should at least + contain the root `toctree` directive. + +Welcome to Django CQRS's documentation! +======================================= + +`django-cqrs` is a Django application that implements CQRS data synchronisation between several Django microservices. + + +CQRS +---- + +In `CloudBlue Connect `_ we have a rather complex Domain Model. There are many microservices, that are +`decomposed by subdomain `_ +and which follow `database-per-service `_ pattern. +These microservices have rich and consistent APIs. They are deployed in cloud k8s cluster and scale automatically under load. +Many of these services aggregate data from other ones and usually +`API Composition `_ is totally enough. +But, some services are working too slowly with API JOINS, so another pattern needs to be applied. + +The pattern, that solves this issue is called `CQRS - Command Query Responsibility Segregation `_. +Core idea behind this pattern is that view databases (replicas) are defined for efficient querying and DB joins. +Applications keep their replicas up to date by subscribing to `Domain events `_ +published by the service that owns the data. Data is `eventually consistent `_ +and that's okay for non-critical business transactions. + + +.. 
toctree:: + :maxdepth: 2 + :caption: Contents: + + getting_started + custom_serialization + track_fields_changes + utilities + reference + + +Indices and tables +================== + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` diff --git a/docs/reference.rst b/docs/reference.rst new file mode 100644 index 0000000..585f0de --- /dev/null +++ b/docs/reference.rst @@ -0,0 +1,67 @@ +API Reference +============= + + +Mixins +------ + +.. autoclass:: dj_cqrs.mixins.RawMasterMixin + :members: + :exclude-members: save + +.. autoclass:: dj_cqrs.mixins.MasterMixin + :members: + :show-inheritance: + + +.. autoclass:: dj_cqrs.mixins.ReplicaMixin + :members: + + +Managers +-------- + +.. autoclass:: dj_cqrs.managers.MasterManager + :members: + + +.. autoclass:: dj_cqrs.managers.ReplicaManager + :members: + + +Signals +------- + +.. automodule:: dj_cqrs.signals + :members: post_bulk_create, post_update + +.. autoclass:: dj_cqrs.signals.MasterSignals + :members: + + + +Transports +---------- + +.. autoclass:: dj_cqrs.transport.base.BaseTransport + :members: + +.. autoclass:: dj_cqrs.transport.rabbit_mq.RabbitMQTransport + :members: + +.. autoclass:: dj_cqrs.constants.SignalType + :members: + +.. autoclass:: dj_cqrs.dataclasses.TransportPayload + :members: + + +Registries +---------- + +.. autoclass:: dj_cqrs.registries.MasterRegistry + :members: register_model, get_model_by_cqrs_id + + +.. autoclass:: dj_cqrs.registries.ReplicaRegistry + :members: register_model, get_model_by_cqrs_id \ No newline at end of file diff --git a/docs/track_fields_changes.rst b/docs/track_fields_changes.rst new file mode 100644 index 0000000..9bda052 --- /dev/null +++ b/docs/track_fields_changes.rst @@ -0,0 +1,49 @@ +Keep track of changes to fields +=============================== + +In some circumstances, you want to keep track of changes made on some fields of the master model. + +`django-cqrs` can send previous values of the tracked fields to replicas. 
+ +To do so, you can use the ``CQRS_TRACKED_FIELDS`` attribute to specify which fields to track: + +.. code-block:: python + + class MyMasterModel(MasterMixin): + + CQRS_ID = 'my_model' + CQRS_TRACKED_FIELDS = ('char_field', 'parent', 'status') + + + char_field = models.CharField(max_length=100) + status = models.CharField(max_length=15, choices=STATUSES) + + parent = models.ForeignKey(ParentModel, on_delete=models.CASCADE) + + +This way, you can override the ``cqrs_save`` method and apply your persistence logic +based on tracked fields before accessing your database: + + +.. code-block:: python + + class MyReplicaModel(ReplicaMixin): + + CQRS_ID = 'my_model' + + @classmethod + def cqrs_save(cls, master_data, previous_data=None, sync=False): + # Custom logic based on previous_data here. + pass + + +.. note:: + + The fields tracking feature honors the ``CQRS_MAPPING`` attribute. + + +.. note:: + + The fields tracking feature relies on the + `FieldTracker `_ + utility class from the `django-model-utils `_ library. diff --git a/docs/utilities.rst b/docs/utilities.rst new file mode 100644 index 0000000..2d799a7 --- /dev/null +++ b/docs/utilities.rst @@ -0,0 +1,41 @@ +Utilities +========= + +Bulk synchronizer without transport +----------------------------------- + +Usage example: it may be used for initial configuration and/or may be used at planned downtime. + +On master service: + +.. code-block:: shell + + $ python manage.py cqrs_bulk_dump --cqrs-id=author --output author.dump + + +On replica service: + +.. code-block:: shell + + $ python manage.py cqrs_bulk_load --input=author.dump + + + +Filter synchronizer over transport +---------------------------------- + +Usage example: sync some specific records to a given replica. Can be used dynamically. + +To sync all replicas: + +.. code-block:: shell + + $ python manage.py cqrs_sync --cqrs-id=author --filter="{\"id__in\": [1, 2]}" + + +To sync all instances only with one replica: + +.. 
code-block:: shell + + $ python manage.py cqrs_sync --cqrs-id=author --filter="{}" --queue=replica + diff --git a/integration_tests/Dockerfile.MasterV1 b/integration_tests/Dockerfile.MasterV1 new file mode 100644 index 0000000..2eccac1 --- /dev/null +++ b/integration_tests/Dockerfile.MasterV1 @@ -0,0 +1,15 @@ +FROM python:3.8 +ENV PYTHONUNBUFFERED 1 +ENV PYTHONPATH /master + +RUN mkdir /master + +COPY ./requirements /master/requirements +COPY ./tests /master/tests +COPY ./integration_tests /master/integration_tests +ADD integration_tests/setup.cfg /master/ + +RUN pip install -r /master/requirements/dev.txt -r /master/requirements/test.txt \ + && pip install psycopg2-binary django-cqrs==1.3.1 + +WORKDIR /master/ diff --git a/integration_tests/Dockerfile.ReplicaV1 b/integration_tests/Dockerfile.ReplicaV1 new file mode 100644 index 0000000..11f9a16 --- /dev/null +++ b/integration_tests/Dockerfile.ReplicaV1 @@ -0,0 +1,14 @@ +FROM python:3.8 +ENV PYTHONUNBUFFERED 1 +ENV PYTHONPATH /replica + +RUN mkdir /replica +COPY ./requirements /replica/requirements +COPY ./tests /replica/tests +COPY ./integration_tests /replica/integration_tests +ADD integration_tests/manage.py /replica/ + +RUN pip install -r /replica/requirements/dev.txt -r /replica/requirements/test.txt \ + && pip install psycopg2-binary django-cqrs==1.3.1 + +WORKDIR /replica/ diff --git a/integration_tests/Makefile b/integration_tests/Makefile new file mode 100644 index 0000000..b0a11a2 --- /dev/null +++ b/integration_tests/Makefile @@ -0,0 +1,28 @@ +.PHONY: build test + +.DEFAULT_GOAL := current + +stop: + @echo "Stopping running containers..." + docker-compose -f docker-compose.yml -f masterV1.yml -f replicaV1.yml down --remove-orphans + @echo "Done!" 
+ +build_current: + docker-compose build + +build_master_v1: + docker-compose -f docker-compose.yml -f masterV1.yml build + +build_replica_v1: + docker-compose -f docker-compose.yml -f replicaV1.yml build + +current: build_current + docker-compose run master + +master_v1: build_master_v1 + docker-compose -f docker-compose.yml -f masterV1.yml run master + +replica_v1: build_replica_v1 + docker-compose -f docker-compose.yml -f replicaV1.yml run master + +all: build_master_v1 build_replica_v1 current master_v1 replica_v1 diff --git a/integration_tests/docker-compose.yml b/integration_tests/docker-compose.yml index 95f0d29..087fa27 100644 --- a/integration_tests/docker-compose.yml +++ b/integration_tests/docker-compose.yml @@ -21,7 +21,7 @@ services: context: .. dockerfile: integration_tests/Dockerfile.Replica restart: always - command: + command: > bash -c " sleep 10 && python manage.py makemigrations --settings=integration_tests.replica_settings && @@ -40,8 +40,14 @@ services: build: context: .. dockerfile: integration_tests/Dockerfile.Master - command: - bash -c "sleep 12 && pytest integration_tests/" + command: > + bash -c " + sleep 12 && + echo '####################################################' && + echo 'Running integration tests for current version (v2) ' && + echo '####################################################' && + pytest integration_tests/ + " container_name: django_cqrs_test_master depends_on: - rabbit diff --git a/integration_tests/masterV1.yml b/integration_tests/masterV1.yml new file mode 100644 index 0000000..fbeb690 --- /dev/null +++ b/integration_tests/masterV1.yml @@ -0,0 +1,17 @@ +version: '3' + +services: + master: + build: + context: .. 
+ dockerfile: integration_tests/Dockerfile.MasterV1 + image: django_cqrs_test_maste_v1 + container_name: django_cqrs_test_masterV1 + command: > + bash -c " + sleep 12 && + echo '####################################################' && + echo 'Running integration tests Master v1.3.1 - Replica v2' && + echo '####################################################' && + pytest integration_tests/ + " diff --git a/integration_tests/replicaV1.yml b/integration_tests/replicaV1.yml new file mode 100644 index 0000000..5c096fc --- /dev/null +++ b/integration_tests/replicaV1.yml @@ -0,0 +1,32 @@ +version: '3' + +services: + + master: + command: > + bash -c " + sleep 12 && + echo '####################################################' && + echo 'Running integration tests Master v2 - Replica v1.3.1' && + echo '####################################################' && + pytest integration_tests/ + " + + replica: + build: + context: .. + dockerfile: integration_tests/Dockerfile.ReplicaV1 + image: django_cqrs_test_replica_v1 + restart: always + command: > + bash -c " + sleep 10 && + python manage.py makemigrations --settings=integration_tests.replica_settings && + python manage.py makemigrations dj_replica --settings=integration_tests.replica_settings && + python manage.py migrate --settings=integration_tests.replica_settings && + python manage.py cqrs_consume -w 2 --settings=integration_tests.replica_settings + " + container_name: django_cqrs_test_replicaV1 + depends_on: + - rabbit + - postgres diff --git a/integration_tests/tests/test_single_basic_instance.py b/integration_tests/tests/test_single_basic_instance.py index b32ee13..031a21d 100644 --- a/integration_tests/tests/test_single_basic_instance.py +++ b/integration_tests/tests/test_single_basic_instance.py @@ -18,6 +18,7 @@ def test_flow(replica_cursor): int_field=1, char_field='text', date_field=now().date(), + bool_field=False, ) assert master_instance.cqrs_revision == 0 @@ -40,6 +41,12 @@ def test_flow(replica_cursor): # 
Update master_instance.bool_field = True master_instance.save() + + if hasattr(master_instance, 'get_tracked_fields_data'): + previous_values = master_instance.get_tracked_fields_data() + assert 'bool_field' in previous_values + assert previous_values['bool_field'] is False + master_instance.refresh_from_db() assert master_instance.cqrs_revision == 1 diff --git a/requirements/dev.txt b/requirements/dev.txt index fabfb2e..9ca2137 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -1,3 +1,4 @@ Django >= 1.11.20 -pika -ujson \ No newline at end of file +pika>=1.0.0 +ujson==3.0.0 +django-model-utils==4.0.0 diff --git a/requirements/docs.txt b/requirements/docs.txt new file mode 100644 index 0000000..0edca9f --- /dev/null +++ b/requirements/docs.txt @@ -0,0 +1,6 @@ +-r test.txt +-r dev.txt +Sphinx==3.1.2 +sphinx-rtd-theme==0.4.3 +sphinx-copybutton==0.2.11 +setuptools-scm==3.5.0 diff --git a/tests/dj_master/models.py b/tests/dj_master/models.py index 0c72a87..cdc0d0b 100644 --- a/tests/dj_master/models.py +++ b/tests/dj_master/models.py @@ -1,5 +1,7 @@ # Copyright © 2020 Ingram Micro Inc. All rights reserved. 
+from django.contrib.contenttypes.fields import GenericForeignKey +from django.contrib.contenttypes.models import ContentType from django.db import models from mptt.models import MPTTModel, TreeForeignKey @@ -10,6 +12,7 @@ class BasicFieldsModel(MasterMixin, models.Model): CQRS_ID = 'basic' + CQRS_TRACKED_FIELDS = ALL_BASIC_FIELDS int_field = models.IntegerField(primary_key=True) bool_field = models.NullBooleanField() @@ -24,6 +27,7 @@ class BasicFieldsModel(MasterMixin, models.Model): class AllFieldsModel(MasterMixin, models.Model): CQRS_FIELDS = ALL_BASIC_FIELDS CQRS_ID = 'all' + CQRS_TRACKED_FIELDS = ALL_BASIC_FIELDS int_field = models.IntegerField(null=True) char_field = models.CharField(max_length=200, null=True) @@ -117,3 +121,66 @@ class NonMetaClassModel(MPTTModel, RawMasterMixin): class NonSentModel(MasterMixin, models.Model): CQRS_ID = 'non_sent' CQRS_PRODUCE = False + + +class TrackedFieldsParentModel(MasterMixin, models.Model): + CQRS_ID = 'tracked_parent' + CQRS_TRACKED_FIELDS = ALL_BASIC_FIELDS + + char_field = models.CharField(max_length=10) + + +class TrackedFieldsChildModel(MasterMixin): + CQRS_ID = 'tracked_child' + CQRS_TRACKED_FIELDS = ('char_field', 'parent') + + char_field = models.CharField(max_length=10) + parent = models.ForeignKey(TrackedFieldsParentModel, on_delete=models.CASCADE) + + +class TrackedFieldsAllWithChildModel(MasterMixin): + CQRS_ID = 'tracked_child_all' + CQRS_TRACKED_FIELDS = '__all__' + + char_field = models.CharField(max_length=10) + parent = models.ForeignKey(TrackedFieldsParentModel, on_delete=models.CASCADE) + + +class MPTTWithTrackingModel(MPTTModel, RawMasterMixin): + CQRS_ID = 'with_tracking' + + CQRS_TRACKED_FIELDS = '__all__' + + name = models.CharField(max_length=50, unique=True) + parent = TreeForeignKey( + 'self', on_delete=models.CASCADE, null=True, blank=True, related_name='children', + ) + + +MasterMeta.register(MPTTWithTrackingModel) + + +class WithGenericFKModel(MasterMixin): + CQRS_ID = 'with_generic_fk' 
+ + CQRS_TRACKED_FIELDS = '__all__' + + content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, null=True, blank=True) + object_id = models.PositiveIntegerField(null=True) + content_object = GenericForeignKey('content_type', 'object_id') + + +class M2MModel(models.Model): + CQRS_ID = 'm2m_model' + + id = models.IntegerField(primary_key=True) + name = models.CharField(max_length=200, null=True) + + +class WithM2MModel(MasterMixin): + CQRS_ID = 'with_m2m_fk' + + CQRS_TRACKED_FIELDS = '__all__' + + char_field = models.CharField(max_length=100) + m2m_field = models.ManyToManyField(M2MModel) diff --git a/tests/dj_replica/models.py b/tests/dj_replica/models.py index 6630772..4a98bbe 100644 --- a/tests/dj_replica/models.py +++ b/tests/dj_replica/models.py @@ -71,7 +71,7 @@ class AuthorRef(ReplicaMixin, models.Model): publisher = models.ForeignKey(Publisher, null=True, on_delete=models.CASCADE) @classmethod - def cqrs_create(cls, sync, **mapped_data): + def cqrs_create(cls, sync, mapped_data, previous_data=None): publisher_data, publisher = mapped_data.pop('publisher', None), None if publisher_data: publisher, _ = Publisher.objects.get_or_create(**publisher_data) @@ -82,7 +82,7 @@ def cqrs_create(cls, sync, **mapped_data): Book.objects.bulk_create(Book(author=author, **book_data) for book_data in books_data) return author - def cqrs_update(self, sync, **mapped_data): + def cqrs_update(self, sync, mapped_data, previous_data=None): # It's just an example, that doesn't make sense in real cases publisher_data, publisher = mapped_data.pop('publisher', None), None if publisher_data: diff --git a/tests/test_controller.py b/tests/test_controller.py index 57665bd..f9f4a62 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -7,7 +7,7 @@ def test_producer(mocker): transport_mock = mocker.patch('tests.dj.transport.TransportStub.produce') - produce(TransportPayload('a', 'b', {}, 'c')) + produce(TransportPayload('a', 'b', {}, 'c', previous_data={'e': 
'f'})) assert transport_mock.call_count == 1 assert transport_mock.call_args[0][0].to_dict() == { @@ -15,11 +15,12 @@ def test_producer(mocker): 'cqrs_id': 'b', 'instance_data': {}, 'instance_pk': 'c', + 'previous_data': {'e': 'f'}, } def test_consumer(mocker): factory_mock = mocker.patch('dj_cqrs.controller.consumer.route_signal_to_replica_model') - consume(TransportPayload('a', 'b', {}, 'c')) + consume(TransportPayload('a', 'b', {}, 'c', previous_data={'e': 'f'})) - factory_mock.assert_called_once_with('a', 'b', {}) + factory_mock.assert_called_once_with('a', 'b', {}, previous_data={'e': 'f'}) diff --git a/tests/test_master/test_mixin.py b/tests/test_master/test_mixin.py index 94ca0e6..ceb11ac 100644 --- a/tests/test_master/test_mixin.py +++ b/tests/test_master/test_mixin.py @@ -3,15 +3,20 @@ import pytest from uuid import uuid4 +from django.contrib.contenttypes.models import ContentType from django.db import transaction -from django.db.models import CharField, IntegerField +from django.db.models import CharField, IntegerField, F from django.utils.timezone import now -from dj_cqrs.constants import SignalType +from dj_cqrs.constants import SignalType, FIELDS_TRACKER_FIELD_NAME from dj_cqrs.metas import MasterMeta from tests.dj_master import models from tests.dj_master.serializers import AuthorSerializer -from tests.utils import assert_is_sub_dict, assert_publisher_once_called_with_args +from tests.utils import ( + assert_is_sub_dict, + assert_publisher_once_called_with_args, + assert_tracked_fields, +) class MasterMetaTest(MasterMeta): @@ -23,6 +28,10 @@ def check_cqrs_fields(cls, model_cls): def check_correct_configuration(cls, model_cls): return cls._check_correct_configuration(model_cls) + @classmethod + def check_cqrs_tracked_fields(cls, model_cls): + return cls._check_cqrs_tracked_fields(model_cls) + def test_cqrs_fields_non_existing_field(mocker): with pytest.raises(AssertionError) as e: @@ -501,3 +510,161 @@ def test_non_sent(mocker): m.delete() assert 
publisher_mock.call_count == 0 + + +def test_cqrs_tracked_fields_non_existing_field(mocker): + with pytest.raises(AssertionError) as e: + + class Cls(object): + CQRD_ID = 'ID' + CQRS_TRACKED_FIELDS = ('char_field', 'integer_field') + + char_field = CharField(max_length=100, primary_key=True) + int_field = IntegerField() + + _meta = mocker.MagicMock(concrete_fields=(char_field, int_field), private_fields=()) + _meta.pk.name = 'char_field' + + MasterMetaTest.check_cqrs_tracked_fields(Cls) + + assert str(e.value) == 'CQRS_TRACKED_FIELDS field is not correctly set for model Cls.' + + +def test_cqrs_tracked_fields_duplicates(mocker): + with pytest.raises(AssertionError) as e: + + class Cls(object): + CQRD_ID = 'ID' + CQRS_TRACKED_FIELDS = ('char_field', 'char_field') + + char_field = CharField(max_length=100, primary_key=True) + int_field = IntegerField() + + _meta = mocker.MagicMock(concrete_fields=(char_field, int_field), private_fields=()) + _meta.pk.name = 'char_field' + + MasterMetaTest.check_cqrs_tracked_fields(Cls) + + assert str(e.value) == 'Duplicate names in CQRS_TRACKED_FIELDS field for model Cls.' 
+ + +def test_cqrs_tracked_fields_bad_configuration(mocker): + with pytest.raises(AssertionError) as e: + + class Cls(object): + CQRD_ID = 'ID' + CQRS_TRACKED_FIELDS = 'bad_config' + + char_field = CharField(max_length=100, primary_key=True) + int_field = IntegerField() + + _meta = mocker.MagicMock(concrete_fields=(char_field, int_field), private_fields=()) + _meta.pk.name = 'char_field' + + MasterMetaTest.check_cqrs_tracked_fields(Cls) + + assert str(e.value) == 'Model Cls: Invalid configuration for CQRS_TRACKED_FIELDS' + + +def test_cqrs_tracked_fields_model_has_tracker(mocker): + instance = models.TrackedFieldsChildModel() + tracker = getattr(instance, FIELDS_TRACKER_FIELD_NAME) + assert tracker is not None + + +def test_cqrs_tracked_fields_related_fields(mocker): + instance = models.TrackedFieldsChildModel() + tracker = getattr(instance, FIELDS_TRACKER_FIELD_NAME) + assert_tracked_fields(models.TrackedFieldsChildModel, tracker.fields) + + +def test_cqrs_tracked_fields_all_related_fields(mocker): + instance = models.TrackedFieldsAllWithChildModel() + tracker = getattr(instance, FIELDS_TRACKER_FIELD_NAME) + assert_tracked_fields(models.TrackedFieldsAllWithChildModel, tracker.fields) + + +@pytest.mark.django_db(transaction=True) +def test_cqrs_tracked_fields_tracking(mocker): + publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce') + instance = models.TrackedFieldsParentModel() + instance.char_field = 'Value' + instance.save() + tracked_data = instance.get_tracked_fields_data() + assert publisher_mock.call_args[0][0].previous_data == tracked_data + assert tracked_data is not None + assert 'char_field' in tracked_data + assert tracked_data['char_field'] is None + instance.char_field = 'New Value' + instance.save() + tracked_data = instance.get_tracked_fields_data() + assert 'char_field' in tracked_data + assert tracked_data['char_field'] == 'Value' + assert publisher_mock.call_args[0][0].previous_data == tracked_data + + +def 
test_mptt_cqrs_tracked_fields_model_has_tracker(): + instance = models.MPTTWithTrackingModel() + tracker = getattr(instance, FIELDS_TRACKER_FIELD_NAME) + assert tracker is not None + + +def test_mptt_cqrs_tracked_fields_related_fields(): + instance = models.MPTTWithTrackingModel() + tracker = getattr(instance, FIELDS_TRACKER_FIELD_NAME) + assert_tracked_fields(models.MPTTWithTrackingModel, tracker.fields) + + +@pytest.mark.django_db(transaction=True) +def test_f_expr(): + m = models.AllFieldsModel.objects.create(int_field=0, char_field='char') + m.int_field = F('int_field') + 1 + m.save() + + cqrs_data = m.to_cqrs_dict() + previous_data = m.get_tracked_fields_data() + + assert 'int_field' in cqrs_data + assert cqrs_data['int_field'] == 1 + assert 'int_field' in previous_data + assert previous_data['int_field'] == 0 + + +@pytest.mark.django_db(transaction=True) +def test_generic_fk(): + sm = models.SimplestModel.objects.create(id=1, name='char') + m = models.WithGenericFKModel.objects.create(content_object=sm) + ct = ContentType.objects.get_for_model(models.SimplestModel) + cqrs_data = m.to_cqrs_dict() + previous_data = m.get_tracked_fields_data() + + assert 'content_object' not in cqrs_data + assert 'content_type' in cqrs_data + assert 'object_id' in cqrs_data + assert cqrs_data['object_id'] == sm.pk + assert cqrs_data['content_type'] == ct.pk + + assert 'content_object' not in previous_data + assert 'content_type' in previous_data + assert 'object_id' in previous_data + + sm1 = models.SimplestModel.objects.create(id=2, name='name') + m.content_object = sm1 + m.save() + previous_data = m.get_tracked_fields_data() + + assert 'content_object' not in previous_data + assert 'content_type' not in previous_data + assert previous_data['object_id'] == sm.pk + + +@pytest.mark.django_db(transaction=True) +def test_m2m_not_supported(): + m1 = models.M2MModel.objects.create(id=1, name='name') + m2m = models.WithM2MModel.objects.create(char_field='test') + m2m.m2m_field.add(m1) 
+ m2m.save() + cqrs_data = m2m.to_cqrs_dict() + + assert 'm2m_field' not in cqrs_data + assert 'char_field' in cqrs_data diff --git a/tests/test_replica/test_factory.py b/tests/test_replica/test_factory.py index 65c260f..e1585c5 100644 --- a/tests/test_replica/test_factory.py +++ b/tests/test_replica/test_factory.py @@ -20,9 +20,9 @@ def test_bad_signal(caplog): @pytest.mark.django_db def test_save_model(mocker): cqrs_save_mock = mocker.patch.object(ReplicaMixin, 'cqrs_save') - route_signal_to_replica_model(SignalType.SAVE, 'basic', {}) + route_signal_to_replica_model(SignalType.SAVE, 'basic', {}, {}) - cqrs_save_mock.assert_called_once_with({}) + cqrs_save_mock.assert_called_once_with({}, previous_data={}) @pytest.mark.django_db diff --git a/tests/test_replica/test_mixin.py b/tests/test_replica/test_mixin.py index f01b71c..c09b134 100644 --- a/tests/test_replica/test_mixin.py +++ b/tests/test_replica/test_mixin.py @@ -444,3 +444,28 @@ def test_updates_were_lost(caplog): }) assert 'Lost or filtered out 4 CQRS packages: pk = 1, cqrs_revision = 5 (basic)' in caplog.text + + +@pytest.mark.django_db() +def test_tracked_fields_mapped(mocker): + cqrs_update_mock = mocker.patch.object(models.MappedFieldsModelRef, 'cqrs_update') + first_payload = { + 'int_field': 1, + 'cqrs_revision': 0, + 'cqrs_updated': now(), + 'char_field': 'text', + } + + second_payload = { + 'int_field': 1, + 'cqrs_revision': 1, + 'cqrs_updated': now(), + 'char_field': 'new_text', + } + + models.MappedFieldsModelRef.cqrs_save(first_payload) + models.MappedFieldsModelRef.cqrs_save(second_payload, previous_data={'char_field': 'text'}) + assert cqrs_update_mock.call_count == 1 + _, kwargs = cqrs_update_mock.call_args + assert 'previous_data' in kwargs + assert kwargs['previous_data'] == {'name': 'text'} diff --git a/tests/test_transport/test_rabbit_mq.py b/tests/test_transport/test_rabbit_mq.py index d3f548a..94980f9 100644 --- a/tests/test_transport/test_rabbit_mq.py +++ 
b/tests/test_transport/test_rabbit_mq.py @@ -132,7 +132,9 @@ def test_produce_ok(rabbit_transport, mocker, caplog): def test_produce_message_ok(mocker): channel = mocker.MagicMock() - payload = TransportPayload(SignalType.SAVE, 'cqrs_id', {}, 'id') + payload = TransportPayload( + SignalType.SAVE, 'cqrs_id', {}, 'id', previous_data={'e': 'f'}, + ) PublicRabbitMQTransport.produce_message(channel, 'exchange', payload) @@ -145,6 +147,7 @@ def test_produce_message_ok(mocker): 'cqrs_id': 'cqrs_id', 'instance_data': {}, 'instance_pk': 'id', + 'previous_data': {'e': 'f'}, } assert basic_publish_kwargs['exchange'] == 'exchange' assert basic_publish_kwargs['mandatory'] @@ -166,6 +169,7 @@ def test_produce_sync_message_no_queue(mocker): 'cqrs_id': 'cqrs_id', 'instance_data': {}, 'instance_pk': None, + 'previous_data': None, } assert basic_publish_kwargs['routing_key'] == 'cqrs_id' @@ -183,6 +187,7 @@ def test_produce_sync_message_queue(mocker): 'cqrs_id': 'cqrs_id', 'instance_data': {}, 'instance_pk': 'id', + 'previous_data': None, } assert basic_publish_kwargs['routing_key'] == 'cqrs.queue.cqrs_id' @@ -219,7 +224,8 @@ def test_consume_message_ack(mocker, caplog): mocker.MagicMock(), mocker.MagicMock(), None, - '{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{},"instance_pk":1}', + '{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{},' + '"instance_pk":1, "previous_data":{}}', ) assert consumer_mock.call_count == 1 @@ -228,6 +234,7 @@ def test_consume_message_ack(mocker, caplog): assert payload.signal_type == 'signal' assert payload.cqrs_id == 'cqrs_id' assert payload.instance_data == {} + assert payload.previous_data == {} assert payload.pk == 1 assert 'CQRS is received: pk = 1 (cqrs_id).' 
in caplog.text @@ -242,7 +249,8 @@ def test_consume_message_ack_deprecated_structure(mocker, caplog): mocker.MagicMock(), mocker.MagicMock(), None, - '{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{}}', + '{"signal_type":"signal","cqrs_id":"cqrs_id",' + '"instance_data":{},"previous_data":null}', ) assert consumer_mock.call_count == 1 @@ -266,7 +274,8 @@ def test_consume_message_nack(mocker, caplog): mocker.MagicMock(), mocker.MagicMock(), None, - '{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{},"instance_pk":1}', + '{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{},' + '"instance_pk":1,"previous_data":null}', ) assert 'CQRS is received: pk = 1 (cqrs_id).' in caplog.text @@ -293,7 +302,7 @@ def test_consume_message_json_parsing_error(mocker, caplog): mocker.MagicMock(), mocker.MagicMock(), None, '{bad_payload:', ) - assert "CQRS couldn't be parsed: {bad_payload:." in caplog.text + assert ": {bad_payload:." in caplog.text def test_consume_message_package_structure_error(mocker, caplog): diff --git a/tests/utils.py b/tests/utils.py index 766943b..884d486 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -23,3 +23,21 @@ def assert_publisher_once_called_with_args(publisher_mock, *args): def db_error(*args, **kwargs): raise DatabaseError() + + +def assert_tracked_fields(model_cls, fields): + if model_cls.CQRS_TRACKED_FIELDS == '__all__': + fields_to_track = { + f.attname if f.is_relation else f.name + for f in model_cls._meta.concrete_fields + } + else: + fields_to_track = set() + for fname in model_cls.CQRS_TRACKED_FIELDS: + field = model_cls._meta.get_field(fname) + if field.is_relation: + fields_to_track.add(field.attname) + else: + fields_to_track.add(field.name) + + assert fields_to_track == fields