Skip to content

Commit

Permalink
updated dna_thread function - complete rewrite. Also included a funct…
Browse files Browse the repository at this point in the history
…ion to account for ClipKIT trimmed multiple sequence alignments
  • Loading branch information
JLSteenwyk committed Feb 27, 2024
1 parent 9351027 commit 8e5c14f
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 119 deletions.
7 changes: 4 additions & 3 deletions docs/usage/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,9 @@ Function names: monophyly_check; is_monophyletic |br|
Command line interface: pk_monophyly_check; pk_is_monophyletic

This analysis can be used to determine if a set of
taxa are monophyletic.
taxa are exclusively monophyletic. By exclusively monophyletic,
if other taxa are in the same clade, the lineage will not be
considered exclusively monophyletic.

Requires a taxa file, which species which tip names
are expected to be monophyletic. File format is a
Expand All @@ -793,8 +795,7 @@ col 2: average bipartition support value in the clade of interest
col 3: maximum bipartition support value in the clade of interest
col 4: minimum bipartition support value in the clade of interest
col 5: standard deviation of bipartition support values in the clade of interest
col 6: tip names of taxa monophyletic with the lineage of interest
excluding those that are listed in the taxa_of_interest file
col 6: tip names of taxa monophyletic with the lineage of interest excluding those that are listed in the taxa_of_interest file

.. code-block:: shell
Expand Down
90 changes: 59 additions & 31 deletions phykit/services/alignment/dna_threader.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,48 +42,76 @@ def run(self):
def read_file(self, file_path: str, file_format: str = "fasta") -> SeqRecord:
return SeqIO.parse(file_path, file_format)

def create_mask(self, length):
keep_mask = [True] * length
if self.clipkit_log_data:
mask = []
for row in self.clipkit_log_data:
if row[1] == "keep":
mask.append(True)
mask.append(True)
mask.append(True)
else:
mask.append(False)
mask.append(False)
mask.append(False)
keep_mask = mask

return keep_mask

def thread(self, protein: SeqRecord) -> dict:
# protein alignment to nucleotide alignment
pal2nal = {}

# ML
# ML-

# M--L-

# MMM------LLL---
# AAA------GGG
# keep, trim, keep

# AAA------GGG---
# AAATTTGGG

# -?*XxNn
# AAAGGG---

nucl_records = SeqIO.to_dict(SeqIO.parse(self.nucleotide_file_path, "fasta"))
try:
if self.clipkit_log_data:
# TODO: Integrating ClipKIT log handling
print(self.clipkit_log_data)
else:
for protein_seq_record in protein:
gene_id = protein_seq_record.id
p_seq = protein_seq_record.seq
n_seq = nucl_records[gene_id].seq

sequence = []
transformed = "".join([c * 3 for c in p_seq])

for idx, c in enumerate(transformed):
if c not in "-?*XxNn":
sequence.append(n_seq[idx])
else:
sequence.append("-")

if self.remove_stop_codon is True:
if transformed[-1] == "*":
sequence[-1] = n_seq[-1]
sequence[-2] = n_seq[-2]
sequence[-3] = n_seq[-3]
length = len(SeqIO.to_dict(SeqIO.parse(self.protein_file_path, "fasta")).get("1"))

pal2nal[gene_id] = "".join(sequence)
keep_mask = self.create_mask(length * 3)
try:
for protein_seq_record in protein:
gene_id = protein_seq_record.id
p_seq = protein_seq_record.seq
n_seq = nucl_records[gene_id].seq

if self.clipkit_log_data:
untrimmed = []
offset = 0
for idx, value in enumerate(keep_mask[::3]):
if value is True:
untrimmed.append(p_seq[idx - offset])
else:
offset += 1
untrimmed.append("#")
p_seq = "".join(untrimmed)

sequence = []
transformed = "".join([c * 3 for c in p_seq])

for idx, c in enumerate(transformed):
if not keep_mask[idx]:
continue
if c not in "-?*XxNn":
n_seq_c = n_seq[idx]
sequence.append(n_seq_c)
else:
sequence.append("-")

if self.remove_stop_codon is True:
if transformed[-1] == "*":
sequence[-1] = n_seq[-1]
sequence[-2] = n_seq[-2]
sequence[-3] = n_seq[-3]

pal2nal[gene_id] = "".join(sequence)

return pal2nal
except FileNotFoundError:
Expand Down
2 changes: 1 addition & 1 deletion phykit/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.13.1"
__version__ = "1.14.0"
168 changes: 84 additions & 84 deletions tests/integration/alignment/test_dna_threader_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,42 +491,42 @@ def test_dna_threader_nucl_file_not_found(self, mocked_print):
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 2

