-
Notifications
You must be signed in to change notification settings - Fork 0
/
chunktwentytwo.py
108 lines (81 loc) · 3.9 KB
/
chunktwentytwo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#Short chunking script V0.22 (added directory functionality)
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
import sys
import os
import glob
# Take in user input for algorithm parameters.
if len(sys.argv) != 4:
    # The second argument is the output *file* path (the SeqIO.write target),
    # so the usage string names it accordingly.
    print("Incorrect script call, ensure call is formatted as: python3 chunketc.py <input_directory> <output_file> <chunksize>")
    sys.exit(1)
# Read in the input directory, the output multifasta path and the
# desired size of chunks in nucleotides.
input_directory = sys.argv[1]
output_file = sys.argv[2]
try:
    chunk_size = int(sys.argv[3])
except ValueError:
    # Fail with a clear message instead of an unhandled traceback.
    print("chunksize must be an integer, got:", sys.argv[3])
    sys.exit(1)
# Use SeqIO.parse instead of SeqIO.read for multifasta files. IMPORTANT
# records = list(SeqIO.parse(input_file, "fasta")) <<< moved into group_batch
def create_batch(records, chunk_size):
    """Yield lists of record slices, each list totalling chunk_size bases.

    Successive records are sliced so that every yielded batch contains
    exactly chunk_size bases, except possibly the final batch, which
    holds whatever remains.  Each slice is renamed
    "<description>:<start>-<end>" (0-based, end inclusive).

    records    -- iterable of SeqRecord-like objects (sliceable, len()-able)
    chunk_size -- target number of bases per yielded batch

    Fixes over the original:
    * next() is called with a None default so an empty `records` input ends
      the generator cleanly instead of leaking StopIteration (a RuntimeError
      under PEP 479).
    * Zero-length records are skipped; previously one falsy empty record
      silently terminated the generator and dropped everything after it.
    """
    def _next_record(it):
        # Fetch the next record, skipping zero-length ones (they contribute
        # no bases and would otherwise end the loops prematurely).
        rec = next(it, None)
        while rec is not None and len(rec) == 0:
            rec = next(it, None)
        return rec

    record_it = iter(records)
    record = _next_record(record_it)
    current_base = 0    # next unconsumed position within `record`
    batch = []          # slices accumulated for the batch in progress
    batch_size = 0      # total bases currently held in `batch`
    while record is not None:
        # Fill the batch from the current record until it holds exactly
        # chunk_size bases or the record supply runs out.
        while batch_size != chunk_size and record is not None:
            end = current_base + chunk_size - batch_size
            seq = record[current_base:end]
            end_of_slice = current_base + len(seq) - 1
            # Use record.id for pure name, record.description for contig details.
            fasta_header = record.description + ":{}-{}".format(current_base, end_of_slice)
            seq.id = seq.name = fasta_header
            seq.description = ''
            batch.append(seq)
            current_base += len(seq)
            batch_size += len(seq)
            # Current record is exhausted; fetch a new one.
            if current_base >= len(record):
                record = _next_record(record_it)
                current_base = 0
        # Emit the filled (or final, partial) batch and start a new one.
        yield batch
        batch = []
        batch_size = 0
def group_batch(input_directory, output_file, chunk_size):
    """Chunk every FASTA file in input_directory into one multifasta output.

    Scans input_directory (non-recursively) for *.fna then *.fasta files,
    slices each file's records into chunk_size-base pieces via create_batch,
    collects every piece, and writes them all to a single multifasta at
    output_file, printing an in-place progress line as files complete.

    input_directory -- directory containing the input .fna/.fasta files
    output_file     -- path of the combined multifasta to write
    chunk_size      -- number of bases per chunk, forwarded to create_batch
    """
    # Ensure the output file's parent directory exists.
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    # Build the file list once so the progress total and the processing loop
    # cannot disagree; *.fna before *.fasta preserves the original ordering.
    # (Previously the two extensions were handled by copy-pasted loops.)
    fasta_files = [path
                   for pattern in ('*.fna', '*.fasta')
                   for path in glob.glob(os.path.join(input_directory, pattern))]
    total_files = len(fasta_files)
    all_chunks = []  # every chunk from every file, in processing order
    for processed_files, ind_file in enumerate(fasta_files, start=1):
        # SeqIO.parse (not read) handles multi-record FASTA files.
        records = list(SeqIO.parse(ind_file, "fasta"))
        for batch in create_batch(records, chunk_size):
            all_chunks.extend(batch)
        print(f"\rProcessed {processed_files}/{total_files} files", end='', flush=True)
    # Write all chunks to a singular multifasta file.
    SeqIO.write(all_chunks, output_file, "fasta")
    print("\nSuccessfully created multifasta file:", output_file)
# Entry point: run the chunking pipeline with the CLI-supplied parameters.
# (The deprecated per-chunk file-export code that used to live below has
# been removed; see version control history if it is ever needed again.)
group_batch(input_directory, output_file, chunk_size)