Improve migration of aggregate data
marioba committed Feb 4, 2022
1 parent a2cd470 commit 2ef2f34
Showing 1 changed file with 131 additions and 120 deletions.
comptages/datamodel/migrations/0022_auto_20211217_0624.py
@@ -2,9 +2,9 @@
 
 import logging
 import sys
-import time
 
-from django.db import migrations
+import datetime
+import pytz
+from django.db import migrations, transaction
 
 
 def get_console_logger():
@@ -21,147 +21,158 @@ def get_console_logger():
 def migrate_data(apps, schema_editor):
     logger = get_console_logger()
     count_details = []
+    chunk_size = 1000
+    local_tz = pytz.timezone('Europe/Zurich')
+    start_time = datetime.datetime.now()
 
     # We can't import the models directly as it may be a newer
     # version than this migration expects. We use the historical version.
     CountAggregateValueCls = apps.get_model('comptages', 'CountAggregateValueCls')
+    CountAggregateValueCnt = apps.get_model('comptages', 'CountAggregateValueCnt')
+    CountAggregateValueDrn = apps.get_model('comptages', 'CountAggregateValueDrn')
+    CountAggregateValueLen = apps.get_model('comptages', 'CountAggregateValueLen')
+    CountAggregateValueSpd = apps.get_model('comptages', 'CountAggregateValueSpd')
     CountDetail = apps.get_model('comptages', 'CountDetail')
 
-    qs = CountAggregateValueCls.objects.all()
-    logger.info('Start with count_aggregate_value_cls table')
-    logger.info(f'Total rows: {len(qs)}')
-
-    for idx, i in enumerate(qs):
-        count_details.append(
-            CountDetail(
-                numbering=1,
-                timestamp=i.id_count_aggregate.start,
-                file_name=i.id_count_aggregate.file_name,
-                import_status=i.id_count_aggregate.import_status,
-                id_lane=i.id_count_aggregate.id_lane,
-                id_count=i.id_count_aggregate.id_count,
-                id_category=i.id_category,
-                times=i.value,
-                from_aggregate=True,
-            )
-        )
-
-        if (idx % 1000) == 0:
-            logger.info(f'{idx}')
-            CountDetail.objects.bulk_create(count_details)
-            count_details = []
-
-    CountDetail.objects.bulk_create(count_details)
-    count_details = []
-
-    CountAggregateValueCnt = apps.get_model('comptages', 'CountAggregateValueCnt')
-    qs = CountAggregateValueCnt.objects.all()
-    logger.info('Start with count_aggregate_value_cnt table')
-    logger.info(f'Total rows: {len(qs)}')
-    for idx, i in enumerate(qs):
-        count_details.append(
-            CountDetail(
-                numbering=1,
-                timestamp=i.id_count_aggregate.start,
-                file_name=i.id_count_aggregate.file_name,
-                import_status=i.id_count_aggregate.import_status,
-                id_lane=i.id_count_aggregate.id_lane,
-                id_count=i.id_count_aggregate.id_count,
-                times=i.value,
-                from_aggregate=True,
-            )
-        )
-
-        if (idx % 1000) == 0:
-            logger.info(f'{idx}')
-            CountDetail.objects.bulk_create(count_details)
-            count_details = []
-
-    CountDetail.objects.bulk_create(count_details)
-    count_details = []
-
-    CountAggregateValueDrn = apps.get_model('comptages', 'CountAggregateValueDrn')
-    qs = CountAggregateValueDrn.objects.all()
-    logger.info('Start with count_aggregate_value_drn table')
-    logger.info(f'Total rows: {len(qs)}')
-    for idx, i in enumerate(qs):
-        count_details.append(
-            CountDetail(
-                numbering=1,
-                timestamp=i.id_count_aggregate.start,
-                file_name=i.id_count_aggregate.file_name,
-                import_status=i.id_count_aggregate.import_status,
-                id_lane=i.id_count_aggregate.id_lane,
-                id_count=i.id_count_aggregate.id_count,
-                times=i.value,
-                from_aggregate=True,
-            )
-        )
-
-        if (idx % 1000) == 0:
-            logger.info(f'{idx}')
-            CountDetail.objects.bulk_create(count_details)
-            count_details = []
-
-    CountDetail.objects.bulk_create(count_details)
-    count_details = []
-
-    CountAggregateValueLen = apps.get_model('comptages', 'CountAggregateValueLen')
-    qs = CountAggregateValueLen.objects.all()
-    logger.info('Start with count_aggregate_value_len table')
-    logger.info(f'Total rows: {len(qs)}')
-    for idx, i in enumerate(qs):
-        count_details.append(
-            CountDetail(
-                numbering=1,
-                timestamp=i.id_count_aggregate.start,
-                length=int((i.low + i.high) / 2),
-                file_name=i.id_count_aggregate.file_name,
-                import_status=i.id_count_aggregate.import_status,
-                id_lane=i.id_count_aggregate.id_lane,
-                id_count=i.id_count_aggregate.id_count,
-                times=i.value,
-                from_aggregate=True,
-            )
-        )
-
-        if (idx % 1000) == 0:
-            logger.info(f'{idx}')
-            CountDetail.objects.bulk_create(count_details)
-            count_details = []
-
-    CountDetail.objects.bulk_create(count_details)
-    count_details = []
-
-    CountAggregateValueSpd = apps.get_model('comptages', 'CountAggregateValueSpd')
-    qs = CountAggregateValueSpd.objects.all()
-    logger.info('Start with count_aggregate_value_spd table')
-    logger.info(f'Total rows: {len(qs)}')
-    for idx, i in enumerate(qs):
-        count_details.append(
-            CountDetail(
-                numbering=1,
-                timestamp=i.id_count_aggregate.start,
-                speed=i.low + 5,
-                file_name=i.id_count_aggregate.file_name,
-                import_status=i.id_count_aggregate.import_status,
-                id_lane=i.id_count_aggregate.id_lane,
-                id_count=i.id_count_aggregate.id_count,
-                times=i.value,
-                from_aggregate=True,
-            )
-        )
-
-        if (idx % 1000) == 0:
-            logger.info(f'{idx}')
-            CountDetail.objects.bulk_create(count_details)
-            count_details = []
-
-    CountDetail.objects.bulk_create(count_details)
-    count_details = []
+    ## CLS
+    logger.info(f'{datetime.datetime.now()} Start with count_aggregate_value_cls table')
+    num = 0
+    while CountAggregateValueCls.objects.all().exists():
+        with transaction.atomic():
+            # Cannot delete using a normal slice on all() objects like this: qs = CountAggregateValueCls.objects.all()[:100]
+            qs = CountAggregateValueCls.objects.filter(id__in=list(CountAggregateValueCls.objects.values_list('pk', flat=True)[:chunk_size]))
+            for i in qs:
+                count_details.append(
+                    CountDetail(
+                        numbering=1,
+                        timestamp=local_tz.localize(i.id_count_aggregate.start),
+                        file_name=i.id_count_aggregate.file_name,
+                        import_status=i.id_count_aggregate.import_status,
+                        id_lane=i.id_count_aggregate.id_lane,
+                        id_count=i.id_count_aggregate.id_count,
+                        id_category=i.id_category,
+                        times=i.value,
+                        from_aggregate=True,
+                    )
+                )
+
+            CountDetail.objects.bulk_create(count_details)
+            count_details = []
+            qs.delete()
+        num += chunk_size
+        now = datetime.datetime.now()
+        logger.info(f'{now} Table CLS (1/5), migrated {num} rows, total elapsed time {now - start_time}')
+
+    ## CNT
+    logger.info(f'{datetime.datetime.now()} Start with count_aggregate_value_cnt table')
+    num = 0
+    while CountAggregateValueCnt.objects.all().exists():
+        with transaction.atomic():
+            qs = CountAggregateValueCnt.objects.filter(id__in=list(CountAggregateValueCnt.objects.values_list('pk', flat=True)[:chunk_size]))
+            for i in qs:
+                count_details.append(
+                    CountDetail(
+                        numbering=1,
+                        timestamp=local_tz.localize(i.id_count_aggregate.start),
+                        file_name=i.id_count_aggregate.file_name,
+                        import_status=i.id_count_aggregate.import_status,
+                        id_lane=i.id_count_aggregate.id_lane,
+                        id_count=i.id_count_aggregate.id_count,
+                        times=i.value,
+                        from_aggregate=True,
+                    )
+                )
+
+            CountDetail.objects.bulk_create(count_details)
+            count_details = []
+            qs.delete()
+        num += chunk_size
+        now = datetime.datetime.now()
+        logger.info(f'{now} Table CNT (2/5), migrated {num} rows, total elapsed time {now - start_time}')
+
+    ## DRN
+    logger.info(f'{datetime.datetime.now()} Start with count_aggregate_value_drn table')
+    num = 0
+    while CountAggregateValueDrn.objects.all().exists():
+        with transaction.atomic():
+            qs = CountAggregateValueDrn.objects.filter(id__in=list(CountAggregateValueDrn.objects.values_list('pk', flat=True)[:chunk_size]))
+            for i in qs:
+                count_details.append(
+                    CountDetail(
+                        numbering=1,
+                        timestamp=local_tz.localize(i.id_count_aggregate.start),
+                        file_name=i.id_count_aggregate.file_name,
+                        import_status=i.id_count_aggregate.import_status,
+                        id_lane=i.id_count_aggregate.id_lane,
+                        id_count=i.id_count_aggregate.id_count,
+                        times=i.value,
+                        from_aggregate=True,
+                    )
+                )
+
+            CountDetail.objects.bulk_create(count_details)
+            count_details = []
+            qs.delete()
+        num += chunk_size
+        now = datetime.datetime.now()
+        logger.info(f'{now} Table DRN (3/5), migrated {num} rows, total elapsed time {now - start_time}')
+
+    ## LEN
+    logger.info(f'{datetime.datetime.now()} Start with count_aggregate_value_len table')
+    num = 0
+    while CountAggregateValueLen.objects.all().exists():
+        with transaction.atomic():
+            qs = CountAggregateValueLen.objects.filter(id__in=list(CountAggregateValueLen.objects.values_list('pk', flat=True)[:chunk_size]))
+            for i in qs:
+                count_details.append(
+                    CountDetail(
+                        numbering=1,
+                        timestamp=local_tz.localize(i.id_count_aggregate.start),
+                        length=int((i.low + i.high) / 2),
+                        file_name=i.id_count_aggregate.file_name,
+                        import_status=i.id_count_aggregate.import_status,
+                        id_lane=i.id_count_aggregate.id_lane,
+                        id_count=i.id_count_aggregate.id_count,
+                        times=i.value,
+                        from_aggregate=True,
+                    )
+                )
+
+            CountDetail.objects.bulk_create(count_details)
+            count_details = []
+            qs.delete()
+        num += chunk_size
+        now = datetime.datetime.now()
+        logger.info(f'{now} Table LEN (4/5), migrated {num} rows, total elapsed time {now - start_time}')
+
+    ## SPD
+    logger.info(f'{datetime.datetime.now()} Start with count_aggregate_value_spd table')
+    num = 0
+    while CountAggregateValueSpd.objects.all().exists():
+        with transaction.atomic():
+            qs = CountAggregateValueSpd.objects.filter(id__in=list(CountAggregateValueSpd.objects.values_list('pk', flat=True)[:chunk_size]))
+            for i in qs:
+                count_details.append(
+                    CountDetail(
+                        numbering=1,
+                        timestamp=local_tz.localize(i.id_count_aggregate.start),
+                        speed=i.low + 5,
+                        file_name=i.id_count_aggregate.file_name,
+                        import_status=i.id_count_aggregate.import_status,
+                        id_lane=i.id_count_aggregate.id_lane,
+                        id_count=i.id_count_aggregate.id_count,
+                        times=i.value,
+                        from_aggregate=True,
+                    )
+                )
+
+            CountDetail.objects.bulk_create(count_details)
+            count_details = []
+            qs.delete()
+        num += chunk_size
+        now = datetime.datetime.now()
+        logger.info(f'{now} Table SPD (5/5), migrated {num} rows, total elapsed time {now - start_time}')
+
+    now = datetime.datetime.now()
+    logger.info(f'{now} All tables migrated! Total elapsed time {now - start_time}')
 
 
 class Migration(migrations.Migration):
     atomic = False
 
     dependencies = [
         ('comptages', '0021_alter_count_tjm'),
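
Note on the pattern the commit introduces: Django refuses to call .delete() on a sliced queryset, which is why each pass re-filters on a slice of primary keys before copying and deleting the chunk inside one transaction. The following is a minimal standalone sketch of that copy-and-delete loop; Source, Target, migrate_in_chunks, and the field names are hypothetical stand-ins for the aggregate and detail models, not code from the repository:

    import pytz
    from django.db import transaction

    CHUNK_SIZE = 1000
    local_tz = pytz.timezone('Europe/Zurich')

    def migrate_in_chunks(Source, Target):
        # Loop until the source table is empty; each pass moves one chunk.
        while Source.objects.exists():
            with transaction.atomic():
                # A sliced queryset cannot be deleted directly, so take a
                # slice of primary keys first and re-filter on them.
                pks = list(Source.objects.values_list('pk', flat=True)[:CHUNK_SIZE])
                chunk = Source.objects.filter(pk__in=pks)
                rows = [
                    Target(
                        # localize() attaches the Zurich timezone to a naive
                        # datetime rather than converting an aware one.
                        timestamp=local_tz.localize(obj.start),
                        value=obj.value,
                    )
                    for obj in chunk
                ]
                Target.objects.bulk_create(rows)
                # The insert and the delete commit together or not at all.
                chunk.delete()

In the migration itself this pattern is repeated once per aggregate table (CLS, CNT, DRN, LEN, SPD). With atomic = False on the Migration class, each chunk commits on its own, so an interrupted run can pick up from the rows that remain instead of rolling the whole migration back.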
