Skip to content

Commit

Permalink
fix: Silent errors in sumstats pipeline (#341)
Browse files Browse the repository at this point in the history
* fix: Silent errors in sumstats pipeline

* fix: Add more ctx to emails

* refactor: No need for RuntimeError

* fix: Undefined gcst_id
  • Loading branch information
karatugo authored May 30, 2024
1 parent 9f8fd5b commit 4011aa5
Showing 1 changed file with 126 additions and 128 deletions.
254 changes: 126 additions & 128 deletions sumstats_service/resources/api_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,157 +390,155 @@ def get_file_type_from_mongo(gcst) -> str:

# TODO: refactor this method
def convert_metadata_to_yaml(accession_id: str, is_harmonised_included: bool):
try:
logger.info(f"::: [convert_metadata_to_yaml] {accession_id=} :::")
logger.info(f"Non-HM case for {accession_id=}")

# Consume Ingest API via gwas-sumstats-tools
metadata_from_gwas_cat = metadata_dict_from_gwas_cat(
accession_id=accession_id,
is_bypass_rest_api=True,
# DEV ONLY #######
# 1. Update for Sandbox in gwas-sumstats-tools
# 2. Make a pre-release
#
logger.info(f"::: [convert_metadata_to_yaml] {accession_id=} :::")
logger.info(f"Non-HM case for {accession_id=}")

# Consume Ingest API via gwas-sumstats-tools
metadata_from_gwas_cat = metadata_dict_from_gwas_cat(
accession_id=accession_id,
is_bypass_rest_api=True,
# DEV ONLY #######
# 1. Update for Sandbox in gwas-sumstats-tools
# 2. Make a pre-release
#
)
logger.info(f"For non-hm {accession_id=} - {metadata_from_gwas_cat=}")
metadata_from_gwas_cat["date_metadata_last_modified"] = date.today()
metadata_from_gwas_cat["file_type"] = get_file_type_from_mongo(accession_id)

# Setting default values for keys that may not exist
default_keys = [
"genome_assembly",
"data_file_name",
"file_type",
"data_file_md5sum",
]
for key in default_keys:
logger.info(f"For non-hm {accession_id=} - Setting default value for {key=}.")
metadata_from_gwas_cat.setdefault(key, "")

if not is_harmonised_included:
filenames_to_md5_values = compute_md5_for_local_files(
accession_id,
)
logger.info(f"For non-hm {accession_id=} - {metadata_from_gwas_cat=}")
metadata_from_gwas_cat["date_metadata_last_modified"] = date.today()
metadata_from_gwas_cat["file_type"] = get_file_type_from_mongo(accession_id)

# Setting default values for keys that may not exist
default_keys = [
"genome_assembly",
"data_file_name",
"file_type",
"data_file_md5sum",
]
for key in default_keys:
logger.info(
f"For non-hm {accession_id=} - Setting default value for {key=}."
)
metadata_from_gwas_cat.setdefault(key, "")

if not is_harmonised_included:
filenames_to_md5_values = compute_md5_for_local_files(
accession_id,
)
else:
filenames_to_md5_values = compute_md5_for_ftp_files(
config.FTP_SERVER_EBI,
generate_path(accession_id),
accession_id,
)

filename_to_md5sum = get_md5_for_accession(
filenames_to_md5_values,
else:
filenames_to_md5_values = compute_md5_for_ftp_files(
config.FTP_SERVER_EBI,
generate_path(accession_id),
accession_id,
)
# set if exists, default value set above
for k, v in filename_to_md5sum.items():
metadata_from_gwas_cat["data_file_name"] = k
metadata_from_gwas_cat["data_file_md5sum"] = v

if not metadata_from_gwas_cat.get("data_file_name"):
logger.info("Data file not available in FTP")
return False

metadata_from_gwas_cat["gwas_id"] = accession_id
metadata_from_gwas_cat["gwas_catalog_api"] = (
f"{config.GWAS_CATALOG_REST_API_STUDY_URL}{accession_id}"

filename_to_md5sum = get_md5_for_accession(
filenames_to_md5_values,
accession_id,
)
# set if exists, default value set above
for k, v in filename_to_md5sum.items():
metadata_from_gwas_cat["data_file_name"] = k
metadata_from_gwas_cat["data_file_md5sum"] = v

if not metadata_from_gwas_cat.get("data_file_name"):
logger.info("Data file not available in FTP")
raise FileNotFoundError(
f"""
Data file not available for {accession_id}
at '{generate_path(accession_id)}'
"""
)

# Create out_dir as late as possible to make sure that it's not empty
out_dir = os.path.join(config.STAGING_PATH, accession_id)
logger.info(f"For non-hm {accession_id=} - {out_dir=}")
Path(out_dir).mkdir(parents=True, exist_ok=True)
metadata_from_gwas_cat["gwas_id"] = accession_id
metadata_from_gwas_cat["gwas_catalog_api"] = (
f"{config.GWAS_CATALOG_REST_API_STUDY_URL}{accession_id}"
)

metadata_filename = f"{metadata_from_gwas_cat['data_file_name']}-meta.yaml"
out_file = os.path.join(out_dir, metadata_filename)
logger.info(f"For non-hm {accession_id=} - {out_file=}")
metadata_client = MetadataClient(out_file=out_file)
# Create out_dir as late as possible to make sure that it's not empty
out_dir = os.path.join(config.STAGING_PATH, accession_id)
logger.info(f"For non-hm {accession_id=} - {out_dir=}")
Path(out_dir).mkdir(parents=True, exist_ok=True)

logger.info(f"For non-hm {accession_id=} updated -> {metadata_from_gwas_cat=}")
metadata_client.update_metadata(metadata_from_gwas_cat)
metadata_filename = f"{metadata_from_gwas_cat['data_file_name']}-meta.yaml"
out_file = os.path.join(out_dir, metadata_filename)
logger.info(f"For non-hm {accession_id=} - {out_file=}")
metadata_client = MetadataClient(out_file=out_file)

metadata_client.to_file()
logger.info(f"For non-hm {accession_id=} updated -> {metadata_from_gwas_cat=}")
metadata_client.update_metadata(metadata_from_gwas_cat)

# compute md5sum of the meta file and write to md5sum.txt here
filenames_to_md5_values[metadata_filename] = compute_md5_local(out_file)
logger.info(f"For non-hm {accession_id=} - {filenames_to_md5_values=}")
metadata_client.to_file()

write_md5_for_files(
filenames_to_md5_values, os.path.join(out_dir, "md5sum.txt")
)
# compute md5sum of the meta file and write to md5sum.txt here
filenames_to_md5_values[metadata_filename] = compute_md5_local(out_file)
logger.info(f"For non-hm {accession_id=} - {filenames_to_md5_values=}")

logger.info(
f"Metadata yaml file creation is successful for non-hm {accession_id=}."
)
write_md5_for_files(filenames_to_md5_values, os.path.join(out_dir, "md5sum.txt"))

if not is_harmonised_included:
return True
logger.info(
f"Metadata yaml file creation is successful for non-hm {accession_id=}."
)

logger.info(f"HM CASE for {accession_id=}")
logger.info(
f"For HM {accession_id=} - resetting data_file_name and data_file_md5sum"
)
metadata_from_gwas_cat["data_file_name"] = ""
metadata_from_gwas_cat["data_file_md5sum"] = ""
metadata_from_gwas_cat["harmonisation_reference"] = config.HM_REFERENCE
metadata_from_gwas_cat["coordinate_system"] = config.HM_COORDINATE_SYSTEM
metadata_from_gwas_cat["genome_assembly"] = config.LATEST_ASSEMBLY
metadata_from_gwas_cat["is_harmonised"] = True
metadata_from_gwas_cat["is_sorted"] = get_is_sorted(
config.FTP_SERVER_EBI,
f"{generate_path(accession_id)}/harmonised",
)
if not is_harmonised_included:
return True

filenames_to_md5_values = compute_md5_for_ftp_files(
config.FTP_SERVER_EBI,
f"{generate_path(accession_id)}/harmonised",
accession_id,
)
filename_to_md5sum_hm = get_md5_for_accession(
filenames_to_md5_values,
accession_id,
True,
)
for k, v in filename_to_md5sum_hm.items():
metadata_from_gwas_cat["data_file_name"] = k
metadata_from_gwas_cat["data_file_md5sum"] = v
logger.info(f"HM CASE for {accession_id=}")
logger.info(
f"For HM {accession_id=} - resetting data_file_name and data_file_md5sum"
)
metadata_from_gwas_cat["data_file_name"] = ""
metadata_from_gwas_cat["data_file_md5sum"] = ""
metadata_from_gwas_cat["harmonisation_reference"] = config.HM_REFERENCE
metadata_from_gwas_cat["coordinate_system"] = config.HM_COORDINATE_SYSTEM
metadata_from_gwas_cat["genome_assembly"] = config.LATEST_ASSEMBLY
metadata_from_gwas_cat["is_harmonised"] = True
metadata_from_gwas_cat["is_sorted"] = get_is_sorted(
config.FTP_SERVER_EBI,
f"{generate_path(accession_id)}/harmonised",
)

if not metadata_from_gwas_cat.get("data_file_name"):
logger.info("HM data file not available in FTP")
# It's okay to return True even though we haven't got the file
# because not every study is harmonised
return True
filenames_to_md5_values = compute_md5_for_ftp_files(
config.FTP_SERVER_EBI,
f"{generate_path(accession_id)}/harmonised",
accession_id,
)
filename_to_md5sum_hm = get_md5_for_accession(
filenames_to_md5_values,
accession_id,
True,
)
for k, v in filename_to_md5sum_hm.items():
metadata_from_gwas_cat["data_file_name"] = k
metadata_from_gwas_cat["data_file_md5sum"] = v

hm_dir = os.path.join(out_dir, "harmonised")
Path(hm_dir).mkdir(parents=True, exist_ok=True)
if not metadata_from_gwas_cat.get("data_file_name"):
logger.info(
f"""
HM data file not available for {accession_id}
at '{generate_path(accession_id)}/harmonised'
"""
)
# It's okay to return True even though we haven't got the file
# because not every study is harmonised
return True

metadata_filename_hm = f"{metadata_from_gwas_cat['data_file_name']}-meta.yaml"
out_file_hm = os.path.join(hm_dir, metadata_filename_hm)
logger.info(f"For HM {accession_id=} - {out_file_hm=}")
hm_dir = os.path.join(out_dir, "harmonised")
Path(hm_dir).mkdir(parents=True, exist_ok=True)

# Also generate client for hm case, i.e., if is_harmonised_included
metadata_client_hm = MetadataClient(out_file=out_file_hm)
metadata_filename_hm = f"{metadata_from_gwas_cat['data_file_name']}-meta.yaml"
out_file_hm = os.path.join(hm_dir, metadata_filename_hm)
logger.info(f"For HM {accession_id=} - {out_file_hm=}")

logger.info(f"For HM {accession_id=} updated -> {metadata_from_gwas_cat=}")
metadata_client_hm.update_metadata(metadata_from_gwas_cat)
# Also generate client for hm case, i.e., if is_harmonised_included
metadata_client_hm = MetadataClient(out_file=out_file_hm)

metadata_client_hm.to_file()
logger.info(f"For HM {accession_id=} updated -> {metadata_from_gwas_cat=}")
metadata_client_hm.update_metadata(metadata_from_gwas_cat)

filenames_to_md5_values[metadata_filename_hm] = compute_md5_local(out_file_hm)
logger.info(f"For HM {accession_id=} - {filenames_to_md5_values=}")
metadata_client_hm.to_file()

write_md5_for_files(filenames_to_md5_values, os.path.join(hm_dir, "md5sum.txt"))
logger.info(
f"Metadata yaml file creation is successful for HM {accession_id=}."
)
filenames_to_md5_values[metadata_filename_hm] = compute_md5_local(out_file_hm)
logger.info(f"For HM {accession_id=} - {filenames_to_md5_values=}")

except Exception as e:
logger.error(f"For {accession_id=} - error while creating metadata yaml files:")
logger.error(e)
return False
write_md5_for_files(filenames_to_md5_values, os.path.join(hm_dir, "md5sum.txt"))
logger.info(f"Metadata yaml file creation is successful for HM {accession_id=}.")

logger.info(f"::: ENDOF [convert_metadata_to_yaml] for {accession_id=}:::")

Expand Down

0 comments on commit 4011aa5

Please sign in to comment.