-
Notifications
You must be signed in to change notification settings - Fork 6
/
util_primer.py
139 lines (112 loc) · 5.07 KB
/
util_primer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import re, string, sys, time, itertools, os, subprocess
from Bio import SeqRecord, SeqIO, Seq
from SmileTrain import util
import usearch_python.primer
def mismatches(seq, primer, w):
'''
Calculate mismatches between a sequence and primer with window size w.
Returns the starting index and number of mismatches for the best match.
Parameters
seq : string (or stringable)
target (longer) sequence
primer : string (or stringable)
query (shorter) sequence
w : int
window size (how many bases on the target to go in, at most)
returns : tuple
(primer start index, differences in primer and sequence)
'''
I = 0
D = len(seq)
for i in range(w):
d = usearch_python.primer.MatchPrefix(seq[i:], primer)
if d < D:
I = i
D = d
return (I, D)
class PrimerRemover():
def __init__(self, fastq, primer, max_primer_diffs, reverse_primer=None, skip=1, out=sys.stdout):
'''
Remove well-matched primers from sequences in a fastq file. If given a reverse
primer, search from the beginning for the forward primer and from the end for
the revese primer.
Parameters
fastq : filename or filehandle
input
primer : string
the primer sequence to be removed
max_primer_diffs : int
the maximum number of mismatches allowed before throwing out the read
reverse_primer : string
a primer to be trimmed from the end of the sequence
skip : integer >= 1
take only every n-th entry (so skip=1 means every entry)
'''
self.records = SeqIO.parse(fastq, 'fastq')
self.primer = primer
self.primer_length = len(self.primer)
self.max_primer_diffs = max_primer_diffs
if reverse_primer is not None:
#self.reverse_primer = Seq.Seq(reverse_primer).reverse_complement()
self.reverse_primer = Seq.Seq(reverse_primer)
self.reverse_primer_length = len(self.reverse_primer)
else:
self.reverse_primer = None
self.skip = skip
self.out = out
self.n_successes = 0
self.n_failures = 0
self.position = 0
def __iter__(self):
return self
def next(self):
'''iterator over successfully trimmed input fastq entries'''
while True:
record = self.records.next()
# take this entry if it's the n-th entry
if self.position % self.skip == 0:
# find the best primer position in the sequence
primer_start_index, n_primer_diffs = mismatches(record.seq, self.primer, 15)
quality = record.letter_annotations['phred_quality']
# if we find a good match, trim the sequence and the
# quality line and yield a single string
if n_primer_diffs <= self.max_primer_diffs:
primer_end_index = primer_start_index + self.primer_length
trim_seq = record.seq[primer_end_index:]
trim_quality = quality[primer_end_index:]
success = True
else:
success = False
if success and self.reverse_primer is not None:
rc_trim_seq = trim_seq.reverse_complement()
reverse_start_index, n_reverse_diffs = mismatches(rc_trim_seq, self.reverse_primer, 15)
if n_reverse_diffs <= self.max_primer_diffs:
reverse_end_index = reverse_start_index + self.reverse_primer_length
rc_trim_seq = rc_trim_seq[reverse_end_index:]
trim_seq = rc_trim_seq.reverse_complement()
trim_quality = trim_quality[:-reverse_end_index]
success = True
else:
success = False
if success:
new_record = SeqRecord.SeqRecord(seq=str(trim_seq), id=record.id, letter_annotations={'phred_quality': trim_quality}, description="")
self.n_successes += 1
return new_record
else:
self.n_failures += 1
self.position += 1
def print_entries(self):
'''print the successfully trimmed entries'''
timer = util.timer()
for record in self:
SeqIO.write(record, self.out, 'fastq')
self.elapsed_time = timer.next()
def check_entries(self):
'''check for successfully trimmed entries'''
timer = util.timer()
for entry in self: pass
self.elapsed_time = timer.next()
def diagnostic_message(self):
'''return a string about how the trimming went'''
success_rate = float(self.n_successes) / (self.n_successes + self.n_failures) * 100
return "Primer removal:\n%s successes and %s failures (%.0f%% success rate)\n%f seconds elapsed\n" %(self.n_successes, self.n_failures, success_rate, self.elapsed_time)