-
Notifications
You must be signed in to change notification settings - Fork 0
/
TR_in_multiple_Protein.py
205 lines (158 loc) · 7.39 KB
/
TR_in_multiple_Protein.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import sys
import os
import pickle
from Bio import SeqIO
from pyfaidx import Fasta
import logging
import logging.config
from tral.paths import config_file, PACKAGE_DIRECTORY
from tral import configuration
from tral.sequence import sequence
from tral.hmm import hmm
def return_duplicate_proteins(sequences_file):
    """
    Check for duplicated protein sequences in a fasta file.

    A protein is identified by the second '|'-separated field of its fasta
    record ID (UniProt style: 'db|ACCESSION|NAME'). Records whose ID contains
    no '|' fall back to the full record ID instead of raising IndexError.

    Args:
        sequences_file (str): whole path to file with >=1 protein fasta-sequence

    Returns:
        list: protein identifiers that occur more than once in the file
            (each duplicate listed once, in order of first re-occurrence)
    """
    seen = set()
    dupes = []
    for record in SeqIO.parse(sequences_file, "fasta"):
        fields = record.id.split("|")
        # UniProt headers look like 'sp|P12345|NAME'; use the accession.
        protein = fields[1] if len(fields) > 1 else record.id
        if protein in seen:
            # Record each duplicated identifier only on its first repeat.
            if protein not in dupes:
                dupes.append(protein)
        else:
            seen.add(protein)
    return dupes
def find_protein_repeats(sequences_file, result_dir, pvalue_threshold=0.05, divergence_threshold=0.1, n_threshold=2.5, l_threshold=3):
    """
    Find tandem repeats (TRs) in the protein sequences provided in 'sequences_file'.

    The TRs are filtered according to the thresholds, remastered with HMMs and
    clustered for overlap; the surviving TRs are saved in 'result_dir', one
    pickle and one tsv file per protein, named by the protein identifier.

    Args:
        sequences_file (str): whole path to file with >=1 protein fasta-sequence
        result_dir (str): path where the individual TRAL results are stored
        pvalue_threshold (float): p-value threshold for filtering
        divergence_threshold (float): divergence threshold for filtering
        n_threshold (float): minimum repeat unit count for filtering
        l_threshold (int): maximum repeat unit length for filtering

    Raises:
        Exception: if the result directory cannot be created.

    Output:
        For each protein sequence, a .pkl and a .tsv file with its TRs is
        produced and stored in 'result_dir'.
    """
    logging.config.fileConfig(config_file("logging.ini"))
    log = logging.getLogger('root')

    # Score model is taken from the TRAL configuration ("repeat_list" section).
    CONFIG_GENERAL = configuration.Configuration.instance().config
    CONFIG = CONFIG_GENERAL["repeat_list"]
    score = CONFIG["model"]

    # Create the output directory once, up front (it is loop-invariant).
    try:
        os.makedirs(result_dir, exist_ok=True)
    except OSError as err:
        raise Exception(
            "Could not create path to result directory: {}".format(
                os.path.dirname(result_dir))) from err

    ##########################################################################
    # From .fasta to Class Sequence sequences
    proteins = Fasta(sequences_file)
    all_denovo_repeats = 0
    all_filtered_repeats = 0

    for pyfaidx in proteins:
        # UniProt headers look like 'sp|P12345|NAME'; use the accession as
        # the protein identifier, falling back to the full name otherwise.
        name_fields = pyfaidx.name.split("|")
        seq_name = name_fields[1] if len(name_fields) > 1 else pyfaidx.name
        seq = sequence.Sequence(seq=str(pyfaidx), name=seq_name)
        log.debug("Work on sequence {}".format(seq_name))

        ##########################################################################
        # Getting TRs
        denovo_list = seq.detect(denovo=True)
        for TR in denovo_list.repeats:
            TR.calculate_pvalues()

        ##########################################################################
        # Filtering TRs
        # Count de novo repeats before any filtering.
        all_denovo_repeats += len(denovo_list.repeats)
        # filtering for pvalue
        denovo_list = denovo_list.filter(
            "pvalue",
            score,
            pvalue_threshold)
        # filtering for divergence
        denovo_list = denovo_list.filter(
            "divergence",
            score,
            divergence_threshold)
        # filtering for number of repeat units
        denovo_list = denovo_list.filter(
            "attribute",
            "n_effective",
            "min",
            n_threshold)
        # filtering for length of repeat units
        denovo_list = denovo_list.filter(
            "attribute",
            "l_effective",
            "max",
            l_threshold)

        ##########################################################################
        # Building HMM with hmmbuild
        # De novo TRs are remastered with HMMs (only possible with hmmbuild).
        denovo_hmm = [hmm.HMM.create(input_format='repeat', repeat=iTR)
                      for iTR in denovo_list.repeats]
        denovo_list_remastered = seq.detect(lHMM=denovo_hmm)

        ##########################################################################
        # Clustering
        # De novo TRs are clustered for overlap (common ancestry). Only the
        # best (lowest p-value, lowest divergence) repeat is retained.
        # BUGFIX: filter the remastered list, not the pre-remaster denovo_list.
        denovo_list_remastered = denovo_list_remastered.filter(
            "none_overlapping", ["common_ancestry"], {"pvalue": score, "divergence": score})

        ##########################################################################
        # Save Tandem Repeats
        output_pickle_file = os.path.join(result_dir, seq_name + ".pkl")
        output_tsv_file = os.path.join(result_dir, seq_name + ".tsv")
        denovo_list_remastered.write(output_format="pickle", file=output_pickle_file)
        denovo_list_remastered.write(output_format="tsv", file=output_tsv_file)
        # TODO save as fasta

        all_filtered_repeats += len(denovo_list_remastered.repeats)
        print("\n***", seq_name, "***")
        # NOTE: denovo_list has been threshold-filtered by this point, so this
        # is the count after filtering but before HMM remastering/clustering.
        print("denovo repeats:", len(denovo_list.repeats))
        print("repeats after filtering and clustering:",
              len(denovo_list_remastered.repeats))
        for repeat in denovo_list_remastered.repeats:
            print(repeat)

    print("\nThere where {} repeats found de novo.".format(all_denovo_repeats),
          "After filtering and clustering there where only {} repeats left.\n".format(
              all_filtered_repeats))
