Skip to content

Commit

Permalink
Merge pull request #38 from cloudblue/refactoring/LITE-18083
Browse files Browse the repository at this point in the history
LITE-18083 General Connect Flake8/pytest rules are applied
  • Loading branch information
maxipavlovic authored May 20, 2021
2 parents 57a96c0 + d9ccc9a commit e73f923
Show file tree
Hide file tree
Showing 66 changed files with 484 additions and 401 deletions.
7 changes: 4 additions & 3 deletions dj_cqrs/controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
import logging
from contextlib import ExitStack

from django.db import close_old_connections, transaction

from dj_cqrs.constants import SignalType
from dj_cqrs.registries import ReplicaRegistry

from django.db import close_old_connections, transaction


logger = logging.getLogger('django-cqrs')


Expand All @@ -32,7 +33,7 @@ def route_signal_to_replica_model(signal_type, cqrs_id, instance_data, previous_
:param dict instance_data: Master model data.
"""
if signal_type not in (SignalType.DELETE, SignalType.SAVE, SignalType.SYNC):
logger.error('Bad signal type "{}" for CQRS_ID "{}".'.format(signal_type, cqrs_id))
logger.error('Bad signal type "{0}" for CQRS_ID "{1}".'.format(signal_type, cqrs_id))
return

model_cls = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id)
Expand Down
3 changes: 2 additions & 1 deletion dj_cqrs/dataclasses.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# Copyright © 2021 Ingram Micro Inc. All rights reserved.

from dateutil.parser import parse as dateutil_parse
from django.utils import timezone

from dj_cqrs.correlation import get_correlation_id
from dj_cqrs.utils import get_expires_datetime

from django.utils import timezone


class TransportPayload:
"""Transport message payload.
Expand Down
2 changes: 1 addition & 1 deletion dj_cqrs/delay.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright © 2021 Ingram Micro Inc. All rights reserved.

from queue import PriorityQueue, Full
from queue import Full, PriorityQueue

from django.utils import timezone

Expand Down
24 changes: 13 additions & 11 deletions dj_cqrs/management/commands/cqrs_bulk_dump.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
# Copyright © 2020 Ingram Micro Inc. All rights reserved.
# Copyright © 2021 Ingram Micro Inc. All rights reserved.

import datetime
import os
import sys
import time

import ujson
from django.core.management.base import BaseCommand, CommandError

from dj_cqrs.management.commands.utils import batch_qs
from dj_cqrs.registries import MasterRegistry

from django.core.management.base import BaseCommand, CommandError

import ujson


class Command(BaseCommand):
help = 'Bulk dump of a CQRS model from master service.'
Expand Down Expand Up @@ -59,7 +60,7 @@ def handle(self, *args, **options):

if progress:
print(
'Processing {} records with batch size {}'.format(db_count, batch_size),
'Processing {0} records with batch size {1}'.format(db_count, batch_size),
file=sys.stderr,
)
for qs in batch_qs(
Expand All @@ -76,20 +77,21 @@ def handle(self, *args, **options):
)
success_counter += 1
except Exception as e:
print('\nDump record failed for pk={}: {}: {}'.format(
print('\nDump record failed for pk={0}: {1}: {2}'.format(
instance.pk, type(e).__name__, str(e),
), file=sys.stderr)
if progress:
rate = (counter - cs) / (time.time() - ts)
percent = 100 * counter / db_count
eta = datetime.timedelta(seconds=int((db_count - counter) / rate))
sys.stderr.write(
'\r{} of {} processed - {}% with rate {:.1f} rps, to go {} ...{:20}'.format(
'\r{0} of {1} processed - {2}% with '
'rate {3:.1f} rps, to go {4} ...{5:20}'.format(
counter, db_count, int(percent), rate, str(eta), ' ',
))
sys.stderr.flush()

print('Done!\n{} instance(s) saved.\n{} instance(s) processed.'.format(
print('Done!\n{0} instance(s) saved.\n{1} instance(s) processed.'.format(
success_counter, counter,
), file=sys.stderr)

Expand All @@ -99,18 +101,18 @@ def _get_model(options):
model = MasterRegistry.get_model_by_cqrs_id(cqrs_id)

if not model:
raise CommandError('Wrong CQRS ID: {}!'.format(cqrs_id))
raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))

return model

@staticmethod
def _get_output_filename(options):
f_name = options['output']
if f_name is None:
f_name = '{}.dump'.format(options['cqrs_id'])
f_name = '{0}.dump'.format(options['cqrs_id'])

if f_name != '-' and os.path.exists(f_name) and not (options['force']):
raise CommandError('File {} exists!'.format(f_name))
raise CommandError('File {0} exists!'.format(f_name))

return f_name

Expand Down
27 changes: 14 additions & 13 deletions dj_cqrs/management/commands/cqrs_bulk_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import os
import sys

import ujson
from dj_cqrs.registries import ReplicaRegistry

from django.core.management.base import BaseCommand, CommandError
from django.db import transaction, DatabaseError
from django.db import DatabaseError, transaction

from dj_cqrs.registries import ReplicaRegistry
import ujson


class Command(BaseCommand):
Expand Down Expand Up @@ -38,7 +39,7 @@ def handle(self, *args, **options):

f_name = options['input']
if f_name != '-' and not os.path.exists(f_name):
raise CommandError("File {} doesn't exist!".format(f_name))
raise CommandError("File {0} doesn't exist!".format(f_name))

with sys.stdin if f_name == '-' else open(f_name, 'r') as f:
try:
Expand All @@ -47,11 +48,11 @@ def handle(self, *args, **options):
cqrs_id = None

if not cqrs_id:
raise CommandError('File {} is empty!'.format(f_name))
raise CommandError('File {0} is empty!'.format(f_name))

model = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id)
if not model:
raise CommandError('Wrong CQRS ID: {}!'.format(cqrs_id))
raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))

with transaction.atomic():
if options['clear']:
Expand Down Expand Up @@ -80,7 +81,7 @@ def _process(cls, stream, model, batch_size):
except EOFError:
break

print('Done!\n{} instance(s) loaded.'.format(success_counter), file=sys.stderr)
print('Done!\n{0} instance(s) loaded.'.format(success_counter), file=sys.stderr)

@staticmethod
def _process_line(line_number, line, model):
Expand All @@ -91,23 +92,23 @@ def _process_line(line_number, line, model):
master_data = ujson.loads(line.strip())
except ValueError:
print(
"Dump file can't be parsed: line {}!".format(line_number),
file=sys.stderr
"Dump file can't be parsed: line {0}!".format(line_number),
file=sys.stderr,
)
return False

instance = model.cqrs_save(master_data)
if not instance:
print(
"Instance can't be saved: line {}!".format(line_number),
file=sys.stderr
"Instance can't be saved: line {0}!".format(line_number),
file=sys.stderr,
)
else:
return True
except Exception as e:
print(
'Unexpected error: line {}! {}'.format(line_number, str(e)),
file=sys.stderr
'Unexpected error: line {0}! {1}'.format(line_number, str(e)),
file=sys.stderr,
)

return False
Expand Down
4 changes: 2 additions & 2 deletions dj_cqrs/management/commands/cqrs_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

from multiprocessing import Process

from django.core.management.base import BaseCommand

from dj_cqrs.transport import current_transport

from django.core.management.base import BaseCommand


class Command(BaseCommand):
help = 'Starts CQRS worker, which consumes messages from message queue.'
Expand Down
21 changes: 11 additions & 10 deletions dj_cqrs/management/commands/cqrs_dead_letters.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
# Copyright © 2021 Ingram Micro Inc. All rights reserved.

import ujson
from django.core.management.base import BaseCommand, CommandError

from dj_cqrs.dataclasses import TransportPayload
from dj_cqrs.registries import ReplicaRegistry
from dj_cqrs.transport.rabbit_mq import RabbitMQTransport
from dj_cqrs.transport import current_transport
from dj_cqrs.transport.rabbit_mq import RabbitMQTransport
from dj_cqrs.utils import get_expires_datetime

from django.core.management.base import BaseCommand, CommandError

import ujson


class RabbitMQTransportService(RabbitMQTransport):

Expand Down Expand Up @@ -82,22 +83,22 @@ def init_broker(self):
queue_name, dead_letter_queue_name = RabbitMQTransportService.get_consumer_settings()
RabbitMQTransportService.declare_queue(channel, queue_name)
RabbitMQTransportService.declare_queue(channel, dead_letter_queue_name)
for cqrs_id, replica_model in ReplicaRegistry.models.items():
for cqrs_id, _ in ReplicaRegistry.models.items():
channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=cqrs_id)