# @patch("builtins.print")
# def test_dna_threader_trimmed_alignment_short_log(self, mocked_print):
# expected_result_0 = dedent(
# """>1"""
# )
# expected_result_1 = dedent(
# """AAATTT"""
# )
# expected_result_2 = dedent(
# """>2"""
# )
# expected_result_3 = dedent(
# """AAAGGG"""
# )
# expected_result_4 = dedent(
# """>3"""
# )
# expected_result_5 = dedent(
# """AAA---"""
# )
# expected_result_6 = dedent(
# """>4"""
# )
# expected_result_7 = dedent(
# """AAAGGG"""
# )
# testargs = [
# "phykit",
# "p2n",
# "-p",
# f"{here.parent.parent.parent}/sample_files/test_alignment.prot.faa.clipkit",
# "-n",
# f"{here.parent.parent.parent}/sample_files/test.nucl.fna",
# "-c",
# f"{here.parent.parent.parent}/sample_files/test_alignment.prot.faa.clipkit.log",
# ]
@patch("builtins.print")
def test_dna_threader_trimmed_alignment_short_log(self, mocked_print):
expected_result_0 = dedent(
""">1"""
)
expected_result_1 = dedent(
"""AAATTT"""
)
expected_result_2 = dedent(
""">2"""
)
expected_result_3 = dedent(
"""AAAGGG"""
)
expected_result_4 = dedent(
""">3"""
)
expected_result_5 = dedent(
"""AAA---"""
)
expected_result_6 = dedent(
""">4"""
)
expected_result_7 = dedent(
"""AAAGGG"""
)
testargs = [
"phykit",
"p2n",
"-p",
f"{here.parent.parent.parent}/sample_files/test_alignment.prot.faa.clipkit",
"-n",
f"{here.parent.parent.parent}/sample_files/test.nucl.fna",
"-c",
f"{here.parent.parent.parent}/sample_files/test_alignment.prot.faa.clipkit.log",
]

# with patch.object(sys, "argv", testargs):
# Phykit()
Expand All @@ -541,55 +541,55 @@ def test_dna_threader_nucl_file_not_found(self, mocked_print):
# call(expected_result_7),
# ]

# @patch("builtins.print")
# def test_dna_threader_trimmed_alignment_long_log(self, mocked_print):
# expected_result_0 = dedent(
# """>1"""
# )
# expected_result_1 = dedent(
# """AAATTT"""
# )
# expected_result_2 = dedent(
# """>2"""
# )
# expected_result_3 = dedent(
# """AAAGGG"""
# )
# expected_result_4 = dedent(
# """>3"""
# )
# expected_result_5 = dedent(
# """AAA---"""
# )
# expected_result_6 = dedent(
# """>4"""
# )
# expected_result_7 = dedent(
# """AAAGGG"""
# )
# testargs = [
# "phykit",
# "p2n",
# "-p",
# f"{here.parent.parent.parent}/sample_files/test_alignment.prot.faa.clipkit",
# "-n",
# f"{here.parent.parent.parent}/sample_files/test.nucl.fna",
# "--clipkit_log_file",
# f"{here.parent.parent.parent}/sample_files/test_alignment.prot.faa.clipkit.log",
# ]
@patch("builtins.print")
def test_dna_threader_trimmed_alignment_long_log(self, mocked_print):
expected_result_0 = dedent(
""">1"""
)
expected_result_1 = dedent(
"""AAATTT"""
)
expected_result_2 = dedent(
""">2"""
)
expected_result_3 = dedent(
"""AAAGGG"""
)
expected_result_4 = dedent(
""">3"""
)
expected_result_5 = dedent(
"""AAA---"""
)
expected_result_6 = dedent(
""">4"""
)
expected_result_7 = dedent(
"""AAAGGG"""
)
testargs = [
"phykit",
"p2n",
"-p",
f"{here.parent.parent.parent}/sample_files/test_alignment.prot.faa.clipkit",
"-n",
f"{here.parent.parent.parent}/sample_files/test.nucl.fna",
"--clipkit_log_file",
f"{here.parent.parent.parent}/sample_files/test_alignment.prot.faa.clipkit.log",
]

# with patch.object(sys, "argv", testargs):
# Phykit()
# assert mocked_print.mock_calls == [
# call(expected_result_0),
# call(expected_result_1),
# call(expected_result_2),
# call(expected_result_3),
# call(expected_result_4),
# call(expected_result_5),
# call(expected_result_6),
# call(expected_result_7),
# ]
with patch.object(sys, "argv", testargs):
Phykit()
assert mocked_print.mock_calls == [
call(expected_result_0),
call(expected_result_1),
call(expected_result_2),
call(expected_result_3),
call(expected_result_4),
call(expected_result_5),
call(expected_result_6),
call(expected_result_7),
]

# @patch("builtins.print")
# def test_dna_threader_trimmed_longer_alignment_log(self, mocked_print):
Expand Down

0 comments on commit 8e5c14f

Please sign in to comment.