Skip to content

Commit

Permalink
Add support to keep tracks of changes to fields.
Browse files Browse the repository at this point in the history
Add sphinx documentation.
Fix version of dependencies.
Fix F expressions for updates.
Fix for model private fields (GenericForeignKey)

Add integration tests for:
	Master: current -> Replica: v1.3.1
        Master: v1.3.1 -> Replica: current
  • Loading branch information
Francesco Faraone committed Jul 31, 2020
1 parent bedba89 commit d1163ed
Show file tree
Hide file tree
Showing 39 changed files with 1,323 additions and 85 deletions.
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,10 @@ dist/

tests/reports/
.coverage

*.dump

htmlcov
.devcontainer

docs/_build
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,15 @@ kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_master --cqrs-id=
Development
===========

1. Python 3.5+
1. Python 3.6+
0. Install dependencies `requirements/dev.txt`

Testing
=======

Unit testing
------
1. Python 3.5+
1. Python 3.6+
0. Install dependencies `requirements/test.txt`
0. `export PYTHONPATH=/your/path/to/django-cqrs/`

Expand Down
13 changes: 11 additions & 2 deletions dj_cqrs/constants.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
# Copyright © 2020 Ingram Micro Inc. All rights reserved.

ALL_BASIC_FIELDS = '__all__'

FIELDS_TRACKER_FIELD_NAME = '__fields_tracker'
TRACKED_FIELDS_ATTR_NAME = '__tracked_fields'

# Copyright © 2020 Ingram Micro Inc. All rights reserved.

class SignalType:
DELETE = 'DELETE'
"""Type of signal that generates this event."""

SAVE = 'SAVE'
"""The master model has been saved."""

DELETE = 'DELETE'
"""The master model has been deleted."""

SYNC = 'SYNC'
"""The master model needs syncronization."""


NO_QUEUE = 'None'
13 changes: 9 additions & 4 deletions dj_cqrs/controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dj_cqrs.registries import ReplicaRegistry


logger = logging.getLogger()
logger = logging.getLogger('django-cqrs')


def consume(payload):
Expand All @@ -18,10 +18,11 @@ def consume(payload):
"""
return route_signal_to_replica_model(
payload.signal_type, payload.cqrs_id, payload.instance_data,
previous_data=payload.previous_data,
)


def route_signal_to_replica_model(signal_type, cqrs_id, instance_data):
def route_signal_to_replica_model(signal_type, cqrs_id, instance_data, previous_data=None):
""" Routes signal to model method to create/update/delete replica instance.
:param dj_cqrs.constants.SignalType signal_type: Consumed signal type.
Expand All @@ -37,11 +38,15 @@ def route_signal_to_replica_model(signal_type, cqrs_id, instance_data):

elif signal_type == SignalType.SAVE:
with transaction.atomic():
return model_cls.cqrs_save(instance_data)
return model_cls.cqrs_save(instance_data, previous_data=previous_data)

elif signal_type == SignalType.SYNC:
with transaction.atomic():
return model_cls.cqrs_save(instance_data, sync=True)
return model_cls.cqrs_save(
instance_data,
previous_data=previous_data,
sync=True,
)

else:
logger.error('Bad signal type "{}" for CQRS_ID "{}".'.format(signal_type, cqrs_id))
32 changes: 31 additions & 1 deletion dj_cqrs/dataclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,31 @@


class TransportPayload:
def __init__(self, signal_type, cqrs_id, instance_data, instance_pk, queue=None):
"""Transport message payload.
:param signal_type: Type of the signal for this message.
:type signal_type: dj_cqrs.constants.SignalType
:param cqrs_id: The unique CQRS identifier of the model.
:type cqrs_id: str
:param instance_data: Serialized data of the instance that
generates the event.
:type instance_data: dict
:param instance_pk: Primary key of the instance
:param queue: Queue to syncronize, defaults to None
:type queue: str, optional
:param previous_data: Previous values for fields tracked for changes,
defaults to None
:type previous_data: dict, optional
"""

def __init__(self, signal_type, cqrs_id, instance_data, instance_pk, queue=None,
previous_data=None):
self.__signal_type = signal_type
self.__cqrs_id = cqrs_id
self.__instance_data = instance_data
self.__instance_pk = instance_pk
self.__queue = queue
self.__previous_data = previous_data

@property
def signal_type(self):
Expand All @@ -29,10 +48,21 @@ def pk(self):
def queue(self):
return self.__queue

