From 3cf7ccbfe991843ebe56fb9515bf6d224401e8a1 Mon Sep 17 00:00:00 2001 From: RPINerd Date: Sat, 19 Oct 2024 11:20:46 -0400 Subject: [PATCH 1/8] Direct python3 translation of script, with updates to more gracefully handle arguments --- fasta/interleave_fasta.py | 65 +++++++++++++++++++++++---------------- 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/fasta/interleave_fasta.py b/fasta/interleave_fasta.py index fae9605d..084925a9 100755 --- a/fasta/interleave_fasta.py +++ b/fasta/interleave_fasta.py @@ -1,34 +1,45 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 -# encoding:utf8 -# author: Sébastien Boisvert -# part of Ray distribution """This script takes two fasta files and interleaves them Usage: interleave-fasta.py fasta_file1 fasta_file2 + +Author: Sébastien Boisvert +Py3 conversion: RPINerd """ -# Importing modules -import sys - -# Main -if __name__ == '__main__': - try: - file1 = sys.argv[1] - file2 = sys.argv[2] - except: - print __doc__ - sys.exit(1) - - with open(file1) as f1: - with open(file2) as f2: - while True: - line = f1.readline() - if line.strip() == "": - break - - print line.strip() - print f1.readline().strip() - print f2.readline().strip() - print f2.readline().strip() +import argparse +import os + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Interleave two fasta files") + parser.add_argument("-1", "--fasta1", type=str, help="First fasta file") + parser.add_argument("-2", "--fasta2", type=str, help="Second fasta file") + return parser.parse_args() + + +def main(args: argparse.Namespace) -> None: + + # Verify the files exist + if not os.path.exists(args.fasta1): + raise FileNotFoundError(f"Error: file {args.fasta1} does not exist\n") + if not os.path.exists(args.fasta2): + raise FileNotFoundError(f"Error: file {args.fasta2} does not exist\n") + + with open(args.fasta1, "r") as f1, open(args.fasta2, "r") as f2: + while True: + line = f1.readline() + if line.strip() == "": + break + + 
print(line.strip()) + print(f1.readline().strip()) + print(f2.readline().strip()) + print(f2.readline().strip()) + + +if __name__ == "__main__": + args = parse_args() + main(args) From 4464174e58f60b316d8a8b0195cd6b42c7396488 Mon Sep 17 00:00:00 2001 From: RPINerd Date: Sat, 19 Oct 2024 11:28:25 -0400 Subject: [PATCH 2/8] Update script for fully py3, refactor, clean, organize Updated some error handling, fixed spacing and syntax for Black/Flake8 best practices, removed some py2 legacy compatibility --- fastq/validate_mate_pairing.py | 203 +++++++++++++++++---------------- 1 file changed, 107 insertions(+), 96 deletions(-) diff --git a/fastq/validate_mate_pairing.py b/fastq/validate_mate_pairing.py index 6999cb10..471ece98 100755 --- a/fastq/validate_mate_pairing.py +++ b/fastq/validate_mate_pairing.py @@ -1,5 +1,4 @@ -#!/usr/bin/env python -from __future__ import division +#!/usr/bin/env python3 """ A memory-efficient script for validating mates in paired-end FASTQ files. @@ -15,7 +14,7 @@ @61JCNAAXX100503:5:100:10001:18267/1 @61JCNAAXX100503:5:100:10001:18267/2 - + Assumptions: Input reads can be in raw FASTQ format or compressed with a .gz extension. 
If @@ -28,108 +27,120 @@ import argparse import gzip -import os -import re - -def main(): - parser = argparse.ArgumentParser( description='Mate validation in paired-end FASTQ files') - - ## output file to be written - parser.add_argument('-l', '--left', type=str, required=True, help='Comma-separated list of left reads in an paired set' ) - parser.add_argument('-r', '--right', type=str, required=True, help='Comma-separated list of right reads in an paired set' ) - args = parser.parse_args() - - check_input_options(args) - left_files = args.left.split(',') - right_files = args.right.split(',') - - lindex = 0 - - for lpath in left_files: - if lpath.endswith('.gz'): - ifh = gzip.open(lpath, 'rb') - left_is_compressed = True - - if args.right is not None: - ifh_right = gzip.open(right_files[lindex], 'rb') - else: - ifh = open(lpath, 'rU') - left_is_compressed = False - if args.right is not None: - ifh_right = open(right_files[lindex], 'rU') - - while True: - line1 = ifh.readline().replace("\n", '') - print("line1: ({0})".format(line1)) - rline1 = ifh_right.readline() - - # Check if either file ended early - if not line1: - if not rline1: - # expected, matching end of both files - break - else: - raise Exception("ERROR: Left file ended before right file") - - if not rline1: - raise Exception("ERROR: Right file ended before left file") - - # We don't actually need these for now. Could just read to advance the cursor - line2 = ifh.readline() - line3 = ifh.readline() - line4 = ifh.readline() - rline2 = ifh_right.readline() - rline3 = ifh_right.readline() - rline4 = ifh_right.readline() - - print(line1) - - if len(line1): - print("line1 is ({0})".format(line1)) - lread_base = line1.split()[0] - else: - lread_base = False - - if len(rline1): - rread_base = rline1.split()[0] - else: - rread_base = False - - ## did we reach the natural ending? 
- if not lread_base and not rread_base: - break - - ## Let's first check the easy case that they match: - if lread_base == rread_base: - pass - else: - if lread_base[-2:] == '/1' and rread_base[-2:] == '/2': - # compare without the mate info then - if lread_base[:-2] != rread_base[:-2]: - raise Exception("Mismatch found: left:{0} right:{1}".format(lread_base, rread_base)) + + +def parse_args() -> argparse.Namespace: + """ + Simple argument parser for this script. + + :return: argparse.Namespace object containing the input options + """ + + parser = argparse.ArgumentParser(description="Mate validation in paired-end FASTQ files") + parser.add_argument( + "-l", "--left", type=str, required=True, help="Comma-separated list of left reads in a paired set" + ) + parser.add_argument( + "-r", "--right", type=str, required=True, help="Comma-separated list of right reads in a paired set" + ) + + return parser.parse_args() + + +def compare_reads(ifh, ifh_right) -> None: + """ + Parse through two read files and compare the read headers. + + :param ifh: file handle for read 1 + :param ifh_right: file handle for read 2 + + :return: None + """ + + # Step through each file and compare the read headers + try: + + line_type = 1 + first_line = True + read_style = None + + for r1, r2 in zip(ifh, ifh_right, strict=True): + + # For the first entry, determine the read pair style (i.e. /1 or 1:N:0:TTAGGCA) + if first_line: + r1 = r1.decode().strip() + r2 = r2.decode().strip() + if r1[-2] == "/" and r2[-2] == "/": + read_style = 1 + elif r1.split(" ")[1].startswith("1") and r2.split(" ")[1].startswith("2"): + read_style = 2 else: - raise Exception("Mismatch found: left:{0} right:{1}".format(lread_base, rread_base)) + raise Exception(f"ERROR: Read style not recognized! 
({r1}, {r2})") + first_line = False - ## Now those with file names with ids ending like /1 and /2 to denote mate pairs - print("Comparing {0} with {1}".format(lread_base, rread_base)) + # We are only interested in the ID lines + if line_type == 1: + r1 = r1.decode().strip() + r2 = r2.decode().strip() - lindex += 1 + # Compare the read headers + if read_style == 1: + if r1[:-2] != r2[:-2]: + raise Exception(f"ERROR: Headers do not match! ({r1}, {r2})") + elif read_style == 2: + if r1.split(" ")[0] != r2.split(" ")[0]: + raise Exception(f"ERROR: Headers do not match! ({r1}, {r2})") + # Reset the line type if we reach the end of the read + if line_type == 4: + line_type = 1 -def check_input_options(args): - left_files = args.left.split(',') - right_files = args.right.split(',') - - if len(left_files) != len(right_files): - raise Exception("ERROR: Count of left files ({0}) must be the same as right files ({1})".format( - len(left_files), len(right_files))) - -if __name__ == '__main__': - main() + line_type += 1 + + except ValueError: + # If the files are not the same length, we will get a ValueError + raise Exception("ERROR: Files are not the same length!") +def main(args: argparse.Namespace) -> None: + """ + Main function for the script. 
+ :param args: argparse.Namespace object containing the input options + :return: None + """ + left_files = args.left.split(",") + right_files = args.right.split(",") + # Validate that there are equal numbers of R1 and R2 files + if len(left_files) != len(right_files): + raise Exception( + f"ERROR: Count of left files ({len(left_files)}) must be the same as right files ({len(right_files)})" + ) + + # Build a list of tuples with the read pairs and their gzip status + pairs = [] + for lfile, rfile in zip(left_files, right_files): + if lfile.endswith(".gz") and rfile.endswith(".gz"): + gz_status = True + elif not lfile.endswith(".gz") and not rfile.endswith(".gz"): + gz_status = False + else: + raise Exception(f"ERROR: gzip status must be the same for both files! ({lfile}, {rfile})") + + pairs.append((lfile, rfile, gz_status)) + + # Iterate over the files and compare the reads + for lfile, rfile, gz_status in pairs: + if gz_status: + with gzip.open(lfile, "rb") as ifh, gzip.open(rfile, "rb") as ifh_right: + compare_reads(ifh, ifh_right) + else: + with open(lfile, "r") as ifh, open(rfile, "r") as ifh_right: + compare_reads(ifh, ifh_right) +if __name__ == "__main__": + args = parse_args() + main(args) From 368e87b2fb98eea521392b7cada9c54a32423895 Mon Sep 17 00:00:00 2001 From: RPINerd Date: Sat, 19 Oct 2024 11:31:12 -0400 Subject: [PATCH 3/8] Update formatting, reorg code Black/Flake8 best practices, broke out arg parsing and header formatting to make code more modular and readable --- fastq/append_to_fastq_read_header.py | 112 ++++++++++++++++----------- 1 file changed, 67 insertions(+), 45 deletions(-) diff --git a/fastq/append_to_fastq_read_header.py b/fastq/append_to_fastq_read_header.py index 62d9e7a1..1c9d81d1 100755 --- a/fastq/append_to_fastq_read_header.py +++ b/fastq/append_to_fastq_read_header.py @@ -3,15 +3,15 @@ """ Running with no options other than -i and -o, this script will transform headers like these: - @SN7001163:78:C0YG5ACXX:6:1101:1129:2043 
1:N:0:CCTAGGT + @SN7001163:78:C0YG5ACXX:6:1101:1129:2043 1:N:0:CCTAGGT to headers like this: - @SN7001163:78:C0YG5ACXX:6:1101:1129:2043/1 + @SN7001163:78:C0YG5ACXX:6:1101:1129:2043/1 That is, it will see the '1' or '2' after the whitespace and transform the header to this instead (in FASTA): - >SN7001163:78:C0YG5ACXX:6:1101:1129:2043/1 + >SN7001163:78:C0YG5ACXX:6:1101:1129:2043/1 Otherwise, if you want a specific string to be appended to all headers, use the --append option. @@ -24,70 +24,92 @@ import re import sys -def main(): - parser = argparse.ArgumentParser( description='Append strings to the end of FASTQ read headers') + +def parse_args() -> argparse.Namespace: + """ + Simple argument parser for this script. + + :return: argparse.Namespace object containing the input options + """ + + parser = argparse.ArgumentParser(description="Append strings to the end of FASTQ read headers") ## output file to be written - parser.add_argument('-i', '--input_file', type=str, required=True, help='Path to an input file to be read (gzip supported)' ) - parser.add_argument('-o', '--output_file', type=str, required=False, help='Path to an output file to be created' ) - parser.add_argument('-a', '--append', type=str, required=False, help='String to be appended to each read header' ) - args = parser.parse_args() + parser.add_argument( + "-i", "--input_file", type=str, required=True, help="Path to an input file to be read (gzip supported)" + ) + parser.add_argument("-o", "--output_file", type=str, required=False, help="Path to an output file to be created") + parser.add_argument("-a", "--append", type=str, required=False, help="String to be appended to each read header") + + return parser.parse_args() + + +def append_header(line: str, append: str) -> str: + """ + Append a custom string to the end of a FASTQ header line. 
+ + :param line: FASTQ header line + :param append: string to append + + :return: FASTQ header line with appended string + """ + + header = re.search("^(\@\S+)", line) + if header: + return f"{header.group(1)}{append}\n" + else: + raise Exception(f"ERROR: FASTQ header line found that didn't match expected format: {line}") + + +def reformat_header(line: str) -> str: + """ + Reformat a FASTQ header line to the standard format. + + :param line: FASTQ header line + + :return: reformatted FASTQ header line + """ + + header = re.search("^(\@.+?) (\d)", line) + if header: + return f"{header.group(1)}/{header.group(2)}\n" + else: + raise Exception(f"ERROR: FASTQ header line found that didn't match expected format: {line}") + + +def main(args: argparse.Namespace) -> None: if args.output_file is None: ofh = sys.stdout else: - ofh = open( args.output_file, 'wt' ) + ofh = open(args.output_file, "wt") - if args.append is None: - detect_direction = True - else: - detect_direction = False - line_count = 0 - if args.input_file.endswith('.gz'): - fh = gzip.open( args.input_file, 'rb') + if args.input_file.endswith(".gz"): + fh = gzip.open(args.input_file, "rb") is_compressed = True else: - fh = open(args.input_file, 'rU') + fh = open(args.input_file, "rU") is_compressed = False for line in fh: if is_compressed: line = line.decode() - + line_count += 1 if line_count % 4 == 1: - - if detect_direction: - m = re.search('^(\@.+?) 
(\d)', line) - if m: - ofh.write("{0}/{1}\n".format(m.group(1), m.group(2))) - else: - raise Exception("ERROR: FASTQ header line found that didn't match expected format: {0}".format(line)) - else: - m = re.search('^(\@\S+)', line) - if m: - ofh.write("{0}{1}\n".format(m.group(1), args.append)) - else: - raise Exception("ERROR: FASTQ header line found that didn't match expected format: {0}".format(line)) - - + header = append_header(line, args.append) if args.append else reformat_header(line) + ofh.write(header) else: ofh.write(line) - - ofh.close() - - - - -if __name__ == '__main__': - main() - - - + fh.close() + ofh.close() +if __name__ == "__main__": + args = parse_args() + main(args) From 5f8d54c9696214cc26c06f2c3b0259ed7a2670a0 Mon Sep 17 00:00:00 2001 From: RPINerd Date: Sat, 19 Oct 2024 11:34:23 -0400 Subject: [PATCH 4/8] Upgrade, refactor, sytax Broke out argument parsing for ease of reading, Black/flake8 best practices, some small changes to sytax like using with() for keeping file open --- fastq/convert_fastq_to_fasta.py | 113 ++++++++++++++++++-------------- 1 file changed, 64 insertions(+), 49 deletions(-) diff --git a/fastq/convert_fastq_to_fasta.py b/fastq/convert_fastq_to_fasta.py index 8b2ce884..e853f901 100755 --- a/fastq/convert_fastq_to_fasta.py +++ b/fastq/convert_fastq_to_fasta.py @@ -12,7 +12,7 @@ - 2 minutes, 8 seconds when run with the --width=60 option - 3 minutes, 17 seconds when run with both --detect_direction and --width=60 options -** Timing was run on a laptop with an Intel(R) Core(TM) i7-3740QM CPU @ 2.70GHz, +** Timing was run on a laptop with an Intel(R) Core(TM) i7-3740QM CPU @ 2.70GHz, utilizing only one core. 
OUTPUT @@ -20,22 +20,22 @@ Running with no options other than -i and -o, this script will transform headers like these: - @SN7001163:78:C0YG5ACXX:6:1101:1129:2043 1:N:0:CCTAGGT - @61JCNAAXX100503:5:100:10001:18267/2 + @SN7001163:78:C0YG5ACXX:6:1101:1129:2043 1:N:0:CCTAGGT + @61JCNAAXX100503:5:100:10001:18267/2 -to this in the FASTA output: - - >SN7001163:78:C0YG5ACXX:6:1101:1129:2043 1:N:0:CCTAGGT - >61JCNAAXX100503:5:100:10001:18267/2 +to this in the FASTA output: + + >SN7001163:78:C0YG5ACXX:6:1101:1129:2043 1:N:0:CCTAGGT + >61JCNAAXX100503:5:100:10001:18267/2 If you have have headers like this: - @SN7001163:78:C0YG5ACXX:6:1101:1129:2043 1:N:0:CCTAGGT + @SN7001163:78:C0YG5ACXX:6:1101:1129:2043 1:N:0:CCTAGGT The optional --detect_direction option will see the '1' or '2' after the whitespace and transform the header to this instead (in FASTA): - >SN7001163:78:C0YG5ACXX:6:1101:1129:2043/1 + >SN7001163:78:C0YG5ACXX:6:1101:1129:2043/1 Note that this can increase the runtime up to 3x (from my testing), since a regex is used. @@ -51,67 +51,82 @@ """ import argparse +import os import re import sys -def main(): - parser = argparse.ArgumentParser( description='Convert FASTQ to FASTA format') + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Convert FASTQ to FASTA format") ## output file to be written - parser.add_argument('-i', '--input_file', type=str, required=True, help='Path to an input file to be read' ) - parser.add_argument('-o', '--output_file', type=str, required=False, help='Path to an output file to be created' ) - parser.add_argument('-d', '--detect_direction', action='store_true', help='Pass this flag to auto-detect the mate pair direction. 
See full documentation for more info' ) - parser.add_argument('-w', '--width', type=int, required=False, help='Defines the width (or number of bases) on each FASTA sequence line' ) - args = parser.parse_args() + parser.add_argument("-i", "--input_file", type=str, required=True, help="Path to an input file to be read") + parser.add_argument("-o", "--output_file", type=str, required=False, help="Path to an output file to be created") + parser.add_argument( + "-d", + "--detect_direction", + action="store_true", + help="Pass this flag to auto-detect the mate pair direction. See full documentation for more info", + ) + parser.add_argument( + "-w", + "--width", + type=int, + required=False, + help="Defines the width (or number of bases) on each FASTA sequence line", + ) + + return parser.parse_args() + + +def main(args: argparse.Namespace) -> None: if args.output_file is None: ofh = sys.stdout else: - ofh = open( args.output_file, 'wt' ) - + ofh = open(args.output_file, "w") + line_count = 0 record_count = 0 last_header = None - for line in open(args.input_file, 'rU'): - line_count += 1 + with open(args.input_file, "r") as fh: + for line in fh: + line_count += 1 - if line_count % 4 == 1: - record_count += 1 + if line_count % 4 == 1: + record_count += 1 - if args.detect_direction: - m = re.search('^\@(.+?) (\d)', line) - if m: - last_header = "{0}/{1}\n".format(m.group(1), m.group(2)) + if args.detect_direction: + m = re.search(r"^@(.+?) 
(\d)", line) + if m: + last_header = f"{m.group(1)}/{m.group(2)}\n" + else: + raise Exception(f"ERROR: FASTQ header line found that didn't match expected format: {line}") else: - raise Exception("ERROR: FASTQ header line found that didn't match expected format: {0}".format(line)) - else: - line = line.lstrip('@') - last_header = line - - elif line_count % 4 == 2: - if args.width: - ofh.write(">{0}{1}\n".format(last_header, wrapped(line, args.width))) - else: - ofh.write(">{0}{1}".format(last_header, line)) - - ofh.close() - - print("{0} records written to the output FASTA file".format(record_count)) + line = line.lstrip("@") + last_header = line + elif line_count % 4 == 2: + if args.width: + ofh.write(f">{last_header}{wrapped(line, args.width)}\n") + else: + ofh.write(f">{last_header}{line}") -def wrapped(string, every=60): - string = string.rstrip() - ''' this runs orders of magnitude faster than using the textwrap module ''' - return '\n'.join(string[i:i+every] for i in range(0, len(string), every)) - - -if __name__ == '__main__': - main() - + print(f"{record_count} records written to the output FASTA file") +def wrapped(string, every=60): + string = string.rstrip() + """ this runs orders of magnitude faster than using the textwrap module """ + return "\n".join(string[i : i + every] for i in range(0, len(string), every)) +if __name__ == "__main__": + args = parse_args() + # Verify input file exists + if not os.path.exists(args.input_file): + raise FileNotFoundError(f"ERROR: Input file '{args.input_file}' not found\n") + main(args) From f354ade2a31b622a87e82f5a6d0e32f43e3dc6b3 Mon Sep 17 00:00:00 2001 From: RPINerd Date: Sat, 19 Oct 2024 11:37:00 -0400 Subject: [PATCH 5/8] Large reorg, update sytax, cleanup py2 legacy Removed some py2 compatibility imports, Black/flake8 best practices, large reorganziation to break up the monolithic main function into more servicable sub functions --- fastq/randomly_subsample_fastq.py | 199 +++++++++++++++++------------- 1 file 
changed, 113 insertions(+), 86 deletions(-) diff --git a/fastq/randomly_subsample_fastq.py b/fastq/randomly_subsample_fastq.py index 19693c19..9e9d99df 100755 --- a/fastq/randomly_subsample_fastq.py +++ b/fastq/randomly_subsample_fastq.py @@ -1,7 +1,5 @@ #!/usr/bin/env python3 -from __future__ import division - """ A memory-efficient script for subsampling single or paired-end FASTQ files randomly (rather than taking the first N, or some other non-random approach) @@ -22,10 +20,10 @@ million reads out of a fileset of 2.5 billion, and this took too much memory. Referenced C version: - http://homes.cs.washington.edu/~dcjones/fastq-tools/fastq-sample.html + http://homes.cs.washington.edu/~dcjones/fastq-tools/fastq-sample.html Python implementation which has the same issue: - http://pythonforbiologists.com/index.php/randomly-sampling-reads-from-a-fastq-file/ + http://pythonforbiologists.com/index.php/randomly-sampling-reads-from-a-fastq-file/ Overview of steps: @@ -35,7 +33,7 @@ So, in practice, if you know the read count beforehand, the complexity will most often be around O(n) - + Probability multiplier: Why is this needed? If you have a set of 100 reads and ask for 10 to be selected, @@ -57,12 +55,12 @@ file, this will just append .fastq to the value passed. If processing paired-end files, the following will be created: - {output_base}.R1.fastq - {output_base}.R2.fastq + {output_base}.R1.fastq + {output_base}.R2.fastq This is true regardless of how many input (comma-delimited) files are passed, only one output file will be written per direction. - + Assumptions: Input reads can be in raw FASTQ format or compressed with a .gz extension. 
If @@ -75,45 +73,108 @@ import argparse import gzip -import os import random -def main(): - parser = argparse.ArgumentParser( description='Randomized subsampling of single or paired-end FASTQ files') +def arg_parser() -> argparse.Namespace: + + parser = argparse.ArgumentParser(description="Randomized subsampling of single or paired-end FASTQ files") + + parser.add_argument( + "-l", "--left", type=str, required=False, help="Comma-separated list of left reads in an paired set" + ) + parser.add_argument( + "-r", "--right", type=str, required=False, help="Comma-separated list of right reads in an paired set" + ) + parser.add_argument( + "-s", "--single", type=str, required=False, help="Use this if you have only one file of unpaired reads" + ) + parser.add_argument( + "-irc", + "--input_read_count", + type=int, + required=False, + help="If passed, skips a step of reading over the file(s) once just to get the read count. " + + "Should be just the count of reads in either the --single or --left file(s)", + ) + parser.add_argument( + "-orc", + "--output_read_count", + type=int, + required=True, + help="Subsample size - number of reads (if single) or read pairs (if passing -l and -r) to export", + ) + parser.add_argument("-ob", "--output_base", type=str, required=True, help="Base name of output files to be created") + parser.add_argument( + "-pm", + "--probability_multiplier", + type=float, + required=False, + default=1.05, + help="Increase the likelihood of a given read being selected, to prevent insufficient reads from being pulled", + ) + + return parser.parse_args() + + +def get_read_count(files: list[str]) -> int: + line_count = 0 + + for path in files: + if path.endswith(".gz"): + fh = gzip.open(path, "rb") + else: + fh = open(path, "rU") + + for _ in fh: + line_count += 1 + + return int(line_count / 4) + + +def check_input_options(args: argparse.Namespace) -> None: + """Ensure that the user passed either singleton option, or right AND left options, but not both""" - 
## output file to be written - parser.add_argument('-l', '--left', type=str, required=False, help='Comma-separated list of left reads in an paired set' ) - parser.add_argument('-r', '--right', type=str, required=False, help='Comma-separated list of right reads in an paired set' ) - parser.add_argument('-s', '--single', type=str, required=False, help='Use this if you have only one file of unpaired reads' ) - parser.add_argument('-irc', '--input_read_count', type=int, required=False, - help='If passed, skips a step of reading over the file(s) once just to get the read count. ' + - 'Should be just the count of reads in either the --single or --left file(s)') - parser.add_argument('-orc', '--output_read_count', type=int, required=True, help='Subsample size - number of reads (if single) or read pairs (if passing -l and -r) to export') - parser.add_argument('-ob', '--output_base', type=str, required=True, help='Base name of output files to be created') - parser.add_argument('-pm', '--probability_multiplier', type=float, required=False, default=1.05, help='Increase the likelihood of a given read being selected, to prevent insufficient reads from being pulled' ) - args = parser.parse_args() + if args.single is not None: + if args.left is not None or args.right is not None: + raise Exception("ERROR: Cannot use --left or --right options if --single is passed") + elif args.left is not None: + if args.right is None: + raise Exception("ERROR: Must pass --right option if --left option is used") + + left_files = args.left.split(",") + right_files = args.right.split(",") + + if len(left_files) != len(right_files): + raise Exception( + f"ERROR: Count of left files ({len(left_files)}) must be the same as right files ({len(right_files)})" + ) + else: + raise Exception("ERROR: Must pass either --single or --left AND --right to define input") + + +def main(args: argparse.Namespace) -> None: ###################### ## Validate, then gather files and get read counts ###################### 
- + check_input_options(args) input_read_count = args.input_read_count - left_files = list() - right_files = list() + left_files: list[str] = list() + right_files: list[str] = list() if args.single is not None: - left_files = args.single.split(',') + left_files = args.single.split(",") elif args.left is not None: - left_files = args.left.split(',') - right_files = args.right.split(',') + left_files = args.left.split(",") + right_files = args.right.split(",") if input_read_count is None: input_read_count = get_read_count(left_files) - print("INFO: parsed input read count is: {0}".format(input_read_count)) + print(f"INFO: parsed input read count is: {input_read_count}") else: - print("INFO: user-passed input read count is: {0}".format(input_read_count)) + print(f"INFO: user-passed input read count is: {input_read_count}") ####################### ## Do a first random selection pass @@ -123,33 +184,34 @@ def main(): # the proportion we want to select. individual_read_prob = args.output_read_count / input_read_count individual_read_prob_adj = individual_read_prob * args.probability_multiplier - - print("INFO: The probability of any individual read being selected is: {0} (raw), {1} (multiplier-adjusted)".format( - individual_read_prob, individual_read_prob_adj)) + + print( + f"INFO: The probability of any individual read being selected is: {individual_read_prob} (raw), {individual_read_prob_adj} (multiplier-adjusted)" + ) output_reads_written = 0 - + if args.single is not None: - ofh = open("{0}.fastq".format(args.output_base), 'wt') + ofh = open(f"{args.output_base}.fastq", "wt") ofh_right = None else: - ofh = open("{0}.R1.fastq".format(args.output_base), 'wt') - ofh_right = open("{0}.R2.fastq".format(args.output_base), 'wt') + ofh = open(f"{args.output_base}.R1.fastq", "wt") + ofh_right = open(f"{args.output_base}.R2.fastq", "wt") print("INFO: Performing a pass through the files for random read selection") lindex = 0 for lpath in left_files: - if lpath.endswith('.gz'): 
- ifh = gzip.open(lpath, 'rb') + if lpath.endswith(".gz"): + ifh = gzip.open(lpath, "rb") left_is_compressed = True if args.right is not None: - ifh_right = gzip.open(right_files[lindex], 'rb') + ifh_right = gzip.open(right_files[lindex], "rb") else: - ifh = open(lpath, 'rU') + ifh = open(lpath, "rU") left_is_compressed = False if args.right is not None: - ifh_right = open(right_files[lindex], 'rU') + ifh_right = open(right_files[lindex], "rU") while True: line1 = ifh.readline() @@ -200,50 +262,15 @@ def main(): break if output_reads_written < args.output_read_count: - raise Exception("ERROR: Unfortunately only {0} of the requested {1} reads were randomly selected. " + \ - "You can either try again with the same options or increase the --probability_multiplier".format( - output_reads_written, args.output_read_count)) - - -def check_input_options(args): - # user must pass either singleton option, right AND left options, but not both - if args.single is not None: - if args.left is not None or args.right is not None: - raise Exception("ERROR: Cannot use --left or --right options if --single is passed") - elif args.left is not None: - if args.right is None: - raise Exception("ERROR: Must pass --right option if --left option is used") - - left_files = args.left.split(',') - right_files = args.right.split(',') - - if len(left_files) != len(right_files): - raise Exception("ERROR: Count of left files ({0}) must be the same as right files ({1})".format( - len(left_files), len(right_files))) - else: - raise Exception("ERROR: You must pass either --single or --left AND --right to define input") - -def get_read_count(files): - line_count = 0 - - for path in files: - if path.endswith('.gz'): - fh = gzip.open(path, 'rb') - else: - fh = open(path, 'rU') - - for line in fh: - line_count += 1 - - return int(line_count / 4) - - -if __name__ == '__main__': - main() - - - - + raise Exception( + f""" + ERROR: Unfortunately only {output_reads_written} of the requested 
{args.output_read_count} + reads were randomly selected.\n You can either try again with the same options or increase + the --probability_multiplier""" + ) +if __name__ == "__main__": + args = arg_parser() + main(args) From 7ac1f6d9df6f1146befce741abffa41d6068cc2b Mon Sep 17 00:00:00 2001 From: RPINerd Date: Sat, 19 Oct 2024 11:39:31 -0400 Subject: [PATCH 6/8] Update syntax for py3, unify arg parse command, Black/flake8 best practices Also incorporated error raising rather than just print statements --- fastq/split_interleaved_sequence_file.py | 205 ++++++++++++----------- 1 file changed, 104 insertions(+), 101 deletions(-) diff --git a/fastq/split_interleaved_sequence_file.py b/fastq/split_interleaved_sequence_file.py index 268b6db0..078bd8f7 100755 --- a/fastq/split_interleaved_sequence_file.py +++ b/fastq/split_interleaved_sequence_file.py @@ -11,14 +11,14 @@ The -o option defines the base name of the output files to which the directionality and extensions will be added. So given the options: - -o ATCC_30222 - -t fastq + -o ATCC_30222 + -t fastq The following files will be created: - ATCC_30222.R1.fastq - ATCC_30222.R1.fastq - ATCC_30222.single.fastq + ATCC_30222.R1.fastq + ATCC_30222.R1.fastq + ATCC_30222.single.fastq Note: No format conversion will happen here. If your input is FASTA your output will also be FASTA. @@ -27,8 +27,8 @@ This script handles read headers in either of the two following formats: - @SN7001163:74:C0YGBACXX:1:1101:1099:2196 1:N:0:ATCACGA - @SN7001163:74:C0YGBACXX:1:1101:1099:2196/1 + @SN7001163:74:C0YGBACXX:1:1101:1099:2196 1:N:0:ATCACGA + @SN7001163:74:C0YGBACXX:1:1101:1099:2196/1 The first of these is what comes off our Illumina machines, with the direction being the first number after the whitespace. The latter format with /1 or /2 is what most software expects. 
@@ -44,158 +44,162 @@ """ import argparse -import os +import gzip import re -def interface(): - ## read input fastq file - parser = argparse.ArgumentParser( description='Script to split output of Diginorm.') - parser.add_argument('-i', '--input_file',type=str, required=True, help='The interleaved fastq/fasta file to split.') - parser.add_argument('-t', '--input_type',type=str, required=True, help='The type of input file(fasta/fastq).') - ## output file to be written - parser.add_argument('-o', '--output', type=str, required=True, help='Base path/name for the output files to be created' ) +def parse_args(): + parser = argparse.ArgumentParser(description="Script to split output of Diginorm.") + parser.add_argument( + "-i", "--input_file", type=str, required=True, help="The interleaved fastq/fasta file to split." + ) + parser.add_argument("-t", "--input_type", type=str, required=True, help="The type of input file(fasta/fastq).") + parser.add_argument( + "-o", "--output", type=str, required=True, help="Base path/name for the output files to be created" + ) parser = parser.parse_args() return parser + def process_fasta(args): - read1_file = args.output + ".R1.fasta" - read2_file = args.output + ".R2.fasta" + read1_file = args.output + ".R1.fasta" + read2_file = args.output + ".R2.fasta" single_file = args.output + ".single.fasta" - - if args.input_file.endswith('.gz'): - inp = gzip.open(args.input_file,'rb') - fout1= gzip.open(read1_file,'wb') - fout2= gzip.open(read2_file,'wb') - fout3= gzip.open(single_file,'wb') + + if args.input_file.endswith(".gz"): + inp = gzip.open(args.input_file, "rb") + fout1 = gzip.open(read1_file, "wb") + fout2 = gzip.open(read2_file, "wb") + fout3 = gzip.open(single_file, "wb") else: - inp = open(args.input_file,'rU') - fout1= open(read1_file,'w') - fout2= open(read2_file,'w') - fout3= open(single_file,'w') + inp = open(args.input_file, "rU") + fout1 = open(read1_file, "w") + fout2 = open(read2_file, "w") + fout3 = open(single_file, "w") 
flag = 0 - while 1 : + while 1: + if flag == 0: + Read1 = "" + Read2 = "" + c = 2 + d = 2 if flag == 0: - Read1='' - Read2='' - c=2 - d=2 - if flag == 0 : - while (c > 0): - inp_line = inp.readline() - Read1 += inp_line - c = c-1 - if Read1 == '': break - id1 = Read1.split('\n')[0].split('/')[0] - if not id1.startswith('>'): - print ("Error!! Unknown header: not '>'") + while c > 0: + inp_line = inp.readline() + Read1 += inp_line + c = c - 1 + if Read1 == "": break - tag1 = Read1.split('\n')[0].split('/')[1] - while (d > 0): - inp_line = inp.readline() + id1 = Read1.split("\n")[0].split("/")[0] + if not id1.startswith(">"): + raise Exception(f"ERROR: Unknown header in id {id1}") + + tag1 = Read1.split("\n")[0].split("/")[1] + while d > 0: + inp_line = inp.readline() Read2 += inp_line - d = d-1 - if not Read1 == '' and Read2 == '': + d = d - 1 + if not Read1 == "" and Read2 == "": fout3.write(Read1) break - - id2 = Read2.split('\n')[0].split('/')[0] - if not id1.startswith('>'): - print ("Error!! 
Unknown header: not '>'") - break - - tag2 = Read2.split('\n')[0].split('/')[1] - if tag1 == '1' and tag2 == '2' : + + id2 = Read2.split("\n")[0].split("/")[0] + if not id1.startswith(">"): + raise Exception(f"ERROR: Unknown header in id {id1}") + + tag2 = Read2.split("\n")[0].split("/")[1] + if tag1 == "1" and tag2 == "2": if id1 == id2: fout1.write(Read1) fout2.write(Read2) - flag=0 + flag = 0 else: fout3.write(Read1) fout3.write(Read2) - flag=0 - - elif tag1 == '1' and tag2 == '1': + flag = 0 + + elif tag1 == "1" and tag2 == "1": fout3.write(Read1) - flag=1 - Read1=Read2 - tag1=tag2 - id1=id2 - - elif tag1 == '2' and tag2 == '2': + flag = 1 + Read1 = Read2 + tag1 = tag2 + id1 = id2 + + elif tag1 == "2" and tag2 == "2": fout3.write(Read1) fout3.write(Read2) - flag=0 - - elif tag1 == '2' and tag2 == '1': + flag = 0 + + elif tag1 == "2" and tag2 == "1": fout3.write(Read1) - Read1=Read2 - tag1=tag2 - id1=id2 - flag=1 - + Read1 = Read2 + tag1 = tag2 + id1 = id2 + flag = 1 + def process_fastq(args): - read1_file = args.output + ".R1.fastq" - read2_file = args.output + ".R2.fastq" + read1_file = args.output + ".R1.fastq" + read2_file = args.output + ".R2.fastq" single_file = args.output + ".single.fastq" input_is_compressed = False - - if args.input_file.endswith('.gz'): - inp = gzip.open(args.input_file,'rb') - read1_out_fh = gzip.open(read1_file,'wb') - read2_out_fh = gzip.open(read2_file,'wb') - singles_out_fh = gzip.open(single_file,'wb') + + if args.input_file.endswith(".gz"): + inp = gzip.open(args.input_file, "rb") + read1_out_fh = gzip.open(read1_file, "wb") + read2_out_fh = gzip.open(read2_file, "wb") + singles_out_fh = gzip.open(single_file, "wb") input_is_compressed = True else: - inp = open(args.input_file,'rU') - read1_out_fh = open(read1_file,'w') - read2_out_fh = open(read2_file,'w') - singles_out_fh = open(single_file,'w') + inp = open(args.input_file, "rU") + read1_out_fh = open(read1_file, "w") + read2_out_fh = open(read2_file, "w") + singles_out_fh = 
open(single_file, "w") line_count = 0 last_base_name = None last_dir = None last_base_buffer = list() current_fh = None - + for line in inp: if input_is_compressed: line = line.decode() line_count += 1 - if re.match(r'^\s*$', line): continue + if re.match(r"^\s*$", line): + continue if line_count % 4 == 1: # this should be a header line - spaced_match = re.search(r'^\@(\S+) ([12])', line) - + spaced_match = re.search(r"^\@(\S+) ([12])", line) + if spaced_match: base = spaced_match.group(1) dir = spaced_match.group(2) else: - traditional_match = re.search(r'^\@(\S+)\/([12])', line) + traditional_match = re.search(r"^\@(\S+)\/([12])", line) if traditional_match: base = traditional_match.group(1) dir = traditional_match.group(2) else: - raise Exception("ERROR: Expected this to be a header line, but format wasn't recognized: {0}".format(line)) + raise Exception(f"ERROR: Expected header line, but format wasn't recognized: {line}") - ## if this is /2, was /1 found? + # If this is /2, was /1 found? 
if int(dir) == 1: - # if the last direction was 1, purge it as a singleton + # If the last direction was 1, purge it as a singleton if last_dir == 1: for buffer_line in last_base_buffer: singles_out_fh.write(buffer_line) - + last_base_name = base last_dir = 1 last_base_buffer = [line] current_fh = None - + elif int(dir) == 2: if last_base_name == base: if last_dir == 1: @@ -207,7 +211,7 @@ def process_fastq(args): read2_out_fh.write(line) current_fh = read2_out_fh else: - raise Exception("ERROR: Were there two {0}/2 reads in a row?".format(last_base_name)) + raise Exception(f"ERROR: Were there two {last_base_name}/2 reads in a row?") else: # this must be a /2 where the /1 is missing singles_out_fh.write(line) @@ -222,12 +226,11 @@ def process_fastq(args): current_fh.write(line) -if __name__ == '__main__': - args = interface() - if args.input_type == 'fastq': +if __name__ == "__main__": + args = parse_args() + if args.input_type == "fastq": process_fastq(args) - elif args.input_type == 'fasta': + elif args.input_type == "fasta": process_fasta(args) else: - print ("Error! Input file format incorrect. Expected to be 'fastq' or 'fasta'"); - + print(f"ERROR: Input file format incorrect! 
Expected 'fastq' or 'fasta', got {args.input_type}") From ffc8a22513c11be2af6a43056d9bd53d78778584 Mon Sep 17 00:00:00 2001 From: RPINerd Date: Sat, 16 Nov 2024 12:20:48 -0500 Subject: [PATCH 7/8] Complete refactor using cases and binary flags to cleanup argument handling --- fastq/filter_fastq_by_N_content.py | 378 ++++++++++++++++------------- 1 file changed, 211 insertions(+), 167 deletions(-) diff --git a/fastq/filter_fastq_by_N_content.py b/fastq/filter_fastq_by_N_content.py index a1df7d1b..17927bc2 100755 --- a/fastq/filter_fastq_by_N_content.py +++ b/fastq/filter_fastq_by_N_content.py @@ -12,205 +12,249 @@ """ import argparse -import re -import sys - -def main(): - parser = argparse.ArgumentParser( description='Filter FASTQ file by N content') - - ## output file to be written - parser.add_argument('-l', '--left', type=str, required=False, help='FASTQ: Left (R1) mate pair file' ) - parser.add_argument('-r', '--right', type=str, required=False, help='FASTQ: Right (R2) mate pair file' ) - parser.add_argument('-s', '--singletons', type=str, required=False, help='FASTQ: singleton reads file' ) - parser.add_argument('-lo', '--left_out', type=str, required=False, help='Output file of Left reads kept' ) - parser.add_argument('-ro', '--right_out', type=str, required=False, help='Output file of Right reads kept') - parser.add_argument('-so', '--singletons_out', type=str, required=False, help='Output file of singleton reads kept') - parser.add_argument('-p', '--percent_n_cutoff', type=int, required=True, help='1-100: Percentage of Ns in read >= this will cause removal') - parser.add_argument('-or', '--output_report', type=str, required=False, help='Optional report of read filtering statistics') - - args = parser.parse_args() - - l_in_fh = None - r_in_fh = None - s_in_fh = None - - l_out_fh = None - r_out_fh = None - s_out_fh = None - - # Let the option checking and file handle bonanza begin! 
-
-
-    # One of these must be passed:  [left, right], [left, right, singletons], [singletons]
-    if args.left is not None and args.right is not None:
-        # singletons is optional if L/R is passed
-        if args.singletons is not None:
-            s_in_fh = open(args.singletons)
-
-        if args.left_out is None:
-            raise Exception("ERROR: If you specify --left you must also specify --left_out")
-        else:
-            l_out_fh = open(args.left_out, 'wt')
+from typing import TextIO
+
+
+def parse_arguments() -> argparse.Namespace:
+    """Simple argument parser for the script."""
+    parser = argparse.ArgumentParser(description="Filter FASTQ file by N content")
+
+    parser.add_argument("-l", "--left", type=str, required=False, help="FASTQ: Left (R1) mate pair file")
+    parser.add_argument("-r", "--right", type=str, required=False, help="FASTQ: Right (R2) mate pair file")
+    parser.add_argument("-s", "--singletons", type=str, required=False, help="FASTQ: singleton reads file")
+    parser.add_argument("-lo", "--left_out", type=str, required=False, help="Output file of Left reads kept")
+    parser.add_argument("-ro", "--right_out", type=str, required=False, help="Output file of Right reads kept")
+    parser.add_argument("-so", "--singletons_out", type=str, required=False, help="Output file of singleton reads kept")
+    parser.add_argument(
+        "-p",
+        "--percent_n_cutoff",
+        type=int,
+        choices=range(1, 101),
+        required=True,
+        help="1-100: Percentage of Ns in read >= this will cause removal",
+    )
+    parser.add_argument(
+        "-or", "--output_report", type=str, required=False, help="Optional report of read filtering statistics"
+    )
+
+    return parser.parse_args()
+
+
+def validate_arguments(args: argparse.Namespace) -> int:
+    """
+    Validate the arguments passed to the script. For cleaner code, the input options are mapped to a binary state.
+
+    One of the following sets must be passed:
+    - [left, right]
+    - [left, right, singletons]
+    - [singletons]
+    """
+
+    left = 1 if args.left is not None else 0
+    right = 2 if args.right is not None else 0
+    singletons = 4 if args.singletons is not None else 0
+    arg_state = left + right + singletons
+
+    # Valid states are 3 (1+2), 7 (1+2+4), and 4
+    match arg_state:
+        case 0:
+            raise Exception("ERROR: You must pass at least one of --left, --right, or --singletons")
+        case 1 | 5:
+            raise Exception("ERROR: If you specify --left you must also specify --right")
+        case 2 | 6:
+            raise Exception("ERROR: If you specify --right you must also specify --left")
+        case 3:
+            assert args.left_out is not None, "ERROR: If you specify --left you must also specify --left_out"
+            assert args.right_out is not None, "ERROR: If you specify --right you must also specify --right_out"
+            assert (
+                args.singletons_out is not None
+            ), """ERROR: If you pass --left and --right you must still pass --singletons_out.
+            This is so orphaned mate pairs can still be exported."""
+            return arg_state
+        case 4:
+            assert (
+                args.singletons_out is not None
+            ), "ERROR: If you pass --singletons you must also pass --singletons_out"
+            if args.left_out is not None or args.right_out is not None:
+                print("--left and --right arguments not passed, --left_out and --right_out options will be ignored")
+            return arg_state
+        case 7:
+            assert args.left_out is not None, "ERROR: If you specify --left you must also specify --left_out"
+            assert args.right_out is not None, "ERROR: If you specify --right you must also specify --right_out"
+            assert (
+                args.singletons_out is not None
+            ), "ERROR: If you pass --singletons you must also pass --singletons_out"
+            return arg_state
+        case _:
+            raise Exception("ERROR: Invalid argument state")
+
+
+def left_right_processing(
+    counts: dict,
+    pct_cutoff: int,
+    l_in_fh: TextIO,
+    r_in_fh: TextIO,
+    l_out_fh: TextIO,
+    r_out_fh: TextIO,
+    s_out_fh: TextIO,
+    line_limit=None,
+) -> dict:
+ """Process left and right reads.""" + + l_keep = False + r_keep = False + last_lheader = None + last_rheader = None + line_count = 0 - if args.right_out is None: - raise Exception("ERROR: If you specify --right you must also specify --right_out") - else: - r_out_fh = open(args.right_out, 'wt') + for l_line, r_line in zip(l_in_fh, r_in_fh): + line_count += 1 + + if line_limit is not None and line_count > line_limit: + print("WARNING: Program exited after {0} lines for debugging".format(line_limit)) + break + + if line_count % 4 == 1: + last_lheader = l_line + last_rheader = r_line + l_keep = False + r_keep = False + + elif line_count % 4 == 2: + counts["total_reads"] += 2 + l_pct_n = (l_line.count("N") / (len(l_line) - 1)) * 100 + r_pct_n = (r_line.count("N") / (len(r_line) - 1)) * 100 + + # Left bad, right good + if l_pct_n >= pct_cutoff and r_pct_n < pct_cutoff: + l_keep = False + r_keep = s_out_fh + r_keep.write(last_rheader) + r_keep.write(r_line) + counts["left_only_discarded"] += 1 + counts["right_only_kept"] += 1 + + # Left good, right bad + elif l_pct_n < pct_cutoff and r_pct_n >= pct_cutoff: + r_keep = False + l_keep = s_out_fh + l_keep.write(last_lheader) + l_keep.write(l_line) + counts["left_only_kept"] += 1 + counts["right_only_discarded"] += 1 + + # Both good + elif l_pct_n < pct_cutoff and r_pct_n < pct_cutoff: + l_keep = l_out_fh + r_keep = r_out_fh + l_keep.write(last_lheader) + l_keep.write(l_line) + r_keep.write(last_rheader) + r_keep.write(r_line) + counts["pairs_kept"] += 1 + + # Both bad + else: + counts["pairs_discarded"] += 1 - if args.singletons_out is None: - raise Exception("ERROR: The --singletons_out option must be passed if --left and --right are specified, even " \ - "if --singletons isn't. 
This is so that orphaned mate pairs can still be exported.") + # Handle the third/fourth lines else: - s_out_fh = open(args.singletons_out, 'wt') - - elif args.singletons is not None: - s_in_fh = open(args.singletons) + if r_keep: + r_keep.write(r_line) - if args.singletons_out is None: - raise Exception("ERROR: If you pass --singletons you must also pass --singletons_out") - else: - s_out_fh = open(args.singletons_out, 'wt') - - else: - raise Exception("ERROR. One of these must be passed: [left, right], [left, right, singletons], [singletons]") - - - ############################################################## - # now the actual functioning code - ############################################################## - line_count = 0 - record_count = 0 - last_lheader = None - last_rheader = None + if l_keep: + l_keep.write(l_line) - #counts = {'left_kept':0, 'left_discarded':0, 'right_kept':0, 'right_discarded':0, 'singlets_kept':0, 'singlets_discarded':0} - counts = {'total_reads':0, 'pairs_kept':0, 'pairs_discarded':0, \ - 'left_only_kept':0, 'left_only_discarded':0, \ - 'right_only_kept':0, 'right_only_discarded':0, \ - 'singletons_kept':0, 'singletons_discarded':0 - } + return counts - # for debugging purposes on large files only - line_limit = None - - if args.left is not None and args.right is not None: - l_keep = False - r_keep = False - - # Read left and right files at once. 
- with open(args.left) as l_in_fh, open(args.right) as r_in_fh: - for l_line, r_line in zip(l_in_fh, r_in_fh): - line_count += 1 - - if line_limit is not None and line_count > line_limit: - print("WARNING: Program exited after {0} lines for debugging".format(line_limit)) - break - - if line_count % 4 == 1: - record_count += 1 - last_lheader = l_line - last_rheader = r_line - l_keep = False - r_keep = False - - elif line_count % 4 == 2: - counts['total_reads'] += 2 - l_pct_n = (l_line.count('N') / (len(l_line) - 1)) * 100 - r_pct_n = (r_line.count('N') / (len(r_line) - 1)) * 100 - - #print("LN%: {0}\tRN%{1}: ".format(l_pct_n, r_pct_n)) - - # left bad, right good - if l_pct_n >= args.percent_n_cutoff and r_pct_n < args.percent_n_cutoff: - l_keep = False - r_keep = s_out_fh - r_keep.write(last_rheader) - r_keep.write(r_line) - counts['left_only_discarded'] += 1 - counts['right_only_kept'] += 1 - - # left good, right bad - elif l_pct_n < args.percent_n_cutoff and r_pct_n >= args.percent_n_cutoff: - r_keep = False - l_keep = s_out_fh - l_keep.write(last_lheader) - l_keep.write(l_line) - counts['left_only_kept'] += 1 - counts['right_only_discarded'] += 1 - - # both good - elif l_pct_n < args.percent_n_cutoff and r_pct_n < args.percent_n_cutoff: - l_keep = l_out_fh - r_keep = r_out_fh - l_keep.write(last_lheader) - l_keep.write(l_line) - r_keep.write(last_rheader) - r_keep.write(r_line) - counts['pairs_kept'] += 1 - - # both bad - else: - counts['pairs_discarded'] += 1 - else: - # handle the third/fourth lines - if r_keep: - r_keep.write(r_line) - if l_keep: - l_keep.write(l_line) - +def singleton_processing(counts: dict, pct_cutoff: int, s_in_fh: TextIO, s_out_fh: TextIO) -> dict: + """Process singleton reads.""" if args.singletons is not None: - # s_in_fh - # s_out_fh line_count = 0 - record_count = 0 last_header = None keep = False - + for line in s_in_fh: line_count += 1 if line_count % 4 == 1: - record_count += 1 last_header = line keep = False elif line_count % 4 == 
2: - counts['total_reads'] += 1 - pct_n = (line.count('N') / (len(line) - 1)) * 100 + counts["total_reads"] += 1 + pct_n = (line.count("N") / (len(line) - 1)) * 100 - if pct_n < args.percent_n_cutoff: + if pct_n < pct_cutoff: keep = True s_out_fh.write(last_header) s_out_fh.write(line) - counts['singletons_kept'] += 1 + counts["singletons_kept"] += 1 else: - counts['singletons_discarded'] += 1 + counts["singletons_discarded"] += 1 else: if keep: s_out_fh.write(line) - - - if args.output_report is not None: - report_fh = open(args.output_report, 'wt') - - report_fh.write("Total input reads: {0}\n".format(counts['total_reads'])) - report_fh.write("Read pairs kept: {0}\n".format(counts['pairs_kept'])) - report_fh.write("Read pairs discarded: {0}\n".format(counts['pairs_discarded'])) - report_fh.write("Left only reads kept: {0}\n".format(counts['left_only_kept'])) - report_fh.write("Left only reads discarded: {0}\n".format(counts['left_only_discarded'])) - report_fh.write("Right only reads kept: {0}\n".format(counts['right_only_kept'])) - report_fh.write("Right only reads discarded: {0}\n".format(counts['right_only_discarded'])) - report_fh.write("Singleton reads kept: {0}\n".format(counts['singletons_kept'])) - report_fh.write("Singleton reads discarded: {0}\n".format(counts['singletons_discarded'])) - - -if __name__ == '__main__': - main() + return counts +def main(args) -> None: + # Debugging on large files only + line_limit = None # noqa + # Determine the set of inputs and outputs + run_state = validate_arguments(args) + counts = { + "lines": 0, + "records": 0, + "total_reads": 0, + "pairs_kept": 0, + "pairs_discarded": 0, + "left_only_kept": 0, + "left_only_discarded": 0, + "right_only_kept": 0, + "right_only_discarded": 0, + "singletons_kept": 0, + "singletons_discarded": 0, + } + # Left and right provided + if run_state == 3 or run_state == 7: + with ( + open(args.left, "r") as l_in_fh, + open(args.right, "r") as r_in_fh, + open(args.left_out, "wt") as l_out_fh, 
+ open(args.right_out, "wt") as r_out_fh, + open(args.singletons_out, "wt") as s_out_fh, + ): + counts = left_right_processing( + counts, args.percent_n_cutoff, l_in_fh, r_in_fh, l_out_fh, r_out_fh, s_out_fh + ) + + # Singletons provided + if run_state == 4 or run_state == 7: + with open(args.singletons, "r") as s_in_fh, open(args.singletons_out, "wt") as s_out_fh: + counts = singleton_processing(counts, args.percent_n_cutoff, s_in_fh, s_out_fh) + + # Output the report + if args.output_report is not None: + with open(args.output_report, "wt") as report_fh: + report_fh.write("Total input reads: {0}\n".format(counts["total_reads"])) + report_fh.write("Read pairs kept: {0}\n".format(counts["pairs_kept"])) + report_fh.write("Read pairs discarded: {0}\n".format(counts["pairs_discarded"])) + report_fh.write("Left only reads kept: {0}\n".format(counts["left_only_kept"])) + report_fh.write("Left only reads discarded: {0}\n".format(counts["left_only_discarded"])) + report_fh.write("Right only reads kept: {0}\n".format(counts["right_only_kept"])) + report_fh.write("Right only reads discarded: {0}\n".format(counts["right_only_discarded"])) + report_fh.write("Singleton reads kept: {0}\n".format(counts["singletons_kept"])) + report_fh.write("Singleton reads discarded: {0}\n".format(counts["singletons_discarded"])) + + +if __name__ == "__main__": + args = parse_arguments() + main(args) From 8ee4aaae0f657e1cc6fa1dafaab3aced098ffdea Mon Sep 17 00:00:00 2001 From: RPINerd Date: Mon, 18 Nov 2024 19:21:32 -0500 Subject: [PATCH 8/8] Refactor, flake8/Black formatting and best practices, some upgrades for newer python3 conventions --- fastq/fastq_simple_stats.py | 142 +++++++++++++++++++++--------------- 1 file changed, 84 insertions(+), 58 deletions(-) diff --git a/fastq/fastq_simple_stats.py b/fastq/fastq_simple_stats.py index 84fbddb1..483e3f21 100755 --- a/fastq/fastq_simple_stats.py +++ b/fastq/fastq_simple_stats.py @@ -6,10 +6,10 @@ $ ./fastq_simple_stats.py my.fastq File: 
my.fastq - Total sequence entries: 277971801 - Total bases: 28353123702 - Avg sequence length: 102.0 - GC percentage: 44.5 + Total sequence entries: 277971801 + Total bases: 28353123702 + Avg sequence length: 102.0 + GC percentage: 44.5 If you pass multiple files via *.fastq or something similar this will report stats on these as an aggregate unless you pass the --individual flag, which will cause @@ -23,68 +23,115 @@ import argparse import codecs import gzip -import os import sys +from typing import TextIO -def main(): - parser = argparse.ArgumentParser( description='Provides simple quantitative statistics for a given FASTQ file') - ## output file to be written - parser.add_argument('input_files', metavar='N', type=str, nargs='+', help='Path to one or more input files, separated by spaces' ) - parser.add_argument('-o', '--output_file', type=str, required=False, help='Optional path to an output file to be created, else prints on STDOUT' ) - parser.add_argument('--individual', action='store_true', help='Report stats on each file individually' ) - parser.add_argument('-p', '--progress_interval', type=int, required=False, help='Pass an integer to show progress ever N entries on STDERR' ) - args = parser.parse_args() +def parse_args() -> argparse.Namespace: + """ + Simple argument parser for this script. 
- ## open the output file - fout = None + :return: argparse.Namespace object containing the input options + """ + + parser = argparse.ArgumentParser(description="Provides simple quantitative statistics for a given FASTQ file") + + parser.add_argument( + "input_files", metavar="N", type=str, nargs="+", help="Path to one or more input files, separated by spaces" + ) + parser.add_argument( + "-o", + "--output_file", + type=str, + required=False, + help="Optional path to an output file to be created, else prints on STDOUT", + ) + parser.add_argument("--individual", action="store_true", help="Report stats on each file individually") + parser.add_argument( + "-p", + "--progress_interval", + type=int, + required=False, + help="Pass an integer to show progress ever N entries on STDERR", + ) + + return parser.parse_args() + + +def report_stats( + fout: TextIO | codecs.StreamWriter, input_file: str, entry_count: int, total_bases: int, gc_count: int +) -> None: + + try: + avg_entry_length = total_bases / entry_count + except ZeroDivisionError: + avg_entry_length = 0.0 + except Exception as e: + raise Exception(f"Error calculating average entry length: {e}") + + fout.write(f"\nFile: {input_file}\n") + fout.write(f"\tTotal sequence entries: {entry_count:,}\n") + fout.write(f"\tTotal bases: {total_bases:,}\n") + fout.write(f"\tAvg sequence length: {avg_entry_length:,.1f}\n") + + try: + fout.write(f"\tGC percentage: {((gc_count / total_bases) * 100):.1f}\n") + except ZeroDivisionError: + fout.write("\tGC percentage: N/A\n") + except Exception as e: + raise Exception(f"Error calculating GC percentage: {e}") + + +def main(args: argparse.Namespace) -> None: + + # Open the output file, if specified if args.output_file is None: - fout = codecs.getwriter('utf8')(sys.stdout.buffer) + fout = codecs.getwriter("utf8")(sys.stdout.buffer) else: fout = open(args.output_file, "w") - ## values that will be reported - entry_count = 0 - total_bases = 0 - gc_count = 0 + # Values that will be 
reported + entry_count: int = 0 + total_bases: int = 0 + gc_count: int = 0 for input_file in args.input_files: - print("Processing file: {0}".format(input_file), file=sys.stderr) + print(f"Processing file: {input_file}", file=sys.stderr) line_number = 0 - if input_file.endswith('.gz'): - fh = gzip.open( input_file, 'rb') + if input_file.endswith(".gz"): + fh = gzip.open(input_file, "rb") is_compressed = True else: - fh = open(input_file, 'r') + fh = open(input_file, "r") is_compressed = False if args.individual: entry_count = 0 total_bases = 0 - gc_count = 0 - + gc_count = 0 + for line in fh: if is_compressed: line = line.decode() - + line = line.rstrip() line_number += 1 - ## every 4th line is the start of a sequence entry (check still) + # Every 4th line is the start of a sequence entry (check still) if line_number % 4 == 1: - if line[0] == '@': + if line[0] == "@": entry_count += 1 if args.progress_interval: if not entry_count % args.progress_interval: - print("{0:,} entries processed".format(entry_count), file=sys.stderr, end="\r") + print(f"{entry_count:,} entries processed", file=sys.stderr, end="\r") else: - raise Exception("Error, expected every 4th line to start with @ and this one didn't: {0}".format(line) ) + raise Exception(f"Line {line_number} expected to start with '@': {line}") elif line_number % 4 == 2: total_bases += len(line) - gc_count += line.count('G') - gc_count += line.count('C') + gc_count += line.count("G") + gc_count += line.count("C") if args.individual: report_stats(fout, input_file, entry_count, total_bases, gc_count) @@ -93,30 +140,9 @@ def main(): if len(args.input_files) == 1: report_stats(fout, args.input_files[0], entry_count, total_bases, gc_count) else: - report_stats(fout, 'multiple', entry_count, total_bases, gc_count) - -def report_stats(fout, input_file, entry_count, total_bases, gc_count): - if entry_count: - avg_entry_length = total_bases / entry_count - else: - avg_entry_length = 0 - - fout.write("\nFile: 
{0}\n".format(input_file)) - fout.write("\tTotal sequence entries: {0:,}\n".format(entry_count)) - fout.write("\tTotal bases: {0:,}\n".format(total_bases)) - fout.write("\tAvg sequence length: {0:,.1f}\n".format(avg_entry_length)) - - if total_bases: - fout.write("\tGC percentage: {0:.1f}\n".format((gc_count / total_bases) * 100)) - else: - fout.write("\tGC percentage: N/A\n") - -if __name__ == '__main__': - main() - - - - - + report_stats(fout, "multiple", entry_count, total_bases, gc_count) +if __name__ == "__main__": + args = parse_args() + main(args)