-
Notifications
You must be signed in to change notification settings - Fork 51
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
added cht_data module, forgot to add to repo earlier, but required for fit_bnb_coefficients
- Loading branch information
Showing
1 changed file
with
225 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,225 @@ | ||
import gzip
import os
import random
import sys

import numpy as np
|
||
class TestSNP:
    """Container for one test SNP row parsed from an input file.

    The constructor stores its arguments unchanged:

    name          -- SNP identifier
    geno_hap1     -- allele carried on haplotype 1 (0 = reference,
                     1 = non-reference, per is_homo_ref / is_homo_alt)
    geno_hap2     -- allele carried on haplotype 2 (same coding)
    AS_target_ref -- allele-specific read counts matching the reference
                     allele at linked target SNPs
    AS_target_alt -- allele-specific read counts matching the alternate
                     allele at linked target SNPs
    hetps         -- heterozygote probabilities of the target SNPs
    totals        -- value stored as the row's total
    counts        -- value stored as the row's read count
    """

    def __init__(self, name, geno_hap1, geno_hap2, AS_target_ref,
                 AS_target_alt, hetps, totals, counts):
        self.name = name
        self.geno_hap1 = geno_hap1
        self.geno_hap2 = geno_hap2
        self.AS_target_ref = AS_target_ref
        self.AS_target_alt = AS_target_alt
        self.hetps = hetps
        self.totals = totals
        self.counts = counts

    def is_het(self):
        """Return True if the test SNP is heterozygous."""
        return self.geno_hap1 != self.geno_hap2

    def is_homo_ref(self):
        """Return True if the test SNP is homozygous for the reference allele."""
        return self.geno_hap1 == 0 and self.geno_hap2 == 0

    def is_homo_alt(self):
        """Return True if the test SNP is homozygous for the non-reference allele."""
        return self.geno_hap1 == 1 and self.geno_hap2 == 1
|
||
|
||
# Module-level flag: the duplicate-SNP warning is written at most once per
# process; parse_test_snp clears it after the first warning.
dup_snp_warn = True


def parse_test_snp(snpinfo, shuffle=False):
    """Build a TestSNP from the whitespace-split fields of one input row.

    Fields read from ``snpinfo`` (0-based index):
      2  -- SNP identifier
      6  -- genotype as "hap1|hap2", or "NA" (treated as 0|0)
      9  -- ';'-separated positions of linked target SNPs, or "NA"
      10 -- ';'-separated heterozygote probabilities
      12 -- ';'-separated counts of reads matching the reference allele
      13 -- ';'-separated counts of reads matching the alternate allele
      15 -- read count ("NA" is treated as 0)
      16 -- total ("NA" is treated as 0)

    Field 11 (linkage probabilities) is not used and is not parsed.

    If ``shuffle`` is true, the reference and alternate allele-specific
    counts are swapped at each target SNP with probability 1/2, using the
    ``random`` module.

    Returns a TestSNP. For a homozygous test SNP, or when field 9 is "NA",
    the allele-specific fields are empty lists; otherwise they are numpy
    arrays with repeated target SNP positions removed.
    """
    global dup_snp_warn

    snp_id = snpinfo[2]

    tot = 0 if snpinfo[16] == "NA" else float(snpinfo[16])

    if snpinfo[6] == "NA":
        geno_hap1 = 0
        geno_hap2 = 0
    else:
        haplotypes = snpinfo[6].strip().split("|")
        geno_hap1 = int(haplotypes[0])
        geno_hap2 = int(haplotypes[1])

    count = 0 if snpinfo[15] == "NA" else int(snpinfo[15])

    if snpinfo[9].strip() == "NA" or geno_hap1 == geno_hap2:
        # SNP is homozygous (or has no target SNPs), so there is no AS info
        return TestSNP(snp_id, geno_hap1, geno_hap2, [], [], [], tot, count)

    # positions of target SNPs
    snp_locs = np.array([int(y.strip()) for y in snpinfo[9].split(';')])

    # counts of reads that match reference overlapping linked 'target' SNPs
    snp_as_ref = np.array([int(y) for y in snpinfo[12].split(';')])

    # counts of reads that match alternate allele
    snp_as_alt = np.array([int(y) for y in snpinfo[13].split(';')])

    # heterozygote probabilities
    snp_hetps = np.array([np.float64(y.strip())
                          for y in snpinfo[10].split(';')])

    # The same SNP should not be provided multiple times; this can create
    # problems with the combined test. Warn and filter duplicate SNPs.
    # np.unique returns the index of the first occurrence of each position,
    # ordered by sorted position.
    uniq_loc, uniq_idx = np.unique(snp_locs, return_index=True)

    if dup_snp_warn and uniq_loc.shape[0] != snp_locs.shape[0]:
        sys.stderr.write("WARNING: discarding SNPs that are repeated "
                         "multiple times in same line\n")
        # only warn once
        dup_snp_warn = False

    snp_as_ref = snp_as_ref[uniq_idx]
    snp_as_alt = snp_as_alt[uniq_idx]
    snp_hetps = snp_hetps[uniq_idx]

    if shuffle:
        # permute allele-specific read counts by flipping them randomly at
        # each SNP; one random.randint draw per target SNP
        for y in range(len(snp_as_ref)):
            if random.randint(0, 1) == 1:
                snp_as_ref[y], snp_as_alt[y] = snp_as_alt[y], snp_as_ref[y]

    return TestSNP(snp_id, geno_hap1, geno_hap2, snp_as_ref,
                   snp_as_alt, snp_hetps, tot, count)
