-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
9742a91
commit a729348
Showing
3 changed files
with
324 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,316 @@ | ||
import copy | ||
import datetime | ||
import logging | ||
import os | ||
import sqlite3 | ||
import time | ||
from pathlib import Path | ||
from typing import List, Optional, Dict | ||
|
||
from otlmow_converter.DotnotationHelper import DotnotationHelper | ||
from otlmow_converter.OtlmowConverter import OtlmowConverter | ||
from otlmow_model.OtlmowModel.BaseClasses.OTLObject import OTLObject, dynamic_create_instance_from_uri | ||
|
||
from InvalidTemplateKeyError import InvalidTemplateKeyError | ||
from PostenMappingDict import PostenMappingDict | ||
from RequestHandler import RequestHandler | ||
from RequesterFactory import RequesterFactory | ||
from SettingsManager import SettingsManager | ||
|
||
|
||
class TypeTemplateToAssetProcessor: | ||
dt_format = '%Y-%m-%dT%H:%M:%SZ' | ||
|
||
@staticmethod | ||
def wait_seconds(seconds: int = 10): | ||
time.sleep(seconds) | ||
|
||
def _save_to_sqlite_state(self, entries: Dict) -> None: | ||
if len(entries.items()) == 0: | ||
return | ||
conn = sqlite3.connect(self.sqlite_path) | ||
c = conn.cursor() | ||
|
||
# get all names in table state | ||
c.execute('SELECT name FROM state;') | ||
existing_keys = set(row[0] for row in c.fetchall()) | ||
entries_keys = set(entries.keys()) | ||
|
||
# insert | ||
keys_to_insert = entries_keys - existing_keys | ||
for key in keys_to_insert: | ||
c.execute('''INSERT INTO state (name, value) VALUES (?, ?)''', (key, entries[key])) | ||
|
||
keys_to_update = entries_keys - keys_to_insert | ||
# update | ||
for key in keys_to_update: | ||
c.execute('''UPDATE state SET value = ? WHERE name = ?''', (entries[key], key)) | ||
|
||
c.execute('SELECT name, value FROM state;') | ||
self.state_db = {row[0]: row[1] for row in c.fetchall()} | ||
|
||
conn.commit() | ||
conn.close() | ||
|
||
def _load_state_db(self) -> None: | ||
conn = sqlite3.connect(self.sqlite_path) | ||
c = conn.cursor() | ||
|
||
c.execute('SELECT name, value FROM state;') | ||
self.state_db = {row[0]: row[1] for row in c.fetchall()} | ||
|
||
conn.commit() | ||
conn.close() | ||
|
||
def save_last_event(self): | ||
while True: | ||
try: | ||
last_page = self.rest_client.get_current_feedpage() | ||
except ProcessLookupError: | ||
self.wait_seconds(60) | ||
continue | ||
sorted_entries = sorted(last_page.entries, key=lambda x: x.id, reverse=True) | ||
self_link = next(self_link for self_link in last_page.links if self_link.rel == 'self') | ||
self._save_to_sqlite_state(entries={'event_id': sorted_entries[0].id, 'page': self_link.href.split('/')[1]}) | ||
break | ||
|
||
def determine_if_template_is_complex(self, template_key): | ||
template = self.postenmapping_dict[template_key] | ||
return len(template.keys()) > 1 | ||
|
||
def perform_davie_aanlevering(self, reference: str, file_path: Path, event_id: str): | ||
aanlevering = self.get_aanlevering_by_event_id(event_id=event_id) | ||
if aanlevering is None: | ||
aanlevering = self.davie_client.create_aanlevering_employee( | ||
niveau='LOG-1', referentie=reference, | ||
verificatorId='6c2b7c0a-11a9-443a-a96b-a1bec249c629') | ||
|
||
aanlevering_id = aanlevering.id | ||
self._save_to_sqlite_aanleveringen(event_id=event_id, aanlevering_id=aanlevering_id, state='created') | ||
aanlevering = self.get_aanlevering_by_event_id(event_id=event_id) | ||
else: | ||
aanlevering_id = aanlevering[1] | ||
|
||
if aanlevering[2] == 'created': | ||
self.davie_client.upload_file(id=aanlevering_id, file_path=file_path) | ||
self._save_to_sqlite_aanleveringen(event_id=event_id, state='uploaded') | ||
aanlevering = self.get_aanlevering_by_event_id(event_id=event_id) | ||
|
||
if aanlevering[2] == 'uploaded': | ||
self.davie_client.finalize_and_wait(id=aanlevering_id) | ||
self._save_to_sqlite_aanleveringen(event_id=event_id, state='processed') | ||
|
||
@classmethod | ||
def process_objects_using_template(cls, objects_to_process, postenmapping_dict: Dict) -> List[OTLObject]: | ||
objects_to_upload = [] | ||
for object_nr, object_to_process in enumerate(objects_to_process): | ||
valid_postnummers = [postnummer for postnummer in object_to_process.bestekPostNummer | ||
if postnummer in postenmapping_dict] | ||
|
||
if len(valid_postnummers) != 1: | ||
continue | ||
|
||
objects_to_upload.extend(cls.create_assets_from_template( | ||
base_asset=object_to_process, template_key=valid_postnummers[0], asset_index=object_nr, | ||
postenmapping_dict=PostenMappingDict.mapping_dict)) | ||
return objects_to_upload | ||
|
||
def has_last_processed_been_too_long(self, current_updated: datetime, | ||
max_interval_in_minutes: int = 1) -> bool: | ||
last_processed = self.get_last_processed_by_context_id(context_id=self.state_db['transaction_context']) | ||
return last_processed + datetime.timedelta(minutes=max_interval_in_minutes) < current_updated | ||
|
||
@classmethod | ||
def create_assets_from_template(cls, template_key: str, base_asset: OTLObject, asset_index: int, postenmapping_dict | ||
) -> List[ | ||
OTLObject]: | ||
mapping = copy.deepcopy(postenmapping_dict[template_key]) | ||
copy_base_asset = dynamic_create_instance_from_uri(base_asset.typeURI) | ||
copy_base_asset.assetId = base_asset.assetId | ||
copy_base_asset.bestekPostNummer = base_asset.bestekPostNummer | ||
copy_base_asset.bestekPostNummer.remove(template_key) | ||
base_asset_toestand = base_asset.toestand | ||
created_assets = [copy_base_asset] | ||
|
||
# change the local id of the base asset to the real id in the mapping | ||
# and change relation id's accordingly | ||
base_local_id = next(local_id for local_id, asset_template in mapping.items() if asset_template['isHoofdAsset']) | ||
for local_id, asset_template in mapping.items(): | ||
if local_id == base_local_id: | ||
continue | ||
if 'bronAssetId.identificator' in asset_template['attributen']: | ||
if asset_template['attributen']['bronAssetId.identificator']['value'] == base_local_id: | ||
asset_template['attributen']['bronAssetId.identificator'][ | ||
'value'] = base_asset.assetId.identificator | ||
else: | ||
asset_template['attributen']['bronAssetId.identificator']['value'] = \ | ||
f"{asset_template['attributen']['bronAssetId.identificator']['value']}_{asset_index}" | ||
|
||
if 'doelAssetId.identificator' in asset_template['attributen']: | ||
if asset_template['attributen']['doelAssetId.identificator']['value'] == base_local_id: | ||
asset_template['attributen']['doelAssetId.identificator'][ | ||
'value'] = base_asset.assetId.identificator | ||
else: | ||
asset_template['attributen']['doelAssetId.identificator']['value'] = \ | ||
f"{asset_template['attributen']['doelAssetId.identificator']['value']}_{asset_index}" | ||
|
||
for asset_to_create in mapping.keys(): | ||
if asset_to_create != base_local_id: | ||
type_uri = mapping[asset_to_create]['typeURI'] | ||
asset = dynamic_create_instance_from_uri(class_uri=type_uri) | ||
asset.assetId.identificator = f'{asset_to_create}_{asset_index}' | ||
created_assets.append(asset) | ||
if hasattr(asset, 'toestand'): | ||
asset.toestand = base_asset_toestand | ||
else: | ||
asset = copy_base_asset | ||
|
||
for attr in mapping[asset_to_create]['attributen'].values(): | ||
if attr['dotnotation'] == 'typeURI': | ||
continue | ||
if attr['value'] is not None: | ||
value = attr['value'] | ||
if attr['type'] == 'http://www.w3.org/2001/XMLSchema#decimal': | ||
value = float(attr['value']) | ||
|
||
if asset == copy_base_asset: | ||
asset_attr = DotnotationHelper.get_attribute_by_dotnotation( | ||
base_asset, dotnotation=attr['dotnotation'], waarde_shortcut=True) | ||
if isinstance(asset_attr, list): | ||
asset_attr = asset_attr[0] | ||
if asset_attr.waarde is not None: | ||
continue | ||
|
||
DotnotationHelper.set_attribute_by_dotnotation(asset, dotnotation=attr['dotnotation'], | ||
waarde_shortcut=True, value=value) | ||
|
||
return created_assets | ||
|
||
@staticmethod | ||
def create_sqlite_if_not_exists(sqlite_path: Path): | ||
# create sqlite if not exists | ||
if not sqlite_path.exists(): | ||
conn = sqlite3.connect(sqlite_path) | ||
c = conn.cursor() | ||
|
||
for q in [''' | ||
CREATE TABLE state | ||
(name text, | ||
value text); | ||
''', ''' | ||
CREATE TABLE contexts | ||
(id text PRIMARY KEY, | ||
starting_page text, | ||
last_event_id text, | ||
last_processed_event text); | ||
''', ''' | ||
CREATE UNIQUE INDEX contexts_id_uindex ON contexts (id); | ||
''', ''' | ||
CREATE TABLE contexts_assets | ||
(context_id text, | ||
asset_uuid text, | ||
FOREIGN KEY (context_id) REFERENCES contexts (id)); | ||
''',''' | ||
CREATE TABLE aanleveringen | ||
(event_id text PRIMARY KEY, | ||
aanlevering_id text, | ||
state text); | ||
''']: | ||
c.execute(q) | ||
conn.commit() | ||
conn.close() | ||
|
||
def _save_to_sqlite_aanleveringen(self, event_id: str, aanlevering_id: str = None, state: str = None): | ||
conn = sqlite3.connect(self.sqlite_path) | ||
c = conn.cursor() | ||
c.execute('''SELECT event_id FROM aanleveringen WHERE event_id = ?''', (event_id,)) | ||
if c.fetchone() is None: | ||
c.execute('''INSERT INTO aanleveringen VALUES (?, ?, ?)''', (event_id, aanlevering_id, state)) | ||
conn.commit() | ||
conn.close() | ||
return | ||
if aanlevering_id is not None: | ||
c.execute('''UPDATE aanleveringen SET aanlevering_id = ? WHERE event_id = ?''', (aanlevering_id, event_id)) | ||
if state is not None: | ||
c.execute('''UPDATE aanleveringen SET state = ? WHERE event_id = ?''', (state, event_id)) | ||
conn.commit() | ||
conn.close() | ||
|
||
def get_aanlevering_by_event_id(self, event_id: str) -> tuple: | ||
conn = sqlite3.connect(self.sqlite_path) | ||
c = conn.cursor() | ||
|
||
c.execute('''SELECT event_id, aanlevering_id, state FROM aanleveringen WHERE event_id = ?''', (event_id,)) | ||
row = c.fetchone() | ||
conn.commit() | ||
conn.close() | ||
return row | ||
|
||
def _save_to_sqlite_contexts(self, context_id: str, starting_page: str = None, | ||
last_event_id: str = None, last_processed_event: str = None): | ||
conn = sqlite3.connect(self.sqlite_path) | ||
c = conn.cursor() | ||
c.execute('''SELECT id FROM contexts WHERE id = ?''', (context_id,)) | ||
if c.fetchone() is None: | ||
c.execute('''INSERT INTO contexts VALUES (?, ?, ?, ?)''', (context_id, starting_page, last_event_id, | ||
last_processed_event)) | ||
conn.commit() | ||
conn.close() | ||
return | ||
if starting_page is not None: | ||
c.execute('''UPDATE contexts SET starting_page = ? WHERE id = ?''', (starting_page, context_id)) | ||
if last_event_id is not None: | ||
c.execute('''UPDATE contexts SET last_event_id = ? WHERE id = ?''', (last_event_id, context_id)) | ||
if last_processed_event is not None: | ||
c.execute('''UPDATE contexts SET last_processed_event = ? WHERE id = ?''', | ||
(last_processed_event, context_id)) | ||
conn.commit() | ||
conn.close() | ||
|
||
def _save_to_sqlite_contexts_assets(self, context_id: str, append: str): | ||
conn = sqlite3.connect(self.sqlite_path) | ||
c = conn.cursor() | ||
|
||
c.execute('''INSERT INTO contexts_assets VALUES (?, ?)''', (context_id, append)) | ||
|
||
conn.commit() | ||
conn.close() | ||
|
||
def _get_asset_uuids_by_context_id(self, context_id: str): | ||
conn = sqlite3.connect(self.sqlite_path) | ||
c = conn.cursor() | ||
|
||
c.execute('''SELECT asset_uuid FROM contexts_assets WHERE context_id = ?''', (context_id,)) | ||
results = [row[0] for row in c.fetchall()] | ||
conn.commit() | ||
conn.close() | ||
return results | ||
|
||
def get_last_processed_by_context_id(self, context_id) -> datetime.datetime: | ||
date_str = self.get_context_by_context_id(context_id)[3] | ||
return datetime.datetime.strptime(date_str, self.dt_format) | ||
|
||
def get_context_by_context_id(self, context_id) -> tuple: | ||
conn = sqlite3.connect(self.sqlite_path) | ||
c = conn.cursor() | ||
|
||
c.execute('''SELECT id, starting_page, last_event_id, last_processed_event FROM contexts WHERE id = ?''', | ||
(context_id,)) | ||
row = c.fetchone() | ||
|
||
conn.commit() | ||
conn.close() | ||
return row | ||
|
||
def check_if_already_done(self, context_uuid, entry_id: int) -> bool: | ||
conn = sqlite3.connect(self.sqlite_path) | ||
c = conn.cursor() | ||
c.execute("""SELECT id, last_event_id FROM contexts WHERE id like ? || '%'""", (context_uuid,)) | ||
rows = c.fetchall() | ||
conn.commit() | ||
conn.close() | ||
|
||
for row in rows: | ||
if int(row[1]) >= entry_id: | ||
return True | ||
return False |
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters