Skip to content

Commit

Permalink
EVA-3430 - Add bcftools norm before ingesting (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
sundarvenkata-EBI authored Oct 30, 2023
1 parent d60cb89 commit a6fcdb9
Show file tree
Hide file tree
Showing 7 changed files with 541 additions and 17 deletions.
73 changes: 56 additions & 17 deletions covid19dp_submission/nextflow/submission_workflow.nf
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
nextflow.enable.dsl=2

params.NORMALISED_VCF_DIR = "${params.submission.download_target_dir}/normalised_vcfs"
params.REFSEQ_FASTA = "${params.submission.download_target_dir}/refseq_fasta.fa"

// This is needed because "bcftools norm" step requires a FASTA
// but the VCFs we get from Covid19 data team only have RefSeq contigs
process create_refseq_fasta {
output:
val true, emit: create_refseq_fasta_success

script:
// TODO: Doing this seems like a bad idea but this seems to work well for now
"""
sed s/MN908947.3/NC_045512.2/g $params.submission.assembly_fasta > $params.REFSEQ_FASTA
"""
}

process validate_vcfs {

input:
path vcf_files

output:
val true, emit: validate_vcfs_success

script:
"""
export PYTHONPATH="$params.executable.python.script_path"
Expand Down Expand Up @@ -48,10 +64,10 @@ process bgzip_and_index {
val flag1
val flag2
path vcf_files

output:
val true, emit: bgzip_and_index_success

script:
"""
export PYTHONPATH="$params.executable.python.script_path"
Expand All @@ -67,16 +83,16 @@ process bgzip_and_index {
process vertical_concat {
input:
val flag

output:
val true, emit: vertical_concat_success

script:
"""
export PYTHONPATH="$params.executable.python.script_path"
($params.executable.python.interpreter \
-m steps.vcf_vertical_concat.run_vcf_vertical_concat_pipeline \
--toplevel-vcf-dir $params.submission.download_target_dir \
--toplevel-vcf-dir $params.NORMALISED_VCF_DIR \
--concat-processing-dir $params.submission.concat_processing_dir \
--concat-chunk-size $params.submission.concat_chunk_size \
--bcftools-binary $params.executable.bcftools \
Expand All @@ -86,22 +102,45 @@ process vertical_concat {
"""
}

process normalise_concat_vcf {

input:
val flag1

output:
val true, emit: normalise_concat_vcf_success

script:
"""
export PYTHONPATH="$params.executable.python.script_path"
($params.executable.python.interpreter \
-m steps.normalise_vcfs \
--vcf-files $params.submission.concat_result_file \
--input-dir `dirname ${params.submission.concat_result_file}` \
--output-dir $params.NORMALISED_VCF_DIR \
--bcftools-binary $params.executable.bcftools \
--refseq-fasta-file $params.REFSEQ_FASTA \
) >> $params.submission.log_dir/normalise_concat_vcf.log 2>&1
"""
}

process accession_vcf {
clusterOptions "-g /accession/$params.submission.accessioning_instance"

input:
val flag

output:
val true, emit: accession_vcf_success

script:
//Accessioning properties file passed via command line should already be populated with project and assembly accessions
"""
export PYTHONPATH="$params.executable.python.script_path"
export NORMALISED_CONCAT_VCF=("${params.NORMALISED_VCF_DIR}/"`basename ${params.submission.concat_result_file}`)
($params.executable.python.interpreter \
-m steps.accession_vcf \
--vcf-file $params.submission.concat_result_file \
--vcf-file \$NORMALISED_CONCAT_VCF \
--accessioning-jar-file $params.jar.accession_pipeline \
--accessioning-properties-file $params.submission.accessioning_properties_file \
--accessioning-instance $params.submission.accessioning_instance \
Expand All @@ -116,10 +155,10 @@ process sync_accessions_to_public_ftp {

input:
val flag

output:
val true, emit: sync_accessions_to_public_ftp_success

script:
"""
mkdir -p $params.submission.public_ftp_dir
Expand All @@ -133,10 +172,10 @@ process cluster_assembly {

input:
val flag

output:
val true, emit: cluster_assembly_success

script:
//Clustering properties file passed via command line should already be populated with project and assembly accessions
"""
Expand Down Expand Up @@ -173,6 +212,7 @@ process incremental_release {

workflow {
main:
create_refseq_fasta()
Channel.fromPath("$params.submission.download_file_list")
.splitCsv(header:false)
.map(row -> row[0])
Expand All @@ -181,7 +221,6 @@ workflow {
validate_vcfs(vcf_files_list)
asm_check_vcfs(vcf_files_list)
bgzip_and_index(validate_vcfs.out.validate_vcfs_success, asm_check_vcfs.out.asm_check_vcfs_success, vcf_files_list)

vertical_concat(bgzip_and_index.out.bgzip_and_index_success.collect()) | \
accession_vcf | sync_accessions_to_public_ftp | cluster_assembly | incremental_release
normalise_concat_vcf | accession_vcf | sync_accessions_to_public_ftp | cluster_assembly | incremental_release
}
71 changes: 71 additions & 0 deletions covid19dp_submission/steps/normalise_vcfs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright 2023 EMBL - European Bioinformatics Institute
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import os

from ebi_eva_common_pyutils.command_utils import run_command_with_output
from ebi_eva_common_pyutils.logger import logging_config
from covid19dp_submission.steps.bgzip_and_index_vcf import _get_vcf_filename_without_extension

logger = logging_config.get_logger(__name__)


def normalise(input_dir: str, vcf_file: str, output_file: str, bcftools_binary: str, refseq_fasta_file: str) -> str:
vcf_file_name_no_ext = _get_vcf_filename_without_extension(vcf_file)
vcf_file_name_no_ext_and_path = os.path.basename(vcf_file_name_no_ext)
# See here: https://github.com/EBIvariation/eva-submission/blob/bb85922fffb4f29fdce501af036ea79ec8712121/eva_submission/nextflow/prepare_brokering.nf#L141
commands = [f'cd {input_dir}',
f'{bcftools_binary} norm --check-ref w --fasta-ref {refseq_fasta_file} --output-type z --output '
f'{output_file} {vcf_file} ',
f'{bcftools_binary} index --force --csi {output_file}'
]
normalise_command = ' && '.join(commands)
run_command_with_output(f"Normalising {vcf_file_name_no_ext_and_path}...", normalise_command)
return f"{vcf_file_name_no_ext}.vcf.gz"


def normalise_all(input_dir:str, vcf_files: list, output_dir: str, bcftools_binary: str,
refseq_fasta_file: str) -> list:
output_vcf_files = []
os.makedirs(name=output_dir, exist_ok=True)
for vcf_file in vcf_files:
vcf_file_name_no_ext = _get_vcf_filename_without_extension(vcf_file)
output_file = f"{output_dir}/{os.path.basename(vcf_file_name_no_ext)}.vcf.gz"
normalise(input_dir, vcf_file, output_file, bcftools_binary, refseq_fasta_file)
output_vcf_files.append(output_file)
return output_vcf_files


def main():
parser = argparse.ArgumentParser(description='Normalise VCF file',
formatter_class=argparse.RawTextHelpFormatter, add_help=False)
parser.add_argument("--vcf-files", help="Path to the VCF file (ex: /path/to/file.vcf)",
nargs='+', required=True)
parser.add_argument("--input-dir", help="Path to the directory that will contain the input files",
required=True)
parser.add_argument("--output-dir", help="Path to the directory that will contain the output files",
required=True)
parser.add_argument("--bcftools-binary", help="Full path to the bcftools binary (ex: /path/to/bcftools)",
default="bcftools", required=False)
parser.add_argument("--refseq-fasta-file", help="Path to the RefSeq FASTA file (ex: /path/to/refseq_fasta.fa)",
required=True)
args = parser.parse_args()
logging_config.add_stdout_handler()

normalise_all(args.input_dir, args.vcf_files, args.output_dir, args.bcftools_binary, args.refseq_fasta_file)


if __name__ == "__main__":
main()
Loading

0 comments on commit a6fcdb9

Please sign in to comment.