Skip to content

Commit

Permalink
Merge pull request #1 from cloudblue/add_fields_tracking
Browse files Browse the repository at this point in the history
Add support to keep tracks of changes to fields.
  • Loading branch information
marcserrat authored Aug 3, 2020
2 parents bedba89 + d1163ed commit ad2a58f
Show file tree
Hide file tree
Showing 39 changed files with 1,323 additions and 85 deletions.
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,10 @@ dist/

tests/reports/
.coverage

*.dump

htmlcov
.devcontainer

docs/_build
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,15 @@ kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_master --cqrs-id=
Development
===========

1. Python 3.5+
1. Python 3.6+
0. Install dependencies `requirements/dev.txt`

Testing
=======

Unit testing
------
1. Python 3.5+
1. Python 3.6+
0. Install dependencies `requirements/test.txt`
0. `export PYTHONPATH=/your/path/to/django-cqrs/`

Expand Down
13 changes: 11 additions & 2 deletions dj_cqrs/constants.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
# Copyright © 2020 Ingram Micro Inc. All rights reserved.

ALL_BASIC_FIELDS = '__all__'

FIELDS_TRACKER_FIELD_NAME = '__fields_tracker'
TRACKED_FIELDS_ATTR_NAME = '__tracked_fields'

# Copyright © 2020 Ingram Micro Inc. All rights reserved.

class SignalType:
DELETE = 'DELETE'
"""Type of signal that generates this event."""

SAVE = 'SAVE'
"""The master model has been saved."""

DELETE = 'DELETE'
"""The master model has been deleted."""

SYNC = 'SYNC'
"""The master model needs syncronization."""


NO_QUEUE = 'None'
13 changes: 9 additions & 4 deletions dj_cqrs/controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dj_cqrs.registries import ReplicaRegistry


logger = logging.getLogger()
logger = logging.getLogger('django-cqrs')


def consume(payload):
Expand All @@ -18,10 +18,11 @@ def consume(payload):
"""
return route_signal_to_replica_model(
payload.signal_type, payload.cqrs_id, payload.instance_data,
previous_data=payload.previous_data,
)


def route_signal_to_replica_model(signal_type, cqrs_id, instance_data):
def route_signal_to_replica_model(signal_type, cqrs_id, instance_data, previous_data=None):
""" Routes signal to model method to create/update/delete replica instance.
:param dj_cqrs.constants.SignalType signal_type: Consumed signal type.
Expand All @@ -37,11 +38,15 @@ def route_signal_to_replica_model(signal_type, cqrs_id, instance_data):

elif signal_type == SignalType.SAVE:
with transaction.atomic():
return model_cls.cqrs_save(instance_data)
return model_cls.cqrs_save(instance_data, previous_data=previous_data)

elif signal_type == SignalType.SYNC:
with transaction.atomic():
return model_cls.cqrs_save(instance_data, sync=True)
return model_cls.cqrs_save(
instance_data,
previous_data=previous_data,
sync=True,
)

else:
logger.error('Bad signal type "{}" for CQRS_ID "{}".'.format(signal_type, cqrs_id))
32 changes: 31 additions & 1 deletion dj_cqrs/dataclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,31 @@


class TransportPayload:
def __init__(self, signal_type, cqrs_id, instance_data, instance_pk, queue=None):
"""Transport message payload.
:param signal_type: Type of the signal for this message.
:type signal_type: dj_cqrs.constants.SignalType
:param cqrs_id: The unique CQRS identifier of the model.
:type cqrs_id: str
:param instance_data: Serialized data of the instance that
generates the event.
:type instance_data: dict
:param instance_pk: Primary key of the instance
:param queue: Queue to syncronize, defaults to None
:type queue: str, optional
:param previous_data: Previous values for fields tracked for changes,
defaults to None
:type previous_data: dict, optional
"""

def __init__(self, signal_type, cqrs_id, instance_data, instance_pk, queue=None,
previous_data=None):
self.__signal_type = signal_type
self.__cqrs_id = cqrs_id
self.__instance_data = instance_data
self.__instance_pk = instance_pk
self.__queue = queue
self.__previous_data = previous_data

@property
def signal_type(self):
Expand All @@ -29,10 +48,21 @@ def pk(self):
def queue(self):
return self.__queue

@property
def previous_data(self):
return self.__previous_data