|
||
|
||
|
||
|
||
|
||
def open_input_files(in_filename):
    """Open every data file listed in ``in_filename`` and skip its header.

    ``in_filename`` is a text file with one path per line. Paths ending in
    ".gz" are opened with gzip in text mode; all others with open(). The
    first line of each data file is read and discarded. Each path is echoed
    to stderr as it is processed.

    Returns the list of open file objects, in listing order. The caller is
    responsible for closing them.

    Raises IOError if ``in_filename`` is not an existing regular file.
    Writes a message to stderr and exits with status 2 (SystemExit) if a
    listed path is empty or is not an existing regular file, or if the
    list contains no files.
    """
    # isfile() is already False for paths that do not exist
    if not os.path.isfile(in_filename):
        raise IOError("input file %s does not exist or is not a "
                      "regular file\n" % in_filename)

    infiles = []
    try:
        # read file that contains list of input files
        with open(in_filename) as list_file:
            for line in list_file:
                filename = line.rstrip()
                sys.stderr.write(" " + filename + "\n")
                if not filename or not os.path.isfile(filename):
                    # report the offending path, not the list-file object
                    sys.stderr.write("input file '%s' does not exist or is not a "
                                     "regular file\n" % filename)
                    sys.exit(2)

                if filename.endswith(".gz"):
                    f = gzip.open(filename, "rt")
                else:
                    f = open(filename)

                # register before reading so the handle is closed if the
                # header read fails
                infiles.append(f)

                # skip header
                f.readline()

        if not infiles:
            sys.stderr.write("no input files specified in file '%s'\n" % in_filename)
            sys.exit(2)
    except BaseException:
        # BaseException so that SystemExit from sys.exit() is covered too:
        # do not leak already-opened data files on any failure path
        for f in infiles:
            f.close()
        raise

    return infiles
|
||
|
||
|
||
|
||
|
||
|
||
def read_count_matrices(input_filename, shuffle=False, skip=0, min_counts=0,
                        min_as_counts=0, sample=0):
    """Read all per-individual input files in lockstep into two matrices.

    ``input_filename`` is a file containing the paths of the input files
    for all individuals (see open_input_files). One row is read from every
    input file per iteration and parsed with parse_test_snp.

    Parameters:
      shuffle       -- passed through to parse_test_snp
      skip          -- number of data rows to skip before each row that is
                       considered for inclusion
      min_counts    -- minimum sum of read counts across files for a row
                       to be kept
      min_as_counts -- minimum sum of allele-specific counts across
                       heterozygous individuals for a row to be kept
      sample        -- if greater than 0 and smaller than the number of
                       retained rows, keep only this many rows, chosen
                       at random without replacement; 0 (the default)
                       disables sampling

    Returns (count_matrix, expected_matrix): an int array and a float64
    array with one row per retained input row and one column per input
    file, holding each parsed SNP's ``counts`` and ``totals``.

    Raises IOError if a data line is found in one file after an earlier
    file has already run out of lines or yielded a comment/header line on
    the same row.
    """
    infiles = open_input_files(input_filename)

    is_finished = False
    count_matrix = []
    expected_matrix = []
    line_num = 0
    skip_num = 0

    try:
        while not is_finished:
            is_comment = False
            line_num += 1
            count_line = []
            expected_line = []
            num_as = 0

            for infile in infiles:
                # read next row from this input file
                line = infile.readline().strip()

                if line.startswith(("#", "CHROM")):
                    # skip comment lines and header line
                    is_comment = True
                elif line:
                    if is_finished:
                        raise IOError("All input files should have same number of lines. "
                                      "LINE %d is present in file %s, but not in all input files\n"
                                      % (line_num, infile.name))
                    if is_comment:
                        raise IOError("Comment and header lines should be consistent accross "
                                      "all input files. LINE %d is comment or header line in some input files "
                                      "but not in file %s" % (line_num, infile.name))

                    # parse test SNP and associated info from input file row
                    new_snp = parse_test_snp(line.split(), shuffle=shuffle)
                    if new_snp.is_het():
                        num_as += (np.sum(new_snp.AS_target_ref)
                                   + np.sum(new_snp.AS_target_alt))

                    count_line.append(new_snp.counts)
                    expected_line.append(new_snp.totals)
                else:
                    # out of lines from at least one file, assume we are finished
                    is_finished = True

            if is_finished or is_comment:
                continue

            if skip_num < skip:
                # skip this row
                skip_num += 1
                continue

            if sum(count_line) >= min_counts and num_as >= min_as_counts:
                # this line met the minimum number of read counts and AS counts
                count_matrix.append(count_line)
                expected_matrix.append(expected_line)
            # NOTE(review): the source's indentation was flattened; the reset
            # is placed so it runs for every considered row, whether or not
            # the row passed the filters -- confirm against the original file.
            skip_num = 0
    finally:
        # the input files are fully consumed here; do not leak the handles
        for infile in infiles:
            infile.close()

    count_matrix = np.array(count_matrix, dtype=int)
    expected_matrix = np.array(expected_matrix, dtype=np.float64)

    sys.stderr.write("count_matrix dimension: %s\n" % str(count_matrix.shape))
    sys.stderr.write("expect_matrix dimension: %s\n" % str(expected_matrix.shape))

    nrow = count_matrix.shape[0]
    if 0 < sample < nrow:
        # randomly sample subset of rows without replacement
        sys.stderr.write("randomly sampling %d target regions\n" % sample)
        samp_index = np.arange(nrow)
        np.random.shuffle(samp_index)
        samp_index = samp_index[:sample]
        count_matrix = count_matrix[samp_index,]
        expected_matrix = expected_matrix[samp_index,]

        sys.stderr.write("new count_matrix dimension: %s\n" % str(count_matrix.shape))
        sys.stderr.write("new expect_matrix dimension: %s\n" % str(expected_matrix.shape))

    return count_matrix, expected_matrix