diff --git a/src/toil_lib/tools/preprocessing.py b/src/toil_lib/tools/preprocessing.py index 02f4969..36494c6 100644 --- a/src/toil_lib/tools/preprocessing.py +++ b/src/toil_lib/tools/preprocessing.py @@ -1,4 +1,5 @@ import os +import time from toil.job import PromisedRequirement from toil.lib.docker import dockerCall @@ -84,6 +85,40 @@ def run_samtools_index(job, bam): return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.bam.bai')) + +def run_samtools_view(job, bam): + """ + Runs SAMtools view to create a SAM file. + + :param JobFunctionWrappingJob job: passed automatically by Toil + :param str bam: FileStoreID of the BAM file + :return: FileStoreID for SAM file + :rtype: str + """ + work_dir = job.fileStore.getLocalTempDir() + job.fileStore.readGlobalFile(bam, os.path.join(work_dir, 'sample.bam')) + # Call: samtools view to convert the BAM to a SAM + parameters = ['view', + '-h', + '-o', '/data/sample.sam', + '/data/sample.bam'] + dockerCall(job=job, workDir=work_dir, parameters=parameters, + tool='quay.io/ucsc_cgl/samtools:0.1.19--dd5ac549b95eb3e5d166a5e310417ef13651994e') + # Write to fileStore + return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.sam')) + + +def _log_runtime(job, start, end, cmd): + + elapsed_time = end - start + + hours = int(elapsed_time) // (60 * 60) + minutes = int(elapsed_time - (60 * 60 * hours)) // 60 + seconds = int(elapsed_time - (60 * 60 * hours) - (60 * minutes)) % 60 + + job.fileStore.logToMaster("%s ran in %dh%dm%ds" % (cmd, hours, minutes, seconds)) + + def run_samtools_sort(job, bam): """ Sorts BAM file using SAMtools sort @@ -96,12 +131,121 @@ def run_samtools_sort(job, bam): work_dir = job.fileStore.getLocalTempDir() job.fileStore.readGlobalFile(bam, os.path.join(work_dir, 'input.bam')) command = ['sort', - '-@', str(job.cores), + '-@', str(int(job.cores)), '-o', '/data/output.bam', '/data/input.bam'] + + start_time = time.time() + dockerCall(job=job, workDir=work_dir, + parameters=command, + 
tool='quay.io/ucsc_cgl/samtools:1.3--256539928ea162949d8a65ca5c79a72ef557ce7c') + end_time = time.time() + _log_runtime(job, start_time, end_time, "samtools sort") + return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) + + +def run_samtools_rmdup(job, bam): + """ + Mark reads as PCR duplicates using SAMtools rmdup + + :param JobFunctionWrappingJob job: passed automatically by Toil + :param str bam: FileStoreID for BAM file + :return: FileStoreID for sorted BAM file + :rtype: str + """ + work_dir = job.fileStore.getLocalTempDir() + job.fileStore.readGlobalFile(bam, os.path.join(work_dir, 'input.bam')) + command = ['rmdup', + '/data/input.bam', + '/data/output.bam'] + + start_time = time.time() dockerCall(job=job, workDir=work_dir, parameters=command, tool='quay.io/ucsc_cgl/samtools:1.3--256539928ea162949d8a65ca5c79a72ef557ce7c') + end_time = time.time() + _log_runtime(job, start_time, end_time, "samtools rmdup") + return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) + + +def run_sambamba_sort(job, bam, sort_by_name=False): + """ + Sorts BAM file using Sambamba sort + + :param JobFunctionWrappingJob job: passed automatically by Toil + :param str bam: FileStoreID for BAM file + :param boolean sort_by_name: If true, sorts by read name instead of coordinate. 
+ :return: FileStoreID for sorted BAM file + :rtype: str + """ + work_dir = job.fileStore.getLocalTempDir() + job.fileStore.readGlobalFile(bam, os.path.join(work_dir, 'input.bam')) + command = ['/usr/local/bin/sambamba', + 'sort', + '-t', str(int(job.cores)), + '-m', str(job.memory), + '-o', '/data/output.bam', + '/data/input.bam'] + + if sort_by_name: + command.append('-n') + + start_time = time.time() + dockerCall(job=job, workDir=work_dir, + parameters=command, + tool='quay.io/biocontainers/sambamba:0.6.6--0') + end_time = time.time() + _log_runtime(job, start_time, end_time, "sambamba sort") + return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) + + +def run_sambamba_markdup(job, bam): + """ + Marks reads as PCR duplicates using Sambamba + + :param JobFunctionWrappingJob job: passed automatically by Toil + :param str bam: FileStoreID for BAM file + :return: FileStoreID for sorted BAM file + :rtype: str + """ + work_dir = job.fileStore.getLocalTempDir() + job.fileStore.readGlobalFile(bam, os.path.join(work_dir, 'input.bam')) + command = ['/usr/local/bin/sambamba', + 'markdup', + '-t', str(int(job.cores)), + '/data/input.bam', + '/data/output.bam'] + + start_time = time.time() + dockerCall(job=job, workDir=work_dir, + parameters=command, + tool='quay.io/biocontainers/sambamba:0.6.6--0') + end_time = time.time() + _log_runtime(job, start_time, end_time, "sambamba mkdup") + return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) + + +def run_samblaster(job, bam): + """ + Marks reads as PCR duplicates using SAMBLASTER + + :param JobFunctionWrappingJob job: passed automatically by Toil + :param str bam: FileStoreID for BAM file + :return: FileStoreID for sorted BAM file + :rtype: str + """ + work_dir = job.fileStore.getLocalTempDir() + job.fileStore.readGlobalFile(bam, os.path.join(work_dir, 'input.bam')) + command = ['/usr/local/bin/samblaster', + '-s', '/data/input.bam', + '-o', '/data/output.bam'] + + start_time = 
time.time() + dockerCall(job=job, workDir=work_dir, + parameters=command, + tool='quay.io/biocontainers/samblaster:0.1.24--0') + end_time = time.time() + _log_runtime(job, start_time, end_time, "SAMBLASTER") return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) @@ -152,18 +296,62 @@ def picard_mark_duplicates(job, bam, bai, validation_stringency='LENIENT'): # picard-tools container doesn't have JAVA_OPTS variable # Set TMPDIR to /data to prevent writing temporary files to /tmp - docker_parameters = ['--rm', 'log-driver', 'none', - '-e', 'JAVA_OPTIONS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory)] + docker_parameters = ['--rm', + '--log-driver', 'none', + '-e', 'JAVA_OPTIONS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory), + '-v', '{}:/data'.format(work_dir)] + + start_time = time.time() dockerCall(job=job, workDir=work_dir, parameters=command, tool='quay.io/ucsc_cgl/picardtools:1.95--dd5ac549b95eb3e5d166a5e310417ef13651994e', dockerParameters=docker_parameters) + end_time = time.time() + _log_runtime(job, start_time, end_time, "Picard MarkDuplicates") bam = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'mkdups.bam')) bai = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'mkdups.bai')) return bam, bai +def run_picard_sort(job, bam, sort_by_name=False): + """ + Sorts BAM file using Picard SortSam + + :param JobFunctionWrappingJob job: passed automatically by Toil + :param str bam: FileStoreID for BAM file + :param boolean sort_by_name: If true, sorts by read name instead of coordinate. 
+ :return: FileStoreID for sorted BAM file + :rtype: str + """ + work_dir = job.fileStore.getLocalTempDir() + job.fileStore.readGlobalFile(bam, os.path.join(work_dir, 'input.bam')) + command = ['SortSam', + 'O=/data/output.bam', + 'I=/data/input.bam'] + + # picard-tools container doesn't have JAVA_OPTS variable + # Set TMPDIR to /data to prevent writing temporary files to /tmp + docker_parameters = ['--rm', + '--log-driver', 'none', + '-e', 'JAVA_OPTIONS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory), + '-v', '{}:/data'.format(work_dir)] + + if sort_by_name: + command.append('SO=queryname') + else: + command.append('SO=coordinate') + + start_time = time.time() + dockerCall(job=job, workDir=work_dir, + parameters=command, + tool='quay.io/ucsc_cgl/picardtools:1.95--dd5ac549b95eb3e5d166a5e310417ef13651994e', + dockerParameters=docker_parameters) + end_time = time.time() + _log_runtime(job, start_time, end_time, "Picard SortSam") + return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) + + def run_gatk_preprocessing(job, bam, bai, ref, ref_dict, fai, g1k, mills, dbsnp, realign=False, unsafe=False): """ GATK Preprocessing Pipeline @@ -342,16 +530,27 @@ def run_realigner_target_creator(job, bam, bai, ref, ref_dict, fai, g1k, mills, '-known', '/data/mills.vcf', '--downsampling_type', 'NONE', '-o', '/data/sample.intervals'] + + # NOTE(review): a premature end_time/_log_runtime block was removed here; it + # read start_time before its assignment below and duplicated the log after dockerCall + if unsafe: parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY']) # Set TMPDIR to /data to prevent writing temporary files to /tmp - docker_parameters = ['--rm', 'log-driver', 'none', - '-e', 'JAVA_OPTS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory)] + docker_parameters = ['--rm', + '--log-driver', 'none', + '-e', 'JAVA_OPTS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory), + '-v', '{}:/data'.format(work_dir)] + + start_time = time.time() dockerCall(job=job, 
tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2', workDir=work_dir, parameters=parameters, dockerParameters=docker_parameters) + end_time = time.time() + _log_runtime(job, start_time, end_time, "GATK3 RTC") + # Write to fileStore return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.intervals')) @@ -404,12 +603,17 @@ def run_indel_realignment(job, intervals, bam, bai, ref, ref_dict, fai, g1k, mil parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY']) # Set TMPDIR to /data to prevent writing temporary files to /tmp - docker_parameters = ['--rm', 'log-driver', 'none', - '-e', 'JAVA_OPTS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory)] + docker_parameters = ['--rm', + '--log-driver', 'none', + '-e', 'JAVA_OPTS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory), + '-v', '{}:/data'.format(work_dir)] + start_time = time.time() dockerCall(job=job, tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2', workDir=work_dir, parameters=parameters, dockerParameters=docker_parameters) + end_time = time.time() + _log_runtime(job, start_time, end_time, "GATK3 IndelRealigner") indel_bam = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) indel_bai = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bai')) @@ -446,7 +650,7 @@ def run_base_recalibration(job, bam, bai, ref, ref_dict, fai, dbsnp, mills, unsa # Call: GATK -- BaseRecalibrator parameters = ['-T', 'BaseRecalibrator', - '-nct', str(job.cores), + '-nct', str(int(job.cores)), '-R', '/data/ref.fasta', '-I', '/data/input.bam', # Recommended known sites: @@ -459,12 +663,17 @@ def run_base_recalibration(job, bam, bai, ref, ref_dict, fai, dbsnp, mills, unsa parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY']) # Set TMPDIR to /data to prevent writing temporary files to /tmp - docker_parameters = ['--rm', 'log-driver', 'none', - '-e', 'JAVA_OPTS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory)] + docker_parameters = ['--rm', + 
'--log-driver', 'none', + '-e', 'JAVA_OPTS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory), + '-v', '{}:/data'.format(work_dir)] + start_time = time.time() dockerCall(job=job, tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2', workDir=work_dir, parameters=parameters, dockerParameters=docker_parameters) + end_time = time.time() + _log_runtime(job, start_time, end_time, "GATK3 BaseRecalibrator") return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'recal_data.table')) @@ -497,21 +706,29 @@ def apply_bqsr_recalibration(job, table, bam, bai, ref, ref_dict, fai, unsafe=Fa # Call: GATK -- PrintReads parameters = ['-T', 'PrintReads', - '-nct', str(job.cores), + '-nct', str(int(job.cores)), '-R', '/data/ref.fasta', '-I', '/data/input.bam', '-BQSR', '/data/recal.table', '-o', '/data/bqsr.bam'] + # NOTE(review): a premature end_time/_log_runtime block was removed here; it + # read start_time before its assignment below and duplicated the log after dockerCall + if unsafe: parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY']) # Set TMPDIR to /data to prevent writing temporary files to /tmp - docker_parameters = ['--rm', 'log-driver', 'none', - '-e', 'JAVA_OPTS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory)] + docker_parameters = ['--rm', + '--log-driver', 'none', + '-e', 'JAVA_OPTS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory), + '-v', '{}:/data'.format(work_dir)] + start_time = time.time() dockerCall(job=job, tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2', workDir=work_dir, parameters=parameters, dockerParameters=docker_parameters) + end_time = time.time() + _log_runtime(job, start_time, end_time, "GATK3 BQSR PrintReads") output_bam = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'bqsr.bam')) output_bai = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'bqsr.bai'))