diff --git a/phykit/helpers/files.py b/phykit/helpers/files.py index e872b75..8861577 100644 --- a/phykit/helpers/files.py +++ b/phykit/helpers/files.py @@ -1,5 +1,6 @@ from enum import Enum import sys +from typing import Tuple from Bio import AlignIO from Bio.Align import MultipleSeqAlignment @@ -16,11 +17,16 @@ class FileFormat(Enum): stockholm = "stockholm" -def get_alignment_and_format(alignment_file_path: str): - # if file format is provided, read the file according to the user's file format +def get_alignment_and_format( + alignment_file_path: str +) -> Tuple[MultipleSeqAlignment, str, bool]: + # if file format is provided, read the file + # according to the user's file format for fileFormat in FileFormat: try: - alignment = AlignIO.read(open(alignment_file_path), fileFormat.value) + alignment = AlignIO.read( + open(alignment_file_path), fileFormat.value + ) return alignment, fileFormat.value, is_protein_alignment(alignment) # the following exceptions refer to skipping over errors # associated with reading the wrong input file diff --git a/phykit/services/alignment/base.py b/phykit/services/alignment/base.py index 5338bf2..15adc98 100644 --- a/phykit/services/alignment/base.py +++ b/phykit/services/alignment/base.py @@ -1,3 +1,4 @@ +from collections import Counter import sys from typing import List @@ -51,49 +52,34 @@ def get_alignment_and_format(self): print("Please double check pathing and filenames") sys.exit() - def calculate_rcv(self): - alignment, _ = self.get_alignment_and_format() + def calculate_rcv(self) -> float: + alignment, _, _ = self.get_alignment_and_format() aln_len = alignment.get_alignment_length() - # string to hold all sequences - concat_seq = "" - # initialize a counter for the number of sequences in the input fasta file - num_records = 0 + concat_seq = [] + num_records = len(alignment) - # for each record join concatSeq string and sequence as well as keeping track - # of the number of records - for record in alignment: - concat_seq += record.seq - num_records += 1 + concat_seq = "".join(str(record.seq) for record in alignment) + + total_counts = Counter(concat_seq) - # dictionary to hold the average occurence of each sequence letter - average_d = {} - # loop through the different sequences that appear in the fasta file - # population dictionary with how many times that sequence appears - for seq in set(concat_seq): - average_d[seq] = concat_seq.count(seq) / num_records + average_d = { + seq: total_counts[seq] / num_records for seq in total_counts + } - # intiailize list to hold the RCV values per ith taxa - # that will later be summed indiv_rcv_values = [] - # loop through records again and calculate RCV for - # each taxa and append to indivRCVvalues for record in alignment: - # temp holds a temporary value of the numerator before appending - # to numeratorRCVvalues and then is reassigned to 0 when it goes - # through the loop again - temp = 0 - # calculates the absolute value of the ith sequence letter minus the average - for seq_letter in set(concat_seq): - temp += abs( - record.seq.count(seq_letter) - average_d[seq_letter] - ) - indiv_rcv_values.append(temp / (num_records * aln_len)) + record_counts = Counter(record.seq) + temp_rcv = sum( + abs( + record_counts[seq_letter] - average_d[seq_letter] + ) for seq_letter in total_counts + ) + indiv_rcv_values.append(temp_rcv / (num_records * aln_len)) relative_composition_variability = sum(indiv_rcv_values) - # print the sum of all RCV values return relative_composition_variability def get_gap_chars(is_protein: bool) -> List[str]: diff --git a/phykit/services/alignment/rcvt.py b/phykit/services/alignment/rcvt.py index 87517eb..841d174 100644 --- a/phykit/services/alignment/rcvt.py +++ b/phykit/services/alignment/rcvt.py @@ -1,3 +1,5 @@ +from collections import Counter + from .base import Alignment @@ -6,50 +8,27 @@ def __init__(self, args) -> None: super().__init__(**self.process_args(args)) def run(self): - alignment, _ = self.get_alignment_and_format() + alignment, _, _ = self.get_alignment_and_format() aln_len = alignment.get_alignment_length() + num_records = len(alignment) - # string to hold all sequences - concat_seq = "" - # initialize a counter for the number of sequences in the input fasta file - num_records = 0 + concat_seq = "".join(str(record.seq) for record in alignment) + total_counts = Counter(concat_seq) + + average_d = { + char: total_counts[char] / num_records for char in total_counts + } - # for each record join concatSeq string and sequence as well as keeping track - # of the number of records - for record in alignment: - concat_seq += record.seq - num_records += 1 - - # dictionary to hold the average occurence of each sequence letter - average_d = {} - # loop through the different sequences that appear in the fasta file - # population dictionary with how many times that sequence appears - for seq in set(concat_seq): - average_d[seq] = concat_seq.count(seq) / num_records - - # intiailize list to hold the RCV values per ith taxa - # that will later be summed - indiv_rcv_values = [] - - # loop through records again and calculate RCV for - # each taxa and append to indivRCVvalues for record in alignment: - # temp holds a temporary value of the numerator before appending - # to numeratorRCVvalues and then is reassigned to 0 when it goes - # through the loop again - temp = 0 - # calculates the absolute value of the ith sequence letter minus the average - for seq_letter in set(concat_seq): - temp += abs( - record.seq.count(seq_letter) - average_d[seq_letter] + record_counts = Counter(record.seq) + temp_rcv = \ + sum( + abs( + record_counts[seq_letter] - average_d[seq_letter] + ) for seq_letter in total_counts ) - indiv_rcv_values.append(temp / (num_records * aln_len)) - print(f"{record.id}\t{round((temp / (num_records * aln_len)),4)}") - - # relative_composition_variability = sum(indiv_rcv_values) - - # print the sum of all RCV values - # print(round(relative_composition_variability, 4)) + rcv_value = temp_rcv / (num_records * aln_len) + print(f"{record.id}\t{round(rcv_value, 4)}") def process_args(self, args): return dict(alignment_file_path=args.alignment) diff --git a/tests/integration/alignment/test_rcv_integration.py b/tests/integration/alignment/test_rcv_integration.py index a0dd367..0e8e084 100644 --- a/tests/integration/alignment/test_rcv_integration.py +++ b/tests/integration/alignment/test_rcv_integration.py @@ -2,7 +2,6 @@ import sys from mock import patch, call from pathlib import Path -from textwrap import dedent from phykit.phykit import Phykit @@ -70,4 +69,4 @@ def test_rcv_incorrect_input_file(self, mocked_print): Phykit() assert pytest_wrapped_e.type == SystemExit - assert pytest_wrapped_e.value.code == 2 \ No newline at end of file + assert pytest_wrapped_e.value.code == 2