Skip to content

Commit

Permalink
streamlined the saturation function
Browse files Browse the repository at this point in the history
  • Loading branch information
JLSteenwyk committed Sep 15, 2024
1 parent 095abdf commit 0c18d8b
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 55 deletions.
85 changes: 34 additions & 51 deletions phykit/services/tree/saturation.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
from enum import Enum
import itertools
import sys
from typing import Tuple
from typing import Dict, List, Tuple

from Bio import Align
from Bio.Phylo import Newick
import numpy as np
from sklearn.linear_model import LinearRegression

from .base import Tree
from ...helpers.files import get_alignment_and_format as get_alignment_and_format_helper
from ...helpers.files import (
get_alignment_and_format as get_alignment_and_format_helper
)


class FileFormat(Enum):
Expand All @@ -25,27 +29,22 @@ class Saturation(Tree):
def __init__(self, args) -> None:
super().__init__(**self.process_args(args))

def run(self):
# read in alignment
alignment, alignment_format = get_alignment_and_format_helper(
def run(self) -> None:
alignment, _, _ = get_alignment_and_format_helper(
self.alignment_file_path
)

# read in tree
tree = self.read_tree_file()

# get tip and tip combinations
tips = self.get_tip_names_from_tree(tree)
combos = list(itertools.combinations(tips, 2))

# for pairwise combinations, calculate patristic
# distances and pairwise identities
(
patristic_distances,
uncorrected_distances,
) = self.loop_through_combos_and_calculate_pds_and_pis(combos, alignment, tree)


) = self.loop_through_combos_and_calculate_pds_and_pis(
combos, alignment, tree
)

# calculate slope and fit the y-intercept to zero
# Fitting the y-intercept to zero follows Jeffroy et al.
Expand All @@ -55,81 +54,65 @@ def run(self):
np.array(patristic_distances).reshape(-1, 1),
np.array(uncorrected_distances)
)
# report res

self.print_res(
self.verbose, combos, uncorrected_distances, patristic_distances, model.coef_[0]
)

def process_args(self, args):
def process_args(self, args) -> Dict[str, str]:
return dict(
tree_file_path=args.tree,
alignment_file_path=args.alignment,
verbose=args.verbose,
)

def loop_through_combos_and_calculate_pds_and_pis(
self, combos: list, alignment, tree
) -> Tuple[list, list]:
self,
combos: List[Tuple[str, str]],
alignment: Align.MultipleSeqAlignment,
tree: Newick.Tree,
) -> Tuple[
List[float],
List[float]
]:
"""
loop through all taxon combinations and determine
their patristic distance and pairwise identity
"""
patristic_distances = []
uncorrected_distances = []
aln_len = alignment.get_alignment_length()
seq_dict = {record.name: record.seq for record in alignment}
for combo in combos:
# calculate pd
# calculate pds
patristic_distances.append(tree.distance(combo[0], combo[1]))
# calculate pairwise identity
uncorrected_distances = self.calculate_uncorrected_distances(
alignment, uncorrected_distances, aln_len, combo
)

return patristic_distances, uncorrected_distances
# calcualte uncorrected distances
seq_one = seq_dict[combo[0]]
seq_two = seq_dict[combo[1]]
identities = sum(1 for idx in range(aln_len) if seq_one[idx] == seq_two[idx])
uncorrected_distances.append(1 - (identities / aln_len))

def calculate_uncorrected_distances(
self, alignment, uncorrected_distances: list, aln_len: int, combo: tuple
) -> list:
"""
calculate uncorrected distances for a given combo
"""
identities = 0
seq_one = ""
seq_two = ""
for record in alignment:
if record.name == combo[0]:
seq_one = record.seq
elif record.name == combo[1]:
seq_two = record.seq
for idx in range(0, aln_len):
try:
if seq_one[idx] == seq_two[idx]:
identities += 1
except IndexError:
print("Error: the alignment FASTA headers and tree tip labels are different.\nDouble check that the sequence alignment and phylogenetic tree have the same labels.")
sys.exit()
uncorrected_distances.append(1-(identities / aln_len))

return uncorrected_distances
return patristic_distances, uncorrected_distances

def print_res(
self,
verbose: bool,
combos: list,
uncorrected_distances: list,
patristic_distances: list,
combos: List[str],
uncorrected_distances: List[float],
patristic_distances: List[float],
slope: float,
) -> None:
"""
print results to stdout
"""
try:
if verbose:
for combo, dist, patristic_distance in zip(
for cbo, dist, pd in zip(
combos, uncorrected_distances, patristic_distances
):
print(
f"{combo[0]}\t{combo[1]}\t{round(dist,4)}\t{round(patristic_distance, 4)}"
f"{cbo[0]}\t{cbo[1]}\t{round(dist,4)}\t{round(pd, 4)}"
)
else:
print(f"{round(slope, 4)}\t{abs(round(1-slope, 4))}")
Expand Down
6 changes: 2 additions & 4 deletions tests/integration/tree/test_saturation_integration.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import pytest
import sys
from math import isclose
from mock import patch, call
from pathlib import Path
from textwrap import dedent
import pytest
import sys

from phykit.phykit import Phykit

Expand Down

0 comments on commit 0c18d8b

Please sign in to comment.