if __name__ == "__main__":
    ##########################################################################
    # Defining Paths and Parameters
    # AA reference. Each dataset is a (input fasta, output directory) pair.
    #working_dir = "/home/matteo/polybox/MSc_ACLS/master_thesis/data/proteom_reference/pickles"
    datasets = [
        # unfavorable proteins
        ("/home/matteo/polybox/MSc_ACLS/master_thesis/CRC_TRs/data/unfavorable_proteins_CRC_sp.fasta",
         "/home/matteo/polybox/MSc_ACLS/master_thesis/CRC_TRs/results/TRs_unfavorable_proteins_CRC_sp"),
        # favorable proteins
        ("/home/matteo/polybox/MSc_ACLS/master_thesis/CRC_TRs/data/favorable_proteins_CRC_sp.fasta",
         "/home/matteo/polybox/MSc_ACLS/master_thesis/CRC_TRs/results/TRs_favorable_proteins_CRC_sp"),
    ]
    # Thresholds for filtering (defaults of find_protein_repeats):
    #pvalue_threshold = 0.05
    #divergence_threshold = 0.1
    #n_threshold = 2.5 # minimun repeat unit count
    #l_threshold = 3 # maximum repeat unit length

    ##########################################################################
    # If duplicated proteins, print them:
    for fasta_path, _ in datasets:
        print(return_duplicate_proteins(sequences_file=fasta_path))

    ##########################################################################
    # Call TRAL
    for fasta_path, out_dir in datasets:
        find_protein_repeats(sequences_file=fasta_path, result_dir=out_dir)