@property
def previous_data(self):
return self.__previous_data

def to_dict(self):
"""
Return the payload as a dictionary.
:return: This payload.
:rtype: dict
"""
return {
'signal_type': self.__signal_type,
'cqrs_id': self.__cqrs_id,
'instance_data': self.__instance_data,
'previous_data': self.__previous_data,
'instance_pk': self.__instance_pk,
}
60 changes: 48 additions & 12 deletions dj_cqrs/managers.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
# Copyright © 2020 Ingram Micro Inc. All rights reserved.

import logging
from itertools import chain

from django.core.exceptions import ValidationError
from django.db import Error, IntegrityError, transaction
from django.db.models import Manager, F
from django.utils import timezone


logger = logging.getLogger()
logger = logging.getLogger('django-cqrs')


class MasterManager(Manager):
Expand All @@ -29,16 +28,18 @@ def bulk_update(self, queryset, **kwargs):


class ReplicaManager(Manager):
def save_instance(self, master_data, sync=False):
def save_instance(self, master_data, previous_data=None, sync=False):
""" This method saves (creates or updates) model instance from CQRS master instance data.
:param dict master_data: CQRS master instance data.
:param dict previous_data: Previous values for tracked fields.
:param bool sync: Sync package flag.
:return: Model instance.
:rtype: django.db.models.Model
"""
mapped_data = self._map_save_data(master_data)

mapped_previous_data = self._map_previous_data(previous_data) \
if previous_data else None
if mapped_data:
pk_name = self._get_model_pk_name()
pk_value = mapped_data[pk_name]
Expand All @@ -47,20 +48,34 @@ def save_instance(self, master_data, sync=False):
instance = self.model._default_manager.filter(**f_kwargs).first()

if instance:
return self.update_instance(instance, mapped_data, sync=sync)
return self.update_instance(
instance,
mapped_data,
previous_data=mapped_previous_data,
sync=sync,
)

return self.create_instance(mapped_data, sync=sync)
return self.create_instance(
mapped_data,
previous_data=mapped_previous_data,
sync=sync,
)

def create_instance(self, mapped_data, sync=False):
def create_instance(self, mapped_data, previous_data=None, sync=False):
""" This method creates model instance from mapped CQRS master instance data.
:param dict mapped_data: Mapped CQRS master instance data.
:param dict previous_data: Previous values for tracked fields.
:param bool sync: Sync package flag.
:return: ReplicaMixin model instance.
:rtype: django.db.models.Model
"""
try:
return self.model.cqrs_create(sync, **mapped_data)
return self.model.cqrs_create(
sync,
mapped_data,
previous_data=previous_data,
)
except (Error, ValidationError) as e:
pk_value = mapped_data[self._get_model_pk_name()]
if isinstance(e, IntegrityError):
Expand All @@ -76,11 +91,12 @@ def create_instance(self, mapped_data, sync=False):
),
)

