Skip to content

Commit

Permalink
Merge pull request #121 from cloudblue/LITE-28676-fix-error-handling
Browse files Browse the repository at this point in the history
LITE-28676 Fix error handling for calls to CBC
  • Loading branch information
Hairash authored Oct 6, 2023
2 parents 4509ae4 + d3cbada commit 8466659
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 81 deletions.
2 changes: 2 additions & 0 deletions connect_ext_ppr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,3 +572,5 @@
"required": ["hierarchical_files_data"]
}}
"""

DELAY_SECONDS_BETWEEN_TASKS = 10
2 changes: 1 addition & 1 deletion connect_ext_ppr/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ def add_new_deployment_request(db, dr_data, deployment, account_id, logger):
))
tasks.append(Task(
deployment_request_id=deployment_request.id,
title='Apply PP and delegate to marketplaces',
title='Apply PPR and delegate to marketplaces',
type=Task.TYPES.apply_and_delegate,
created_by=account_id,
))
Expand Down
7 changes: 4 additions & 3 deletions connect_ext_ppr/services/cbc_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def update_product(self, product_id: str):
)

def parse_ppr(self, file: BufferedReader):
file.seek(0)
base64_content = base64.b64encode(file.read()).decode('ascii')

return self.plm_service.action(
Expand Down Expand Up @@ -175,9 +176,9 @@ def search_task_logs_by_name(self, partial_name: str):

def parse_price_file(
self,
account_id: int,
vendor_id: str,
file: BufferedReader,
account_id: int,
vendor_id: str,
file: BufferedReader,
) -> Dict:
"""
Expand Down
150 changes: 83 additions & 67 deletions connect_ext_ppr/tasks_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
from sqlalchemy.orm import joinedload, selectinload

from connect_ext_ppr.client.exception import CBCClientError
from connect_ext_ppr.constants import PPR_FILE_NAME_DELEGATION_L2, PPR_FILE_NAME_UPDATE_MARKETPLACES
from connect_ext_ppr.constants import (
DELAY_SECONDS_BETWEEN_TASKS,
PPR_FILE_NAME_DELEGATION_L2,
PPR_FILE_NAME_UPDATE_MARKETPLACES,
)
from connect_ext_ppr.db import get_cbc_extension_db, get_cbc_extension_db_engine, get_db_ctx_manager
from connect_ext_ppr.models.configuration import Configuration
from connect_ext_ppr.models.enums import (
Expand Down Expand Up @@ -65,7 +69,10 @@ def _execute_with_retries(function, func_kwargs, num_retries=5):
num_retries=num_retries,
)
except CBCClientError as ex:
raise TaskException(str(ex))
message = ex.message
if ex.json:
message += f' - {ex.json.get("message")}'
raise TaskException(message)


def _send_ppr(cbc_service, file: BufferedReader):
Expand All @@ -89,11 +96,11 @@ def _check_cbc_task_status(cbc_service, tracking_id):
task_log = _execute_with_retries(
cbc_service.search_task_logs_by_name, func_kwargs={'partial_name': tracking_id},
)
# Setting this first default value in case takes time to create it in extenal system.
# Setting this first default value in case takes time to create it in external system.
task_log = task_log[0] if task_log else {'status': CBCTaskLogStatus.not_started}

while task_log['status'] in (CBCTaskLogStatus.not_started, CBCTaskLogStatus.running):
time.sleep(10)
time.sleep(DELAY_SECONDS_BETWEEN_TASKS)
task_log = _execute_with_retries(
cbc_service.search_task_logs_by_name, {'partial_name': tracking_id})[0]

Expand All @@ -103,6 +110,29 @@ def _check_cbc_task_status(cbc_service, tracking_id):
raise TaskException(f'Something went wrong with task: {tracking_id}')


def check_and_update_product(deployment_request, cbc_service, **kwargs):
if not deployment_request.manually:

product_id = deployment_request.deployment.product_id

response = _execute_with_retries(
cbc_service.get_product_details, func_kwargs={'product_id': product_id},
)

if 'error' in response.keys():
raise Exception(response['error'])

if response.get('isUpdateAvailable'):
response = _execute_with_retries(
cbc_service.update_product, func_kwargs={'product_id': product_id},
)

if 'error' in response.keys():
raise Exception(response['error'])

return True


