Skip to content

Commit

Permalink
Merge pull request #78 from cloudblue/feature/LITE-23776
Browse files Browse the repository at this point in the history
LITE-23776 Added support for meta information
  • Loading branch information
maxipavlovic authored Jun 2, 2022
2 parents 0b9cc8a + e331c45 commit 33897f6
Show file tree
Hide file tree
Showing 14 changed files with 333 additions and 46 deletions.
27 changes: 26 additions & 1 deletion dj_cqrs/_validation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
# Copyright © 2022 Ingram Micro Inc. All rights reserved.

import logging
from inspect import getfullargspec, isfunction

from dj_cqrs.constants import (
DEFAULT_MASTER_AUTO_UPDATE_FIELDS,
Expand Down Expand Up @@ -56,6 +57,7 @@ def _validate_master(cqrs_settings):
'CQRS_AUTO_UPDATE_FIELDS': DEFAULT_MASTER_AUTO_UPDATE_FIELDS,
'CQRS_MESSAGE_TTL': DEFAULT_MASTER_MESSAGE_TTL,
'correlation_function': None,
'meta_function': None,
},
}

Expand All @@ -69,6 +71,7 @@ def _validate_master(cqrs_settings):
_validate_master_auto_update_fields(master_settings)
_validate_master_message_ttl(master_settings)
_validate_master_correlation_func(master_settings)
_validate_master_meta_func(master_settings)


def _validate_master_auto_update_fields(master_settings):
Expand Down Expand Up @@ -106,6 +109,28 @@ def _validate_master_correlation_func(master_settings):
raise AssertionError('CQRS master correlation_function must be callable.')


def _validate_master_meta_func(master_settings):
meta_func = master_settings.get('meta_function')
if not meta_func:
master_settings['meta_function'] = None
return

if isinstance(meta_func, str):
try:
meta_func = import_string(meta_func)
except ImportError:
raise AssertionError('CQRS master meta_function import error.')

if not isfunction(meta_func):
raise AssertionError('CQRS master meta_function must be function.')

r = getfullargspec(meta_func)
if not r.varkw:
raise AssertionError('CQRS master meta_function must support **kwargs.')

master_settings['meta_function'] = meta_func


def _validate_replica(cqrs_settings):
queue = cqrs_settings.get('queue')
assert queue, 'CQRS queue is not set.'
Expand Down
35 changes: 24 additions & 11 deletions dj_cqrs/controller/consumer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
# Copyright © 2022 Ingram Micro Inc. All rights reserved.

import copy
import logging
Expand All @@ -20,17 +20,24 @@ def consume(payload):
"""
payload = copy.deepcopy(payload)
return route_signal_to_replica_model(
payload.signal_type, payload.cqrs_id, payload.instance_data,
payload.signal_type,
payload.cqrs_id,
payload.instance_data,
previous_data=payload.previous_data,
meta=payload.meta,
)


def route_signal_to_replica_model(signal_type, cqrs_id, instance_data, previous_data=None):
def route_signal_to_replica_model(
signal_type, cqrs_id, instance_data, previous_data=None, meta=None,
):
""" Routes signal to model method to create/update/delete replica instance.
:param dj_cqrs.constants.SignalType signal_type: Consumed signal type.
:param str cqrs_id: Replica model CQRS unique identifier.
:param dict instance_data: Master model data.
:param dict or None previous_data: Previous model data for changed tracked fields, if exists.
:param dict or None meta: Payload metadata, if exists.
"""
if signal_type not in (SignalType.DELETE, SignalType.SAVE, SignalType.SYNC):
logger.error('Bad signal type "{0}" for CQRS_ID "{1}".'.format(signal_type, cqrs_id))
Expand All @@ -42,20 +49,26 @@ def route_signal_to_replica_model(signal_type, cqrs_id, instance_data, previous_
if db_is_needed:
close_old_connections()

is_meta_supported = model_cls.CQRS_META
try:
with transaction.atomic(savepoint=False) if db_is_needed else ExitStack():
if signal_type == SignalType.DELETE:
if is_meta_supported:
return model_cls.cqrs_delete(instance_data, meta=meta)

return model_cls.cqrs_delete(instance_data)

elif signal_type == SignalType.SAVE:
return model_cls.cqrs_save(instance_data, previous_data=previous_data)
f_kw = {'previous_data': previous_data}
if is_meta_supported:
f_kw['meta'] = meta

if signal_type == SignalType.SAVE:
return model_cls.cqrs_save(instance_data, **f_kw)

if signal_type == SignalType.SYNC:
f_kw['sync'] = True
return model_cls.cqrs_save(instance_data, **f_kw)

elif signal_type == SignalType.SYNC:
return model_cls.cqrs_save(
instance_data,
previous_data=previous_data,
sync=True,
)
except Error as e:
pk_value = instance_data.get(model_cls._meta.pk.name)
cqrs_revision = instance_data.get('cqrs_revision')
Expand Down
12 changes: 11 additions & 1 deletion dj_cqrs/dataclasses.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
# Copyright © 2022 Ingram Micro Inc. All rights reserved.

from dateutil.parser import parse as dateutil_parse

Expand Down Expand Up @@ -30,6 +30,8 @@ class TransportPayload:
:type retries: int, optional
:param expires: Message expiration datetime, infinite if None
:type expires: datetime, optional
:param meta: Payload metadata
:type meta: dict, optional
"""

