Skip to content

Commit

Permalink
Merge pull request BD2KGenomics#96 from fnothaft/issues/95-sambamba-s…
Browse files Browse the repository at this point in the history
…amblaster

Added alternate sort and duplicate marking tools (resolves BD2KGenomics#95)
  • Loading branch information
jvivian authored Aug 8, 2017
2 parents 657c115 + 766ced8 commit 68daa20
Showing 1 changed file with 230 additions and 13 deletions.
243 changes: 230 additions & 13 deletions src/toil_lib/tools/preprocessing.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import time

from toil.job import PromisedRequirement
from toil.lib.docker import dockerCall
Expand Down Expand Up @@ -84,6 +85,40 @@ def run_samtools_index(job, bam):
return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.bam.bai'))



def run_samtools_view(job, bam):
"""
Runs SAMtools view to create a SAM file.
:param JobFunctionWrappingJob job: passed automatically by Toil
:param str bam: FileStoreID of the BAM file
:return: FileStoreID for SAM file
:rtype: str
"""
work_dir = job.fileStore.getLocalTempDir()
job.fileStore.readGlobalFile(bam, os.path.join(work_dir, 'sample.bam'))
# Call: index the bam
parameters = ['view',
'-h',
'-o', '/data/sample.sam',
'/data/sample.bam']
dockerCall(job=job, workDir=work_dir, parameters=parameters,
tool='quay.io/ucsc_cgl/samtools:0.1.19--dd5ac549b95eb3e5d166a5e310417ef13651994e')
# Write to fileStore
return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.sam'))


def _log_runtime(job, start, end, cmd):

elapsed_time = end - start

hours = int(elapsed_time) / (60 * 60)
minutes = int(elapsed_time - (60 * 60 * hours)) / 60
seconds = int(elapsed_time - (60 * 60 * hours) - (60 * minutes)) % 60

job.fileStore.logToMaster("%s ran in %dh%dm%ds" % (cmd, hours, minutes, seconds))


def run_samtools_sort(job, bam):
"""
Sorts BAM file using SAMtools sort
Expand All @@ -96,12 +131,121 @@ def run_samtools_sort(job, bam):
work_dir = job.fileStore.getLocalTempDir()
job.fileStore.readGlobalFile(bam, os.path.join(work_dir, 'input.bam'))
command = ['sort',
'-@', str(job.cores),
'-@', str(int(job.cores)),
'-o', '/data/output.bam',
'/data/input.bam']

start_time = time.time()
dockerCall(job=job, workDir=work_dir,
parameters=command,
tool='quay.io/ucsc_cgl/samtools:1.3--256539928ea162949d8a65ca5c79a72ef557ce7c')
end_time = time.time()
_log_runtime(job, start_time, end_time, "samtools sort")
return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam'))


def run_samtools_rmdup(job, bam):
"""
Mark reads as PCR duplicates using SAMtools rmdup
:param JobFunctionWrappingJob job: passed automatically by Toil
:param str bam: FileStoreID for BAM file
:return: FileStoreID for sorted BAM file
:rtype: str
"""
work_dir = job.fileStore.getLocalTempDir()
job.fileStore.readGlobalFile(bam, os.path.join(work_dir, 'input.bam'))
command = ['rmdup',
'/data/input.bam',
'/data/output.bam']

start_time = time.time()
dockerCall(job=job, workDir=work_dir,
parameters=command,
tool='quay.io/ucsc_cgl/samtools:1.3--256539928ea162949d8a65ca5c79a72ef557ce7c')
end_time = time.time()
_log_runtime(job, start_time, end_time, "samtools rmdup")
return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam'))


def run_sambamba_sort(job, bam, sort_by_name=False):
"""
Sorts BAM file using Sambamba sort
:param JobFunctionWrappingJob job: passed automatically by Toil
:param str bam: FileStoreID for BAM file
:param boolean sort_by_name: If true, sorts by read name instead of coordinate.
:return: FileStoreID for sorted BAM file
:rtype: str
"""
work_dir = job.fileStore.getLocalTempDir()
job.fileStore.readGlobalFile(bam, os.path.join(work_dir, 'input.bam'))
command = ['/usr/local/bin/sambamba',
'sort',
'-t', str(int(job.cores)),
'-m', str(job.memory),
'-o', '/data/output.bam',
'/data/input.bam']

if sort_by_name:
command.append('-n')

start_time = time.time()
dockerCall(job=job, workDir=work_dir,
parameters=command,
tool='quay.io/biocontainers/sambamba:0.6.6--0')
end_time = time.time()
_log_runtime(job, start_time, end_time, "sambamba sort")
return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam'))


def run_sambamba_markdup(job, bam):
"""
Marks reads as PCR duplicates using Sambamba
:param JobFunctionWrappingJob job: passed automatically by Toil
:param str bam: FileStoreID for BAM file
:return: FileStoreID for sorted BAM file
:rtype: str
"""
work_dir = job.fileStore.getLocalTempDir()
job.fileStore.readGlobalFile(bam, os.path.join(work_dir, 'input.bam'))
command = ['/usr/local/bin/sambamba',
'markdup',
'-t', str(int(job.cores)),
'/data/input.bam',
'/data/output.bam']

start_time = time.time()
dockerCall(job=job, workDir=work_dir,
parameters=command,
tool='quay.io/biocontainers/sambamba:0.6.6--0')
end_time = time.time()
_log_runtime(job, start_time, end_time, "sambamba mkdup")
return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam'))


def run_samblaster(job, bam):
"""
Marks reads as PCR duplicates using SAMBLASTER
:param JobFunctionWrappingJob job: passed automatically by Toil
:param str bam: FileStoreID for BAM file
:return: FileStoreID for sorted BAM file
:rtype: str
"""
work_dir = job.fileStore.getLocalTempDir()
job.fileStore.readGlobalFile(bam, os.path.join(work_dir, 'input.bam'))
command = ['/usr/local/bin/samblaster',
'-s', '/data/input.bam',
'-o', '/data/output.bam']

start_time = time.time()
dockerCall(job=job, workDir=work_dir,
parameters=command,
tool='quay.io/biocontainers/samblaster:0.1.24--0')
end_time = time.time()
_log_runtime(job, start_time, end_time, "SAMBLASTER")
return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam'))


Expand Down Expand Up @@ -152,18 +296,62 @@ def picard_mark_duplicates(job, bam, bai, validation_stringency='LENIENT'):

# picard-tools container doesn't have JAVA_OPTS variable
# Set TMPDIR to /data to prevent writing temporary files to /tmp
docker_parameters = ['--rm', 'log-driver', 'none',
'-e', 'JAVA_OPTIONS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory)]
docker_parameters = ['--rm',
'--log-driver', 'none',
'-e', 'JAVA_OPTIONS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory),
'-v', '{}:/data'.format(work_dir)]

start_time = time.time()
dockerCall(job=job, workDir=work_dir,
parameters=command,
tool='quay.io/ucsc_cgl/picardtools:1.95--dd5ac549b95eb3e5d166a5e310417ef13651994e',
dockerParameters=docker_parameters)
end_time = time.time()
_log_runtime(job, start_time, end_time, "Picard MarkDuplicates")

bam = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'mkdups.bam'))
bai = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'mkdups.bai'))
return bam, bai


def run_picard_sort(job, bam, sort_by_name=False):
"""
Sorts BAM file using Picard SortSam
:param JobFunctionWrappingJob job: passed automatically by Toil
:param str bam: FileStoreID for BAM file
:param boolean sort_by_name: If true, sorts by read name instead of coordinate.
:return: FileStoreID for sorted BAM file
:rtype: str
"""
work_dir = job.fileStore.getLocalTempDir()
job.fileStore.readGlobalFile(bam, os.path.join(work_dir, 'input.bam'))
command = ['SortSam',
'O=/data/output.bam',
'I=/data/input.bam']

# picard-tools container doesn't have JAVA_OPTS variable
# Set TMPDIR to /data to prevent writing temporary files to /tmp
docker_parameters = ['--rm',
'--log-driver', 'none',
'-e', 'JAVA_OPTIONS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory),
'-v', '{}:/data'.format(work_dir)]

if sort_by_name:
command.append('SO=queryname')
else:
command.append('SO=coordinate')

start_time = time.time()
dockerCall(job=job, workDir=work_dir,
parameters=command,
tool='quay.io/ucsc_cgl/picardtools:1.95--dd5ac549b95eb3e5d166a5e310417ef13651994e',
dockerParameters=docker_parameters)
end_time = time.time()
_log_runtime(job, start_time, end_time, "Picard SortSam")
return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam'))


def run_gatk_preprocessing(job, bam, bai, ref, ref_dict, fai, g1k, mills, dbsnp, realign=False, unsafe=False):
"""
GATK Preprocessing Pipeline
Expand Down Expand Up @@ -342,16 +530,27 @@ def run_realigner_target_creator(job, bam, bai, ref, ref_dict, fai, g1k, mills,
'-known', '/data/mills.vcf',
'--downsampling_type', 'NONE',
'-o', '/data/sample.intervals']

end_time = time.time()
_log_runtime(job, start_time, end_time, "GATK3 RealignerTargetCreator")

if unsafe:
parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY'])

# Set TMPDIR to /data to prevent writing temporary files to /tmp
docker_parameters = ['--rm', 'log-driver', 'none',
'-e', 'JAVA_OPTS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory)]
docker_parameters = ['--rm',
'--log-driver', 'none',
'-e', 'JAVA_OPTS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory),
'-v', '{}:/data'.format(work_dir)]

start_time = time.time()
dockerCall(job=job, tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2',
workDir=work_dir,
parameters=parameters,
dockerParameters=docker_parameters)
end_time = time.time()
_log_runtime(job, start_time, end_time, "GATK3 RTC")

# Write to fileStore
return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.intervals'))

Expand Down Expand Up @@ -404,12 +603,17 @@ def run_indel_realignment(job, intervals, bam, bai, ref, ref_dict, fai, g1k, mil
parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY'])

# Set TMPDIR to /data to prevent writing temporary files to /tmp
docker_parameters = ['--rm', 'log-driver', 'none',
'-e', 'JAVA_OPTS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory)]
docker_parameters = ['--rm',
'--log-driver', 'none',
'-e', 'JAVA_OPTS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory),
'-v', '{}:/data'.format(work_dir)]
start_time = time.time()
dockerCall(job=job, tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2',
workDir=work_dir,
parameters=parameters,
dockerParameters=docker_parameters)
end_time = time.time()
_log_runtime(job, start_time, end_time, "GATK3 IndelRealigner")

indel_bam = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam'))
indel_bai = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bai'))
Expand Down Expand Up @@ -446,7 +650,7 @@ def run_base_recalibration(job, bam, bai, ref, ref_dict, fai, dbsnp, mills, unsa

# Call: GATK -- BaseRecalibrator
parameters = ['-T', 'BaseRecalibrator',
'-nct', str(job.cores),
'-nct', str(int(job.cores)),
'-R', '/data/ref.fasta',
'-I', '/data/input.bam',
# Recommended known sites:
Expand All @@ -459,12 +663,17 @@ def run_base_recalibration(job, bam, bai, ref, ref_dict, fai, dbsnp, mills, unsa
parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY'])

# Set TMPDIR to /data to prevent writing temporary files to /tmp
docker_parameters = ['--rm', 'log-driver', 'none',
'-e', 'JAVA_OPTS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory)]
docker_parameters = ['--rm',
'--log-driver', 'none',
'-e', 'JAVA_OPTS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory),
'-v', '{}:/data'.format(work_dir)]
start_time = time.time()
dockerCall(job=job, tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2',
workDir=work_dir,
parameters=parameters,
dockerParameters=docker_parameters)
end_time = time.time()
_log_runtime(job, start_time, end_time, "GATK3 BaseRecalibrator")

return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'recal_data.table'))

Expand Down Expand Up @@ -497,21 +706,29 @@ def apply_bqsr_recalibration(job, table, bam, bai, ref, ref_dict, fai, unsafe=Fa

# Call: GATK -- PrintReads
parameters = ['-T', 'PrintReads',
'-nct', str(job.cores),
'-nct', str(int(job.cores)),
'-R', '/data/ref.fasta',
'-I', '/data/input.bam',
'-BQSR', '/data/recal.table',
'-o', '/data/bqsr.bam']
end_time = time.time()
_log_runtime(job, start_time, end_time, "GATK3 BQSR PrintReads")

if unsafe:
parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY'])

# Set TMPDIR to /data to prevent writing temporary files to /tmp
docker_parameters = ['--rm', 'log-driver', 'none',
'-e', 'JAVA_OPTS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory)]
docker_parameters = ['--rm',
'--log-driver', 'none',
'-e', 'JAVA_OPTS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory),
'-v', '{}:/data'.format(work_dir)]
start_time = time.time()
dockerCall(job=job, tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2',
workDir=work_dir,
parameters=parameters,
dockerParameters=docker_parameters)
end_time = time.time()
_log_runtime(job, start_time, end_time, "GATK3 BQSR PrintReads")

output_bam = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'bqsr.bam'))
output_bai = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'bqsr.bai'))
Expand Down

0 comments on commit 68daa20

Please sign in to comment.