-
Notifications
You must be signed in to change notification settings - Fork 3
/
verify_markers.py
213 lines (179 loc) · 6.43 KB
/
verify_markers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
from __future__ import print_function, division
import sys
import argparse
import logging
import os
from os.path import join, exists
# noinspection PyPackageRequirements
from Bio.Seq import Seq
import shutil
import pandas as pd
from src.polyoligo import lib_blast, lib_markers, _logger_config, lib_utils
BINARIES = {
"macosx": join(os.path.dirname(__file__), "src/polyoligo", "bin/macosx_x64"),
"linux": join(os.path.dirname(__file__), "src/polyoligo", "bin/linux_x64"),
"win": join(os.path.dirname(__file__), "src/polyoligo", "bin/win_x64"),
}
NUC = ["A", "T", "G", "C"]
class Marker(lib_markers.Marker):
def __init__(self, **kwargs):
super().__init__(**kwargs)
def assert_marker(self, blast_hook):
if self.ref != "":
query = [{
"chr": self.chrom,
"start": int(self.pos1),
"stop": int(self.pos1 + self.n - 1),
}]
seq = "N"
for _, seq in blast_hook.fetch(query).items():
seq = seq.upper()
break
if self.ref == seq:
pass
else:
# Nucleotides do not match so look for a solution
n1 = self.ref
n2 = self.alt
n1c = str(Seq(n1).complement())
n2c = str(Seq(n2).complement())
if self.alt == seq: # Check for an inversion
self.ref = n2
self.alt = n1
elif n1c == seq: # Check for complementarity
self.ref = n1c
self.alt = n2c
elif n2c == seq: # Check for reverse complementarity
self.ref = n2c
self.alt = n1c
else:
print("{} {}/{} -> ERROR skipped as reference is {}".format(
self.name,
n1,
n2,
seq,
))
return None
print("{} {}/{} -> {}/{}".format(
self.name,
n1,
n2,
self.ref,
self.alt,
))
def read_markers(fp):
"""Reads a text file of input markers and returns a list of Marker objects.
File should be tab delimited and no headers and contain the information columns:
CHR POS NAME REF ALT
"""
df = pd.read_csv(fp, delim_whitespace=True, header=None)
df.columns = ["chr", "pos", "name", "ref", "alt"]
markers = []
for _, row in df.iterrows():
markers.append(Marker(
chrom=row["chr"],
pos=row["pos"] - 1, # to 0-based indexing
ref_allele=row["ref"],
alt_allele=row["alt"],
name=row["name"],
))
return markers
def parse_args(inputargs):
# Define the args parser
parser = argparse.ArgumentParser(prog="check_input_markers",
description="",
epilog="",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
"-v", "--version",
action="version",
version="%(prog)s {}".format("0.1"),
)
parser.add_argument(
"markers",
metavar="MARKERS",
type=str,
help="File containing a list of input markers in tab-separated format and with no header as: "
"CHR POS NAME REF ALT",
)
parser.add_argument(
"output",
metavar="OUTPUT",
type=str,
help="Output marker filename.",
)
parser.add_argument(
"refgenome",
metavar="FASTA/BLASTDB",
type=str,
help="Either a FASTA file or a BLAST database to use as the reference genome.\n"
"FASTA: Both raw and compressed files are supported natively (see sample_data/blastdb.fa.gz).\n"
"BLASTDB: Extensions (e.g. .nsq/.nin/...) are not required, just enter the basename of the "
"database (see sample_data/blastdb).",
)
parser.add_argument(
"--silent",
action="store_true",
help="Silent mode, will not print to STDOUT.",
)
args = parser.parse_args(inputargs) # Parse input args
return args
def main(strcmd=None):
# Input arguments handling
if strcmd: # Means we are running the script using a string of arguments (e.g. for testing)
testcmd = lib_utils.absolute_paths(strcmd) # Make paths absolute
args = parse_args(testcmd.split()[1:])
else:
args = parse_args(sys.argv[1:])
# Prepare directories
out_path = os.path.dirname(os.path.abspath(args.output))
if out_path == "":
out_path = os.getcwd()
args.output = os.path.basename(args.output)
temp_path = join(out_path, "temporary")
if not exists(out_path):
os.makedirs(out_path)
if not exists(temp_path):
os.makedirs(temp_path)
# Init the logger
_logger_config.setup_logging(log_fp=join(out_path, args.output + ".log"), verbose=not args.silent)
logger = logging.getLogger(__name__)
# Detect the os and point to respective binaries
curr_os = lib_utils.get_os()
if curr_os is None:
logger.error("OS not supported or not detected properly.")
sys.exit(1)
bin_path = BINARIES[curr_os]
# Initialize hooks
blast_hook = lib_blast.BlastDB(
path_db=args.refgenome,
path_temporary=temp_path,
path_bin=bin_path,
job_id="main",
n_cpu=1,
)
# Build the BlastDB if a fasta was provided
if not blast_hook.has_db:
logger.info("Converting the input reference genome to BlastDB ...")
blast_hook.fasta2db()
# Read markers
try:
markers = read_markers(args.markers)
except:
logger.error("Failed to read input markers. Please check the input file path or format.")
sys.exit(1)
fp_out = join(out_path, args.output) # Write the cleaned markers to a new file
with open(fp_out, "w") as f:
for marker in markers:
marker.assert_marker(blast_hook)
f.write("{}\t{}\t{}\t{}\t{}\n".format(
marker.chrom,
marker.pos1,
marker.name,
marker.ref,
marker.alt,
))
shutil.rmtree(temp_path)
os.remove(fp_out + ".log") # Remove the log, this is redundant here
if __name__ == "__main__":
main()