def to_dict(self):
"""
Return the payload as a dictionary.
:return: This payload.
:rtype: dict
"""
return {
'signal_type': self.__signal_type,
'cqrs_id': self.__cqrs_id,
'instance_data': self.__instance_data,
'previous_data': self.__previous_data,
'instance_pk': self.__instance_pk,
}
60 changes: 48 additions & 12 deletions dj_cqrs/managers.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
# Copyright © 2020 Ingram Micro Inc. All rights reserved.

import logging
from itertools import chain

from django.core.exceptions import ValidationError
from django.db import Error, IntegrityError, transaction
from django.db.models import Manager, F
from django.utils import timezone


logger = logging.getLogger()
logger = logging.getLogger('django-cqrs')


class MasterManager(Manager):
Expand All @@ -29,16 +28,18 @@ def bulk_update(self, queryset, **kwargs):


class ReplicaManager(Manager):
def save_instance(self, master_data, sync=False):
def save_instance(self, master_data, previous_data=None, sync=False):
""" This method saves (creates or updates) model instance from CQRS master instance data.
:param dict master_data: CQRS master instance data.
:param dict previous_data: Previous values for tracked fields.
:param bool sync: Sync package flag.
:return: Model instance.
:rtype: django.db.models.Model
"""
mapped_data = self._map_save_data(master_data)

mapped_previous_data = self._map_previous_data(previous_data) \
if previous_data else None
if mapped_data:
pk_name = self._get_model_pk_name()
pk_value = mapped_data[pk_name]
Expand All @@ -47,20 +48,34 @@ def save_instance(self, master_data, sync=False):
instance = self.model._default_manager.filter(**f_kwargs).first()

if instance:
return self.update_instance(instance, mapped_data, sync=sync)
return self.update_instance(
instance,
mapped_data,
previous_data=mapped_previous_data,
sync=sync,
)

return self.create_instance(mapped_data, sync=sync)
return self.create_instance(
mapped_data,
previous_data=mapped_previous_data,
sync=sync,
)

def create_instance(self, mapped_data, sync=False):
def create_instance(self, mapped_data, previous_data=None, sync=False):
""" This method creates model instance from mapped CQRS master instance data.
:param dict mapped_data: Mapped CQRS master instance data.
:param dict previous_data: Previous values for tracked fields.
:param bool sync: Sync package flag.
:return: ReplicaMixin model instance.
:rtype: django.db.models.Model
"""
try:
return self.model.cqrs_create(sync, **mapped_data)
return self.model.cqrs_create(
sync,
mapped_data,
previous_data=previous_data,
)
except (Error, ValidationError) as e:
pk_value = mapped_data[self._get_model_pk_name()]
if isinstance(e, IntegrityError):
Expand All @@ -76,11 +91,12 @@ def create_instance(self, mapped_data, sync=False):
),
)

