diff --git a/connect_ext_ppr/tasks_manager.py b/connect_ext_ppr/tasks_manager.py index 7bae1ff..9451fc9 100644 --- a/connect_ext_ppr/tasks_manager.py +++ b/connect_ext_ppr/tasks_manager.py @@ -9,8 +9,8 @@ from connect_ext_ppr.constants import PPR_FILE_NAME_DELEGATION_L2, PPR_FILE_NAME_UPDATE_MARKETPLACES from connect_ext_ppr.db import get_cbc_extension_db, get_cbc_extension_db_engine, get_db_ctx_manager from connect_ext_ppr.models.configuration import Configuration -from connect_ext_ppr.models.enums import CBCTaskLogStatus from connect_ext_ppr.models.enums import ( + CBCTaskLogStatus, DeploymentRequestStatusChoices, DeploymentStatusChoices, TasksStatusChoices, @@ -390,7 +390,7 @@ def delegate_to_l2(deployment_request, cbc_service, connect_client, **kwargs): } -def execute_tasks(db, config, tasks, connect_client): # noqa: CCR001 +def execute_tasks(db, config, tasks, connect_client, logger): was_succesfull = False cbc_service = None @@ -416,27 +416,28 @@ def execute_tasks(db, config, tasks, connect_client): # noqa: CCR001 db=db, ) task.status = TasksStatusChoices.done - if not was_succesfull: - task.status = TasksStatusChoices.error + except TaskException as ex: was_succesfull = False - task.error_message = str(ex) - task.status = TasksStatusChoices.error - except Exception as err: + task.error_message = str(ex)[:4000] + except Exception as ex: + logger.error(f'Task ID: {task.id} - {ex}') was_succesfull = False - task.error_message = str(err) - task.status = TasksStatusChoices.error + task.error_message = 'Something went wrong.' task.finished_at = datetime.utcnow() db.add(task) - db.commit() + if not was_succesfull: + task.status = TasksStatusChoices.error + db.commit() break + db.commit() return was_succesfull -def main_process(deployment_request_id, config, connect_client): +def main_process(deployment_request_id, config, connect_client, logger): with get_db_ctx_manager(config) as db: deployment_request = db.query(DeploymentRequest).options( @@ -461,7 +462,11 @@ def main_process(deployment_request_id, config, connect_client): deployment_request_id=deployment_request_id, ).order_by(Task.id).all() - was_succesfull = execute_tasks(db, config, tasks, connect_client) + try: + was_succesfull = execute_tasks(db, config, tasks, connect_client, logger) + except Exception as ex: + was_succesfull = False + logger.error(f'DeploymentRequest ID: {deployment_request_id} - {ex}') db.refresh(deployment_request, with_for_update=True) diff --git a/connect_ext_ppr/webapp.py b/connect_ext_ppr/webapp.py index 7a4cfb3..bec98d6 100644 --- a/connect_ext_ppr/webapp.py +++ b/connect_ext_ppr/webapp.py @@ -167,7 +167,7 @@ def add_dep_request( db, deployment_request, deployment, account_id, logger, ) - self.thread_pool.submit(main_process, instance.id, config, client) + self.thread_pool.submit(main_process, instance.id, config, client, logger) hub = get_client_object(client, 'hubs', instance.deployment.hub_id) response = get_deployment_request_schema(instance, hub) @@ -341,11 +341,12 @@ def retry( client: ConnectClient = Depends(get_installation_client), installation: dict = Depends(get_installation), config: dict = Depends(get_config), + logger: Logger = Depends(get_logger), ): dr = get_deployment_request_by_id(depl_req_id, db, installation) dr = DeploymentRequestActionHandler.retry(db, dr) - self.thread_pool.submit(main_process, dr.id, config, client) + self.thread_pool.submit(main_process, dr.id, config, client, logger) hub = get_hub(client, dr.deployment.hub_id) return get_deployment_request_schema(dr, hub) diff --git a/tests/test_tasks_manager.py b/tests/test_tasks_manager.py index 722caf5..05f5054 100644 --- a/tests/test_tasks_manager.py +++ b/tests/test_tasks_manager.py @@ -855,6 +855,7 @@ def test_main_process( mock_tasks, mocker, product_details, + logger, ): mock_get_product_details.return_value = product_details dep = deployment_factory() @@ -865,7 +866,7 @@ def test_main_process( task_factory(deployment_request=dr, task_index='0003', type=TaskTypesChoices.delegate_to_l2) mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService()) - assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.done + assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.done assert dbsession.query(Deployment).filter_by(status=DeploymentStatusChoices.synced).count() == 1 assert dbsession.query(DeploymentRequest).filter_by( @@ -892,6 +893,7 @@ def test_main_process_wo_l2_delegation( mock_tasks, mocker, product_details, + logger, ): mock_get_product_details.return_value = product_details dep = deployment_factory() @@ -901,7 +903,7 @@ def test_main_process_wo_l2_delegation( task_factory(deployment_request=dr, task_index='0002', type=TaskTypesChoices.apply_and_delegate) mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService()) - assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.done + assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.done assert dbsession.query(Deployment).filter_by( status=DeploymentStatusChoices.pending, @@ -931,6 +933,7 @@ def test_main_process_deployment_w_new_ppr_version( mock_tasks, mocker, product_details, + logger, ): mock_get_product_details.return_value = product_details ppr_file = file_factory(id='MFL-123') @@ -944,7 +947,7 @@ def test_main_process_deployment_w_new_ppr_version( task_factory(deployment_request=dr, task_index='0003', type=TaskTypesChoices.delegate_to_l2) mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService()) - assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.done + assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.done assert dbsession.query(Deployment).filter_by( status=DeploymentStatusChoices.pending, @@ -981,7 +984,7 @@ def test_main_process_ends_w_error( task_factory, ppr_version_factory, connect_client, - mock_tasks, + logger, ): dep = deployment_factory() ppr = ppr_version_factory(id='PPR-123', product_version=1, deployment=dep, version=1) @@ -993,12 +996,13 @@ def test_main_process_ends_w_error( my_mock = mocker.Mock() def mock_get(key): + print(key, ' :', key != type_function_to_mock) return lambda **kwargs: key != type_function_to_mock my_mock.get = mock_get mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService()) mocker.patch('connect_ext_ppr.tasks_manager.TASK_PER_TYPE', my_mock) - assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.error + assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.error assert dbsession.query(Deployment).filter_by( status=DeploymentStatusChoices.pending, @@ -1031,6 +1035,7 @@ def test_main_process_wo_hub_credentials( task_factory, connect_client, mocker, + logger, ): dep = deployment_factory() ppr = ppr_version_factory(id='PPR-123', product_version=1, deployment=dep, version=1) @@ -1039,7 +1044,7 @@ def test_main_process_wo_hub_credentials( deployment_request=dr, task_index='0001', type=TaskTypesChoices.product_setup, ) mocker.patch('connect_ext_ppr.client.utils.get_hub_credentials', return_value=None) - assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.error + assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.error assert task.status == TasksStatusChoices.error assert task.error_message == 'Hub Credentials not found for Hub ID HB-0000-0000.' @@ -1079,6 +1084,7 @@ def test_main_process_w_aborted_tasks( connect_client, mock_tasks, mocker, + logger, ): """ We only process DeploymentRequest that are in Pending status. So in this case we asume that @@ -1120,7 +1126,7 @@ def change_dr_status(instance, attribute_names=None, with_for_update=None): dbsession.refresh = change_dr_status mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService()) - assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.aborted + assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.aborted assert dbsession.query(Deployment).filter_by( status=DeploymentStatusChoices.pending, @@ -1147,6 +1153,7 @@ def test_main_process_w_aborted_deployment_request( ppr_version_factory, connect_client, mock_tasks, + logger, ): """ We only process DeploymentRequest that are in Pending status. So in this case we asume that @@ -1180,7 +1187,7 @@ def test_main_process_w_aborted_deployment_request( status=TasksStatusChoices.aborted, ) - assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.aborted + assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.aborted assert dbsession.query(Deployment).filter_by( status=DeploymentStatusChoices.pending, @@ -1216,7 +1223,7 @@ def test_main_process_ends_w_task_exception( task_factory, ppr_version_factory, connect_client, - mock_tasks, + logger, ): dep = deployment_factory() ppr = ppr_version_factory(id='PPR-123', product_version=1, deployment=dep, version=1) @@ -1236,7 +1243,7 @@ def mock_get(key): mocker.patch('connect_ext_ppr.tasks_manager.TASK_PER_TYPE', my_mock) mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService()) - assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.error + assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.error assert dbsession.query(Deployment).filter_by( status=DeploymentStatusChoices.pending,