diff --git a/src/pynteny/cli.py b/src/pynteny/cli.py index 37a3612..75648d1 100644 --- a/src/pynteny/cli.py +++ b/src/pynteny/cli.py @@ -517,6 +517,22 @@ def download() -> argparse.ArgumentParser: action="store_true", help="force-download database again if already downloaded", ) + optional.add_argument( + "-pgap", + "--pgap", + dest="pgap", + default=False, + action="store_true", + help="download PGAP database (default)", + ) + optional.add_argument( + "-pfam", + "--pfam", + dest="pfam", + default=False, + action="store_true", + help="download PFAM database", + ) optional.add_argument( "-l", "--log", diff --git a/src/pynteny/config.json b/src/pynteny/config.json index 2b2f043..90345dd 100644 --- a/src/pynteny/config.json +++ b/src/pynteny/config.json @@ -1,9 +1,11 @@ { "database_dir": "", "upack_PGAP_database": false, - "data_downloaded": false, + "upack_PFAM_database": false, + "PGAP_data_downloaded": false, + "PFAM_data_downloaded": false, "PGAP_database": "", "PGAP_meta_file": "", - "streamlit_process": "", - "streamlit_log": "" + "PFAM_database": "", + "PFAM_meta_file": "" } \ No newline at end of file diff --git a/src/pynteny/filter.py b/src/pynteny/filter.py index 01c4fb3..9b36697 100644 --- a/src/pynteny/filter.py +++ b/src/pynteny/filter.py @@ -16,7 +16,7 @@ import pynteny.parsers.labelparser as labelparser import pynteny.parsers.syntenyparser as syntenyparser -from pynteny.hmm import HMMER, PGAP +from pynteny.hmm import HMMER, PFAM, PGAP, HMMDatabase from pynteny.preprocessing import FASTA logger = logging.getLogger(__name__) @@ -416,14 +416,14 @@ def add_HMM_meta_info_to_hits(self, hmm_meta: Path) -> SyntenyHits: fields = ["gene_symbol", "label", "product", "ec_number"] if all([f in self._synteny_hits.columns for f in fields]): return self._synteny_hits - pgap = PGAP(hmm_meta) + hmm_db = HMMDatabase(hmm_meta) self._synteny_hits[fields] = "" for row in self._synteny_hits.itertuples(): meta_values = [ [ str(v).replace("nan", "") - for k, v in pgap.get_meta_info_for_HMM(hmm).items() - if k != "#ncbi_accession" + for k, v in hmm_db.get_meta_info_for_HMM(hmm).items() + if k != "accession" ] for hmm in row.hmm.split("|") ] diff --git a/src/pynteny/hmm.py b/src/pynteny/hmm.py index 7396b83..00cefe5 100644 --- a/src/pynteny/hmm.py +++ b/src/pynteny/hmm.py @@ -9,8 +9,8 @@ import logging import os -import shutil import sys +import tempfile from collections import defaultdict from pathlib import Path @@ -18,7 +18,14 @@ from Bio import SearchIO import pynteny.wrappers as wrappers -from pynteny.utils import extract_tar_file, flatten_directory, is_tar_file, list_tar_dir +from pynteny.utils import ( + download_file, + extract_gz_file, + extract_to_directory, + is_tar_file, + list_tar_dir, + split_hmms, +) logger = logging.getLogger(__name__) @@ -120,78 +127,45 @@ def get_HMMER_tables( return hmm_hits -class PGAP: - """Tools to parse PGAP hmm database metadata""" +class HMMDatabase: + """Base class for HMM databases""" - def __init__(self, meta_file: Path): - """Initialize class PGAP + def __init__( + self, + metadata_file: Path, + metadata_columns: list[str] = None, + ) -> None: + """Initialize class HMMDatabase Args: - meta_file (Path): path to PGAP's metadata file. + metadata_file (Path): path to metadata file. + metadata_columns (list[str], optional): list of metadata columns to be + used. Defaults to None (all columns). """ - meta_file = Path(meta_file) - meta = pd.read_csv( - str(meta_file), - sep="\t", - usecols=[ - "#ncbi_accession", - "gene_symbol", - "label", - "product_name", - "ec_numbers", - ], + self._metadata_file = Path(metadata_file) + self._meta = pd.read_csv( + self._metadata_file, sep="\t", usecols=metadata_columns ) - # meta = meta[ - # ["#ncbi_accession", "gene_symbol", "label", "product_name", "ec_numbers"] - # ] - self._meta = meta - self._meta_file = meta_file + if "#ncbi_accession" in self._meta.columns: + self._meta = self._meta.rename(columns={"#ncbi_accession": "accession"}) - @staticmethod - def extract_PGAP_to_directory(pgap_tar: Path, output_dir: Path) -> None: - """Extract PGAP hmm database (tar.gz) to given directory + @property + def meta(self) -> pd.DataFrame: + """Return PFAM metadata - Args: - pgap_tar (Path): path to compressed PGAP database. - output_dir (Path): path to output directory. + Returns: + pd.DataFrame: PFAM metadata """ - pgap_tar = Path(pgap_tar) - if not is_tar_file(pgap_tar): - logger.warning(f"{pgap_tar} is not a tar file. Skipping extraction") - sys.exit(1) - logger.info("Extracting hmm files to target directory") - output_dir = Path(output_dir) - if output_dir.exists(): - shutil.rmtree(output_dir) - extract_tar_file(tar_file=pgap_tar, dest_dir=output_dir) - flatten_directory(output_dir) + return self._meta - def remove_missing_HMMs_from_metadata( - self, hmm_dir: Path, output_file: Path = None - ) -> None: - """Remove HMMs from metadata that are not in HMM directory + @property + def meta_file(self) -> Path: + """Return path to metadata file - Args: - hmm_dir (Path): path to directory containing PGAP database. - output_file (Path, optional): path to output file. Defaults to None. + Returns: + Path: metadata file """ - hmm_dir = Path(hmm_dir) - if output_file is None: - output_file = self._meta.parent / f"{self._meta.stem}_missing_hmms.tsv" - else: - output_file = Path(output_file) - if is_tar_file(hmm_dir): - hmm_file_names = [ - Path(hmm_file).stem.strip() for hmm_file in list_tar_dir(hmm_dir) - ] - else: - hmm_file_names = [hmm_file.stem.strip() for hmm_file in hmm_dir.iterdir()] - not_found = set() - for i, row in self._meta.iterrows(): - if row["#ncbi_accession"].strip() not in hmm_file_names: - not_found.add(i) - self._meta = self._meta.drop(not_found) - self._meta.to_csv(output_file, sep="\t", index=False) + return self._metadata_file def get_HMM_names_by_gene_symbol(self, gene_symbol: str) -> list[str]: """Try to retrieve HMM by its gene symbol, more @@ -204,14 +178,9 @@ def get_HMM_names_by_gene_symbol(self, gene_symbol: str) -> list[str]: list[str]: list of HMM names matching gene symbol. """ meta = self._meta # .dropna(subset=["gene_symbol", "label"], axis=0) - return meta[ - ( - (meta.gene_symbol == gene_symbol) - | - # (meta.label.str.contains(gene_id)) - (meta.label == gene_symbol) - ) - ]["#ncbi_accession"].values.tolist() + return meta[((meta.gene_symbol == gene_symbol) | (meta.label == gene_symbol))][ + "accession" + ].values.tolist() def get_HMM_group_for_gene_symbol(self, gene_symbol: str) -> str: """Get HMMs corresponding to gene symbol in PGAP metadata. @@ -241,8 +210,8 @@ def get_HMM_gene_ID(self, hmm_name: str) -> list[str]: Returns: list[str]: list of gene symbols matching given HMM. """ - meta = self._meta.dropna(subset=["#ncbi_accession"], axis=0) - return meta[meta["#ncbi_accession"] == hmm_name]["gene_symbol"].values.tolist() + meta = self._meta.dropna(subset=["accession"], axis=0) + return meta[meta["accession"] == hmm_name]["gene_symbol"].values.tolist() def get_meta_info_for_HMM(self, hmm_name: str) -> dict: """Get meta info for given hmm. @@ -253,13 +222,173 @@ def get_meta_info_for_HMM(self, hmm_name: str) -> dict: Returns: dict: metadata of provided HMM. """ - meta = self._meta.dropna(subset=["#ncbi_accession"], axis=0).applymap( + meta = self._meta.dropna(subset=["accession"], axis=0).applymap( lambda x: x if not pd.isna(x) else "" ) metadata = { k: list(v.values())[0] if list(v.values())[0] else "undef" - for k, v in meta[meta["#ncbi_accession"] == hmm_name].to_dict().items() + for k, v in meta[meta["accession"] == hmm_name].to_dict().items() } if not metadata: logger.warning(f"No metadata for HMM: {hmm_name}") return metadata + + +class PGAP(HMMDatabase): + """Tools to parse PGAP hmm database metadata""" + + def __init__(self, *args, **kwargs): + """Initialize class PGAP""" + super().__init__(*args, **kwargs) + + def remove_missing_HMMs_from_metadata( + self, hmm_database: Path, meta_outfile: Path = None + ) -> None: + """Remove HMMs from metadata that are not in HMM directory + + Args: + hmm_database (Path): path to HMM database. + hmm_dir (Path): path to directory containing PGAP database. + meta_outfile (Path, optional): path to output file. Defaults to None. + """ + logger.info("Removing missing HMMs from PGAP metadata") + hmm_dir = hmm_database + if meta_outfile is None: + meta_outfile = ( + self._metadata_file.parent + / f"{self._metadata_file.stem}_preprocessed.tsv" + ) + else: + meta_outfile = Path(meta_outfile) + if is_tar_file(hmm_dir): + hmm_file_names = [ + Path(hmm_file).stem.strip() for hmm_file in list_tar_dir(hmm_dir) + ] + else: + hmm_file_names = [hmm_file.stem.strip() for hmm_file in hmm_dir.iterdir()] + not_found = set() + for i, row in self._meta.iterrows(): + if row["accession"].strip() not in hmm_file_names: + not_found.add(i) + self._meta = self._meta.drop(not_found) + self._metadata_file = meta_outfile + self._meta.to_csv(meta_outfile, sep="\t", index=False) + + +class PFAM(HMMDatabase): + """Tools to preprocess the PFAM-A hmm database""" + + @classmethod + def from_gz_file( + cls, hmm_gz_file: Path, hmm_outdir: Path = None, meta_outfile: Path = None + ) -> PFAM: + """Initialize PFAM class from hmm file + + Args: + hmm_gz_file (Path): path to hmm gz file. + hmm_outdir (Path): path to output directory to store individual HMM files. + meta_outfile (Path, optional): path to metadata output file. Defaults to None. + """ + hmm_gz_file = Path(hmm_gz_file) + if hmm_outdir is not None: + hmm_outdir = Path(hmm_outdir) + else: + hmm_outdir = hmm_gz_file.parent / "pfam_hmms" + if meta_outfile is not None: + meta_outfile = Path(meta_outfile) + else: + meta_outfile = hmm_outdir / f"{hmm_gz_file.stem}_meta.tsv" + with tempfile.NamedTemporaryFile() as temp: + extract_gz_file(hmm_gz_file, temp.name) + split_hmms(temp.name, hmm_outdir) + cls.construct_meta_file(hmm_outdir, meta_outfile) + return cls(meta_outfile) + + def construct_meta_file( + self, hmm_database: Path, meta_outfile: Path = None + ) -> None: + """Construct metadata file from individual HMM files. + + Args: + hmm_database (Path): path to HMM database. + meta_outfile (Path): path to metadata file. + """ + logger.info("Constructing metadata file for PFAM-A database") + if meta_outfile is None: + meta_outfile = hmm_database / "PFAM_meta.tsv" + else: + meta_outfile = Path(meta_outfile) + hmm_meta_lines = ["accession\tgene_symbol\tdescription\tlength\n"] + for hmm_file in hmm_database.glob("*.hmm"): + with open(hmm_file, "r") as f: + hmm_text = f.read() + + acc_code = [ + entry for entry in hmm_text.split("\n") if entry.startswith("ACC") + ] + name = [entry for entry in hmm_text.split("\n") if entry.startswith("NAME")] + description = [ + entry for entry in hmm_text.split("\n") if entry.startswith("DESC") + ] + length = [ + entry for entry in hmm_text.split("\n") if entry.startswith("LENG") + ] + name = name[0].split()[1] if name else "Unspecified" + description = description[0].split()[1] if description else "Unspecified" + length = length[0].split()[1] if length else "Unspecified" + + if acc_code: + acc_name = acc_code[0].split()[1] + hmm_meta_lines.append(f"{acc_name}\t{name}\t{description}\t{length}\n") + with open(meta_outfile, "w+") as f: + f.writelines(hmm_meta_lines) + self._metadata_file = meta_outfile + self._meta = pd.read_csv(meta_outfile, sep="\t") + + +def download_pgap(download_dir: Path, unpack: bool = False) -> tuple[Path, Path]: + """Download PGAP database + + Args: + download_dir (Path): path to output directory. + unpack (bool, optional): if True then PGAP database will be extracted + """ + if download_dir.exists(): + logger.warning( + f"{download_dir} already exists. Downloader may overwrite files." + ) + + data_url = "https://ftp.ncbi.nlm.nih.gov/hmm/current/hmm_PGAP.HMM.tgz" + meta_url = "https://ftp.ncbi.nlm.nih.gov/hmm/current/hmm_PGAP.tsv" + PGAP_file = download_dir / "hmm_PGAP.HMM.tgz" + meta_file = download_dir / "hmm_PGAP.tsv" + download_file(data_url, PGAP_file) + download_file(meta_url, meta_file) + if unpack: + destination_path = download_dir / "pgap_hmms" + extract_to_directory(PGAP_file, destination_dir=destination_path) + return destination_path, meta_file + else: + return PGAP_file, meta_file + + +def download_pfam(download_dir: Path, unpack: bool = False) -> Path: + """Download PFAM database + + Args: + unpack (bool, optional): if True then PFAM database will be extracted + """ + if download_dir.exists(): + logger.warning( + f"{download_dir} already exists. Downloader may overwrite files." + ) + PFAM_file = download_dir / "Pfam-A.gz" + logger.info("Downloading PFAM-A hmm database") + url = "https://ftp.ebi.ac.uk/pub/databases/Pfam/current_release/Pfam-A.hmm.gz" + download_file(url, PFAM_file) + if unpack: + destination_path = download_dir / "pfam_hmms" + extract_to_directory(PFAM_file, destination_dir=destination_path) + return destination_path + else: + return PFAM_file diff --git a/src/pynteny/parsers/syntenyparser.py b/src/pynteny/parsers/syntenyparser.py index 668aec7..cb3da7b 100644 --- a/src/pynteny/parsers/syntenyparser.py +++ b/src/pynteny/parsers/syntenyparser.py @@ -181,7 +181,7 @@ def parse_genes_in_synteny_structure( tuple[str,dict]: parsed synteny structure where gene symbols are replaced by HMM names. """ - pgap = PGAP(Path(hmm_meta)) + pgap = PGAP(hmm_meta) gene_symbols = get_gene_symbols_in_structure(synteny_structure) strand_locs = get_strands_in_structure(synteny_structure, parsed_symbol=False) gene_dists = get_maximum_distances_in_structure(synteny_structure) diff --git a/src/pynteny/subcommands.py b/src/pynteny/subcommands.py index 38dc576..ee58461 100644 --- a/src/pynteny/subcommands.py +++ b/src/pynteny/subcommands.py @@ -17,12 +17,18 @@ import pynteny.parsers.syntenyparser as syntenyparser from pynteny.filter import SyntenyHits, filter_FASTA_by_synteny_structure -from pynteny.hmm import PGAP +from pynteny.hmm import ( + PFAM, + PGAP, + download_pfam, + download_pgap, +) from pynteny.preprocessing import Database from pynteny.utils import ( CommandArgs, ConfigParser, - download_file, + extract_to_directory, + is_gz_file, is_tar_file, ) @@ -74,7 +80,10 @@ def synteny_search(args: Union[CommandArgs, ArgumentParser]) -> SyntenyHits: ) sys.exit(1) if args.hmm_dir is None: - if not config.get_field("data_downloaded"): + if not ( + config.get_field("PGAP_data_downloaded") + or config.get_field("PFAM_data_downloaded") + ): logger.warning( "HMM database not found. Downloading PGAP database from NCBI" ) @@ -84,7 +93,10 @@ def synteny_search(args: Union[CommandArgs, ArgumentParser]) -> SyntenyHits: args.hmm_dir = Path(config.get_field("PGAP_database")) if args.gene_ids: if args.hmm_meta is None: - if not config.get_field("data_downloaded"): + if not ( + config.get_field("PGAP_data_downloaded") + or config.get_field("PFAM_data_downloaded") + ): logger.error( "Please download hmm database first or provide path to hmm metadata file." ) @@ -104,8 +116,8 @@ def synteny_search(args: Union[CommandArgs, ArgumentParser]) -> SyntenyHits: ) temp_hmm_dir = Path(args.hmm_dir.parent) / "temp_hmm_dir" - if is_tar_file(args.hmm_dir): - PGAP.extract_PGAP_to_directory(args.hmm_dir, temp_hmm_dir) + if is_tar_file(args.hmm_dir) or is_gz_file(args.hmm_dir): + extract_to_directory(args.hmm_dir, temp_hmm_dir) hmm_dir = temp_hmm_dir else: hmm_dir = args.hmm_dir @@ -201,7 +213,10 @@ def parse_gene_ids(args: Union[CommandArgs, ArgumentParser]) -> str: logger = init_logger(args) config = ConfigParser.get_default_config() if args.hmm_meta is None: - if not config.get_field("data_downloaded"): + if not ( + config.get_field("PGAP_data_downloaded") + or config.get_field("PFAM_data_downloaded") + ): logger.error( "Please download hmm database meta file or provide path to existing one first." ) @@ -229,11 +244,23 @@ def download_hmms(args: Union[CommandArgs, ArgumentParser]) -> None: """ logger = init_logger(args) config = ConfigParser.get_default_config() - if (config.get_field("data_downloaded")) and (not args.force): - logger.info("PGAP database already downloaded. Skipping download") + if (config.get_field("PGAP_data_downloaded")) and (args.pgap) and (not args.force): + logger.info("PGAP HMM database already downloaded. Skipping download") + elif ( + (config.get_field("PFAM_data_downloaded")) and (args.pfam) and (not args.force) + ): + logger.info("PFAM HMM database already downloaded. Skipping download") + elif ( + (config.get_field("PGAP_data_downloaded")) + and (args.pgap) + and (config.get_field("PFAM_data_downloaded")) + and (args.pfam) + and (not args.force) + ): + logger.info("HMM databases already downloaded. Skipping download") sys.exit(1) if args.outdir is None: - logger.error("Please, provide directory in which to download PGAP database.") + logger.error("Please, provide directory in which to download HMM databases.") sys.exit(1) else: download_dir = Path(args.outdir).absolute() @@ -241,34 +268,46 @@ def download_hmms(args: Union[CommandArgs, ArgumentParser]) -> None: download_dir.mkdir(parents=True, exist_ok=True) config.update_config("database_dir", download_dir.as_posix()) - config.update_config("upack_PGAP_database", args.unpack) - - data_url = "https://ftp.ncbi.nlm.nih.gov/hmm/current/hmm_PGAP.HMM.tgz" - meta_url = "https://ftp.ncbi.nlm.nih.gov/hmm/current/hmm_PGAP.tsv" - logger.info("Downloading PGAP database") - try: - PGAP_file = download_dir / "hmm_PGAP.HMM.tgz" - meta_file = download_dir / "hmm_PGAP.tsv" - download_file(data_url, PGAP_file) - download_file(meta_url, meta_file) - logger.info("Database dowloaded successfully\n") - config.update_config("data_downloaded", True) - config.update_config("PGAP_database", PGAP_file.as_posix()) - config.update_config("PGAP_meta_file", meta_file.as_posix()) - except Exception: - logger.exception( - "Failed to download PGAP database. Please check your internet connection." - ) - sys.exit(1) - logger.info("Removing missing entries from PGAP metadata file") - PGAP(meta_file).remove_missing_HMMs_from_metadata(PGAP_file, meta_file) - if args.unpack: - logger.info("Unpacking PGAP database") - unpacked_PGAP_dir = download_dir / "hmm_PGAP" - PGAP.extract_PGAP_to_directory(PGAP_file, output_dir=unpacked_PGAP_dir) - os.remove(PGAP_file) - config.update_config("PGAP_database", unpacked_PGAP_dir.as_posix()) - logger.info("PGAP database unpacked successfully") + + if args.pgap: + logger.info("Downloading PGAP database") + try: + PGAP_path, PGAP_meta_file = download_pgap(download_dir, unpack=args.unpack) + PGAP(PGAP_meta_file).remove_missing_HMMs_from_metadata( + PGAP_path, PGAP_meta_file + ) + config.update_config("unpack_PGAP_database", args.unpack) + logger.info("PGAP database downloaded successfully\n") + config.update_config("PGAP_data_downloaded", True) + config.update_config("PGAP_database", PGAP_path.as_posix()) + config.update_config("PGAP_meta_file", PGAP_meta_file.as_posix()) + except Exception: + logger.exception( + "Failed to download PGAP database. Please check your internet connection." + ) + sys.exit(1) + + if args.pfam: + logger.info("Downloading PFAM-A database") + try: + PFAM_meta_file = download_dir / "hmm_PFAM.tsv" + PFAM_path = download_dir / "PFAM_hmms" + PFAM_gz_file = download_pfam(download_dir, unpack=True) + PFAM.from_gz_file( + PFAM_gz_file, + hmm_outdir=PFAM_path, + meta_outfile=PFAM_meta_file, + ) + config.update_config("unpack_PFAM_database", True) + logger.info("PFAM database downloaded successfully\n") + config.update_config("PFAM_data_downloaded", True) + config.update_config("PFAM_database", PFAM_path.as_posix()) + config.update_config("PFAM_meta_file", PFAM_meta_file.as_posix()) + except Exception: + logger.exception( + "Failed to download PFAM-A database. Please check your internet connection." + ) + sys.exit(1) logging.shutdown() diff --git a/src/pynteny/utils.py b/src/pynteny/utils.py index ccc15d0..cbd2f27 100644 --- a/src/pynteny/utils.py +++ b/src/pynteny/utils.py @@ -55,10 +55,13 @@ def initialize_config_file() -> Path: config = { "database_dir": "", "upack_PGAP_database": False, - "data_downloaded": False, + "upack_PFAM_database": False, + "PGAP_data_downloaded": False, + "PFAM_data_downloaded": False, "PGAP_database": "", "PGAP_meta_file": "", - "streamlit_process": "", + "PFAM_database": "", + "PFAM_meta_file": "", } with open(config_file, "w", encoding="UTF-8") as f: json.dump(config, f, indent=4) @@ -226,6 +229,19 @@ def is_tar_file(tar_file: Path) -> bool: return tar_file.is_file() and tarfile.is_tarfile(tar_file.as_posix()) +def is_gz_file(gz_file: Path) -> bool: + """Check whether file is gz-compressed. + + Args: + gz_file (Path): path to file. + + Returns: + bool: whether file is compressed or not. + """ + gz_file = Path(gz_file) + return gz_file.is_file() and gz_file.as_posix().endswith("gz") + + def extract_tar_file(tar_file: Path, dest_dir: Path = None) -> None: """Extract tar or tar.gz files to dest_dir. @@ -252,6 +268,47 @@ def extract_tar_file(tar_file: Path, dest_dir: Path = None) -> None: sys.exit(1) +def extract_gz_file(gz_file: Path, dest_dir: Path = None) -> None: + """Extract gz files to dest_dir. + + Args: + gz_file (Path): path to gz file. + dest_dir (Path, optional): path to destination directory + to store the uncompressed file. Defaults to None. + """ + gz_file = Path(gz_file) + if dest_dir is None: + dest_dir = "." + if gz_file.as_posix().endswith("gz"): + gz = tarfile.open(gz_file, "r:gz") + gz.extractall(path=dest_dir) + gz.close() + else: + logger.error("Input is not a gz file") + sys.exit(1) + + +def extract_to_directory(file: Path, destination_dir: Path) -> None: + """Extract hmm database (tar.gz) to downlaod directory + + Args: + hmm_file (Path): path to compressed database. + """ + file = Path(file) + if (not is_tar_file(file)) and (not is_gz_file(file)): + logger.warning(f"{file} is not a tar or gz file. Skipping extraction") + return + else: + logger.info("Extracting files to target directory") + if is_tar_file(file): + extract_tar_file(file, destination_dir) + elif is_gz_file(file): + extract_gz_file(file, destination_dir) + flatten_directory(destination_dir) + os.remove(file) + logger.info("Database unpacked successfully") + + def list_tar_dir(tar_dir: Path) -> list: """List files within tar or tar.gz directory. @@ -305,3 +362,24 @@ def is_right_list_nested_type(list_object: list, inner_type: type) -> bool: bool: whether list contains elements of the same specified type. """ return all(isinstance(x, inner_type) for x in list_object) + + +def split_hmms(hmm_file: Path, output_dir: Path) -> None: + """Split PFAM-A hmm database into individual HMM files + and extract metadata file. + + Args: + output_dir (Path): path to output directory. + """ + output_dir = Path(output_dir) + if not output_dir.exists(): + output_dir.mkdir(parents=True) + with open(hmm_file, "r") as f: + hmm_text = f.read() + hmm_texts = hmm_text.split("//") + for text in hmm_texts: + acc_code = [entry for entry in text.split("\n") if entry.startswith("ACC")] + if acc_code: + acc_name = acc_code[0].split()[1] + with open(output_dir / f"{acc_name}.hmm", "w+") as f: + f.write(text) diff --git a/tests/test_hmm.py b/tests/test_hmm.py index e24e1bb..a2766ac 100644 --- a/tests/test_hmm.py +++ b/tests/test_hmm.py @@ -18,7 +18,9 @@ class TestPGAP(unittest.TestCase): def test_remove_missing_HMMs_from_metadata(self): with tempfile.NamedTemporaryFile() as tmp: - pgap = PGAP(this_file_dir / "test_data/hmm_meta.tsv") + pgap = PGAP( + this_file_dir / "test_data/hmm_meta.tsv", + ) pgap.remove_missing_HMMs_from_metadata( this_file_dir / "test_data/hmms", tmp.name )