Skip to content

Commit

Permalink
LITE-28603: Sending logger to main process that execute tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
akodelia committed Sep 27, 2023
1 parent 2484416 commit 5e7deaa
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 24 deletions.
29 changes: 17 additions & 12 deletions connect_ext_ppr/tasks_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from connect_ext_ppr.constants import PPR_FILE_NAME_DELEGATION_L2, PPR_FILE_NAME_UPDATE_MARKETPLACES
from connect_ext_ppr.db import get_cbc_extension_db, get_cbc_extension_db_engine, get_db_ctx_manager
from connect_ext_ppr.models.configuration import Configuration
from connect_ext_ppr.models.enums import CBCTaskLogStatus
from connect_ext_ppr.models.enums import (
CBCTaskLogStatus,
DeploymentRequestStatusChoices,
DeploymentStatusChoices,
TasksStatusChoices,
Expand Down Expand Up @@ -390,7 +390,7 @@ def delegate_to_l2(deployment_request, cbc_service, connect_client, **kwargs):
}


def execute_tasks(db, config, tasks, connect_client): # noqa: CCR001
def execute_tasks(db, config, tasks, connect_client, logger):
was_succesfull = False
cbc_service = None

Expand All @@ -416,27 +416,28 @@ def execute_tasks(db, config, tasks, connect_client): # noqa: CCR001
db=db,
)
task.status = TasksStatusChoices.done
if not was_succesfull:
task.status = TasksStatusChoices.error

except TaskException as ex:
was_succesfull = False
task.error_message = str(ex)
task.status = TasksStatusChoices.error
except Exception as err:
task.error_message = str(ex)[:4000]
except Exception as ex:
logger.error(f'Task ID: {task.id} - {ex}')
was_succesfull = False
task.error_message = str(err)
task.status = TasksStatusChoices.error
task.error_message = 'Something went wrong.'

task.finished_at = datetime.utcnow()
db.add(task)
db.commit()

if not was_succesfull:
task.status = TasksStatusChoices.error
db.commit()
break
db.commit()

return was_succesfull


def main_process(deployment_request_id, config, connect_client):
def main_process(deployment_request_id, config, connect_client, logger):

with get_db_ctx_manager(config) as db:
deployment_request = db.query(DeploymentRequest).options(
Expand All @@ -461,7 +462,11 @@ def main_process(deployment_request_id, config, connect_client):
deployment_request_id=deployment_request_id,
).order_by(Task.id).all()

was_succesfull = execute_tasks(db, config, tasks, connect_client)
try:
was_succesfull = execute_tasks(db, config, tasks, connect_client, logger)
except Exception as ex:
was_succesfull = False
logger.error(f'DeploymentRequest ID: {deployment_request_id} - {ex}')

db.refresh(deployment_request, with_for_update=True)

Expand Down
5 changes: 3 additions & 2 deletions connect_ext_ppr/webapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def add_dep_request(
db, deployment_request, deployment, account_id, logger,
)

self.thread_pool.submit(main_process, instance.id, config, client)
self.thread_pool.submit(main_process, instance.id, config, client, logger)

hub = get_client_object(client, 'hubs', instance.deployment.hub_id)
response = get_deployment_request_schema(instance, hub)
Expand Down Expand Up @@ -341,11 +341,12 @@ def retry(
client: ConnectClient = Depends(get_installation_client),
installation: dict = Depends(get_installation),
config: dict = Depends(get_config),
logger: Logger = Depends(get_logger),
):
dr = get_deployment_request_by_id(depl_req_id, db, installation)
dr = DeploymentRequestActionHandler.retry(db, dr)

self.thread_pool.submit(main_process, dr.id, config, client)
self.thread_pool.submit(main_process, dr.id, config, client, logger)

hub = get_hub(client, dr.deployment.hub_id)
return get_deployment_request_schema(dr, hub)
Expand Down
27 changes: 17 additions & 10 deletions tests/test_tasks_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,7 @@ def test_main_process(
mock_tasks,
mocker,
product_details,
logger,
):
mock_get_product_details.return_value = product_details
dep = deployment_factory()
Expand All @@ -865,7 +866,7 @@ def test_main_process(
task_factory(deployment_request=dr, task_index='0003', type=TaskTypesChoices.delegate_to_l2)

mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService())
assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.done
assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.done