def update_instance(self, instance, mapped_data, sync=False):
def update_instance(self, instance, mapped_data, previous_data=None, sync=False):
""" This method updates model instance from mapped CQRS master instance data.
:param django.db.models.Model instance: ReplicaMixin model instance.
:param dict mapped_data: Mapped CQRS master instance data.
:param dict previous_data: Previous values for tracked fields.
:param bool sync: Sync package flag.
:return: ReplicaMixin model instance.
:rtype: django.db.models.Model
Expand Down Expand Up @@ -128,7 +144,11 @@ def update_instance(self, instance, mapped_data, sync=False):
))

try:
return instance.cqrs_update(sync, **mapped_data)
return instance.cqrs_update(
sync,
mapped_data,
previous_data=previous_data,
)
except (Error, ValidationError) as e:
logger.error(
'{}\nCQRS update error: pk = {}, cqrs_revision = {} ({}).'.format(
Expand Down Expand Up @@ -160,6 +180,20 @@ def delete_instance(self, master_data):

return False

def _map_previous_data(self, previous_data):
if self.model.CQRS_MAPPING is None:
return previous_data

mapped_previous_data = {}

for master_name, replica_name in self.model.CQRS_MAPPING.items():
if master_name not in previous_data:
continue

mapped_previous_data[replica_name] = previous_data[master_name]
mapped_previous_data = self._remove_excessive_data(mapped_previous_data)
return mapped_previous_data

def _map_save_data(self, master_data):
if not self._cqrs_fields_are_filled(master_data):
return
Expand Down Expand Up @@ -200,14 +234,16 @@ def _make_initial_mapping(self, master_data):

def _remove_excessive_data(self, data):
opts = self.model._meta
possible_field_names = {f.name for f in chain(opts.concrete_fields, opts.private_fields)}
possible_field_names = {
f.name for f in opts.fields
}
return {k: v for k, v in data.items() if k in possible_field_names}

def _all_required_fields_are_filled(self, mapped_data):
opts = self.model._meta

required_field_names = {
f.name for f in chain(opts.concrete_fields, opts.private_fields) if not f.null
f.name for f in opts.fields if not f.null
}
if not (required_field_names - set(mapped_data.keys())):
return True
Expand Down
40 changes: 32 additions & 8 deletions dj_cqrs/metas.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
# Copyright © 2020 Ingram Micro Inc. All rights reserved.

from itertools import chain

from django.db.models import base

from dj_cqrs.constants import ALL_BASIC_FIELDS
from dj_cqrs.registries import MasterRegistry, ReplicaRegistry
from dj_cqrs.signals import MasterSignals
from dj_cqrs.tracker import CQRSTracker


class MasterMeta(base.ModelBase):
def __new__(mcs, *args):
model_cls = super(MasterMeta, mcs).__new__(mcs, *args)
def __new__(mcs, name, bases, attrs, **kwargs):

model_cls = super(MasterMeta, mcs).__new__(mcs, name, bases, attrs, **kwargs)

if args[0] != 'MasterMixin':
if name != 'MasterMixin':
mcs.register(model_cls)

return model_cls
Expand All @@ -22,13 +22,38 @@ def __new__(mcs, *args):
def register(model_cls):
_MetaUtils.check_cqrs_id(model_cls)
MasterMeta._check_correct_configuration(model_cls)

if model_cls.CQRS_TRACKED_FIELDS is not None:
MasterMeta._check_cqrs_tracked_fields(model_cls)
CQRSTracker.add_to_model(model_cls)

if model_cls.CQRS_SERIALIZER is None:
MasterMeta._check_cqrs_fields(model_cls)

MasterRegistry.register_model(model_cls)
MasterSignals.register_model(model_cls)
return model_cls

@staticmethod
def _check_cqrs_tracked_fields(model_cls):
"""Check that the CQRS_TRACKED_FIELDS has correct configuration.
:param dj_cqrs.mixins.MasterMixin model_cls: CQRS Master Model.
:raises: AssertionError
"""
tracked_fields = model_cls.CQRS_TRACKED_FIELDS
if isinstance(tracked_fields, (list, tuple)):
_MetaUtils._check_no_duplicate_names(
model_cls,
tracked_fields,
'CQRS_TRACKED_FIELDS',
)
_MetaUtils._check_unexisting_names(model_cls, tracked_fields, 'CQRS_TRACKED_FIELDS')
return

assert isinstance(tracked_fields, str) and tracked_fields == ALL_BASIC_FIELDS, \
"Model {}: Invalid configuration for CQRS_TRACKED_FIELDS".format(model_cls.__name__)

@staticmethod
def _check_correct_configuration(model_cls):
""" Check that model has correct CQRS configuration.
Expand All @@ -39,7 +64,7 @@ def _check_correct_configuration(model_cls):
if model_cls.CQRS_FIELDS != ALL_BASIC_FIELDS:
assert model_cls.CQRS_SERIALIZER is None, \
"Model {}: CQRS_FIELDS can't be set together with CQRS_SERIALIZER.".format(
model_cls,
model_cls.__name__,
)

@staticmethod
Expand All @@ -61,7 +86,6 @@ def __new__(mcs, *args):
if args[0] != 'ReplicaMixin':
_MetaUtils.check_cqrs_id(model_cls)
ReplicaMeta._check_cqrs_mapping(model_cls)

ReplicaRegistry.register_model(model_cls)

return model_cls
Expand Down Expand Up @@ -102,7 +126,7 @@ def _check_unexisting_names(model_cls, cqrs_field_names, cqrs_attr):
opts = model_cls._meta
model_name = model_cls.__name__

model_field_names = {f.name for f in chain(opts.concrete_fields, opts.private_fields)}
model_field_names = {f.name for f in opts.fields}
assert not set(cqrs_field_names) - model_field_names, \
'{} field is not correctly set for model {}.'.format(cqrs_attr, model_name)

Expand Down
Loading

0 comments on commit d1163ed

Please sign in to comment.