# Every service must have specific SYNC or requeue routes
channel.queue_bind(
exchange=exchange,
queue=queue_name,
routing_key='cqrs.{}.{}'.format(queue_name, cqrs_id),
routing_key='cqrs.{0}.{1}'.format(queue_name, cqrs_id),
)

return channel, connection

def handle_retry(self, channel, consumer_generator, dead_letters_count):
self.stdout.write("Total dead letters: {}".format(dead_letters_count))
self.stdout.write("Total dead letters: {0}".format(dead_letters_count))
for i in range(1, dead_letters_count + 1):
self.stdout.write("Retrying: {}/{}".format(i, dead_letters_count))
self.stdout.write("Retrying: {0}/{1}".format(i, dead_letters_count))
method_frame, properties, body = next(consumer_generator)

dct = ujson.loads(body)
Expand All @@ -116,12 +117,12 @@ def handle_retry(self, channel, consumer_generator, dead_letters_count):
RabbitMQTransportService.nack(channel, method_frame.delivery_tag)

def handle_dump(self, consumer_generator, dead_letters_count):
for i in range(1, dead_letters_count + 1):
for _ in range(1, dead_letters_count + 1):
*_, body = next(consumer_generator)
self.stdout.write(body.decode('utf-8'))

def handle_purge(self, channel, dead_letter_queue_name, dead_letter_count):
self.stdout.write("Total dead letters: {}".format(dead_letter_count))
self.stdout.write("Total dead letters: {0}".format(dead_letter_count))
if dead_letter_count > 0:
channel.queue_purge(dead_letter_queue_name)
self.stdout.write("Purged")
10 changes: 5 additions & 5 deletions dj_cqrs/management/commands/cqrs_deleted_diff_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