def __init__(
Expand All @@ -43,13 +45,15 @@ def __init__(
correlation_id=None,
expires=None,
retries=0,
meta=None,
):
self.__signal_type = signal_type
self.__cqrs_id = cqrs_id
self.__instance_data = instance_data
self.__instance_pk = instance_pk
self.__queue = queue
self.__previous_data = previous_data
self.__meta = meta

if correlation_id:
self.__correlation_id = correlation_id
Expand Down Expand Up @@ -85,6 +89,7 @@ def from_message(cls, dct):
correlation_id=dct.get('correlation_id'),
expires=expires,
retries=dct.get('retries') or 0,
meta=dct.get('meta'),
)

@property
Expand Down Expand Up @@ -115,6 +120,10 @@ def previous_data(self):
def correlation_id(self):
return self.__correlation_id

@property
def meta(self):
return self.__meta

@property
def expires(self):
return self.__expires
Expand Down Expand Up @@ -147,6 +156,7 @@ def to_dict(self):
'correlation_id': self.__correlation_id,
'retries': self.__retries,
'expires': expires,
'meta': self.__meta,
}

def is_expired(self):
Expand Down
31 changes: 18 additions & 13 deletions dj_cqrs/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,13 @@ def list_all():


class ReplicaManager(Manager):
def save_instance(self, master_data, previous_data=None, sync=False):
def save_instance(self, master_data, previous_data=None, sync=False, meta=None):
""" This method saves (creates or updates) model instance from CQRS master instance data.
:param dict master_data: CQRS master instance data.
:param dict previous_data: Previous values for tracked fields.
:param bool sync: Sync package flag.
:param dict or None meta: Payload metadata, if exists.
:return: Model instance.
:rtype: django.db.models.Model
"""
Expand All @@ -96,29 +97,32 @@ def save_instance(self, master_data, previous_data=None, sync=False):
mapped_data,
previous_data=mapped_previous_data,
sync=sync,
meta=meta,
)

return self.create_instance(
mapped_data,
previous_data=mapped_previous_data,
sync=sync,
meta=meta,
)

def create_instance(self, mapped_data, previous_data=None, sync=False):
def create_instance(self, mapped_data, previous_data=None, sync=False, meta=None):
""" This method creates model instance from mapped CQRS master instance data.
:param dict mapped_data: Mapped CQRS master instance data.
:param dict previous_data: Previous values for tracked fields.
:param bool sync: Sync package flag.
:param dict or None meta: Payload metadata, if exists.
:return: ReplicaMixin model instance.
:rtype: django.db.models.Model
"""
f_kw = {'previous_data': previous_data}
if self.model.CQRS_META:
f_kw['meta'] = meta

try:
return self.model.cqrs_create(
sync,
mapped_data,
previous_data=previous_data,
)
return self.model.cqrs_create(sync, mapped_data, **f_kw)
except (Error, ValidationError) as e:
pk_value = mapped_data[self._get_model_pk_name()]

Expand All @@ -128,12 +132,13 @@ def create_instance(self, mapped_data, previous_data=None, sync=False):
),
)