assert dbsession.query(Deployment).filter_by(status=DeploymentStatusChoices.synced).count() == 1
assert dbsession.query(DeploymentRequest).filter_by(
Expand All @@ -892,6 +893,7 @@ def test_main_process_wo_l2_delegation(
mock_tasks,
mocker,
product_details,
logger,
):
mock_get_product_details.return_value = product_details
dep = deployment_factory()
Expand All @@ -901,7 +903,7 @@ def test_main_process_wo_l2_delegation(
task_factory(deployment_request=dr, task_index='0002', type=TaskTypesChoices.apply_and_delegate)

mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService())
assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.done
assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.done

assert dbsession.query(Deployment).filter_by(
status=DeploymentStatusChoices.pending,
Expand Down Expand Up @@ -931,6 +933,7 @@ def test_main_process_deployment_w_new_ppr_version(
mock_tasks,
mocker,
product_details,
logger,
):
mock_get_product_details.return_value = product_details
ppr_file = file_factory(id='MFL-123')
Expand All @@ -944,7 +947,7 @@ def test_main_process_deployment_w_new_ppr_version(
task_factory(deployment_request=dr, task_index='0003', type=TaskTypesChoices.delegate_to_l2)

mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService())
assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.done
assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.done

assert dbsession.query(Deployment).filter_by(
status=DeploymentStatusChoices.pending,
Expand Down Expand Up @@ -981,7 +984,7 @@ def test_main_process_ends_w_error(
task_factory,
ppr_version_factory,
connect_client,
mock_tasks,
logger,
):
dep = deployment_factory()
ppr = ppr_version_factory(id='PPR-123', product_version=1, deployment=dep, version=1)
Expand All @@ -993,12 +996,13 @@ def test_main_process_ends_w_error(
my_mock = mocker.Mock()

def mock_get(key):
print(key, ' :', key != type_function_to_mock)
return lambda **kwargs: key != type_function_to_mock
my_mock.get = mock_get

mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService())
mocker.patch('connect_ext_ppr.tasks_manager.TASK_PER_TYPE', my_mock)
assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.error
assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.error

assert dbsession.query(Deployment).filter_by(
status=DeploymentStatusChoices.pending,
Expand Down Expand Up @@ -1031,6 +1035,7 @@ def test_main_process_wo_hub_credentials(
task_factory,
connect_client,
mocker,
logger,
):
dep = deployment_factory()
ppr = ppr_version_factory(id='PPR-123', product_version=1, deployment=dep, version=1)
Expand All @@ -1039,7 +1044,7 @@ def test_main_process_wo_hub_credentials(
deployment_request=dr, task_index='0001', type=TaskTypesChoices.product_setup,
)
mocker.patch('connect_ext_ppr.client.utils.get_hub_credentials', return_value=None)
assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.error
assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.error

assert task.status == TasksStatusChoices.error
assert task.error_message == 'Hub Credentials not found for Hub ID HB-0000-0000.'
Expand Down Expand Up @@ -1079,6 +1084,7 @@ def test_main_process_w_aborted_tasks(
connect_client,
mock_tasks,
mocker,
logger,
):
"""
We only process DeploymentRequest that are in Pending status. So in this case we asume that
Expand Down Expand Up @@ -1120,7 +1126,7 @@ def change_dr_status(instance, attribute_names=None, with_for_update=None):
dbsession.refresh = change_dr_status
mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService())

assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.aborted
assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.aborted

assert dbsession.query(Deployment).filter_by(
status=DeploymentStatusChoices.pending,
Expand All @@ -1147,6 +1153,7 @@ def test_main_process_w_aborted_deployment_request(
ppr_version_factory,
connect_client,
mock_tasks,
logger,
):
"""
We only process DeploymentRequest that are in Pending status. So in this case we asume that
Expand Down Expand Up @@ -1180,7 +1187,7 @@ def test_main_process_w_aborted_deployment_request(
status=TasksStatusChoices.aborted,
)

assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.aborted
assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.aborted

assert dbsession.query(Deployment).filter_by(
status=DeploymentStatusChoices.pending,
Expand Down Expand Up @@ -1216,7 +1223,7 @@ def test_main_process_ends_w_task_exception(
task_factory,
ppr_version_factory,
connect_client,
mock_tasks,
logger,
):
dep = deployment_factory()
ppr = ppr_version_factory(id='PPR-123', product_version=1, deployment=dep, version=1)
Expand All @@ -1236,7 +1243,7 @@ def mock_get(key):

mocker.patch('connect_ext_ppr.tasks_manager.TASK_PER_TYPE', my_mock)
mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService())
assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.error
assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.error

assert dbsession.query(Deployment).filter_by(
status=DeploymentStatusChoices.pending,
Expand Down

0 comments on commit 5e7deaa

Please sign in to comment.