-
Notifications
You must be signed in to change notification settings - Fork 6
/
index.py
executable file
·81 lines (60 loc) · 2.6 KB
/
index.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#!/usr/bin/env python
'''
Create an index file using the original and dereplicated fastas.
Load all the dereplicated sequences and sequence IDs into memory. Then read through the
original fasta. If a sequence is in the dereplicated fasta, then keep track of the
abundance of the sequence in that sample.
Produce an index file with lines
sample_name sequence_id (# of times that seq appears in that sample)
'''
import sys, argparse, re
from Bio import SeqIO
import util, util_index
def parse_derep_fasta(fasta):
'''create a hash {sequence => ID} from fasta filename or filehandle'''
return {str(record.seq): util_index.parse_seq_sid(record.id) for record in SeqIO.parse(fasta, 'fasta')}
def sid_to_sample(sid):
'''sample=donor1;400 -> donor1'''
m = re.match('sample=(.+);\d+', sid)
if m is None:
raise RuntimeError("fasta at line did not parse: %s" % sid)
else:
return m.group(1)
def parse_full_fasta(fasta, seq_sid):
'''
Count the abundance of each sequence in each sample. Ignore sequences that are not
in the known ID list.
fasta : filehandle or filename
input fasta
seq_sid : dictionary
{sequence => sequence ID}
returns : dict
{(sample, sequence ID) => abundance}
'''
abund = {}
for record in SeqIO.parse(fasta, 'fasta'):
sample = sid_to_sample(record.id)
seq = str(record.seq)
if seq in seq_sid:
seq_id = seq_sid[seq]
key = (sample, seq_id)
if key in abund:
abund[key] += 1
else:
abund[key] = 1
return abund
def index_lines(abundances):
'''{(sample, ID) => abundance, ...} -> tab-separated sample, ID, abundance'''
for (sample, sid), abundance in abundances.items():
yield "\t".join([sample, sid, str(abundance)])
if __name__ == '__main__':
# parse command line arguments
parser = argparse.ArgumentParser(description='Count the number of times every sequence appears in each sample', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('orig', help='original fasta file')
parser.add_argument('derep', help='dereplicated fasta file')
parser.add_argument('--output', '-o', default=sys.stdout, type=argparse.FileType('w'), help='output file (default stdout)')
args = parser.parse_args()
seq_sid = parse_derep_fasta(args.derep)
abundances = parse_full_fasta(args.orig, seq_sid)
for line in index_lines(abundances):
args.output.write(line + "\n")