diff --git a/src/pynteny/hmm.py b/src/pynteny/hmm.py index 7396b83..cb976fb 100644 --- a/src/pynteny/hmm.py +++ b/src/pynteny/hmm.py @@ -9,16 +9,25 @@ import logging import os -import shutil import sys +from typing import Callable from collections import defaultdict from pathlib import Path +import tempfile import pandas as pd from Bio import SearchIO import pynteny.wrappers as wrappers -from pynteny.utils import extract_tar_file, flatten_directory, is_tar_file, list_tar_dir +from pynteny.utils import ( + extract_tar_file, + flatten_directory, + is_tar_file, + list_tar_dir, + download_file, + extract_gz_file, + split_hmms, +) logger = logging.getLogger(__name__) @@ -120,78 +129,56 @@ def get_HMMER_tables( return hmm_hits -class PGAP: - """Tools to parse PGAP hmm database metadata""" +class HMMDatabase: + """Base class for HMM databases""" - def __init__(self, meta_file: Path): - """Initialize class PGAP + def __init__( + self, + database_directory: Path, + metadata_file: Path, + metadata_columns: list[str] = None, + ) -> None: + """Initialize class HMMDatabase Args: - meta_file (Path): path to PGAP's metadata file. + database_directory (Path): path to directory containing HMM database + with individual files for each HMM. + metadata_file (Path): path to metadata file. + metadata_columns (list[str], optional): list of metadata columns to be + used. Defaults to None (all columns). """ - meta_file = Path(meta_file) - meta = pd.read_csv( - str(meta_file), - sep="\t", - usecols=[ - "#ncbi_accession", - "gene_symbol", - "label", - "product_name", - "ec_numbers", - ], + self._data_dir = Path(database_directory) + self._metadata_file = Path(metadata_file) + self._meta = pd.read_csv( + self._metadata_file, sep="\t", usecols=metadata_columns ) - # meta = meta[ - # ["#ncbi_accession", "gene_symbol", "label", "product_name", "ec_numbers"] - # ] - self._meta = meta - self._meta_file = meta_file - @staticmethod - def extract_PGAP_to_directory(pgap_tar: Path, output_dir: Path) -> None: - """Extract PGAP hmm database (tar.gz) to given directory + @property + def meta(self) -> pd.DataFrame: + """Return PFAM metadata - Args: - pgap_tar (Path): path to compressed PGAP database. - output_dir (Path): path to output directory. + Returns: + pd.DataFrame: PFAM metadata """ - pgap_tar = Path(pgap_tar) - if not is_tar_file(pgap_tar): - logger.warning(f"{pgap_tar} is not a tar file. Skipping extraction") - sys.exit(1) - logger.info("Extracting hmm files to target directory") - output_dir = Path(output_dir) - if output_dir.exists(): - shutil.rmtree(output_dir) - extract_tar_file(tar_file=pgap_tar, dest_dir=output_dir) - flatten_directory(output_dir) - - def remove_missing_HMMs_from_metadata( - self, hmm_dir: Path, output_file: Path = None - ) -> None: - """Remove HMMs from metadata that are not in HMM directory + return self._meta - Args: - hmm_dir (Path): path to directory containing PGAP database. - output_file (Path, optional): path to output file. Defaults to None. + @property + def meta_file(self) -> Path: + """Return path to metadata file + + Returns: + Path: metadata file """ - hmm_dir = Path(hmm_dir) - if output_file is None: - output_file = self._meta.parent / f"{self._meta.stem}_missing_hmms.tsv" - else: - output_file = Path(output_file) - if is_tar_file(hmm_dir): - hmm_file_names = [ - Path(hmm_file).stem.strip() for hmm_file in list_tar_dir(hmm_dir) - ] - else: - hmm_file_names = [hmm_file.stem.strip() for hmm_file in hmm_dir.iterdir()] - not_found = set() - for i, row in self._meta.iterrows(): - if row["#ncbi_accession"].strip() not in hmm_file_names: - not_found.add(i) - self._meta = self._meta.drop(not_found) - self._meta.to_csv(output_file, sep="\t", index=False) + return self._metadata_file + + @property + def data_dir(self) -> Path: + """Return data directory + + Returns: + Path: data directory + """ + return self._data_dir def get_HMM_names_by_gene_symbol(self, gene_symbol: str) -> list[str]: """Try to retrieve HMM by its gene symbol, more @@ -204,14 +191,9 @@ def get_HMM_names_by_gene_symbol(self, gene_symbol: str) -> list[str]: list[str]: list of HMM names matching gene symbol. """ meta = self._meta # .dropna(subset=["gene_symbol", "label"], axis=0) - return meta[ - ( - (meta.gene_symbol == gene_symbol) - | - # (meta.label.str.contains(gene_id)) - (meta.label == gene_symbol) - ) - ]["#ncbi_accession"].values.tolist() + return meta[((meta.gene_symbol == gene_symbol) | (meta.label == gene_symbol))][ + "accession" + ].values.tolist() def get_HMM_group_for_gene_symbol(self, gene_symbol: str) -> str: """Get HMMs corresponding to gene symbol in PGAP metadata. @@ -241,8 +223,8 @@ def get_HMM_gene_ID(self, hmm_name: str) -> list[str]: Returns: list[str]: list of gene symbols matching given HMM. """ - meta = self._meta.dropna(subset=["#ncbi_accession"], axis=0) - return meta[meta["#ncbi_accession"] == hmm_name]["gene_symbol"].values.tolist() + meta = self._meta.dropna(subset=["accession"], axis=0) + return meta[meta["accession"] == hmm_name]["gene_symbol"].values.tolist() def get_meta_info_for_HMM(self, hmm_name: str) -> dict: """Get meta info for given hmm. @@ -253,13 +235,219 @@ def get_meta_info_for_HMM(self, hmm_name: str) -> dict: Returns: dict: metadata of provided HMM. """ - meta = self._meta.dropna(subset=["#ncbi_accession"], axis=0).applymap( + meta = self._meta.dropna(subset=["accession"], axis=0).applymap( lambda x: x if not pd.isna(x) else "" ) metadata = { k: list(v.values())[0] if list(v.values())[0] else "undef" - for k, v in meta[meta["#ncbi_accession"] == hmm_name].to_dict().items() + for k, v in meta[meta["accession"] == hmm_name].to_dict().items() } if not metadata: logger.warning(f"No metadata for HMM: {hmm_name}") return metadata + + +class PGAP(HMMDatabase): + """Tools to parse PGAP hmm database metadata""" + + def __init__(self): + """Initialize class PGAP""" + super().__init__() + self._meta = self._meta.rename(columns={"#ncbi_accession": "accession"}) + self._meta = self.remove_missing_HMMs_from_metadata(meta_outfile=None) + + def remove_missing_HMMs_from_metadata(self, meta_outfile: Path = None) -> None: + """Remove HMMs from metadata that are not in HMM directory + + Args: + hmm_dir (Path): path to directory containing PGAP database. + meta_outfile (Path, optional): path to output file. Defaults to None. + """ + logger.info("Removing missing HMMs from PGAP metadata") + hmm_dir = self._database_dir + if meta_outfile is None: + meta_outfile = ( + self._metadata_file.parent + / f"{self._metadata_file.stem}_preprocessed.tsv" + ) + else: + meta_outfile = Path(meta_outfile) + if is_tar_file(hmm_dir): + hmm_file_names = [ + Path(hmm_file).stem.strip() for hmm_file in list_tar_dir(hmm_dir) + ] + else: + hmm_file_names = [hmm_file.stem.strip() for hmm_file in hmm_dir.iterdir()] + not_found = set() + for i, row in self._meta.iterrows(): + if row["accession"].strip() not in hmm_file_names: + not_found.add(i) + self._meta = self._meta.drop(not_found) + self._metadata_file = meta_outfile + self._meta.to_csv(meta_outfile, sep="\t", index=False) + + +class PFAM(HMMDatabase): + """Tools to preprocess the PFAM-A hmm database""" + + def __init__(self): + """Initialize class PFAM""" + super().__init__() + + @classmethod + def from_gz_file( + cls, hmm_gz_file: Path, hmm_outdir: Path = None, meta_outfile: Path = None + ) -> PFAM: + """Initialize PFAM class from hmm file + + Args: + hmm_gz_file (Path): path to hmm gz file. + hmm_outdir (Path): path to output directory to store individual HMM files. + meta_outfile (Path, optional): path to metadata output file. Defaults to None. + """ + hmm_gz_file = Path(hmm_gz_file) + if hmm_outdir is not None: + hmm_outdir = Path(hmm_outdir) + else: + hmm_outdir = hmm_gz_file.parent / "pfam_hmms" + if meta_outfile is not None: + meta_outfile = Path(meta_outfile) + else: + meta_outfile = hmm_outdir / f"{hmm_gz_file.stem}_meta.tsv" + with tempfile.NamedTemporaryFile() as temp: + extract_gz_file(hmm_gz_file, temp.name) + split_hmms(temp.name, hmm_outdir) + cls.construct_meta_file(hmm_outdir, meta_outfile) + return cls(hmm_outdir, meta_outfile) + + def construct_meta_file(self, meta_outfile: Path = None) -> None: + """Construct metadata file from individual HMM files. + + Args: + meta_outfile (Path): path to metadata file. + """ + logger.info("Constructing metadata file for PFAM-A database") + if meta_outfile is None: + meta_outfile = self._database_dir / "PFAM_meta.tsv" + else: + meta_outfile = Path(meta_outfile) + hmm_meta_lines = ["accession\tgene_symbol\tdescription\tlength\n"] + for hmm_file in self._database_dir.glob("*.hmm"): + with open(hmm_file, "r") as f: + hmm_text = f.read() + + acc_code = [ + entry for entry in hmm_text.split("\n") if entry.startswith("ACC") + ] + name = [entry for entry in hmm_text.split("\n") if entry.startswith("NAME")] + description = [ + entry for entry in hmm_text.split("\n") if entry.startswith("DESC") + ] + length = [ + entry for entry in hmm_text.split("\n") if entry.startswith("LENG") + ] + name = name[0].split()[1] if name else "Unspecified" + description = description[0].split()[1] if description else "Unspecified" + length = length[0].split()[1] if length else "Unspecified" + + if acc_code: + acc_name = acc_code[0].split()[1] + hmm_meta_lines.append(f"{acc_name}\t{name}\t{description}\t{length}\n") + with open(meta_outfile, "w+") as f: + f.writelines(hmm_meta_lines) + self._metadata_file = meta_outfile + self._meta = pd.read_csv(meta_outfile, sep="\t") + + +class Downloader: + """Tools to download and preprocess HMM databases""" + + def __init__(self, download_dir: Path): + """Initialize class Downloader + + Args: + output_dir (Path): path to output directory. + """ + self._download_dir = Path(download_dir) + if self._download_dir.exists(): + logger.warning( + f"{self._download_dir} already exists. Downloader may overwrite files." + ) + + def download_pgap(self, unpack: bool = False) -> None: + """Download PGAP database + + Args: + unpack (bool, optional): if True then PGAP database will be extracted + """ + + data_url = "https://ftp.ncbi.nlm.nih.gov/hmm/current/hmm_PGAP.HMM.tgz" + meta_url = "https://ftp.ncbi.nlm.nih.gov/hmm/current/hmm_PGAP.tsv" + logger.info("Downloading PGAP database") + try: + PGAP_file = self._download_dir / "hmm_PGAP.HMM.tgz" + meta_file = self._download_dir / "hmm_PGAP.tsv" + download_file(data_url, PGAP_file) + download_file(meta_url, meta_file) + except Exception: + logger.exception( + "Failed to download PGAP database. Please check your internet connection." + ) + sys.exit(1) + if unpack: + self.extract_pgap_to_directory(PGAP_file) + logger.info("Database downloaded successfully\n") + + def download_pfam(self, unpack: bool = False) -> None: + """Download PFAM database + + Args: + unpack (bool, optional): if True then PFAM database will be extracted + """ + pfam_file = self.download_dir / "Pfam-A.gz" + # hmm_outdir = self._output_dir.parent / "pfam_hmms" + # meta_outfile = hmm_outdir / f"{pfam_file.stem}_meta.tsv" + logger.info("Downloading PFAM-A hmm database") + try: + url = ( + "https://ftp.ebi.ac.uk/pub/databases/Pfam/current_release/Pfam-A.hmm.gz" + ) + download_file(url, pfam_file) + except Exception: + logger.exception( + "Failed to download PFAM-A database. Please check your internet connection." + ) + sys.exit(1) + if unpack: + self.extract_pfam_to_directory(pfam_file) + logger.info("Database downloaded successfully") + + def extract_pgap_to_directory(self, pgap_tar: Path) -> None: + """Extract PGAP hmm database (tar.gz) to downlaod directory + + Args: + pgap_tar (Path): path to compressed PGAP database. + """ + pgap_tar = Path(pgap_tar) + if not is_tar_file(pgap_tar): + logger.warning(f"{pgap_tar} is not a tar file. Skipping extraction") + return + logger.info("Extracting hmm files to target directory") + extract_tar_file(pgap_tar, self._download_dir) + flatten_directory(self._download_dir) + logger.info("PGAP database unpacked successfully") + + def extract_pfam_to_directory(self, pfam_gz: Path) -> None: + """Extract PFAM hmm database (gz) to downlaod directory + + Args: + pfam_gz (Path): path to compressed PFAM database. + """ + pfam_gz = Path(pfam_gz) + if not pfam_gz.is_file(): + logger.warning(f"{pfam_gz} is not a file. Skipping extraction") + return + logger.info("Extracting hmm files to target directory") + extract_gz_file(pfam_gz, self._download_dir) + flatten_directory(self.download_dir) + logger.info("PGAP database unpacked successfully") diff --git a/src/pynteny/subcommands.py b/src/pynteny/subcommands.py index 38dc576..7d9035d 100644 --- a/src/pynteny/subcommands.py +++ b/src/pynteny/subcommands.py @@ -17,7 +17,7 @@ import pynteny.parsers.syntenyparser as syntenyparser from pynteny.filter import SyntenyHits, filter_FASTA_by_synteny_structure -from pynteny.hmm import PGAP +from pynteny.hmm import PGAP, PFAM, Downloader from pynteny.preprocessing import Database from pynteny.utils import ( CommandArgs, @@ -230,10 +230,10 @@ def download_hmms(args: Union[CommandArgs, ArgumentParser]) -> None: logger = init_logger(args) config = ConfigParser.get_default_config() if (config.get_field("data_downloaded")) and (not args.force): - logger.info("PGAP database already downloaded. Skipping download") + logger.info("HMM databases already downloaded. Skipping download") sys.exit(1) if args.outdir is None: - logger.error("Please, provide directory in which to download PGAP database.") + logger.error("Please, provide directory in which to download HMM databases.") sys.exit(1) else: download_dir = Path(args.outdir).absolute() @@ -241,34 +241,36 @@ def download_hmms(args: Union[CommandArgs, ArgumentParser]) -> None: download_dir.mkdir(parents=True, exist_ok=True) config.update_config("database_dir", download_dir.as_posix()) - config.update_config("upack_PGAP_database", args.unpack) - - data_url = "https://ftp.ncbi.nlm.nih.gov/hmm/current/hmm_PGAP.HMM.tgz" - meta_url = "https://ftp.ncbi.nlm.nih.gov/hmm/current/hmm_PGAP.tsv" - logger.info("Downloading PGAP database") - try: - PGAP_file = download_dir / "hmm_PGAP.HMM.tgz" - meta_file = download_dir / "hmm_PGAP.tsv" - download_file(data_url, PGAP_file) - download_file(meta_url, meta_file) - logger.info("Database dowloaded successfully\n") - config.update_config("data_downloaded", True) - config.update_config("PGAP_database", PGAP_file.as_posix()) - config.update_config("PGAP_meta_file", meta_file.as_posix()) - except Exception: - logger.exception( - "Failed to download PGAP database. Please check your internet connection." - ) - sys.exit(1) - logger.info("Removing missing entries from PGAP metadata file") - PGAP(meta_file).remove_missing_HMMs_from_metadata(PGAP_file, meta_file) - if args.unpack: - logger.info("Unpacking PGAP database") - unpacked_PGAP_dir = download_dir / "hmm_PGAP" - PGAP.extract_PGAP_to_directory(PGAP_file, output_dir=unpacked_PGAP_dir) - os.remove(PGAP_file) - config.update_config("PGAP_database", unpacked_PGAP_dir.as_posix()) - logger.info("PGAP database unpacked successfully") + config.update_config("unpack_PGAP_database", args.unpack) + + downloader = Downloader(download_dir) + + # data_url = "https://ftp.ncbi.nlm.nih.gov/hmm/current/hmm_PGAP.HMM.tgz" + # meta_url = "https://ftp.ncbi.nlm.nih.gov/hmm/current/hmm_PGAP.tsv" + # logger.info("Downloading PGAP database") + # try: + # PGAP_file = download_dir / "hmm_PGAP.HMM.tgz" + # meta_file = download_dir / "hmm_PGAP.tsv" + # download_file(data_url, PGAP_file) + # download_file(meta_url, meta_file) + # logger.info("Database dowloaded successfully\n") + # config.update_config("data_downloaded", True) + # config.update_config("PGAP_database", PGAP_file.as_posix()) + # config.update_config("PGAP_meta_file", meta_file.as_posix()) + # except Exception: + # logger.exception( + # "Failed to download PGAP database. Please check your internet connection." + # ) + # sys.exit(1) + # logger.info("Removing missing entries from PGAP metadata file") + # PGAP(meta_file).remove_missing_HMMs_from_metadata(PGAP_file, meta_file) + # if args.unpack: + # logger.info("Unpacking PGAP database") + # unpacked_PGAP_dir = download_dir / "hmm_PGAP" + # PGAP.extract_PGAP_to_directory(PGAP_file, output_dir=unpacked_PGAP_dir) + # os.remove(PGAP_file) + # config.update_config("PGAP_database", unpacked_PGAP_dir.as_posix()) + # logger.info("PGAP database unpacked successfully") logging.shutdown() diff --git a/src/pynteny/utils.py b/src/pynteny/utils.py index ccc15d0..7d4aaf8 100644 --- a/src/pynteny/utils.py +++ b/src/pynteny/utils.py @@ -252,6 +252,26 @@ def extract_tar_file(tar_file: Path, dest_dir: Path = None) -> None: sys.exit(1) +def extract_gz_file(gz_file: Path, dest_dir: Path = None) -> None: + """Extract gz files to dest_dir. + + Args: + gz_file (Path): path to gz file. + dest_dir (Path, optional): path to destination directory + to store the uncompressed file. Defaults to None. + """ + gz_file = Path(gz_file) + if dest_dir is None: + dest_dir = "." + if gz_file.as_posix().endswith("gz"): + gz = tarfile.open(gz_file, "r:gz") + gz.extractall(path=dest_dir) + gz.close() + else: + logger.error("Input is not a gz file") + sys.exit(1) + + def list_tar_dir(tar_dir: Path) -> list: """List files within tar or tar.gz directory. @@ -305,3 +325,24 @@ def is_right_list_nested_type(list_object: list, inner_type: type) -> bool: bool: whether list contains elements of the same specified type. """ return all(isinstance(x, inner_type) for x in list_object) + + +def split_hmms(hmm_file: Path, output_dir: Path) -> None: + """Split PFAM-A hmm database into individual HMM files + and extract metadata file. + + Args: + output_dir (Path): path to output directory. + """ + output_dir = Path(output_dir) + if not output_dir.exists(): + output_dir.mkdir(parents=True) + with open(hmm_file, "r") as f: + hmm_text = f.read() + hmm_texts = hmm_text.split("//") + for text in hmm_texts: + acc_code = [entry for entry in text.split("\n") if entry.startswith("ACC")] + if acc_code: + acc_name = acc_code[0].split()[1] + with open(output_dir / f"{acc_name}.hmm", "w+") as f: + f.write(text)