-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathVcf2AxiomMyDesign.py
executable file
·150 lines (125 loc) · 4.71 KB
/
Vcf2AxiomMyDesign.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/env python
'''
Vcf2AxiomMyDesign: mining out SNPs from a vcf file and it's reference to a file ready for submission to the Axiom myDesign custom probe pipeline.
Usage:
Vcf2AxiomMyDesign.py -s species -v file.vcf -r ref.fa [--minindiv 20] [-o outfile.tsv]
Options:
-s STRING, --species=STRING Your species name
-v FILE, --vcf=FILE A vcf file
-r FILE, --fasta=FILE The matching reference fasta file
--minindiv=INT Minimum number of individuals with genotypes
for a position to be output [default: 1]
-o FILE, --out=FILE output file [default: myDesign.tsv]
'''
from docopt import docopt
import os
from Bio import SeqIO
class rec(object):
def __init__ (self):
'''
container for previous, current, next objects
'''
self.up=0
self.curr=0
self.down=0
class vcf(object):
'''
fVcf: POSITION SORTED vcf file name
vcf: a generator that walks over a vcf file
sample VCF line:
CHROM POS ID REF ALT QUAL FILTER INFO FORMAT sam1...samn
'''
def __init__(self, fVcf):
self.fVcf = fVcf
self.header_lines=[]
self.vcf_handle=open(fVcf)
self.rec = rec()
l=''
#initialize samples, strip off vcf header
#stop at first line with real data
while True:
l = self.vcf_handle.readline()
if l.startswith("#CHROM"):
a=l.lstrip('#').rstrip().split()
self.cols = a[:9]
self.samples = a[9:]
elif l.startswith("##"):
self.header_lines.append(l.rstrip())
else:
self.rec.up = 0
self.rec.curr = 0
self.rec.down = self.parse_vcf_line(l)
break
def __iter__(self):
return self
def next(self):
'''
'''
if self.rec.curr != None:
self.rec.up = self.rec.curr
self.rec.curr = self.rec.down
self.rec.down = self.parse_vcf_line(self.vcf_handle.readline())
return self.rec
else:
raise StopIteration()
def parse_vcf_line(self, line):
tmp = {}
tmp['called_snps']=0
vals = line.rstrip().split()
if len(vals) != len(self.cols)+len(self.samples):
return None
for i,v in zip(self.cols,vals[:9]):
tmp[i]=v
for v in vals[9:]:
if not v.startswith('./.'):
tmp['called_snps']+=1
return tmp
def check_snp(s, min_dist, min_sam):
#end
if s.curr is None:
print "End!"
return False
#minimum samples found
if s.curr['called_snps'] < min_sam:
print( "not enough SNPS called at " +s.curr['CHROM']+":"+s.curr['POS'] +" ("+str(s.curr['called_snps'])+")" )
return False
#only bi-allelic
if len(s.curr['ALT']) !=1:
print("too many alleles at "+ s.curr['CHROM']+":"+s.curr['POS'] + " ("+s.curr['ALT']+")")
return False
#check to make sure up and downstream snps are far enough away for probes to fit between
for other in [s.up, s.down]:
if other is None:
continue
if other == 0:
continue
if s.curr['CHROM'] == other['CHROM']:
if abs( int(s.curr['POS']) - int(other['POS'])) < min_dist:
print( "bad distance between " + s.curr['CHROM']+":"+s.curr['POS'] + " and " + other['CHROM']+":"+other['POS'] )
return False
return True
def generate_design(org, fVcf, fRef, min_sam, outfile):
probe_size = 35
min_dist = probe_size * 2 #35 bp probe for each position
my_vcf = vcf(fVcf)
ref = SeqIO.index(fRef, 'fasta')
my_design=open(outfile, 'w')
#write my design header
my_design.write('Organism\tSNPId\tREF_STR\tSEQ\tCHR\tPOS\tSNP_NEIGHBOR\tSNP_PRIORITY\tSNP_VAL\tCHR_TYPE\n')
ref_name = os.path.basename(fRef).rstrip('.fa')
for snp in my_vcf:
#if the snp is good
snp_good = check_snp(snp, min_dist, min_sam)
if snp_good:
#write to my_design file
p=int(snp.curr['POS'])-1 #(convert to 0-based position)
c=snp.curr['CHROM']
snpid = c + "_" + snp.curr['POS']
ra = "[" + snp.curr['REF'] + "/" + snp.curr['ALT'] + ']'
seq= str( ref[c][p-probe_size-1:p].seq + ra + ref[c][p+1:p+probe_size+1].seq )
my_design.write( '\t'.join([org, snpid, ref_name, seq, c, str(p), '0', '1', 'Il', 'autosomal']) + '\n' )
def main():
args = docopt(__doc__)
generate_design(args['--species'], args['--vcf'], args['--fasta'], int(args['--minindiv']), args['--out'])
if __name__ == '__main__':
main()