def update_instance(self, instance, mapped_data, sync=False):
def update_instance(self, instance, mapped_data, previous_data=None, sync=False):
""" This method updates model instance from mapped CQRS master instance data.
:param django.db.models.Model instance: ReplicaMixin model instance.
:param dict mapped_data: Mapped CQRS master instance data.
:param dict previous_data: Previous values for tracked fields.
:param bool sync: Sync package flag.
:return: ReplicaMixin model instance.
:rtype: django.db.models.Model
Expand Down Expand Up @@ -128,7 +144,11 @@ def update_instance(self, instance, mapped_data, sync=False):
))

try:
return instance.cqrs_update(sync, **mapped_data)
return instance.cqrs_update(
sync,
mapped_data,
previous_data=previous_data,
)
except (Error, ValidationError) as e:
logger.error(
'{}\nCQRS update error: pk = {}, cqrs_revision = {} ({}).'.format(
Expand Down Expand Up @@ -160,6 +180,20 @@ def delete_instance(self, master_data):

return False

def _map_previous_data(self, previous_data):
if self.model.CQRS_MAPPING is None:
return previous_data

mapped_previous_data = {}

for master_name, replica_name in self.model.CQRS_MAPPING.items():
if master_name not in previous_data:
continue

mapped_previous_data[replica_name] = previous_data[master_name]
mapped_previous_data = self._remove_excessive_data(mapped_previous_data)
return mapped_previous_data

def _map_save_data(self, master_data):
if not self._cqrs_fields_are_filled(master_data):
return
Expand Down Expand Up @@ -200,14 +234,16 @@ def _make_initial_mapping(self, master_data):

def _remove_excessive_data(self, data):
opts = self.model._meta
possible_field_names = {f.name for f in chain(opts.concrete_fields, opts.private_fields)}
possible_field_names = {
f.name for f in opts.fields
}
return {k: v for k, v in data.items() if k in possible_field_names}

def _all_required_fields_are_filled(self, mapped_data):
opts = self.model._meta

required_field_names = {
f.name for f in chain(opts.concrete_fields, opts.private_fields) if not f.null
f.name for f in opts.fields if not f.null
}
if not (required_field_names - set(mapped_data.keys())):
return True
Expand Down
40 changes: 32 additions & 8 deletions dj_cqrs/metas.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
# Copyright © 2020 Ingram Micro Inc. All rights reserved.

from itertools import chain

from django.db.models import base

from dj_cqrs.constants import ALL_BASIC_FIELDS
from dj_cqrs.registries import MasterRegistry, ReplicaRegistry
from dj_cqrs.signals import MasterSignals
from dj_cqrs.tracker import CQRSTracker


class MasterMeta(base.ModelBase):
def __new__(mcs, *args):
model_cls = super(MasterMeta, mcs).__new__(mcs, *args)
def __new__(mcs, name, bases, attrs, **kwargs):

model_cls = super(MasterMeta, mcs).__new__(mcs, name, bases, attrs, **kwargs)

if args[0] != 'MasterMixin':
if name != 'MasterMixin':
mcs.register(model_cls)

return model_cls
Expand All @@ -22,13 +22,38 @@ def __new__(mcs, *args):
def register(model_cls):
_MetaUtils.check_cqrs_id(model_cls)
MasterMeta._check_correct_configuration(model_cls)

if model_cls.CQRS_TRACKED_FIELDS is not None:
MasterMeta._check_cqrs_tracked_fields(model_cls)
CQRSTracker.add_to_model(model_cls)

if model_cls.CQRS_SERIALIZER is None:
MasterMeta._check_cqrs_fields(model_cls)

MasterRegistry.register_model(model_cls)
MasterSignals.register_model(model_cls)
return model_cls

@staticmethod
def _check_cqrs_tracked_fields(model_cls):
"""Check that the CQRS_TRACKED_FIELDS has correct configuration.
:param dj_cqrs.mixins.MasterMixin model_cls: CQRS Master Model.
:raises: AssertionError
"""
tracked_fields = model_cls.CQRS_TRACKED_FIELDS
if isinstance(tracked_fields, (list, tuple)):
_MetaUtils._check_no_duplicate_names(
model_cls,
tracked_fields,
'CQRS_TRACKED_FIELDS',
)
_MetaUtils._check_unexisting_names(model_cls, tracked_fields, 'CQRS_TRACKED_FIELDS')
return

assert isinstance(tracked_fields, str) and tracked_fields == ALL_BASIC_FIELDS, \
"Model {}: Invalid configuration for CQRS_TRACKED_FIELDS".format(model_cls.__name__)

@staticmethod
def _check_correct_configuration(model_cls):
""" Check that model has correct CQRS configuration.
Expand All @@ -39,7 +64,7 @@ def _check_correct_configuration(model_cls):
if model_cls.CQRS_FIELDS != ALL_BASIC_FIELDS:
assert model_cls.CQRS_SERIALIZER is None, \
"Model {}: CQRS_FIELDS can't be set together with CQRS_SERIALIZER.".format(
model_cls,
model_cls.__name__,
)

@staticmethod
Expand All @@ -61,7 +86,6 @@ def __new__(mcs, *args):
if args[0] != 'ReplicaMixin':
_MetaUtils.check_cqrs_id(model_cls)
ReplicaMeta._check_cqrs_mapping(model_cls)

ReplicaRegistry.register_model(model_cls)

return model_cls
Expand Down Expand Up @@ -102,7 +126,7 @@ def _check_unexisting_names(model_cls, cqrs_field_names, cqrs_attr):
opts = model_cls._meta
model_name = model_cls.__name__

model_field_names = {f.name for f in chain(opts.concrete_fields, opts.private_fields)}
model_field_names = {f.name for f in opts.fields}
assert not set(cqrs_field_names) - model_field_names, \
'{} field is not correctly set for model {}.'.format(cqrs_attr, model_name)

Expand Down
Loading

0 comments on commit ad2a58f

Please sign in to comment.