-
Notifications
You must be signed in to change notification settings - Fork 3
/
anacoreUtilsMergeVCFCallersMobiDL.py
executable file
·309 lines (289 loc) · 14.6 KB
/
anacoreUtilsMergeVCFCallersMobiDL.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
#!/usr/bin/env python3
__author__ = 'Frederic Escudie'
__edit__ = 'David Baux'
__edit__ = 'Charles Van Goethem'
__copyright__ = 'Copyright (C) 2019 IUCT-O'
__license__ = 'GNU General Public License'
__version__ = '1.1.3a'
__email__ = 'escudie.frederic@iuct-oncopole.fr'
__status__ = 'prod'
import os
import sys
import numpy
import logging
import argparse
from anacore.vcf import VCFIO, HeaderInfoAttr, HeaderFormatAttr
########################################################################
#
# FUNCTIONS
#
########################################################################
def getNewHeaderAttr(args):
"""
Return renamed and new VCFHeader elements for the merged VCF.
:param args: The script's parameters.
:type args: NameSpace
:return: VCFHeader elements (filter, info, format, samples).
:rtype: dict
"""
final_filter = {}
final_info = {
"SRC": HeaderInfoAttr(
"SRC", type="String", number=".", description="Variant callers where the variant is identified. Possible values: {}".format(
{name: "s" + str(idx) for idx, name in enumerate(args.calling_sources)}
)
)
}
final_format = {
"GT": HeaderFormatAttr("GT", type="String", number="1", description="Genotype"),
"AD": HeaderFormatAttr("AD", type="Integer", number="A", description="Allele Depth"),
"DP": HeaderFormatAttr("DP", type="Integer", number="1", description="Total Depth"),
"GTSRC": HeaderFormatAttr("GTSRC", type="String", number=".", description="Genotype by source"),
"ADSRC": HeaderFormatAttr("ADSRC", type="Integer", number=".", description="Allele Depth by source"),
"DPSRC": HeaderFormatAttr("DPSRC", type="Integer", number=".", description="Total Depth by source")
}
final_samples = None
for idx_in, curr_in in enumerate(args.inputs_variants):
with VCFIO(curr_in) as FH_vcf:
# Samples
if final_samples is None:
final_samples = FH_vcf.samples
elif FH_vcf.samples != final_samples:
raise Exception(
"The samples in VCF are not the same: {} in {} and {} in {}.".format(
final_samples,
args.inputs_variants[0],
FH_vcf.samples,
curr_in
)
)
# FILTER
for tag, data in FH_vcf.filter.items():
new_tag = tag
if tag not in args.shared_filters: # Rename filters not based on caller
new_tag = "s{}_{}".format(idx_in, tag)
data.id = new_tag
data.source = args.calling_sources[idx_in]
final_filter[new_tag] = data
# INFO
for tag, data in FH_vcf.info.items():
if tag == args.annotations_field:
if tag not in final_info or len(final_info[tag].description) < len(data.description): # Manage merge between callers with 0 variants (and 0 annotations) and callers with variants
final_info[tag] = data
else:
new_tag = "s{}_{}".format(idx_in, tag)
data.id = new_tag
data.source = args.calling_sources[idx_in]
final_info[new_tag] = data
qual_tag = "s{}_VCQUAL".format(idx_in)
final_info[qual_tag] = HeaderInfoAttr(qual_tag, type="Float", number="1", description="The variant quality", source=args.calling_sources[idx_in])
# FORMAT
for tag, data in FH_vcf.format.items():
new_tag = "s{}_{}".format(idx_in, tag)
data.id = new_tag
data.source = args.calling_sources[idx_in]
final_format[new_tag] = data
return {
"filter": final_filter,
"info": final_info,
"format": final_format,
"samples": final_samples
}
def getMergedRecords(inputs_variants, calling_sources, annotations_field, shared_filters):
"""
Merge VCFRecords coming from several variant callers.
:param inputs_variants: Paths to the variants files.
:type inputs_variants: list
:param calling_sources: Names of the variants callers (in same order as inputs_variants).
:type calling_sources: list
:param annotations_field: Field used to store annotations.
:type annotations_field: str
:param shared_filters: Filters tags applying to the variant and independent of caller like filters on annotations. These filters are not renamed to add caller ID as suffix.
:type shared_filters: set
:return: Merged VCF records.
:rtype: list
"""
variant_by_name = {}
for idx_in, curr_in in enumerate(inputs_variants):
curr_caller = calling_sources[idx_in]
with VCFIO(curr_in) as FH_in:
log.info("Process {}".format(curr_caller))
for record in FH_in:
variant_name = record.getName()
# Extract AD and DP
support_by_spl = {}
for spl in FH_in.samples:
GT = None
if "GT" in record.samples[spl]: # The GT is already processed for the sample
GT = record.samples[spl]["GT"]
elif len(record.samples) == 1 and spl in record.samples and "GT" in record.info: # Only one sample and GT is already processed for population
GT = record.info["GT"]
print(GT)
support_by_spl[spl] = {
"AD": record.getAltAD(spl)[0],
"DP": record.getDP(spl),
"GT": GT
}
# Rename filters
if record.filter is not None:
new_filter = []
for tag in record.filter:
if tag != "PASS":
if tag in shared_filters: # Rename filters not based on caller
new_filter.append(tag)
else:
new_filter.append("s{}_{}".format(idx_in, tag))
record.filter = new_filter
# Rename INFO
new_info = {}
for key, val in record.info.items():
if key == annotations_field:
new_info[key] = val
else:
new_info["s{}_{}".format(idx_in, key)] = val
record.info = new_info
# Backup quality
if record.qual is not None:
record.info["s{}_VCQUAL".format(idx_in)] = record.qual
# Rename FORMAT
record.format = ["s{}_{}".format(idx_in, curr_filter) for curr_filter in record.format]
for spl_name, spl_info in record.samples.items():
renamed_info = {}
for key, val in spl_info.items():
renamed_info["s{}_{}".format(idx_in, key)] = val
record.samples[spl_name] = renamed_info
# Add to storage
if variant_name not in variant_by_name:
variant_by_name[variant_name] = record
# Data source
record.info["SRC"] = [curr_caller]
# Quality
if idx_in != 0:
record.qual = None # For consistency, the quality of the variant comes only from the first caller of the variant
# AD and DP by sample (from the first caller finding the variant: callers are in user order)
record.format.insert(0, "ADSRC")
record.format.insert(0, "DPSRC")
record.format.insert(0, "GTSRC")
record.format.insert(0, "GT")
record.format.insert(0, "AD")
record.format.insert(0, "DP")
for spl_name, spl_data in record.samples.items():
spl_data["AD"] = [support_by_spl[spl_name]["AD"]]
spl_data["DP"] = support_by_spl[spl_name]["DP"]
spl_data["GT"] = support_by_spl[spl_name]["GT"]
spl_data["ADSRC"] = [support_by_spl[spl_name]["AD"]]
spl_data["DPSRC"] = [support_by_spl[spl_name]["DP"]]
spl_data["GTSRC"] = [support_by_spl[spl_name]["GT"]]
else:
prev_variant = variant_by_name[variant_name]
prev_variant.info["SRC"].append(curr_caller)
# IDs
if record.id is not None:
prev_ids = prev_variant.id.split(";")
prev_ids.extend(record.id.split(";"))
prev_ids = sorted(list(set(prev_ids)))
prev_variant.id = ";".join(prev_ids)
# FILTERS
if record.filter is not None:
if prev_variant.filter is None:
prev_variant.filter = record.filter
else:
prev_variant.filter = list(set(prev_variant.filter) or set(record.filter))
# FORMAT
prev_variant.format.extend(record.format)
# INFO
prev_variant.info.update(record.info)
for spl_name, spl_data in prev_variant.samples.items():
spl_data.update(record.samples[spl_name])
spl_data["ADSRC"].append(support_by_spl[spl_name]["AD"])
spl_data["DPSRC"].append(support_by_spl[spl_name]["DP"])
spl_data["GTSRC"].append(support_by_spl[spl_name]["GT"])
return variant_by_name.values()
def logACVariance(variants, log):
"""
Display in log the variance on allele counts (AD and AF) between callers.
:param variants: Merged VCF records.
:type variants: list
:param log: Logger object.
:type log: logging.Logger
"""
diff_AD = []
diff_AF = []
nb_var = 0
for record in variants:
if len(record.info["SRC"]) > 1:
nb_var += 1
for spl_name, spl_data in record.samples.items():
# AD
retained_AD = spl_data["AD"][0]
max_diff = 0
for curr_AD in spl_data["ADSRC"][1:]:
max_diff = max(max_diff, abs(retained_AD - curr_AD))
diff_AD.append(max_diff)
# AF
AF = [AD / DP for AD, DP in zip(spl_data["ADSRC"], spl_data["DPSRC"])]
retained_AF = AF[0]
max_diff = 0
for curr_AF in AF[1:]:
max_diff = max(max_diff, abs(retained_AF - curr_AF))
diff_AF.append(max_diff)
# Log
if nb_var == 0:
log.info("Differences between retained AF and others callers (without missing): 0 common variants")
log.info("Differences between retained AD and others callers (without missing): 0 common variants")
else:
log.info("Differences between retained AF and others callers (without missing): median={:.1%}, upper_quartile={:.1%}, 90_persentile={:.1%} and max={:.1%} on {} variants".format(
numpy.percentile(diff_AF, 50, interpolation='midpoint'),
numpy.percentile(diff_AF, 75, interpolation='midpoint'),
numpy.percentile(diff_AF, 90, interpolation='midpoint'),
max(diff_AF),
nb_var
))
log.info("Differences between retained AD and others callers (without missing): median={}, upper_quartile={}, 90_persentile={} and max={} on {} variants".format(
int(numpy.percentile(diff_AD, 50, interpolation='midpoint')),
int(numpy.percentile(diff_AD, 75, interpolation='midpoint')),
int(numpy.percentile(diff_AD, 90, interpolation='midpoint')),
max(diff_AD),
nb_var
))
########################################################################
#
# MAIN
#
########################################################################
if __name__ == "__main__":
# Manage parameters
parser = argparse.ArgumentParser(description='Merge VCF coming from different calling on same sample(s). It is strongly recommended to apply this script after standardization and before annotation and filtering/tagging.')
parser.add_argument('-a', '--annotations-field', default="ANN", help='Field used to store annotations. [Default: %(default)s]')
parser.add_argument('-s', '--shared-filters', nargs='*', default=["lowAF", "OOT", "homoP", "popAF", "CSQ", "ANN.COLLOC", "ANN.RNA", "ANN.CSQ", "ANN.popAF"], help='Filters tags applying to the variant and independent of caller like filters on annotations. These filters are not renamed to add caller ID as suffix. [Default: %(default)s]')
parser.add_argument('-c', '--calling-sources', required=True, nargs='+', help='Name of the source in same order of --inputs-variants.')
group_input = parser.add_argument_group('Inputs') # Inputs
group_input.add_argument('-i', '--inputs-variants', required=True, nargs='+', help='Path to the variants files coming from different callers (format: VCF). The order determine the which AF and AD are retained: the first caller where it is found in this list.')
group_output = parser.add_argument_group('Outputs') # Outputs
group_input.add_argument('-o', '--output-variants', required=True, help='Path to the merged variants file (format: VCF).')
args = parser.parse_args()
args.shared_filters = set(args.shared_filters)
# Logger
logging.basicConfig(format='%(asctime)s -- [%(filename)s][pid:%(process)d][%(levelname)s] -- %(message)s')
log = logging.getLogger(os.path.basename(__file__))
log.setLevel(logging.INFO)
log.info("Command: " + " ".join(sys.argv))
# Get merged records
variants = getMergedRecords(args.inputs_variants, args.calling_sources, args.annotations_field, args.shared_filters)
# Log differences in AF and AD
logACVariance(variants, log)
# Write
with VCFIO(args.output_variants, "w") as FH_out:
# Header
new_header = getNewHeaderAttr(args)
FH_out.samples = new_header["samples"]
FH_out.info = new_header["info"]
FH_out.format = new_header["format"]
FH_out.filter = new_header["filter"]
FH_out.writeHeader()
# Records
for record in sorted(variants, key=lambda record: (record.chrom, record.refStart(), record.refEnd())):
if record.filter is not None and len(record.filter) == 0:
record.filter = ["PASS"]
FH_out.write(record)
log.info("End of job")