-
Notifications
You must be signed in to change notification settings - Fork 22
/
CESAR_wrapper.py
executable file
·2662 lines (2418 loc) · 101 KB
/
CESAR_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""CESAR2.0 wrapper.
Retrieve exons from the query genome.
"""
import argparse
import os
import sys
import subprocess
import uuid
import math
from datetime import datetime as dt
from re import finditer, IGNORECASE
from collections import defaultdict
import ctypes
from operator import and_
from functools import reduce
from twobitreader import TwoBitFile
from modules.common import parts, bed_extract_id_text
from modules.common import bed_extract_id, chain_extract_id
from modules.common import make_cds_track
from modules.common import eprint
from modules.common import die
from modules.common import flatten
from modules.inact_mut_check import inact_mut_check
from modules.parse_cesar_output import parse_cesar_out
from constants import Constants
from constants import GENETIC_CODE
from constants import COMPLEMENT_BASE
from version import __version__
__author__ = "Bogdan M. Kirilenko"
LOCATION = os.path.dirname(os.path.abspath(__file__))
VERBOSE = False
TMP_NAME_SIZE = 10
EXP_REG_EXTRA_FLANK = 50
EXON_SEQ_FLANK = 10
ERR_CODE_FRAGM_ERR = 2
# alias; works for Hillerlab-only
two_bit_templ = "/projects/hillerlab/genome/gbdb-HL/{0}/{0}.2bit"
chain_alias_template = "/projects/hillerlab/genome/gbdb-HL/{0}/lastz/vs_{1}/axtChain/{0}.{1}.allfilled.chain.gz"
# connect shared lib; define input and output data types
chain_coords_conv_lib_path = os.path.join(
LOCATION, "modules", "chain_coords_converter_slib.so"
)
ch_lib = ctypes.CDLL(chain_coords_conv_lib_path)
ch_lib.chain_coords_converter.argtypes = [
ctypes.c_char_p,
ctypes.c_int,
ctypes.c_int,
ctypes.POINTER(ctypes.c_char_p),
]
ch_lib.chain_coords_converter.restype = ctypes.POINTER(ctypes.c_char_p)
# connect extract subchain lib
extract_subchain_lib_path = os.path.join(
LOCATION, "modules", "extract_subchain_slib.so"
)
DEFAULT_CESAR = os.path.join(LOCATION, "CESAR2.0", "cesar")
ex_lib = ctypes.CDLL(extract_subchain_lib_path)
ex_lib.extract_subchain.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p]
ex_lib.extract_subchain.restype = ctypes.POINTER(ctypes.c_char_p)
# blosum matrix address
BLOSUM_FILE = os.path.join(LOCATION, "supply", "BLOSUM62.txt")
GAP_SCORE = 0
INS_PEN = -1
DEL_PEN = -1
AA_FLANK = 15
SS_SIZE = 2
# CESAR2.0 necessary files location
FIRST_CODON_PROFILE = "extra/tables/human/firstCodon_profile.txt"
LAST_CODON_PROFILE = "extra/tables/human/lastCodon_profile.txt"
ACC_PROFILE = "extra/tables/human/acc_profile.txt"
DO_PROFILE = "extra/tables/human/do_profile.txt"
EQ_ACC_PROFILE = os.path.join(LOCATION, "supply", "eq_acc_profile.txt")
EQ_DO_PROFILE = os.path.join(LOCATION, "supply", "eq_donor_profile.txt")
def verbose(msg):
"""Write verbose in stderr if verbose == TRUE."""
eprint(msg) if VERBOSE else None
def parse_args():
"""Parse args, check and return."""
# parse args
app = argparse.ArgumentParser(description="")
app.add_argument("gene", type=str, help="working gene") # mandatory
app.add_argument(
"chains",
type=str,
help="chain ids, write 'region' if you like to type a region directly",
)
app.add_argument(
"bdb_bed_file", type=str, help="BDB BED FILE, text bed12 also works but slower."
)
app.add_argument(
"bdb_chain_file",
type=str,
help="BDB CHAIN FILE" "Or chain file, or even chain.gz file, but slower.",
)
app.add_argument("tDB", type=str, help="target db, alias or 2bit file")
app.add_argument("qDB", type=str, help="query db, alias or 2bit file")
app.add_argument(
"--ref",
default="hg38",
help="Use this is you like to use " "chain alias and your ref is not the hg38.",
)
app.add_argument(
"--cesar_binary", type=str, default=None, help="CESAR2.0 binary address."
)
app.add_argument(
"--cesar_input_save_to",
type=str,
default=None,
help="Where to save the cesar input file. /dev/shm/{whoami}/XXXXXX.fa is default.",
)
app.add_argument(
"--memlim", default="Auto", help="memory limit for CESAR, auto as default."
)
app.add_argument(
"--mask_stops",
"--ms",
action="store_true",
dest="mask_stops",
help="In case if there are inframe stop codons replace them with NNN.",
)
app.add_argument(
"--estimate_memory",
"-e",
action="store_true",
dest="estimate_memory",
help="do not compute anything, just return what amount of memory is required for this run.",
)
app.add_argument(
"--shift",
type=int,
default=2,
help="num of blocks in up/downstream regions, 2 is default",
)
app.add_argument(
"--raw_output",
"--ro",
default=None,
help="Save raw CESAR output, " "You can use either stdout of a file for that.",
)
app.add_argument(
"--cesar_output",
"--co",
default=None,
help="If CESAR output is already done, specify it here",
)
app.add_argument(
"--save_loci", default=None, help="Save gene: chain: locus dict to a file."
)
app.add_argument(
"--output",
default="stdout",
help="Final output, stdout as default "
"unreachable if params --raw_output or --estimate_memory are set.",
)
app.add_argument(
"--prot_out", "--po", default="stdout", help="Save protein sequence"
)
app.add_argument(
"--codon_out", "--cdo", default="stdout", help="Save codon alignment"
)
app.add_argument("--verbose", "-v", action="store_true", dest="verbose")
app.add_argument( # TODO: remove everything related to this parameter
"--exon_flank",
default=2,
type=int,
help="OBSOLETE When intersecting exons and intersected blocks"
"add flanks to extend exons range.",
)
app.add_argument(
"--gap_size",
default=10,
type=int,
help="How many gaps in a row consider as an assembly gap.",
)
app.add_argument(
"--gene_flank",
default=1000,
type=int,
help="Up and downstream in query genome.",
)
app.add_argument(
"--extra_flank",
"--ef",
default=0,
type=int,
help="Add bases up and down stream.",
)
app.add_argument(
"--query_len_limit",
"--ql",
default=0,
type=int,
help="Skip job if query is too long",
)
app.add_argument(
"--check_loss",
"--cl",
action="store_true",
dest="check_loss",
help="Check whether a gene found has inactivating mutations.",
)
app.add_argument(
"--print_inact_mut",
"--pim",
action="store_true",
dest="print_inact_mut",
help="Print gene loss data on the screen",
)
app.add_argument(
"--ic",
action="store_true",
dest="ic",
help="Invert complement query sequences (DANGER)",
)
app.add_argument(
"--paral",
action="store_true",
dest="paral",
help="Mark that this gene is called for paralogous chains",
)
app.add_argument("--u12", default=None, help="Add U12 exons data")
app.add_argument("--uhq_flank", default=50, type=int, help="UHQ flank filter")
app.add_argument(
"--no_fpi",
action="store_true",
dest="no_fpi",
help="Consider some frame-preserving mutations as inactivating. "
"See documentation for details.",
)
app.add_argument(
"--fragments",
"--fr",
action="store_true",
dest="fragments",
help="Stitch scaffolds -> in case of a fragmented genome",
)
app.add_argument(
"--precomputed_orth_loci",
default=None,
help="Path to loci saved with --save_orth_locus"
)
app.add_argument(
"--do_not_check_exon_chain_intersection",
default=False,
action="store_true",
dest="do_not_check_exon_chain_intersection",
help="Do not extract chain blocks (not recommended)"
)
app.add_argument(
"--alt_frame_del",
"--lfd",
default=False,
action="store_true",
dest="alt_frame_del",
help="Consider codons in alternative frame (between compensated FS) deleted"
)
app.add_argument(
"--mask_all_first_10p",
"--m_f10p",
action="store_true",
dest="mask_all_first_10p",
help="Automatically mask all inactivating mutations in first 10 percent of "
"the reading frame, ignoring ATG codons distribution."
)
app.add_argument(
"--temp_dir",
default=None,
help="Temp dir to store intermediate files.\n"
"In TOGA pipeline: $toga_output/temp/cesar_temp_files; "
"As a standalone script, by default: cesar_temp_files in the "
"directory where it was called"
)
if len(sys.argv) == 1:
app.print_help()
sys.exit(0)
# call the main
ret_args = app.parse_args()
# check args
die(
f"Error! --shift must be >=0, {ret_args.shift} given", 1
) if ret_args.shift < 0 else None
die(
f"Error! --exon_flank must be >=0 {ret_args.exon_flank} given", 1
) if ret_args.exon_flank < 0 else None
die("Error! --gap_size must be >= 1!", 1) if ret_args.gap_size < 1 else None
global VERBOSE
VERBOSE = True if ret_args.verbose else False
verbose("Program runs with the following params:")
for k, v in vars(ret_args).items():
verbose('"{0}": {1}'.format(k, v))
return ret_args
def read_bed(gene, bed_file):
"""Extract and parse gene-related bed track."""
try: # if it's berkeley db file
bed_track_raw = bed_extract_id(bed_file, gene)
except OSError:
# h5df cannot open this file: this is likely a text file
bed_track_raw = bed_extract_id_text(bed_file, gene)
# left CDS only
bed_track = make_cds_track(bed_track_raw)
# regular parsing of a bed-12 formatted file
bed_info = bed_track.split("\t")
# if not 12 fields -> then input is wrong
die("Error! Bed-12 required!", 1) if len(bed_info) != 12 else None
bed_data = {
"chrom": bed_info[0],
"chromStart": int(bed_info[1]),
"chromEnd": int(bed_info[2]),
"name": bed_info[3],
"strand": True if bed_info[5] == "+" else False,
}
block_sizes = [int(x) for x in bed_info[10].split(",") if x != ""]
block_starts = [
int(x) + bed_data["chromStart"] for x in bed_info[11].split(",") if x != ""
]
block_ends = [x[0] + x[1] for x in zip(block_starts, block_sizes)]
bed_data["blocks"] = list(zip(block_starts, block_ends))
bed_data["block_sizes"] = (
block_sizes[::-1] if not bed_data["strand"] else block_sizes
)
verbose("Bed data is:\n{0}".format(bed_data))
return bed_data
def revert(line):
"""Revert string and change with complement bases."""
line = line[::-1].upper()
new_str = "".join([COMPLEMENT_BASE.get(c) for c in line if COMPLEMENT_BASE.get(c)])
return new_str
def get_2bit_path(db_arg):
"""Check if alias and return a path to 2bit file."""
if os.path.isfile(db_arg): # not an alias
return db_arg # there is nothing to do
elif os.path.islink(db_arg):
return os.readlink(db_arg)
aliased = two_bit_templ.format(db_arg)
# check that it's a file
die(f"Error! Cannot find {aliased} file", 1) if not os.path.isfile(
aliased
) else None
return aliased
def find_chain_file(ref_name, chain_arg):
"""Return chain file path."""
if os.path.isfile(chain_arg):
# not an alias -> just return it
return chain_arg
# not a chain path/ Hillerlab-specific place!
chain_path = chain_alias_template.format(ref_name, chain_arg)
die(
f"Error! File {chain_path} not found! Please set the chain path explicitly.", 1
) if not os.path.isfile(chain_path) else None
return chain_path
def get_exons(bed_data, t_db):
"""Extract exons sequences for reference."""
exons_raw = (
bed_data["blocks"][::-1] if not bed_data["strand"] else bed_data["blocks"]
)
exons_pos = {} # contain exon_num : positions
s_sites = []
exon_flanks = {}
for num, exon in enumerate(exons_raw):
# start, end if strand == + end, start otherwise
# need to know start and end to extract from 2bit file
exons_pos[num] = (
(int(exon[0]), int(exon[1]))
if bed_data["strand"]
else (int(exon[1]), int(exon[0]))
)
max_exon_num = max(exons_pos.keys())
_all_positions = sorted(flatten(exons_pos.values()))
gene_borders = {_all_positions[0], _all_positions[-1]}
# extract sequences
exons_seq = {} # exon number: sequence dict
target_genome = TwoBitFile(get_2bit_path(t_db)) # use 2bitreader library
get_chr = bed_data["chrom"]
try:
chrom_seq = target_genome[get_chr]
except KeyError:
chrom_seq = [] # to suppress PyCharm analyzer
die(f"Error! Cannot find chrom {get_chr} in 2bit file {t_db}")
verbose("\nExons sequences ####\n")
for num, pos in exons_pos.items():
is_first_exon = num == 0
is_last_exon = num == max_exon_num
# for twoBitToFa start must be < end
# determine search start and end
# do not subtract/add SS_SIZE if gene border: no splice sites then
min_pos = min(pos)
max_pos = max(pos)
start = min_pos - SS_SIZE if min_pos not in gene_borders else min_pos
end = max_pos + SS_SIZE if max_pos not in gene_borders else max_pos
# get exon 10 bp flanks:
left_brd_ = min_pos - EXON_SEQ_FLANK if min_pos - EXON_SEQ_FLANK > 0 else 0
left_flank_coord = (left_brd_, min_pos)
right_flank_coord = (max_pos, max_pos + EXON_SEQ_FLANK)
left_flank = chrom_seq[left_flank_coord[0]: left_flank_coord[1]].upper()
right_flank = chrom_seq[right_flank_coord[0]: right_flank_coord[1]].upper()
# correct for strand:
left_flank = left_flank if bed_data["strand"] else revert(left_flank)
right_flank = right_flank if bed_data["strand"] else revert(right_flank)
if not bed_data["strand"]:
left_flank, right_flank = right_flank, left_flank
# placeholder in case we coudld not extract flanks
left_flank = left_flank if len(left_flank) > 0 else "X"
right_flank = right_flank if len(right_flank) > 0 else "X"
exon_seq_raw = chrom_seq[start:end].upper()
# revert if negative strand
exon_seq_w_ss = revert(exon_seq_raw) if not bed_data["strand"] else exon_seq_raw
# trim splice sites
if not is_first_exon and not is_last_exon:
# both splice sites are here
exon_seq = exon_seq_w_ss[SS_SIZE:-SS_SIZE]
acc_ = exon_seq_w_ss[:SS_SIZE]
don_ = exon_seq_w_ss[-SS_SIZE:]
elif is_first_exon and is_last_exon:
# no splice sites
exon_seq = exon_seq_w_ss
acc_, don_ = "NN", "NN"
elif is_first_exon:
exon_seq = exon_seq_w_ss[:-SS_SIZE]
acc_ = "NN"
don_ = exon_seq_w_ss[-SS_SIZE:]
elif is_last_exon:
exon_seq = exon_seq_w_ss[SS_SIZE:]
acc_ = exon_seq_w_ss[:SS_SIZE]
don_ = "NN"
else:
raise RuntimeError("Unreachable branch reached")
s_sites.append(acc_)
s_sites.append(don_)
exons_seq[num] = exon_seq
exon_flanks[num] = {"L": left_flank, "R": right_flank}
verbose(f"Exon {num} in the range {pos}; sequence:\n{exon_seq}")
return exons_pos, exons_seq, s_sites, exon_flanks
def check_ref_exons(exon_seqs, mask_stops):
"""Check if the reference sequence is correct.
Should start with ATG and end with a stop.
Mask_stops controls handling of inframe stops.
"""
sec_codons = set() # in case there are TGA codons in the ref seq -> collect them
gene_seq = "".join([exon_seqs[i] for i in range(len(exon_seqs.keys()))])
codons = parts(gene_seq, n=3) # split a seq of letters in chunks of len == 3
if codons[0] != "ATG":
eprint("Input is corrupted! Reference sequence should start with ATG!")
elif codons[-1] not in Constants.STOP_CODONS:
eprint("Input is corrupted! Reference sequence should end with a stop codon!")
stop_codons = [(n, c) for n, c in enumerate(codons[:-1]) if c in Constants.STOP_CODONS]
if len(stop_codons) == 0: # no stop codons -> nothing else to do
return exon_seqs, set()
# there are stop codons in reference sequence:
eprint("Warning! There are inframe stop codons!")
for stop in stop_codons:
eprint(f"Codon num {stop[0] + 1} - {stop[1]}")
codons[stop[0]] = Constants.NNN_CODON if mask_stops else codons[stop[0]]
if stop[1] == "TGA":
# maybe a sec codon
sec_codons.add(stop[0])
eprint(">>>STOP_CODON>>>") if not mask_stops else None
die("Abort, there are inframe stop codons.", 0) if not mask_stops else None
# if stop codons in reference are allowed, then we need to mask them (rewrite as NNN)
# otherwise CESAR will show an error
safe_seq = "".join(codons)
stop_masked = {}
prev_index = 0
for num, exon_seq in exon_seqs.items():
exon_len = len(exon_seq)
stop_masked[num] = safe_seq[prev_index: prev_index + exon_len]
prev_index += exon_len
return stop_masked, sec_codons
def prepare_exons_for_cesar(exon_seqs):
"""Cesar requires some special formatting for exons."""
# CESAR requires a special king of reference exons formatting
# Split codons should be written in lowercase, as follows:
# ATGTTTa ctGTAAAGTGCc ttAGTTGA
verbose("prepare_exons_for_cesar")
left_pointer = 0 # init value
cesar_input_exons = {} # accumulate result here
for k, exon_seq in exon_seqs.items():
# define number of letters to lowercase at the right side
right_pointer = (len(exon_seq) - left_pointer) % 3
# apply left pointer
if left_pointer != 3: # it it were accumulated 3 bases it has no sense like 0
exon_seq_lfix = (
exon_seq[:left_pointer].lower() + exon_seq[left_pointer:].upper()
)
else: # don't touch if left_pointer == 0 or == 3
exon_seq_lfix = exon_seq
# re-define left pointer
left_pointer = 3 - right_pointer
# apply right-side pointer
if right_pointer != 0:
exon_seq_rfix = (
exon_seq_lfix[:-right_pointer] + exon_seq_lfix[-right_pointer:].lower()
)
else: # save as it was
exon_seq_rfix = exon_seq_lfix
# save prepared exon into special dict
cesar_input_exons[k] = exon_seq_rfix
return cesar_input_exons
def get_chain(chain_file, chain_id):
"""Return chain string according the parameters passed."""
chain = None # to calm IDE down
if chain_file.endswith(".bst"):
# we have bdb file; extract with BDB extractor
chain = chain_extract_id(chain_file, chain_id)
return chain
elif chain_file.endswith(".gz"): # a gzipped chain file was given
# gzip and redirect scteam to chain_filter_by_id binary
extract_by_id_cmd = (
f"gzip -dc {chain_file} | ./modules/chain_filter_by_id stdin {chain_id}"
)
try: # check that output is OK
chain = subprocess.check_output(extract_by_id_cmd, shell=True).decode(
"utf-8"
)
except subprocess.CalledProcessError:
# die if the command died
die(
"Error! Process {extract_by_id_cmd} died! Please check if input data is correct",
1,
)
return chain
else: # just a chain file, extract the chain we need
# the same as above, but without gzip stage
extract_by_id_cmd = f"./modules/chain_filter_by_id {chain_file} {chain_id}"
try: # also check if output is OK, die otherwise
chain = subprocess.check_output(extract_by_id_cmd, shell=True).decode(
"utf-8"
)
except subprocess.CalledProcessError:
die(
"Error! Process {extract_by_id_cmd} died! Please check if input data is correct",
1,
)
return chain
def range_corrector(g_range):
"""Swap start and end if start > end."""
chrom, start_end = g_range.split(":")
start_end_split = start_end.split("-")
start, end = int(start_end_split[0]), int(start_end_split[1])
if start < end:
return g_range
else:
return f"{chrom}:{end}-{start}"
def chain_cut(chain_str, gene_range, gene_flank, extra_flank=0):
"""Call chain_cut binary.
Project reference gene coordinates to query through a chain.
Also add flanks if shift is > 0.
"""
# need to get genomic region for the gene
# also need to translate python data types to C
# to call the shared library; I do it 2 times here
# for shift = 0 and shifts = 2 (add flanks around gene)
c_chain = ctypes.c_char_p(chain_str.encode())
c_shift_2 = ctypes.c_int(2)
c_shift_0 = ctypes.c_int(0)
granges_num = 1
c_granges_num = ctypes.c_int(granges_num) # we need only one grange to analyze
granges_arr = (ctypes.c_char_p * (granges_num + 1))() # granges_num + 1
granges_bytes = [gene_range.encode("utf-8")]
# need to do this tricks to pass strings array to C
granges_arr[:-1] = granges_bytes
granges_arr[granges_num] = None
raw_ch_conv_s2 = ch_lib.chain_coords_converter(
c_chain, c_shift_2, c_granges_num, granges_arr
)
chain_coords_conv_out_s2 = [] # keep lines here
# convert C output to python-readable type
for i in range(granges_num + 1):
chain_coords_conv_out_s2.append(raw_ch_conv_s2[i].decode("utf-8"))
# chain', 'chr5', '+', '137889395', '148245211', 'chr18', '+', '34409342', '44120958
chain_data = chain_coords_conv_out_s2[0].split(" ")
t_strand = True if chain_data[2] == "+" else False
q_strand = True if chain_data[7] == "+" else False
t_size = int(chain_data[3])
q_size = int(chain_data[8])
# re-define arrays to avoid segfault
c_chain = ctypes.c_char_p(chain_str.encode())
granges_arr = (ctypes.c_char_p * (granges_num + 1))() # granges_num + 1
granges_bytes = [gene_range.encode("utf-8")]
granges_arr[:-1] = granges_bytes
granges_arr[granges_num] = None
raw_ch_conv_s0 = ch_lib.chain_coords_converter(
c_chain, c_shift_0, c_granges_num, granges_arr
)
chain_coords_conv_out_s0 = [] # keep lines here
# convert C output to python-readable type
for i in range(granges_num + 1):
chain_coords_conv_out_s0.append(raw_ch_conv_s0[i].decode("utf-8"))
# another approach to detect range
# sometimes blocks go so far
# ------------------genegene-------------------
# block-------------blockblock------------block
# to avoid very huge query sequences program controls it's size
search_region_shift_str = range_corrector(
chain_coords_conv_out_s2[1].split("\t")[1]
)
search_region_abs_str = range_corrector(chain_coords_conv_out_s0[1].split("\t")[1])
chrom = search_region_shift_str.split(":")[0]
search_reg_shift = [
int(x) for x in search_region_shift_str.split(":")[1].split("-")
]
search_reg_abs = [int(x) for x in search_region_abs_str.split(":")[1].split("-")]
search_reg_flanked = [
search_reg_abs[0] - gene_flank,
search_reg_abs[1] + gene_flank,
]
# define actual starts and ends
act_start = (
search_reg_shift[0]
if search_reg_shift[0] > search_reg_flanked[0]
else search_reg_flanked[0]
)
act_end = (
search_reg_shift[1]
if search_reg_shift[1] < search_reg_flanked[1]
else search_reg_flanked[1]
)
if extra_flank > 0: # add extra flanks if required
act_start = act_start - extra_flank if act_start - extra_flank > 0 else 0
act_end = (
act_end + extra_flank if act_end + extra_flank < q_size else q_size - 1
)
act_search_range = f"{chrom}:{act_start}-{act_end}"
# ext_search_range = f"{chrom}:{}-{}"
del raw_ch_conv_s0 # not sure if this is necessary
del raw_ch_conv_s2 # but just in case
return (
act_search_range,
search_region_shift_str,
(t_strand, t_size, q_strand, q_size),
)
def extract_subchain(chain_str, search_locus):
"""Extract subchain containing only the search locus."""
# convert python types to C types
c_chain = ctypes.c_char_p(chain_str.encode())
c_mode = ctypes.c_char_p("q".encode())
c_search = ctypes.c_char_p(search_locus.encode())
# call the library finally
ex_out_raw = ex_lib.extract_subchain(c_chain, c_mode, c_search)
ex_out = []
for elem in ex_out_raw:
line = elem.decode("utf-8")
if line == "END":
break
ex_out.append(line)
blocks = [[int(x) for x in elem.split()] for elem in ex_out]
if len(blocks) == 0: # die an show chain id
chain_id = chain_str.split("\n")[0].split()[-1]
die(f"Error! No overlapping blocks for chain {chain_id} found!", 1)
del ex_out_raw # maybe it's necessary
return blocks
def orient_blocks(subchain_blocks_raw, chain_data):
"""Create block num: coordinates dict.
Orient them in correct direction.
Add interblock regions, like block 1_2 between blocks 1 and 2.
"""
block_ranges = {}
t_strand, t_size, q_strand, q_size = chain_data
for i in range(len(subchain_blocks_raw)):
i_block = subchain_blocks_raw[i]
t_start = i_block[0] if t_strand else t_size - i_block[1]
t_end = i_block[1] if t_strand else t_size - i_block[0]
q_start = i_block[2] if q_strand else q_size - i_block[3]
q_end = i_block[3] if q_strand else q_size - i_block[2]
block_ranges[str(i)] = (
min(t_start, t_end),
max(t_start, t_end),
min(q_start, q_end),
max(q_start, q_end),
)
# if there is only one block (weird but possible) --> no gaps between blocks
if len(block_ranges) == 1:
return block_ranges
# make interblock gaps
blocks_num = len(block_ranges.keys())
for i in range(1, blocks_num):
prev = block_ranges[str(i - 1)]
current = block_ranges[str(i)]
# connect xEnd_prev to xStart_current
inter_t_start = prev[1] if t_strand else prev[0]
inter_t_end = current[0] if t_strand else current[1]
inter_q_start = prev[3] if q_strand else prev[2]
inter_q_end = current[2] if q_strand else current[3]
interblock_data = (inter_t_start, inter_t_end, inter_q_start, inter_q_end)
block_ranges[f"{i - 1}_{i}"] = interblock_data
return block_ranges
def intersect_ranges(range_1, range_2):
"""If > 0: ranges intersect."""
range_1 = range_1 if range_1[1] > range_1[0] else range_1[::-1]
range_2 = range_2 if range_2[1] > range_2[0] else range_2[::-1]
return min(range_1[1], range_2[1]) - max(range_1[0], range_2[0])
def get_aa_ex_cov(exon_range, i_block_coords):
"""Get number of bases covered in target and query."""
# deal with t_cov first
# chain blocks without _ in name - real blocks, len in T == len in Q
# blocks with _ -> not aligned
# will get exon len + flanks - len of blocks with _
flanked_exon_len = max(exon_range) - min(exon_range)
exon_range = (min(exon_range), max(exon_range))
interblock_reg = [x[1] for x in i_block_coords if "_" in x[0]]
if len(interblock_reg) == 0:
# single block covers the exon + flanks: ideal case
return flanked_exon_len, flanked_exon_len, flanked_exon_len
# select blocks, ignore
cov_blocks_not_trimmed = [x[1] for x in i_block_coords if "_" not in x[0]]
t_cov_blocks_trimmed = []
# trim blocks | convert this:
# --------exonexonexonexonexon--------
# --blockclobk===blockblock===========
# to:
# --------exonexonexonexonexon--------
# --------block==blockblock===========
for block in cov_blocks_not_trimmed:
t_start, t_end = block[0], block[1]
t_block_vals = (t_start, t_end)
t_block = [min(t_block_vals), max(t_block_vals)]
start_within = exon_range[0] <= t_block[0] <= exon_range[1]
end_within = exon_range[0] <= t_block[1] <= exon_range[1]
if not start_within and not end_within:
# something impossible
continue
# die("Impossible block configuration in get_aa_ex_cov!")
if start_within and end_within:
t_cov_blocks_trimmed.append(t_block)
continue
elif not start_within:
t_block[0] = exon_range[0]
t_cov_blocks_trimmed.append(t_block)
continue
else:
t_block[1] = exon_range[1]
t_cov_blocks_trimmed.append(t_block)
continue
t_cov = sum([abs(x[1] - x[0]) for x in t_cov_blocks_trimmed])
t_cov = t_cov if t_cov > 0 else 0
# now compute q_cov
q_blocks_within = []
for block in interblock_reg:
t_start, t_end = block[0], block[1]
q_start, q_end = block[2], block[3]
q_block_vals = (q_start, q_end)
t_block_vals = (t_start, t_end)
t_block = [min(t_block_vals), max(t_block_vals)]
q_block = [min(q_block_vals), max(q_block_vals)]
start_within = exon_range[0] <= t_block[0] <= exon_range[1]
end_within = exon_range[0] <= t_block[1] <= exon_range[1]
if not start_within or not end_within:
continue
q_blocks_within.append(q_block)
q_corr_reg = sum([abs(x[1] - x[0]) for x in q_blocks_within])
q_cov = t_cov + q_corr_reg
q_cov = q_cov if q_cov > 0 else 0
return flanked_exon_len, t_cov, q_cov
def intersect_exons_blocks_gaps(
exon_coordinates, subchain_blocks, gap_coordinates, flank, uhq_flank
):
"""Intersect exons, chain blocks and gaps.
Create the following dictionaries:
exon_num: intersected chain blocks (list)
chain_block: intersects gap or not (bool)
"""
# make a pseudoblock --> for the entire chain
# all T - all chain blocks in reference coordinates
all_t = [b[0] for b in subchain_blocks.values()] + [
b[1] for b in subchain_blocks.values()
]
# all Q - all chain blocks in query coordinates
all_q = [b[2] for b in subchain_blocks.values()] + [
b[3] for b in subchain_blocks.values()
]
global_block = (min(all_t), max(all_t), min(all_q), max(all_q))
# get a dict of exons with flanks
exon_flank_coordinates = {}
exon_aa_flank_coordinates = {} # just to check that AA criteria satisfied
marginal_cases = {e: False for e in exon_coordinates.keys()}
for exon_num, exon_coords in exon_coordinates.items():
# get exon -> flanked coordinates
exon_start, exon_end = min(exon_coords), max(exon_coords)
exon_size = exon_end - exon_start
flanked_exon_start = exon_start - flank
flanked_exon_end = exon_end + flank
aa_flanked_start = exon_start - uhq_flank
aa_flanked_end = exon_end + uhq_flank
exon_flank_coordinates[exon_num] = (flanked_exon_start, flanked_exon_end)
exon_aa_flank_coordinates[exon_num] = (aa_flanked_start, aa_flanked_end)
# also check if a block outside the chain
glob_intersect = intersect_ranges(
exon_coords, (global_block[0], global_block[1])
)
if glob_intersect != exon_size:
# exon doesn't intersect chain -> put to marginal cases
marginal_cases[exon_num] = True
# find intersections for blocks and assembly gaps
exon_num_blocks = defaultdict(list)
fl_exon_num_blocks = defaultdict(list)
aa_exon_num_blocks = defaultdict(list)
block_gaps = {b: False for b in subchain_blocks.keys()}
for block_num, block_coords in subchain_blocks.items():
# try intersection with all exons
for exon_num, exon_coords in exon_coordinates.items():
flanked_coords = exon_flank_coordinates.get(exon_num)
aa_flank_coords = exon_aa_flank_coordinates.get(exon_num)
intersect_norm = intersect_ranges(
exon_coords, (block_coords[0], block_coords[1])
)
intersect_flanked = intersect_ranges(
flanked_coords, (block_coords[0], block_coords[1])
)
intersect_aa_class = intersect_ranges(
aa_flank_coords, (block_coords[0], block_coords[1])
)
if intersect_norm >= 0:
exon_num_blocks[exon_num].append(block_num)
if intersect_flanked >= 0:
fl_exon_num_blocks[exon_num].append(block_num)
if intersect_aa_class >= 0:
aa_exon_num_blocks[exon_num].append(block_num)
# and gaps
for gap_num, gap in enumerate(gap_coordinates):
intersect = intersect_ranges(gap, (block_coords[2], block_coords[3]))
if intersect <= 0:
continue
block_gaps[block_num] = gap_num
verbose_msg = (
f"Block num {block_num} in coords t:{block_coords[0]}-{block_coords[1]} "
f"q:{block_coords[2]}-{block_coords[3]} intersects gap {gap[0]}-{gap[1]}"
)
verbose(verbose_msg)
# get missed exons to exclude
missing_exons = [e for e in exon_coordinates.keys() if not exon_num_blocks.get(e)]
not_covered_str = ", ".join([str(e) for e in missing_exons])
verbose(f"Exons:\n{not_covered_str}\nare not covered by chain") if len(missing_exons) > 0 else None
verbose(f"Flank size is: {flank}")
for exon_num in sorted(exon_num_blocks.keys()):
verbose(f"Exon {exon_num} intersects blocks {exon_num_blocks.get(exon_num)}")
verbose(
f"Exon {exon_num} with flanks intersects blocks {fl_exon_num_blocks.get(exon_num)}"
)
verbose(
f"Exon {exon_num} with AA flanks intersects blocks {aa_exon_num_blocks.get(exon_num)}"
)
verbose("\n")
aa_sat = {}
for exon_num, blocks in aa_exon_num_blocks.items():
exon_range = exon_aa_flank_coordinates[exon_num]
i_block_coords = [(b, subchain_blocks[b]) for b in blocks]
fex_len, t_cov, q_cov = get_aa_ex_cov(exon_range, i_block_coords)
# allow 20% deviation
# if corresponding regions are in this range: it's OK
dev_thr = uhq_flank * 0.2
lo_, hi_ = fex_len - dev_thr, fex_len + dev_thr
t_cov_ok = lo_ <= t_cov <= hi_
q_cov_ok = lo_ <= q_cov <= hi_
cond_sat_ = t_cov_ok is True and q_cov_ok is True
aa_sat[exon_num] = True if cond_sat_ else False
return (
exon_num_blocks,
fl_exon_num_blocks,
block_gaps,
missing_exons,
exon_flank_coordinates,
marginal_cases,
aa_sat,
)
def sort_blocks(blocks_num):
"""Return block: index and index: block dicts."""
block_to_index, index_to_block = {}, {}
pointer = -1
for i in range(blocks_num):
if i % 2 == 0:
# if so, it is a real block
pointer += 1
block_id = str(pointer)
else: # otherwise an interblock range
block_id = f"{pointer}_{pointer + 1}"
block_to_index[block_id] = i
index_to_block[i] = block_id
return block_to_index, index_to_block
def classify_predict_exons(exon_blocks, block_coordinates, margin_cases):
"""Classify exons and get expected coordinates."""
exon_class, exon_exp_q_region = {}, {}
block_deleted = {}
block_to_index, index_to_block = sort_blocks(len(block_coordinates))
# for each block compute if it corresponds to gap in query
for block_num, block_coords in block_coordinates.items():
block_deleted[block_num] = True if block_coords[2] == block_coords[3] else False
for exon_num, blocks in exon_blocks.items():
# extract expected query region
block_indexes = sorted([block_to_index.get(b) for b in blocks])
start_block_id = index_to_block.get(block_indexes[0])
end_block_id = index_to_block.get(block_indexes[-1])
start_block_coords = block_coordinates.get(start_block_id)
end_block_coords = block_coordinates.get(end_block_id)
exp_q_region_start = min(start_block_coords[2], end_block_coords[2])
exp_q_region_end = max(start_block_coords[3], end_block_coords[3])
exon_exp_q_region[exon_num] = (exp_q_region_start, exp_q_region_end)
margin_exon = margin_cases.get(exon_num)
# assign the class
if margin_exon:
# in case of margin exon the following happens:
# exonexonexon
# chainchainchain===chainchain
# so exon is not covered by the chain partly
this_exon_class = "M"
elif "_" not in start_block_id and "_" not in end_block_id:
# both start and end of the exon are covered, class A
# ----------exonexonexon----------
# ---chainchain====chaincha=======
# ==========chainchainchain-------
this_exon_class = "A"
# >ENST00000464162 | 1 | 4482 | KK498653:40658904-40658593 | 59.81 | OK | A |
# exp:40655721-40655810 | EXCL
elif start_block_id == end_block_id and "_" in start_block_id:
# exon unaligned or completely removed, class C
# ----------exonexonexon----------
# ================================
# --------------------------------