Skip to content

Commit

Permalink
fix: Celery ConnectTimeoutError on request Error (#311)
Browse files Browse the repository at this point in the history
  • Loading branch information
karatugo authored Apr 17, 2024
1 parent 608c115 commit bb5485f
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 24 deletions.
44 changes: 25 additions & 19 deletions sumstats_service/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ def update_sumstats(callback_id):
content = request.get_json(force=True)

resp = endpoints.update_sumstats(callback_id=callback_id, content=content)
logger.debug(f">>>>>>>>>>>>>>>>>>>> {resp=}")
logger.info(f"PUT /v1/sum-stats/{callback_id}")
logger.info(f">> {resp=}")

if resp:
move_files_result = move_files_to_staging.apply_async(
Expand All @@ -204,19 +205,20 @@ def update_sumstats(callback_id):
move_files_result.wait()

if move_files_result.successful():
logger.info(f"{callback_id=} :: move_files_result successful")
metadata_conversion_result = convert_metadata_to_yaml.apply_async(
args=[resp["studyList"][0]["gcst"], False],
retry=True,
)
metadata_conversion_result.wait()

if metadata_conversion_result.successful():
delete_endpoint_result = delete_globus_endpoint.apply_async(
args=[move_files_result.get()["globus_endpoint_id"]],
retry=True,
)
delete_endpoint_result.wait()
globus_endpoint_id = move_files_result.get()["globus_endpoint_id"]
logger.info(f">> [delete_globus_endpoint] calling {globus_endpoint_id=}")
delete_endpoint_result = au.delete_globus_endpoint(globus_endpoint_id)
logger.info(f"{callback_id=} :: {delete_endpoint_result=}")

logger.info(f"{callback_id=} :: Return status 200")
return Response(status=200, mimetype="application/json")


Expand All @@ -225,6 +227,7 @@ def update_sumstats(callback_id):

@app.route("/v1/sum-stats/globus/mkdir", methods=["POST"])
def make_dir():
logger.info(">> /v1/sum-stats/globus/mkdir")
req_data = request.get_json()
unique_id = req_data["uniqueID"]
email = req_data["email"]
Expand All @@ -242,8 +245,11 @@ def make_dir():
@app.route("/v1/sum-stats/globus/<unique_id>", methods=["DELETE"])
def deactivate_dir(unique_id):
resp = {"unique_id": unique_id}
logger.info(f">> DELETE /v1/sum-stats/globus/{unique_id}")
status = au.delete_globus_endpoint(unique_id)
logger.info(f">> {status=}")
if status is False:
logger.info('aborting...')
abort(404)
return make_response(jsonify(resp), status)

Expand Down Expand Up @@ -275,9 +281,9 @@ def process_studies(
zero_p_values: bool = False,
bypass: bool = False,
):
print(f">>> [process_studies] {callback_id=} with {minrows=} {forcevalid=} {bypass=} {zero_p_values=} {file_type=}")
logger.info(f">>> [process_studies] {callback_id=} with {minrows=} {forcevalid=} {bypass=} {zero_p_values=} {file_type=}")
if endpoints.create_studies(callback_id=callback_id, file_type=file_type, content=content):
print(f'endpoints.create_studies: True for {callback_id=}')
logger.info(f'endpoints.create_studies: True for {callback_id=}')
validate_files_in_background.apply_async(
kwargs={
"callback_id": callback_id,
Expand All @@ -303,20 +309,20 @@ def validate_files_in_background(
zero_p_values: bool = False,
file_type: Union[str, None] = None,
):
print(f">>> [validate_files_in_background] {callback_id=} with {minrows=} {forcevalid=} {bypass=} {zero_p_values=} {file_type=}")
logger.info(f">>> [validate_files_in_background] {callback_id=} with {minrows=} {forcevalid=} {bypass=} {zero_p_values=} {file_type=}")

print('calling store_validation_method')
logger.info('calling store_validation_method')
au.store_validation_method(callback_id=callback_id, bypass_validation=forcevalid)

if bypass is True:
print('Bypassing the validation.')
logger.info('Bypassing the validation.')
results = au.skip_validation_completely(
callback_id=callback_id,
content=content,
file_type=file_type,
)
else:
print('Validating files.')
logger.info('Validating files.')
results = au.validate_files(
callback_id=callback_id,
content=content,
Expand All @@ -330,39 +336,39 @@ def validate_files_in_background(

@celery.task(queue=config.CELERY_QUEUE2, options={"queue": config.CELERY_QUEUE2})
def store_validation_results(results):
    """Celery task: persist validation *results* to the DB, if any."""
    logger.info(">>> [store_validation_results]")
    if not results:
        return
    logger.info('results: True')
    au.store_validation_results_in_db(results)


@celery.task(queue=config.CELERY_QUEUE1, options={"queue": config.CELERY_QUEUE1})
def remove_payload_files(callback_id):
    """Celery task: delete the uploaded payload files for *callback_id*."""
    logger.info(">>> [remove_payload_files]")
    au.remove_payload_files(callback_id)


@celery.task(queue=config.CELERY_QUEUE1, options={"queue": config.CELERY_QUEUE1})
def move_files_to_staging(resp):
    """Celery task: move the files described by *resp* into staging.

    Returns whatever ``au.move_files_to_staging`` returns (downstream
    callers read ``globus_endpoint_id`` from it).
    """
    logger.info(">>> [move_files_to_staging]")
    return au.move_files_to_staging(resp)


@celery.task(queue=config.CELERY_QUEUE3, options={"queue": config.CELERY_QUEUE3})
def convert_metadata_to_yaml(gcst_id, is_harmonised_included=True):
    """Celery task: convert the metadata of *gcst_id* to YAML.

    *is_harmonised_included* is forwarded unchanged to the api_utils helper.
    """
    logger.info(f">>> [convert_metadata_to_yaml] for {gcst_id=}")
    return au.convert_metadata_to_yaml(gcst_id, is_harmonised_included)


@celery.task(queue=config.CELERY_QUEUE1, options={"queue": config.CELERY_QUEUE1})
def delete_globus_endpoint(globus_endpoint_id):
    """Celery task: delete the given Globus endpoint via api_utils."""
    logger.info(f">>> [delete_globus_endpoint] for {globus_endpoint_id}")
    return au.delete_globus_endpoint(globus_endpoint_id)


@task_failure.connect
def task_failure_handler(sender=None, **kwargs) -> None:
print(">>> [task_failure_handler]")
logger.info(">>> [task_failure_handler]")
subject = f"Celery error in {sender.name}"
message = """{einfo} Task was called with args:
{args} kwargs: {kwargs}.\n
Expand Down
2 changes: 2 additions & 0 deletions sumstats_service/resources/api_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ def store_validation_results_in_db(validation_response):


def delete_globus_endpoint(globus_uuid):
    """Remove the Globus endpoint *globus_uuid* and everything in it.

    Returns the status produced by
    ``globus.remove_endpoint_and_all_contents`` (logged before return).
    """
    logger.info(f">> delete globus endpoint {globus_uuid}")
    status = globus.remove_endpoint_and_all_contents(globus_uuid)
    logger.info(f">> delete globus endpoint {globus_uuid} :: {status=}")
    return status


Expand Down
6 changes: 4 additions & 2 deletions sumstats_service/resources/file_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,13 @@ def move_file_to_staging(self):
dest_file = os.path.join(dest_dir, self.staging_file_name + ext)
pathlib.Path(dest_dir).mkdir(parents=True, exist_ok=True)
shutil.move(source_file, dest_file)
except (IndexError, FileNotFoundError, OSError) as e:
raise IOError(
except (IndexError, FileNotFoundError, OSError, ValueError) as e:
logger.error(
"Error: {}\nCould not move file {} to staging,\ "
"callback ID: {}".format(e, self.staging_file_name, self.callback_id)
)
raise

return True

def create_metadata_file(self):
Expand Down
23 changes: 20 additions & 3 deletions sumstats_service/resources/globus.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,16 @@
scopes,
)

from sumstats_service import config
from sumstats_service import config, logger_config
import logging

# Configure module-level logging via the project helper; if that fails for
# any reason, fall back to a plain DEBUG basicConfig so logging still works.
try:
    logger_config.setup_logging()
except Exception as e:
    logging.basicConfig(level=logging.DEBUG, format="(%(levelname)s): %(message)s")
    logger = logging.getLogger(__name__)
    logger.error(f"Logging setup failed: {e}")
else:
    logger = logging.getLogger(__name__)


def mkdir(unique_id: str, email_address: str = None) -> str:
Expand Down Expand Up @@ -228,7 +237,7 @@ def rename_file(dest_dir, source, dest):
if dest not in files:
transfer.operation_rename(config.MAPPED_COLLECTION_ID, source, dest)
except TransferAPIError as e:
print(e)
logger.info(e)
return False
return True

Expand All @@ -240,7 +249,7 @@ def list_files(directory):
dir_ls = transfer.operation_ls(config.MAPPED_COLLECTION_ID, path=directory)
files = [os.path.join(directory, f["name"]) for f in dir_ls]
except TransferAPIError as e:
print(e)
logger.info(e)
return files


Expand All @@ -261,18 +270,26 @@ def remove_path(path_to_remove, transfer_client=None):


def remove_endpoint_and_all_contents(uid):
    """Remove the directory for *uid*, then deactivate its endpoint.

    Returns the deactivation status, or ``False`` when the endpoint is
    not found or the path removal did not succeed.
    """
    logger.info(f">> remove_endpoint_and_all_contents {uid=}")
    client = init_transfer_client()
    endpoint_id = get_endpoint_id_from_uid(uid, transfer_client=client)
    logger.info(f">> remove_endpoint_and_all_contents {uid=} :: {endpoint_id=}")
    deactivate_status = False
    if endpoint_id:
        logger.info(f">> remove_endpoint_and_all_contents {uid=} :: {endpoint_id=} true")
        if remove_path(path_to_remove=uid, transfer_client=client):
            logger.info(f">> remove_endpoint_and_all_contents {uid=} :: remove_path true")
            deactivate_status = deactivate_endpoint(endpoint_id)
    logger.info(f">> remove_endpoint_and_all_contents {uid=} :: {deactivate_status=}")
    return deactivate_status


def deactivate_endpoint(endpoint_id, gcs_client=None):
    """Delete the Globus collection *endpoint_id*.

    Uses *gcs_client* when given (truthy), otherwise builds one with
    ``init_gcs_client``. Returns the HTTP status code of the delete call.
    """
    logger.info(f">> deactivate_endpoint {endpoint_id=}")
    client = gcs_client or init_gcs_client()
    status = client.delete_collection(endpoint_id)
    logger.info(f">> deactivate_endpoint {endpoint_id=} :: {status=}")
    return status.http_status


Expand Down

0 comments on commit bb5485f

Please sign in to comment.