def prepare_ppr_file_for_task(
connect_client,
file_data,
Expand Down Expand Up @@ -153,8 +183,6 @@ def prepare_ppr_file_for_task(
file_obj.read(),
file_size,
)

file_obj.seek(0)
return file_obj

except (FileNotFoundError, ValueError, KeyError) as e:
Expand All @@ -163,34 +191,12 @@ def prepare_ppr_file_for_task(
raise TaskException(f'Error while connecting to Connect: {e.message}')


def check_and_update_product(deployment_request, cbc_service, **kwargs):
if not deployment_request.manually:

product_id = deployment_request.deployment.product_id

response = _execute_with_retries(
cbc_service.get_product_details, func_kwargs={'product_id': product_id},
)

if 'error' in response.keys():
raise Exception(response['error'])

if response.get('isUpdateAvailable'):
response = _execute_with_retries(
cbc_service.update_product, func_kwargs={'product_id': product_id},
)

if 'error' in response.keys():
raise Exception(response['error'])

return True


def apply_ppr_and_delegate_to_marketplaces(
deployment_request,
cbc_service,
connect_client,
db,
logger,
**kwargs,
):
"""Task sends PPR to the Commerce and updates marketplaces in DB
Expand All @@ -199,6 +205,7 @@ def apply_ppr_and_delegate_to_marketplaces(
:param cbc_service: CBC service
:param connect_client: Connect client
:param db: dbsession
:param logger: logger
:rtype bool
:raises TaskException
"""
Expand Down Expand Up @@ -255,6 +262,10 @@ def apply_ppr_and_delegate_to_marketplaces(
)

tracking_id = _send_ppr(cbc_service, file)
logger.debug(
f'apply_ppr_and_delegate_to_marketplaces dr_id: {deployment_request.id},'
f' tracking_id: {tracking_id}',
)
file.close()
_check_cbc_task_status(cbc_service, tracking_id)

Expand All @@ -267,6 +278,47 @@ def apply_ppr_and_delegate_to_marketplaces(
return True


def delegate_to_l2(deployment_request, cbc_service, connect_client, logger, **kwargs):
"""Task delegates PPR to L2 in Commerce
:param deployment_request: DeploymentRequest model
:param cbc_service: CBC service
:param connect_client: Connect client
:param logger: logger
:rtype bool
:raises TaskException
"""
if deployment_request.manually:
return True

ppr_file_id = deployment_request.ppr.file
deployment = deployment_request.deployment
try:
file_data = get_ppr_from_media(
connect_client,
deployment.account_id,
deployment.id,
ppr_file_id,
)
except ClientError as e:
raise TaskException(f'Error while connecting to Connect: {e.message}')

file = prepare_ppr_file_for_task(
connect_client=connect_client,
file_data=file_data,
file_name_template=PPR_FILE_NAME_DELEGATION_L2,
deployment_request=deployment_request,
deployment=deployment,
process_func=process_ppr_file_for_delelegate_l2,
)

tracking_id = _send_ppr(cbc_service, file)
logger.debug(f'delegate_to_l2 dr_id: {deployment_request.id}, tracking_id: {tracking_id}')
file.close()
_check_cbc_task_status(cbc_service, tracking_id)
return True


def apply_pricelist_task(
deployment_request,
cbc_service,
Expand Down Expand Up @@ -343,44 +395,6 @@ def validate_pricelists_task(
return True


def delegate_to_l2(deployment_request, cbc_service, connect_client, **kwargs):
"""Task delegates PPR to L2 in Commerce
:param deployment_request: DeploymentRequest model
:param cbc_service: CBC service
:param connect_client: Connect client
:rtype bool
:raises TaskException
"""
if deployment_request.manually:
return True

ppr_file_id = deployment_request.ppr.file
deployment = deployment_request.deployment
try:
file_data = get_ppr_from_media(
connect_client,
deployment.account_id,
deployment.id,
ppr_file_id,
)
except ClientError as e:
raise TaskException(f'Error while connecting to Connect: {e.message}')

file = prepare_ppr_file_for_task(
connect_client=connect_client,
file_data=file_data,
file_name_template=PPR_FILE_NAME_DELEGATION_L2,
deployment_request=deployment_request,
deployment=deployment,
process_func=process_ppr_file_for_delelegate_l2,
)

tracking_id = _send_ppr(cbc_service, file)
file.close()
return _check_cbc_task_status(cbc_service, tracking_id)


TASK_PER_TYPE = {
TaskTypesChoices.product_setup: check_and_update_product,
TaskTypesChoices.apply_and_delegate: apply_ppr_and_delegate_to_marketplaces,
Expand Down Expand Up @@ -414,10 +428,12 @@ def execute_tasks(db, config, tasks, connect_client, logger):
connect_client=connect_client,
marketplace=task.marketplace,
db=db,
logger=logger,
)
task.status = TasksStatusChoices.done

except TaskException as ex:
logger.error(f'Task ID: {task.id} - {ex}')
was_succesfull = False
task.error_message = str(ex)[:4000]
except Exception as ex:
Expand Down
3 changes: 3 additions & 0 deletions connect_ext_ppr/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
from collections import defaultdict
from functools import partial
from io import BytesIO
Expand All @@ -18,6 +19,7 @@
from connect_ext_ppr.constants import (
BASE_SCHEMA,
CONFIGURATION_SCHEMA_TEMPLATE,
DELAY_SECONDS_BETWEEN_TASKS,
PPR_SCHEMA,
SUMMARY_TEMPLATE,
)
Expand Down Expand Up @@ -682,6 +684,7 @@ def execute_with_retry(function, exception_class, args=None, kwargs=None, num_re
except exception_class:
if num_retries == 0:
raise
time.sleep(DELAY_SECONDS_BETWEEN_TASKS)


def get_mps_to_update_for_apply_ppr_and_delegate_to_marketplaces(
Expand Down
Loading

0 comments on commit 8466659

Please sign in to comment.