import sys

import ujson
from dj_cqrs.registries import MasterRegistry

from django.core.management.base import BaseCommand, CommandError
from django.db import connection

from dj_cqrs.registries import MasterRegistry
import ujson


GET_NON_EXISTING_PKS_SQL_TEMPLATE = """
Expand Down Expand Up @@ -46,7 +46,7 @@ def handle(self, *args, **options):
master_data = self.deserialize_in(package_line)

sql = GET_NON_EXISTING_PKS_SQL_TEMPLATE.format(
values=','.join(["({})".format(pk) for pk in master_data]),
values=','.join(["({0})".format(pk) for pk in master_data]),
table=model._meta.db_table,
pk_field=model._meta.pk.attname,
)
Expand All @@ -55,14 +55,14 @@ def handle(self, *args, **options):
diff_ids = [r[0] for r in cursor.fetchall()]
if diff_ids:
self.stdout.write(self.serialize_out(diff_ids))
self.stderr.write('PK to delete: {}'.format(str(diff_ids)))
self.stderr.write('PK to delete: {0}'.format(str(diff_ids)))

@staticmethod
def _get_model(first_line):
cqrs_id = first_line.split(',')[0]
model = MasterRegistry.get_model_by_cqrs_id(cqrs_id)

if not model:
raise CommandError('Wrong CQRS ID: {}!'.format(cqrs_id))
raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))

return model
13 changes: 7 additions & 6 deletions dj_cqrs/management/commands/cqrs_deleted_diff_replica.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# Copyright © 2021 Ingram Micro Inc. All rights reserved.

import ujson
from dj_cqrs.management.commands.utils import batch_qs
from dj_cqrs.registries import ReplicaRegistry

from django.core.exceptions import FieldError
from django.core.management.base import BaseCommand, CommandError
from django.utils.timezone import now

from dj_cqrs.management.commands.utils import batch_qs
from dj_cqrs.registries import ReplicaRegistry
import ujson


class Command(BaseCommand):
Expand Down Expand Up @@ -52,14 +53,14 @@ def handle(self, *args, **options):
try:
qs = qs.filter(**kwargs)
except FieldError as e:
raise CommandError('Bad filter kwargs! {}'.format(str(e)))
raise CommandError('Bad filter kwargs! {0}'.format(str(e)))

if not qs.exists():
self.stderr.write('No objects found for filter!')
return

current_dt = now()
self.stdout.write('{},{}'.format(model.CQRS_ID, str(current_dt)))
self.stdout.write('{0},{1}'.format(model.CQRS_ID, str(current_dt)))

for bqs in batch_qs(qs.values_list('pk', flat=True), batch_size=batch_size):
self.stdout.write(self.serialize_package(list(bqs)))
Expand All @@ -70,7 +71,7 @@ def _get_model(options):
model = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id)

if not model:
raise CommandError('Wrong CQRS ID: {}!'.format(cqrs_id))
raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))

return model

Expand Down
6 changes: 3 additions & 3 deletions dj_cqrs/management/commands/cqrs_deleted_sync_replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

import sys

import ujson
from dj_cqrs.registries import ReplicaRegistry

from django.core.management.base import BaseCommand, CommandError
from django.db import DatabaseError

from dj_cqrs.registries import ReplicaRegistry
import ujson


class Command(BaseCommand):
Expand Down Expand Up @@ -36,6 +36,6 @@ def _get_model(first_line):
model = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id)

if not model:
raise CommandError('Wrong CQRS ID: {}!'.format(cqrs_id))
raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))

return model
Loading

0 comments on commit e73f923

Please sign in to comment.