def update_instance(self, instance, mapped_data, previous_data=None, sync=False):
def update_instance(self, instance, mapped_data, previous_data=None, sync=False, meta=None):
""" This method updates model instance from mapped CQRS master instance data.
:param django.db.models.Model instance: ReplicaMixin model instance.
:param dict mapped_data: Mapped CQRS master instance data.
:param dict previous_data: Previous values for tracked fields.
:param dict or None meta: Payload metadata, if exists.
:param bool sync: Sync package flag.
:return: ReplicaMixin model instance.
:rtype: django.db.models.Model
Expand Down Expand Up @@ -187,12 +192,12 @@ def update_instance(self, instance, mapped_data, previous_data=None, sync=False)
pk_value, current_cqrs_revision, self.model.CQRS_ID,
))

f_kw = {'previous_data': previous_data}
if self.model.CQRS_META:
f_kw['meta'] = meta

try:
return instance.cqrs_update(
sync,
mapped_data,
previous_data=previous_data,
)
return instance.cqrs_update(sync, mapped_data, **f_kw)
except (Error, ValidationError) as e:
logger.error(
'{0}\nCQRS update error: pk = {1}, cqrs_revision = {2} ({3}).'.format(
Expand Down
33 changes: 27 additions & 6 deletions dj_cqrs/mixins.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
# Copyright © 2022 Ingram Micro Inc. All rights reserved.

import logging

Expand Down Expand Up @@ -191,6 +191,20 @@ def is_sync_instance(self):
"""
return True

def get_cqrs_meta(self, **kwargs):
"""
This method can be overridden to collect model/instance specific metadata.
:type kwargs: Signal type, payload data, etc.
:return: Metadata dictionary if it's provided.
:rtype: dict
"""
generic_meta_func = settings.CQRS['master']['meta_function']
if generic_meta_func:
return generic_meta_func(obj=self, **kwargs)

return {}

@classmethod
def relate_cqrs_serialization(cls, queryset):
"""
Expand Down Expand Up @@ -342,6 +356,9 @@ class ReplicaMixin(Model, metaclass=ReplicaMeta):
CQRS_NO_DB_OPERATIONS = False
"""Set it to True to disable any default DB operations for this model."""

CQRS_META = False
"""Set it to True to receive meta data for this model."""

objects = Manager()

cqrs = ReplicaManager()
Expand All @@ -354,41 +371,44 @@ class Meta:
abstract = True

@classmethod
def cqrs_save(cls, master_data, previous_data=None, sync=False):
def cqrs_save(cls, master_data, previous_data=None, sync=False, meta=None):
""" This method saves (creates or updates) model instance from CQRS master instance data.
This method must not be overridden. Otherwise, sync checks need to be implemented manually.
:param dict master_data: CQRS master instance data.
:param dict previous_data: Previous values for tracked fields.
:param bool sync: Sync package flag.
:param dict or None meta: Payload metadata, if exists.
:return: Model instance.
:rtype: django.db.models.Model
"""
if cls.CQRS_NO_DB_OPERATIONS:
raise NotImplementedError

return cls.cqrs.save_instance(master_data, previous_data, sync)
return cls.cqrs.save_instance(master_data, previous_data, sync, meta)

@classmethod
def cqrs_create(cls, sync, mapped_data, previous_data=None):
def cqrs_create(cls, sync, mapped_data, previous_data=None, meta=None):
""" This method creates model instance from CQRS mapped instance data. It must be overridden
by replicas of master models with custom serialization.
:param bool sync: Sync package flag.
:param dict mapped_data: CQRS mapped instance data.
:param dict previous_data: Previous mapped values for tracked fields.
:param dict or None meta: Payload metadata, if exists.
:return: Model instance.
:rtype: django.db.models.Model
"""
return cls._default_manager.create(**mapped_data)

def cqrs_update(self, sync, mapped_data, previous_data=None):
def cqrs_update(self, sync, mapped_data, previous_data=None, meta=None):
""" This method updates model instance from CQRS mapped instance data. It must be overridden
by replicas of master models with custom serialization.
:param bool sync: Sync package flag.
:param dict mapped_data: CQRS mapped instance data.
:param dict previous_data: Previous mapped values for tracked fields.
:param dict or None meta: Payload metadata, if exists.
:return: Model instance.
:rtype: django.db.models.Model
"""
Expand All @@ -399,10 +419,11 @@ def cqrs_update(self, sync, mapped_data, previous_data=None):
return self

@classmethod
def cqrs_delete(cls, master_data):
def cqrs_delete(cls, master_data, meta=None):
""" This method deletes model instance from mapped CQRS master instance data.
:param dict master_data: CQRS master instance data.
:param dict or None meta: Payload metadata, if exists.
:return: Flag, if delete operation is successful (even if nothing was deleted).
:rtype: bool
"""
Expand Down
Loading

0 comments on commit 33897f6

Please sign in to comment.