diff --git a/dj_cqrs/controller/consumer.py b/dj_cqrs/controller/consumer.py index c463e70..268f179 100644 --- a/dj_cqrs/controller/consumer.py +++ b/dj_cqrs/controller/consumer.py @@ -4,11 +4,12 @@ import logging from contextlib import ExitStack -from django.db import close_old_connections, transaction - from dj_cqrs.constants import SignalType from dj_cqrs.registries import ReplicaRegistry +from django.db import close_old_connections, transaction + + logger = logging.getLogger('django-cqrs') @@ -32,7 +33,7 @@ def route_signal_to_replica_model(signal_type, cqrs_id, instance_data, previous_ :param dict instance_data: Master model data. """ if signal_type not in (SignalType.DELETE, SignalType.SAVE, SignalType.SYNC): - logger.error('Bad signal type "{}" for CQRS_ID "{}".'.format(signal_type, cqrs_id)) + logger.error('Bad signal type "{0}" for CQRS_ID "{1}".'.format(signal_type, cqrs_id)) return model_cls = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id) diff --git a/dj_cqrs/dataclasses.py b/dj_cqrs/dataclasses.py index 0deb235..dfcb823 100644 --- a/dj_cqrs/dataclasses.py +++ b/dj_cqrs/dataclasses.py @@ -1,11 +1,12 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. from dateutil.parser import parse as dateutil_parse -from django.utils import timezone from dj_cqrs.correlation import get_correlation_id from dj_cqrs.utils import get_expires_datetime +from django.utils import timezone + class TransportPayload: """Transport message payload. diff --git a/dj_cqrs/delay.py b/dj_cqrs/delay.py index 2046618..442dbc3 100644 --- a/dj_cqrs/delay.py +++ b/dj_cqrs/delay.py @@ -1,6 +1,6 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. -from queue import PriorityQueue, Full +from queue import Full, PriorityQueue from django.utils import timezone diff --git a/dj_cqrs/management/commands/cqrs_bulk_dump.py b/dj_cqrs/management/commands/cqrs_bulk_dump.py index 952fe68..9e9794a 100644 --- a/dj_cqrs/management/commands/cqrs_bulk_dump.py +++ b/dj_cqrs/management/commands/cqrs_bulk_dump.py @@ -1,16 +1,17 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. import datetime import os import sys import time -import ujson -from django.core.management.base import BaseCommand, CommandError - from dj_cqrs.management.commands.utils import batch_qs from dj_cqrs.registries import MasterRegistry +from django.core.management.base import BaseCommand, CommandError + +import ujson + class Command(BaseCommand): help = 'Bulk dump of a CQRS model from master service.' @@ -59,7 +60,7 @@ def handle(self, *args, **options): if progress: print( - 'Processing {} records with batch size {}'.format(db_count, batch_size), + 'Processing {0} records with batch size {1}'.format(db_count, batch_size), file=sys.stderr, ) for qs in batch_qs( @@ -76,7 +77,7 @@ def handle(self, *args, **options): ) success_counter += 1 except Exception as e: - print('\nDump record failed for pk={}: {}: {}'.format( + print('\nDump record failed for pk={0}: {1}: {2}'.format( instance.pk, type(e).__name__, str(e), ), file=sys.stderr) if progress: @@ -84,12 +85,13 @@ def handle(self, *args, **options): percent = 100 * counter / db_count eta = datetime.timedelta(seconds=int((db_count - counter) / rate)) sys.stderr.write( - '\r{} of {} processed - {}% with rate {:.1f} rps, to go {} ...{:20}'.format( + '\r{0} of {1} processed - {2}% with ' + 'rate {3:.1f} rps, to go {4} ...{5:20}'.format( counter, db_count, int(percent), rate, str(eta), ' ', )) sys.stderr.flush() - print('Done!\n{} instance(s) saved.\n{} instance(s) processed.'.format( + print('Done!\n{0} instance(s) saved.\n{1} instance(s) processed.'.format( success_counter, counter, ), file=sys.stderr) @@ -99,7 +101,7 @@ def _get_model(options): model = MasterRegistry.get_model_by_cqrs_id(cqrs_id) if not model: - raise CommandError('Wrong CQRS ID: {}!'.format(cqrs_id)) + raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id)) return model @@ -107,10 +109,10 @@ def _get_model(options): def _get_output_filename(options): f_name = options['output'] if f_name is None: - f_name = '{}.dump'.format(options['cqrs_id']) + f_name = '{0}.dump'.format(options['cqrs_id']) if f_name != '-' and os.path.exists(f_name) and not (options['force']): - raise CommandError('File {} exists!'.format(f_name)) + raise CommandError('File {0} exists!'.format(f_name)) return f_name diff --git a/dj_cqrs/management/commands/cqrs_bulk_load.py b/dj_cqrs/management/commands/cqrs_bulk_load.py index 584e476..81a3266 100644 --- a/dj_cqrs/management/commands/cqrs_bulk_load.py +++ b/dj_cqrs/management/commands/cqrs_bulk_load.py @@ -3,11 +3,12 @@ import os import sys -import ujson +from dj_cqrs.registries import ReplicaRegistry + from django.core.management.base import BaseCommand, CommandError -from django.db import transaction, DatabaseError +from django.db import DatabaseError, transaction -from dj_cqrs.registries import ReplicaRegistry +import ujson class Command(BaseCommand): @@ -38,7 +39,7 @@ def handle(self, *args, **options): f_name = options['input'] if f_name != '-' and not os.path.exists(f_name): - raise CommandError("File {} doesn't exist!".format(f_name)) + raise CommandError("File {0} doesn't exist!".format(f_name)) with sys.stdin if f_name == '-' else open(f_name, 'r') as f: try: @@ -47,11 +48,11 @@ def handle(self, *args, **options): cqrs_id = None if not cqrs_id: - raise CommandError('File {} is empty!'.format(f_name)) + raise CommandError('File {0} is empty!'.format(f_name)) model = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id) if not model: - raise CommandError('Wrong CQRS ID: {}!'.format(cqrs_id)) + raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id)) with transaction.atomic(): if options['clear']: @@ -80,7 +81,7 @@ def _process(cls, stream, model, batch_size): except EOFError: break - print('Done!\n{} instance(s) loaded.'.format(success_counter), file=sys.stderr) + print('Done!\n{0} instance(s) loaded.'.format(success_counter), file=sys.stderr) @staticmethod def _process_line(line_number, line, model): @@ -91,23 +92,23 @@ def _process_line(line_number, line, model): master_data = ujson.loads(line.strip()) except ValueError: print( - "Dump file can't be parsed: line {}!".format(line_number), - file=sys.stderr + "Dump file can't be parsed: line {0}!".format(line_number), + file=sys.stderr, ) return False instance = model.cqrs_save(master_data) if not instance: print( - "Instance can't be saved: line {}!".format(line_number), - file=sys.stderr + "Instance can't be saved: line {0}!".format(line_number), + file=sys.stderr, ) else: return True except Exception as e: print( - 'Unexpected error: line {}! {}'.format(line_number, str(e)), - file=sys.stderr + 'Unexpected error: line {0}! {1}'.format(line_number, str(e)), + file=sys.stderr, ) return False diff --git a/dj_cqrs/management/commands/cqrs_consume.py b/dj_cqrs/management/commands/cqrs_consume.py index fff7b44..e1fbf1a 100644 --- a/dj_cqrs/management/commands/cqrs_consume.py +++ b/dj_cqrs/management/commands/cqrs_consume.py @@ -2,10 +2,10 @@ from multiprocessing import Process -from django.core.management.base import BaseCommand - from dj_cqrs.transport import current_transport +from django.core.management.base import BaseCommand + class Command(BaseCommand): help = 'Starts CQRS worker, which consumes messages from message queue.' diff --git a/dj_cqrs/management/commands/cqrs_dead_letters.py b/dj_cqrs/management/commands/cqrs_dead_letters.py index 4d0c501..ad74f5b 100644 --- a/dj_cqrs/management/commands/cqrs_dead_letters.py +++ b/dj_cqrs/management/commands/cqrs_dead_letters.py @@ -1,14 +1,15 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. -import ujson -from django.core.management.base import BaseCommand, CommandError - from dj_cqrs.dataclasses import TransportPayload from dj_cqrs.registries import ReplicaRegistry -from dj_cqrs.transport.rabbit_mq import RabbitMQTransport from dj_cqrs.transport import current_transport +from dj_cqrs.transport.rabbit_mq import RabbitMQTransport from dj_cqrs.utils import get_expires_datetime +from django.core.management.base import BaseCommand, CommandError + +import ujson + class RabbitMQTransportService(RabbitMQTransport): @@ -82,22 +83,22 @@ def init_broker(self): queue_name, dead_letter_queue_name = RabbitMQTransportService.get_consumer_settings() RabbitMQTransportService.declare_queue(channel, queue_name) RabbitMQTransportService.declare_queue(channel, dead_letter_queue_name) - for cqrs_id, replica_model in ReplicaRegistry.models.items(): + for cqrs_id, _ in ReplicaRegistry.models.items(): channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=cqrs_id) # Every service must have specific SYNC or requeue routes channel.queue_bind( exchange=exchange, queue=queue_name, - routing_key='cqrs.{}.{}'.format(queue_name, cqrs_id), + routing_key='cqrs.{0}.{1}'.format(queue_name, cqrs_id), ) return channel, connection def handle_retry(self, channel, consumer_generator, dead_letters_count): - self.stdout.write("Total dead letters: {}".format(dead_letters_count)) + self.stdout.write("Total dead letters: {0}".format(dead_letters_count)) for i in range(1, dead_letters_count + 1): - self.stdout.write("Retrying: {}/{}".format(i, dead_letters_count)) + self.stdout.write("Retrying: {0}/{1}".format(i, dead_letters_count)) method_frame, properties, body = next(consumer_generator) dct = ujson.loads(body) @@ -116,12 +117,12 @@ def handle_retry(self, channel, consumer_generator, dead_letters_count): RabbitMQTransportService.nack(channel, method_frame.delivery_tag) def handle_dump(self, consumer_generator, dead_letters_count): - for i in range(1, dead_letters_count + 1): + for _ in range(1, dead_letters_count + 1): *_, body = next(consumer_generator) self.stdout.write(body.decode('utf-8')) def handle_purge(self, channel, dead_letter_queue_name, dead_letter_count): - self.stdout.write("Total dead letters: {}".format(dead_letter_count)) + self.stdout.write("Total dead letters: {0}".format(dead_letter_count)) if dead_letter_count > 0: channel.queue_purge(dead_letter_queue_name) self.stdout.write("Purged") diff --git a/dj_cqrs/management/commands/cqrs_deleted_diff_master.py b/dj_cqrs/management/commands/cqrs_deleted_diff_master.py index 60d99a2..967088e 100644 --- a/dj_cqrs/management/commands/cqrs_deleted_diff_master.py +++ b/dj_cqrs/management/commands/cqrs_deleted_diff_master.py @@ -2,12 +2,12 @@ import sys -import ujson +from dj_cqrs.registries import MasterRegistry from django.core.management.base import BaseCommand, CommandError from django.db import connection -from dj_cqrs.registries import MasterRegistry +import ujson GET_NON_EXISTING_PKS_SQL_TEMPLATE = """ @@ -46,7 +46,7 @@ def handle(self, *args, **options): master_data = self.deserialize_in(package_line) sql = GET_NON_EXISTING_PKS_SQL_TEMPLATE.format( - values=','.join(["({})".format(pk) for pk in master_data]), + values=','.join(["({0})".format(pk) for pk in master_data]), table=model._meta.db_table, pk_field=model._meta.pk.attname, ) @@ -55,7 +55,7 @@ def handle(self, *args, **options): diff_ids = [r[0] for r in cursor.fetchall()] if diff_ids: self.stdout.write(self.serialize_out(diff_ids)) - self.stderr.write('PK to delete: {}'.format(str(diff_ids))) + self.stderr.write('PK to delete: {0}'.format(str(diff_ids))) @staticmethod def _get_model(first_line): @@ -63,6 +63,6 @@ def _get_model(first_line): model = MasterRegistry.get_model_by_cqrs_id(cqrs_id) if not model: - raise CommandError('Wrong CQRS ID: {}!'.format(cqrs_id)) + raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id)) return model diff --git a/dj_cqrs/management/commands/cqrs_deleted_diff_replica.py b/dj_cqrs/management/commands/cqrs_deleted_diff_replica.py index b953525..a8e8d60 100644 --- a/dj_cqrs/management/commands/cqrs_deleted_diff_replica.py +++ b/dj_cqrs/management/commands/cqrs_deleted_diff_replica.py @@ -1,12 +1,13 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. -import ujson +from dj_cqrs.management.commands.utils import batch_qs +from dj_cqrs.registries import ReplicaRegistry + from django.core.exceptions import FieldError from django.core.management.base import BaseCommand, CommandError from django.utils.timezone import now -from dj_cqrs.management.commands.utils import batch_qs -from dj_cqrs.registries import ReplicaRegistry +import ujson class Command(BaseCommand): @@ -52,14 +53,14 @@ def handle(self, *args, **options): try: qs = qs.filter(**kwargs) except FieldError as e: - raise CommandError('Bad filter kwargs! {}'.format(str(e))) + raise CommandError('Bad filter kwargs! {0}'.format(str(e))) if not qs.exists(): self.stderr.write('No objects found for filter!') return current_dt = now() - self.stdout.write('{},{}'.format(model.CQRS_ID, str(current_dt))) + self.stdout.write('{0},{1}'.format(model.CQRS_ID, str(current_dt))) for bqs in batch_qs(qs.values_list('pk', flat=True), batch_size=batch_size): self.stdout.write(self.serialize_package(list(bqs))) @@ -70,7 +71,7 @@ def _get_model(options): model = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id) if not model: - raise CommandError('Wrong CQRS ID: {}!'.format(cqrs_id)) + raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id)) return model diff --git a/dj_cqrs/management/commands/cqrs_deleted_sync_replica.py b/dj_cqrs/management/commands/cqrs_deleted_sync_replica.py index cf097c4..21fb4ea 100644 --- a/dj_cqrs/management/commands/cqrs_deleted_sync_replica.py +++ b/dj_cqrs/management/commands/cqrs_deleted_sync_replica.py @@ -2,12 +2,12 @@ import sys -import ujson +from dj_cqrs.registries import ReplicaRegistry from django.core.management.base import BaseCommand, CommandError from django.db import DatabaseError -from dj_cqrs.registries import ReplicaRegistry +import ujson class Command(BaseCommand): @@ -36,6 +36,6 @@ def _get_model(first_line): model = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id) if not model: - raise CommandError('Wrong CQRS ID: {}!'.format(cqrs_id)) + raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id)) return model diff --git a/dj_cqrs/management/commands/cqrs_diff_master.py b/dj_cqrs/management/commands/cqrs_diff_master.py index b58fa4f..182ccd3 100644 --- a/dj_cqrs/management/commands/cqrs_diff_master.py +++ b/dj_cqrs/management/commands/cqrs_diff_master.py @@ -1,12 +1,13 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. + +from dj_cqrs.management.commands.utils import batch_qs +from dj_cqrs.registries import MasterRegistry -import ujson from django.core.exceptions import FieldError from django.core.management.base import BaseCommand, CommandError from django.utils.timezone import now -from dj_cqrs.management.commands.utils import batch_qs -from dj_cqrs.registries import MasterRegistry +import ujson class Command(BaseCommand): @@ -52,14 +53,14 @@ def handle(self, *args, **options): try: qs = qs.filter(**kwargs) except FieldError as e: - raise CommandError('Bad filter kwargs! {}'.format(str(e))) + raise CommandError('Bad filter kwargs! {0}'.format(str(e))) if not qs.exists(): self.stderr.write('No objects found for filter!') return current_dt = now() - self.stdout.write('{},{}'.format(model.CQRS_ID, str(current_dt))) + self.stdout.write('{0},{1}'.format(model.CQRS_ID, str(current_dt))) for bqs in batch_qs(qs, batch_size=batch_size): package = [ @@ -75,7 +76,7 @@ def _get_model(options): model = MasterRegistry.get_model_by_cqrs_id(cqrs_id) if not model: - raise CommandError('Wrong CQRS ID: {}!'.format(cqrs_id)) + raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id)) return model diff --git a/dj_cqrs/management/commands/cqrs_diff_replica.py b/dj_cqrs/management/commands/cqrs_diff_replica.py index ca9ab9d..edaed27 100644 --- a/dj_cqrs/management/commands/cqrs_diff_replica.py +++ b/dj_cqrs/management/commands/cqrs_diff_replica.py @@ -1,12 +1,13 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. import sys -import ujson +from dj_cqrs.registries import ReplicaRegistry + from django.conf import settings from django.core.management.base import BaseCommand, CommandError -from dj_cqrs.registries import ReplicaRegistry +import ujson class Command(BaseCommand): @@ -24,13 +25,14 @@ def handle(self, *args, **options): with sys.stdin as f: first_line = f.readline() model = self._get_model(first_line) - self.stdout.write('{},{}'.format(first_line.strip(), settings.CQRS.get('queue'))) + self.stdout.write('{0},{1}'.format(first_line.strip(), settings.CQRS.get('queue'))) for package_line in f: master_data = self.deserialize_in(package_line) - qs = model._default_manager.filter(pk__in=master_data.keys()) \ - .order_by().only('pk', 'cqrs_revision') + qs = model._default_manager.filter( + pk__in=master_data.keys(), + ).order_by().only('pk', 'cqrs_revision') replica_data = {instance.pk: instance.cqrs_revision for instance in qs} diff_ids = set() @@ -40,7 +42,7 @@ def handle(self, *args, **options): if diff_ids: self.stdout.write(self.serialize_out(list(diff_ids))) - self.stderr.write('PK to resync: {}'.format(str(diff_ids))) + self.stderr.write('PK to resync: {0}'.format(str(diff_ids))) @staticmethod def _get_model(first_line): @@ -48,6 +50,6 @@ def _get_model(first_line): model = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id) if not model: - raise CommandError('Wrong CQRS ID: {}!'.format(cqrs_id)) + raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id)) return model diff --git a/dj_cqrs/management/commands/cqrs_diff_sync.py b/dj_cqrs/management/commands/cqrs_diff_sync.py index efacb60..badc2bb 100644 --- a/dj_cqrs/management/commands/cqrs_diff_sync.py +++ b/dj_cqrs/management/commands/cqrs_diff_sync.py @@ -1,13 +1,13 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. import sys -from django.core.management.base import BaseCommand, CommandError - from dj_cqrs.constants import NO_QUEUE from dj_cqrs.management.commands.cqrs_sync import Command as SyncCommand from dj_cqrs.registries import MasterRegistry +from django.core.management.base import BaseCommand, CommandError + class Command(BaseCommand): help = 'Diff synchronizer from CQRS replica stream.' @@ -21,7 +21,7 @@ def handle(self, *args, **options): for pks_line in f: sync_kwargs = { 'cqrs_id': model.CQRS_ID, - 'filter': '{{"id__in": {}}}'.format(pks_line.strip()), + 'filter': '{{"id__in": {0}}}'.format(pks_line.strip()), } if queue: sync_kwargs['queue'] = queue @@ -34,7 +34,7 @@ def _get_model(first_line): model = MasterRegistry.get_model_by_cqrs_id(cqrs_id) if not model: - raise CommandError('Wrong CQRS ID: {}!'.format(cqrs_id)) + raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id)) return model diff --git a/dj_cqrs/management/commands/cqrs_sync.py b/dj_cqrs/management/commands/cqrs_sync.py index aa3270e..970afd4 100644 --- a/dj_cqrs/management/commands/cqrs_sync.py +++ b/dj_cqrs/management/commands/cqrs_sync.py @@ -1,17 +1,17 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. import datetime import sys import time +from dj_cqrs.management.commands.utils import batch_qs +from dj_cqrs.registries import MasterRegistry + from django.core.exceptions import FieldError from django.core.management.base import BaseCommand, CommandError import ujson -from dj_cqrs.management.commands.utils import batch_qs -from dj_cqrs.registries import MasterRegistry - DEFAULT_BATCH = 10000 DEFAULT_PROGRESS = False @@ -64,7 +64,7 @@ def handle(self, *args, **options): counter, success_counter = 0, 0 if progress: - print('Processing {} records with batch size {}'.format(db_count, batch_size)) + print('Processing {0} records with batch size {1}'.format(db_count, batch_size)) for qs in batch_qs(model.relate_cqrs_serialization(qs), batch_size=batch_size): ts = time.time() @@ -75,7 +75,7 @@ def handle(self, *args, **options): instance.cqrs_sync(queue=options['queue']) success_counter += 1 except Exception as e: - print('\nSync record failed for pk={}: {}: {}'.format( + print('\nSync record failed for pk={0}: {1}: {2}'.format( instance.pk, type(e).__name__, str(e), )) @@ -84,12 +84,13 @@ def handle(self, *args, **options): percent = 100 * counter / db_count eta = datetime.timedelta(seconds=int((db_count - counter) / rate)) sys.stdout.write( - '\r{} of {} processed - {}% with rate {:.1f} rps, to go {} ...{:20}'.format( + '\r{0} of {1} processed - {2}% with ' + 'rate {3:.1f} rps, to go {4} ...{5:20}'.format( counter, db_count, int(percent), rate, str(eta), ' ', )) sys.stdout.flush() - print('Done!\n{} instance(s) synced.\n{} instance(s) processed.'.format( + print('Done!\n{0} instance(s) synced.\n{1} instance(s) processed.'.format( success_counter, counter, )) @@ -107,7 +108,7 @@ def _prepare_qs(model, options): try: qs = model._default_manager.filter(**kwargs).order_by() except FieldError as e: - raise CommandError('Bad filter kwargs! {}'.format(str(e))) + raise CommandError('Bad filter kwargs! {0}'.format(str(e))) return qs @@ -117,7 +118,7 @@ def _get_model(options): model = MasterRegistry.get_model_by_cqrs_id(cqrs_id) if not model: - raise CommandError('Wrong CQRS ID: {}!'.format(cqrs_id)) + raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id)) return model diff --git a/dj_cqrs/managers.py b/dj_cqrs/managers.py index 1a8ccc5..527056e 100644 --- a/dj_cqrs/managers.py +++ b/dj_cqrs/managers.py @@ -1,10 +1,10 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. import logging from django.core.exceptions import ValidationError from django.db import Error, IntegrityError, transaction -from django.db.models import Manager, F +from django.db.models import F, Manager from django.utils import timezone @@ -39,7 +39,7 @@ def bulk_update(self, queryset, **kwargs): with transaction.atomic(): current_dt = timezone.now() result = queryset.update( - cqrs_revision=F('cqrs_revision') + 1, cqrs_updated=current_dt, **kwargs + cqrs_revision=F('cqrs_revision') + 1, cqrs_updated=current_dt, **kwargs, ) queryset.model.call_post_update(list(queryset.all()), using=queryset.db) return result @@ -56,8 +56,7 @@ def save_instance(self, master_data, previous_data=None, sync=False): :rtype: django.db.models.Model """ mapped_data = self._map_save_data(master_data) - mapped_previous_data = self._map_previous_data(previous_data) \ - if previous_data else None + mapped_previous_data = self._map_previous_data(previous_data) if previous_data else None if mapped_data: pk_name = self._get_model_pk_name() pk_value = mapped_data[pk_name] @@ -102,13 +101,14 @@ def create_instance(self, mapped_data, previous_data=None, sync=False): pk_value = mapped_data[self._get_model_pk_name()] if isinstance(e, IntegrityError): logger.warning( - 'Potentially wrong CQRS sync order: pk = {}, cqrs_revision = {} ({}).'.format( + 'Potentially wrong CQRS sync order: ' + 'pk = {0}, cqrs_revision = {1} ({2}).'.format( pk_value, mapped_data['cqrs_revision'], self.model.CQRS_ID, ), ) logger.error( - '{}\nCQRS create error: pk = {} ({}).'.format( + '{0}\nCQRS create error: pk = {1} ({2}).'.format( str(e), pk_value, self.model.CQRS_ID, ), ) @@ -129,15 +129,20 @@ def update_instance(self, instance, mapped_data, previous_data=None, sync=False) if sync: if existing_cqrs_revision > current_cqrs_revision: - w_tpl = 'CQRS revision downgrade on sync: pk = {}, ' \ - 'cqrs_revision = new {} / existing {} ({}).' + w_tpl = ( + 'CQRS revision downgrade on sync: pk = {0}, ' + 'cqrs_revision = new {1} / existing {2} ({3}).' + ) logger.warning(w_tpl.format( pk_value, current_cqrs_revision, existing_cqrs_revision, self.model.CQRS_ID, )) else: if existing_cqrs_revision > current_cqrs_revision: - e_tpl = 'Wrong CQRS sync order: pk = {}, cqrs_revision = new {} / existing {} ({}).' + e_tpl = ( + 'Wrong CQRS sync order: pk = {0}, ' + 'cqrs_revision = new {1} / existing {2} ({3}).' + ) logger.error(e_tpl.format( pk_value, current_cqrs_revision, existing_cqrs_revision, self.model.CQRS_ID, )) @@ -145,13 +150,13 @@ def update_instance(self, instance, mapped_data, previous_data=None, sync=False) if existing_cqrs_revision == current_cqrs_revision: logger.error( - 'Received duplicate CQRS data: pk = {}, cqrs_revision = {} ({}).'.format( + 'Received duplicate CQRS data: pk = {0}, cqrs_revision = {1} ({2}).'.format( pk_value, current_cqrs_revision, self.model.CQRS_ID, ), ) if current_cqrs_revision == 0: logger.warning( - 'CQRS potential creation race condition: pk = {} ({}).'.format( + 'CQRS potential creation race condition: pk = {0} ({1}).'.format( pk_value, self.model.CQRS_ID, ), ) @@ -159,7 +164,9 @@ def update_instance(self, instance, mapped_data, previous_data=None, sync=False) return instance if current_cqrs_revision != instance.cqrs_revision + 1: - w_tpl = 'Lost or filtered out {} CQRS packages: pk = {}, cqrs_revision = {} ({})' + w_tpl = ( + 'Lost or filtered out {0} CQRS packages: pk = {1}, cqrs_revision = {2} ({3})' + ) logger.warning(w_tpl.format( current_cqrs_revision - instance.cqrs_revision - 1, pk_value, current_cqrs_revision, self.model.CQRS_ID, @@ -173,7 +180,7 @@ def update_instance(self, instance, mapped_data, previous_data=None, sync=False) ) except (Error, ValidationError) as e: logger.error( - '{}\nCQRS update error: pk = {}, cqrs_revision = {} ({}).'.format( + '{0}\nCQRS update error: pk = {1}, cqrs_revision = {2} ({3}).'.format( str(e), pk_value, current_cqrs_revision, self.model.CQRS_ID, ), ) @@ -195,7 +202,7 @@ def delete_instance(self, master_data): return True except Error as e: logger.error( - '{}\nCQRS delete error: pk = {} ({}).'.format( + '{0}\nCQRS delete error: pk = {1} ({2}).'.format( str(e), pk_value, self.model.CQRS_ID, ), ) @@ -246,7 +253,7 @@ def _make_initial_mapping(self, master_data): } for master_name, replica_name in self.model.CQRS_MAPPING.items(): if master_name not in master_data: - logger.error('Bad master-replica mapping for {} ({}).'.format( + logger.error('Bad master-replica mapping for {0} ({1}).'.format( master_name, self.model.CQRS_ID, )) return @@ -271,7 +278,7 @@ def _all_required_fields_are_filled(self, mapped_data): return True logger.error( - 'Not all required CQRS fields are provided in data ({}).'.format(self.model.CQRS_ID), + 'Not all required CQRS fields are provided in data ({0}).'.format(self.model.CQRS_ID), ) return False @@ -293,11 +300,11 @@ def _cqrs_fields_are_filled(self, data): if 'cqrs_revision' in data and 'cqrs_updated' in data: return True - logger.error('CQRS sync fields are not provided in data ({}).'.format(self.model.CQRS_ID)) + logger.error('CQRS sync fields are not provided in data ({0}).'.format(self.model.CQRS_ID)) return False def _log_pk_data_error(self): - logger.error('CQRS PK is not provided in data ({}).'.format(self.model.CQRS_ID)) + logger.error('CQRS PK is not provided in data ({0}).'.format(self.model.CQRS_ID)) def _get_model_pk_name(self): return self.model._meta.pk.name diff --git a/dj_cqrs/metas.py b/dj_cqrs/metas.py index f972420..6d60ba7 100644 --- a/dj_cqrs/metas.py +++ b/dj_cqrs/metas.py @@ -1,12 +1,12 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. - -from django.db.models import base +# Copyright © 2021 Ingram Micro Inc. All rights reserved. from dj_cqrs.constants import ALL_BASIC_FIELDS from dj_cqrs.registries import MasterRegistry, ReplicaRegistry from dj_cqrs.signals import MasterSignals from dj_cqrs.tracker import CQRSTracker +from django.db.models import base + class MasterMeta(base.ModelBase): def __new__(mcs, name, bases, attrs, **kwargs): @@ -51,8 +51,8 @@ def _check_cqrs_tracked_fields(model_cls): _MetaUtils._check_unexisting_names(model_cls, tracked_fields, 'CQRS_TRACKED_FIELDS') return - assert isinstance(tracked_fields, str) and tracked_fields == ALL_BASIC_FIELDS, \ - "Model {}: Invalid configuration for CQRS_TRACKED_FIELDS".format(model_cls.__name__) + e = "Model {0}: Invalid configuration for CQRS_TRACKED_FIELDS".format(model_cls.__name__) + assert isinstance(tracked_fields, str) and tracked_fields == ALL_BASIC_FIELDS, e @staticmethod def _check_correct_configuration(model_cls): @@ -62,10 +62,10 @@ def _check_correct_configuration(model_cls): :raises: AssertionError """ if model_cls.CQRS_FIELDS != ALL_BASIC_FIELDS: - assert model_cls.CQRS_SERIALIZER is None, \ - "Model {}: CQRS_FIELDS can't be set together with CQRS_SERIALIZER.".format( - model_cls.__name__, - ) + e = "Model {0}: CQRS_FIELDS can't be set together with CQRS_SERIALIZER.".format( + model_cls.__name__, + ) + assert model_cls.CQRS_SERIALIZER is None, e @staticmethod def _check_cqrs_fields(model_cls): @@ -118,8 +118,8 @@ def check_cqrs_id(model_cls): def _check_no_duplicate_names(model_cls, cqrs_field_names, cqrs_attr): model_name = model_cls.__name__ - assert len(set(cqrs_field_names)) == len(cqrs_field_names), \ - 'Duplicate names in {} field for model {}.'.format(cqrs_attr, model_name) + e = 'Duplicate names in {0} field for model {1}.'.format(cqrs_attr, model_name) + assert len(set(cqrs_field_names)) == len(cqrs_field_names), e @staticmethod def _check_unexisting_names(model_cls, cqrs_field_names, cqrs_attr): @@ -127,8 +127,8 @@ def _check_unexisting_names(model_cls, cqrs_field_names, cqrs_attr): model_name = model_cls.__name__ model_field_names = {f.name for f in opts.fields} - assert not set(cqrs_field_names) - model_field_names, \ - '{} field is not correctly set for model {}.'.format(cqrs_attr, model_name) + e = '{0} field is not correctly set for model {1}.'.format(cqrs_attr, model_name) + assert not set(cqrs_field_names) - model_field_names, e @staticmethod def _check_id_in_names(model_cls, cqrs_field_names, cqrs_attr): @@ -136,5 +136,5 @@ def _check_id_in_names(model_cls, cqrs_field_names, cqrs_attr): model_name = model_cls.__name__ pk_name = opts.pk.name - assert pk_name in cqrs_field_names, \ - 'PK is not in {} for model {}.'.format(cqrs_attr, model_name) + e = 'PK is not in {0} for model {1}.'.format(cqrs_attr, model_name) + assert pk_name in cqrs_field_names, e diff --git a/dj_cqrs/mixins.py b/dj_cqrs/mixins.py index 45fb934..ca616af 100644 --- a/dj_cqrs/mixins.py +++ b/dj_cqrs/mixins.py @@ -2,12 +2,6 @@ import logging -from django.conf import settings -from django.db import router, transaction -from django.db.models import DateField, DateTimeField, F, IntegerField, Manager, Model -from django.db.models.expressions import CombinedExpression -from django.utils.module_loading import import_string - from dj_cqrs.constants import ( ALL_BASIC_FIELDS, DEFAULT_CQRS_MAX_RETRIES, @@ -19,6 +13,12 @@ from dj_cqrs.metas import MasterMeta, ReplicaMeta from dj_cqrs.signals import MasterSignals, post_bulk_create, post_update +from django.conf import settings +from django.db import router, transaction +from django.db.models import DateField, DateTimeField, F, IntegerField, Manager, Model +from django.db.models.expressions import CombinedExpression +from django.utils.module_loading import import_string + logger = logging.getLogger('django-cqrs') @@ -270,7 +270,7 @@ def _class_serialization(self, using, sync=False): instance = self.relate_cqrs_serialization(qs).first() if not instance: - raise RuntimeError("Couldn't serialize CQRS class ({}).".format(self.CQRS_ID)) + raise RuntimeError("Couldn't serialize CQRS class ({0}).".format(self.CQRS_ID)) data = self._cqrs_serializer_cls(instance).data data['cqrs_revision'] = instance.cqrs_revision @@ -312,7 +312,9 @@ def _cqrs_serializer_cls(self): self.__class__._cqrs_serializer_class = serializer return serializer except ImportError: - raise ImportError("Model {}: CQRS_SERIALIZER can't be imported.".format(self.__class__)) + raise ImportError( + "Model {0}: CQRS_SERIALIZER can't be imported.".format(self.__class__), + ) class MasterMixin(RawMasterMixin, metaclass=MasterMeta): diff --git a/dj_cqrs/registries.py b/dj_cqrs/registries.py index 9a8a60b..14e9aa0 100644 --- a/dj_cqrs/registries.py +++ b/dj_cqrs/registries.py @@ -1,4 +1,4 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. import logging @@ -11,8 +11,10 @@ class RegistryMixin: @classmethod def register_model(cls, model_cls): """ Registration of CQRS model identifiers. """ - assert model_cls.CQRS_ID not in cls.models, "Two models can't have the same CQRS_ID: {}." \ - .format(model_cls.CQRS_ID) + + e = "Two models can't have the same CQRS_ID: {0}.".format(model_cls.CQRS_ID) + assert model_cls.CQRS_ID not in cls.models, e + cls.models[model_cls.CQRS_ID] = model_cls @classmethod @@ -29,7 +31,7 @@ def get_model_by_cqrs_id(cls, cqrs_id): if cqrs_id in cls.models: return cls.models[cqrs_id] - logger.error('No model with such CQRS_ID: {}.'.format(cqrs_id)) + logger.error('No model with such CQRS_ID: {0}.'.format(cqrs_id)) class MasterRegistry(RegistryMixin): @@ -41,6 +43,7 @@ class ReplicaRegistry(RegistryMixin): @classmethod def register_model(cls, model_cls): - assert getattr(settings, 'CQRS', {}).get('queue') is not None, \ - 'CQRS queue must be set for the service, that has replica models.' + e = 'CQRS queue must be set for the service, that has replica models.' + assert getattr(settings, 'CQRS', {}).get('queue') is not None, e + super(ReplicaRegistry, cls).register_model(model_cls) diff --git a/dj_cqrs/signals.py b/dj_cqrs/signals.py index e110c3f..c6acfba 100644 --- a/dj_cqrs/signals.py +++ b/dj_cqrs/signals.py @@ -1,13 +1,14 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. +from dj_cqrs.constants import SignalType +from dj_cqrs.controller import producer +from dj_cqrs.dataclasses import TransportPayload +from dj_cqrs.utils import get_expires_datetime + from django.db import models, transaction from django.dispatch import Signal from django.utils.timezone import now -from dj_cqrs.controller import producer -from dj_cqrs.constants import SignalType -from dj_cqrs.dataclasses import TransportPayload -from dj_cqrs.utils import get_expires_datetime post_bulk_create = Signal(providing_args=['instances', 'using']) """ diff --git a/dj_cqrs/tracker.py b/dj_cqrs/tracker.py index c19bc80..c90e3fd 100644 --- a/dj_cqrs/tracker.py +++ b/dj_cqrs/tracker.py @@ -1,10 +1,10 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. + +from dj_cqrs.constants import ALL_BASIC_FIELDS, FIELDS_TRACKER_FIELD_NAME from model_utils import FieldTracker from model_utils.tracker import FieldInstanceTracker -from dj_cqrs.constants import ALL_BASIC_FIELDS, FIELDS_TRACKER_FIELD_NAME - class _CQRSTrackerInstance(FieldInstanceTracker): @@ -45,7 +45,7 @@ def add_to_model(cls, model_cls): for field in opts.concrete_fields: if declared == ALL_BASIC_FIELDS or field.name in declared: fields_to_track.append( - field.attname if field.is_relation else field.name + field.attname if field.is_relation else field.name, ) tracker = cls(fields=fields_to_track) diff --git a/dj_cqrs/transport/__init__.py b/dj_cqrs/transport/__init__.py index d002b40..6dc7c27 100644 --- a/dj_cqrs/transport/__init__.py +++ b/dj_cqrs/transport/__init__.py @@ -1,12 +1,12 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. -from django.conf import settings -from django.utils.module_loading import import_string - from dj_cqrs.transport.base import BaseTransport from dj_cqrs.transport.kombu import KombuTransport from dj_cqrs.transport.rabbit_mq import RabbitMQTransport +from django.conf import settings +from django.utils.module_loading import import_string + transport_cls_location = getattr(settings, 'CQRS', {}).get('transport') if not transport_cls_location: diff --git a/dj_cqrs/transport/kombu.py b/dj_cqrs/transport/kombu.py index 7006017..a228fe1 100644 --- a/dj_cqrs/transport/kombu.py +++ b/dj_cqrs/transport/kombu.py @@ -2,13 +2,6 @@ import logging -import ujson -from django.conf import settings -from kombu import Connection, Exchange, Producer, Queue -from kombu.exceptions import KombuError -from kombu.mixins import ConsumerMixin - - from dj_cqrs.constants import SignalType from dj_cqrs.controller import consumer from dj_cqrs.dataclasses import TransportPayload @@ -16,6 +9,15 @@ from dj_cqrs.transport import BaseTransport from dj_cqrs.transport.mixins import LoggingMixin +from django.conf import settings + +from kombu import Connection, Exchange, Producer, Queue +from kombu.exceptions import KombuError +from kombu.mixins import ConsumerMixin + +import ujson + + logger = logging.getLogger('django-cqrs') @@ -49,7 +51,7 @@ def _init_queues(self): sync_q = Queue( self.queue_name, exchange=self.exchange, - routing_key='cqrs.{}.{}'.format(self.queue_name, cqrs_id), + routing_key='cqrs.{0}.{1}'.format(self.queue_name, cqrs_id), ) sync_q.maybe_bind(channel) sync_q.declare() @@ -100,7 +102,7 @@ def produce(cls, payload): cls._produce_message(channel, exchange, payload) cls.log_produced(payload) except KombuError: - logger.error("CQRS couldn't be published: pk = {} ({}).".format( + logger.error("CQRS couldn't be published: pk = {0} ({1}).".format( payload.pk, payload.cqrs_id, )) finally: @@ -112,7 +114,7 @@ def _consume_message(cls, body, message): try: dct = ujson.loads(body) except ValueError: - logger.error("CQRS couldn't be parsed: {}.".format(body)) + logger.error("CQRS couldn't be parsed: {0}.".format(body)) message.reject() return @@ -164,7 +166,7 @@ def _get_produced_message_routing_key(payload): routing_key = payload.cqrs_id if payload.signal_type == SignalType.SYNC and payload.queue: - routing_key = 'cqrs.{}.{}'.format(payload.queue, routing_key) + routing_key = 'cqrs.{0}.{1}'.format(payload.queue, routing_key) return routing_key diff --git a/dj_cqrs/transport/rabbit_mq.py b/dj_cqrs/transport/rabbit_mq.py index 980ffdf..cf4dc79 100644 --- a/dj_cqrs/transport/rabbit_mq.py +++ b/dj_cqrs/transport/rabbit_mq.py @@ -3,19 +3,11 @@ import logging import time from datetime import timedelta - from socket import gaierror from urllib.parse import unquote, urlparse -import ujson -from django.conf import settings -from django.utils import timezone -from pika import exceptions, BasicProperties, BlockingConnection, ConnectionParameters, credentials - from dj_cqrs.constants import ( - SignalType, - DEFAULT_DEAD_MESSAGE_TTL, - DEFAULT_DELAY_QUEUE_MAX_SIZE, + DEFAULT_DEAD_MESSAGE_TTL, DEFAULT_DELAY_QUEUE_MAX_SIZE, SignalType, ) from dj_cqrs.controller import consumer from dj_cqrs.dataclasses import TransportPayload @@ -24,6 +16,14 @@ from dj_cqrs.transport import BaseTransport from dj_cqrs.transport.mixins import LoggingMixin +from django.conf import settings +from django.utils import timezone + +from pika import BasicProperties, BlockingConnection, ConnectionParameters, credentials, exceptions + +import ujson + + logger = logging.getLogger('django-cqrs') @@ -55,7 +55,7 @@ def consume(cls): try: delay_queue = cls._get_delay_queue() connection, channel, consumer_generator = cls._get_consumer_rmq_objects( - *(common_rabbit_settings + consumer_rabbit_settings) + *(common_rabbit_settings + consumer_rabbit_settings), ) for method_frame, properties, body in consumer_generator: @@ -79,7 +79,7 @@ def produce(cls, payload): try: cls._produce(payload) except (exceptions.AMQPError, exceptions.ChannelError, exceptions.ReentrancyError): - logger.error("CQRS couldn't be published: pk = {} ({}). Reconnect...".format( + logger.error("CQRS couldn't be published: pk = {0} ({1}). Reconnect...".format( payload.pk, payload.cqrs_id, )) @@ -89,7 +89,7 @@ def produce(cls, payload): try: cls._produce(payload) except (exceptions.AMQPError, exceptions.ChannelError, exceptions.ReentrancyError): - logger.error("CQRS couldn't be published: pk = {} ({}).".format( + logger.error("CQRS couldn't be published: pk = {0} ({1}).".format( payload.pk, payload.cqrs_id, )) @@ -110,7 +110,7 @@ def _consume_message(cls, ch, method, properties, body, delay_queue): try: dct = ujson.loads(body) except ValueError: - logger.error("CQRS couldn't be parsed: {}.".format(body)) + logger.error("CQRS couldn't be parsed: {0}.".format(body)) ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False) return @@ -150,7 +150,7 @@ def _fail_message(cls, channel, delivery_tag, payload, exception, delay_queue): cls.log_consumed_failed(payload) model_cls = ReplicaRegistry.get_model_by_cqrs_id(payload.cqrs_id) if model_cls is None: - logger.error("Model for cqrs_id {} is not found.".format(payload.cqrs_id)) + logger.error("Model for cqrs_id {0} is not found.".format(payload.cqrs_id)) cls._nack(channel, delivery_tag) return @@ -220,7 +220,7 @@ def _produce_message(cls, channel, exchange, payload, expiration=None): content_type='text/plain', delivery_mode=2, # make message persistent expiration=expiration, - ) + ), ) @classmethod @@ -228,13 +228,13 @@ def _get_produced_message_routing_key(cls, payload): routing_key = payload.cqrs_id if payload.signal_type == SignalType.SYNC and payload.queue: - routing_key = 'cqrs.{}.{}'.format(payload.queue, routing_key) + routing_key = 'cqrs.{0}.{1}'.format(payload.queue, routing_key) elif getattr(payload, 'is_dead_letter', False): dead_letter_queue_name = cls._get_consumer_settings()[-1] - routing_key = 'cqrs.{}.{}'.format(dead_letter_queue_name, routing_key) + routing_key = 'cqrs.{0}.{1}'.format(dead_letter_queue_name, routing_key) elif getattr(payload, 'is_requeue', False): queue = cls._get_consumer_settings()[0] - routing_key = 'cqrs.{}.{}'.format(queue, routing_key) + routing_key = 'cqrs.{0}.{1}'.format(queue, routing_key) return routing_key @@ -251,21 +251,21 @@ def _get_consumer_rmq_objects( channel.queue_declare(queue_name, durable=True, exclusive=False) channel.queue_declare(dead_letter_queue_name, durable=True, exclusive=False) - for cqrs_id, replica_model in ReplicaRegistry.models.items(): + for cqrs_id, _ in ReplicaRegistry.models.items(): channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=cqrs_id) # Every service must have specific SYNC or requeue routes channel.queue_bind( exchange=exchange, queue=queue_name, - routing_key='cqrs.{}.{}'.format(queue_name, cqrs_id), + routing_key='cqrs.{0}.{1}'.format(queue_name, cqrs_id), ) # Dead letter channel.queue_bind( exchange=exchange, queue=dead_letter_queue_name, - routing_key='cqrs.{}.{}'.format(dead_letter_queue_name, cqrs_id), + routing_key='cqrs.{0}.{1}'.format(dead_letter_queue_name, cqrs_id), ) delay_queue_check_timeout = 1 # seconds @@ -320,12 +320,11 @@ def _declare_exchange(channel, exchange): @staticmethod def _parse_url(url): scheme = urlparse(url).scheme + assert scheme == 'amqp', 'Scheme must be "amqp" for RabbitMQTransport.' + schemeless = url[len(scheme) + 3:] parts = urlparse('http://' + schemeless) - path = parts.path or '' - path = path[1:] if path and path[0] == '/' else path - assert scheme == 'amqp', \ - 'Scheme must be "amqp" for RabbitMQTransport.' + return ( unquote(parts.hostname or '') or ConnectionParameters.DEFAULT_HOST, parts.port or ConnectionParameters.DEFAULT_PORT, @@ -355,7 +354,7 @@ def _get_consumer_settings(): queue_name = settings.CQRS['queue'] replica_settings = settings.CQRS.get('replica', {}) - dead_letter_queue_name = 'dead_letter_{}'.format(queue_name) + dead_letter_queue_name = 'dead_letter_{0}'.format(queue_name) if 'dead_letter_queue' in replica_settings: dead_letter_queue_name = replica_settings['dead_letter_queue'] diff --git a/dj_cqrs/utils.py b/dj_cqrs/utils.py index 8b66c7d..7490709 100644 --- a/dj_cqrs/utils.py +++ b/dj_cqrs/utils.py @@ -3,10 +3,11 @@ import logging from datetime import timedelta +from dj_cqrs.constants import DEFAULT_CQRS_MESSAGE_TTL + from django.conf import settings from django.utils import timezone -from dj_cqrs.constants import DEFAULT_CQRS_MESSAGE_TTL logger = logging.getLogger('django-cqrs') diff --git a/docs/conf.py b/docs/conf.py index b608736..65a48ec 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -4,10 +4,10 @@ import sys from datetime import datetime -from setuptools_scm import get_version - import django +from setuptools_scm import get_version + sys.path.insert(0, os.path.abspath('..')) os.environ['DJANGO_SETTINGS_MODULE'] = 'tests.dj.settings' @@ -17,7 +17,7 @@ # -- Project information ----------------------------------------------------- project = 'django-cqrs' -copyright = '{}, Ingram Micro Inc.'.format(datetime.now().year) +copyright = '{0}, Ingram Micro Inc.'.format(datetime.now().year) author = 'CloudBlue' # The full version, including alpha/beta/rc tags diff --git a/integration_tests/master_settings.py b/integration_tests/master_settings.py index be24489..5b97c2d 100644 --- a/integration_tests/master_settings.py +++ b/integration_tests/master_settings.py @@ -1,4 +1,4 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. import os @@ -57,5 +57,5 @@ 'url': os.getenv('CQRS_BROKER_URL'), 'master': { 'CQRS_MESSAGE_TTL': 3600, - } + }, } diff --git a/integration_tests/replica_settings.py b/integration_tests/replica_settings.py index 3c18088..d6c3d78 100644 --- a/integration_tests/replica_settings.py +++ b/integration_tests/replica_settings.py @@ -42,7 +42,7 @@ 'NAME': os.getenv('POSTGRES_DB', 'replica'), 'USER': os.getenv('POSTGRES_USER', 'user'), 'PASSWORD': os.getenv('POSTGRES_PASSWORD', 'pswd'), - } + }, } LANGUAGE_CODE = 'en-us' @@ -66,5 +66,5 @@ 'delay_queue_max_size': 10, 'dead_letter_queue': 'dead_letter_replica', 'dead_message_ttl': 5, - } + }, } diff --git a/integration_tests/setup.cfg b/integration_tests/setup.cfg index ade9d2a..fa17cde 100644 --- a/integration_tests/setup.cfg +++ b/integration_tests/setup.cfg @@ -1,7 +1,11 @@ [aliases] test = pytest +[flake8] +max-line-length = 100 +show-source = True + [tool:pytest] django_find_project = false -addopts = -p no:cacheprovider --reuse-db --nomigrations +addopts = --reuse-db --nomigrations DJANGO_SETTINGS_MODULE = integration_tests.master_settings diff --git a/integration_tests/tests/conftest.py b/integration_tests/tests/conftest.py index 94df57a..a6c03fa 100644 --- a/integration_tests/tests/conftest.py +++ b/integration_tests/tests/conftest.py @@ -1,13 +1,15 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. -import psycopg2 -import pytest -from pika import BlockingConnection, URLParameters +from dj_cqrs.transport import current_transport +from dj_cqrs.transport.rabbit_mq import RabbitMQTransport from integration_tests.tests.utils import REPLICA_TABLES -from dj_cqrs.transport import current_transport -from dj_cqrs.transport.rabbit_mq import RabbitMQTransport +from pika import BlockingConnection, URLParameters + +import psycopg2 + +import pytest @pytest.fixture @@ -19,7 +21,7 @@ def replica_cursor(): cursor = connection.cursor() for table in REPLICA_TABLES: - cursor.execute('TRUNCATE TABLE {};'.format(table)) + cursor.execute('TRUNCATE TABLE {0};'.format(table)) yield cursor diff --git a/integration_tests/tests/test_asynchronous_consuming.py b/integration_tests/tests/test_asynchronous_consuming.py index c50b3f2..506046e 100644 --- a/integration_tests/tests/test_asynchronous_consuming.py +++ b/integration_tests/tests/test_asynchronous_consuming.py @@ -1,11 +1,15 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. -import pytest - from integration_tests.tests.utils import ( - REPLICA_BASIC_TABLE, REPLICA_EVENT_TABLE, - count_replica_rows, get_replica_all, transport_delay, + REPLICA_BASIC_TABLE, + REPLICA_EVENT_TABLE, + count_replica_rows, + get_replica_all, + transport_delay, ) + +import pytest + from tests.dj_master.models import BasicFieldsModel @@ -26,7 +30,7 @@ def test_both_consumers_consume(replica_cursor, clean_rabbit_transport_connectio assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 9 assert count_replica_rows(replica_cursor, REPLICA_EVENT_TABLE) == 9 - events_data = get_replica_all(replica_cursor, REPLICA_EVENT_TABLE, ('pid', ),) + events_data = get_replica_all(replica_cursor, REPLICA_EVENT_TABLE, ('pid',)) assert len({d[0] for d in events_data}) == 2 @@ -39,7 +43,7 @@ def test_de_duplication(replica_cursor, clean_rabbit_transport_connection): BasicFieldsModel.call_post_bulk_create([master_instance]) transport_delay(3) - replica_cursor.execute('TRUNCATE TABLE {};'.format(REPLICA_EVENT_TABLE)) + replica_cursor.execute('TRUNCATE TABLE {0};'.format(REPLICA_EVENT_TABLE)) BasicFieldsModel.call_post_bulk_create([master_instance for _ in range(10)]) transport_delay(3) diff --git a/integration_tests/tests/test_bulk_operations.py b/integration_tests/tests/test_bulk_operations.py index d47ac26..495d729 100644 --- a/integration_tests/tests/test_bulk_operations.py +++ b/integration_tests/tests/test_bulk_operations.py @@ -1,10 +1,11 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. - -import pytest +# Copyright © 2021 Ingram Micro Inc. All rights reserved. from integration_tests.tests.utils import ( REPLICA_BASIC_TABLE, count_replica_rows, get_replica_all, get_replica_first, transport_delay, ) + +import pytest + from tests.dj_master.models import BasicFieldsModel diff --git a/integration_tests/tests/test_dead_letter.py b/integration_tests/tests/test_dead_letter.py index 8336c85..792d16d 100644 --- a/integration_tests/tests/test_dead_letter.py +++ b/integration_tests/tests/test_dead_letter.py @@ -2,9 +2,10 @@ import json +from integration_tests.tests.utils import transport_delay + import pytest -from integration_tests.tests.utils import transport_delay from tests.dj_master.models import FailModel diff --git a/integration_tests/tests/test_single_basic_instance.py b/integration_tests/tests/test_single_basic_instance.py index 750eca7..95080d5 100644 --- a/integration_tests/tests/test_single_basic_instance.py +++ b/integration_tests/tests/test_single_basic_instance.py @@ -1,11 +1,14 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. -import pytest from django.utils.timezone import now from integration_tests.tests.utils import ( REPLICA_BASIC_TABLE, count_replica_rows, get_replica_first, transport_delay, ) + + +import pytest + from tests.dj_master.models import BasicFieldsModel diff --git a/integration_tests/tests/test_sync_to_a_certain_service.py b/integration_tests/tests/test_sync_to_a_certain_service.py index 9d1aa48..c46009c 100644 --- a/integration_tests/tests/test_sync_to_a_certain_service.py +++ b/integration_tests/tests/test_sync_to_a_certain_service.py @@ -1,10 +1,11 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. - -import pytest +# Copyright © 2021 Ingram Micro Inc. All rights reserved. from integration_tests.tests.utils import ( REPLICA_BASIC_TABLE, count_replica_rows, get_replica_first, transport_delay, ) + +import pytest + from tests.dj_master.models import BasicFieldsModel diff --git a/integration_tests/tests/utils.py b/integration_tests/tests/utils.py index f049aa1..462b08d 100644 --- a/integration_tests/tests/utils.py +++ b/integration_tests/tests/utils.py @@ -1,4 +1,4 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. import time @@ -10,16 +10,16 @@ def count_replica_rows(cursor, table): - cursor.execute('SELECT COUNT(*) FROM {};'.format(table)) + cursor.execute('SELECT COUNT(*) FROM {0};'.format(table)) return cursor.fetchone()[0] def get_replica_all(cursor, table, columns=None, order_asc_by=None): select = ','.join(columns) if columns else '*' - sql = 'SELECT {} FROM {}'.format(select, table) + sql = 'SELECT {0} FROM {1}'.format(select, table) if order_asc_by: - sql = '{} ORDER BY {} ASC;'.format(sql, order_asc_by) + sql = '{0} ORDER BY {1} ASC;'.format(sql, order_asc_by) cursor.execute(sql) return cursor.fetchall() @@ -28,7 +28,7 @@ def get_replica_all(cursor, table, columns=None, order_asc_by=None): def get_replica_first(cursor, table, columns=None): select = ','.join(columns) if columns else '*' - cursor.execute('SELECT {} FROM {} LIMIT 1;'.format(select, table)) + cursor.execute('SELECT {0} FROM {1} LIMIT 1;'.format(select, table)) return cursor.fetchone() diff --git a/requirements/test.txt b/requirements/test.txt index bac2b34..5fb07ee 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -4,6 +4,15 @@ pytest pytest-cov pytest-django pytest-mock +pytest-deadfixtures +pytest-randomly djangorestframework django-mptt -pytest-random-order +flake8-bugbear +flake8-broken-line +flake8-commas +flake8-comprehensions +flake8-debugger +flake8-eradicate +flake8-import-order +flake8-string-format \ No newline at end of file diff --git a/setup.py b/setup.py index 389b0b5..a7e6b1d 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,4 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. from setuptools import find_packages, setup @@ -30,7 +30,10 @@ def read_file(name): keywords='django cqrs sql mixin amqp', classifiers=[ 'Development Status :: 5 - Production/Stable', - 'Framework :: Django :: 1.11', + 'Framework :: Django :: 2.2', + 'Framework :: Django :: 3.0', + 'Framework :: Django :: 3.1', + 'Framework :: Django :: 3.2', 'Intended Audience :: Developers', 'License :: OSI Approved :: Apache Software License', 'Operating System :: Unix', @@ -39,5 +42,5 @@ def read_file(name): 'Programming Language :: Python :: 3.8', 'Topic :: Communications', 'Topic :: Database', - ] + ], ) diff --git a/tests/dj/settings.py b/tests/dj/settings.py index bee75ca..b90a593 100644 --- a/tests/dj/settings.py +++ b/tests/dj/settings.py @@ -40,7 +40,7 @@ 'default': { 'ENGINE': 'django.db.backends.sqlite3', 'NAME': os.path.join(BASE_DIR, 'db.sqlite3'), - } + }, } LANGUAGE_CODE = 'en-us' diff --git a/tests/dj_master/models.py b/tests/dj_master/models.py index 1d11e4a..0060b01 100644 --- a/tests/dj_master/models.py +++ b/tests/dj_master/models.py @@ -1,13 +1,14 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. +from dj_cqrs.constants import ALL_BASIC_FIELDS +from dj_cqrs.metas import MasterMeta +from dj_cqrs.mixins import MasterMixin, RawMasterMixin + from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType from django.db import models from mptt.models import MPTTModel, TreeForeignKey -from dj_cqrs.constants import ALL_BASIC_FIELDS -from dj_cqrs.metas import MasterMeta -from dj_cqrs.mixins import MasterMixin, RawMasterMixin class BasicFieldsModel(MasterMixin, models.Model): diff --git a/tests/dj_replica/models.py b/tests/dj_replica/models.py index fbeb99e..03a3160 100644 --- a/tests/dj_replica/models.py +++ b/tests/dj_replica/models.py @@ -1,9 +1,9 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. -from django.db import models - from dj_cqrs.mixins import ReplicaMixin +from django.db import models + class BasicFieldsModelRef(ReplicaMixin, models.Model): CQRS_ID = 'basic' diff --git a/tests/test_commands/test_bulk_dump.py b/tests/test_commands/test_bulk_dump.py index 8e79e0f..87481f0 100644 --- a/tests/test_commands/test_bulk_dump.py +++ b/tests/test_commands/test_bulk_dump.py @@ -1,14 +1,16 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. -import ujson - -import pytest from django.core.management import CommandError, call_command from django.db import transaction -from tests.utils import db_error + +import pytest from tests.dj_master.models import Author, Publisher from tests.test_commands.utils import remove_file +from tests.utils import db_error + +import ujson + COMMAND_NAME = 'cqrs_bulk_dump' diff --git a/tests/test_commands/test_bulk_flow.py b/tests/test_commands/test_bulk_flow.py index 6796dff..b0c4768 100644 --- a/tests/test_commands/test_bulk_flow.py +++ b/tests/test_commands/test_bulk_flow.py @@ -1,9 +1,10 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. -import pytest from django.core.management import call_command from django.db import transaction +import pytest + from tests.dj_master import models as master_models from tests.dj_replica import models as replica_models from tests.test_commands.utils import remove_file diff --git a/tests/test_commands/test_bulk_load.py b/tests/test_commands/test_bulk_load.py index 7adf3cd..d4e0adc 100644 --- a/tests/test_commands/test_bulk_load.py +++ b/tests/test_commands/test_bulk_load.py @@ -1,11 +1,13 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. + -import pytest from django.core.management import CommandError, call_command from django.utils.timezone import now -from tests.utils import db_error + +import pytest from tests.dj_replica.models import AuthorRef +from tests.utils import db_error COMMAND_NAME = 'cqrs_bulk_load' @@ -26,21 +28,21 @@ def test_no_file(): def test_empty_file(): with pytest.raises(CommandError) as e: - call_command(COMMAND_NAME, '-i={}empty_file.dump'.format(DUMPS_PATH)) + call_command(COMMAND_NAME, '-i={0}empty_file.dump'.format(DUMPS_PATH)) assert "empty_file.dump is empty!" in str(e) def test_no_cqrs_id(): with pytest.raises(CommandError) as e: - call_command(COMMAND_NAME, '-i={}bad_cqrs_id.dump'.format(DUMPS_PATH)) + call_command(COMMAND_NAME, '-i={0}bad_cqrs_id.dump'.format(DUMPS_PATH)) assert "Wrong CQRS ID: publisher!" in str(e) @pytest.mark.django_db def test_unparseable_line(capsys): - call_command(COMMAND_NAME, '-i={}unparseable.dump'.format(DUMPS_PATH)) + call_command(COMMAND_NAME, '-i={0}unparseable.dump'.format(DUMPS_PATH)) assert AuthorRef.objects.count() == 0 captured = capsys.readouterr() @@ -50,7 +52,7 @@ def test_unparseable_line(capsys): @pytest.mark.django_db def test_bad_master_data(capsys): - call_command(COMMAND_NAME, '-i={}bad_master_data.dump'.format(DUMPS_PATH)) + call_command(COMMAND_NAME, '-i={0}bad_master_data.dump'.format(DUMPS_PATH)) assert AuthorRef.objects.count() == 1 captured = capsys.readouterr() @@ -62,7 +64,7 @@ def test_bad_master_data(capsys): def test_no_rows(capsys): AuthorRef.objects.create(id=1, name='1', cqrs_revision=0, cqrs_updated=now()) - call_command(COMMAND_NAME, '--input={}no_rows.dump'.format(DUMPS_PATH)) + call_command(COMMAND_NAME, '--input={0}no_rows.dump'.format(DUMPS_PATH)) assert AuthorRef.objects.count() == 1 captured = capsys.readouterr() @@ -73,7 +75,7 @@ def test_no_rows(capsys): def test_loaded_correctly(capsys): AuthorRef.objects.create(id=1, name='1', cqrs_revision=0, cqrs_updated=now()) - call_command(COMMAND_NAME, '--input={}author.dump'.format(DUMPS_PATH)) + call_command(COMMAND_NAME, '--input={0}author.dump'.format(DUMPS_PATH)) assert AuthorRef.objects.count() == 2 captured = capsys.readouterr() @@ -84,7 +86,7 @@ def test_loaded_correctly(capsys): def test_delete_before_upload_ok(capsys): AuthorRef.objects.create(id=1, name='1', cqrs_revision=0, cqrs_updated=now()) - call_command(COMMAND_NAME, '--input={}no_rows.dump'.format(DUMPS_PATH), '--clear=true') + call_command(COMMAND_NAME, '--input={0}no_rows.dump'.format(DUMPS_PATH), '--clear=true') assert AuthorRef.objects.count() == 0 captured = capsys.readouterr() @@ -92,10 +94,10 @@ def test_delete_before_upload_ok(capsys): @pytest.mark.django_db -def test_delete_operation_fails(mocker, ): +def test_delete_operation_fails(mocker): mocker.patch('django.db.models.manager.BaseManager.all', side_effect=db_error) with pytest.raises(CommandError) as e: - call_command(COMMAND_NAME, '--input={}no_rows.dump'.format(DUMPS_PATH), '--clear=true') + call_command(COMMAND_NAME, '--input={0}no_rows.dump'.format(DUMPS_PATH), '--clear=true') assert "Delete operation fails!" in str(e) @@ -103,7 +105,7 @@ def test_delete_operation_fails(mocker, ): @pytest.mark.django_db def test_unexpected_error(mocker, capsys): mocker.patch('tests.dj_replica.models.AuthorRef.cqrs_save', side_effect=db_error) - call_command(COMMAND_NAME, '--input={}author.dump'.format(DUMPS_PATH)) + call_command(COMMAND_NAME, '--input={0}author.dump'.format(DUMPS_PATH)) captured = capsys.readouterr() assert 'Unexpected error: line 2!' in captured.err @@ -115,7 +117,7 @@ def test_unexpected_error(mocker, capsys): def test_loaded_correctly_batch(capsys): AuthorRef.objects.create(id=1, name='1', cqrs_revision=0, cqrs_updated=now()) - call_command(COMMAND_NAME, '--input={}author.dump'.format(DUMPS_PATH), '--batch=1') + call_command(COMMAND_NAME, '--input={0}author.dump'.format(DUMPS_PATH), '--batch=1') assert AuthorRef.objects.count() == 2 captured = capsys.readouterr() diff --git a/tests/test_commands/test_dead_letters.py b/tests/test_commands/test_dead_letters.py index 519f326..381b7bc 100644 --- a/tests/test_commands/test_dead_letters.py +++ b/tests/test_commands/test_dead_letters.py @@ -1,15 +1,17 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. -import ujson from datetime import datetime -import pytest -from django.utils import timezone - from dj_cqrs.constants import SignalType -from django.core.management import call_command, CommandError from dj_cqrs.management.commands.cqrs_dead_letters import Command, RabbitMQTransport +from django.core.management import CommandError, call_command +from django.utils import timezone + +import pytest + +import ujson + COMMAND_NAME = 'cqrs_dead_letters' @@ -19,12 +21,12 @@ def test_dump(capsys, mocker): mocker.patch.object( RabbitMQTransport, '_get_consumer_settings', - return_value=('queue', 'dead_letters_queue') + return_value=('queue', 'dead_letters_queue'), ) mocker.patch.object( RabbitMQTransport, '_get_common_settings', - return_value=('host', 'port', mocker.MagicMock(), 'exchange') + return_value=('host', 'port', mocker.MagicMock(), 'exchange'), ) queue = mocker.MagicMock() @@ -37,7 +39,7 @@ def test_dump(capsys, mocker): mocker.patch.object( RabbitMQTransport, '_create_connection', - return_value=(mocker.MagicMock(), channel) + return_value=(mocker.MagicMock(), channel), ) mocker.patch.object(RabbitMQTransport, '_nack') @@ -52,7 +54,7 @@ def test_handle_retry(settings, capsys, mocker): mocker.patch.object( RabbitMQTransport, '_get_producer_rmq_objects', - return_value=(None, produce_channel) + return_value=(None, produce_channel), ) channel = mocker.MagicMock() diff --git a/tests/test_commands/test_deleted_diff_master.py b/tests/test_commands/test_deleted_diff_master.py index e59eb85..3aa40cd 100644 --- a/tests/test_commands/test_deleted_diff_master.py +++ b/tests/test_commands/test_deleted_diff_master.py @@ -3,10 +3,11 @@ import sys from io import StringIO -import pytest from django.core.management import CommandError, call_command from django.utils.timezone import now +import pytest + from tests.dj_master.models import Author from tests.dj_replica.models import AuthorRef @@ -48,7 +49,7 @@ def test_first_row(capsys, mocker): captured = replica_master_pipe(capsys, mocker) first_row = captured.out.split('\n')[0] - assert '{},'.format(Author.CQRS_ID) in first_row + assert '{0},'.format(Author.CQRS_ID) in first_row @pytest.mark.django_db diff --git a/tests/test_commands/test_deleted_diff_replica.py b/tests/test_commands/test_deleted_diff_replica.py index 792f08e..7e722c6 100644 --- a/tests/test_commands/test_deleted_diff_replica.py +++ b/tests/test_commands/test_deleted_diff_replica.py @@ -1,13 +1,14 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. -import ujson - -import pytest from django.core.management import CommandError, call_command from django.utils.timezone import now +import pytest + from tests.dj_replica.models import AuthorRef +import ujson + COMMAND_NAME = 'cqrs_deleted_diff_replica' @@ -31,7 +32,7 @@ def test_first_row(capsys): call_command(COMMAND_NAME, '--cqrs-id=author') captured = capsys.readouterr() - assert '{},'.format(AuthorRef.CQRS_ID) in captured.out + assert '{0},'.format(AuthorRef.CQRS_ID) in captured.out @pytest.mark.django_db diff --git a/tests/test_commands/test_deleted_sync_replica.py b/tests/test_commands/test_deleted_sync_replica.py index e07c0e6..da9025a 100644 --- a/tests/test_commands/test_deleted_sync_replica.py +++ b/tests/test_commands/test_deleted_sync_replica.py @@ -3,10 +3,11 @@ import sys from io import StringIO -import pytest from django.core.management import CommandError, call_command from django.utils.timezone import now +import pytest + from tests.dj_master.models import Author from tests.dj_replica.models import AuthorRef diff --git a/tests/test_commands/test_diff_master.py b/tests/test_commands/test_diff_master.py index 1728dfd..67ed399 100644 --- a/tests/test_commands/test_diff_master.py +++ b/tests/test_commands/test_diff_master.py @@ -1,12 +1,13 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. -import ujson +from django.core.management import CommandError, call_command import pytest -from django.core.management import CommandError, call_command from tests.dj_master.models import Author +import ujson + COMMAND_NAME = 'cqrs_diff_master' @@ -30,7 +31,7 @@ def test_first_row(capsys): call_command(COMMAND_NAME, '--cqrs-id=author') captured = capsys.readouterr() - assert '{},'.format(Author.CQRS_ID) in captured.out + assert '{0},'.format(Author.CQRS_ID) in captured.out @pytest.mark.django_db diff --git a/tests/test_commands/test_diff_replica.py b/tests/test_commands/test_diff_replica.py index 219300b..aa94350 100644 --- a/tests/test_commands/test_diff_replica.py +++ b/tests/test_commands/test_diff_replica.py @@ -1,13 +1,14 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. import sys from io import StringIO -import pytest from django.conf import settings from django.core.management import CommandError, call_command from django.utils.timezone import now +import pytest + from tests.dj_master.models import Author from tests.dj_replica.models import AuthorRef @@ -49,7 +50,7 @@ def test_first_row(capsys, mocker): captured = master_replica_pipe(capsys, mocker) first_row = captured.out.split('\n')[0] - assert '{},'.format(Author.CQRS_ID) in first_row + assert '{0},'.format(Author.CQRS_ID) in first_row assert settings.CQRS['queue'] in first_row diff --git a/tests/test_commands/test_diff_sync.py b/tests/test_commands/test_diff_sync.py index a939093..63beb24 100644 --- a/tests/test_commands/test_diff_sync.py +++ b/tests/test_commands/test_diff_sync.py @@ -1,14 +1,16 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. import sys from io import StringIO -import pytest +from dj_cqrs.constants import NO_QUEUE +from dj_cqrs.management.commands import cqrs_sync + from django.core.management import CommandError, call_command from django.utils.timezone import now -from dj_cqrs.constants import NO_QUEUE -from dj_cqrs.management.commands import cqrs_sync +import pytest + from tests.dj_master.models import Author from tests.dj_replica.models import AuthorRef @@ -92,7 +94,7 @@ def test_sync_batch(mocker, capsys): @pytest.mark.django_db def test_sync_no_queue(mocker): sync_mock = mocker.patch.object(cqrs_sync.Command, 'handle') - mocker.patch.object(sys, 'stdin', StringIO('author,dt,{}\n[1]\n'.format(NO_QUEUE))) + mocker.patch.object(sys, 'stdin', StringIO('author,dt,{0}\n[1]\n'.format(NO_QUEUE))) call_command(COMMAND_NAME) sync_mock.assert_called_once_with(**{'cqrs_id': 'author', 'filter': '{"id__in": [1]}'}) diff --git a/tests/test_commands/test_sync.py b/tests/test_commands/test_sync.py index accf346..72e98e9 100644 --- a/tests/test_commands/test_sync.py +++ b/tests/test_commands/test_sync.py @@ -1,11 +1,14 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. + +from dj_cqrs.constants import SignalType -import pytest from django.core.management import CommandError, call_command -from tests.utils import db_error -from dj_cqrs.constants import SignalType +import pytest + from tests.dj_master.models import Author +from tests.utils import db_error + COMMAND_NAME = 'cqrs_sync' @@ -109,7 +112,7 @@ def test_error(capsys, mocker): Author.objects.create(id=2, name='2') mocker.patch('tests.dj_master.models.Author.cqrs_sync', side_effect=db_error) - call_command(COMMAND_NAME, '--cqrs-id=author', '-f={}') + call_command(COMMAND_NAME, '--cqrs-id=author', '-f={}') # noqa: P103 captured = capsys.readouterr() assert 'Sync record failed for pk=2' in captured.out @@ -120,7 +123,7 @@ def test_error(capsys, mocker): @pytest.mark.django_db def test_progress(capsys): Author.objects.create(id=2, name='2') - call_command(COMMAND_NAME, '--cqrs-id=author', '--progress', '-f={}', '--batch=2') + call_command(COMMAND_NAME, '--cqrs-id=author', '--progress', '-f={}', '--batch=2') # noqa: P103 captured = capsys.readouterr() assert 'Processing 1 records with batch size 2' in captured.out diff --git a/tests/test_controller.py b/tests/test_controller.py index b6b8eef..8eb5392 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -1,12 +1,12 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. -import pytest - from dj_cqrs.constants import SignalType from dj_cqrs.controller.consumer import consume, route_signal_to_replica_model from dj_cqrs.controller.producer import produce from dj_cqrs.dataclasses import TransportPayload +import pytest + def test_producer(mocker): transport_mock = mocker.patch('tests.dj.transport.TransportStub.produce') diff --git a/tests/test_correlation.py b/tests/test_correlation.py index 2f8d844..0e5a16b 100644 --- a/tests/test_correlation.py +++ b/tests/test_correlation.py @@ -2,10 +2,10 @@ from importlib import import_module, reload -import pytest - from dj_cqrs.correlation import get_correlation_id +import pytest + def test_default_correlation(): assert get_correlation_id(None, None, None, None) is None diff --git a/tests/test_delay.py b/tests/test_delay.py index fcce3c0..081cd6c 100644 --- a/tests/test_delay.py +++ b/tests/test_delay.py @@ -1,12 +1,12 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. -from datetime import datetime, timezone, timedelta +from datetime import datetime, timedelta, timezone from queue import Full -import pytest - from dj_cqrs.delay import DelayMessage, DelayQueue +import pytest + def test_delay_message(mocker): fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc) diff --git a/tests/test_flow.py b/tests/test_flow.py index 20bf5b9..133ca6f 100644 --- a/tests/test_flow.py +++ b/tests/test_flow.py @@ -1,8 +1,9 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. -import pytest from django.db import transaction +import pytest + from tests.dj_master import models as master_models from tests.dj_replica import models as replica_models @@ -176,5 +177,5 @@ def test_sync_downgrade(mocker, caplog): assert replica_author.cqrs_revision == 0 assert replica_author.name == 'other' - assert 'CQRS revision downgrade on sync: pk = 1, cqrs_revision = new 0 / existing 1 (author).' \ - in caplog.text + e = 'CQRS revision downgrade on sync: pk = 1, cqrs_revision = new 0 / existing 1 (author).' + assert e in caplog.text diff --git a/tests/test_master/test_mixin.py b/tests/test_master/test_mixin.py index a916d44..35489cf 100644 --- a/tests/test_master/test_mixin.py +++ b/tests/test_master/test_mixin.py @@ -1,16 +1,17 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. -import pytest from time import sleep from uuid import uuid4 +from dj_cqrs.constants import FIELDS_TRACKER_FIELD_NAME, SignalType +from dj_cqrs.metas import MasterMeta + from django.contrib.contenttypes.models import ContentType from django.db import transaction -from django.db.models import CharField, IntegerField, F +from django.db.models import CharField, F, IntegerField from django.utils.timezone import now -from dj_cqrs.constants import SignalType, FIELDS_TRACKER_FIELD_NAME -from dj_cqrs.metas import MasterMeta +import pytest from tests.dj_master import models from tests.dj_master.serializers import AuthorSerializer @@ -475,7 +476,7 @@ def test_update_from_related_table(mocker): 'name': 'author', 'publisher': { 'id': 1, - 'name': 'new' + 'name': 'new', }, 'books': [], 'cqrs_revision': 1, diff --git a/tests/test_master/test_signals.py b/tests/test_master/test_signals.py index b1edcca..6091ce5 100644 --- a/tests/test_master/test_signals.py +++ b/tests/test_master/test_signals.py @@ -2,11 +2,13 @@ from datetime import datetime, timezone -import pytest +from dj_cqrs.constants import SignalType +from dj_cqrs.signals import post_bulk_create, post_update + from django.db.models.signals import post_delete, post_save -from dj_cqrs.signals import post_bulk_create, post_update -from dj_cqrs.constants import SignalType +import pytest + from tests.dj_master import models from tests.utils import assert_publisher_once_called_with_args diff --git a/tests/test_metas.py b/tests/test_metas.py index 81ddb59..a307433 100644 --- a/tests/test_metas.py +++ b/tests/test_metas.py @@ -1,9 +1,9 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. - -import pytest +# Copyright © 2021 Ingram Micro Inc. All rights reserved. from dj_cqrs.metas import _MetaUtils +import pytest + @pytest.mark.django_db def test_no_cqrs_id(): diff --git a/tests/test_registries.py b/tests/test_registries.py index 79558cb..ba2f513 100644 --- a/tests/test_registries.py +++ b/tests/test_registries.py @@ -1,8 +1,9 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. + +from dj_cqrs.registries import MasterRegistry, ReplicaRegistry import pytest -from dj_cqrs.registries import MasterRegistry, ReplicaRegistry from tests.dj_master import models as master_models from tests.dj_replica import models as replica_models diff --git a/tests/test_replica/test_factory.py b/tests/test_replica/test_factory.py index 00789b0..ea528f4 100644 --- a/tests/test_replica/test_factory.py +++ b/tests/test_replica/test_factory.py @@ -1,11 +1,11 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. - -import pytest +# Copyright © 2021 Ingram Micro Inc. All rights reserved. from dj_cqrs.constants import SignalType from dj_cqrs.controller.consumer import route_signal_to_replica_model from dj_cqrs.mixins import ReplicaMixin +import pytest + def test_bad_model(caplog): route_signal_to_replica_model(SignalType.SAVE, 'invalid', {}) diff --git a/tests/test_replica/test_mixin.py b/tests/test_replica/test_mixin.py index 88638cb..90c79ee 100644 --- a/tests/test_replica/test_mixin.py +++ b/tests/test_replica/test_mixin.py @@ -1,11 +1,12 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. -import pytest +from dj_cqrs.metas import ReplicaMeta from django.db.models import CharField, IntegerField, QuerySet from django.utils.timezone import now -from dj_cqrs.metas import ReplicaMeta +import pytest + from tests.dj_replica import models from tests.utils import db_error @@ -364,8 +365,9 @@ def test_wrong_update_order(caplog): assert earlier_instance.cqrs_revision == 2 assert earlier_instance.char_field == 'new_text_2' assert later_instance - assert 'Wrong CQRS sync order: pk = 1, cqrs_revision = new 1 / existing 2 (basic).' in \ - caplog.text + + e = 'Wrong CQRS sync order: pk = 1, cqrs_revision = new 1 / existing 2 (basic).' + assert e in caplog.text @pytest.mark.django_db(transaction=True) diff --git a/tests/test_transport/test_base.py b/tests/test_transport/test_base.py index 56b04c0..dee84e0 100644 --- a/tests/test_transport/test_base.py +++ b/tests/test_transport/test_base.py @@ -1,11 +1,11 @@ -# Copyright © 2020 Ingram Micro Inc. All rights reserved. +# Copyright © 2021 Ingram Micro Inc. All rights reserved. from importlib import import_module, reload -import pytest - from dj_cqrs.transport.base import BaseTransport +import pytest + def test_no_transport_setting(settings): settings.CQRS = {} diff --git a/tests/test_transport/test_kombu.py b/tests/test_transport/test_kombu.py index 7dbc91e..9b35852 100644 --- a/tests/test_transport/test_kombu.py +++ b/tests/test_transport/test_kombu.py @@ -1,16 +1,18 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. import logging -import ujson from importlib import import_module, reload -import pytest -from kombu.exceptions import KombuError - from dj_cqrs.constants import SignalType from dj_cqrs.dataclasses import TransportPayload from dj_cqrs.registries import ReplicaRegistry -from dj_cqrs.transport.kombu import _KombuConsumer, KombuTransport +from dj_cqrs.transport.kombu import KombuTransport, _KombuConsumer + +from kombu.exceptions import KombuError + +import pytest + +import ujson class PublicKombuTransport(KombuTransport): @@ -139,17 +141,16 @@ def test_produce_message_ok(mocker): prepare_message_args = channel.prepare_message.call_args[0] basic_publish_kwargs = channel.basic_publish.call_args[1] - assert ujson.loads(prepare_message_args[0]) == \ - { - 'signal_type': SignalType.SAVE, - 'cqrs_id': 'cqrs_id', - 'instance_data': {}, - 'instance_pk': 'id', - 'previous_data': {'e': 'f'}, - 'correlation_id': None, - 'expires': None, - 'retries': 0, - } + assert ujson.loads(prepare_message_args[0]) == { + 'signal_type': SignalType.SAVE, + 'cqrs_id': 'cqrs_id', + 'instance_data': {}, + 'instance_pk': 'id', + 'previous_data': {'e': 'f'}, + 'correlation_id': None, + 'expires': None, + 'retries': 0, + } assert prepare_message_args[2] == 'text/plain' assert prepare_message_args[5]['delivery_mode'] == 2 @@ -170,17 +171,16 @@ def test_produce_sync_message_no_queue(mocker): prepare_message_args = channel.prepare_message.call_args[0] basic_publish_kwargs = channel.basic_publish.call_args[1] - assert ujson.loads(prepare_message_args[0]) == \ - { - 'signal_type': SignalType.SYNC, - 'cqrs_id': 'cqrs_id', - 'instance_data': {}, - 'instance_pk': None, - 'previous_data': None, - 'correlation_id': None, - 'expires': None, - 'retries': 0, - } + assert ujson.loads(prepare_message_args[0]) == { + 'signal_type': SignalType.SYNC, + 'cqrs_id': 'cqrs_id', + 'instance_data': {}, + 'instance_pk': None, + 'previous_data': None, + 'correlation_id': None, + 'expires': None, + 'retries': 0, + } assert basic_publish_kwargs['routing_key'] == 'cqrs_id' @@ -194,17 +194,16 @@ def test_produce_sync_message_queue(mocker): prepare_message_args = channel.prepare_message.call_args[0] basic_publish_kwargs = channel.basic_publish.call_args[1] - assert ujson.loads(prepare_message_args[0]) == \ - { - 'signal_type': SignalType.SYNC, - 'cqrs_id': 'cqrs_id', - 'instance_data': {}, - 'instance_pk': 'id', - 'previous_data': None, - 'correlation_id': None, - 'expires': None, - 'retries': 0, - } + assert ujson.loads(prepare_message_args[0]) == { + 'signal_type': SignalType.SYNC, + 'cqrs_id': 'cqrs_id', + 'instance_data': {}, + 'instance_pk': 'id', + 'previous_data': None, + 'correlation_id': None, + 'expires': None, + 'retries': 0, + } assert basic_publish_kwargs['routing_key'] == 'cqrs.queue.cqrs_id' diff --git a/tests/test_transport/test_rabbit_mq.py b/tests/test_transport/test_rabbit_mq.py index 7e187ee..1d16e6f 100644 --- a/tests/test_transport/test_rabbit_mq.py +++ b/tests/test_transport/test_rabbit_mq.py @@ -1,21 +1,24 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. import logging -from datetime import datetime, timezone, timedelta - -import ujson +from datetime import datetime, timedelta, timezone from importlib import import_module, reload -import pytest -from django.db import DatabaseError -from pika.exceptions import AMQPError - -from dj_cqrs.delay import DelayQueue, DelayMessage from dj_cqrs.constants import SignalType from dj_cqrs.dataclasses import TransportPayload +from dj_cqrs.delay import DelayMessage, DelayQueue from dj_cqrs.transport.rabbit_mq import RabbitMQTransport + +from django.db import DatabaseError + +from pika.exceptions import AMQPError + +import pytest + from tests.utils import db_error +import ujson + class PublicRabbitMQTransport(RabbitMQTransport): @classmethod @@ -79,7 +82,7 @@ def test_non_default_settings(settings, caplog): def test_default_url_settings(settings): settings.CQRS = { 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport', - 'url': 'amqp://localhost' + 'url': 'amqp://localhost', } s = PublicRabbitMQTransport.get_common_settings() assert s[0] == 'localhost' @@ -104,7 +107,7 @@ def test_non_default_url_settings(settings): def test_invalid_url_settings(settings): settings.CQRS = { 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport', - 'url': 'rabbit://localhost' + 'url': 'rabbit://localhost', } with pytest.raises(AssertionError) as ei: PublicRabbitMQTransport.get_common_settings() @@ -207,17 +210,16 @@ def test_produce_message_ok(mocker): assert channel.basic_publish.call_count == 1 basic_publish_kwargs = channel.basic_publish.call_args[1] - assert ujson.loads(basic_publish_kwargs['body']) == \ - { - 'signal_type': SignalType.SAVE, - 'cqrs_id': 'cqrs_id', - 'instance_data': {}, - 'instance_pk': 'id', - 'previous_data': {'e': 'f'}, - 'correlation_id': None, - 'expires': expected_expires, - 'retries': 2, - } + assert ujson.loads(basic_publish_kwargs['body']) == { + 'signal_type': SignalType.SAVE, + 'cqrs_id': 'cqrs_id', + 'instance_data': {}, + 'instance_pk': 'id', + 'previous_data': {'e': 'f'}, + 'correlation_id': None, + 'expires': expected_expires, + 'retries': 2, + } assert basic_publish_kwargs['exchange'] == 'exchange' assert basic_publish_kwargs['mandatory'] assert basic_publish_kwargs['routing_key'] == 'cqrs_id' @@ -232,17 +234,16 @@ def test_produce_sync_message_no_queue(mocker): PublicRabbitMQTransport.produce_message(channel, 'exchange', payload) basic_publish_kwargs = channel.basic_publish.call_args[1] - assert ujson.loads(basic_publish_kwargs['body']) == \ - { - 'signal_type': SignalType.SYNC, - 'cqrs_id': 'cqrs_id', - 'instance_data': {}, - 'instance_pk': None, - 'previous_data': None, - 'correlation_id': None, - 'expires': None, - 'retries': 0, - } + assert ujson.loads(basic_publish_kwargs['body']) == { + 'signal_type': SignalType.SYNC, + 'cqrs_id': 'cqrs_id', + 'instance_data': {}, + 'instance_pk': None, + 'previous_data': None, + 'correlation_id': None, + 'expires': None, + 'retries': 0, + } assert basic_publish_kwargs['routing_key'] == 'cqrs_id' @@ -253,17 +254,16 @@ def test_produce_sync_message_queue(mocker): PublicRabbitMQTransport.produce_message(channel, 'exchange', payload) basic_publish_kwargs = channel.basic_publish.call_args[1] - assert ujson.loads(basic_publish_kwargs['body']) == \ - { - 'signal_type': SignalType.SYNC, - 'cqrs_id': 'cqrs_id', - 'instance_data': {}, - 'instance_pk': 'id', - 'previous_data': None, - 'correlation_id': None, - 'expires': None, - 'retries': 0, - } + assert ujson.loads(basic_publish_kwargs['body']) == { + 'signal_type': SignalType.SYNC, + 'cqrs_id': 'cqrs_id', + 'instance_data': {}, + 'instance_pk': 'id', + 'previous_data': None, + 'correlation_id': None, + 'expires': None, + 'retries': 0, + } assert basic_publish_kwargs['routing_key'] == 'cqrs.queue.cqrs_id' @@ -481,7 +481,7 @@ def test_process_delay_messages(mocker, caplog): payload = TransportPayload(SignalType.SAVE, 'CQRS_ID', {'id': 1}, 1) delay_queue = DelayQueue() delay_queue.put( - DelayMessage(delivery_tag=1, payload=payload, eta=datetime.now(tz=timezone.utc)) + DelayMessage(delivery_tag=1, payload=payload, eta=datetime.now(tz=timezone.utc)), ) PublicRabbitMQTransport.process_delay_messages(channel, delay_queue) diff --git a/tests/test_utils.py b/tests/test_utils.py index f2bb563..c6c1c34 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,11 +1,11 @@ # Copyright © 2021 Ingram Micro Inc. All rights reserved. -from datetime import timezone, datetime, timedelta - -import pytest +from datetime import datetime, timedelta, timezone from dj_cqrs.utils import get_expires_datetime +import pytest + def test_get_expires_datetime(mocker, settings): settings.CQRS['master']['CQRS_MESSAGE_TTL'] = 3600 diff --git a/tox.ini b/tox.ini index 3cdea8e..5a75e4f 100644 --- a/tox.ini +++ b/tox.ini @@ -1,4 +1,9 @@ +[flake8] +exclude = .idea,.git,venv*/,.eggs/,*.egg-info +max-line-length = 100 +show-source = True + [pytest] DJANGO_SETTINGS_MODULE = tests.dj.settings -addopts = --random-order --cov-report=xml --cov=./dj_cqrs +addopts = --cov-report=xml --cov=./dj_cqrs testpaths = tests